arrow_back

Dataflow でのサーバーレスなデータ処理 - Apache Beam(Java)でのテスト

ログイン 参加
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

Dataflow でのサーバーレスなデータ処理 - Apache Beam(Java)でのテスト

ラボ 1時間 30分 universal_currency_alt クレジット: 7 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
知識をテストして、コミュニティで共有しましょう
done
700 を超えるハンズオンラボ、スキルバッジ、コースへのアクセス

概要

このラボの内容:

  • Apache Beam にあるテストツールを使用して、DoFnsPTransforms 用の単体テストを記述します。
  • パイプライン統合テストを実施します。
  • TestStream クラスを使用して、ストリーミング パイプライン向けのウィンドウ処理の動作をテストします。

パイプラインのテストは、効果的なデータ処理ソリューションの開発で特に重要なステップです。Beam モデルの間接的な性質により、失敗した実行のデバッグが複雑なタスクになることがあります。

このラボでは、Beam SDK のテスト用パッケージに含まれるツールを使い、ローカルで DirectRunner による単体テストを行う方法を説明します。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

統合開発環境(IDE)を設定する

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。

注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。ファイル エクスプローラ ボタンをクリックして確認します。

file_explorer

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

new_terminal

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud_auth

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

gce_reset

このラボのコードは 2 つのフォルダに分けてあります。8a_Batch_Testing_Pipeline/lab8b_Stream_Testing_Pipeline/lab です。不明な点がある場合、対応するソリューション フォルダ内を探すことで解決策が見つかります。

ラボのパート 1. DoFns と PTransforms の単体テスト

ラボのこのセクションでは、気象センサーから得た統計情報を計算するバッチ パイプラインのために、DoFns と PTransforms の単体テストを実施します。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。

  • TestPipeline を作成します。
  • テスト入力データを作成し、Create 変換を使って入力データの PCollection を作成します。
  • 入力用の PCollection に変換を適用し、結果として得られた PCollection を保存します。
  • PAssert とそのサブクラスを使用して、期待される要素が出力 PCollection に含まれることを確認します。

TestPipeline は Beam SDK に含まれる特別なクラスで、特に変換とパイプライン ロジックのテストに使います。

  • テストでは、パイプライン オブジェクトの作成時に、Pipeline の代わりに TestPipeline を使用します。
TestPipeline p = TestPipeline.create();

Create 変換では、オブジェクト(Java Iterable)のインメモリ コレクションを利用して、このコレクションから PCollection を作成します。目的は、期待される出力 PCollection がわかっている小さめのテスト入力データを PTransforms から得ることです。

List<String> input = Arrays.asList(testInput); // TestPipeline p を作成するためのコード outputPColl = p.apply(Create.of(input).apply(...);

最後に、出力 PCollection が期待される出力と一致するかを確認します。この確認には PAssert クラスを使用します。たとえば、containsInAnyOrder メソッドを使用すると、出力 PCollection に正しい要素があるかどうかを確認できます。

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

タスク 1. パイプラインのメインコードについて学ぶ

  1. IDE で 8a_Batch_Testing_Pipeline/lab を開きます。

このディレクトリには依存関係を定義するための pom.xml ファイルがあります。また、サブディレクトリ 2 つを含む src フォルダもあります。src/main フォルダにはパイプライン パッケージ コードがあり、src/test フォルダには使用するテストコードがあります。

  1. まず 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java の順に開きます。

このファイルにはパイプラインで使用する予定の WeatherRecord クラスの定義が含まれています。WeatherRecord クラスには関連するスキーマがあります。@DefaultSchema アノテーションを使用してこのスキーマを定義する手順は、すでにご存じのはずです。ただし、クラスを定義する際に equals メソッドをオーバーライドしなければならないことにご注意ください。

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

これはなぜでしょうか。PAssertequals メソッドを使用して出力 PCollection にあるメンバーシップを確認します。しかし、POJO(Plain Old Java Object)に使用するデフォルトの equals メソッドでは、オブジェクトのアドレスの比較のみが行われます。ここでは、オブジェクトの中身を突き合わせて確認する必要があります。オーバーライドは、上記のように簡単に行えます。

  1. 次に、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開きます。

これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。

  • DoFns ConvertCsvToWeatherRecord(65 行目以降)と ConvertTempUnits(81 行目以降)については、DoFns の単体テストを後ほど行います。
  • PTransform ComputeStatistics(103 行目以降)について。これは、DoFn と同様にテストできる複合変換の例です。
  • PTransform WeatherStatsTransform(123 行目以降)について。この PTransform には、Create 変換で作成した合成データに小規模なパイプライン統合テストを行えるようにするために、パイプライン全体(ソースとシンク変換を除く)に使用する処理ロジックが含まれています。

処理コードに含まれる論理エラーに気付いても、まだ修正しないでください。この後に、テストによってエラーを絞り込む方法を確認します。

タスク 2. テスト用の依存関係を追加する

  1. 8a_Batch_Testing_Pipeline/lab/pom.xml を開きます。

テストのために依存関係をいくつか追加する必要があります。テスト用の Beam の Java コードでは、JUnitHamcrest をリンクする必要があります。Maven では pom.xml ファイルを更新するだけでこれを行うことができます。

  1. このタスクを完了するには、以下の XML をコピーして pom.xml ファイルのコメントで指示されている部分に貼り付けます。
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

これらの依存関係のスコープは「test」です。mvn test でのテスト実行時はこのパッケージが必要になりますが、メインのパイプラインの実行時には不要です。

タスク 3. Apache Beam に最初の DoFn 単体テストを記述する

  1. 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に移動します。

このファイルには、DoFn および PTransform 単体テストに必要なコードが含まれています。現在、ほとんどのコードがコメントアウトされていますが、作業をしながらコメント化を解除していきます。

まず、ConvertCsvToWeatherRecord DoFn(43 行目以降)に対する DoFn 単体テストについて確認します。

  1. 最初にパイプラインをテストするためのクラスを作成し、次に TestPipeline オブジェクトを作成します。
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

この TestPipeline オブジェクトを以降のテストすべてに使用します。オブジェクト作成時に transient キーワードを含めているため、同じオブジェクトの再使用による副次的な影響の心配はありません。

  1. 最初のテストのための(不完全な)コードを見てみます。
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* インメモリ オブジェクトから PCollection を作成 */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // PAssert ステートメントを含めて正しい結果かチェックする p.run().waitUntilFinish(); }

パイプラインをテストするためのメソッドに、@Test アノテーションを追加します。テスト入力は 1 つだけ(testInput)、CSV ファイルの 1 行を表すものを作成します。これがこのパイプラインに対して期待される入力フォーマットです。このテスト入力を List オブジェクトの input に組み込みます。

テスト用のコードの残りの部分には、欠けているパーツがいくつかあります。

  1. このタスクを完了するには、まず Create 変換を追加して、inputPCollection に変換します。

  2. 次に、containsInAnyOrder メソッドを使用して PAssert ステートメントを追加し、inputtestOutput を比較します。

不明な点がある場合は、後で紹介するコメント付きのテストか、ソリューションを参照してください。

タスク 4. DoFn の最初の単体テストを実施する

  1. IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリをラボ用のものに変更 cd 8a_Batch_Testing_Pipeline/lab # 依存関係をダウンロード mvn clean dependency:resolve export BASE_DIR=$(pwd)

これでテストの準備ができました。

  1. あとは、次のコマンドをターミナルで実行するだけです。
mvn test

ここまでのタスクを正しく完了できていれば、テスト完了後にはターミナルに以下が表示されます(正確な経過時間は異なります)。

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

タスク 5. DoFn の 2 度目の単体テストとパイプラインのデバッグを実施する

  1. 再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、2 度目の単体テストのためにコードのコメント化を解除します(67~80 行付近)。これには、コードをハイライト表示し、Ctrl + / キー(MacOS の場合は Cmd + / キー)を押します。参考のために、以下にコードを示します。
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

このテストにより、ConvertTempUnits() DoFn が想定通りに動作することを確認できます。

  1. WeatherStatisticsPipelineTest.java を保存し、ターミナルに戻ります。

  2. 再度、次のコマンドを実行してテストを行います。

mvn test

今回はテストが失敗しました。スクロールしていくと、テストの失敗について次の情報が見つかります。

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>

一見すると、それほど役に立つエラー メッセージとは思われません。しかし、予想していた testOutputWeatherRecord と一致していなかったことはわかります。温度の変換方法に間違いがあった可能性があります。

  1. 再度、8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java の順に開き、下にスクロールして ConvertTempUnits の定義(81 行目付近)を確認します。

  2. このタスクを完了するには、DoFn 処理ロジックにあるエラーを見つけ、mvn test コマンドをもう一度実行し、テストが成功することを確認します。参考までに、以下に摂氏と華氏を変換するコードを挙げておきます。

tempF = tempC * 1.8 + 32.0

不明な点がある場合、ソリューションを参照してください。

タスク 6. PTransform 単体テストとエンドツーエンドのパイプラインのテストを実施する

  1. 再度、8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java の順に開き、最後の 2 つのテストのためにコードのコメント化を解除します(84 行目付近以降)。

さきほどコメント化解除した最初のテストは、複合的な PTransform ComputeStatistics をテストするものでます。参考までに、抜き出したコードを以下に記載しておきます。

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //テスト入力を定義(ここでは省略) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

前に行った DoFn 単体テストとよく似ていることに注目してください。(テスト入力と出力以外の)違いは、PTransformParDo(new DoFn()) の代わりに使用していることだけです。

最後にエンドツーエンドのパイプラインをテストします。パイプライン コード(WeatherStatisticsPipeline.java)には、エンドツーエンドのパイプライン全体からソースとシンクを削除したものが、1 つの PTransform WeatherStatsTransform にすべて含まれています。

  1. エンドツーエンドのパイプラインのテストでは、ここまでに行ったことと類似の内容を繰り返しますが、代わりに PTransform を使用します。
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //テスト入力を定義(ここでは省略) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. ターミナルに戻り、次のコマンドを実行してもう一度テストを行います。
mvn test

ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 DoFns と PTransforms の単体テストを実施する

ラボのパート 2. TestStream でのストリーム処理ロジックのテスト

このセクションでは、タクシーの乗車回数をウィンドウ処理して計算するストリーミング パイプラインを単体テストします。作成した変換のテストには、以下のパターンと Beam が提供する変換を使用できます。

  • TestPipeline を作成します。
  • ストリーミング データの生成には、TestStream クラスを使用します。これには一連のイベントの生成と、ウォーターマークおよび処理時間の進行が含まれています。
  • PAssert とそのサブクラスを使用して、指定したウィンドウに期待される要素が出力 PCollection に含まれることを確認します。

TestStream から読み取るパイプラインの実行時、読み取り処理は、各イベントですべての結果(処理時間の進行や適切なトリガーの起動など)が完了するまで待ってから次のイベントに移ります。TestStream により、トリガーの影響と許容される遅延をパイプライン上でモニタリングしテストすることができます。これにはトリガー遅延や遅延によるデータの欠損に関するロジックも含まれています。

タスク 1. パイプラインのメインコードについて学ぶ

  1. IDE で 8b_Stream_Testing_Pipeline/lab に移動します。

このディレクトリには依存関係を定義するための pom.xml ファイルがあります。また、サブディレクトリ 2 つを含む src フォルダもあります。src/main フォルダにはパイプライン パッケージ コードがあり、src/test フォルダには使用するテストコードがあります。

  1. まず、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java の順に開きます。

このファイルにはパイプラインで使用する予定の TaxiRide クラスの定義が含まれています。TaxiRide クラスには関連するスキーマがあります。@DefaultSchema アノテーションを使用してこのスキーマを定義する手順はすでにご存じのはずです。

  1. 次に、8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java の順に開きます。

これが今回使用するパイプラインのメインコードです。このパイプラインでのコンセプトについてはこれまでのラボでほぼ説明済みですが、以下の点についてもう少し詳しく見てみます。

  • DoFn JsonToTaxiRide(94 行目以降)は、TaxiRide クラスのオブジェクトが受け取る Pub/Sub メッセージの変換に使用されます。
  • PTransform TaxiCountTransform(113 行目以降)について。この PTransform には、パイプラインのカウントとウィンドウ処理に使用するメインのロジックが含まれています。今回のテストでフォーカスするのはこの PTransform です。

TaxiCountTransform の出力では、ウィンドウごとに記録されたタクシー乗車がすべてカウントされているはずです。しかし、乗車ごとに複数のイベント(ピックアップや降車など)があります。

  1. そこで、乗車ごとに 1 回だけカウントするように、ride_status プロパティをフィルタリングします。ride_status が「pickup」の場合にのみ、要素を残すようにします。
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

少し詳しく見ていきます。このパイプラインで使用されているウィンドウ処理のロジックが、以下に含まれています。

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

60 秒の長さの固定ウィンドウに収まるように処理します。始めのほうにはトリガーはなく、ウォーターマークがウィンドウの最後を過ぎた後に結果が出力されます。受信する新しい要素それぞれに遅延のあるトリガー起動が含まれますが、許容範囲である 1 分を過ぎた呼び出しは含まれません。また、遅延の許容範囲が過ぎるまでは、ウィンドウ内の状態が累積されます。

タスク 2. TestStream の使い方を学び、最初のテストを実施する

  1. 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java の順に開きます。

最初の目標は、テストコードでの TestStream の使用方法を理解することです。TestStream クラスによって、メッセージのリアルタイムなストリーミングをシミュレーションしつつ、処理時間とウォーターマークの進行を管理できることを思い出してください。

以下には、最初のテストのコード(66 行目以降)が含まれています。

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

create メソッドを使用して新しい TestStream を作成し、一方でコーダーも指定します。JSON メッセージを文字列として渡すことで、StringUtf8Coder を使用できます。次に、上記のコードでの TestStream の役割を確認します。

TestStream は、以下の役割を担っています。

  • 最初のウォーターマークを変数 startTimeInstant(0))に設定します。
  • 3 つの要素を、startTime のイベント タイムスタンプとともに文字列に追加します。これらイベントのうち 2 つはカウントされます(ride_status = "pickup")が、残りの 1 つはカウントされません。
  • もう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは startTime の 1 分後です。
  • ウォーターマークを startTime の 1 分後まで進めて、最初のウィンドウをトリガーします。
  • さらにもう 1 つ「pickup」のイベントを追加します。ただし、イベント タイムスタンプは startTime の 2 分後です。
  • ウォーターマークを「infinity」まで進めます。これですべてのウィンドウが終了します。この後のデータはすべて、許容される遅延期限を過ぎています。
  1. 最初のテストのコードの残り部分は、パート 1 のバッチの例に似ていますが、今回は Create 変換の代わりに TestStream を使用します。
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

この上のコードでは、出力 PCollectionoutputCount)を、TestStream を作成して TaxiCountTransform PTransform を適用することで定義しています。チェック対象のウィンドウは InvervalWindow クラスを使用して定義し、その後 PAssertinWindow メソッドで使用して各ウィンドウの結果を確認しています。

  1. IDE のターミナルに戻るか新しくターミナルを開きます。次のコマンドを実行することによって、正しいディレクトリに移動して依存関係をインストールします。
# ディレクトリをラボ用のものに変更 cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # 依存関係をダウンロード mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. 次のコマンドを実行して、上記のテストを行います。
mvn test

テストが終わると、次の出力が確認できるはずです(経過時間は異なる場合があります)。

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

タスク 3. 遅延データ処理のテスト用に TestStream を作成する

このタスクでは、遅延データの処理に関連するロジックをテストするための TestStream のコードを記述します。

  1. 再度、8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java の順に開き、testTaxiRideLateData メソッドのコメント(104 行目付近)まで下にスクロールします。

  2. このタスクで使用するコードを完成させるため、テスト用のコードのコメント化を解除します。

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

テスト用のコードは、TestStream の作成を除いて完成しています。

  1. このタスクを完了するには、以下のタスクを行う TestStream オブジェクトを作成します。
  • ウォーターマークを startTime まで進めます。
  • 2 つの TimestampedValue(値 json.format(json, "pickup")、タイムスタンプ startTime)を追加します。
  • ウォーターマークを startTime の 1 分後まで進めます。
  • もう 1 つ別の TimestamedValue(値 json.format(json, "pickup")、タイムスタンプ startTime)を追加します。
  • ウォーターマークを startTime の 2 分後まで進めます。
  • もう 1 つ別の TimestamedValue(値 json.format(json, "pickup")、タイムスタンプ startTime)を追加します。
  • ウォーターマークを infinity まで進めます。

これで要素が 4 つある TestStream が作成されます。要素はいずれも最初のウィンドウに属しています。最初の 2 要素は時間通り、2 番目の要素は遅延あり(ただし許容される範囲内)、最後の要素は許容される範囲外までの遅延ありとなっています。トリガ―されたペインは累積されるため、最初のトリガーは 2 つのイベントをカウントし、最後のトリガーは 3 つのイベントをカウントするはずです。4 番目のイベントは含まれません。このチェックには、PAssert クラスの inOnTimePane メソッドと inFinalPane メソッドを使用します。

不明な点がある場合、ソリューションを参照してください。

タスク 4. 遅延データ処理をテストする

  • ターミナルに戻り、以下のコマンドを実行してもう一度テストを行います。
mvn test

ここまでのタスクを正しく完了していれば、テスト完了後には以下がターミナルに表示されます。

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 TestStream でストリーム処理ロジックをテストする

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします