
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Create Cloud Composer environment.
/ 25
Create two Cloud Storage buckets.
/ 25
Create a dataset.
/ 25
Uploading the DAG and dependencies to Cloud Storage
/ 25
世界中のさまざまなロケーションにデータセットがあり、データが Google Cloud Storage バケットまたは BigQuery テーブルに存在するとします。そのデータを整理、統合して分析し、ビジネスに関する分析情報を得るには、どうすればよいでしょうか。
Cloud Composer を使用すると、直感的でグラフィカルなビューで、ワークフローの構築や、リージョンまたはストレージ システム間でのデータ移動を行うことができます。そのほかにも、BigQuery と Cloud Storage の間で相互にデータを簡単かつ確実に転送できるテンプレートが用意されています。
このラボでは、次のタスクを行う Apache Airflow ワークフローを Cloud Composer で作成して実行します。
このラボでは、次の方法について学びます。
こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。
このハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。
[Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウでリンクを開く] を選択します)。
ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。
[ラボの詳細] パネルでも [ユーザー名] を確認できます。
[次へ] をクリックします。
以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。
[ラボの詳細] パネルでも [パスワード] を確認できます。
[次へ] をクリックします。
その後次のように進みます。
その後、このタブで Google Cloud コンソールが開きます。
Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
接続した時点で認証が完了しており、プロジェクトに各自の PROJECT_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。
gcloud
は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
[承認] をクリックします。
出力は次のようになります。
出力:
出力:
出力例:
gcloud
ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。
Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、[プロダクトとページ] セクションの [Composer] をクリックして、Cloud Composer 環境を作成します。
次に、[環境の作成] をクリックします。
プルダウン メニューで [Composer 3] を選択します。
環境に次のパラメータを設定します。
名前: composer-advanced-lab
ロケーション:
イメージのバージョン: composer-3-airflow-n.n.n-build.n(利用可能なイメージの中で最も大きい番号のイメージを選択)
[環境リソース] で [Small] を選択します。
[詳細設定を表示] のプルダウンをクリックし、[Airflow データベース ゾーン] で
その他の設定はすべてデフォルトのままにします。
Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されると、環境作成プロセスが完了したことになります。
「Cloud Storage バケットを作成する」、「ターゲットの BigQuery データセットを作成する」
セクションに進みます。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
このタスクでは、2 つの Cloud Storage マルチリージョン バケットを作成します。これらのバケットは、エクスポートしたテーブルをロケーション間(米国から EU)でコピーするために使用されます。
このバケットに対する公開アクセス禁止を適用する
] チェックボックスをオンにし、[公開アクセスの防止
] ポップアップが表示されたら [確定] をクリックします。この手順を繰り返して、EU
リージョンに別のバケットを作成します。ユニバーサルに一意な名前には、バケットの接尾辞としてロケーションを含める必要があります(例:
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
BigQuery の新しいウェブ UI から、ターゲットとなるBigQuery データセットを EU に作成します。
ナビゲーション メニュー > [BigQuery] に移動します。
[Cloud コンソールの BigQuery へようこそ] メッセージ ボックスが開きます。このメッセージ ボックスにはクイックスタート ガイドへのリンクと、UI の更新情報が表示されます。
[完了] をクリックします。
Qwiklabs のプロジェクト ID の横にあるその他アイコンをクリックし、[データセットを作成] を選択します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。
Airflow を使用し、タスクの有向非巡回グラフ(DAG)としてワークフローを作成します。Airflow スケジューラは、指定された依存関係に従いつつ、ワーカーの配列でタスクを実行します。
DAG - 有向非巡回グラフ(DAG)とはタスクの集まりで、タスク同士の関係や依存状態を反映するように編成されます。
オペレーター - 単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドを実行する際に使用されます。
タスク - オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。
タスク インスタンス - タスクの特定の実行です。DAG、タスク、特定の時点として表され、実行中、成功、失敗、スキップなどのステータスを示します。
コンセプトに関するドキュメントから、Airflow のコンセプトの詳細をご覧いただけます。
Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。bq_copy_across_locations.py はワークフローのコードで、DAG とも呼ばれます。このファイルを開き、どのようにビルドされるかを確認しましょう。次に、ファイルの主な要素の詳細を説明します。
すべてのワークフロー タスクをオーケストレートするために、DAG は次のオペレーターをインポートします。
DummyOperator
: 開始と終了のダミータスクを作成し、DAG をよりわかりやすく視覚的に表示します。BigQueryToCloudStorageOperator
: BigQuery テーブルを Avro 形式で Cloud Storage バケットにエクスポートします。GoogleCloudStorageToGoogleCloudStorageOperator
: Cloud Storage バケット間でファイルをコピーします。GoogleCloudStorageToBigQueryOperator
: Cloud Storage バケット内の Avro ファイルからテーブルをインポートします。read_table_list()
関数は、構成ファイルを読み込み、コピーするテーブルのリストを作成するように定義されています。bq_copy_us_to_eu_01
です。DAG はデフォルトではスケジュール設定されていないため、手動でトリガーする必要があります。Cloud StoragePlugin(AirflowPlugin)
クラスを定義し、Airflow 1.10-stable ブランチからダウンロードしたフックとオペレーターをマッピングします。Composer に戻り、環境のステータスを確認します。
環境は作成済みなので、環境の名前をクリックすると詳細が表示されます。
[環境の詳細] ページでは、Airflow ウェブ UI の URL、Google Kubernetes Engine のクラスタ ID、DAG フォルダに接続されている Cloud Storage バケットの名前などの情報を確認できます。
次の手順は、Cloud Shell で完了する必要があります。
Python 仮想環境を使用して、パッケージのインストール先をシステムとは別の場所にします。
virtualenv
環境をインストールします。 -composer-advanced-YOURDAGSBUCKET-bucket
のような名前です。ラボ中にこの変数を何度か使用します。
Airflow 変数は Airflow 固有の概念であり、環境変数とは異なります。ここでは、デプロイする DAG で使用される 3 つの Airflow 変数(table_list_file_path
、gcs_source_bucket
、gcs_dest_bucket
)を設定します。
キー | 値 | 詳細 |
---|---|---|
table_list_file_path |
/home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv | ソーステーブルとターゲット テーブルがリストされた CSV ファイル(データセットを含む) |
gcs_source_bucket |
{一意の ID}-us | BigQuery の tabledest_bbucks をソースからエクスポートするために使用する Cloud Storage バケット |
gcs_dest_bucket |
{一意の ID}-eu | ターゲットで BigQuery テーブルをインポートするために使用する Cloud Storage バケット |
次の gcloud composer
コマンドでは、Airflow CLI のサブコマンド variables が実行されます。このサブコマンドは gcloud
コマンドライン ツールに引数を渡します。
3 つの変数を設定するために、上記の表のそれぞれのキーと値に対して 1 回ずつ composer コマンド
を実行します。コマンドは次のようになります。
ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable
)は無視しても問題ありません。これは、410.0.0 バージョンの gcloud の gcloud composer environments run
に関する既知の問題です。エラー メッセージが表示されても、変数は適切に設定されます。
ENVIRONMENT_NAME
には、環境の名前を指定します。LOCATION
には、環境がある Compute Engine のリージョンを指定します。gcloud composer コマンドを使用する場合は、--location
フラグを含めるか、gcloud コマンドを実行する前にデフォルトのロケーションを設定する必要があります。KEY
と VALUE
には、変数と設定値を指定します。左側の gcloud
コマンド(gcloud 関連の引数を含む)と、右側の Airflow サブコマンド関連の引数の間に、スペース、ダッシュ 2 つ、スペース( --
)を含めます。また、gcloud composer environments run
コマンドを variables サブコマンドと一緒に使用する場合は、KEY
と VALUE
の引数の間にスペースを含めます。gcs_source_bucket
と gcs_dest_bucket
をタスク 2 で作成したバケットの名前に置き換えて、これらのコマンドを Cloud Shell で実行します。
変数の値を表示するには、Airflow CLI サブコマンド variables に get
引数を指定して実行するか、Airflow UI を使用します。
たとえば、コマンドを実行する場合は次のようになります。
Cloud Composer が自動的に DAG を Airflow 環境に登録します。DAG の変更は 3~5 分以内に行われます。タスクのステータスを Airflow ウェブ インターフェースで確認し、設定どおり、DAG がスケジュール設定されていないことを確認できます。
Cloud コンソールを使用して Airflow ウェブ インターフェースにアクセスする手順は次のとおりです。
先ほど設定した変数は、環境内に保持されています。
[DAG] タブをクリックし、リンクの読み込みが終了するのを待ちます。
DAG を手動でトリガーするには composer_sample_bq_copy_across_locations
の再生ボタンをクリックします。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
DAG ファイルを Cloud Storage の DAGs フォルダにアップロードすると、Cloud Composer がそのファイルを解析します。エラーがない場合は、DAG の一覧にワークフローの名前が表示されます。ワークフローはキューに格納され、スケジュールの条件(設定に基づき、ここでは「None」)が満たされるとすぐに実行されます。
再生ボタンを押すと、実行ステータスが緑色になります。
グラフビューからワークフローをもう一度実行する手順は次のとおりです。
プロセスの実行中にブラウザを更新して、最新の情報を表示します。
Cloud コンソールのページに移動してワークフローのステータスと結果を確認します。
プログラムで米国から EU にテーブルをコピーできました。このラボは、こちらのブログ投稿(投稿者: David Sabater Dinter)の内容に基づくものです。
Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。
マニュアルの最終更新日: 2024 年 6 月 21 日
ラボの最終テスト日: 2024 年 6 月 21 日
Copyright 2025 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
One lab at a time
Confirm to end all existing labs and start this one