
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
このラボの内容:
前提条件:
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。
Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。
[続行] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。
gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
出力:
出力例:
出力:
出力例:
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。729328892908
)をコピーします。{project-number}
はプロジェクト番号に置き換えてください。このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
5 分程度
Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。
Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルであり、エンドユーザーが Java、Python、または Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、データセットが小規模な場合にはローカルの開発マシンで、大規模の場合には Cloud Dataflow で実行できます。ただし、Apache Beam はオープンソースであるため、ランナーは他にもあり、Apache Flink や Apache Spark などで Beam パイプラインを実行することもできます。
このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。
このクエストの各ラボの入力データは、共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれるその他のデータに類似すると想定されています。この最初のラボではデータがバッチソースとして扱われ、後のラボではデータがストリーミング ソースとして扱われます。タスクは、後のデータ分析のために、データを読み取って解析してからサーバーレス データ ウェアハウスである BigQuery に書き込むことです。
実際のパイプライン コードの編集を開始する前に、必要な依存関係を追加する必要があります。
1_Basic_ETL/labs
にある pom.xml
ファイルの dependency タグ内に、次の依存関係を追加します。<beam.version>
タグは、インストールする Beam のバージョンを指定するためにすでに pom.xml
に追加されています。ファイルを保存します。
最後に、パイプラインで使用するためにこれらの依存関係をダウンロードします。
1 時間
このスクリプトにより、次のような行が含まれた events.json
というファイルが作成されます。
次に、このファイルは自動的に gs://<YOUR-PROJECT-ID>/events.json
の Google Cloud Storage バケットにコピーされます。
events.json
というファイルが含まれていることを確認します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。
MyPipeline.java
を開きます。これは 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline
にあります。次のパッケージがインポートされることを確認します。run()
メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の PCollection
を作成にするには、ルート変換をパイプライン オブジェクトに適用します。ルート変換により、指定した外部データソースまたは一部のローカルデータから PCollection
が作成されます。
Beam SDK のルート変換には、Read と Create の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、PCollection
をメモリ内の java.util.Collection
から作成します。これは特にテストで有用です。
次のサンプルコードは、TextIO.Read
ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は Pipeline
オブジェクト p に適用され、パイプライン データを PCollection<String>
の形式で返します。「ReadLines」は変換に付けられた名前です。後で、より大規模なパイプラインに取り組む際に役立つでしょう。
run()
メソッド内で、「input」という文字列定数を作成し、値を gs://<YOUR-PROJECT-ID>/events.json
に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。
TextIO.read() 変換の呼び出しを通じて、events.json
の全イベントの文字列で構成される PCollection
を作成します。
MyPipeline.java
の先頭に適切な import ステートメントを追加します。この例の場合は import org.apache.beam.sdk.values.PCollection;
です。
$BASE_DIR
フォルダに移動して mvn compile exec:java
コマンドを実行します。この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。しかし、この実行によって有用なワークフローが示されます。より高コストな計算を行う前に、ローカルマシン上で DirectRunner を使用して、パイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runner
を DataflowRunner に変更できます。
ヒントが必要な場合は、ソリューションを参照してください。
変換とは、データを変更する処理です。Apache Beam では、変換は PTransform クラスが行います。変換オペレーションは実行時に、独立した多数のワーカーで実行されます。あらゆる PTransform
の入力と出力は PCollection
です。お気づきでないかもしれませんが、実際、Google Cloud Storage からデータを読み取った際に PTransform
をすでに使用しています。それにより、変数に割り当てたかどうかに関係なく、文字列の PCollection
が作成されました。
Beam では PCollection
に対して汎用 apply メソッドを使用するので、変換を順次に連結できます。たとえば次のように、変換を連結して順次パイプラインを作成できます。
このタスクでは、新しい種類の変換である ParDo を使用します。ParDo
は汎用並列処理のための Beam 変換です。ParDo
処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける「Map」フェーズに似ています。ParDo
変換は入力 PCollection
の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、0 個、1 個、または複数個の要素を出力 PCollection
に生成します。
ParDo
は、次のような各種の一般的なデータ処理オペレーションで役に立ちます。
ParDo
を使用して、PCollection
の各要素を検討し、その要素を新しいコレクションに出力するか、または廃棄できます。PCollection
に含まれている場合は、ParDo
を使用して各要素を変換し、結果を新しい PCollection
に出力できます。PCollection
がある場合、ParDo
を使用して必要なフィールドだけを新しい PCollection
に取り出すことができます。ParDo
を使用して、PCollection
のすべての要素または特定の要素に対して単純な計算や複雑な計算を行い、結果を新しい PCollection
として出力できます。ParDo
変換を作成します。ParDo
関数は、インラインで実装するか、静的クラスとして実装できます。インライン ParDo
関数は次のように記述します。
または、DoFn を拡張する静的クラスとして実装することもできます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。
続いてパイプライン内は次のようになります。
Gson
を使用するには、MyPipeline
内に内部クラスを作成する必要があります。Beam スキーマを利用するために @DefaultSchema アノテーションを追加します。詳しくは後で説明します。Gson
の使用例は次のとおりです。内部クラスに CommmonLog
という名前を付けます。適切な状態変数を使用してこの内部クラスを作成する方法については、上記の JSON サンプルをご覧ください。このクラスには、受信 JSON のすべてのキーに対して 1 つの状態変数が必要です。また、その変数では、タイプおよび名前を値およびキーと一致させる必要があります。
ここでは「timestamp」に String
、「INTEGER」に Long
(BigQuery の INTEGER は INT64)、「FLOAT」には Double
(BigQuery の FLOAT は FLOAT64)を使用し、次の BigQuery スキーマに一致させます。
ヒントが必要な場合は、ソリューションを参照してください。
この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の CommonLog
を生成します。次のステップでは、これらの CommonLog
オブジェクトを BigQuery テーブルに書き込みます。
必要に応じて BigQuery テーブルを作成するようにパイプラインに指示できますが、事前にデータセットを作成する必要があります。これは generate_batch_events.sh
スクリプトによってすでに行われています。
データセットは次の方法で調べることができます。
パイプラインの最終的な PCollection
を出力するには、Write 変換をその PCollection
に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection
の要素を出力できます。Write を使用するとパイプライン内の任意の時点で PCollection
を出力できますが、通常はパイプラインの最後にデータを書き出します。
次のサンプルコードは、TextIO.Write
変換を適用して文字列の PCollection
をテキスト ファイルに書き込む方法を示しています。
TextIO.write()
の代わりに BigQueryIO.write() を使用します。この関数には、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつかの項目を指定する必要があります。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期イテレーションで有用)、テーブルが存在しない場合の作成のいずれかを指定できます。デフォルトの場合、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません。
Beam スキーマを SDK に追加した後に、.useBeamSchema()
を使用し、入力タイプをマークすることで、渡されたオブジェクトからテーブル スキーマを推定するよう変換に対して指示できます。または、.withSchema()
を使用してスキーマを明示的に指定することもできますが、その場合は BigQuery TableSchema オブジェクトを作成して渡す必要があります。CommonLog
クラスに @DefaultSchema(JavaFieldSchema.class)
でアノテーションを付けているため、各変換は、BigQueryIO.write()
など、オブジェクトのフィールドの名前とタイプを認識します。
CommonLog
オブジェクトにアノテーションを付けているため、次のように .useBeamSchema()
を使用し、<YOUR-PROJECT-ID>:logs.logs
テーブルをターゲットに設定します。Beam スキーマで利用できるすべてのタイプは、Schema.FieldType のドキュメントで確認できます。標準 SQL で使用できるすべての BigQuery データ型については、setType のドキュメントで確認できます。興味がある方は、Beam スキーマから BigQuery への変換についてもご確認ください。
ターミナルに戻って RUNNER
環境変数の値を DataflowRunner
に変更し、前述と同じコマンドを使用してパイプラインを実行します。実行開始後は、Dataflow のプロダクト ページに移動して、パイプラインの配置に注目してください。変換に名前を付けている場合は、その名前が表示されます。それぞれの変換名をクリックすると、毎秒処理されている要素の数がリアルタイムで表示されます。
全体の形は Read 変換で始まり Write 変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況を観察できます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。
コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
約 20 分
データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。
開発環境を設定(技術者以外のユーザーには不可能でしょう)する必要がなく、ジョブを API 呼び出しで開始する方法があれば、はるかに好ましいでしょう。その方法でパイプラインを実行することもできます。
Dataflow テンプレートでは、パイプラインのコンパイル時に作成される表現を変更し、パラメータ化を可能にすることで、上記の問題を解決しようとしています。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。
このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。
.usedBeamSchema()
を使用したため、TableSchema
オブジェクトを BigQueryIO.writeTableRows()
変換に渡す必要がありませんでしたが、Dataflow テンプレートには、この例のスキーマを表す JSON ファイルを渡す必要があります。
logs.logs
テーブルからスキーマを取得します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Cloud Storage to BigQuery Dataflow テンプレートには、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。
このタスクを完了するには、IDE を使用し、以下の内容で .js
ファイルを作成してから、それを Google Cloud Storage にコピーします。
transform.js
ファイルにコピーします。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
.js
へのパスを gs://<YOUR-PROJECT-ID>/transform.js
の形式で入力します。schema.json
ファイルへのパスを gs://<YOUR-PROJECT-ID>/schema.json
の形式で入力します。transform
」と入力します。<myprojectid>:logs.logs
」と入力します。events.json
へのパスを gs://<YOUR-PROJECT-ID>/events.json
の形式で入力します。実行中のジョブは Dataflow ウェブ UI 内で確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
先ほど使用した Dataflow テンプレートのコードを思い出しましょう。
main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。
PipelineOptions
オブジェクトを使用して Pipeline
オブジェクトを作成するところから始まります。PTransform
のチェーンとして構成されています。.useBeamSchema()
を使用して Beam スキーマから推定したものではなく、明示的に設定されたスキーマを使用します。PTransform
のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
One lab at a time
Confirm to end all existing labs and start this one