arrow_back

Dataflow を使用したサーバーレスのデータ処理 — Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Java)

ログイン 参加
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

Dataflow を使用したサーバーレスのデータ処理 — Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Java)

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 中級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

概要

このラボの内容:

  • Google Cloud Storage から元データを取得して Google BigQuery に書き込む、抽出 / 加工 / 読み込みのバッチ パイプラインを Apache Beam で構築する。
  • Apache Beam パイプラインを Cloud Dataflow で実行する。
  • パイプラインの実行をパラメータ化する。

前提条件:

  • Java に関する基本的な知識。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

IDE の設定

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。

注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。ファイル エクスプローラ ボタンをクリックして確認します。

file_explorer

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

new_terminal

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud_auth

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

gce_reset

Apache Beam と Cloud Dataflow

5 分程度

Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。

Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルであり、エンドユーザーが Java、Python、または Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、データセットが小規模な場合にはローカルの開発マシンで、大規模の場合には Cloud Dataflow で実行できます。ただし、Apache Beam はオープンソースであるため、ランナーは他にもあり、Apache Flink や Apache Spark などで Beam パイプラインを実行することもできます。

dBeam モデル アーキテクチャの図。

ラボのパート 1。ETL パイプラインをゼロから作成する

はじめに

このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。

データセットとユースケースのレビュー

このクエストの各ラボの入力データは、共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれるその他のデータに類似すると想定されています。この最初のラボではデータがバッチソースとして扱われ、後のラボではデータがストリーミング ソースとして扱われます。タスクは、後のデータ分析のために、データを読み取って解析してからサーバーレス データ ウェアハウスである BigQuery に書き込むことです。

適切なラボを開く

  • IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリをラボに変更する cd 1_Basic_ETL/labs export BASE_DIR=$(pwd)

pom.xml ファイルを修正する

実際のパイプライン コードの編集を開始する前に、必要な依存関係を追加する必要があります。

  1. 1_Basic_ETL/labs にある pom.xml ファイルの dependency タグ内に、次の依存関係を追加します。
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> <version>${beam.version}</version> </dependency>
  1. <beam.version> タグは、インストールする Beam のバージョンを指定するためにすでに pom.xml に追加されています。ファイルを保存します。

  2. 最後に、パイプラインで使用するためにこれらの依存関係をダウンロードします。

# pom.xml にリストされている依存関係をダウンロードする mvn clean dependency:resolve

最初のパイプラインを作成する

1 時間

タスク 1. 合成データを生成する

  1. シェルで次のコマンドを実行し、合成ウェブサーバー ログを生成するスクリプトが含まれているリポジトリのクローンを作成します。
# 関連コードが含まれているディレクトリに移動する cd $BASE_DIR/../.. # GCS バケットと BQ データセットを作成する source create_batch_sinks.sh # ウェブサーバーのログイベントのバッチを生成するスクリプトを実行する bash generate_batch_events.sh # 一部のサンプル イベントを確認する head events.json

このスクリプトにより、次のような行が含まれた events.json というファイルが作成されます。

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

次に、このファイルは自動的に gs://<YOUR-PROJECT-ID>/events.json の Google Cloud Storage バケットにコピーされます。

  1. Google Cloud Storage に移動し、ストレージ バケットに events.json というファイルが含まれていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 合成データを生成する

タスク 2. ソースからデータを読み取る

このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。

  1. IDE で MyPipeline.java を開きます。これは 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline にあります。次のパッケージがインポートされることを確認します。
import com.google.gson.Gson; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  1. run() メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。
Pipeline pipeline = Pipeline.create(options); // 処理を実行する pipeline.run();

Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の PCollection を作成にするには、ルート変換をパイプライン オブジェクトに適用します。ルート変換により、指定した外部データソースまたは一部のローカルデータから PCollection が作成されます。

Beam SDK のルート変換には、Read と Create の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、PCollection をメモリ内の java.util.Collection から作成します。これは特にテストで有用です。

次のサンプルコードは、TextIO.Read ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は Pipeline オブジェクト p に適用され、パイプライン データを PCollection<String> の形式で返します。「ReadLines」は変換に付けられた名前です。後で、より大規模なパイプラインに取り組む際に役立つでしょう。

PCollection<String> lines = pipeline.apply("ReadLines", TextIO.read().from("gs://path/to/input.txt"));
  1. run() メソッド内で、「input」という文字列定数を作成し、値を gs://<YOUR-PROJECT-ID>/events.json に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。

  2. TextIO.read() 変換の呼び出しを通じて、events.json の全イベントの文字列で構成される PCollection を作成します。

  3. MyPipeline.java の先頭に適切な import ステートメントを追加します。この例の場合は import org.apache.beam.sdk.values.PCollection; です。

タスク 3. パイプラインを実行し、動作することを確認する

  • ターミナルに戻り、$BASE_DIR フォルダに移動して mvn compile exec:java コマンドを実行します。
cd $BASE_DIR # 環境変数を設定する export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} 注: ビルドに失敗した場合は mvn clean install コマンドを実行します

この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。しかし、この実行によって有用なワークフローが示されます。より高コストな計算を行う前に、ローカルマシン上で DirectRunner を使用して、パイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runnerDataflowRunner に変更できます。

タスク 4. 変換を追加する

ヒントが必要な場合は、ソリューションを参照してください。

変換とは、データを変更する処理です。Apache Beam では、変換は PTransform クラスが行います。変換オペレーションは実行時に、独立した多数のワーカーで実行されます。あらゆる PTransform の入力と出力は PCollection です。お気づきでないかもしれませんが、実際、Google Cloud Storage からデータを読み取った際に PTransform をすでに使用しています。それにより、変数に割り当てたかどうかに関係なく、文字列の PCollection が作成されました。

Beam では PCollection に対して汎用 apply メソッドを使用するので、変換を順次に連結できます。たとえば次のように、変換を連結して順次パイプラインを作成できます。

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform]);

このタスクでは、新しい種類の変換である ParDo を使用します。ParDo は汎用並列処理のための Beam 変換です。ParDo 処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける「Map」フェーズに似ています。ParDo 変換は入力 PCollection の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、0 個、1 個、または複数個の要素を出力 PCollection に生成します。

ParDo は、次のような各種の一般的なデータ処理オペレーションで役に立ちます。

  • データセットをフィルタリング。ParDo を使用して、PCollection の各要素を検討し、その要素を新しいコレクションに出力するか、または廃棄できます。
  • データセット内の各要素をフォーマットまたは型変換。目的の型やフォーマットとは異なる要素が入力 PCollection に含まれている場合は、ParDo を使用して各要素を変換し、結果を新しい PCollection に出力できます。
  • データセット内の各要素の部分を抽出。たとえば、複数のフィールドで構成されるレコードの PCollection がある場合、ParDo を使用して必要なフィールドだけを新しい PCollection に取り出すことができます。
  • データセット内の各要素の計算を実行。ParDo を使用して、PCollection のすべての要素または特定の要素に対して単純な計算や複雑な計算を行い、結果を新しい PCollection として出力できます。
  1. このタスクを完了するには、単一イベントを表す JSON 文字列を読み取って、Gson を使用して解析し、Gson によって返されるカスタム オブジェクトを出力する ParDo 変換を作成します。

ParDo 関数は、インラインで実装するか、静的クラスとして実装できます。インライン ParDo 関数は次のように記述します。

pCollection.apply(ParDo.of(new DoFn<T1, T2>() { @ProcessElement public void processElement(@Element T1 i, OutputReceiver<T2> r) { // 処理を実行する r.output(0); } }));

または、DoFn を拡張する静的クラスとして実装することもできます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。

static class MyDoFn extends DoFn<T1, T2> { @ProcessElement public void processElement(@Element T1 json, OutputReceiver<T2> r) throws Exception { // 処理を実行する r.output(0); } }

続いてパイプライン内は次のようになります。

[Initial Input PCollection].apply(ParDo.of(new MyDoFn());
  1. Gson を使用するには、MyPipeline 内に内部クラスを作成する必要があります。Beam スキーマを利用するために @DefaultSchema アノテーションを追加します。詳しくは後で説明します。Gson の使用例は次のとおりです。
// 他の場所 @DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; String field2; } // DoFn 内 Gson gson = new Gson(); MyClass myClass = gson.fromJson(jsonString, MyClass.class);
  1. 内部クラスに CommmonLog という名前を付けます。適切な状態変数を使用してこの内部クラスを作成する方法については、上記の JSON サンプルをご覧ください。このクラスには、受信 JSON のすべてのキーに対して 1 つの状態変数が必要です。また、その変数では、タイプおよび名前を値およびキーと一致させる必要があります。

  2. ここでは「timestamp」に String、「INTEGER」に Long(BigQuery の INTEGER は INT64)、「FLOAT」には Double(BigQuery の FLOAT は FLOAT64)を使用し、次の BigQuery スキーマに一致させます。

CommonLog の [スキーマ] タブページ。user_id、timestamp、num_bytes などのログ情報が含まれています。

ヒントが必要な場合は、ソリューションを参照してください。

タスク 5. シンクに書き込む

この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の CommonLog を生成します。次のステップでは、これらの CommonLog オブジェクトを BigQuery テーブルに書き込みます。

必要に応じて BigQuery テーブルを作成するようにパイプラインに指示できますが、事前にデータセットを作成する必要があります。これは generate_batch_events.sh スクリプトによってすでに行われています。

データセットは次の方法で調べることができます。

# データセットを調べる bq ls # まだテーブルはない bq ls logs

パイプラインの最終的な PCollection を出力するには、Write 変換をその PCollection に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection の要素を出力できます。Write を使用するとパイプライン内の任意の時点で PCollection を出力できますが、通常はパイプラインの最後にデータを書き出します。

次のサンプルコードは、TextIO.Write 変換を適用して文字列の PCollection をテキスト ファイルに書き込む方法を示しています。

PCollection<String> pCollection = ...; pCollection.apply("WriteMyFile", TextIO.write().to("gs://path/to/output"));
  1. ここでは TextIO.write() の代わりに BigQueryIO.write() を使用します。

この関数には、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつかの項目を指定する必要があります。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期イテレーションで有用)、テーブルが存在しない場合の作成のいずれかを指定できます。デフォルトの場合、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません

Beam スキーマを SDK に追加した後に、.useBeamSchema() を使用し、入力タイプをマークすることで、渡されたオブジェクトからテーブル スキーマを推定するよう変換に対して指示できます。または、.withSchema() を使用してスキーマを明示的に指定することもできますが、その場合は BigQuery TableSchema オブジェクトを作成して渡す必要があります。CommonLog クラスに @DefaultSchema(JavaFieldSchema.class) でアノテーションを付けているため、各変換は、BigQueryIO.write() など、オブジェクトのフィールドの名前とタイプを認識します。

  1. BigQueryIO の「Writing」セクションで、さまざまな他の方法を調べることができます。ここでは、CommonLog オブジェクトにアノテーションを付けているため、次のように .useBeamSchema() を使用し、<YOUR-PROJECT-ID>:logs.logs テーブルをターゲットに設定します。
pCollection.apply(BigQueryIO.<MyObject>write() .to("my-project:output_dataset.output_table") .useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) ); 注: WRITE_TRUNCATE によって、毎回テーブルの削除、再作成が行われます。この方法はパイプラインの初期イテレーション、特にスキーマでのイテレーションの場合に有用です。しかし、予期しない問題が本番環境で簡単に引き起こされる可能性があります。WRITE_APPEND または WRITE_EMPTY の方が安全です。

Beam スキーマで利用できるすべてのタイプは、Schema.FieldType のドキュメントで確認できます。標準 SQL で使用できるすべての BigQuery データ型については、setType のドキュメントで確認できます。興味がある方は、Beam スキーマから BigQuery への変換についてもご確認ください。

タスク 6. パイプラインを実行する

ターミナルに戻って RUNNER 環境変数の値を DataflowRunner に変更し、前述と同じコマンドを使用してパイプラインを実行します。実行開始後は、Dataflow のプロダクト ページに移動して、パイプラインの配置に注目してください。変換に名前を付けている場合は、その名前が表示されます。それぞれの変換名をクリックすると、毎秒処理されている要素の数がリアルタイムで表示されます。

全体の形は Read 変換で始まり Write 変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況を観察できます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。

注: パイプラインの構築が成功していても、コードや Dataflow サービスの構成ミスが原因で多くのエラーが発生する場合は、RUNNER の設定を「DirectRunner」に戻してローカルで実行し、より速やかにフィードバックを得ることができます。今回のケースはデータセットが小規模で、DirectRunner がサポートしている機能のみを使用しているため、この手法が有効です。 # 環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export PIPELINE_FOLDER=gs://${PROJECT_ID} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER}"
  • パイプラインが終了したら、BigQuery のブラウザ ウィンドウに戻り、テーブルに対してクエリを実行します。

コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインを実行する

ラボのパート 2. 基本的な ETL をパラメータ化する

約 20 分

データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。

  1. 開発環境を作成し、パイプラインを開発しました。環境には Apache Beam SDK やその他の依存関係を追加しました。
  2. 開発環境からパイプラインを実行しました。Apache Beam SDK は、Cloud Storage 内のファイルをステージングし、ジョブ リクエスト ファイルを作成して Cloud Dataflow サービスに送信しました。

開発環境を設定(技術者以外のユーザーには不可能でしょう)する必要がなく、ジョブを API 呼び出しで開始する方法があれば、はるかに好ましいでしょう。その方法でパイプラインを実行することもできます。

Dataflow テンプレートでは、パイプラインのコンパイル時に作成される表現を変更し、パラメータ化を可能にすることで、上記の問題を解決しようとしています。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。

  1. デベロッパーが開発環境を作成し、パイプラインを開発します。環境には、Apache Beam SDK やその他の依存関係が含まれます。
  2. デベロッパーがパイプラインを実行し、テンプレートを作成します。Apache Beam SDK が Cloud Storage 内のファイルをステージングし、(ジョブ リクエストに似た)テンプレート ファイルを作成して Cloud Storage 内に保存します。
  3. デベロッパー以外のユーザーや Airflow のような他のワークフロー ツールは、Google Cloud コンソール、gcloud コマンドライン ツール、または REST API を使用して簡単にジョブを実行し、テンプレート ファイル実行リクエストを Cloud Dataflow サービスに送信できます。

このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。

タスク 1. JSON スキーマ ファイルを作成する

.usedBeamSchema() を使用したため、TableSchema オブジェクトを BigQueryIO.writeTableRows() 変換に渡す必要がありませんでしたが、Dataflow テンプレートには、この例のスキーマを表す JSON ファイルを渡す必要があります。

  1. ターミナルを起動し、メイン ディレクトリに戻ります。次のコマンドを実行し、既存の logs.logs テーブルからスキーマを取得します。
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. この出力をファイルに取得し、GCS にアップロードします。追加の sed コマンドは、Dataflow が想定する完全な JSON オブジェクトを構築するためのものです。
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 JSON スキーマ ファイルを作成する

タスク 2. JavaScript ユーザー定義関数を記述する

Cloud Storage to BigQuery Dataflow テンプレートには、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。

このタスクを完了するには、IDE を使用し、以下の内容で .js ファイルを作成してから、それを Google Cloud Storage にコピーします。

  1. 以下の関数を 1_Basic_ETL/ フォルダの transform.js ファイルにコピーします。
function transform(line) { return line; }
  1. 次のコマンドを実行して、ファイルを Google Cloud Storage にコピーします。
cd 1_Basic_ETL/ export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 JavaScript ユーザー定義関数を記述する

タスク 3. Dataflow テンプレートを実行する

  1. Cloud Dataflow ウェブ UI に移動します。
  2. [テンプレートからジョブを作成] をクリックします。
  3. Cloud Dataflow ジョブの名前を入力します。
  4. [Cloud Dataflow テンプレート] で、[Process Data in Bulk (batch)] セクションの [Text Files on Cloud Storage to BigQuery] テンプレートを選択します([ストリーミング] セクションではない点に注意)。
  5. [Cloud Storage 内の JavaScript UDF パス] で、.js へのパスを gs://<YOUR-PROJECT-ID>/transform.js の形式で入力します。
  6. [JSON パス] で、schema.json ファイルへのパスを gs://<YOUR-PROJECT-ID>/schema.json の形式で入力します。
  7. [JavaScript UDF 名] に「transform」と入力します。
  8. [BigQuery 出力テーブル] に「<myprojectid>:logs.logs」と入力します。
  9. [Cloud Storage 入力パス] に、events.json へのパスを gs://<YOUR-PROJECT-ID>/events.json の形式で入力します。
  10. [BigQuery 一時ディレクトリ] に、この同じバケット内の新しいフォルダを入力します。このフォルダはジョブによって作成されます。
  11. [一時的な場所] で、この同じバケット内の 2 つ目の新しいフォルダを入力します。
  12. [暗号化] は [Google が管理する鍵] のままにします。
  13. [ジョブを実行] ボタンをクリックします。

実行中のジョブは Dataflow ウェブ UI 内で確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Dataflow テンプレートを実行する

タスク 4. Dataflow テンプレート コードを確認する

  1. 先ほど使用した Dataflow テンプレートのコードを思い出しましょう。

  2. main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。

  • PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成するところから始まります。
  • TextIO.read() 変換で始まる PTransform のチェーンとして構成されています。
  • read 変換の後の PTransform は少し異なっていて、たとえば、ソース形式が BigQuery テーブル形式とうまく合っていない場合に、JavaScript を使用して入力文字列を変換できます。この機能の使用方法については、こちらのページのドキュメントをご覧ください。
  • JavaScript UDF の後の PTransform は、ライブラリ関数を使用して JSON をテーブルの行に変換します。コードはこちらで確認できます。
  • write PTransform は、グラフのコンパイル時に認識しているスキーマを使用するのではなく、実行時にのみ認識するパラメータを受け取るコードであるため、少し異なる見た目になっています。これは NestedValueProvider クラスによって実現されます。また、前の手順で .useBeamSchema() を使用して Beam スキーマから推定したものではなく、明示的に設定されたスキーマを使用します。
  1. 次のラボもチェックしてください。PTransform のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

このコンテンツは現在ご利用いただけません

We will notify you via email when it becomes available

ありがとうございます。

We will contact you via email if it becomes available