arrow_back

Dataflow によるサーバーレス データ処理 - 分岐するパイプライン(Python)

ログイン 参加
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

Dataflow によるサーバーレス データ処理 - 分岐するパイプライン(Python)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

概要

このラボでは、次の作業を行います。

  • ブランチがあるパイプラインを実装する
  • 書き込む前にデータをフィルタする
  • カスタム コマンドライン パラメータをパイプラインに追加する

前提条件:

  • Python に関する基本的な知識

前のラボでは、基本的な抽出、変換、読み込みの連続的なパイプラインを作成し、対応する Dataflow テンプレートを使用して Google Cloud Storage のバッチ データ ストレージを取り込みました。このパイプラインは、以下に示す変換のシーケンスで構成されています。

入力、変換、PCollection、変換、PCollection、変換、出力の順序で流れる要素を持つパイプライン フロー図。

しかし多くの場合、パイプラインの構造はこのように単純ではありません。このラボでは、より高度な連続的でないパイプラインを構築します。

今回のユースケースではリソース消費量を最適化します。プロダクトによってリソースの利用状況は異なります。また、一つの企業内でもすべてのデータが同じように使われるわけではなく、たとえば分析ワークロードで定期的にクエリされるデータもあれば、復元にのみ使用されるデータもあります。このラボでは、最初のラボで作成したパイプラインのリソース消費量を最適化するために、アナリストが使用するデータのみを BigQuery に保存し、他のデータは非常に低コストで耐久性の高いストレージ サービスである Google Cloud Storage の Coldline Storage にアーカイブします。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

Jupyter ノートブック ベースの開発環境の設定

このラボでは、すべてのコマンドをノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [ワークベンチ] をクリックします。

  2. Notebooks API を有効にします。

  3. ワークベンチのページで [新規作成] をクリックします。

  4. 表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを に、ゾーンを に設定します。

  5. [環境] で [Apache Beam] を選択します。

  6. ダイアログ ボックスの下部にある [作成] をクリックします。

注: 環境の完全なプロビジョニングには 3~5 分かかる場合があります。処理が完了するまでお待ちください。 注: [Notebook API を有効にする] をクリックして Notebooks API を有効にします。
  1. 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。

IDE_link

  1. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

ターミナルを開く

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

展開された [表示] メニューでハイライト表示されているエクスプローラ オプション

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。ファイルが開き、コードを追加または変更できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する

複数の変換が同じ PCollection を処理する

このラボでは、Google Cloud Storage と BigQuery の両方にデータを書き込む、分岐するパイプラインを作成します。

分岐するパイプラインを作成する方法の一つは、2 つの異なる変換を同じ PCollection に適用することにより、2 つの異なる PCollection を作成することです。

[PCollection1] = [Initial Input PCollection] | [A Transform] [PCollection2] = [Initial Input PCollection] | [A Different Transform]

分岐するパイプラインを実装する

このセクションや後のセクションでヒントが必要な場合は、Google Cloud training-data-analyst ページにあるソリューションを参照してください。

タスク 1. Cloud Storage に書き込むブランチを追加する

このタスクを完了するには、Cloud Storage に書き込むブランチを追加し、既存のパイプラインを変更します。

パイプラインの順序: 入力、変換、PCollection、変換、PCollection、変換、出力。PCollection の最初のインスタンスでブランチが始まった後、変換、出力の順に流れている。

適切なラボを開く

  • IDE 環境のターミナルで、次のコマンドを実行します。
# ディレクトリをラボに移動する cd 2_Branching_Pipelines/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. IDE 環境のターミナルで、次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. 最後に、Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習バージョンのコードを含むディレクトリに移動する cd $BASE_DIR
  1. IDE で my_pipeline.py を開きます。これは training-data-analyst/quests/dataflow_python/2_Branching_Pipelines/labs/ にあります。

  2. パイプラインの本体部分を定義している run() メソッドまで下にスクロールします。現在は次のような内容です。

(p | 'ReadFromGCS' >> beam.io.ReadFromText(input) | 'ParseJson' >> beam.Map(parse_json) | 'WriteToBQ' >> beam.io.WriteToBigQuery( output, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) )
  1. 各要素が json から dict に変換される前に、textio.WriteToText を使用して Cloud Storage への書き込みを行う新しい分岐変換を追加して、このコードを変換します。

このセクションや後のセクションでヒントが必要な場合は、Google Cloud training-data-analyst ページにあるソリューションを参照してください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

タスク 2. フィールドでデータをフィルタする

この時点では、すべてのデータが 2 回保存されるため、新しいパイプラインでもリソース消費量はほとんど減りません。リソース消費量を改善するには、重複するデータの量を減らす必要があります。

Google Cloud Storage バケットは、アーカイブおよびバックアップ ストレージとして機能することを目的としているので、すべてのデータを保存することが重要になります。その一方で、BigQuery には必ずしもすべてのデータを送る必要はありません。

  1. たとえば、データ アナリストが頻繁に確認する対象が、ウェブサイトでユーザーがどのリソースにアクセスしているか、そしてそのアクセス パターンが地域と時間によってどのように異なるかであると仮定します。この場合、必要なフィールドはごく一部です。すでに JSON をパースして辞書型に変換しているので、pop メソッドを使用して容易に Python の呼び出し可能オブジェクトからフィールドを除くことができます。
def drop_field(element): element.pop('field_name') return element
  1. このタスクを完了するには、Python の呼び出し可能オブジェクトを beam.Map とともに使用して、アナリストが BigQuery で使用しないフィールド user_agent を除きます。

タスク 3. 要素でデータをフィルタする

Apache Beam にはフィルタリングの方法が数多くあります。Python 辞書形式の PCollection を取り扱っているため、最も簡単なのはブール値を返す関数である(匿名の)ラムダ関数をフィルタとして利用する方法で、beam.Filter とともに使用します。次に例を示します。

purchases | beam.Filter(lambda element : element['cost_cents'] > 20*100)
  • このタスクを完了するには、パイプラインに beam.Filter 変換を追加します。どのような条件でもフィルタできますが、たとえば num_bytes が 120 以上である行を除くということを試してみると良いでしょう。

タスク 4. カスタム コマンドライン パラメータを追加する

パイプラインには現在、入力のパスや BigQuery のテーブルの場所など、多くのパラメータがハードコードされていますが、Cloud Storage の任意の JSON ファイルを読み取ることができれば、パイプラインがさらに便利になります。この機能を追加するには、一連のコマンドライン パラメータへの追加が必要です。

現在は、コマンドライン引数の読み込みと解析に ArgumentParser を使用しています。そして、パイプライン作成時に指定した PipelineOptions() オブジェクトに引数を渡します。

parser = argparse.ArgumentParser(description='...') # 引数を定義して解析する options = PipelineOptions() # オプションからオプション値を設定する p = beam.Pipeline(options=options)

PipelineOptions を使用して、ArgumentParser で読み込んだオプションを解釈します。このパーサーに新しいコマンドライン引数を追加するには、以下の構文を使用します。

parser.add_argument('--argument_name', required=True, help='Argument description')

コードでコマンドライン パラメータにアクセスするには、引数を解析した結果できる辞書のフィールドを参照します。

opts = parser.parse_args() arg_value = opts.arg_name
  • このタスクを完了するには、入力パス、Google Cloud Storage 出力パス、BigQuery テーブル名を示すコマンドライン パラメータを追加して、定数の代わりにこれらのパラメータにアクセスするようにパイプライン コードを更新します。

タスク 5. パイプラインに null 値許容フィールドを追加する

お気づきかもしれませんが、前回のラボで作成した BigQuery テーブルには、すべてのフィールドを REQUIRED とする次のようなスキーマがありました。

列見出し(フィールド名、タイプ、モード、ポリシータグ、説明)の下に複数のデータ行が表示された [スキーマ] タブで開いている BigQuery ログページ

データが存在しない NULLABLE フィールドがある Apache Beam スキーマを、パイプラインの実行自体およびその実行の結果である BigQuery テーブルの両方について作成するのが良いでしょう。

次のように、NULL 値を許容したいフィールドに新しいプロパティ mode を追加して JSON BigQuery スキーマを更新することができます。

{ "name": "field_name", "type": "STRING", "mode": "NULLABLE" }
  • このタスクを完成するには、BigQuery スキーマの lat フィールドと lon フィールドを null 値許容に設定します。

タスク 6. コマンドラインでパイプラインを実行する

  • このタスクを完了するには、コマンドラインでパイプラインを実行して適切なパラメータを渡します。生成される BigQuery スキーマの NULLABLE フィールドを忘れずにメモしておいてください。コードは次のようになります。
# 環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export COLDLINE_BUCKET=${BUCKET}-coldline export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export OUTPUT_PATH=${PIPELINE_FOLDER}-coldline/pipeline_output export TABLE_NAME=${PROJECT_ID}:logs.logs_filtered cd $BASE_DIR python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --outputPath=${OUTPUT_PATH} \ --tableName=${TABLE_NAME} 注: パイプラインが問題なく構築されているのにコードや Dataflow サービスの構成ミスによりエラーが大量に発生する場合には、「runner」を「DirectRunner」に戻してローカルで実行すると、より速くフィードバックを得ることができます。今回のケースはデータセットが小規模で、DirectRunner が対応している機能のみを使用しているため、この手法が有効です。

タスク 7. パイプラインの結果を確認する

  1. Dataflow の [ジョブ] ページに移動して、実行中のジョブを確認します。グラフは次のようになっているはずです。

間に複数のポイントがある ReadFrom GS から DropInputs へのジョブフローと、ReadFrom GS から WriteToGCS に直接流れるジョブフローが表示されたフロー図

  1. Filter 関数を表すノード(上の図では FilterFn)をクリックします。右側に表示されたパネルで、入力として追加された要素が出力として書き込まれた要素よりも多いことが確認できます。

  2. 次に Cloud Storage への書き込みを表すノードをクリックします。すべての要素が書き込まれているので、この数字は Filter 関数への入力の要素数と一致しているはずです。

  3. パイプラインが終了したら、テーブルに対してクエリを実行して BigQuery で結果を確認します。テーブル内のレコード数は Filter 関数で出力された要素の数と一致しているはずです。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 コマンドラインでパイプラインを実行する

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします