チェックポイント
Create Vertex AI Platform Notebooks instance and clone course repo
/ 15
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
Dataflow を使用したサーバーレスのデータ処理 - Apache Beam と Cloud Dataflow を使用して ETL パイプラインを作成する(Python)
概要
このラボでは、次の方法について学びます。
- Google Cloud Storage から元データを取得して Google BigQuery に書き込む、抽出、変換、読み込みバッチ パイプラインを Apache Beam で構築する。
- Apache Beam パイプラインを Cloud Dataflow で実行する。
- パイプラインの実行をパラメータ化する。
前提条件:
- Python の基本的な知識。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例:
1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。 -
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。 -
利用規約に同意し、再設定用のリソースページをスキップします。
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
-
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
-
Compute Engine のデフォルトのサービス アカウント
{project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。- Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
- プロジェクト番号(例:
729328892908
)をコピーします。 - ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
- ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
- [新しいプリンシパル] に次のように入力します。
-
{project-number}
はプロジェクト番号に置き換えてください。 - [ロール] で、[Project](または [基本])> [編集者] を選択します。
- [保存] をクリックします。
Jupyter ノートブック ベースの開発環境の設定
このラボでは、すべてのコマンドをノートブックのターミナルで実行します。
-
Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [ワークベンチ] をクリックします。
-
Notebooks API を有効にします。
-
ワークベンチのページで [新規作成] をクリックします。
-
表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを
に、ゾーンを に設定します。 -
[環境] で [Apache Beam] を選択します。
-
ダイアログ ボックスの下部にある [作成] をクリックします。
- 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。
- 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。
コード リポジトリをダウンロードする
このラボで使用するコード リポジトリをダウンロードします。
- 開いたターミナルで、次のコマンドを入力します。
-
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
-
クローン リポジトリ
/training-data-analyst/quests/dataflow_python/
に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納されるlab
サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できるsolution
サブフォルダとに分けられています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Apache Beam と Cloud Dataflow
5 分程度
Cloud Dataflow は、バッチとストリーミングの Apache Beam データ処理パイプラインを実行するためのフルマネージドの Google Cloud Platform サービスです。
Apache Beam は移植性を備えたオープンソースの高度な統合型データ処理プログラミング モデルで、エンドユーザーは Java、Python、Go を使用して、バッチとストリーミングの両方のデータ並列処理パイプラインを定義できます。Apache Beam パイプラインは、小規模データセットであればローカルの開発マシンで、大規模には Cloud Dataflow で実行できます。ただし、オープンソースである Apache Beam では、他のランナーも使用できます。Apache Flink や Apache Spark などで Beam パイプラインを実行できます。
ラボパート 1. ETL パイプラインをゼロから作成する
はじめに
このセクションでは、Apache Beam の抽出、変換、読み込み(ETL)パイプラインをゼロから作成します。
データセットとユースケースのレビュー
このクエストの各ラボでは、入力データは共通ログ形式のウェブサーバー ログ、およびウェブサーバーに含まれている可能性のあるその他のデータに似ていると想定されています。この最初のラボでは、データはバッチソースとして扱われます。後のラボでは、データはストリーミング ソースとして扱われます。タスクは、データを読み取り、解析し、後のデータ分析のためにサーバーレス データ ウェアハウスである BigQuery に書き込むことです。
該当するラボを開く
- IDE のターミナルに戻り、次のコマンドをコピーして貼り付けます。
仮想環境と依存関係を設定する
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
- ターミナルで、このラボでの作業用に仮想環境を作成します。
- 次に、パイプラインの実行に必要なパッケージをインストールします。
- 最後に、Dataflow API が有効になっていることを確認します。
最初のパイプラインを作成する
1 時間
タスク 1. 合成データを生成する
- ターミナルで次のコマンドを実行し、合成ウェブサーバー ログを生成するスクリプトが含まれているリポジトリのクローンを作成します。
このスクリプトによって events.json
というファイルが作成され、次のような行が含まれます。
このファイルは
- 別のブラウザタブで、Google Cloud Storage に移動し、ストレージ バケットに
events.json
というファイルが含まれていることを確認します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. ソースからデータを読み取る
このセクションや後のセクションでヒントが必要な場合は、ソリューションを参照してください。
- ファイル エクスプローラで、ラボフォルダである
1_Basic_ETL/lab
に移動し、my_pipeline.py をクリックします。編集パネルにそのファイルが開きます。次のパッケージがインポートされることを確認します。
-
run()
メソッドまで下方向にスクロールします。このメソッドに現在含まれているパイプラインは何も行いません。PipelineOptions オブジェクトを使用して Pipeline オブジェクトを作成する方法と、メソッドの最終行がパイプラインを実行する点に注意してください。
-
Apache Beam パイプラインのデータはすべて PCollection 内に存在します。パイプラインの最初の
PCollection
を作成にするには、ルート変換をパイプライン オブジェクトに適用する必要があります。ルート変換により、指定した外部データソースまたは一部のローカルデータからPCollection
が作成されます。 -
Beam SDK のルート変換には、Read と Create の 2 種類があります。Read 変換は、外部ソースであるテキスト ファイルやデータベース テーブルなどからデータを読み取ります。Create 変換は、メモリ内の
list
からPCollection
を作成し、特にテストで有用です。
次のサンプルコードは、ReadFromText
ルート変換を適用してテキスト ファイルからデータを読み取る方法を示しています。この変換は、Pipeline
オブジェクトである p
に適用され、PCollection[str]
形式(パラメータ化された型ヒントに由来する表記法を使用しています)のパイプライン データセットを返します。ReadLines はこの変換に付けられた名前で、後により大規模なパイプラインに取り組む際に役立つでしょう。
-
run()
メソッド内で、input という文字列定数を作成し、値をgs://<プロジェクト ID>/events.json
に設定します。後のラボで、この情報を渡すコマンドライン パラメータを使用します。 -
textio.ReadFromText
変換を呼び出す方法により、events.json
のイベントすべての文字列のPCollection
を作成します。 -
my_pipeline.py
の先頭に適切なインポート ステートメントを追加します。 -
作業を保存するには、上部のナビゲーション メニューで [ファイル] をクリックし、[保存] を選択します。
タスク 3. パイプラインを実行し、動作することを確認する
- ターミナルに戻り、
$BASE_DIR
フォルダに戻って次のコマンドを実行します。パイプラインを実行する前に、PROJECT_ID
環境変数を設定してください。
この時点では、パイプラインは実際には何も行いません。データを読み取るだけです。
しかし、この実行により有用なワークフローが示されます。DirectRunner を使用してローカルマシンでパイプラインを実行することで、より高価な計算を行う前にパイプラインをローカルで低コストに確認できます。Google Cloud Dataflow を使用してパイプラインを実行するために、runner
を DataflowRunner に変更できます。
タスク 4. 変換を追加する
ヒントが必要な場合は、ソリューションを参照してください。
変換とは、データに変更をもたらすものです。Apache Beam では、変換は PTransform クラスが行います。実行時に、これらのオペレーションは独立した多くのワーカーで実行されます。
あらゆる PTransform
の入力と出力は PCollection
です。実際、お気づきでないかもしれませんが、Google Cloud Storage からデータを読み取った際に PTransform
をすでに使用しています。変数に割り当てたかどうかに関係なく、そのときに文字列の PCollection
が作成されました。
Beam の Python バージョンでは、パイプ演算子 |
で表される汎用 apply メソッドを PCollection
に対して使用するため、変換の連続的な連鎖を行えます。たとえば、次のように変換の連鎖による連続的なパイプラインを作成できます。
このタスクでは、新しい種類の変換である ParDo を使用します。ParDo
は汎用並列処理のための Beam 変換です。
ParDo
処理パラダイムは、Map/Shuffle/Reduce スタイルのアルゴリズムにおける Map フェーズに似ています。ParDo
変換は入力 PCollection
の各要素を検討し、その要素に対してなんらかの処理関数(ユーザーコード)を実行して、出力 PCollection
に 0 個、1 個、または複数個の要素を出力します。
ParDo
はさまざまな一般的データ処理オペレーションに役立ちます。ただし、Python にはプロセスを簡単にする、次のような特別な PTransform
があります。
-
データセットをフィルタリングする。
Filter
を使用して、PCollection
の各要素を検討し、Python の呼び出し可能オブジェクトが返す出力ブール値に応じて、その要素を新しいPCollection
に出力するか廃棄するかできます。 -
データセット内の各要素を整形または型変換する。目的の型や形式とは異なる要素が入力
PCollection
に含まれている場合は、Map
を使用して各要素を変換し、結果を新しいPCollection
に出力できます。 -
データセット内の各要素の部分を抽出する。たとえば、複数のフィールドで構成されるレコードの
PCollection
がある場合、同様にMap
またはFlatMap
を使用して、必要なフィールドだけを新しいPCollection
に取り出すことができます。 -
データセット内の各要素に対して計算を実行する。
ParDo
、Map
、FlatMap
を使用して、PCollection
のすべての要素または特定の要素に対して単純な計算または複雑な計算を行い、結果を新しいPCollection
として出力できます。
このタスクを完了するには、単一イベントを表す JSON 文字列を読み取って、Python の json
パッケージを使用して解析し、json.loads
が返す辞書を出力する、Map
変換を作成する必要があります。
Map
関数の実装方法には、インラインで行う方法と事前定義された呼び出し可能オブジェクトを介して行う方法の 2 つがあります。インライン Map
関数は次のように記述します。
また、スクリプトの以前の箇所で定義されている Python の呼び出し可能オブジェクトと一緒に、beam.Map
を使用できます。
beam.Map
(や他の軽量の DoFn
)によって実現される以上の柔軟性が必要な場合、DoFn のサブクラスであるカスタム DoFn
を使用して ParDo
を実装できます。この方法により、テスト フレームワークとの統合をさらに簡単に行えます。
ヒントが必要な場合は、ソリューションを参照してください。
タスク 5. シンクに書き込む
この時点で、パイプラインは Google Cloud Storage からファイルを読み取り、各行を解析し、各要素の Python 辞書を出力します。次のステップでは、これらのオブジェクトを BigQuery テーブルに書き込みます。
- BigQuery テーブルを必要に応じて作成するようにパイプラインに指示できますが、事前にデータセットを作成する必要があります。これは
generate_batch_events.sh
スクリプトによりすでに行われています。次のコードを使用して、データセットを調べることができます。
パイプラインの最終 PCollection
を出力するには、Write 変換をその PCollection
に適用します。Write 変換は、データベース テーブルなどの外部データシンクに PCollection
の要素を出力できます。Write を使用してパイプラインでいつでも PCollection
を出力できますが、通常はパイプラインの最後にデータを書き出します。
次のサンプルコードは、WriteToText
変換を適用して文字列の PCollection
をテキスト ファイルに書き込む方法を示しています。
- ここでは、
WriteToText
の代わりにWriteToBigQuery
を使用します。
この関数は、書き込み先となる特定のテーブルや、そのテーブルのスキーマなど、いくつか指定が必要です。オプションで、既存テーブルへの追加、既存テーブルの再作成(パイプラインの初期のイテレーションで有用)、テーブルが存在しない場合に作成、のいずれかを指定できます。デフォルトで、この変換は存在しないテーブルを作成し、空でないテーブルには書き込みません。
- ただし、スキーマの指定が必要です。これには次の 2 つの方法があります。スキーマは単一の文字列または JSON 形式で指定できます。たとえば、辞書に 3 つのフィールド、すなわち名前(
str
型)、ID(int
型)、残高(float
型)があるとします。この場合、スキーマを単一の行として指定できます。
または、JSON として指定します。
最初のケース(単一の文字列)では、フィールドはすべて NULLABLE
とみなされます。JSON 方式の場合は、モードを指定できます。
- テーブル スキーマを定義したら、DAG にシンクを追加できます。
WRITE_TRUNCATE
は毎回テーブルを削除して再作成します。この方法はパイプラインの初期のイテレーション、特にスキーマでのイテレーションの場合に有用です。しかし、予期しない問題が本番環境で簡単に引き起こされる可能性があります。WRITE_APPEND
または WRITE_EMPTY
の方が安全です。テーブル スキーマの定義とパイプラインへの BigQuery シンクの追加を忘れないでください。ヒントが必要な場合は、ソリューションを参照してください。
タスク 6. パイプラインを実行する
- ターミナルに戻り、以前とほぼ同じコマンドを使用してパイプラインを実行します。ただし、今回は
DataflowRunner
を使用し、Cloud Dataflow でパイプラインを実行します。
全体の形は Read
変換で始まり Write
変換で終わる単一の経路になるはずです。パイプラインが実行されると、サービスによるパイプライン ニーズの判断に基づいてワーカーが自動的に追加され、その後不要になると削除されます。Compute Engine に移動すると、この状況をモニタリングできます。Dataflow サービスによって作成された仮想マシンが表示されるはずです。
runner
の設定を DirectRunner
に戻してローカルで実行し、迅速にフィードバックを受け取ることができます。今回のケースはデータセットが小規模で、DirectRunner
がサポートしている機能のみを使用しているため、この手法がうまく行きます。- パイプラインが終了したら、BigQuery のブラウザ ウィンドウに戻り、テーブルに対してクエリを実行します。
コードが期待どおりに機能せず、対処方法がわからないときは、ソリューションを確認してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボのパート 2. 基本的な ETL をパラメータ化する
約 20 分
データ エンジニアの仕事の多くは、繰り返し作業のように予測可能か、他の作業に似ています。しかし、パイプラインを実行するプロセスにはエンジニアリングの専門知識が必要です。終えたばかりのステップを振り返ってみてください。
- 開発環境を作成し、パイプラインを開発しました。環境には Apache Beam SDK やその他の依存関係を追加しました。
- 開発環境からパイプラインを実行しました。Apache Beam SDK は、Cloud Storage 内のファイルをステージングし、ジョブ リクエスト ファイルを作成して Cloud Dataflow サービスに送信しました。
ジョブを API 呼び出しで、または開発環境の設定(技術者以外のユーザーには不可能でしょう)を必要とせずに、開始する方法があるとはるかに好ましいでしょう。この方法でパイプラインを実行することもできます。
Dataflow テンプレートはこの問題の解決を、パイプラインのコンパイル時に作成される表現を変更してパラメータ化を可能にすることにより追求します。残念ながらコマンドライン パラメータの公開ほど単純ではありませんが、これは後のラボで行います。Dataflow テンプレートを使用すると、上記のワークフローは次のようになります。
- デベロッパーが開発環境を作成し、パイプラインを開発します。環境には、Apache Beam SDK やその他の依存関係が含まれます。
- デベロッパーがパイプラインを実行し、テンプレートを作成します。Apache Beam SDK が Cloud Storage 内のファイルをステージングし、(ジョブ リクエストに似た)テンプレート ファイルを作成して Cloud Storage 内に保存します。
- デベロッパー以外のユーザーや Airflow のような他のワークフロー ツールは、Google Cloud コンソール、gcloud コマンドライン ツール、または REST API を使用して簡単にジョブを実行し、テンプレート ファイル実行リクエストを Cloud Dataflow サービスに送信できます。
このラボでは練習として、多数ある Google 作成の Dataflow テンプレートのうち一つを使用し、パート 1 で構築したパイプラインと同じタスクを実現します。
タスク 1. JSON スキーマ ファイルを作成する
以前と同様に、この例のスキーマを表す JSON ファイルを Dataflow テンプレートに渡す必要があります。
- IDE のターミナルに戻ります。次のコマンドを実行して、メイン ディレクトリに戻り、既存の
logs.logs
テーブルからスキーマを取得します。
- この出力をファイルに取得し、GCS にアップロードします。追加の
sed
コマンドは、Dataflow が期待する完全な JSON オブジェクトを構築するためです。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. JavaScript ユーザー定義関数を作成する
Cloud Storage to BigQuery Dataflow テンプレートは、未加工のテキストを有効な JSON に変換する JavaScript 関数が必要です。このケースでは、テキストの各行が有効な JSON であるため、関数はやや単純です。
-
このタスクを完了するには、IDE のファイル エクスプローラで、dataflow_python フォルダに新しいファイルを作成します。
-
新しいファイルを作成するには、[ファイル] >> [新規] >> [テキスト ファイル] とクリックします。
-
ファイル名を transform.js に変更します。ファイル名を変更するには、ファイル名を右クリックします。
-
編集パネルで transform.js ファイルを開きます。ファイルをクリックすると開きます。
-
以下の関数を transform.js ファイルにコピーし、保存します。
- 次のコマンドを実行して、ファイルを Google Cloud Storage にコピーします。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 3. Dataflow テンプレートを実行する
- Cloud Dataflow ウェブ UI に移動します。
- [テンプレートからジョブを作成] をクリックします。
- Cloud Dataflow ジョブのジョブ名を入力します。
- [Dataflow テンプレート] で、[ストリーミング] セクションではなく [データを一括で処理する(バッチ)] セクションの [Cloud Storage 上のテキスト ファイルから BigQuery へ] テンプレートを選択します。
- [Cloud Storage 入力ファイル] に、
events.json
へのパスをの形式で入力します。 - [BigQuery スキーマ ファイルの Cloud Storage の場所] に、
schema.json
ファイルへのパスをの形式で入力します。 - [BigQuery 出力テーブル] に「
」と入力します。 - [一時 BigQuery ディレクトリ] に、この同じバケット内の新しいフォルダを入力します。これはジョブが作成します。
- [一時的な場所] で、この同じバケット内の 2 つ目の新しいフォルダを入力します。
- [暗号化] は 「Google が管理する暗号鍵」のままにします。
- [オプション パラメータ] をクリックして開きます。
- [Cloud Storage 内の JavaScript UDF パス] に、
.js
へのパスをの形式で入力します。 - [JavaScript UDF 名] に「
transform
」と入力します。 - [ジョブを実行] ボタンをクリックします。
実行中のジョブは Dataflow ウェブ UI 内で確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 4. Dataflow テンプレート コードを確認する
先ほど使用した Dataflow テンプレートのコードは、こちらの TextIOToBigQuery ガイドにあります。
-
main メソッドまで下方向にスクロールします。コードは、作成したパイプラインで見覚えがあるはずです。
-
PipelineOptions
オブジェクトを使用してPipeline
オブジェクトを作成するところから始まります。 -
TextIO.read() 変換で始まる
PTransform
のチェーンとして構成されています。 - read 変換の後の PTransform は少し異なっていて、たとえば、ソース形式が BigQuery テーブル形式とそれほど合っていない場合に、JavaScript を使用して入力文字列を変換できます。この機能の使用方法については、こちらのページのドキュメントをご覧ください。
- JavaScript UDF の後の PTransform は、ライブラリ関数を使用して JSON をテーブルの行に変換します。コードはこちらで確認できます。
- write PTransform は、グラフのコンパイル時に認識しているスキーマを使用するのではなく、実行時にのみ認識するパラメータを受け取るコードであるため、少し異なって見えます。これは NestedValueProvider クラスにより実現されます。
-
次回のラボもチェックしてください。PTransform
のチェーンに留まらないパイプラインの作成と、構築したパイプラインをカスタム Dataflow テンプレートに作り変える方法について説明します。
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。