
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Create Vertex AI Platform Notebooks instance and clone course repo
/ 15
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
このラボでは、次の方法について学びます。
前提条件:
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。729328892908
)をコピーします。{project-number}
はプロジェクト番号に置き換えてください。このラボでは、すべてのコマンドをノートブックのターミナルで実行します。
Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [Workbench] をクリックします。
[Notebooks API を有効にする] をクリックします。
[Workbench] ページで [ユーザー管理のノートブック] を選択し、[新規作成] をクリックします。
表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを
[環境] で [Apache Beam] を選択します。
ダイアログ ボックスの下部にある [作成] をクリックします。
このラボで使用するコード リポジトリをダウンロードします。
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
クローン リポジトリ /training-data-analyst/quests/dataflow_python/
に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab
サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
サブフォルダとに分けられています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
5 分程度
Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。
Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルで、エンドユーザーは Java、Python、Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、小規模データセットであればローカルの開発マシンで、大規模には Cloud Dataflow で実行できます。ただし、オープンソースである Apache Beam では、他のランナーも使用できます。Apache Flink や Apache Spark などで Beam パイプラインを実行できます。
このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。
このクエストの各ラボでは、入力データは共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれている可能性のあるその他のデータに似ていると想定されています。この最初のラボでは、データはバッチソースとして扱われます。後のラボでは、データはストリーミング ソースとして扱われます。タスクは、データを読み取り、解析し、後のデータ分析のためにサーバーレス データ ウェアハウスである BigQuery に書き込むことです。
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
1 時間
このスクリプトによって events.json
というファイルが作成され、次のような行が含まれます。
このファイルは
events.json
というファイルが含まれていることを確認します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。
1_Basic_ETL/lab
に移動し、my_pipeline.py をクリックします。編集パネルにそのファイルが開きます。次のパッケージがインポートされることを確認します。run()
メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の PCollection
を作成にするには、ルート変換をパイプライン オブジェクトに適用する必要があります。ルート変換により、指定した外部データソースまたは一部のローカルデータから PCollection
が作成されます。
Beam SDK のルート変換には、Read と Create の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、メモリ内の list
から PCollection
を作成し、特にテストで有用です。
次のサンプルコードは、ReadFromText
ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は、Pipeline
オブジェクトである p
に適用され、PCollection[str]
形式(パラメータ化された型ヒントに由来する表記法を使用しています)のパイプライン データセットを返します。ReadLines はこの変換に付けられた名前で、後により大規模なパイプラインに取り組む際に役立つでしょう。
run()
メソッド内で、input という文字列定数を作成し、値を gs://<プロジェクト ID>/events.json
に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。
textio.ReadFromText
変換を呼び出す方法により、events.json
のイベントすべての文字列の PCollection
を作成します。
my_pipeline.py
の先頭に適切なインポート ステートメントを追加します。
作業を保存するには、上部のナビゲーション メニューで [ファイル] をクリックし、[保存] を選択します。
$BASE_DIR
フォルダに戻って次のコマンドを実行します。パイプラインを実行する前に、PROJECT_ID
環境変数を設定してください。この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。
しかし、この実行により有用なワークフローが示されます。DirectRunner を使用してローカルマシンでパイプラインを実行することで、より高価な計算を行う前にパイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runner
を DataflowRunner に変更できます。
ヒントが必要な場合は、ソリューションを参照してください。
変換とは、データに変更をもたらすものです。Apache Beam では、変換は PTransform クラスが行います。実行時に、これらのオペレーションは独立した多くのワーカーで実行されます。
あらゆる PTransform
の入力と出力は PCollection
です。実際、お気づきでないかもしれませんが、Google Cloud Storage からデータを読み取った際に PTransform
をすでに使用しています。変数に割り当てたかどうかに関係なく、そのときに文字列の PCollection
が作成されました。
Beam の Python バージョンでは、パイプ演算子 |
で表される汎用 apply メソッドを PCollection
に対して使用するため、変換の連続的な連鎖を行えます。たとえば、次のように変換の連鎖による連続的なパイプラインを作成できます。
このタスクでは、新しい種類の変換である ParDo を使用します。ParDo
は汎用並列処理のための Beam 変換です。
ParDo
処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける Map フェーズに似ています。ParDo
変換は入力 PCollection
の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、出力 PCollection
に 0 個、1 個、または複数個の要素を出力します。
ParDo
はさまざまな一般的データ処理オペレーションに役立ちます。ただし、Python にはプロセスを簡単にする、次のような特別な PTransform
があります。
Filter
を使用して、PCollection
の各要素を検討し、Python の呼び出し可能オブジェクトが返す出力ブール値に応じて、その要素を新しい PCollection
に出力するか廃棄するかできます。PCollection
に含まれている場合は、Map
を使用して各要素を変換し、結果を新しい PCollection
に出力できます。PCollection
がある場合、同様に Map
または FlatMap
を使用して、必要なフィールドだけを新しい PCollection
に取り出すことができます。ParDo
、Map
、FlatMap
を使用して、PCollection
のすべての要素または特定の要素に対して単純な計算または複雑な計算を行い、結果を新しい PCollection
として出力できます。このタスクを完了するには、単一イベントを表す JSON 文字列を読み取って、Python の json
パッケージを使用して解析し、json.loads
が返す辞書を出力する、Map
変換を作成する必要があります。
Map
関数の実装方法には、インラインで行う方法と事前定義された呼び出し可能オブジェクトを介して行う方法の 2 つがあります。インライン Map
関数は次のように記述します。
また、スクリプトの以前の箇所で定義されている Python の呼び出し可能オブジェクトと一緒に、beam.Map
を使用できます。
beam.Map
(や他の軽量の DoFn
)によって実現される以上の柔軟性が必要な場合、DoFn のサブクラスであるカスタム DoFn
を使用して ParDo
を実装できます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。
ヒントが必要な場合は、ソリューションを参照してください。
この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の Python 辞書を出力します。次のステップでは、これらのオブジェクトを BigQuery テーブルに書き込みます。
generate_batch_events.sh
スクリプトによりすでに行われています。次のコードを使用して、データセットを調べることができます。パイプラインの最終 PCollection
を出力するには、Write 変換をその PCollection
に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection
の要素を出力できます。Write を使用してパイプラインでいつでも PCollection
を出力できますが、通常はパイプラインの最後にデータを書き出します。
次のサンプルコードは、WriteToText
変換を適用して文字列の PCollection
をテキスト ファイルに書き込む方法を示しています。
WriteToText
の代わりに WriteToBigQuery
を使用します。この関数は、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつか指定が必要です。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期のイテレーションで有用)、テーブルが存在しない場合に作成、のいずれかを指定できます。デフォルトで、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません。
str
型)、ID(int
型)、残高(float
型)があるとします。この場合、スキーマを単一の行として指定できます。または、JSON として指定します。
最初のケース(単一の文字列)では、フィールドはすべて NULLABLE
とみなされます。JSON 方式の場合は、モードを指定できます。
WRITE_TRUNCATE
は毎回テーブルを削除して再作成します。この方法はパイプラインの初期のイテレーション、特にスキーマでのイテレーションの場合に有用です。しかし、予期しない問題が本番環境で簡単に引き起こされる可能性があります。WRITE_APPEND
または WRITE_EMPTY
の方が安全です。テーブル スキーマの定義とパイプラインへの BigQuery シンクの追加を忘れないでください。ヒントが必要な場合は、ソリューションを参照してください。
DataflowRunner
を使用し、Cloud Dataflow でパイプラインを実行します。全体の形は Read
変換で始まり Write
変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況をモニタリングできます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。
runner
の設定を DirectRunner
に戻してローカルで実行し、迅速にフィードバックを受け取ることができます。今回のケースはデータセットが小規模で、DirectRunner
がサポートしている機能のみを使用しているため、この手法がうまく行きます。コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
約 20 分
データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。
ジョブを API 呼び出しで、または開発環境の設定(技術者以外のユーザーには不可能でしょう)を必要とせずに、開始する方法があるとはるかに好ましいでしょう。この方法でパイプラインを実行することもできます。
Dataflow テンプレートはこの問題の解決を、パイプラインのコンパイル時に作成される表現を変更してパラメータ化を可能にすることにより追求します。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。
このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。
以前と同様に、この例のスキーマを表す JSON ファイルを Dataflow テンプレートに渡す必要があります。
logs.logs
テーブルからスキーマを取得します。sed
コマンドは、Dataflow が期待する完全な JSON オブジェクトを構築するためです。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Cloud Storage to BigQuery Dataflow テンプレートは、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。
このタスクを完了するには、IDE のファイル エクスプローラで、dataflow_python フォルダに新しいファイルを作成します。
新しいファイルを作成するには、[ファイル] >> [新規] >> [テキスト ファイル] とクリックします。
ファイル名を transform.js に変更します。ファイル名を変更するには、ファイル名を右クリックします。
編集パネルで transform.js ファイルを開きます。ファイルをクリックすると開きます。
以下の関数を transform.js ファイルにコピーし、保存します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
events.json
へのパスを schema.json
ファイルへのパスを .js
へのパスを transform
」と入力します。実行中のジョブは Dataflow ウェブ UI 内で確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
先ほど使用した Dataflow テンプレートのコードは、こちらの TextIOToBigQuery ガイドにあります。
main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。
PipelineOptions
オブジェクトを使用して Pipeline
オブジェクトを作成するところから始まります。PTransform
のチェーンとして構成されています。次回のラボもチェックしてください。PTransform
のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
One lab at a time
Confirm to end all existing labs and start this one