arrow_back

Dataflow によるサーバーレス データ処理 - Dataflow ジョブのモニタリング、ロギング、エラーレポート

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow によるサーバーレス データ処理 - Dataflow ジョブのモニタリング、ロギング、エラーレポート

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • アラート ポリシーを作成する
  • パイプラインの簡単なトラブルシューティングを行う
  • 診断と BQ のタブの仕組みを確認する
  • Error Reporting のページ詳細を確認する

前提条件

Python の基本的な知識。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニュー)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

タスク 1. 仮想環境を作成して依存関係をインストールする

  1. このラボでの作業用に仮想環境を作成します。
sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. 最後に、Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com

タスク 2. アラート ポリシーを作成する

このセクションでは、条件が満たされた場合にトリガーされるアラート ポリシーを作成します。

  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Monitoring」と入力して [検索] をクリックし、[Monitoring] をクリックします。

  2. 左側のナビゲーション パネルで [アラート] をクリックします。

  3. [アラート] ページで [+ Create policy] をクリックします。

  4. [指標を選択] プルダウンをクリックし、[Active] のチェックをオフにします。

    a. リソースと指標名のフィルタに「Dataflow Job」と入力し、[Dataflow Job] > [Job] をクリックします。[Failed] を選択して [適用] をクリックします。

    b. [ローリング ウィンドウ関数] を [Sum] に設定します。

    c. [NEXT] をクリックします。[しきい値] に「0」を設定します。

  5. [Next] ボタンをクリックして、[Configure notifications and finalize alert] の手順に進みます。

  6. 読み込まれたフォームで、[通知チャンネル] プルダウン メニューをクリックし、[Manage Notification Channels] をクリックします。

新しいウィンドウが開き、サポートされている通知チャンネルの種類が示されます。

  1. [Email] の行まで下方向にスクロールし、右端にある [ADD NEW] をクリックします。

    a. [Email Address] に個人メールアドレスを入力します。

    注: Qwiklabs 受講者のアカウントは一時的であるため、このアカウントのメールアドレスは利用できません。

    b. [Display Name] に「Qwiklabs Student」を入力します。

    c. [SAVE] をクリックします。このウィンドウを閉じ、前の [通知ポリシーの作成] ウィンドウに戻ります。

  2. [MANAGE NOTIFICATION CHANNELS] の左側にある更新アイコンをクリックします。

  3. [通知チャンネル] プルダウン メニューを再度クリックします。今追加した受講者アカウントの表示名があることを確認します。

  4. [Qwiklabs Student] の左側にあるチェックボックスをオンにして、[OK] をクリックします。

  5. [アラート ポリシー名] のテキスト ボックスに「Failed Dataflow Job」を入力します。

  6. もう一度 [NEXT] をクリックします。

  7. アラートを確認して [ポリシーを作成] をクリックします。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 アラート ポリシーを作成する

タスク 3. Dataflow でのパイプライン開始のエラー

  1. 以下のパイプライン コードを確認します。
import argparse import logging import argparse, logging, os import apache_beam as beam from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions class ReadGBK(beam.DoFn): def process(self, e): k, elems = e for v in elems: logging.info(f"the element is {v}") yield v def run(argv=None): parser = argparse.ArgumentParser() parser.add_argument( '--output', dest='output', help='Output file to write results to.') known_args, pipeline_args = parser.parse_known_args(argv) read_query = """( SELECT version, block_hash, block_number FROM `bugquery-public-data.crypto_bitcoin.transactions` WHERE version = 1 LIMIT 1000000 ) UNION ALL ( SELECT version, block_hash, block_number FROM `bigquery-public-data.crypto_bitcoin.transactions` WHERE version = 2 LIMIT 1000 ) ;""" p = beam.Pipeline(options=PipelineOptions(pipeline_args)) (p | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(query=read_query, use_standard_sql=True) | "Add Hotkey" >> beam.Map(lambda elem: (elem["version"], elem)) | "Groupby" >> beam.GroupByKey() | 'Print' >> beam.ParDo(ReadGBK()) | 'Sink' >> WriteToText(known_args.output)) result = p.run() if __name__ == '__main__': logger = logging.getLogger().setLevel(logging.INFO) run()
  1. Cloud Shellmy_pipeline.py という名前の新しいファイルを開きます。任意のテキスト エディタ(以下の例では Vim)を使用して、このファイルに上のコードをコピーして貼り付け、保存します。
vi my_pipeline.py
  1. ファイルにコードを貼り付けた後、必ず保存してください。

  2. 以下のコマンドを実行して、Storage バケットを作成します。

export PROJECT_ID=$(gcloud config get-value project) gcloud storage buckets create gs://$PROJECT_ID --location=US
  1. 以下のコマンドを実行して、パイプラインの起動を試みます(ただし、コマンドは失敗します)。
# パイプラインの起動を試みる python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.default_region | Region}}} \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DataflowRunner

パイプラインを起動する上のコマンドは失敗し、以下のスクリーンショットのようなスタック トレースが表示されます。

この失敗は Beam エンドで発生しています。パイプライン コードで WriteToText(known_args.output) が指定されていますが、--output フラグを指定しなかったため、Beam 検証に合格せず、Dataflow でパイプラインを起動できませんでした。Dataflow ジョブを起動できなかったため、この起動オペレーションでジョブ ID は関連付けられませんでした。つまり、受信トレイにアラートメールは届きません。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Dataflow でのパイプライン開始のエラー

タスク 4. 無効な BigQuery テーブル

このセクションでは、必要な --output フラグを指定して再度パイプラインを起動します。今度は Dataflow パイプラインの起動には成功しますが、無効な BigQuery テーブルが意図的に追加されているため、ジョブは数分後に失敗します。この失敗によりアラート ポリシーがトリガーされ、アラートメールが送信されます。

  1. ターミナルで以下のコマンドを実行して、パイプラインを起動します。
# パイプラインを起動する python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.default_region | Region}}} \ --output gs://$PROJECT_ID/results/prefix \ --tempLocation=gs://$PROJECT_ID/temp/ \ --max_num_workers=5 \ --runner=DataflowRunner
  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力して [検索] をクリックし、[Dataflow] をクリックします。

  2. [ジョブ] をクリックします。

  3. ページに一覧表示されている Dataflow ジョブをクリックします。

約 4 分待つと、ジョブの失敗が表示されます。この失敗でアラート ポリシーがトリガーされ、メールの受信トレイを見るとアラートメールが表示されています。

  1. Dataflow パイプラインのジョブグラフの下に、[ログ] パネルが表示されます。[表示] をクリックします。[ジョブのログ]、[ワーカーログ]、[診断]、[BigQuery のジョブ]、[データ サンプリング] というタブが表示されます。

  2. [ジョブのログ] タブをクリックして、ログエントリを確認します。

  3. 同様に、[ワーカーログ] タブをクリックしてログを確認します。

注: それぞれのログをクリックすると開くことができます。 繰り返されるログは、[診断] タブに表示される場合があります。
  1. [診断] タブをクリックすると、クリック可能なリンクが表示されます。

  2. リンクをクリックすると、Error Reporting のページが表示されます。ここにはエラーの詳細が表示されています。スタック トレースを調べると、パイプライン コードのタイポが問題だとわかりました。

コードでは「bigquery」が、意図的に間違えて「bugquery」と入力されているため、Dataflow ジョブは失敗し、アラート ポリシーがトリガーされてメールが送信されます。次のセクションでは、コードを修正して、パイプラインを再起動します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 無効な BigQuery テーブル

タスク 5. 多すぎるロギング

  1. テキスト エディタ(Vim などの任意のエディタ)を使用して my_pipeline.py ファイルを開き、bugquerybigquery に置き換えてコードを修正します。

    注: スペルミスの bugquery はコード内の SELECT ステートメントの中の bugquery-public-data.crypto_bitcoin.transactions にあります。これを bigquery-public-data.crypto_bitcoin.transactions に変更します。
  2. ターミナルで以下のコマンドを実行して、パイプラインを起動します。

# パイプラインを起動する python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.default_region | Region}}} \ --output gs://$PROJECT_ID/results/prefix \ --tempLocation=gs://$PROJECT_ID/temp/ \ --max_num_workers=5 \ --runner=DataflowRunner

パイプラインが完了するまでに約 7 分かかります。

  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力して [検索] をクリックし、[Dataflow] をクリックします。

  2. [ジョブ] をクリックします。この新しいジョブをクリックすると、ジョブグラフのページの右端にある [ジョブ ステータス] に「完了しました」と表示されます。ジョブが完了していない場合は、正常に完了するまで待ちます。

  3. 下部にある [ログ] を開き、[ワーカーログ] タブを確認します。

[ワーカーログ] タブには、以下のスクリーンショットのように、形式「the element is {'version' : ....... }」のログエントリが表示されます。

パイプライン コードの以下の行により、これらのエントリがログに記録されます。

logging.info(f"the element is {v}")
  1. [診断] タブをクリックすると、ログのスロットリングについてのメッセージが表示されます。

  1. そのメッセージをクリックして、[Error Reporting] ページに移動します。ジョブの分析情報として過剰なロギングが指摘され、問題点を示す公開ドキュメントへのリンクが表示されています。

この問題を簡単に解決するには、パイプライン コードからロギングの行を削除し、Dataflow パイプラインを再実行します。完了後は、[診断] タブに同様のメッセージは表示されません。

注: このラボを完了するためにパイプラインを再実行する必要はありません。
  1. ブラウザの戻るボタンをクリックして、Dataflow の [ジョブ] ページに戻ります。

  2. 開いたログパネルで [BigQuery のジョブ] タブをクリックします。[データのロケーション] プルダウン メニューで [us(米国の複数のリージョン)] を選択して、[BigQuery ジョブを読み込む] をクリックします。[確認] をクリックします。

パイプラインの一部として 2 つのジョブがリストされています。

  1. 最初のジョブの右端にある [コマンドライン] をクリックし、表示されたポップアップで [Cloud Shell で実行] をクリックします。

これによりコマンドが Cloud Shell に貼り付けられるので、そのコマンドを実行してください。

  1. BigQuery の読み取りと書き込みに使用されたファイルの場所、サイズ、ジョブにかかった時間が出力されます。BigQuery ジョブが失敗した場合は、失敗の原因を示す場所が出力されます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 多すぎるロギング

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2025 Google LLC All rights reserved. Google および Google のロゴは、Google LLC の商標です。その他すべての社名および製品名は、それぞれ該当する企業の商標である可能性があります。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。