arrow_back

Dataflow と BigQuery を使用した Google Cloud での ETL 処理(Python)

ログイン 参加
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

Dataflow と BigQuery を使用した Google Cloud での ETL 処理(Python)

ラボ 1時間 universal_currency_alt クレジット: 5 show_chart 中級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

GSP290

Google Cloud セルフペース ラボ

概要

Google Cloud では、次の Google Cloud サービスを使用して、Python コードを実行し、一般公開されているデータセットから BigQuery にデータを取り込んで変換するデータ パイプラインを構築できます。

  • Cloud Storage
  • Dataflow
  • BigQuery

このラボでは、これらのサービスを使用して、設計上の考慮事項や実装の詳細を含む独自のデータ パイプラインを作成し、プロトタイプが要件を満たしていることを確認します。指示があった場合は、Python ファイルを開き、コメントを読んでください。

演習内容

このラボでは、次の方法について学びます。

  • データの取り込み用の Dataflow パイプライン(Python)の構築と実行
  • データの変換と拡充用の Dataflow パイプライン(Python)の構築と実行

設定と要件

[ラボを開始] ボタンをクリックする前に

こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

このハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
注: このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウント間の競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。
  • ラボを完了するために十分な時間を確保してください。ラボをいったん開始すると一時停止することはできません。
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、このラボでは使用しないでください。アカウントへの追加料金が発生する可能性があります。

ラボを開始して Google Cloud コンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。

    • [Google Cloud コンソールを開く] ボタン
    • 残り時間
    • このラボで使用する必要がある一時的な認証情報
    • このラボを行うために必要なその他の情報(ある場合)
  2. [Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウでリンクを開く] を選択します)。

    ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。

    ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。

    注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
  3. 必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。

    {{{user_0.username | "Username"}}}

    [ラボの詳細] パネルでも [ユーザー名] を確認できます。

  4. [次へ] をクリックします。

  5. 以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。

    {{{user_0.password | "Password"}}}

    [ラボの詳細] パネルでも [パスワード] を確認できます。

  6. [次へ] をクリックします。

    重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
  7. その後次のように進みます。

    • 利用規約に同意してください。
    • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
    • 無料トライアルには登録しないでください。

その後、このタブで Google Cloud コンソールが開きます。

注: Google Cloud のプロダクトやサービスのリストを含むメニューを表示するには、左上のナビゲーション メニューをクリックします。ナビゲーション メニュー アイコン

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン 「Cloud Shell をアクティブにする」アイコン をクリックします。

接続した時点で認証が完了しており、プロジェクトに各自の PROJECT_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。

Your Cloud Platform project in this session is set to YOUR_PROJECT_ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  1. (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
  1. [承認] をクリックします。

  2. 出力は次のようになります。

出力:

ACTIVE: * ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project = <project_ID>

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: Google Cloud における gcloud ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。

タスク 1. Dataflow API が有効になっていることを確認する

必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。

重要: API がすでに有効になっている場合でも、API に接続し直すため、以下の手順 1~4 を行い、API を無効にしてから再度有効にしてください。
  1. Cloud コンソールの上部の検索バーに「Dataflow API」と入力します。検索結果の「Dataflow API」をクリックします。

  2. [管理] をクリックします。

  3. [API を無効にする] をクリックします。

確認を求められたら、[無効にする] をクリックします。

  1. [有効にする] をクリックします。

API が再度有効になると、ページに無効にするオプションが表示されます。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

Dataflow API を無効にしてから再度有効にする

タスク 2. スターター コードをダウンロードする

  1. Cloud Shell で以下のコマンドを実行して、Google Cloud のプロフェッショナル サービスの GitHub から Dataflow の Python 例を取得します。
gsutil -m cp -R gs://spls/gsp290/dataflow-python-examples .
  1. Cloud Shell で変数にプロジェクト ID を設定します。
export PROJECT={{{ project_0.project_id }}} gcloud config set project $PROJECT

タスク 3. Cloud Storage バケットを作成する

  • Cloud Shell でバケット作成コマンドを使用して、プロジェクト内の リージョンに新しいリージョン バケットを作成します。
gsutil mb -c regional -l {{{ project_0.default_region }}} gs://$PROJECT

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

Cloud Storage バケットを作成する

タスク 4. ファイルをバケットにコピーする

  • Cloud Shell で gsutil コマンドを使用して、作成した Cloud Storage バケットにファイルをコピーします。
gsutil cp gs://spls/gsp290/data_files/usa_names.csv gs://$PROJECT/data_files/ gsutil cp gs://spls/gsp290/data_files/head_usa_names.csv gs://$PROJECT/data_files/

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

ファイルをバケットにコピーする

タスク 5. BigQuery データセットを作成する

  • Cloud Shell で BigQuery 内に lake というデータセットを作成します。BigQuery で使用するテーブルはすべてここに読み込まれます。
bq mk lake

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

BigQuery データセット(名前: lake)を作成する

タスク 6. Dataflow パイプラインを構築する

このセクションでは、追記専用の Dataflow を作成し、BigQuery テーブルにデータを取り込みます。組み込みのコードエディタを使用すると、Google Cloud コンソールでコードを表示、編集できます。

追記専用の Dataflow パイプラインの図

Cloud Shell コードエディタを開く

  1. Cloud Shell で [エディタを開く] アイコンをクリックして、ソースコードに移動します。

[エディタを開く] アイコン

  1. プロンプトが表示されたら、[新しいウィンドウで開く] をクリックして、新しいウィンドウでコードエディタを開きます。Cloud Shell エディタを使用すると、Cloud Shell 環境でファイルを編集できます。[エディタ] から Cloud Shell に戻るには、[ターミナルを開く] をクリックします。

タスク 7. Dataflow パイプラインでのデータの取り込み

読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。パイプラインの具体的な処理は以下のとおりです。

  • Cloud Storage からファイルを取り込む
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

タスク 8. パイプラインの Python コードを確認する

コードエディタで、dataflow-python-examples > dataflow_python_examples に移動し、data_ingestion.py ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードによって、データセット lake に BigQuery のテーブルが入力されます。

コードエディタ

タスク 9. Apache Beam パイプラインを実行する

  1. このステップでは Cloud Shell セッションに戻り、必要な Python ライブラリの設定を行います。

このラボの Dataflow ジョブには Python3.8 が必要です。適切なバージョンで作業するには、Python 3.8 Docker コンテナで Dataflow プロセスを実行します。

  1. Cloud Shell で以下を実行して、Python コンテナを起動します。
docker run -it -e PROJECT=$PROJECT -v $(pwd)/dataflow-python-examples:/dataflow python:3.8 /bin/bash

このコマンドによって、Docker コンテナと Python 3.8 の最新の安定版が pull され、コマンドシェルが開き、コンテナ内の次のコマンドが実行されます。-v フラグでソースコードをコンテナの volume として指定しているため、Cloud Shell エディタで編集できるうえ、実行中のコンテナ内でもアクセスできます。

  1. コンテナが pull されて Cloud Shell で実行され始めたら、以下のコマンドで、実行中のコンテナに apache-beam をインストールします。
pip install apache-beam[gcp]==2.59.0
  1. Cloud Shell で実行中のコンテナで、ソースコードがリンクされているディレクトリに移動します。
cd dataflow/

クラウドで取り込み Dataflow パイプラインを実行する

  1. 以下のコマンドを実行すると、必要なワーカーがスピンアップされ、完了するとシャットダウンされます。
python dataflow_python_examples/data_ingestion.py \ --project=$PROJECT --region={{{ project_0.default_region }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. Cloud コンソールに戻り、ナビゲーション メニュー > [Dataflow] を開いて、ジョブのステータスを確認します。

ナビゲーション メニュー > [Dataflow]

  1. ジョブの名前をクリックして進行状況を確認します。[ジョブ ステータス] が「完了しました」になったら、次のステップに進みます。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  2. BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。

ナビゲーション メニュー > [BigQuery]

  1. プロジェクト名をクリックして、lake データセットの usa_names テーブルを表示します。

usa_names テーブル

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names データの例を確認します。
注: usa_names テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

データの取り込み Dataflow パイプラインを構築する

タスク 10. データ変換

読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。具体的には以下を行います。

  • Cloud Storage からファイルを取り込む
  • 読み取った行を辞書オブジェクトに変換する
  • 年を含むデータを、BigQuery が日付として認識できる形式に変換する
  • BigQuery に行を出力する

変換パイプラインの Python コードを確認する

コードエディタで data_transformation.py ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。

タスク 11. Dataflow 変換パイプラインを実行する

クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーがスピンアップされ、完了するとシャットダウンされます。

  1. 次のコマンドを実行します。
python dataflow_python_examples/data_transformation.py \ --project=$PROJECT \ --region={{{ project_0.default_region }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. ナビゲーション メニュー > [Dataflow] に移動して、ジョブの名前をクリックし、ジョブのステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  2. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

  3. lake データセットに usa_names_transformed テーブルが表示されます。

  4. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_transformed データの例を確認します。

注: usa_names_transformed テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

データの変換 Dataflow パイプラインを構築する

タスク 12. データ拡充

読み取りに TextIO、書き込みに BigQueryIO を使用して BigQuery にデータを取り込む Dataflow パイプラインを構築します。具体的には以下を行います。

  • Cloud Storage からファイルを取り込む
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

タスク 13. データ拡充パイプラインの Python コードを確認する

  1. コードエディタで data_enrichment.py ファイルを開きます。

  2. コードの機能を説明したコメントを確認します。このコードによって、BigQuery にデータが入力されます。

83 行目は以下のようになっています。

values = [x.decode('utf8') for x in csv_row]
  1. これを次のように編集します。
values = [x for x in csv_row]
  1. この行を編集し終えたら、エディタの [File] プルダウン メニューを選択して [Save] をクリックし、更新したファイルを保存してください。

タスク 14. データ拡充 Dataflow パイプラインを実行する

クラウドで Dataflow パイプラインを実行します。

  1. Cloud Shell で以下を実行して、必要なワーカーをスピンアップし、完了後にシャットダウンします。
python dataflow_python_examples/data_enrichment.py \ --project=$PROJECT \ --region={{{ project_0.default_region }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. ナビゲーション メニュー > [Dataflow] に移動して、ジョブのステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  2. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに usa_names_enriched テーブルが表示されます。

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_enriched データの例を確認します。
注: usa_names_enriched テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

完了したデータ拡充タスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

データ拡充 Dataflow パイプラインを構築する

タスク 15. データレイクからデータマートへのパイプラインの Python コードを確認する

2 つの BigQuery データソースからデータを読み取り、データソースを結合する Dataflow パイプラインを構築します。具体的には以下を行います。

  • 2 つの BigQuery ソースからファイルを取り込む
  • 2 つのデータソースを結合する
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

コードエディタdata_lake_to_mart.py ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードによって、2 つのテーブルが結合され、BigQuery に結果のデータが入力されます。

タスク 16. Apache Beam パイプラインを実行してデータ結合と BigQuery の結果テーブルの作成を実施する

クラウドで Dataflow パイプラインを実行します。

  1. Cloud Shell で以下のコードブロックを実行して、必要なワーカーをスピンアップし、完了後にシャットダウンします。
python dataflow_python_examples/data_lake_to_mart.py \ --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" \ --max_num_workers=4 \ --project=$PROJECT \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --save_main_session \ --region={{{ project_0.default_region }}}
  1. ナビゲーション メニュー > [Dataflow] に移動し、この新しいジョブの名前をクリックしてステータスを確認します。この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  2. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに orders_denormalized_sideinput テーブルが表示されます。

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、orders_denormalized_sideinput データの例を確認します。
注: orders_denormalized_sideinput テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

完了した結合タスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

データレイクからデータマートへの Dataflow パイプラインを構築する

理解度チェック

今回のラボで学習した内容の理解を深めていただくため、以下の多肢選択問題を用意しました。正解を目指して頑張ってください。

お疲れさまでした

データを BigQuery に取り込んで変換するために、Dataflow を使用して Python コードを実行しました。

次のステップと詳細情報

さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。

Google Cloud トレーニングと認定資格

Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。

マニュアルの最終更新日: 2024 年 2 月 11 日

ラボの最終テスト日: 2023 年 10 月 12 日

Copyright 2024 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします