概要
このラボの内容:
Apache Beam にあるテストツールを使用して、DoFns
と PTransforms
用の単体テストを記述します。
パイプライン統合テストを実施します。
TestStream
クラスを使用して、ストリーミング パイプライン向けのウィンドウ処理の動作をテストします。
パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Beam モデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。
このラボでは、Beam SDK のテスト用 パッケージに含まれるツールを使い、ローカルで DirectRunner
による単体テストを行う方法を説明します。
設定と要件
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウ でログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始 ] をクリックします。
ラボの認証情報(ユーザー名 とパスワード )をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く ] をクリックします。
[別のアカウントを使用 ] をクリックし、この ラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求 が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
プロジェクトの権限を確認する
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー ( )で、[IAM と管理 ] > [IAM ] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
注: アカウントが IAM に存在しない場合やアカウントに編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
Google Cloud コンソールのナビゲーション メニュー で、[Cloud の概要] > [ダッシュボード] をクリックします。
プロジェクト番号(例: 729328892908
)をコピーします。
ナビゲーション メニュー で、[IAM と管理 ] > [IAM ] を選択します。
ロールの表の上部で、[プリンシパル別に表示 ] の下にある [アクセス権を付与 ] をクリックします。
[新しいプリンシパル ] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
{project-number}
はプロジェクト番号に置き換えてください。
[ロール ] で、[Project ](または [基本])> [編集者 ] を選択します。
[保存 ] をクリックします。
統合開発環境(IDE)を設定する
このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。
注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。ファイル エクスプローラ
ボタンをクリックして確認します。
Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソール から次のようにリセットしてみてください。
このラボのコードは 2 つのフォルダに分けてあります。8a_Batch_Testing_Pipeline/lab と 8b_Stream_Testing_Pipeline/lab です。不明な点がある場合、対応するソリューション フォルダ内を探すことで解決策が見つかります。
ラボのパート 1. DoFns と PTransforms の単体テスト
ラボのこのセクションでは、気象センサーから得た統計情報を計算するバッチ パイプラインのために、DoFns と PTransforms の単体テストを実施します。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
TestPipeline
を作成します。
テスト入力データを作成し、Create
変換を使って入力データの PCollection
を作成します。
入力用の PCollection
に変換を適用し、結果として得られた PCollection
を保存します。
PAssert
とそのサブクラスを使用して、期待される要素が出力 PCollection
に含まれることを確認します。
TestPipeline
は Beam SDK に含まれる特別なクラスで、特に変換とパイプライン ロジックのテストに使います。
テストでは、パイプライン オブジェクトの作成時に、Pipeline
の代わりに TestPipeline
を使用します。
TestPipeline p = TestPipeline.create();
Create
変換では、オブジェクト(Java Iterable)のインメモリ コレクションを利用して、このコレクションから PCollection
を作成します。目的は、期待される出力 PCollection
がわかっている小さめのテスト入力データを PTransforms
から得ることです。
List<String> input = Arrays.asList(testInput);
// TestPipeline p を作成するためのコード
outputPColl = p.apply(Create.of(input).apply(...);
最後に、出力 PCollection が期待される出力と一致するかを確認します。この確認には PAssert
クラスを使用します。たとえば、containsInAnyOrder
メソッドを使用すると、出力 PCollection に正しい要素があるかどうかを確認できます。
PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);
タスク 1. パイプラインのメインコードについて学ぶ
IDE で 8a_Batch_Testing_Pipeline/lab を開きます。
このディレクトリには依存関係を定義するための pom.xml
ファイルがあります。また、サブディレクトリ 2 つを含む src
フォルダもあります。src/main
フォルダにはパイプライン パッケージ コードがあり、src/test
フォルダには使用するテストコードがあります。
まず 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java の順に開きます。
このファイルにはパイプラインで使用する予定の WeatherRecord
クラスの定義が含まれています。WeatherRecord
クラスには関連するスキーマがあります。@DefaultSchema
アノテーションを使用してこのスキーマを定義する手順は、すでにご存じのはずです。ただし、クラスを定義する際に equals
メソッドをオーバーライドしなければならないことにご注意ください。
@Override
public boolean equals(final Object obj){
if(obj instanceof WeatherRecord){
final WeatherRecord other = (WeatherRecord) obj;
return (locId.equals(other.locId))
&& (Double.compare(lat, other.lat) == 0)
&& (Double.compare(lng, other.lng) == 0)
&& (date.equals(other.date))
&& (Double.compare(lowTemp, other.lowTemp) == 0)
&& (Double.compare(highTemp, other.highTemp) == 0)
&& (Double.compare(precip, other.precip) == 0);
} else{
return false;
}
}
これはなぜでしょうか。PAssert
は equals
メソッドを使用して出力 PCollection
にあるメンバーシップを確認します。しかし、POJO(Plain Old Java Object)に使用するデフォルトの equals メソッドでは、オブジェクトのアドレスの比較のみが行われます。ここでは、オブジェクトの中身を突き合わせて確認する必要があります。オーバーライドは、上記のように簡単に行えます。
次に、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開きます。
これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。
DoFns
ConvertCsvToWeatherRecord
(65 行目以降)と ConvertTempUnits
(81 行目以降)については、DoFns
の単体テストを後ほど行います。
PTransform
ComputeStatistics
(103 行目以降)について。これは、DoFn
と同様にテストできる複合変換の例です。
PTransform
WeatherStatsTransform
(123 行目以降)について。この PTransform
には、Create
変換で作成した合成データに小規模なパイプライン統合テストを行えるようにするために、パイプライン全体(ソースとシンク変換を除く)に使用する処理ロジックが含まれています。
処理コードに含まれる論理エラーに気付いても、まだ修正しないでください。この後に、テストによってエラーを絞り込む方法を確認します。
タスク 2. テスト用の依存関係を追加する
8a_Batch_Testing_Pipeline/lab/pom.xml を開きます。
テストのために依存関係をいくつか追加する必要があります。テスト用の Beam の Java コードでは、JUnit
と Hamcrest
をリンクする必要があります。Maven では pom.xml
ファイルを更新するだけでこれを行うことができます。
このタスクを完了するには、以下の XML をコピーして pom.xml
ファイルのコメントで指示されている部分に貼り付けます。
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>2.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>2.1</version>
<scope>test</scope>
</dependency>
これらの依存関係のスコープは「test」です。mvn test
でのテスト実行時はこのパッケージが必要になりますが、メインのパイプラインの実行時には不要です。
タスク 3. Apache Beam に最初の DoFn 単体テストを記述する
8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に移動します。
このファイルには、DoFn
および PTransform
単体テストに必要なコードが含まれています。現在、ほとんどのコードがコメントアウトされていますが、作業をしながらコメント化を解除していきます。
まず、ConvertCsvToWeatherRecord DoFn
(43 行目以降)に対する DoFn
単体テストについて確認します。
最初にパイプラインをテストするためのクラスを作成し、次に TestPipeline
オブジェクトを作成します。
@RunWith(JUnit4.class)
public class WeatherStatisticsPipelineTest {
@Rule
public final transient TestPipeline p = TestPipeline.create();
この TestPipeline
オブジェクトを以降のテストすべてに使用します。オブジェクト作成時に transient
キーワードを含めているため、同じオブジェクトの再使用による副次的な影響の心配はありません。
最初のテストのための(不完全な)コードを見てみます。
@Test
@Category(NeedsRunner.class)
public void testConvertCsvToWeatherRecord() throws Exception {
String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1";
List<String> input = Arrays.asList(testInput);
PCollection<WeatherRecord> output =
p.apply(/* インメモリ オブジェクトから PCollection を作成 */)
.apply(ParDo.of(new ConvertCsvToWeatherRecord()));
WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1);
// PAssert ステートメントを含めて正しい結果かチェックする
p.run().waitUntilFinish();
}
パイプラインをテストするためのメソッドに、@Test
アノテーションを追加します。テスト入力は 1 つだけ(testInput
)、CSV ファイルの 1 行を表すものを作成します。これがこのパイプラインに対して期待される入力フォーマットです。このテスト入力を List
オブジェクトの input
に組み込みます。
テスト用のコードの残りの部分には、欠けているパーツがいくつかあります。
このタスクを完了するには、まず Create
変換を追加して、input
を PCollection
に変換します。
次に、containsInAnyOrder
メソッドを使用して PAssert
ステートメントを追加し、input
と testOutput
を比較します。
不明な点がある場合は、後で紹介するコメント付きのテストか、ソリューション を参照してください。
タスク 4. DoFn の最初の単体テストを実施する
IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリをラボ用のものに変更
cd 8a_Batch_Testing_Pipeline/lab
# 依存関係をダウンロード
mvn clean dependency:resolve
export BASE_DIR=$(pwd)
これでテストの準備ができました。
あとは、次のコマンドをターミナルで実行するだけです。
mvn test
ここまでのタスクを正しく完了できていれば、テスト完了後にはターミナルに以下が表示されます(正確な経過時間は異なります)。
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
タスク 5. DoFn の 2 度目の単体テストとパイプラインのデバッグを実施する
再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、2 度目の単体テストのためにコードのコメント化を解除します(67~80 行付近)。これには、コードをハイライト表示し、Ctrl + / キー (MacOS の場合は Cmd + / キー )を押します。参考のために、以下にコードを示します。
@Test
@Category(NeedsRunner.class)
public void testConvertTempUnits() throws Exception {
WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1);
List<WeatherRecord> input = Arrays.asList(testInput);
PCollection<WeatherRecord> output =
p.apply(Create.of(input))
.apply(ParDo.of(new ConvertTempUnits()));
WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1);
PAssert.that(output).containsInAnyOrder(testOutput);
p.run().waitUntilFinish();
}
このテストにより、ConvertTempUnits()
DoFn
が想定通りに動作することを確認できます。
WeatherStatisticsPipelineTest.java
を保存し、ターミナルに戻ります。
再度、次のコマンドを実行してテストを行います。
mvn test
今回はテストが失敗しました。スクロールしていくと、テストの失敗について次の情報が見つかります。
[ERROR] Failures:
[ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output:
Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order
but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>
一見すると、それほど役に立つエラー メッセージとは思われません。しかし、予想していた testOutput
の WeatherRecord
と一致していなかったことはわかります。温度の変換方法に間違いがあった可能性があります。
再度、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開き、下にスクロールして ConvertTempUnits
の定義(81 行目付近)を確認します。
このタスクを完了するには、DoFn
処理ロジックにあるエラーを見つけ、mvn test
コマンドをもう一度実行し、テストが成功することを確認します。参考までに、以下に摂氏と華氏を変換するコードを挙げておきます。
tempF = tempC * 1.8 + 32.0
不明な点がある場合、ソリューション を参照してください。
タスク 6. PTransform 単体テストとエンドツーエンドのパイプラインのテストを実施する
再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、最後の 2 つのテストのためにコードのコメント化を解除します(84 行目付近以降)。
さきほどコメント化解除した最初のテストは、複合的な PTransform
ComputeStatistics
をテストするものでます。参考までに、抜き出したコードを以下に記載しておきます。
@Test
@Category(NeedsRunner.class)
public void testComputeStatistics() throws Exception {
WeatherRecord[] testInputs = new WeatherRecord[3];
//テスト入力を定義(ここでは省略)
List<WeatherRecord> input = Arrays.asList(testInputs);
PCollection<String> output =
p.apply(Create.of(input))
.apply(new ComputeStatistics());
String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]",
"[\"y\",72.5,82.5,0.5]"};
PAssert.that(output).containsInAnyOrder(testOutputs);
p.run().waitUntilFinish();
}
前に行った DoFn
単体テストとよく似ていることに注目してください。(テスト入力と出力以外の)違いは、PTransform
を ParDo(new DoFn())
の代わりに使用していることだけです。
最後にエンドツーエンドのパイプラインをテストします。パイプライン コード(WeatherStatisticsPipeline.java )には、エンドツーエンドのパイプライン全体からソースとシンクを削除したものが、1 つの PTransform
WeatherStatsTransform
にすべて含まれています。
エンドツーエンドのパイプラインのテストでは、ここまでに行ったことと類似の内容を繰り返しますが、代わりに PTransform
を使用します。
@Test
@Category(NeedsRunner.class)
public void testWeatherStatsTransform() throws Exception {
String[] testInputs = new String[] //テスト入力を定義(ここでは省略)
List<String> input = Arrays.asList(testInputs);
PCollection<String> output =
p.apply(Create.of(input))
.apply(new WeatherStatsTransform());
String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]",
"[\"y\",54.5,63.5,0.5]"};
PAssert.that(output).containsInAnyOrder(testOutputs);
p.run().waitUntilFinish();
}
ターミナルに戻り、次のコマンドを実行してもう一度テストを行います。
mvn test
ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。
[INFO] -------------------------------------------------------
[INFO] T E S T S
[INFO] -------------------------------------------------------
[INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest
[INFO]
[INFO] Results:
[INFO]
[INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[進行状況を確認 ] をクリックして、目標に沿って進んでいることを確認します。
DoFns と PTransforms の単体テストを実施する
ラボのパート 2. TestStream でのストリーム処理ロジックのテスト
このセクションでは、タクシーの乗車回数をウィンドウ処理して計算するストリーミング パイプラインを単体テストします。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。
TestPipeline
を作成します。
ストリーミング データの生成には、TestStream
クラスを使用します。これには一連のイベントの生成と、ウォーターマークおよび処理時間の進行が含まれています。
PAssert
とそのサブクラスを使用して、指定したウィンドウに期待される要素が出力 PCollection
に含まれることを確認します。
TestStream
から読み取るパイプラインの実行時、読み取り処理は、各イベントですべての結果(処理時間の進行や適切なトリガーの起動など)が完了するまで待ってから次のイベントに移ります。TestStream
により、トリガーの影響と許容される遅延をパイプライン上でモニタリングしテストすることができます。これにはトリガー遅延や遅延によるデータの欠損に関するロジックも含まれています。
タスク 1. パイプラインのメインコードについて学ぶ
IDE で 8b_Stream_Testing_Pipeline/lab に移動します。
このディレクトリには依存関係を定義するための pom.xml
ファイルがあります。また、サブディレクトリ 2 つを含む src
フォルダもあります。src/main
フォルダにはパイプライン パッケージ コードがあり、src/test
フォルダには使用するテストコードがあります。
まず、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java の順に開きます。
このファイルにはパイプラインで使用する予定の TaxiRide
クラスの定義が含まれています。TaxiRide
クラスには関連するスキーマがあります。@DefaultSchema
アノテーションを使用してこのスキーマを定義する手順はすでにご存じのはずです。
次に、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java の順に開きます。
これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。
DoFn
JsonToTaxiRide
(94 行目以降)は、TaxiRide
クラスのオブジェクトが受け取る Pub/Sub メッセージの変換に使用されます。
PTransform
TaxiCountTransform
(113 行目以降)について。この PTransform
には、パイプラインのカウントとウィンドウ処理に使用するメインのロジックが含まれています。今回のテストでフォーカスするのはこの PTransform
です。
TaxiCountTransform
の出力では、ウィンドウごとに記録されたタクシー乗車がすべてカウントされているはずです。しかし、乗車ごとに複数のイベント(ピックアップや降車など)があります。
そこで、乗車ごとに 1 回だけカウントするように、ride_status
プロパティをフィルタリングします。ride_status
が「pickup」の場合にのみ、要素を残すようにします。
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))
少し詳しく見ていきます。このパイプラインで使用されているウィンドウ処理のロジックが、以下に含まれています。
.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
.withAllowedLateness(Duration.standardMinutes(1))
.accumulatingFiredPanes())
60 秒の長さの固定ウィンドウに収まるように処理します。始めのほうにはトリガーはなく、ウォーターマークがウィンドウの最後を過ぎた後に結果が出力されます。受信する新しい要素それぞれに遅延のあるトリガー起動が含まれますが、許容範囲である 1 分を過ぎた呼び出しは含まれません。また、遅延の許容範囲が過ぎるまでは、ウィンドウ内の状態が累積されます。
タスク 2. TestStream の使い方を学び、最初のテストを実施する
8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java の順に開きます。
最初の目標は、テストコードでの TestStream
の使用方法を理解することです。TestStream
クラスによって、メッセージのリアルタイムなストリーミングをシミュレーションしつつ、処理時間とウォーターマークの進行を管理できることを思い出してください。
以下には、最初のテストのコード(66 行目以降)が含まれています。
TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of())
.advanceWatermarkTo(startTime)
.addElements(
TimestampedValue.of(json.format(json, "pickup"),
startTime),
TimestampedValue.of(json.format(json, "enroute"),
startTime),
TimestampedValue.of(json.format(json, "pickup"),
startTime))
.addElements(
TimestampedValue.of(json.format(json, "pickup"),
startTime.plus(Duration.standardMinutes(1))))
.advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1)))
.addElements(
TimestampedValue.of(json.format(json, "pickup"),
startTime.plus(Duration.standardMinutes(2))))
.advanceWatermarkToInfinity();
create
メソッドを使用して新しい TestStream
を作成し、一方でコーダーも指定します。JSON メッセージを文字列として渡すことで、StringUtf8Coder
を使用できます。次に、上記のコードでの TestStream
の役割を確認します。
TestStream
は、以下の役割を担っています。
最初のウォーターマークを変数 startTime
(Instant(0)
)に設定します。
3 つの要素を、startTime
のイベント タイムスタンプとともに文字列に追加します。これらイベントのうち 2 つはカウントされます(ride_status = "pickup"
)が、残りの 1 つはカウントされません。
もう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは startTime
の 1 分後です。
ウォーターマークを startTime
の 1 分後まで進めて、最初のウィンドウをトリガーします。
さらにもう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは startTime
の 2 分後です。
ウォーターマークを「infinity」まで進めます。これですべてのウィンドウが終了します。この後のデータはすべて、許容される遅延期限を過ぎています。
最初のテストのコードの残り部分は、パート 1 のバッチの例に似ていますが、今回は Create
変換の代わりに TestStream
を使用します。
PCollection<Long> outputCount = p
.apply(createEvents)
.apply(new TaxiCountTransform());
IntervalWindow window1 = new IntervalWindow(startTime,
startTime.plus(Duration.standardMinutes(1)));
IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)),
startTime.plus(Duration.standardMinutes(2)));
IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)),
startTime.plus(Duration.standardMinutes(3)));
PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L);
PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L);
PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L);
p.run().waitUntilFinish();
この上のコードでは、出力 PCollection
(outputCount
)を、TestStream
を作成して TaxiCountTransform
PTransform
を適用することで定義しています。チェック対象のウィンドウは InvervalWindow
クラスを使用して定義し、その後 PAssert
を inWindow
メソッドで使用して各ウィンドウの結果を確認しています。
IDE のターミナルに戻るか新しくターミナルを開きます。次のコマンドを実行することによって、正しいディレクトリに移動して依存関係をインストールします。
# ディレクトリをラボ用のものに変更
cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab
# 依存関係をダウンロード
mvn clean dependency:resolve
export BASE_DIR=$(pwd)
次のコマンドを実行して、上記のテストを行います。
mvn test
テストが終わると、次の出力が確認できるはずです(経過時間は異なる場合があります)。
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 31.629 s
[INFO] Finished at: 2021-05-13T12:24:20-04:00
[INFO] ------------------------------------------------------------------------
タスク 3. 遅延データ処理のテスト用に TestStream を作成する
このタスクでは、遅延データの処理に関連するロジックをテストするための TestStream
のコードを記述します。
再度、8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java の順に開き、testTaxiRideLateData
メソッドのコメント(104 行目付近)まで下にスクロールします。
このタスクで使用するコードを完成させるため、テスト用のコードのコメント化を解除します。
@Test
@Category(NeedsRunner.class)
public void testTaxiRideLateData() throws Exception {
Instant startTime = new Instant(0);
String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,"
+ "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,"
+ "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}";
TestStream<String> createEvents = /* CreateTestStream */
PCollection<Long> outputCount = p
.apply(createEvents)
.apply(new TaxiCountTransform());
IntervalWindow window1 = new IntervalWindow(startTime,
startTime.plus(Duration.standardMinutes(1)));
PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L);
PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L);
p.run().waitUntilFinish();
}
テスト用のコードは、TestStream
の作成を除いて完成しています。
このタスクを完了するには、以下のタスクを行う TestStream
オブジェクトを作成します。
ウォーターマークを startTime
まで進めます。
2 つの TimestampedValue
(値 json.format(json, "pickup")
、タイムスタンプ startTime
)を追加します。
ウォーターマークを startTime
の 1 分後まで進めます。
もう 1 つ別の TimestamedValue
(値 json.format(json, "pickup")
、タイムスタンプ startTime
)を追加します。
ウォーターマークを startTime
の 2 分後まで進めます。
もう 1 つ別の TimestamedValue
(値 json.format(json, "pickup")
、タイムスタンプ startTime
)を追加します。
ウォーターマークを infinity まで進めます。
これで要素が 4 つある TestStream
が作成されます。要素はいずれも最初のウィンドウに属しています。最初の 2 要素は時間通り、2 番目の要素は遅延あり(ただし許容される範囲内)、最後の要素は許容される範囲外までの遅延ありとなっています。トリガ―されたペインは累積されるため、最初のトリガーは 2 つのイベントをカウントし、最後のトリガーは 3 つのイベントをカウントするはずです。4 番目のイベントは含まれません。このチェックには、PAssert
クラスの inOnTimePane
メソッドと inFinalPane
メソッドを使用します。
不明な点がある場合、ソリューション を参照してください。
タスク 4. 遅延データ処理をテストする
ターミナルに戻り、以下のコマンドを実行してもう一度テストを行います。
mvn test
ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。
[INFO] Results:
[INFO]
[INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.849 s
[INFO] Finished at: 2021-05-13T13:10:32-04:00
[INFO] ------------------------------------------------------------------------
[進行状況を確認 ] をクリックして、目標に沿って進んでいることを確認します。
TestStream でストリーム処理ロジックをテストする
ラボを終了する
ラボが完了したら、[ラボを終了 ] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信 ] をクリックします。
星の数は、それぞれ次の評価を表します。
星 1 つ = 非常に不満
星 2 つ = 不満
星 3 つ = どちらともいえない
星 4 つ = 満足
星 5 つ = 非常に満足
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート ] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。