チェックポイント
Create Vertex AI Platform Notebooks instance and clone course repo
/ 15
Setup the data environment
/ 15
Run your pipeline from the command line
/ 10
Dataflow によるサーバーレス データ処理 - 分岐するパイプライン(Python)
概要
このラボでは、次の作業を行います。
- ブランチがあるパイプラインを実装する
- 書き込む前にデータをフィルタする
- カスタム コマンドライン パラメータをパイプラインに追加する
前提条件:
- Python に関する基本的な知識
前のラボでは、基本的な抽出、変換、読み込みの連続的なパイプラインを作成し、対応する Dataflow テンプレートを使用して Google Cloud Storage のバッチ データ ストレージを取り込みました。このパイプラインは、以下に示す変換のシーケンスで構成されています。
しかし多くの場合、パイプラインの構造はこのように単純ではありません。このラボでは、より高度な連続的でないパイプラインを構築します。
今回のユースケースではリソース消費量を最適化します。プロダクトによってリソースの利用状況は異なります。また、一つの企業内でもすべてのデータが同じように使われるわけではなく、たとえば分析ワークロードで定期的にクエリされるデータもあれば、復元にのみ使用されるデータもあります。このラボでは、最初のラボで作成したパイプラインのリソース消費量を最適化するために、アナリストが使用するデータのみを BigQuery に保存し、他のデータは非常に低コストで耐久性の高いストレージ サービスである Google Cloud Storage の Coldline Storage にアーカイブします。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例:
1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。 -
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。 -
利用規約に同意し、再設定用のリソースページをスキップします。
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
-
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
-
Compute Engine のデフォルトのサービス アカウント
{project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。- Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
- プロジェクト番号(例:
729328892908
)をコピーします。 - ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
- ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
- [新しいプリンシパル] に次のように入力します。
-
{project-number}
はプロジェクト番号に置き換えてください。 - [ロール] で、[Project](または [基本])> [編集者] を選択します。
- [保存] をクリックします。
Jupyter ノートブック ベースの開発環境の設定
このラボでは、すべてのコマンドをノートブックのターミナルで実行します。
-
Google Cloud コンソールのナビゲーション メニューで、[Vertex AI] > [ワークベンチ] をクリックします。
-
Notebooks API を有効にします。
-
ワークベンチのページで [新規作成] をクリックします。
-
表示された [新しいインスタンス] ダイアログ ボックスで、リージョンを
に、ゾーンを に設定します。 -
[環境] で [Apache Beam] を選択します。
-
ダイアログ ボックスの下部にある [作成] をクリックします。
- 環境の準備が完了したら、ノートブック名の横にある [JupyterLab を開く] をクリックします。これにより、使用環境がブラウザの新しいタブで開きます。
- 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。
コード リポジトリをダウンロードする
このラボで使用するコード リポジトリをダウンロードします。
- 開いたターミナルで、次のコマンドを入力します。
-
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
-
クローン リポジトリ
/training-data-analyst/quests/dataflow_python/
に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納されるlab
サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できるsolution
サブフォルダとに分けられています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
複数の変換が同じ PCollection を処理する
このラボでは、Google Cloud Storage と BigQuery の両方にデータを書き込む、分岐するパイプラインを作成します。
分岐するパイプラインを作成する方法の一つは、2 つの異なる変換を同じ PCollection に適用することにより、2 つの異なる PCollection を作成することです。
分岐するパイプラインを実装する
このセクションや後のセクションでヒントが必要な場合は、Google Cloud training-data-analyst ページにあるソリューションを参照してください。
タスク 1. Cloud Storage に書き込むブランチを追加する
このタスクを完了するには、Cloud Storage に書き込むブランチを追加し、既存のパイプラインを変更します。
適切なラボを開く
- IDE 環境のターミナルで、次のコマンドを実行します。
仮想環境と依存関係を設定する
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
- IDE 環境のターミナルで、次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
- 次に、パイプラインを実行するために必要なパッケージをインストールします。
- 最後に、Dataflow API が有効になっていることを確認します。
データ環境を設定する
-
IDE で
my_pipeline.py
を開きます。これはtraining-data-analyst/quests/dataflow_python/2_Branching_Pipelines/labs/
にあります。 -
パイプラインの本体部分を定義している run() メソッドまで下にスクロールします。現在は次のような内容です。
- 各要素が
json
からdict
に変換される前に、textio.WriteToText
を使用して Cloud Storage への書き込みを行う新しい分岐変換を追加して、このコードを変換します。
このセクションや後のセクションでヒントが必要な場合は、Google Cloud training-data-analyst ページにあるソリューションを参照してください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. フィールドでデータをフィルタする
この時点では、すべてのデータが 2 回保存されるため、新しいパイプラインでもリソース消費量はほとんど減りません。リソース消費量を改善するには、重複するデータの量を減らす必要があります。
Google Cloud Storage バケットは、アーカイブおよびバックアップ ストレージとして機能することを目的としているので、すべてのデータを保存することが重要になります。その一方で、BigQuery には必ずしもすべてのデータを送る必要はありません。
- たとえば、データ アナリストが頻繁に確認する対象が、ウェブサイトでユーザーがどのリソースにアクセスしているか、そしてそのアクセス パターンが地域と時間によってどのように異なるかであると仮定します。この場合、必要なフィールドはごく一部です。すでに JSON をパースして辞書型に変換しているので、
pop
メソッドを使用して容易に Python の呼び出し可能オブジェクトからフィールドを除くことができます。
- このタスクを完了するには、Python の呼び出し可能オブジェクトを
beam.Map
とともに使用して、アナリストが BigQuery で使用しないフィールドuser_agent
を除きます。
タスク 3. 要素でデータをフィルタする
Apache Beam にはフィルタリングの方法が数多くあります。Python 辞書形式の PCollection を取り扱っているため、最も簡単なのはブール値を返す関数である(匿名の)ラムダ関数をフィルタとして利用する方法で、beam.Filter
とともに使用します。次に例を示します。
- このタスクを完了するには、パイプラインに
beam.Filter
変換を追加します。どのような条件でもフィルタできますが、たとえばnum_bytes
が 120 以上である行を除くということを試してみると良いでしょう。
タスク 4. カスタム コマンドライン パラメータを追加する
パイプラインには現在、入力のパスや BigQuery のテーブルの場所など、多くのパラメータがハードコードされていますが、Cloud Storage の任意の JSON ファイルを読み取ることができれば、パイプラインがさらに便利になります。この機能を追加するには、一連のコマンドライン パラメータへの追加が必要です。
現在は、コマンドライン引数の読み込みと解析に ArgumentParser
を使用しています。そして、パイプライン作成時に指定した PipelineOptions()
オブジェクトに引数を渡します。
PipelineOptions を使用して、ArgumentParser
で読み込んだオプションを解釈します。このパーサーに新しいコマンドライン引数を追加するには、以下の構文を使用します。
コードでコマンドライン パラメータにアクセスするには、引数を解析した結果できる辞書のフィールドを参照します。
- このタスクを完了するには、入力パス、Google Cloud Storage 出力パス、BigQuery テーブル名を示すコマンドライン パラメータを追加して、定数の代わりにこれらのパラメータにアクセスするようにパイプライン コードを更新します。
タスク 5. パイプラインに null 値許容フィールドを追加する
お気づきかもしれませんが、前回のラボで作成した BigQuery テーブルには、すべてのフィールドを REQUIRED
とする次のようなスキーマがありました。
データが存在しない NULLABLE
フィールドがある Apache Beam スキーマを、パイプラインの実行自体およびその実行の結果である BigQuery テーブルの両方について作成するのが良いでしょう。
次のように、NULL 値を許容したいフィールドに新しいプロパティ mode
を追加して JSON BigQuery スキーマを更新することができます。
- このタスクを完成するには、BigQuery スキーマの
lat
フィールドとlon
フィールドを null 値許容に設定します。
タスク 6. コマンドラインでパイプラインを実行する
- このタスクを完了するには、コマンドラインでパイプラインを実行して適切なパラメータを渡します。生成される BigQuery スキーマの NULLABLE フィールドを忘れずにメモしておいてください。コードは次のようになります。
タスク 7. パイプラインの結果を確認する
- Dataflow の [ジョブ] ページに移動して、実行中のジョブを確認します。グラフは次のようになっているはずです。
-
Filter
関数を表すノード(上の図ではFilterFn
)をクリックします。右側に表示されたパネルで、入力として追加された要素が出力として書き込まれた要素よりも多いことが確認できます。 -
次に Cloud Storage への書き込みを表すノードをクリックします。すべての要素が書き込まれているので、この数字は Filter 関数への入力の要素数と一致しているはずです。
-
パイプラインが終了したら、テーブルに対してクエリを実行して BigQuery で結果を確認します。テーブル内のレコード数は Filter 関数で出力された要素の数と一致しているはずです。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。