arrow_back

Cloud Composer: 別のロケーションに BigQuery テーブルをコピーする

ログイン 参加
700 以上のラボとコースにアクセス

Cloud Composer: 別のロケーションに BigQuery テーブルをコピーする

ラボ 1時間 15分 universal_currency_alt クレジット: 5 show_chart 中級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

GSP283

概要

世界中のさまざまなロケーションにデータセットがあり、データが Google Cloud Storage バケットまたは BigQuery テーブルに存在するとします。そのデータを整理、統合して分析し、ビジネスに関する分析情報を得るには、どうすればよいでしょうか。

Cloud Composer を使用すると、直感的でグラフィカルなビューで、ワークフローの構築や、リージョンまたはストレージ システム間でのデータ移動を行うことができます。そのほかにも、BigQuery と Cloud Storage の間で相互にデータを簡単かつ確実に転送できるテンプレートが用意されています。

このラボでは、次のタスクを行う Apache Airflow ワークフローを Cloud Composer で作成して実行します。

  • コピーするテーブルのリストを指定する構成ファイルから読み込む
  • 米国にある BigQuery データセットからテーブルのリストを Cloud Storage にエクスポートする
  • 米国からエクスポートしたテーブルを EU にある Cloud Storage バケットにコピーする
  • テーブルのリストを EU にある BigQuery データセットにインポートする

演習内容

このラボでは、次の方法について学びます。

  • Cloud Composer 環境を作成する
  • Cloud Storage バケットを作成する
  • BigQuery データセットを作成する
  • Cloud Composer で Apache Airflow ワークフローを作成して実行し、Cloud Storage と BigQuery の間でデータを移動する

設定と要件

[ラボを開始] ボタンをクリックする前に

こちらの説明をお読みください。ラボには時間制限があり、一時停止することはできません。タイマーは、Google Cloud のリソースを利用できる時間を示しており、[ラボを開始] をクリックするとスタートします。

このハンズオンラボでは、シミュレーションやデモ環境ではなく実際のクラウド環境を使って、ラボのアクティビティを行います。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
注: このラボの実行には、シークレット モード(推奨)またはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウント間の競合を防ぎ、個人アカウントに追加料金が発生しないようにすることができます。
  • ラボを完了するための時間(開始後は一時停止できません)
注: このラボでは、受講者アカウントのみを使用してください。別の Google Cloud アカウントを使用すると、そのアカウントに料金が発生する可能性があります。

ラボを開始して Google Cloud コンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるダイアログでお支払い方法を選択してください。 左側の [ラボの詳細] ペインには、以下が表示されます。

    • [Google Cloud コンソールを開く] ボタン
    • 残り時間
    • このラボで使用する必要がある一時的な認証情報
    • このラボを行うために必要なその他の情報(ある場合)
  2. [Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウで開く] を選択します)。

    ラボでリソースがスピンアップし、別のタブで [ログイン] ページが表示されます。

    ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。

    注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
  3. 必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。

    {{{user_0.username | "Username"}}}

    [ラボの詳細] ペインでもユーザー名を確認できます。

  4. [次へ] をクリックします。

  5. 以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。

    {{{user_0.password | "Password"}}}

    [ラボの詳細] ペインでもパスワードを確認できます。

  6. [次へ] をクリックします。

    重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
  7. その後次のように進みます。

    • 利用規約に同意してください。
    • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
    • 無料トライアルには登録しないでください。

その後、このタブで Google Cloud コンソールが開きます。

注: Google Cloud のプロダクトやサービスにアクセスするには、ナビゲーション メニューをクリックするか、[検索] フィールドにサービス名またはプロダクト名を入力します。

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン をクリックします。

  2. ウィンドウで次の操作を行います。

    • Cloud Shell 情報ウィンドウで操作を進めます。
    • Cloud Shell が認証情報を使用して Google Cloud API を呼び出すことを承認します。

接続した時点で認証が完了しており、プロジェクトに各自の Project_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  1. (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
  1. [承認] をクリックします。

出力:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} 注: Google Cloud における gcloud ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。

タスク 1. Cloud Composer 環境を作成する

  1. Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、[プロダクトとページ] セクションの [Composer] をクリックして、Cloud Composer 環境を作成します。

  2. 次に、[環境の作成] をクリックします。

  3. プルダウン メニューで [Composer 3] を選択します。

  4. 環境に次のパラメータを設定します。

  • 名前: composer-advanced-lab

  • ロケーション:

  • イメージのバージョン: composer-3-airflow-n.n.n-build.n(利用可能なイメージの中で最も大きい番号のイメージを選択)

  • [環境リソース] で [Small] を選択します。

  • [詳細設定を表示] のプルダウンをクリックし、[Airflow データベース ゾーン] で を選択します。

その他の設定はすべてデフォルトのままにします。

  1. [作成] をクリックします。

Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されると、環境作成プロセスが完了したことになります。

注: 環境設定プロセスが完了するまでに最長で 20 分ほどかかる場合があります。次の「Cloud Storage バケットを作成する」、「ターゲットの BigQuery データセットを作成する」セクションに進みます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Composer 環境を作成する

タスク 2. Cloud Storage バケットを作成する

このタスクでは、2 つの Cloud Storage マルチリージョン バケットを作成します。これらのバケットは、エクスポートしたテーブルをロケーション間(米国から EU)でコピーするために使用されます。

米国にバケットを作成する

  1. [Cloud Storage] > [バケット] に移動して、[作成] をクリックします。
  2. プロジェクト ID を含めたユニバーサルに一意な名前をバケットに付けます(例: -us)。
  3. [ロケーション タイプ] で [us(米国の複数のリージョン)] を選択します。
  4. 他の値はデフォルトのままにして [作成] をクリックします。
  5. [このバケットに対する公開アクセス禁止を適用する] チェックボックスをオンにし、[公開アクセスの防止] ポップアップが表示されたら [確定] をクリックします。

EU にバケットを作成する

この手順を繰り返して、EU リージョンに別のバケットを作成します。ユニバーサルに一意な名前には、バケットの接尾辞としてロケーションを含める必要があります(例: -eu)。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

2 つの Cloud Storage バケットを作成する

タスク 3. BigQuery の宛先データセットを作成する

  1. BigQuery の新しいウェブ UI から、ターゲットとなるBigQuery データセットを EU に作成します。

  2. ナビゲーション メニュー > [BigQuery] に移動します。

[Cloud コンソールの BigQuery へようこそ] メッセージ ボックスが開きます。このメッセージ ボックスにはクイックスタート ガイドへのリンクと、UI の更新情報が表示されます。

  1. [完了] をクリックします。

  2. Qwiklabs のプロジェクト ID の横にあるその他アイコンをクリックし、[データセットを作成] を選択します。

  1. データセット ID nyc_tlc_EU を使用して、ロケーション タイプとして [マルチリージョン] を選択し、プルダウン メニューから [EU] を選択します。

  1. [データセットを作成] をクリックします。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

データセットを作成する

タスク 4. Airflow と基本コンセプト、簡単な説明

  • 環境の構築中に、このラボで使用するサンプル ファイルを確認します。

Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。

Airflow を使用し、タスクの有向非巡回グラフ(DAG)としてワークフローを作成します。Airflow スケジューラは、指定された依存関係に従いつつ、ワーカーの配列でタスクを実行します。

基本コンセプト

DAG - 有向非巡回グラフ(DAG)とはタスクの集まりで、タスク同士の関係や依存状態を反映するように編成されます。

オペレーター - 単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドを実行する際に使用されます。

タスク - オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。

タスク インスタンス - タスクの特定の実行です。DAG、タスク、特定の時点として表され、実行中、成功、失敗、スキップなどのステータスを示します。

コンセプトに関するドキュメントから、Airflow のコンセプトの詳細をご覧いただけます。

タスク 5. ワークフローを定義する

Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。bq_copy_across_locations.py はワークフローのコードで、DAG とも呼ばれます。このファイルを開き、どのようにビルドされるかを確認しましょう。次に、ファイルの主な要素の詳細を説明します。

すべてのワークフロー タスクをオーケストレートするために、DAG は次のオペレーターをインポートします。

  1. DummyOperator: 開始と終了のダミータスクを作成し、DAG をよりわかりやすく視覚的に表示します。
  2. BigQueryToCloudStorageOperator: BigQuery テーブルを Avro 形式で Cloud Storage バケットにエクスポートします。
  3. GoogleCloudStorageToGoogleCloudStorageOperator: Cloud Storage バケット間でファイルをコピーします。
  4. GoogleCloudStorageToBigQueryOperator: Cloud Storage バケット内の Avro ファイルからテーブルをインポートします。
  • この例では、read_table_list() 関数は、構成ファイルを読み込み、コピーするテーブルのリストを作成するように定義されています。
# -------------------------------------------------------------------------------- # Functions # -------------------------------------------------------------------------------- def read_table_list(table_list_file): """ Reads the master CSV file that will help in creating Airflow tasks in the DAG dynamically. :param table_list_file: (String) The file location of the master file, e.g. '/home/airflow/framework/master.csv' :return master_record_all: (List) List of Python dictionaries containing the information for a single row in master CSV file. """ master_record_all = [] logger.info('Reading table_list_file from : %s' % str(table_list_file)) try: with open(table_list_file, 'rb') as csv_file: csv_reader = csv.reader(csv_file) next(csv_reader) # skip the headers for row in csv_reader: logger.info(row) master_record = { 'table_source': row[0], 'table_dest': row[1] } master_record_all.append(master_record) return master_record_all except IOError as e: logger.error('Error opening table_list_file %s: ' % str( table_list_file), e)
  • DAG の名前は bq_copy_us_to_eu_01 です。DAG はデフォルトではスケジュール設定されていないため、手動でトリガーする必要があります。
default_args = { 'owner': 'airflow', 'start_date': datetime.today(), 'depends_on_past': False, 'email': [''], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } # DAG object. with models.DAG('bq_copy_us_to_eu_01', default_args=default_args, schedule_interval=None) as dag:
  • Cloud Storage プラグインを定義するには、Cloud StoragePlugin(AirflowPlugin) クラスを定義し、Airflow 1.10-stable ブランチからダウンロードしたフックとオペレーターをマッピングします。
# Import operator from plugins from gcs_plugin.operators import gcs_to_gcs

タスク 6. 環境情報を表示する

  1. Composer に戻り、環境のステータスを確認します。

  2. 環境は作成済みなので、環境の名前をクリックすると詳細が表示されます。

[環境の詳細] ページでは、Airflow ウェブ UI の URL、Google Kubernetes Engine のクラスタ ID、DAG フォルダに接続されている Cloud Storage バケットの名前などの情報を確認できます。

注: Cloud Composer は、Cloud Storage を使用して Apache Airflow DAG(ワークフロー)を保存します。各環境には、関連する Cloud Storage バケットがあります。Cloud Composer では、Cloud Storage バケット内の DAG のみがスケジュールされます。

次の手順は、Cloud Shell で完了する必要があります。

仮想環境を作成する

Python 仮想環境を使用して、パッケージのインストール先をシステムとは別の場所にします。

  1. virtualenv 環境をインストールします。
sudo apt-get install -y virtualenv
  1. 仮想環境をビルドします。
python3 -m venv venv
  1. 仮想環境をアクティブにします。
source venv/bin/activate

タスク 7. DAG Cloud Storage バケットの変数を作成する

  • Cloud Shell で以下を実行して [環境の詳細] ページの DAG バケット名をコピーし、Cloud Shell 内で参照できるように変数を設定します。
注: 次のコマンドの DAG バケット名を必ず置き換えてください。ナビゲーション メニュー > [Cloud Storage] に移動します。-composer-advanced-YOURDAGSBUCKET-bucket のような名前です。 DAGS_BUCKET=<DAG のバケット名>

ラボ中にこの変数を何度か使用します。

タスク 8. Airflow 変数を設定する

Airflow 変数は Airflow 固有の概念であり、環境変数とは異なります。ここでは、デプロイする DAG で使用される 3 つの Airflow 変数table_list_file_pathgcs_source_bucketgcs_dest_bucket)を設定します。

キー 詳細
table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv ソーステーブルとターゲット テーブルがリストされた CSV ファイル(データセットを含む)
gcs_source_bucket {一意の ID}-us BigQuery の tabledest_bbucks をソースからエクスポートするために使用する Cloud Storage バケット
gcs_dest_bucket {一意の ID}-eu ターゲットで BigQuery テーブルをインポートするために使用する Cloud Storage バケット

次の gcloud composer コマンドでは、Airflow CLI のサブコマンド variables が実行されます。このサブコマンドは gcloud コマンドライン ツールに引数を渡します。

3 つの変数を設定するために、上記の表のそれぞれのキーと値に対して 1 回ずつ composer コマンドを実行します。コマンドは次のようになります。

gcloud composer environments run ENVIRONMENT_NAME \ --location LOCATION variables -- \ set KEY VALUE gcloud エラー(ERROR: gcloud crashed (TypeError): 'NoneType' object is not callable)は無視しても問題ありません。これは、410.0.0 バージョンの gcloud の gcloud composer environments run に関する既知の問題です。エラー メッセージが表示されても、変数は適切に設定されます。
  • ENVIRONMENT_NAME には、環境の名前を指定します。
  • LOCATION には、環境がある Compute Engine のリージョンを指定します。gcloud composer コマンドを使用する場合は、--location フラグを含めるか、gcloud コマンドを実行する前にデフォルトのロケーションを設定する必要があります。
  • KEYVALUE には、変数と設定値を指定します。左側の gcloud コマンド(gcloud 関連の引数を含む)と、右側の Airflow サブコマンド関連の引数の間に、スペース、ダッシュ 2 つ、スペース( -- )を含めます。また、gcloud composer environments run コマンドを variables サブコマンドと一緒に使用する場合は、KEYVALUE の引数の間にスペースを含めます。

gcs_source_bucketgcs_dest_bucket をタスク 2 で作成したバケットの名前に置き換えて、これらのコマンドを Cloud Shell で実行します。

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set table_list_file_path /home/airflow/gcs/dags/bq_copy_eu_to_us_sample.csv gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_source_bucket {UNIQUE ID}-us gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ set gcs_dest_bucket {UNIQUE_ID}-eu

変数の値を表示するには、Airflow CLI サブコマンド variablesget 引数を指定して実行するか、Airflow UI を使用します。

たとえば、コマンドを実行する場合は次のようになります。

gcloud composer environments run composer-advanced-lab \ --location {{{ project_0.default_region | "REGION" }}} variables -- \ get gcs_source_bucket : DAG で使用される 3 つの Airflow 変数をすべて設定してください。

タスク 9. DAG と依存関係を Cloud Storage にアップロードする

  1. Google Cloud Python ドキュメント サンプル ファイルを Cloud Shell にコピーします。
cd ~ gcloud storage cp -r gs://spls/gsp283/python-docs-samples .
  1. サードパーティのフックとオペレーターのコピーを、Composer の DAG Cloud Storage バケットの plugins フォルダにアップロードします。
gcloud storage cp -r python-docs-samples/third_party/apache-airflow/plugins/* gs://$DAGS_BUCKET/plugins
  1. 次に、DAG と構成ファイルを、環境内の DAG Cloud Storage バケットにアップロードします。
gcloud storage cp python-docs-samples/composer/workflows/bq_copy_across_locations.py gs://$DAGS_BUCKET/dags gcloud storage cp python-docs-samples/composer/workflows/bq_copy_eu_to_us_sample.csv gs://$DAGS_BUCKET/dags

Cloud Composer が自動的に DAG を Airflow 環境に登録します。DAG の変更は 3~5 分以内に行われます。タスクのステータスを Airflow ウェブ インターフェースで確認し、設定どおり、DAG がスケジュール設定されていないことを確認できます。

タスク 10. Airflow UI を探索する

Cloud コンソールを使用して Airflow ウェブ インターフェースにアクセスする手順は次のとおりです。

  1. Composer の [環境] ページに戻ります。
  2. 対象環境の [Airflow ウェブサーバー] 列で、[Airflow] リンクをクリックします。

  1. ラボの認証情報をクリックします。
  2. 新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。この段階でデータはまだ読み込み中です。その間にラボを進めることができます。

変数を表示する

先ほど設定した変数は、環境内に保持されています。

  • Airflow のメニューバーで [管理] > [変数] を選択すると、変数を確認できます。

DAG をトリガーして手動で実行する

  1. [DAG] タブをクリックし、リンクの読み込みが終了するのを待ちます。

  2. DAG を手動でトリガーするには composer_sample_bq_copy_across_locations の再生ボタンをクリックします。

  1. [DAG をトリガー] をクリックして、この操作を確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG と依存関係を Cloud Storage にアップロードする

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の DAGs フォルダにアップロードすると、Cloud Composer がそのファイルを解析します。エラーがない場合は、DAG の一覧にワークフローの名前が表示されます。ワークフローはキューに格納され、スケジュールの条件(設定に基づき、ここでは「None」)が満たされるとすぐに実行されます。

再生ボタンを押すと、実行ステータスが緑色になります。

  1. DAG の名前をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されます。

  1. ツールバーで [グラフ] をクリックし、各タスクに該当するグラフィックにカーソルを合わせてステータスを表示します。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

グラフビューからワークフローをもう一度実行する手順は次のとおりです。

  1. Airflow UI のグラフビューで、開始のグラフィックをクリックします。
  2. [消去] をクリックしてすべてのタスクをリセットし、[OK] をクリックして確定します。

プロセスの実行中にブラウザを更新して、最新の情報を表示します。

タスク 11. 結果を確認する

Cloud コンソールのページに移動してワークフローのステータスと結果を確認します。

  • エクスポートされたテーブルは米国のバケットからヨーロッパの Cloud Storage バケットにコピーされました。[Cloud Storage] をクリックして、ソース(米国)とターゲット(ヨーロッパ)のバケットで Avro の中間ファイルを確認します。
  • テーブルのリストが BigQuery データセットにインポートされました。[BigQuery] をクリックし、プロジェクト名と nyc_tlc_EU データセットをクリックして、作成したデータセットからテーブルにアクセスできることを確認します。

Cloud Composer 環境を削除します。

  1. Cloud Composer の [環境] ページに戻ります。

  2. Cloud Composer 環境を選択します。

  3. [削除] をクリックします。

  4. 表示されたダイアログで、[削除] をクリックして、Cloud Composer 環境の削除を続行します。

お疲れさまでした

プログラムで米国から EU にテーブルをコピーできました。このラボは、こちらのブログ投稿(投稿者: David Sabater Dinter)の内容に基づくものです。

次のステップ

Google Cloud トレーニングと認定資格

Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。

マニュアルの最終更新日: 2024 年 6 月 21 日

ラボの最終テスト日: 2024 年 6 月 21 日

Copyright 2025 Google LLC. All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。