arrow_back

Serverless Data Processing with Dataflow - Testing with Apache Beam (Java)

로그인 가입
지식을 테스트하고 커뮤니티와 공유하기
done
700개 이상의 실무형 실습, 기술 배지, 과정에 액세스

Serverless Data Processing with Dataflow - Testing with Apache Beam (Java)

실습 1시간 30분 universal_currency_alt 크레딧 5개 show_chart 고급
info 이 실습에는 학습을 지원하는 AI 도구가 통합되어 있을 수 있습니다.
지식을 테스트하고 커뮤니티와 공유하기
done
700개 이상의 실무형 실습, 기술 배지, 과정에 액세스

Overview

In this lab, you:

  • Write unit tests for DoFns and PTransforms using testing tools in Apache Beam.
  • Perform a pipeline integration test.
  • Use the TestStream class to test windowing behavior for a streaming pipeline.

Testing your pipeline is a particularly important step in developing an effective data-processing solution. The indirect nature of the Beam model can make debugging failed runs a non-trivial task.

In this lab, we will explore how to perform unit tests locally with tools in the testing package of the Beam SDK, using the DirectRunner.

Setup and requirements

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Qwiklabs using an incognito window.

  2. Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
    There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.

  5. Click Open Google Console.

  6. Click Use another account and copy/paste credentials for this lab into the prompts.
    If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Check project permissions

Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu (Navigation menu icon), select IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.

Compute Engine default service account name and editor status highlighted on the Permissions tabbed page

Note: If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
  1. In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
  2. Copy the project number (e.g. 729328892908).
  3. On the Navigation menu, select IAM & Admin > IAM.
  4. At the top of the roles table, below View by Principals, click Grant Access.
  5. For New principals, type:
{project-number}-compute@developer.gserviceaccount.com
  1. Replace {project-number} with your project number.
  2. For Role, select Project (or Basic) > Editor.
  3. Click Save.

Setting up your IDE

For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is java langauge server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud command line tool, similar to Cloud Shell.

  1. To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost to a new tab.
Note: You may need to wait 3-5 minutes for the environment to be fully provisioned, even after the url appears. Until then you will see an error in the browser.

Credentials pane displaying the ide_url

The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.

  1. Click on the File Explorer button to look:

Expanded File Explorer menu with the labs folder highlighted

You can also create multiple terminals in this environment, just as you would with cloud shell:

New Terminal option highlighted in the Terminal menu

You can see with by running gcloud auth list on the terminal that you're logged in as a provided service account, which has the exact same permissions are your lab user account:

Terminal dislaying the gcloud auth list command

If at any point your environment stops working, you can try resetting the VM hosting your IDE from the GCE console like this:

Both the Reset button and VM instance name highlighted on the VM instances page

The lab code is split between two folders: 8a_Batch_Testing_Pipeline/lab and 8b_Stream_Testing_Pipeline/lab. If you get stuck at any point, the solution can be found in the corresponding solution folders.

Lab part 1. Performing unit tests for DoFns and PTransforms

In this part of the lab, we will perform unit testing on DoFns and PTransforms for a batch pipeline computing statistics from weather sensors. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:

  • Create a TestPipeline.
  • Create some test input data and use the Create transform to create a PCollection of your input data.
  • Apply your transform to the input PCollection and save the resulting PCollection.
  • Use PAssert and its subclasses to verify that the output PCollection contains the elements that you expect.

TestPipeline is a special class included in the Beam SDK specifically for testing your transforms and pipeline logic.

  • When testing, use TestPipeline instead of Pipeline when you create the pipeline object:
TestPipeline p = TestPipeline.create();

The Create transfrom takes an in-memory collection of objects (a Java iterable) and creates a PCollection from this collection. The goal is to have a small set of test input data, for which we know the expected output PCollection, from our PTransforms.

List<String> input = Arrays.asList(testInput); // Some code to create a TestPipeline p outputPColl = p.apply(Create.of(input).apply(...);

Finally, we want to check that the output PCollection matches the expected output. We use the PAssert class to verify this. For example, we can use the containsInAnyOrder method to verify that the output PCollection has the correct elements:

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

Task 1. Explore the main pipeline code

  1. Navigate to 8a_Batch_Testing_Pipeline/lab in your IDE.

This directory contains a pom.xml file for defining dependecies, and the src folder, which contains two subdirectories. The src/main folder contains the pipeline package code and the src/test folder will contain our testing code.

  1. First open 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.

This file contains the definition of the WeatherRecord class we will use in our pipeline. The WeatherRecord class has an associated schema, and the steps for defining the schema using the @DefaultSchema annotation should be familiar. However, note that we must override the equals method in defining the class.

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

Why is this? PAssert will use the equals method to verify membership in the output PCollection. However, the default equals method for a POJO (Plain Old Java Object) only compares the addresses of the objects. We want to be sure that we are instead comparing the contents of the objects. This is simple to do as shown above.

  1. Now open 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.

This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:

  • The DoFns ConvertCsvToWeatherRecord (starting on line 65) and ConvertTempUnits (starting on line 81). We will be performing unit testing on these DoFns later.
  • The PTransform ComputeStatistics (starting on line 103). This is an example of a composite transform that we will be able to test in the same manner as a DoFn.
  • The PTransform WeatherStatsTransform (starting on line 123. This PTransform contains the processing logic for our entire pipeline (minus the source and sink transforms) so that we can perform a small pipeline integration test on synthetic data created by a Create transform.

If you notice a logical error in the processing code, do not fix it yet! Later we will see how to narrow down the error via testing.

Task 2. Add dependencies for testing

  1. Now open 8a_Batch_Testing_Pipeline/lab/pom.xml.

We need to add a few dependecies for testing. Any Beam Java code for testing must link in JUnit and Hamcrest. In Maven, we simply need to update the pom.xml file.

  1. To complete this task, copy and paste the following XML into the pom.xml file where noted in a comment:
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

Note that the scope for these dependencies is "test". We will need these packages when we run a test with mvn test, but not when executing the main pipeline.

Task 3. Write first DoFn unit test in Apache Beam

  1. Now navigate to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.

This file contains the code for our DoFn and PTransform unit tests. The code is mostly commented out now, but we will be uncommenting the code as we go along.

We will start by exploring a DoFn unit test for our ConvertCsvToWeatherRecord DoFn (starting on line 43).

  1. First we create a class for testing our pipeline and create a TestPipeline object:
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

We will use this TestPipeline object in all of the following tests, though we will not need to worry about side effects of reusing the same object due to the transient keyword when creating the object.

  1. Now look at the (incomplete) code for our first test:
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* Create PCollection from in-memory object */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // Include PAssert statement to check for correct results p.run().waitUntilFinish(); }

We annotate the method we will use to test our pipeline with the @Test annotation. We create a single test input (testInput) representing a line of a CSV file (the expected input format for our pipeline) and put it into a List object input.

There are a couple of pieces missing in the rest of the code for the test.

  1. To complete this task, first add the Create transform to convert input into a PCollection.

  2. Second, include a PAssert statement using the containsInAnyOrder method to compare input with testOutput.

If you get stuck, you can refer to later commented tests or the solutions.

Task 4. Run first DoFn unit test

  1. Create a new terminal in your IDE environment, if you have not already, and paste the following command:
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

Now we're ready to run our test.

  1. To do so, simply run the following command in your terminal:
mvn test

If you correctly completed the previous task, you should see the following in your terminal after the test completes (the exact time elapsed will differ):

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Task 5. Run second DoFn unit test and debug pipeline

  1. Go back to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java and uncomment the code for the second unit test (around lines 67-80). You can do this by highlighting the code and pressing Ctrl + / (or Cmd + / on MacOS). The code is shown below for reference:
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

This test ensures that the ConvertTempUnits() DoFn is working as intended.

  1. Save WeatherStatisticsPipelineTest.java and return to your terminal.

  2. Once again, execute the follow command to run the tests:

mvn test

The test fails this time! If we scroll through the output, we can find the following information about the test failure:

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [] in any order but: not matched:

At first glance, this may not seem like the most useful error message. However, we can see that the expected WeatherRecord in testOutput was not matched. Maybe there is something wrong with how we performed the temperature conversion.

  1. Return to 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java and scroll down to the definition of ConvertTempUnits (around line 81).

  2. To complete this task, find the error in the DoFn processing logic and rerun the mvn test command to confirm that the test now succeeds. As a reminder, the formula for converting degrees Celsius to Farenheit is given below:

tempF = tempC * 1.8 + 32.0

If you get stuck, you can refer to the solutions.

Task 6. Run PTransform unit test and test end-to-end pipeline

  1. Return to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java and uncomment the code for the final two tests (starting around line 84).

The first test we just uncommented is testing the composit PTransform ComputeStatistics. A truncated form of the code is presented below for reference:

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //Define Testing Inputs (Omitted here) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

Notice that this looks very similar to our earlier DoFn unit tests! The only real difference (other than different test inputs and outputs) is that we are applying the PTransform rather than ParDo(new DoFn()).

The final test is for the end-to-end pipeline. In the pipeline code (WeatherStatisticsPipeline.java) the entire end-to-end pipeline minus the source and sink has been included in a single PTransform WeatherStatsTransform.

  1. To test the end-to-end pipeline, we can repeat something similar to what we did above, but using that PTransform instead:
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //Define Testing Inputs (Omitted here) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. Now return to your terminal and execute the follow command to run the tests one more time:
mvn test

If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Click Check my progress to verify the objective. Perform unit tests for DoFns and PTransforms

Lab part 2. Testing stream processing logic with TestStream

In this part of the lab, we will perform unit testing for a streaming pipeline computing windowed counts of taxi rides. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:

  • Create a TestPipeline.
  • Use the TestStream class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.
  • Use PAssert and its subclasses to verify that the output PCollection contains the elements that you expect in specific windows.

When executing a pipeline that reads from a TestStream, the read waits for all of the consequences of each event to complete before moving on to the next event, including when processing time advances and the appropriate triggers fire. TestStream allows for the effect of triggering and allowed lateness to be observed and tested on a pipeline. This includes logic around late triggers and dropped data due to lateness.

Task 1. Explore the main pipeline code

  1. Navigate to 8b_Stream_Testing_Pipeline/lab in your IDE.

This directory contains a pom.xml file for defining dependecies, and the src folder, which contains two subdirectories. The src/main folder contains the pipeline package code and the src/test folder will contain our testing code.

  1. First, open 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.

This file contains the definition of the TaxiRide class we will use in our pipeline. The TaxiRide class has an associated schema and the steps for defining the schema using the @DefaultSchema annotation should be familiar.

  1. Now open 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.

This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:

  • The DoFn JsonToTaxiRide (starting on line 94) used to convert the incoming Pub/Sub messages into objects of the TaxiRide class.
  • The PTransform TaxiCountTransform (starting on line 113). This PTransform contains the main counting and windowing logic of the pipeline. Our tests will focus on this PTransform.

The output of the TaxiCountTransform should be a count of all of the recorded taxi rides per window. However, we will have multiple events per ride (pickup, dropoff, etc.).

  1. We will filter on the ride_status property to ensure that we only count each ride once. We will do this by only keeping elements with ride_status equal to "pickup":
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

Zooming in a bit more, the windowing logic being used in our pipeline is included below:

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

We will window into fixed windows of 60 seconds' length. We do not have an early trigger, but will emit results after the watermark passes the end of the window. We include late firings with every new element that comes in, but will only do so with an allowed lateness of 1 minute. Finally, we will accumulate state in windows until the allowed lateness has passed.

Task 2. Explore TestStream usage and run first test

  1. Now open 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.

The first goal will be to understand the usage of TestStream in our testing code. Recall that the TestStream class allows us to simulate a real-time stream of message while having control over the progression of processing time and the watermark.

The code from the first test (starting on line 66) is included below:

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

We create a new TestStream using the create method while also specifying the coder. We will pass in the JSON message as a string, so we can use the StringUtf8Coder. What is the above TestStream doing?

The TestStream is performing the following tasks:

  • Set the initial watermark to the variable startTime (Instant(0)).
  • Add three elements to the string with an event timestamp of startTime. Two of these events will be counted (ride_status = "pickup") and the other will not.
  • Add another "pickup" event, but with an event timestamp of one minute after startTime.
  • Advance the watermark to one minute past startTime, triggering the first window.
  • Add another "pickup" event, but with an event timestamp of two minutes after startTime.
  • Advance the watermark to "infinity". This means that all windows will now be closed and any new data will be beyond any allowed lateness.
  1. The rest of the code for the first test is similar to the previous batch example, but we now use the TestStream instead of the Create transform:
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

In the above code, we define out output PCollection (outputCount) by creating the TestStream and applying the TaxiCountTransform PTransform. We use the InvervalWindow class to define the windows we wish to check, and then use PAssert with the inWindow method to verify results per window.

  1. Now return to the termnial in your IDE (or open a new terminal) and run the following commands to relocate to the correct directory and install dependencies:
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Now run the test above by executing the following command:
mvn test

You should see the following output after the test (though the elapsed time may differ):

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

Task 3. Create TestStream to test late data handling

In this task you will write code for a TestStream to test logic around handling late data.

  1. Return to 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java and scroll down to where the testTaxiRideLateData method is commented out (around line 104).

  2. Uncomment the code for this test, as we will be completing the code for this task:

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

The code for the test is complete outside of the creating of the TestStream.

  1. To complete this task, create a TestStream object that performs the following tasks:
  • Advances the watermark to startTime.
  • Adds two TimestampedValues with value json.format(json, "pickup") and timestame startTime.
  • Advances the watermark to one minute after the startTime.
  • Adds another TimestamedValuewith value json.format(json, "pickup") and timestame startTime.
  • Advances the watermark to two minutes after the startTime.
  • Adds another TimestamedValuewith value json.format(json, "pickup") and timestame startTime.
  • Advances the watermark to infinity.

This will create a TestStream with four elements belonging to our first window. The first two elements are on-time, the second element is late (but within the allowed lateness), and the final element is late and beyond the allowed lateness. Since we are accumulating fired panes, the first trigger should count two events and the final trigger should count three events. The fourth event should not be included. We can check this by using the inOnTimePane and inFinalPane methods of the PAssert class.

If you get stuck, you can refer to the solutions.

Task 4. Run test for late data handling

  • Now return to your terminal and execute the following command to run the tests one more time:
mvn test

If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

Click Check my progress to verify the objective. Test stream processing logic with TestStream

End your lab

When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.

현재 이 콘텐츠를 이용할 수 없습니다

We will notify you via email when it becomes available

감사합니다

We will contact you via email if it becomes available