チェックポイント
Disable and re-enable the Dataflow API
/ 10
Create a Cloud Storage Bucket
/ 10
Copy Files to Your Bucket
/ 10
Create the BigQuery Dataset (name: lake)
/ 20
Build a Data Ingestion Dataflow Pipeline
/ 10
Build a Data Transformation Dataflow Pipeline
/ 10
Build a Data Enrichment Dataflow Pipeline
/ 10
Build a Data lake to Mart Dataflow Pipeline
/ 20
Dataflow と BigQuery を使用した Google Cloud での ETL 処理(Python)
- GSP290
- 概要
- 設定と要件
- タスク 1. Dataflow API が有効になっていることを確認する
- タスク 2. スターター コードをダウンロードする
- タスク 3. Cloud Storage バケットを作成する
- タスク 4. ファイルをバケットにコピーする
- タスク 5. BigQuery データセットを作成する
- タスク 6. Dataflow パイプラインを構築する
- タスク 7. Dataflow パイプラインでのデータの取り込み
- タスク 8. パイプラインの Python コードを確認する
- タスク 9. Apache Beam パイプラインを実行する
- タスク 10. データ変換
- タスク 11. Dataflow 変換パイプラインを実行する
- タスク 12. データ拡充
- タスク 13. データ拡充パイプラインの Python コードを確認する
- タスク 14. データ拡充 Dataflow パイプラインを実行する
- タスク 15. データレイクからデータマートへのパイプラインの Python コードを確認する
- タスク 16. Apache Beam パイプラインを実行してデータ結合と BigQuery の結果テーブルの作成を実施する
- 理解度チェック
- お疲れさまでした
GSP290
概要
Google Cloud では、次の Google Cloud サービスを使用して、Python コードを実行し、一般公開されているデータセットから BigQuery にデータを取り込んで変換するデータ パイプラインを構築できます。
- Cloud Storage
- Dataflow
- BigQuery
このラボでは、これらのサービスを使用して、設計上の考慮事項や実装の詳細を含む独自のデータ パイプラインを作成し、プロトタイプが要件を満たしていることを確認します。指示があった場合は、Python ファイルを開き、コメントを読んでください。
演習内容
このラボでは、次の方法について学びます。
- データの取り込み用の Dataflow パイプライン(Python)の構築と実行
- データの変換と拡充用の Dataflow パイプライン(Python)の構築と実行
設定と要件
[ラボを開始] ボタンをクリックする前に
こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。
このハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
- 標準的なインターネット ブラウザ(Chrome を推奨)
- ラボを完了するために十分な時間を確保してください。ラボをいったん開始すると一時停止することはできません。
ラボを開始して Google Cloud コンソールにログインする方法
-
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。
- [Google Cloud コンソールを開く] ボタン
- 残り時間
- このラボで使用する必要がある一時的な認証情報
- このラボを行うために必要なその他の情報(ある場合)
-
[Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウでリンクを開く] を選択します)。
ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。 -
必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。
{{{user_0.username | "Username"}}} [ラボの詳細] パネルでも [ユーザー名] を確認できます。
-
[次へ] をクリックします。
-
以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。
{{{user_0.password | "Password"}}} [ラボの詳細] パネルでも [パスワード] を確認できます。
-
[次へ] をクリックします。
重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。 -
その後次のように進みます。
- 利用規約に同意してください。
- 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
- 無料トライアルには登録しないでください。
その後、このタブで Google Cloud コンソールが開きます。
Cloud Shell をアクティブにする
Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
- Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン をクリックします。
接続した時点で認証が完了しており、プロジェクトに各自の PROJECT_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。
gcloud
は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
- (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
-
[承認] をクリックします。
-
出力は次のようになります。
出力:
- (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
出力:
出力例:
gcloud
ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。
タスク 1. Dataflow API が有効になっていることを確認する
必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。
-
Cloud コンソールの上部の検索バーに「Dataflow API」と入力します。検索結果の「Dataflow API」をクリックします。
-
[管理] をクリックします。
-
[API を無効にする] をクリックします。
確認を求められたら、[無効にする] をクリックします。
- [有効にする] をクリックします。
API が再度有効になると、ページに無効にするオプションが表示されます。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 2. スターター コードをダウンロードする
- Cloud Shell で以下のコマンドを実行して、Google Cloud のプロフェッショナル サービスの GitHub から Dataflow の Python 例を取得します。
- Cloud Shell で変数にプロジェクト ID を設定します。
タスク 3. Cloud Storage バケットを作成する
- Cloud Shell でバケット作成コマンドを使用して、プロジェクト内の
リージョンに新しいリージョン バケットを作成します。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 4. ファイルをバケットにコピーする
- Cloud Shell で
gsutil
コマンドを使用して、作成した Cloud Storage バケットにファイルをコピーします。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 5. BigQuery データセットを作成する
- Cloud Shell で BigQuery 内に
lake
というデータセットを作成します。BigQuery で使用するテーブルはすべてここに読み込まれます。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 6. Dataflow パイプラインを構築する
このセクションでは、追記専用の Dataflow を作成し、BigQuery テーブルにデータを取り込みます。組み込みのコードエディタを使用すると、Google Cloud コンソールでコードを表示、編集できます。
Cloud Shell コードエディタを開く
- Cloud Shell で [エディタを開く] アイコンをクリックして、ソースコードに移動します。
- プロンプトが表示されたら、[新しいウィンドウで開く] をクリックして、新しいウィンドウでコードエディタを開きます。Cloud Shell エディタを使用すると、Cloud Shell 環境でファイルを編集できます。[エディタ] から Cloud Shell に戻るには、[ターミナルを開く] をクリックします。
タスク 7. Dataflow パイプラインでのデータの取り込み
読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。パイプラインの具体的な処理は以下のとおりです。
- Cloud Storage からファイルを取り込む
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
タスク 8. パイプラインの Python コードを確認する
コードエディタで、dataflow-python-examples
> dataflow_python_examples
に移動し、data_ingestion.py
ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードによって、データセット lake に BigQuery のテーブルが入力されます。
タスク 9. Apache Beam パイプラインを実行する
- このステップでは Cloud Shell セッションに戻り、必要な Python ライブラリの設定を行います。
このラボの Dataflow ジョブには Python3.8
が必要です。適切なバージョンで作業するには、Python 3.8 Docker コンテナで Dataflow プロセスを実行します。
- Cloud Shell で以下を実行して、Python コンテナを起動します。
このコマンドによって、Docker コンテナと Python 3.8 の最新の安定版が pull され、コマンドシェルが開き、コンテナ内の次のコマンドが実行されます。-v
フラグでソースコードをコンテナの volume
として指定しているため、Cloud Shell エディタで編集できるうえ、実行中のコンテナ内でもアクセスできます。
- コンテナが pull されて Cloud Shell で実行され始めたら、以下のコマンドで、実行中のコンテナに
apache-beam
をインストールします。
- Cloud Shell で実行中のコンテナで、ソースコードがリンクされているディレクトリに移動します。
クラウドで取り込み Dataflow パイプラインを実行する
- 以下のコマンドを実行すると、必要なワーカーがスピンアップされ、完了するとシャットダウンされます。
- Cloud コンソールに戻り、ナビゲーション メニュー > [Dataflow] を開いて、ジョブのステータスを確認します。
-
ジョブの名前をクリックして進行状況を確認します。[ジョブ ステータス] が「完了しました」になったら、次のステップに進みます。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
-
BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。
- プロジェクト名をクリックして、
lake
データセットの usa_names テーブルを表示します。
- そのテーブルをクリックし、[プレビュー] タブに移動して、
usa_names
データの例を確認します。
usa_names
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 10. データ変換
読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。具体的には以下を行います。
- Cloud Storage からファイルを取り込む
- 読み取った行を辞書オブジェクトに変換する
- 年を含むデータを、BigQuery が日付として認識できる形式に変換する
- BigQuery に行を出力する
変換パイプラインの Python コードを確認する
コードエディタで data_transformation.py
ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。
タスク 11. Dataflow 変換パイプラインを実行する
クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーがスピンアップされ、完了するとシャットダウンされます。
- 次のコマンドを実行します。
-
ナビゲーション メニュー > [Dataflow] に移動して、ジョブの名前をクリックし、ジョブのステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
-
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
-
lake
データセットに usa_names_transformed テーブルが表示されます。 -
そのテーブルをクリックし、[プレビュー] タブに移動して、
usa_names_transformed
データの例を確認します。
usa_names_transformed
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 12. データ拡充
読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。具体的には以下を行います。
- Cloud Storage からファイルを取り込む
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
タスク 13. データ拡充パイプラインの Python コードを確認する
-
コードエディタで
data_enrichment.py
ファイルを開きます。 -
コードの機能を説明したコメントを確認します。このコードによって、BigQuery にデータが入力されます。
83 行目は以下のようになっています。
- これを次のように編集します。
- この行を編集し終えたら、エディタの [File] プルダウン メニューを選択して [Save] をクリックし、更新したファイルを保存してください。
タスク 14. データ拡充 Dataflow パイプラインを実行する
クラウドで Dataflow パイプラインを実行します。
- Cloud Shell で以下を実行して、必要なワーカーをスピンアップし、完了後にシャットダウンします。
-
ナビゲーション メニュー > [Dataflow] に移動して、ジョブのステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
-
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
lake
データセットに usa_names_enriched テーブルが表示されます。
- そのテーブルをクリックし、[プレビュー] タブに移動して、
usa_names_enriched
データの例を確認します。
usa_names_enriched
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
完了したデータ拡充タスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
タスク 15. データレイクからデータマートへのパイプラインの Python コードを確認する
2 つの BigQuery データソースからデータを読み取り、データソースを結合する Dataflow パイプラインを構築します。具体的には以下を行います。
- 2 つの BigQuery ソースからファイルを取り込む
- 2 つのデータソースを結合する
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
コードエディタで data_lake_to_mart.py
ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードによって、2 つのテーブルが結合され、BigQuery に結果のデータが入力されます。
タスク 16. Apache Beam パイプラインを実行してデータ結合と BigQuery の結果テーブルの作成を実施する
クラウドで Dataflow パイプラインを実行します。
- Cloud Shell で以下のコードブロックを実行して、必要なワーカーをスピンアップし、完了後にシャットダウンします。
-
ナビゲーション メニュー > [Dataflow] に移動し、この新しいジョブの名前をクリックしてステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
-
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
lake
データセットに orders_denormalized_sideinput テーブルが表示されます。
- そのテーブルをクリックし、[プレビュー] タブに移動して、
orders_denormalized_sideinput
データの例を確認します。
orders_denormalized_sideinput
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
完了した結合タスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
理解度チェック
今回のラボで学習した内容の理解を深めていただくため、以下の多肢選択問題を用意しました。正解を目指して頑張ってください。
お疲れさまでした
データを BigQuery に取り込んで変換するために、Dataflow を使用して Python コードを実行しました。
次のステップと詳細情報
さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。
- Dataflow
- BigQuery
- より高度なコンセプトについては、Apache Beam プログラミング ガイドをご覧ください。
- 以下のラボをご確認ください。
Google Cloud トレーニングと認定資格
Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。
マニュアルの最終更新日: 2024 年 2 月 11 日
ラボの最終テスト日: 2023 年 10 月 12 日
Copyright 2024 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。