チェックポイント
Create Cloud Composer environment.
/ 25
Create two Cloud Storage buckets.
/ 25
Create a dataset.
/ 25
Uploading the DAG and dependencies to Cloud Storage
/ 25
Cloud Composer: 別のロケーションに BigQuery テーブルをコピーする
- GSP283
- 概要
- 設定と要件
- タスク 1. Cloud Composer 環境を作成する
- タスク 2. Cloud Storage バケットを作成する
- タスク 3. BigQuery の宛先データセットを作成する
- タスク 4. Airflow と基本コンセプト、簡単な説明
- タスク 5. ワークフローを定義する
- タスク 6. 環境情報を表示する
- タスク 7. DAG Cloud Storage バケットの変数を作成する
- タスク 8. Airflow 変数を設定する
- タスク 9. DAG と依存関係を Cloud Storage にアップロードする
- タスク 10. Airflow UI を探索する
- タスク 11. 結果を確認する
- Cloud Composer 環境を削除する
- お疲れさまでした
- 次のステップ
GSP283
概要
世界中のさまざまなロケーションにデータセットがあり、データが Google Cloud Storage バケットまたは BigQuery テーブルに存在するとします。そのデータを整理、統合して分析し、ビジネスに関する分析情報を得るには、どうすればよいでしょうか。
Cloud Composer を使用すると、直感的でグラフィカルなビューで、ワークフローの構築や、リージョンまたはストレージ システム間でのデータ移動を行うことができます。そのほかにも、BigQuery と Cloud Storage の間で相互にデータを簡単かつ確実に転送できるテンプレートが用意されています。
このラボでは、次のタスクを行う Apache Airflow ワークフローを Cloud Composer で作成して実行します。
- コピーするテーブルのリストを指定する構成ファイルから読み込む
- 米国にある BigQuery データセットからテーブルのリストを Cloud Storage にエクスポートする
- 米国からエクスポートしたテーブルを EU にある Cloud Storage バケットにコピーする
- テーブルのリストを EU にある BigQuery データセットにインポートする
演習内容
このラボでは、次の方法について学びます。
- Cloud Composer 環境を作成する
- Cloud Storage バケットを作成する
- BigQuery データセットを作成する
- Cloud Composer で Apache Airflow ワークフローを作成して実行し、Cloud Storage と BigQuery の間でデータを移動する
設定と要件
[ラボを開始] ボタンをクリックする前に
こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。
このハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
- 標準的なインターネット ブラウザ(Chrome を推奨)
- ラボを完了するために十分な時間を確保してください。ラボをいったん開始すると一時停止することはできません。
ラボを開始して Google Cloud コンソールにログインする方法
-
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。
- [Google Cloud コンソールを開く] ボタン
- 残り時間
- このラボで使用する必要がある一時的な認証情報
- このラボを行うために必要なその他の情報(ある場合)
-
[Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウでリンクを開く] を選択します)。
ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。 -
必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。
{{{user_0.username | "Username"}}} [ラボの詳細] パネルでも [ユーザー名] を確認できます。
-
[次へ] をクリックします。
-
以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。
{{{user_0.password | "Password"}}} [ラボの詳細] パネルでも [パスワード] を確認できます。
-
[次へ] をクリックします。
重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。 -
その後次のように進みます。
- 利用規約に同意してください。
- 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
- 無料トライアルには登録しないでください。
その後、このタブで Google Cloud コンソールが開きます。
Cloud Shell をアクティブにする
Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
- Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン をクリックします。
接続した時点で認証が完了しており、プロジェクトに各自の PROJECT_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。
gcloud
は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
- (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
-
[承認] をクリックします。
-
出力は次のようになります。
出力:
- (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
出力:
出力例:
gcloud
ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。
タスク 1. Cloud Composer 環境を作成する
-
Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、[プロダクトとページ] セクションの [Composer] をクリックして、Cloud Composer 環境を作成します。
-
次に、[環境の作成] をクリックします。
-
プルダウン メニューで [Composer 3] を選択します。
-
環境に次のパラメータを設定します。
-
名前: composer-advanced-lab
-
ロケーション:
-
イメージのバージョン: composer-3-airflow-n.n.n-build.n(利用可能なイメージの中で最も大きい番号のイメージを選択)
-
[環境リソース] で [Small] を選択します。
-
[詳細設定を表示] のプルダウンをクリックし、[Airflow データベース ゾーン] で
を選択します。
その他の設定はすべてデフォルトのままにします。
- [作成] をクリックします。
Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されると、環境作成プロセスが完了したことになります。
「Cloud Storage バケットを作成する」、「ターゲットの BigQuery データセットを作成する」
セクションに進みます。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. Cloud Storage バケットを作成する
このタスクでは、2 つの Cloud Storage マルチリージョン バケットを作成します。これらのバケットは、エクスポートしたテーブルをロケーション間(米国から EU)でコピーするために使用されます。
米国にバケットを作成する
- [Cloud Storage] > [バケット] に移動して、[作成] をクリックします。
- プロジェクト ID を含めたユニバーサルに一意な名前をバケットに付けます(例:
-us)。 - [ロケーション タイプ] で [us(米国の複数のリージョン)] を選択します。
- 他の値はデフォルトのままにして [作成] をクリックします。
- [
このバケットに対する公開アクセス禁止を適用する
] チェックボックスをオンにし、[公開アクセスの防止
] ポップアップが表示されたら [確定] をクリックします。
EU にバケットを作成する
この手順を繰り返して、EU
リージョンに別のバケットを作成します。ユニバーサルに一意な名前には、バケットの接尾辞としてロケーションを含める必要があります(例:
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 3. BigQuery の宛先データセットを作成する
-
BigQuery の新しいウェブ UI から、ターゲットとなるBigQuery データセットを EU に作成します。
-
ナビゲーション メニュー > [BigQuery] に移動します。
[Cloud コンソールの BigQuery へようこそ] メッセージ ボックスが開きます。このメッセージ ボックスにはクイックスタート ガイドへのリンクと、UI の更新情報が表示されます。
-
[完了] をクリックします。
-
Qwiklabs のプロジェクト ID の横にあるその他アイコンをクリックし、[データセットを作成] を選択します。
- データセット ID nyc_tlc_EU を使用して、ロケーション タイプとして [マルチリージョン] を選択し、プルダウン メニューから [EU] を選択します。
- [データセットを作成] をクリックします。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 4. Airflow と基本コンセプト、簡単な説明
- 環境の構築中に、このラボで使用するサンプル ファイルを確認します。
Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。
Airflow を使用し、タスクの有向非巡回グラフ(DAG)としてワークフローを作成します。Airflow スケジューラは、指定された依存関係に従いつつ、ワーカーの配列でタスクを実行します。
基本コンセプト
DAG - 有向非巡回グラフ(DAG)とはタスクの集まりで、タスク同士の関係や依存状態を反映するように編成されます。
オペレーター - 単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドを実行する際に使用されます。
タスク - オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。
タスク インスタンス - タスクの特定の実行です。DAG、タスク、特定の時点として表され、実行中、成功、失敗、スキップなどのステータスを示します。
コンセプトに関するドキュメントから、Airflow のコンセプトの詳細をご覧いただけます。
タスク 5. ワークフローを定義する
Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。bq_copy_across_locations.py はワークフローのコードで、DAG とも呼ばれます。このファイルを開き、どのようにビルドされるかを確認しましょう。次に、ファイルの主な要素の詳細を説明します。
すべてのワークフロー タスクをオーケストレートするために、DAG は次のオペレーターをインポートします。
-
DummyOperator
: 開始と終了のダミータスクを作成し、DAG をよりわかりやすく視覚的に表示します。 -
BigQueryToCloudStorageOperator
: BigQuery テーブルを Avro 形式で Cloud Storage バケットにエクスポートします。 -
GoogleCloudStorageToGoogleCloudStorageOperator
: Cloud Storage バケット間でファイルをコピーします。 -
GoogleCloudStorageToBigQueryOperator
: Cloud Storage バケット内の Avro ファイルからテーブルをインポートします。
- この例では、
read_table_list()
関数は、構成ファイルを読み込み、コピーするテーブルのリストを作成するように定義されています。
- DAG の名前は
bq_copy_us_to_eu_01
です。DAG はデフォルトではスケジュール設定されていないため、手動でトリガーする必要があります。
- Cloud Storage プラグインを定義するには、
Cloud StoragePlugin(AirflowPlugin)
クラスを定義し、Airflow 1.10-stable ブランチからダウンロードしたフックとオペレーターをマッピングします。
タスク 6. 環境情報を表示する
-
Composer に戻り、環境のステータスを確認します。
-
環境は作成済みなので、環境の名前をクリックすると詳細が表示されます。
[環境の詳細] ページでは、Airflow ウェブ UI の URL、Google Kubernetes Engine のクラスタ ID、DAG フォルダに接続されている Cloud Storage バケットの名前などの情報を確認できます。
次の手順は、Cloud Shell で完了する必要があります。
仮想環境を作成する
Python 仮想環境を使用して、パッケージのインストール先をシステムとは別の場所にします。
-
virtualenv
環境をインストールします。
- 仮想環境をビルドします。
- 仮想環境をアクティブにします。
タスク 7. DAG Cloud Storage バケットの変数を作成する
- Cloud Shell で以下を実行して [環境の詳細] ページの DAG バケット名をコピーし、Cloud Shell 内で参照できるように変数を設定します。
-composer-advanced-YOURDAGSBUCKET-bucket
のような名前です。ラボ中にこの変数を何度か使用します。
タスク 8. Airflow 変数を設定する
Airflow 変数は Airflow 固有の概念であり、環境変数とは異なります。ここでは、デプロイする DAG で使用される 3 つの Airflow 変数(table_list_file_path
、gcs_source_bucket
、gcs_dest_bucket
)を設定します。
キー | 値 | 詳細 |
---|---|---|
table_list_file_path |
/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv | ソーステーブルとターゲット テーブルがリストされた CSV ファイル(データセットを含む) |
gcs_source_bucket |
{一意の ID}-us | BigQuery の tabledest_bbucks をソースからエクスポートするために使用する Cloud Storage バケット |
gcs_dest_bucket |
{一意の ID}-eu | ターゲットで BigQuery テーブルをインポートするために使用する Cloud Storage バケット |
次の gcloud composer
コマンドでは、Airflow CLI のサブコマンド variables が実行されます。このサブコマンドは gcloud
コマンドライン ツールに引数を渡します。
3 つの変数を設定するために、上記の表のそれぞれのキーと値に対して 1 回ずつ composer コマンド
を実行します。コマンドは次のようになります。
ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable
)は無視しても問題ありません。これは、410.0.0 バージョンの gcloud の gcloud composer environments run
に関する既知の問題です。エラー メッセージが表示されても、変数は適切に設定されます。
-
ENVIRONMENT_NAME
には、環境の名前を指定します。 -
LOCATION
には、環境がある Compute Engine のリージョンを指定します。gcloud composer コマンドを使用する場合は、--location
フラグを含めるか、gcloud コマンドを実行する前にデフォルトのロケーションを設定する必要があります。 -
KEY
とVALUE
には、変数と設定値を指定します。左側のgcloud
コマンド(gcloud 関連の引数を含む)と、右側の Airflow サブコマンド関連の引数の間に、スペース、ダッシュ 2 つ、スペース(--
)を含めます。また、gcloud composer environments run
コマンドを variables サブコマンドと一緒に使用する場合は、KEY
とVALUE
の引数の間にスペースを含めます。
gcs_source_bucket
と gcs_dest_bucket
をタスク 2 で作成したバケットの名前に置き換えて、これらのコマンドを Cloud Shell で実行します。
変数の値を表示するには、Airflow CLI サブコマンド variables に get
引数を指定して実行するか、Airflow UI を使用します。
たとえば、コマンドを実行する場合は次のようになります。
タスク 9. DAG と依存関係を Cloud Storage にアップロードする
- Google Cloud Python ドキュメント サンプル ファイルを Cloud Shell にコピーします。
- サードパーティのフックとオペレーターのコピーを、Composer の DAG Cloud Storage バケットの plugins フォルダにアップロードします。
- 次に、DAG と構成ファイルを、環境内の DAG Cloud Storage バケットにアップロードします。
Cloud Composer が自動的に DAG を Airflow 環境に登録します。DAG の変更は 3~5 分以内に行われます。タスクのステータスを Airflow ウェブ インターフェースで確認し、設定どおり、DAG がスケジュール設定されていないことを確認できます。
タスク 10. Airflow UI を探索する
Cloud コンソールを使用して Airflow ウェブ インターフェースにアクセスする手順は次のとおりです。
- Composer の [環境] ページに戻ります。
- 対象環境の [Airflow ウェブサーバー] 列で、[Airflow] リンクをクリックします。
- ラボの認証情報をクリックします。
- 新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。この段階でデータはまだ読み込み中です。その間にラボを進めることができます。
変数を表示する
先ほど設定した変数は、環境内に保持されています。
- Airflow のメニューバーで [管理] > [変数] を選択すると、変数を確認できます。
DAG をトリガーして手動で実行する
-
[DAG] タブをクリックし、リンクの読み込みが終了するのを待ちます。
-
DAG を手動でトリガーするには
composer_sample_bq_copy_across_locations
の再生ボタンをクリックします。
- [DAG をトリガー] をクリックして、この操作を確認します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
DAG の実行状況を確認する
DAG ファイルを Cloud Storage の DAGs フォルダにアップロードすると、Cloud Composer がそのファイルを解析します。エラーがない場合は、DAG の一覧にワークフローの名前が表示されます。ワークフローはキューに格納され、スケジュールの条件(設定に基づき、ここでは「None」)が満たされるとすぐに実行されます。
再生ボタンを押すと、実行ステータスが緑色になります。
- DAG の名前をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されます。
- ツールバーで [グラフ] をクリックし、各タスクに該当するグラフィックにカーソルを合わせてステータスを表示します。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。
グラフビューからワークフローをもう一度実行する手順は次のとおりです。
- Airflow UI のグラフビューで、開始のグラフィックをクリックします。
- [消去] をクリックしてすべてのタスクをリセットし、[OK] をクリックして確定します。
プロセスの実行中にブラウザを更新して、最新の情報を表示します。
タスク 11. 結果を確認する
Cloud コンソールのページに移動してワークフローのステータスと結果を確認します。
- エクスポートされたテーブルは米国のバケットからヨーロッパの Cloud Storage バケットにコピーされました。[Cloud Storage] をクリックして、ソース(米国)とターゲット(ヨーロッパ)のバケットで Avro の中間ファイルを確認します。
- テーブルのリストが BigQuery データセットにインポートされました。[BigQuery] をクリックし、プロジェクト名と nyc_tlc_EU データセットをクリックして、作成したデータセットからテーブルにアクセスできることを確認します。
Cloud Composer 環境を削除する
- [Composer] の [環境] ページに戻ります。
- Composer 環境の横にあるチェックボックスを選択します。
- [削除] をクリックします。
- もう一度 [削除] をクリックして、ポップアップを確認します。
お疲れさまでした
プログラムで米国から EU にテーブルをコピーできました。このラボは、こちらのブログ投稿(投稿者: David Sabater Dinter)の内容に基づくものです。
次のステップ
- Airflow の詳細な使用方法については、Airflow のウェブサイトまたは Airflow の GitHub プロジェクトをご覧ください。
- Airflow については、ヘルプグループなど、他にも多くのリソースを利用できます。
- Apache JIRA アカウントに登録すると、Apache Airflow JIRA プロジェクトで関心がある問題を再オープンできます。
- Airflow UI の詳細については、ウェブ インターフェースへのアクセスをご覧ください。
Google Cloud トレーニングと認定資格
Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。
マニュアルの最終更新日: 2024 年 6 月 21 日
ラボの最終テスト日: 2024 年 6 月 21 日
Copyright 2024 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。