arrow_back

Dataflow を使用したサーバーレスのデータ処理 - Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Python)

ログイン 参加
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

Dataflow を使用したサーバーレスのデータ処理 - Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Python)

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 中級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

概要

このラボでは、次の方法について学びます。

  • Google Cloud Storage から元データを取得して Google BigQuery に書き込む、抽出、変換、読み込みバッチ パイプラインを Apache Beam で構築する。
  • Apache Beam パイプラインを Cloud Dataflow で実行する。
  • パイプラインの実行をパラメータ化する。

前提条件:

  • Python の基本的な知識。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

Jupyter ノートブック ベースの開発環境の設定

このラボでは、すべてのコマンドをノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [ワークベンチ] をクリックします。

  2. Notebooks API を有効にします。

  3. ワークベンチのページで [新規作成] をクリックします。

  4. 表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを に、ゾーンを に設定します。

  5. [環境] で [Apache Beam] を選択します。

  6. ダイアログ ボックスの下部にある [作成] をクリックします。

注: 環境の完全なプロビジョニングには 3~5 分かかる場合があります。処理が完了するまでお待ちください。 注: [Notebook API を有効にする] をクリックして Notebooks API を有効にします。
  1. 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。

IDE_link

  1. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

ターミナルを開く

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

展開された [表示] メニューでハイライト表示されているエクスプローラ オプション

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。ファイルが開き、コードを追加または変更できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する

Apache Beam と Cloud Dataflow

5 分程度

Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。

Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルで、エンドユーザーは Java、Python、Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、小規模データセットであればローカルの開発マシンで、大規模には Cloud Dataflow で実行できます。ただし、オープンソースである Apache Beam では、他のランナーも使用できます。Apache Flink や Apache Spark などで Beam パイプラインを実行できます。

ラボ ネットワーク アーキテクチャの図

ラボパート 1. ETL パイプラインをゼロから作成する

はじめに

このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。

データセットとユースケースのレビュー

このクエストの各ラボでは、入力データは共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれている可能性のあるその他のデータに似ていると想定されています。この最初のラボでは、データはバッチソースとして扱われます。後のラボでは、データはストリーミング ソースとして扱われます。タスクは、データを読み取り、解析し、後のデータ分析のためにサーバーレス データ ウェアハウスである BigQuery に書き込むことです。

該当するラボを開く

  • IDE のターミナルに戻り、次のコマンドをコピーして貼り付けます。
cd 1_Basic_ETL/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. ターミナルで、このラボでの作業用に仮想環境を作成します。
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインの実行に必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. 最後に、Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com

最初のパイプラインを作成する

1 時間

タスク 1. 合成データを生成する

  1. ターミナルで次のコマンドを実行し、合成ウェブサーバー ログを生成するスクリプトが含まれているリポジトリのクローンを作成します。
cd $BASE_DIR/../.. source create_batch_sinks.sh bash generate_batch_events.sh head events.json

このスクリプトによって events.json というファイルが作成され、次のような行が含まれます。

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

このファイルは にある Google Cloud Storage バケットに自動的にコピーされます。

  1. 別のブラウザタブで、Google Cloud Storage に移動し、ストレージ バケットに events.json というファイルが含まれていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 合成データを生成する

タスク 2. ソースからデータを読み取る

このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。

  1. ファイル エクスプローラで、ラボフォルダである 1_Basic_ETL/lab に移動し、my_pipeline.py をクリックします。編集パネルにそのファイルが開きます。次のパッケージがインポートされることを確認します。
import argparse import time import logging import json import apache_beam as beam from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners import DataflowRunner, DirectRunner
  1. run() メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。
options = PipelineOptions() # オプションを設定する p = beam.Pipeline(options=options) # 実行する p.run()
  • Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の PCollection を作成にするには、ルート変換をパイプライン オブジェクトに適用する必要があります。ルート変換により、指定した外部データソースまたは一部のローカルデータから PCollection が作成されます。

  • Beam SDK のルート変換には、ReadCreate の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、メモリ内の list から PCollection を作成し、特にテストで有用です。

次のサンプルコードは、ReadFromText ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は、Pipeline オブジェクトである p に適用され、PCollection[str] 形式(パラメータ化された型ヒントに由来する表記法を使用しています)のパイプライン データセットを返します。ReadLines はこの変換に付けられた名前で、後により大規模なパイプラインに取り組む際に役立つでしょう。

lines = p | "ReadLines" >> beam.io.ReadFromText("gs://path/to/input.txt")
  1. run() メソッド内で、input という文字列定数を作成し、値を gs://<プロジェクト ID>/events.json に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。

  2. textio.ReadFromText 変換を呼び出す方法により、events.json のイベントすべての文字列の PCollection を作成します。

  3. my_pipeline.py の先頭に適切なインポート ステートメントを追加します。

  4. 作業を保存するには、上部のナビゲーション メニューで [ファイル] をクリックし、[保存] を選択します。

タスク 3. パイプラインを実行し、動作することを確認する

  • ターミナルに戻り、$BASE_DIR フォルダに戻って次のコマンドを実行します。パイプラインを実行する前に、PROJECT_ID 環境変数を設定してください。
cd $BASE_DIR # 環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) # パイプラインを実行する python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DirectRunner

この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。

しかし、この実行により有用なワークフローが示されます。DirectRunner を使用してローカルマシンでパイプラインを実行することで、より高価な計算を行う前にパイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runnerDataflowRunner に変更できます。

タスク 4. 変換を追加する

ヒントが必要な場合は、ソリューションを参照してください。

変換とは、データに変更をもたらすものです。Apache Beam では、変換は PTransform クラスが行います。実行時に、これらのオペレーションは独立した多くのワーカーで実行されます。

あらゆる PTransform の入力と出力は PCollection です。実際、お気づきでないかもしれませんが、Google Cloud Storage からデータを読み取った際に PTransform をすでに使用しています。変数に割り当てたかどうかに関係なく、そのときに文字列の PCollection が作成されました。

Beam の Python バージョンでは、パイプ演算子 | で表される汎用 apply メソッドを PCollection に対して使用するため、変換の連続的な連鎖を行えます。たとえば、次のように変換の連鎖による連続的なパイプラインを作成できます。

[Output_PCollection] = ([Input_PCollection] | [First Transform] | [Second Transform] | [Third Transform])

このタスクでは、新しい種類の変換である ParDo を使用します。ParDo は汎用並列処理のための Beam 変換です。

ParDo 処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける Map フェーズに似ています。ParDo 変換は入力 PCollection の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、出力 PCollection に 0 個、1 個、または複数個の要素を出力します。

ParDo はさまざまな一般的データ処理オペレーションに役立ちます。ただし、Python にはプロセスを簡単にする、次のような特別な PTransform があります。

  • データセットをフィルタリングする。Filter を使用して、PCollection の各要素を検討し、Python の呼び出し可能オブジェクトが返す出力ブール値に応じて、その要素を新しい PCollection に出力するか廃棄するかできます。
  • データセット内の各要素を整形または型変換する。目的の型や形式とは異なる要素が入力 PCollection に含まれている場合は、Map を使用して各要素を変換し、結果を新しい PCollection に出力できます。
  • データセット内の各要素の部分を抽出する。たとえば、複数のフィールドで構成されるレコードの PCollection がある場合、同様に Map または FlatMap を使用して、必要なフィールドだけを新しい PCollection に取り出すことができます。
  • データセット内の各要素に対して計算を実行する。ParDoMapFlatMap を使用して、PCollection のすべての要素または特定の要素に対して単純な計算または複雑な計算を行い、結果を新しい PCollection として出力できます。

このタスクを完了するには、単一イベントを表す JSON 文字列を読み取って、Python の json パッケージを使用して解析し、json.loads が返す辞書を出力する、Map 変換を作成する必要があります。

Map 関数の実装方法には、インラインで行う方法と事前定義された呼び出し可能オブジェクトを介して行う方法の 2 つがあります。インライン Map 関数は次のように記述します。

p | beam.Map(lambda x : something(x))

また、スクリプトの以前の箇所で定義されている Python の呼び出し可能オブジェクトと一緒に、beam.Map を使用できます。

def something(x): y = # 何かを行う return y p | beam.Map(something)

beam.Map(や他の軽量の DoFn)によって実現される以上の柔軟性が必要な場合、DoFn のサブクラスであるカスタム DoFn を使用して ParDo を実装できます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。

class MyDoFn(beam.DoFn): def process(self, element): output = #何かを行う yield output p | beam.ParDo(MyDoFn())

ヒントが必要な場合は、ソリューションを参照してください。

タスク 5. シンクに書き込む

この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の Python 辞書を出力します。次のステップでは、これらのオブジェクトを BigQuery テーブルに書き込みます。

  1. BigQuery テーブルを必要に応じて作成するようにパイプラインに指示できますが、事前にデータセットを作成する必要があります。これは generate_batch_events.sh スクリプトによりすでに行われています。次のコードを使用して、データセットを調べることができます。
# データセットを調べる bq ls # まだテーブルはない bq ls logs

パイプラインの最終 PCollection を出力するには、Write 変換をその PCollection に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection の要素を出力できます。Write を使用してパイプラインでいつでも PCollection を出力できますが、通常はパイプラインの最後にデータを書き出します。

次のサンプルコードは、WriteToText 変換を適用して文字列の PCollection をテキスト ファイルに書き込む方法を示しています。

p | "WriteMyFile" >> beam.io.WriteToText("gs://path/to/output")
  1. ここでは、WriteToText の代わりに WriteToBigQuery を使用します。

この関数は、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつか指定が必要です。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期のイテレーションで有用)、テーブルが存在しない場合に作成、のいずれかを指定できます。デフォルトで、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません

  1. ただし、スキーマの指定が必要です。これには次の 2 つの方法があります。スキーマは単一の文字列または JSON 形式で指定できます。たとえば、辞書に 3 つのフィールド、すなわち名前(str 型)、ID(int 型)、残高(float 型)があるとします。この場合、スキーマを単一の行として指定できます。
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

または、JSON として指定します。

table_schema = { "fields": [ { "name": "name", "type": "STRING" }, { "name": "id", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "balance", "type": "FLOAT", "mode": "REQUIRED" } ] }

最初のケース(単一の文字列)では、フィールドはすべて NULLABLE とみなされます。JSON 方式の場合は、モードを指定できます。

  1. テーブル スキーマを定義したら、DAG にシンクを追加できます。
p | 'WriteToBQ' >> beam.io.WriteToBigQuery( 'project:dataset.table', schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) 注: WRITE_TRUNCATE は毎回テーブルを削除して再作成します。この方法はパイプラインの初期のイテレーション、特にスキーマでのイテレーションの場合に有用です。しかし、予期しない問題が本番環境で簡単に引き起こされる可能性があります。WRITE_APPEND または WRITE_EMPTY の方が安全です。

テーブル スキーマの定義パイプラインへの BigQuery シンクの追加を忘れないでください。ヒントが必要な場合は、ソリューションを参照してください。

タスク 6. パイプラインを実行する

  1. ターミナルに戻り、以前とほぼ同じコマンドを使用してパイプラインを実行します。ただし、今回は DataflowRunner を使用し、Cloud Dataflow でパイプラインを実行します。
# 環境変数を設定する cd $BASE_DIR export PROJECT_ID=$(gcloud config get-value project) # パイプラインを実行する python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DataflowRunner

全体の形は Read 変換で始まり Write 変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況をモニタリングできます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。

注: パイプラインの構築が順調でも、コードや Dataflow サービスの構成ミスのために多くのエラーが発生する場合、runner の設定を DirectRunner に戻してローカルで実行し、迅速にフィードバックを受け取ることができます。今回のケースはデータセットが小規模で、DirectRunner がサポートしている機能のみを使用しているため、この手法がうまく行きます。
  1. パイプラインが終了したら、BigQuery のブラウザ ウィンドウに戻り、テーブルに対してクエリを実行します。

コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインを実行する

ラボのパート 2. 基本的な ETL をパラメータ化する

約 20 分

データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。

  1. 開発環境を作成し、パイプラインを開発しました。環境には Apache Beam SDK やその他の依存関係を追加しました。
  2. 開発環境からパイプラインを実行しました。Apache Beam SDK は、Cloud Storage 内のファイルをステージングし、ジョブ リクエスト ファイルを作成して Cloud Dataflow サービスに送信しました。

ジョブを API 呼び出しで、または開発環境の設定(技術者以外のユーザーには不可能でしょう)を必要とせずに、開始する方法があるとはるかに好ましいでしょう。この方法でパイプラインを実行することもできます。

Dataflow テンプレートはこの問題の解決を、パイプラインのコンパイル時に作成される表現を変更してパラメータ化を可能にすることにより追求します。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。

  1. デベロッパーが開発環境を作成し、パイプラインを開発します。環境には、Apache Beam SDK やその他の依存関係が含まれます。
  2. デベロッパーがパイプラインを実行し、テンプレートを作成します。Apache Beam SDK が Cloud Storage 内のファイルをステージングし、(ジョブ リクエストに似た)テンプレート ファイルを作成して Cloud Storage 内に保存します。
  3. デベロッパー以外のユーザーや Airflow のような他のワークフロー ツールは、Google Cloud コンソール、gcloud コマンドライン ツール、または REST API を使用して簡単にジョブを実行し、テンプレート ファイル実行リクエストを Cloud Dataflow サービスに送信できます。

このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。

タスク 1. JSON スキーマ ファイルを作成する

以前と同様に、この例のスキーマを表す JSON ファイルを Dataflow テンプレートに渡す必要があります。

  1. IDE のターミナルに戻ります。次のコマンドを実行して、メイン ディレクトリに戻り、既存の logs.logs テーブルからスキーマを取得します。
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. この出力をファイルに取得し、GCS にアップロードします。追加の sed コマンドは、Dataflow が期待する完全な JSON オブジェクトを構築するためです。
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 JSON スキーマ ファイルを作成する

タスク 2. JavaScript ユーザー定義関数を作成する

Cloud Storage to BigQuery Dataflow テンプレートは、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。

  1. このタスクを完了するには、IDE のファイル エクスプローラで、dataflow_python フォルダに新しいファイルを作成します。

  2. 新しいファイルを作成するには、[ファイル] >> [新規] >> [テキスト ファイル] とクリックします。

  3. ファイル名を transform.js に変更します。ファイル名を変更するには、ファイル名を右クリックします。

  4. 編集パネルで transform.js ファイルを開きます。ファイルをクリックすると開きます。

  5. 以下の関数を transform.js ファイルにコピーし、保存します。

function transform(line) { return line; }
  1. 次のコマンドを実行して、ファイルを Google Cloud Storage にコピーします。
export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 JavaScript ファイルに JavaScript ユーザー定義関数を書き込む

タスク 3. Dataflow テンプレートを実行する

  1. Cloud Dataflow ウェブ UI に移動します。
  2. [テンプレートからジョブを作成] をクリックします。
  3. Cloud Dataflow ジョブのジョブ名を入力します。
  4. [Dataflow テンプレート] で、[ストリーミング] セクションではなく [データを一括で処理する(バッチ)] セクションの [Cloud Storage 上のテキスト ファイルから BigQuery へ] テンプレートを選択します。
  5. [Cloud Storage 入力ファイル] に、events.json へのパスを の形式で入力します。
  6. [BigQuery スキーマ ファイルの Cloud Storage の場所] に、schema.json ファイルへのパスを の形式で入力します。
  7. [BigQuery 出力テーブル] に「」と入力します。
  8. [一時 BigQuery ディレクトリ] に、この同じバケット内の新しいフォルダを入力します。これはジョブが作成します。
  9. [一時的な場所] で、この同じバケット内の 2 つ目の新しいフォルダを入力します。
  10. [暗号化] は 「Google が管理する暗号鍵」のままにします。
  11. [オプション パラメータ] をクリックして開きます。
  12. [Cloud Storage 内の JavaScript UDF パス] に、.js へのパスを の形式で入力します。
  13. [JavaScript UDF 名] に「transform」と入力します。
  14. [ジョブを実行] ボタンをクリックします。

実行中のジョブは Dataflow ウェブ UI 内で確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Dataflow テンプレートを実行する

タスク 4. Dataflow テンプレート コードを確認する

先ほど使用した Dataflow テンプレートのコードは、こちらの TextIOToBigQuery ガイドにあります。

  • main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。

    • PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成するところから始まります。
    • TextIO.read() 変換で始まる PTransform のチェーンとして構成されています。
    • read 変換の後の PTransform は少し異なっていて、たとえば、ソース形式が BigQuery テーブル形式とそれほど合っていない場合に、JavaScript を使用して入力文字列を変換できます。この機能の使用方法については、こちらのページのドキュメントをご覧ください。
    • JavaScript UDF の後の PTransform は、ライブラリ関数を使用して JSON をテーブルの行に変換します。コードはこちらで確認できます。
    • write PTransform は、グラフのコンパイル時に認識しているスキーマを使用するのではなく、実行時にのみ認識するパラメータを受け取るコードであるため、少し異なって見えます。これは NestedValueProvider クラスにより実現されます。

次回のラボもチェックしてください。PTransform のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

このコンテンツは現在ご利用いただけません

We will notify you via email when it becomes available

ありがとうございます。

We will contact you via email if it becomes available