チェックポイント
Perform Unit Tests for DoFns and PTransforms
/ 10
Test Stream Processing Logic with TestStream
/ 10
Dataflow でのサーバーレスなデータ処理 - Apache Beam(Java)でのテスト
- 概要
- 設定と要件
- ラボのパート 1. DoFns と PTransforms の単体テスト
- タスク 1. パイプラインのメインコードについて学ぶ
- タスク 2. テスト用の依存関係を追加する
- タスク 3. Apache Beam に最初の DoFn 単体テストを記述する
- タスク 4. DoFn の最初の単体テストを実施する
- タスク 5. DoFn の 2 度目の単体テストとパイプラインのデバッグを実施する
- タスク 6. PTransform 単体テストとエンドツーエンドのパイプラインのテストを実施する
- ラボのパート 2. TestStream でのストリーム処理ロジックのテスト
- タスク 1. パイプラインのメインコードについて学ぶ
- タスク 2. TestStream の使い方を学び、最初のテストを実施する
- タスク 3. 遅延データ処理のテスト用に TestStream を作成する
- タスク 4. 遅延データ処理をテストする
- ラボを終了する
概要
このラボの内容:
- Apache Beam にあるテストツールを使用して、
DoFns
とPTransforms
用の単体テストを記述します。 - パイプライン統合テストを実施します。
-
TestStream
クラスを使用して、ストリーミング パイプライン向けのウィンドウ処理の動作をテストします。
パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Beam モデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。
このラボでは、Beam SDK のテスト用パッケージに含まれるツールを使い、ローカルで DirectRunner
による単体テストを行う方法を説明します。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
-
Qwiklabs にシークレット ウィンドウでログインします。
-
ラボのアクセス時間(例:
1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。 -
準備ができたら、[ラボを開始] をクリックします。
-
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
-
[Google Console を開く] をクリックします。
-
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。 -
利用規約に同意し、再設定用のリソースページをスキップします。
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
-
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
-
Compute Engine のデフォルトのサービス アカウント
{project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。- Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
- プロジェクト番号(例:
729328892908
)をコピーします。 - ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
- ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
- [新しいプリンシパル] に次のように入力します。
-
{project-number}
はプロジェクト番号に置き換えてください。 - [ロール] で、[Project](または [基本])> [編集者] を選択します。
- [保存] をクリックします。
統合開発環境(IDE)を設定する
このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
このラボのコードは 2 つのフォルダに分けてあります。8a_Batch_Testing_Pipeline/lab と 8b_Stream_Testing_Pipeline/lab です。不明な点がある場合、対応するソリューション フォルダ内を探すことで解決策が見つかります。
ラボのパート 1. DoFns と PTransforms の単体テスト
ラボのこのセクションでは、気象センサーから得た統計情報を計算するバッチ パイプラインのために、DoFns と PTransforms の単体テストを実施します。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
-
TestPipeline
を作成します。 - テスト入力データを作成し、
Create
変換を使って入力データのPCollection
を作成します。 - 入力用の
PCollection
に変換を適用し、結果として得られたPCollection
を保存します。 -
PAssert
とそのサブクラスを使用して、期待される要素が出力PCollection
に含まれることを確認します。
TestPipeline
は Beam SDK に含まれる特別なクラスで、特に変換とパイプライン ロジックのテストに使います。
- テストでは、パイプライン オブジェクトの作成時に、
Pipeline
の代わりにTestPipeline
を使用します。
Create
変換では、オブジェクト(Java Iterable)のインメモリ コレクションを利用して、このコレクションから PCollection
を作成します。目的は、期待される出力 PCollection
がわかっている小さめのテスト入力データを PTransforms
から得ることです。
最後に、出力 PCollection が期待される出力と一致するかを確認します。この確認には PAssert
クラスを使用します。たとえば、containsInAnyOrder
メソッドを使用すると、出力 PCollection に正しい要素があるかどうかを確認できます。
タスク 1. パイプラインのメインコードについて学ぶ
- IDE で 8a_Batch_Testing_Pipeline/lab を開きます。
このディレクトリには依存関係を定義するための pom.xml
ファイルがあります。また、サブディレクトリ 2 つを含む src
フォルダもあります。src/main
フォルダにはパイプライン パッケージ コードがあり、src/test
フォルダには使用するテストコードがあります。
- まず 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java の順に開きます。
このファイルにはパイプラインで使用する予定の WeatherRecord
クラスの定義が含まれています。WeatherRecord
クラスには関連するスキーマがあります。@DefaultSchema
アノテーションを使用してこのスキーマを定義する手順は、すでにご存じのはずです。ただし、クラスを定義する際に equals
メソッドをオーバーライドしなければならないことにご注意ください。
これはなぜでしょうか。PAssert
は equals
メソッドを使用して出力 PCollection
にあるメンバーシップを確認します。しかし、POJO(Plain Old Java Object)に使用するデフォルトの equals メソッドでは、オブジェクトのアドレスの比較のみが行われます。ここでは、オブジェクトの中身を突き合わせて確認する必要があります。オーバーライドは、上記のように簡単に行えます。
- 次に、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開きます。
これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。
-
DoFns
ConvertCsvToWeatherRecord
(65 行目以降)とConvertTempUnits
(81 行目以降)については、DoFns
の単体テストを後ほど行います。 -
PTransform
ComputeStatistics
(103 行目以降)について。これは、DoFn
と同様にテストできる複合変換の例です。 -
PTransform
WeatherStatsTransform
(123 行目以降)について。このPTransform
には、Create
変換で作成した合成データに小規模なパイプライン統合テストを行えるようにするために、パイプライン全体(ソースとシンク変換を除く)に使用する処理ロジックが含まれています。
処理コードに含まれる論理エラーに気付いても、まだ修正しないでください。この後に、テストによってエラーを絞り込む方法を確認します。
タスク 2. テスト用の依存関係を追加する
- 8a_Batch_Testing_Pipeline/lab/pom.xml を開きます。
テストのために依存関係をいくつか追加する必要があります。テスト用の Beam の Java コードでは、JUnit
と Hamcrest
をリンクする必要があります。Maven では pom.xml
ファイルを更新するだけでこれを行うことができます。
- このタスクを完了するには、以下の XML をコピーして
pom.xml
ファイルのコメントで指示されている部分に貼り付けます。
これらの依存関係のスコープは「test」です。mvn test
でのテスト実行時はこのパッケージが必要になりますが、メインのパイプラインの実行時には不要です。
タスク 3. Apache Beam に最初の DoFn 単体テストを記述する
- 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に移動します。
このファイルには、DoFn
および PTransform
単体テストに必要なコードが含まれています。現在、ほとんどのコードがコメントアウトされていますが、作業をしながらコメント化を解除していきます。
まず、ConvertCsvToWeatherRecord DoFn
(43 行目以降)に対する DoFn
単体テストについて確認します。
- 最初にパイプラインをテストするためのクラスを作成し、次に
TestPipeline
オブジェクトを作成します。
この TestPipeline
オブジェクトを以降のテストすべてに使用します。オブジェクト作成時に transient
キーワードを含めているため、同じオブジェクトの再使用による副次的な影響の心配はありません。
- 最初のテストのための(不完全な)コードを見てみます。
パイプラインをテストするためのメソッドに、@Test
アノテーションを追加します。テスト入力は 1 つだけ(testInput
)、CSV ファイルの 1 行を表すものを作成します。これがこのパイプラインに対して期待される入力フォーマットです。このテスト入力を List
オブジェクトの input
に組み込みます。
テスト用のコードの残りの部分には、欠けているパーツがいくつかあります。
-
このタスクを完了するには、まず
Create
変換を追加して、input
をPCollection
に変換します。 -
次に、
containsInAnyOrder
メソッドを使用してPAssert
ステートメントを追加し、input
とtestOutput
を比較します。
不明な点がある場合は、後で紹介するコメント付きのテストか、ソリューションを参照してください。
タスク 4. DoFn の最初の単体テストを実施する
- IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
これでテストの準備ができました。
- あとは、次のコマンドをターミナルで実行するだけです。
ここまでのタスクを正しく完了できていれば、テスト完了後にはターミナルに以下が表示されます(正確な経過時間は異なります)。
タスク 5. DoFn の 2 度目の単体テストとパイプラインのデバッグを実施する
- 再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、2 度目の単体テストのためにコードのコメント化を解除します(67~80 行付近)。これには、コードをハイライト表示し、Ctrl + / キー(MacOS の場合は Cmd + / キー)を押します。参考のために、以下にコードを示します。
このテストにより、ConvertTempUnits()
DoFn
が想定通りに動作することを確認できます。
-
WeatherStatisticsPipelineTest.java
を保存し、ターミナルに戻ります。 -
再度、次のコマンドを実行してテストを行います。
今回はテストが失敗しました。スクロールしていくと、テストの失敗について次の情報が見つかります。
一見すると、それほど役に立つエラー メッセージとは思われません。しかし、予想していた testOutput
の WeatherRecord
と一致していなかったことはわかります。温度の変換方法に間違いがあった可能性があります。
-
再度、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開き、下にスクロールして
ConvertTempUnits
の定義(81 行目付近)を確認します。 -
このタスクを完了するには、
DoFn
処理ロジックにあるエラーを見つけ、mvn test
コマンドをもう一度実行し、テストが成功することを確認します。参考までに、以下に摂氏と華氏を変換するコードを挙げておきます。
不明な点がある場合、ソリューションを参照してください。
タスク 6. PTransform 単体テストとエンドツーエンドのパイプラインのテストを実施する
- 再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、最後の 2 つのテストのためにコードのコメント化を解除します(84 行目付近以降)。
さきほどコメント化解除した最初のテストは、複合的な PTransform
ComputeStatistics
をテストするものでます。参考までに、抜き出したコードを以下に記載しておきます。
前に行った DoFn
単体テストとよく似ていることに注目してください。(テスト入力と出力以外の)違いは、PTransform
を ParDo(new DoFn())
の代わりに使用していることだけです。
最後にエンドツーエンドのパイプラインをテストします。パイプライン コード(WeatherStatisticsPipeline.java)には、エンドツーエンドのパイプライン全体からソースとシンクを削除したものが、1 つの PTransform
WeatherStatsTransform
にすべて含まれています。
- エンドツーエンドのパイプラインのテストでは、ここまでに行ったことと類似の内容を繰り返しますが、代わりに
PTransform
を使用します。
- ターミナルに戻り、次のコマンドを実行してもう一度テストを行います。
ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボのパート 2. TestStream でのストリーム処理ロジックのテスト
このセクションでは、タクシーの乗車回数をウィンドウ処理して計算するストリーミング パイプラインを単体テストします。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
-
TestPipeline
を作成します。 - ストリーミング データの生成には、
TestStream
クラスを使用します。これには一連のイベントの生成と、ウォーターマークおよび処理時間の進行が含まれています。 -
PAssert
とそのサブクラスを使用して、指定したウィンドウに期待される要素が出力PCollection
に含まれることを確認します。
TestStream
から読み取るパイプラインの実行時、読み取り処理は、各イベントですべての結果(処理時間の進行や適切なトリガーの起動など)が完了するまで待ってから次のイベントに移ります。TestStream
により、トリガーの影響と許容される遅延をパイプライン上でモニタリングしテストすることができます。これにはトリガー遅延や遅延によるデータの欠損に関するロジックも含まれています。
タスク 1. パイプラインのメインコードについて学ぶ
- IDE で 8b_Stream_Testing_Pipeline/lab に移動します。
このディレクトリには依存関係を定義するための pom.xml
ファイルがあります。また、サブディレクトリ 2 つを含む src
フォルダもあります。src/main
フォルダにはパイプライン パッケージ コードがあり、src/test
フォルダには使用するテストコードがあります。
- まず、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java の順に開きます。
このファイルにはパイプラインで使用する予定の TaxiRide
クラスの定義が含まれています。TaxiRide
クラスには関連するスキーマがあります。@DefaultSchema
アノテーションを使用してこのスキーマを定義する手順はすでにご存じのはずです。
- 次に、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java の順に開きます。
これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。
-
DoFn
JsonToTaxiRide
(94 行目以降)は、TaxiRide
クラスのオブジェクトが受け取る Pub/Sub メッセージの変換に使用されます。 -
PTransform
TaxiCountTransform
(113 行目以降)について。このPTransform
には、パイプラインのカウントとウィンドウ処理に使用するメインのロジックが含まれています。今回のテストでフォーカスするのはこのPTransform
です。
TaxiCountTransform
の出力では、ウィンドウごとに記録されたタクシー乗車がすべてカウントされているはずです。しかし、乗車ごとに複数のイベント(ピックアップや降車など)があります。
- そこで、乗車ごとに 1 回だけカウントするように、
ride_status
プロパティをフィルタリングします。ride_status
が「pickup」の場合にのみ、要素を残すようにします。
少し詳しく見ていきます。このパイプラインで使用されているウィンドウ処理のロジックが、以下に含まれています。
60 秒の長さの固定ウィンドウに収まるように処理します。始めのほうにはトリガーはなく、ウォーターマークがウィンドウの最後を過ぎた後に結果が出力されます。受信する新しい要素それぞれに遅延のあるトリガー起動が含まれますが、許容範囲である 1 分を過ぎた呼び出しは含まれません。また、遅延の許容範囲が過ぎるまでは、ウィンドウ内の状態が累積されます。
タスク 2. TestStream の使い方を学び、最初のテストを実施する
- 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java の順に開きます。
最初の目標は、テストコードでの TestStream
の使用方法を理解することです。TestStream
クラスによって、メッセージのリアルタイムなストリーミングをシミュレーションしつつ、処理時間とウォーターマークの進行を管理できることを思い出してください。
以下には、最初のテストのコード(66 行目以降)が含まれています。
create
メソッドを使用して新しい TestStream
を作成し、一方でコーダーも指定します。JSON メッセージを文字列として渡すことで、StringUtf8Coder
を使用できます。次に、上記のコードでの TestStream
の役割を確認します。
TestStream
は、以下の役割を担っています。
- 最初のウォーターマークを変数
startTime
(Instant(0)
)に設定します。 - 3 つの要素を、
startTime
のイベント タイムスタンプとともに文字列に追加します。これらイベントのうち 2 つはカウントされます(ride_status = "pickup"
)が、残りの 1 つはカウントされません。 - もう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは
startTime
の 1 分後です。 - ウォーターマークを
startTime
の 1 分後まで進めて、最初のウィンドウをトリガーします。 - さらにもう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは
startTime
の 2 分後です。 - ウォーターマークを「infinity」まで進めます。これですべてのウィンドウが終了します。この後のデータはすべて、許容される遅延期限を過ぎています。
- 最初のテストのコードの残り部分は、パート 1 のバッチの例に似ていますが、今回は
Create
変換の代わりにTestStream
を使用します。
この上のコードでは、出力 PCollection
(outputCount
)を、TestStream
を作成して TaxiCountTransform
PTransform
を適用することで定義しています。チェック対象のウィンドウは InvervalWindow
クラスを使用して定義し、その後 PAssert
を inWindow
メソッドで使用して各ウィンドウの結果を確認しています。
- IDE のターミナルに戻るか新しくターミナルを開きます。次のコマンドを実行することによって、正しいディレクトリに移動して依存関係をインストールします。
- 次のコマンドを実行して、上記のテストを行います。
テストが終わると、次の出力が確認できるはずです(経過時間は異なる場合があります)。
タスク 3. 遅延データ処理のテスト用に TestStream を作成する
このタスクでは、遅延データの処理に関連するロジックをテストするための TestStream
のコードを記述します。
-
再度、8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java の順に開き、
testTaxiRideLateData
メソッドのコメント(104 行目付近)まで下にスクロールします。 -
このタスクで使用するコードを完成させるため、テスト用のコードのコメント化を解除します。
テスト用のコードは、TestStream
の作成を除いて完成しています。
- このタスクを完了するには、以下のタスクを行う
TestStream
オブジェクトを作成します。
- ウォーターマークを
startTime
まで進めます。 - 2 つの
TimestampedValue
(値json.format(json, "pickup")
、タイムスタンプstartTime
)を追加します。 - ウォーターマークを
startTime
の 1 分後まで進めます。 - もう 1 つ別の
TimestamedValue
(値json.format(json, "pickup")
、タイムスタンプstartTime
)を追加します。 - ウォーターマークを
startTime
の 2 分後まで進めます。 - もう 1 つ別の
TimestamedValue
(値json.format(json, "pickup")
、タイムスタンプstartTime
)を追加します。 - ウォーターマークを infinity まで進めます。
これで要素が 4 つある TestStream
が作成されます。要素はいずれも最初のウィンドウに属しています。最初の 2 要素は時間通り、2 番目の要素は遅延あり(ただし許容される範囲内)、最後の要素は許容される範囲外までの遅延ありとなっています。トリガ―されたペインは累積されるため、最初のトリガーは 2 つのイベントをカウントし、最後のトリガーは 3 つのイベントをカウントするはずです。4 番目のイベントは含まれません。このチェックには、PAssert
クラスの inOnTimePane
メソッドと inFinalPane
メソッドを使用します。
不明な点がある場合、ソリューションを参照してください。
タスク 4. 遅延データ処理をテストする
- ターミナルに戻り、以下のコマンドを実行してもう一度テストを行います。
ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボを終了する
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
- 星 1 つ = 非常に不満
- 星 2 つ = 不満
- 星 3 つ = どちらともいえない
- 星 4 つ = 満足
- 星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。