チェックポイント
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
Dataflow を使用したサーバーレスのデータ処理 — Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Java)
- 概要
- 設定と要件
- Apache Beam と Cloud Dataflow
- ラボのパート 1。ETL パイプラインをゼロから作成する
- タスク 1. 合成データを生成する
- タスク 2. ソースからデータを読み取る
- タスク 3. パイプラインを実行し、動作することを確認する
- タスク 4. 変換を追加する
- タスク 5. シンクに書き込む
- タスク 6. パイプラインを実行する
- ラボのパート 2. 基本的な ETL をパラメータ化する
- タスク 1. JSON スキーマ ファイルを作成する
- タスク 2. JavaScript ユーザー定義関数を記述する
- タスク 3. Dataflow テンプレートを実行する
- タスク 4. Dataflow テンプレート コードを確認する
- ラボを終了する
概要
このラボの内容:
- Google Cloud Storage から元データを取得して Google BigQuery に書き込む、抽出 / 加工 / 読み込みのバッチ パイプラインを Apache Beam で構築する。
- Apache Beam パイプラインを Cloud Dataflow で実行する。
- パイプラインの実行をパラメータ化する。
前提条件:
- Java に関する基本的な知識。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例:
1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。 -
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。 -
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud Shell の有効化
Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。
Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
-
Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。
-
[続行] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。
gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
- 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
出力:
出力例:
- 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
出力:
出力例:
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
-
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
-
Compute Engine のデフォルトのサービス アカウント
{project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。- Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
- プロジェクト番号(例:
729328892908
)をコピーします。 - ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
- ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
- [新しいプリンシパル] に次のように入力します。
-
{project-number}
はプロジェクト番号に置き換えてください。 - [ロール] で、[Project](または [基本])> [編集者] を選択します。
- [保存] をクリックします。
IDE の設定
このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
Apache Beam と Cloud Dataflow
5 分程度
Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。
Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルであり、エンドユーザーが Java、Python、または Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、データセットが小規模な場合にはローカルの開発マシンで、大規模の場合には Cloud Dataflow で実行できます。ただし、Apache Beam はオープンソースであるため、ランナーは他にもあり、Apache Flink や Apache Spark などで Beam パイプラインを実行することもできます。
ラボのパート 1。ETL パイプラインをゼロから作成する
はじめに
このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。
データセットとユースケースのレビュー
このクエストの各ラボの入力データは、共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれるその他のデータに類似すると想定されています。この最初のラボではデータがバッチソースとして扱われ、後のラボではデータがストリーミング ソースとして扱われます。タスクは、後のデータ分析のために、データを読み取って解析してからサーバーレス データ ウェアハウスである BigQuery に書き込むことです。
適切なラボを開く
- IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
pom.xml ファイルを修正する
実際のパイプライン コードの編集を開始する前に、必要な依存関係を追加する必要があります。
-
1_Basic_ETL/labs
にあるpom.xml
ファイルの dependency タグ内に、次の依存関係を追加します。
-
<beam.version>
タグは、インストールする Beam のバージョンを指定するためにすでにpom.xml
に追加されています。ファイルを保存します。 -
最後に、パイプラインで使用するためにこれらの依存関係をダウンロードします。
最初のパイプラインを作成する
1 時間
タスク 1. 合成データを生成する
- シェルで次のコマンドを実行し、合成ウェブサーバー ログを生成するスクリプトが含まれているリポジトリのクローンを作成します。
このスクリプトにより、次のような行が含まれた events.json
というファイルが作成されます。
次に、このファイルは自動的に gs://<YOUR-PROJECT-ID>/events.json
の Google Cloud Storage バケットにコピーされます。
-
Google Cloud Storage に移動し、ストレージ バケットに
events.json
というファイルが含まれていることを確認します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. ソースからデータを読み取る
このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。
- IDE で
MyPipeline.java
を開きます。これは1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline
にあります。次のパッケージがインポートされることを確認します。
-
run()
メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。
Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の PCollection
を作成にするには、ルート変換をパイプライン オブジェクトに適用します。ルート変換により、指定した外部データソースまたは一部のローカルデータから PCollection
が作成されます。
Beam SDK のルート変換には、Read と Create の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、PCollection
をメモリ内の java.util.Collection
から作成します。これは特にテストで有用です。
次のサンプルコードは、TextIO.Read
ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は Pipeline
オブジェクト p に適用され、パイプライン データを PCollection<String>
の形式で返します。「ReadLines」は変換に付けられた名前です。後で、より大規模なパイプラインに取り組む際に役立つでしょう。
-
run()
メソッド内で、「input」という文字列定数を作成し、値をgs://<YOUR-PROJECT-ID>/events.json
に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。 -
TextIO.read() 変換の呼び出しを通じて、
events.json
の全イベントの文字列で構成されるPCollection
を作成します。 -
MyPipeline.java
の先頭に適切な import ステートメントを追加します。この例の場合はimport org.apache.beam.sdk.values.PCollection;
です。
タスク 3. パイプラインを実行し、動作することを確認する
- ターミナルに戻り、
$BASE_DIR
フォルダに移動してmvn compile exec:java
コマンドを実行します。
この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。しかし、この実行によって有用なワークフローが示されます。より高コストな計算を行う前に、ローカルマシン上で DirectRunner を使用して、パイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runner
を DataflowRunner に変更できます。
タスク 4. 変換を追加する
ヒントが必要な場合は、ソリューションを参照してください。
変換とは、データを変更する処理です。Apache Beam では、変換は PTransform クラスが行います。変換オペレーションは実行時に、独立した多数のワーカーで実行されます。あらゆる PTransform
の入力と出力は PCollection
です。お気づきでないかもしれませんが、実際、Google Cloud Storage からデータを読み取った際に PTransform
をすでに使用しています。それにより、変数に割り当てたかどうかに関係なく、文字列の PCollection
が作成されました。
Beam では PCollection
に対して汎用 apply メソッドを使用するので、変換を順次に連結できます。たとえば次のように、変換を連結して順次パイプラインを作成できます。
このタスクでは、新しい種類の変換である ParDo を使用します。ParDo
は汎用並列処理のための Beam 変換です。ParDo
処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける「Map」フェーズに似ています。ParDo
変換は入力 PCollection
の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、0 個、1 個、または複数個の要素を出力 PCollection
に生成します。
ParDo
は、次のような各種の一般的なデータ処理オペレーションで役に立ちます。
- データセットをフィルタリング。
ParDo
を使用して、PCollection
の各要素を検討し、その要素を新しいコレクションに出力するか、または廃棄できます。 - データセット内の各要素をフォーマットまたは型変換。目的の型やフォーマットとは異なる要素が入力
PCollection
に含まれている場合は、ParDo
を使用して各要素を変換し、結果を新しいPCollection
に出力できます。 - データセット内の各要素の部分を抽出。たとえば、複数のフィールドで構成されるレコードの
PCollection
がある場合、ParDo
を使用して必要なフィールドだけを新しいPCollection
に取り出すことができます。 - データセット内の各要素の計算を実行。
ParDo
を使用して、PCollection
のすべての要素または特定の要素に対して単純な計算や複雑な計算を行い、結果を新しいPCollection
として出力できます。
- このタスクを完了するには、単一イベントを表す JSON 文字列を読み取って、Gson を使用して解析し、Gson によって返されるカスタム オブジェクトを出力する
ParDo
変換を作成します。
ParDo
関数は、インラインで実装するか、静的クラスとして実装できます。インライン ParDo
関数は次のように記述します。
または、DoFn を拡張する静的クラスとして実装することもできます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。
続いてパイプライン内は次のようになります。
-
Gson
を使用するには、MyPipeline
内に内部クラスを作成する必要があります。Beam スキーマを利用するために @DefaultSchema アノテーションを追加します。詳しくは後で説明します。Gson
の使用例は次のとおりです。
-
内部クラスに
CommmonLog
という名前を付けます。適切な状態変数を使用してこの内部クラスを作成する方法については、上記の JSON サンプルをご覧ください。このクラスには、受信 JSON のすべてのキーに対して 1 つの状態変数が必要です。また、その変数では、タイプおよび名前を値およびキーと一致させる必要があります。 -
ここでは「timestamp」に
String
、「INTEGER」にLong
(BigQuery の INTEGER は INT64)、「FLOAT」にはDouble
(BigQuery の FLOAT は FLOAT64)を使用し、次の BigQuery スキーマに一致させます。
ヒントが必要な場合は、ソリューションを参照してください。
タスク 5. シンクに書き込む
この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の CommonLog
を生成します。次のステップでは、これらの CommonLog
オブジェクトを BigQuery テーブルに書き込みます。
必要に応じて BigQuery テーブルを作成するようにパイプラインに指示できますが、事前にデータセットを作成する必要があります。これは generate_batch_events.sh
スクリプトによってすでに行われています。
データセットは次の方法で調べることができます。
パイプラインの最終的な PCollection
を出力するには、Write 変換をその PCollection
に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection
の要素を出力できます。Write を使用するとパイプライン内の任意の時点で PCollection
を出力できますが、通常はパイプラインの最後にデータを書き出します。
次のサンプルコードは、TextIO.Write
変換を適用して文字列の PCollection
をテキスト ファイルに書き込む方法を示しています。
- ここでは
TextIO.write()
の代わりに BigQueryIO.write() を使用します。
この関数には、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつかの項目を指定する必要があります。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期イテレーションで有用)、テーブルが存在しない場合の作成のいずれかを指定できます。デフォルトの場合、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません。
Beam スキーマを SDK に追加した後に、.useBeamSchema()
を使用し、入力タイプをマークすることで、渡されたオブジェクトからテーブル スキーマを推定するよう変換に対して指示できます。または、.withSchema()
を使用してスキーマを明示的に指定することもできますが、その場合は BigQuery TableSchema オブジェクトを作成して渡す必要があります。CommonLog
クラスに @DefaultSchema(JavaFieldSchema.class)
でアノテーションを付けているため、各変換は、BigQueryIO.write()
など、オブジェクトのフィールドの名前とタイプを認識します。
-
BigQueryIO の「Writing」セクションで、さまざまな他の方法を調べることができます。ここでは、
CommonLog
オブジェクトにアノテーションを付けているため、次のように.useBeamSchema()
を使用し、<YOUR-PROJECT-ID>:logs.logs
テーブルをターゲットに設定します。
Beam スキーマで利用できるすべてのタイプは、Schema.FieldType のドキュメントで確認できます。標準 SQL で使用できるすべての BigQuery データ型については、setType のドキュメントで確認できます。興味がある方は、Beam スキーマから BigQuery への変換についてもご確認ください。
タスク 6. パイプラインを実行する
ターミナルに戻って RUNNER
環境変数の値を DataflowRunner
に変更し、前述と同じコマンドを使用してパイプラインを実行します。実行開始後は、Dataflow のプロダクト ページに移動して、パイプラインの配置に注目してください。変換に名前を付けている場合は、その名前が表示されます。それぞれの変換名をクリックすると、毎秒処理されている要素の数がリアルタイムで表示されます。
全体の形は Read 変換で始まり Write 変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況を観察できます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。
- パイプラインが終了したら、BigQuery のブラウザ ウィンドウに戻り、テーブルに対してクエリを実行します。
コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボのパート 2. 基本的な ETL をパラメータ化する
約 20 分
データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。
- 開発環境を作成し、パイプラインを開発しました。環境には Apache Beam SDK やその他の依存関係を追加しました。
- 開発環境からパイプラインを実行しました。Apache Beam SDK は、Cloud Storage 内のファイルをステージングし、ジョブ リクエスト ファイルを作成して Cloud Dataflow サービスに送信しました。
開発環境を設定(技術者以外のユーザーには不可能でしょう)する必要がなく、ジョブを API 呼び出しで開始する方法があれば、はるかに好ましいでしょう。その方法でパイプラインを実行することもできます。
Dataflow テンプレートでは、パイプラインのコンパイル時に作成される表現を変更し、パラメータ化を可能にすることで、上記の問題を解決しようとしています。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。
- デベロッパーが開発環境を作成し、パイプラインを開発します。環境には、Apache Beam SDK やその他の依存関係が含まれます。
- デベロッパーがパイプラインを実行し、テンプレートを作成します。Apache Beam SDK が Cloud Storage 内のファイルをステージングし、(ジョブ リクエストに似た)テンプレート ファイルを作成して Cloud Storage 内に保存します。
- デベロッパー以外のユーザーや Airflow のような他のワークフロー ツールは、Google Cloud コンソール、gcloud コマンドライン ツール、または REST API を使用して簡単にジョブを実行し、テンプレート ファイル実行リクエストを Cloud Dataflow サービスに送信できます。
このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。
タスク 1. JSON スキーマ ファイルを作成する
.usedBeamSchema()
を使用したため、TableSchema
オブジェクトを BigQueryIO.writeTableRows()
変換に渡す必要がありませんでしたが、Dataflow テンプレートには、この例のスキーマを表す JSON ファイルを渡す必要があります。
- ターミナルを起動し、メイン ディレクトリに戻ります。次のコマンドを実行し、既存の
logs.logs
テーブルからスキーマを取得します。
- この出力をファイルに取得し、GCS にアップロードします。追加の sed コマンドは、Dataflow が想定する完全な JSON オブジェクトを構築するためのものです。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. JavaScript ユーザー定義関数を記述する
Cloud Storage to BigQuery Dataflow テンプレートには、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。
このタスクを完了するには、IDE を使用し、以下の内容で .js
ファイルを作成してから、それを Google Cloud Storage にコピーします。
- 以下の関数を 1_Basic_ETL/ フォルダの
transform.js
ファイルにコピーします。
- 次のコマンドを実行して、ファイルを Google Cloud Storage にコピーします。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 3. Dataflow テンプレートを実行する
- Cloud Dataflow ウェブ UI に移動します。
- [テンプレートからジョブを作成] をクリックします。
- Cloud Dataflow ジョブの名前を入力します。
- [Cloud Dataflow テンプレート] で、[Process Data in Bulk (batch)] セクションの [Text Files on Cloud Storage to BigQuery] テンプレートを選択します([ストリーミング] セクションではない点に注意)。
- [Cloud Storage 内の JavaScript UDF パス] で、
.js
へのパスをgs://<YOUR-PROJECT-ID>/transform.js
の形式で入力します。 - [JSON パス] で、
schema.json
ファイルへのパスをgs://<YOUR-PROJECT-ID>/schema.json
の形式で入力します。 - [JavaScript UDF 名] に「
transform
」と入力します。 - [BigQuery 出力テーブル] に「
<myprojectid>:logs.logs
」と入力します。 - [Cloud Storage 入力パス] に、
events.json
へのパスをgs://<YOUR-PROJECT-ID>/events.json
の形式で入力します。 - [BigQuery 一時ディレクトリ] に、この同じバケット内の新しいフォルダを入力します。このフォルダはジョブによって作成されます。
- [一時的な場所] で、この同じバケット内の 2 つ目の新しいフォルダを入力します。
- [暗号化] は [Google が管理する鍵] のままにします。
- [ジョブを実行] ボタンをクリックします。
実行中のジョブは Dataflow ウェブ UI 内で確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 4. Dataflow テンプレート コードを確認する
-
先ほど使用した Dataflow テンプレートのコードを思い出しましょう。
-
main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。
-
PipelineOptions
オブジェクトを使用してPipeline
オブジェクトを作成するところから始まります。 -
TextIO.read() 変換で始まる
PTransform
のチェーンとして構成されています。 - read 変換の後の PTransform は少し異なっていて、たとえば、ソース形式が BigQuery テーブル形式とうまく合っていない場合に、JavaScript を使用して入力文字列を変換できます。この機能の使用方法については、こちらのページのドキュメントをご覧ください。
- JavaScript UDF の後の PTransform は、ライブラリ関数を使用して JSON をテーブルの行に変換します。コードはこちらで確認できます。
-
write PTransform は、グラフのコンパイル時に認識しているスキーマを使用するのではなく、実行時にのみ認識するパラメータを受け取るコードであるため、少し異なる見た目になっています。これは NestedValueProvider クラスによって実現されます。また、前の手順で
.useBeamSchema()
を使用して Beam スキーマから推定したものではなく、明示的に設定されたスキーマを使用します。
- 次のラボもチェックしてください。
PTransform
のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。