arrow_back

Serverless Data Processing with Dataflow - バッチ分析に Dataflow SQL を使用する(Python)

ログイン 参加
700 以上のラボとコースにアクセス

Serverless Data Processing with Dataflow - バッチ分析に Dataflow SQL を使用する(Python)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • SQL を使用してサイト トラフィックをユーザー別に集計するパイプラインを作成する。
  • SQL を使用してサイト トラフィックを分単位で集計するパイプラインを作成する。

設定と要件

[ラボを開始] ボタンをクリックする前に

注: 以下の説明をお読みください。

ラボの時間は計測されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

この Qwiklabs ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

必要なもの

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
  • ラボを完了するために十分な時間
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。 注: Pixelbook を使用している場合は、このラボをシークレット ウィンドウで実施してください。

ラボを開始してコンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側のパネルには、このラボで使用する必要がある一時的な認証情報が表示されます。

  2. ユーザー名をコピーし、[Google Console を開く] をクリックします。 ラボでリソースが起動し、別のタブで [アカウントの選択] ページが表示されます。

    注: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
  3. [アカウントの選択] ページで [別のアカウントを使用] をクリックします。[ログイン] ページが開きます。

  4. [接続の詳細] パネルでコピーしたユーザー名を貼り付けます。パスワードもコピーして貼り付けます。

注: 認証情報は [接続の詳細] パネルに表示されたものを使用してください。Google Cloud Skills Boost の認証情報は使用しないでください。請求が発生する事態を避けるため、Google Cloud アカウントをお持ちの場合でも、このラボでは使用しないでください。
  1. その後次のように進みます。
  • 利用規約に同意してください。
  • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
  • 無料トライアルには登録しないでください。

しばらくすると、このタブで Cloud コンソールが開きます。

注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。

Jupyter ノートブック ベースの開発環境の設定

このラボでは、すべてのコマンドをノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [Workbench] をクリックします。

  2. [Notebooks API を有効にする] をクリックします。

  3. [Workbench] ページで [ユーザー管理のノートブック] を選択し、[新規作成] をクリックします。

  4. 表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを に、ゾーンを に設定します。

  5. [環境] で [Apache Beam] を選択します。

  6. ダイアログ ボックスの下部にある [作成] をクリックします。

注: 環境の完全なプロビジョニングには 3~5 分かかる場合があります。処理が完了するまでお待ちください。 注: [Notebook API を有効にする] をクリックして Notebooks API を有効にします。
  1. 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。

  1. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。ファイルが開き、コードを追加または変更できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する

パート 1: SQL を使用してサイト トラフィックをユーザー別に集計する

ラボのこのパートでは、以下を実行するように以前の BatchUserTraffic パイプラインを書き換えます。

  • Cloud Storage 内のファイルからその日のトラフィックを読み取る。
  • 各イベントを CommonLog オブジェクトに変換する。
  • Java 変換ではなく SQL を使用して、一意のユーザー ID ごとにヒット数を合計し、追加の集計を実行する。
  • 結果のデータを BigQuery に書き込む。
  • 元データを後で分析できるように、追加のブランチで BigQuery に書き込む。

タスク 1. 合成データを生成する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

適切なラボを開く

  • IDE のターミナルで、このラボで使用するディレクトリに変更します。
# ディレクトリをラボに変更する cd 4_SQL_Batch_Analytics/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. ターミナルで次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get update && sudo apt-get install -y python3-venv # 仮想環境を作成して有効化します python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API と Data Catalog API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com gcloud services enable datacatalog.googleapis.com

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習バージョンのコードを含むディレクトリに移動する cd $BASE_DIR

このスクリプトにより、次のような行を含む events.json というファイルを作成します。

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

次に、このファイルは自動的に にある Google Cloud Storage バケットにコピーされます。

  • Google Cloud Storage に移動し、ストレージ バケットに events.json というファイルが含まれていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

タスク 2. SQL 依存関係を追加する

  1. ファイル エクスプローラで、training-data-analyst/quests/dataflow_python/4_SQL_Batch_Analytics/lab/ に移動し、batch_user_traffic_SQL_pipeline.py ファイルを開きます。

このパイプラインには、入力パスと 1 つの出力テーブル名のコマンドライン オプションを受け入れるために必要なコードと、Google Cloud Storage からイベントを読み取り、それらのイベントを解析して、結果を BigQuery に書き込むコードがすでに含まれています。ただし、いくつかの重要な部分が欠けています。

以前のラボと同様に、パイプラインの次のステップでは、一意の user_id 別にイベントを集計し、それぞれのページビューをカウントします。ただし、今回は、Python ベースの変換ではなく SqlTransform を使用して、SQL で集計を実行します。

  1. batch_user_traffic_SQL_pipeline.py に、次の import ステートメントを追加します。
from apache_beam.transforms.sql import SqlTransform
  1. 次に、query 変数定義のファイルに以下の SQL クエリを追加します。
SELECT user_id, COUNT(*) AS page_views, SUM(num_bytes) as total_bytes, MAX(num_bytes) AS max_bytes, MIN(num_bytes) as min_bytes FROM PCOLLECTION GROUP BY user_id
  1. 次に、以下の #TODO を記入します。次のコードを使用して、元データを BigQuery に書き込む Transform を記述します。
logs | 'WriteRawToBQ' >> beam.io.WriteToBigQuery( raw_table_name, schema=raw_table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

Beam SQL は、Apache Calcite 言語(デフォルト)ZetaSQL 言語の両方で実装できます。どちらも Dataflow によって実行されますが、この例では ZetaSQL を実装します。BigQuery で使用される言語に似ており、「Dataflow SQL」(Dataflow UI で直接作成される SQL クエリ)に実装されている言語でもあるからです。

  1. 最後の #TODO を記入します。次のコードを使用し、ZetaSQL 言語を使用して SQLTransform を適用します。
SqlTransform(query, dialect='zetasql')
  1. ファイルに加えた変更を保存します。

タスク 3. パイプラインを実行する

  1. ターミナルに戻り、次のコードを実行してパイプラインを実行します。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.user_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw python3 batch_user_traffic_SQL_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --experiments=use_runner_v2 \ --input_path=${INPUT_PATH} \ --agg_table_name=${AGGREGATE_TABLE_NAME} \ --raw_table_name=${RAW_TABLE_NAME}
  1. [ナビゲーション メニュー] > [Dataflow] に移動して、パイプラインのステータスを確認します。

  2. パイプラインが終了したら、BigQuery UI に移動して、結果として得られる 2 つのテーブルに対してクエリを実行します。

  3. logs.raw が存在し、データが入力されていることを確認してください。ラボの後半で必要になります。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 SQL を使用してサイト トラフィックをユーザー別に集計する

パート 2: SQL を使用してサイト トラフィックを分単位で集計する

ラボのこのパートでは、以下を実行するように以前の BatchMinuteTraffic パイプラインを書き換えます。

  • Cloud Storage 内のファイルからその日のトラフィックを読み取る。
  • 各イベントを CommonLog オブジェクトに変換し、そのオブジェクトに Joda Timestamp 属性を追加する。
  • Java 変換ではなく SQL を使用して、1 分あたりの合計ヒット数を再度ウィンドウ関数で合計する。
  • 結果のデータを BigQuery に書き込む。

タスク 1. CommonLog 行にタイムスタンプ フィールドを追加する

このタスクでは、Joda タイムスタンプ フィールドを CommonLog オブジェクトに追加し、それを暗黙的に汎用 Row オブジェクトに変換します。

適切なインポートとオプションは、ZetaSQL を使用するようにすでに設定されています。パイプラインは、データの取り込みと書き込みのステップで構築されていますが、データの変換や集計は行われません。IDE のファイル エクスプローラで training-data-analyst/quests/dataflow_python/4_SQL_Batch_Analytics/lab/ に移動し、atch_minute_user_SQL_pipeline.py ファイルを開きます。

  1. 最初の #TODO には、形式設定されたタイムスタンプを文字列として追加します。
ts = datetime.strptime(element.ts[:-8], "%Y-%m-%dT%H:%M:%S") ts = datetime.strftime(ts, "%Y-%m-%d %H:%M:%S")

Python SDK を使用する場合、現時点では datetime 型のオブジェクトを SqlTransform に直接渡すことはできません。代わりに、strftime を使用してオブジェクトを文字列に変換し、SQL で TIMESTAMP 関数を使用します。

  1. 次に、以下の SQL クエリを追加します。
SELECT COUNT(*) AS page_views, STRING(window_start) AS start_time FROM TUMBLE( (SELECT TIMESTAMP(ts) AS ts FROM PCOLLECTION), DESCRIPTOR(ts), 'INTERVAL 1 MINUTE') GROUP BY window_start 注: この SQL クエリでは、ts フィールドを TIMESTAMP 型に変換し、それを 1 分間の固定時間枠のイベント タイムスタンプとして使用します。window_start フィールドは TUMBLE によって生成され、同様に TIMESTAMP 型です。Python SDK で先述した問題があるため、結果の PCollection を Python 変換に送り返す前に、このフィールドを STRING に変換する必要があります。
  1. 次の #TODO では、以下のコードを使用し、ZetaSQL 言語を使用して SQLTransform を適用します。
SqlTransform(query, dialect='zetasql')

タスク 2. パイプラインを実行する

  1. ターミナルに戻り、次のコードを実行してパイプラインを実行します。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic python3 batch_minute_traffic_SQL_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --tableName=${TABLE_NAME} \ --experiments=use_runner_v2
  1. Cloud コンソールで、[ナビゲーション メニュー] > [Dataflow] に移動してパイプラインのステータスを確認します。

  2. パイプラインが終了したら、BigQuery UI にアクセスして、結果として得られる logs.minute_traffic テーブルに対してクエリを実行します。または、ターミナルからクエリを実行することもできます。

bq head -n 10 $PROJECT_ID:logs.minute_traffic

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 SQL を使用してサイト トラフィックを分単位で集計する

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。