チェックポイント
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
Dataflow を使用したサーバーレスのデータ処理 - Cloud Dataflow を使用した高度なストリーミング分析パイプライン(Java)
概要
このラボの内容:
- 遅延データに対処する。
- 不正な形式のデータに以下の方法で対処する。
- コードのモジュール性を強化するために複合変換を書き込む。
- さまざまなタイプの多重出力を送信する変換を書き込む。
- 不正な形式のデータを収集して、それらを調査できる場所に書き込む。
前回のラボの最後では、リアルタイム型パイプラインが対処すべきある種の課題に触れました。それは、イベントが発生したタイミングと処理されたタイミングの間に生じる隔たりで、ラグとも呼ばれます。このラボでは、パイプラインによるラグの対処方法を、パイプライン作成者が正しい形式で指定できるようにする Apache Beam のコンセプトについてご紹介します。
また、ストリーミング環境でパイプラインが遭遇する可能性がある問題は、ラグだけではありません。システム外からの入力は、なんらかの不正な形式である可能性が常にあります。このラボではそのような入力に対処するための手法もご紹介します。
このラボで最後にご紹介するパイプラインは、以下の画像に類似したパイプラインです。ブランチを含んでいることに注意してください。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例:
1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。 -
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。 -
利用規約に同意し、再設定用のリソースページをスキップします。
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
-
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
-
Compute Engine のデフォルトのサービス アカウント
{project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。- Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
- プロジェクト番号(例:
729328892908
)をコピーします。 - ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
- ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
- [新しいプリンシパル] に次のように入力します。
-
{project-number}
はプロジェクト番号に置き換えてください。 - [ロール] で、[Project](または [基本])> [編集者] を選択します。
- [保存] をクリックします。
このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
ラボのパート 1遅延データに対処する
これまでのラボでは、要素をイベント時間に基づき固定幅のウィンドウに分割するコードを、以下のようなコードを使用して記述してきました。
ですが、前回の非 SQL ラボの終わりにご確認いただいたように、データ ストリームにラグが起こることは珍しくありません。処理時間ではなくイベント時間を使用したウィンドウ処理を行った際には、ラグが問題となります。イベント時間のある特定の時点に発生したすべてのイベントが実際に到着したのかどうかがわからないからです。
当然、記述したパイプラインが結果を出力するには、それがどちらであるか決定しなければいけません。そのために、ウォーターマークというコンセプトを使います。ウォーターマークは、イベント時間のある時点までの全データがパイプラインに到着していると想定できるタイミングを判断する、システムのヒューリスティック的な概念です。ウォーターマークがウィンドウの終了時間を超えた後に、そのウィンドウのタイムスタンプを持つ要素が到着した場合、その要素は遅延データと見なされ、単純にドロップされます。つまり、全データが揃っているとシステムが高い確度で判断したときに、単一の(願わくば)完全な形の結果を出力するのがデフォルトのウィンドウ処理動作となります。
ウォーターマークをなるべく正確に推測するために、Apache Beam はいくつものヒューリスティックを使用していますが、当然、必ずしも正しいとは限りません。さらに言えば、これらのヒューリスティックは汎用的なものであり、すべてのユースケースに適切とは言えません。パイプラインの設計者たちは、汎用的なヒューリスティックを使用する代わりに、トレードオフとして何が妥当なのか見極めるために、以下の疑問を慎重に検討する必要があります。
- 完全性: 結果を演算する前に全データが揃っていることはどれほど重要なのか?
- レイテンシ: データの待ち時間はどれくらいにするのか?たとえば、全データが揃ったと判断できるまで待機するのか、データが到着するたびに処理していくのか、という点です。
- 費用: レイテンシ低減のために費やして構わないと考える演算能力と費用はどれくらいか?
これらの疑問への答えを判断材料に、Apache Beam の形式に則って、妥当なトレードオフを伴ったコードを記述できるようになります。
許容される遅延
ウィンドウが状態を維持する期間を許容される遅延で制御できます。ウォーターマークが許容される遅延期間の終わりに達すると、すべての状態はドロップされます。すべての永続状態を永久的に維持し続けることができたら、それはすばらしいことですが、制限なしのデータソースを取り扱う際、特定のウィンドウの状態を無期限に維持すればディスク容量を使い切ってしまうため、実用的ではありません。
そのため、現実世界のあらゆる順不同処理システムで、処理対象のウィンドウの存続期間を制限するなんらかの方法が必要になります。これを実現するシンプルかつ簡明な方法の一つが、システム内で許容される遅延の範囲を定義することです。つまり、ウォーターマークに基づきレコードの遅延に限度を設け、この範囲を超過して到着したデータはシステムによって処理されず、単純にドロップされるよう設定することが考えられます。各データに許容する遅延の範囲を設定すると、ウィンドウの状態を維持するべき期間も正確に定義されます(ウィンドウの終了時間に設定された遅延範囲をウォーターマークが超過するまで)。
タスク 1. 環境を準備する
以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。
適切なラボを開く
- IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
- データ環境を設定します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 2. 許容される遅延を設定する
Apache Beam では、以下の例のように withAllowedLateness() メソッドを使用して許容される遅延を設定します。
- このタスクの仕上げとして、ウィンドウ処理変換を調べて
.withAllowedLateness()
の呼び出し処理を追加し、適切なコマンドライン パラメータから構築した有効なDuration
に渡します。妥当な値を決め、適切な数を反映するようコマンドラインを更新してください。
トリガー
パイプラインの設計者は、暫定的な結果を出力するタイミングを自ら決定することもできます。たとえば、ウィンドウの終了時間に設定されたウォーターマークには達していないものの、想定データの 75% がすでに到着している場合を考えてみましょう。多くの場合、このようなサンプルは全体の傾向を表すエンドユーザーに紹介する価値があるものと見なされます。
Trigger
は、処理時間中に結果を出力するタイミングを決定します。ウィンドウの各出力は、ウィンドウのペインと呼ばれます。トリガーの条件が満たされると、トリガーによりペインが出力されます。Apache Beam では、ウォーターマークの進行状況、処理時間の進行状況(どのくらいのデータが実際に到着したかにかかわらず均一に進行)、要素数のカウント(例: 新規データの到着が特定数に達した場合)、ファイルの末尾に到達した場合などのデータ依存トリガーなどが、これらの条件に含まれます。
トリガーの条件によっては、ペインが何度も出力される場合もあります。そのため、これらの結果の蓄積方法を指定することも必要になります。Apache Beam は現在 2 種類の蓄積モードをサポートしています。1 つ目は結果をまとめて蓄積するモードで、2 つ目は最後に出力されたペインにはなかった新規の結果のみを返すモードです。
タスク 3. トリガーを設定する
Window
変換を使用して PCollection
のウィンドウ関数を設定するときに、トリガーも指定できます。
次のようにメソッド「.triggering()
」を Window.into()
変換の結果に対して呼び出すことで、PCollection
のトリガーを設定します。Window.triggering() はトリガーを引数として受け取ります。Apache Beam では、以下のようにいくつものトリガーが用意されています。
- AfterWatermark: ウィンドウの終了時間またはペイン内の最初の要素の到着に基づいて定めたタイムスタンプをウォーターマークが通過した際に起動させます。
- AfterProcessingTime: 一定の処理時間が経過した後で起動させます(一般的には、ペイン内の最初の要素が到着してから)。
- AfterPane: 現在のペイン内の要素に関する特性(例: 現在のペインに割り当てられている要素の数)が成立してから起動させます。
以下のコードサンプルでは PCollection
に時間ベースのトリガーを設定しています。このトリガーは、ウィンドウ内で最初の要素が処理されて 1 分後に結果を出力します。コードサンプルの最終行にある .discardingFiredPanes()
は、ウィンドウの蓄積モードを設定します。
- このタスクの仕上げとして、ウィンドウ処理変換に
Window.triggering()
の呼び出し処理を追加し、有効なTrigger
を渡します。トリガーを設計する際には、データが複数の 1 分間ウィンドウに表示され、到着の遅延が許容されるこのユースケースを忘れないようにしてください。
トリガーの例が必要な方は、こちらのソリューションをご覧ください。
ラボのパート 2不正な形式のデータに対処する
Trigger
の設定方法によっては、今回のパイプラインをすぐに実行して以前のラボから取得したパイプラインと比較した際に、新しいパイプラインの方が結果表示が早いと思われるかもしれません。また、ヒューリスティックがストリーミング動作をうまく予想できず、許容される遅延の方が効果的な場合、今回のパイプラインから得た結果の方が正確なこともあり得ます。
ですが、今回のパイプラインは遅延に対して高い堅牢性を発揮する一方で、不正な形式のデータに対しては以前のパイプラインと同様に脆弱です。パイプラインを実行して CommonLog
に解析可能な正しい形式の JSON 文字列以外のものを含むメッセージをパブリッシュする場合、パイプラインでエラーが発生するでしょう。このようなエラーは、Cloud Logging のようなツールで簡単に確認できますが、より適切に設計されたパイプラインは、これらのエラーを後で調査できるよう、事前に定義した場所に保管します。
このセクションでは、パイプラインにコンポーネントを追加して、モジュール性と堅牢性を強化します。
タスク 1. 不正な形式のデータを収集する
不正な形式のデータに対する堅牢性を強化するには、パイプラインがこの種のデータをフィルタリングして違う方法で処理できるよう分岐させる方法が必要です。パイプラインにブランチを組み込む方法の一つは、すでにご紹介しています。それは、多重変換用の入力である PCollection
を 1 つ作成することです。
この分岐形態は強力です。ですが、この戦略が適さないユースケースもあります。たとえば、同じ PCollection
のサブセットを 2 種類作成したいとしましょう。多重変換メソッドを使用して、各サブセット用のフィルタ変換を 1 つずつ作成し、それら両方を元の PCollection
に適用することになります。ですが、これでは各要素を 2 回処理することになってしまいます。
もう一つの分岐パイプライン生成方法は、PCollection
入力を 1 回処理し、単一の変換に多重出力を生成させる方法です。このタスクでは、多重出力を生成する変換を記述します。この出力の一方は元の入力ストリームから得た正しい形式のデータから取得した結果であり、もう一方は不正な形式の要素です。
PCollection
を 1 つだけ作成しながら複数の結果を出力するために、Apache Beam は PCollectionTuple
と呼ばれるクラスを使用します。PCollectionTuple は、異なる型を含む PCollection
の不変タプルであり、TupleTag
を「キー」とします。
2 種類の PCollection を使用して PCollectionTuple
をインスタンス化する例を以下に示します。その後、これらの PCollection
は PCollectionTuple.get()
メソッドを使用して取得されます。
PTransform
のコンテキストでこのメソッドを使用するには、以下の例にあるようなコードを記述します。この例では、要素の内容に基づいて TupleTag
を要素に割り当てています。
- このタスクの仕上げとして、2 つの
TupleTag
定数をクラスの冒頭で宣言します。そして、PCollectionTuple
を返し、未解析の要素に片方のタグを付与して解析済みの要素にもう片方のタグを付与するように、JsonToCommonLog
変換を変更します。if / then / else
型の代わりに、try / catch
文を使用します。
タスク 2. 複合変換を使用してコードのモジュール性を強化する
変換は、複雑な変換がよりシンプルな変換(複数の ParDo
、Combine
、GroupByKey
、あるいは他の複合変換など)を複数実行するネスト構造にすることができます。これらの変換は複合変換と呼ばれます。1 つの複合変換内で複数の変換をネストすると、コードのモジュール性を高めて、わかりやすくすることができます。
- 独自の複合変換を作成する場合は、
PTransform
クラスのサブクラスを作成し、実際の処理ロジックを指定するように expand() メソッドをオーバーライドします。PTransform
クラスの型パラメータについては、変換が入力として受け取り、出力として生成するPCollection
型を渡します。
次のコードサンプルは、文字列の PCollection
を入力として受け付け、整数型の PCollection
を出力する PTransform
の宣言方法を示したものです。
#TODO: JsonToRow
-
PTransform
サブクラス内では、expand() メソッドをオーバーライドする必要があります。expand() メソッドでは、PTransform
の処理ロジックを追加します。expand() メソッドのオーバーライドでは、適切なタイプの入力「PCollection
」をパラメータとして受け付け、出力「PCollection
」を戻り値として指定する必要があります。
- 変換を呼び出すには、
PCollection
に対してPCollection.apply()
を使用し、複合変換のインスタンスを渡します。
- このタスクの仕上げとして、先ほど変更したばかりの
JsonToCommonLog
変換を複合変換に変えます。この処理によって、CommonLog
のインスタンスを想定している現在の書き込み変換に問題が発生します。複合変換の結果を新規のPCollectionTuple
に保存してから、.get()
を使用して書き込み変換が想定しているPCollection
を取得します。
タスク 3. 後で分析できるように不正な形式のデータを書き込む
不正な形式のデータを生み出しているアップストリームの問題を解消するためには、不正な形式のデータを分析できなければいけません。それには、不正な形式のデータをどこかに出力する必要があります。このタスクでは、不正な形式のデータを Google Cloud Storage に書き込みます。このような方法を、デッドレター ストレージの使用と呼びます。
これまでのラボでは、TextIO.write()
を使用して制限付きソース(バッチ)を Cloud Storage に直接書き込んできました。ですが、制限なしソース(ストリーミング)からの書き込みでは、この手法に少し手を加える必要があります。
まずは書き込み変換のアップストリームで、Trigger
を使用して、処理時間のどのタイミングで書き込むのかを指定します。指定をせずにデフォルトのままにした場合、書き込みが実行されません。デフォルトでは、すべてのイベントがグローバル ウィンドウに属しています。一括で動作させる場合は全データセットが実行時に把握されているため、このままで問題ありません。しかし、制限なしソースの場合は全データセットのサイズが不明であり、グローバル ウィンドウのペインが完了しないため、ペインは出力されなくなります。
Trigger
を使用する以上、Window
も使用する必要が出てきます。ただし、ウィンドウの変更が常に必要とは限りません。これまでのラボやタスクでは、ウィンドウ処理変換を使用してグローバル ウィンドウをイベント時間内で期間が固定されているウィンドウと置き換えてきました。このような場合では、どの要素がどの要素とグループ化されているのかよりも、有用な方法かつ有用な頻度で結果が出力されているかどうかが重要です。
以下の例では、ウィンドウは処理時間 10 秒ごとにグローバル ウィンドウのペインを出力しますが、新規イベントのみを書き込みます。
Trigger
を設定したら、書き込みを実行するように TextIO.write()
の呼び出し処理を変更します。ウィンドウ処理変換のダウンストリームに書き込む際は、withWindowedWrites()
の呼び出し処理をチェーンして、書き込みが並列処理されるように複数のシャードを指定します。
- このタスクの仕上げとして、
PCollectionTuple
に対して.get()
を使用して新規の変換を作成し、不正な形式のデータを取得します。トリガーに関する知識と判断力を働かせて、このトリガーに対して適切な起動条件を設定しましょう。
タスク 4. パイプラインを実行する
- パイプラインを実行するには、以下の例に類似したコマンドを作成します。この例は、記述に含めたコマンドライン オプションの名前を反映するように修正が必要です。
このクエストのコードには、JSON イベントを Pub/Sub でパブリッシュするためのスクリプトが含まれています。
- このタスクを完了してメッセージのパブリッシュを開始するには、現在のターミナルと並べて新しいターミナルを開き、次のスクリプトを実行します。このスクリプトは停止されるまでメッセージをパブリッシュし続けます。
training-data-analyst/quests/dataflow
フォルダが作業ディレクトリになっていることを確認します。
true
フラグはストリームに遅延イベントを追加します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
タスク 5. パイプラインをテストする
- [Pub/Sub] > [トピック] に移動して、トピック「
my_topic
」をクリックします。 - [メッセージ] タブ > [メッセージをパブリッシュ] ボタンの順にクリックします。
- その後のページで、配信するメッセージを入力します。
CommonLog
JSON 仕様に対して完璧に適合しない限り、メッセージは短時間のうちにデッドレター Cloud Storage バケットに到着するはずです。パイプラインのモニタリング ウィンドウに戻り、未解析メッセージの処理を担当するブランチに存在するノードをクリックすることで、パイプラインを通過する経路を追跡できます。
- このブランチに要素が追加されたことを確認したら、Cloud Storage に移動してメッセージがディスクに書き込まれたのか検証できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。