
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Perform Unit Tests for DoFns and PTransforms
/ 10
Test Stream Processing Logic with TestStream
/ 10
In this lab, you:
DoFns
and PTransforms
using testing tools in Apache Beam.TestStream
class to test windowing behavior for a streaming pipeline.Testing your pipeline is a particularly important step in developing an effective data-processing solution. The indirect nature of the Beam model can make debugging failed runs a non-trivial task.
In this lab, we will explore how to perform unit tests locally with tools in the testing package of the Beam SDK, using the DirectRunner
.
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00
), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
In the Google Cloud console, on the Navigation menu (), select IAM & Admin > IAM.
Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com
is present and has the editor
role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
editor
role, follow the steps below to assign the required role.729328892908
).{project-number}
with your project number.For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is java langauge server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud
command line tool, similar to Cloud Shell.
The lab repo has been cloned to your environment. Each lab is divided into a labs
folder with code to be completed by you, and a solution
folder with a fully workable example to reference if you get stuck.
File Explorer
button to look:You can also create multiple terminals in this environment, just as you would with cloud shell:
You can see with by running gcloud auth list
on the terminal that you're logged in as a provided service account, which has the exact same permissions are your lab user account:
If at any point your environment stops working, you can try resetting the VM hosting your IDE from the GCE console like this:
The lab code is split between two folders: 8a_Batch_Testing_Pipeline/lab and 8b_Stream_Testing_Pipeline/lab. If you get stuck at any point, the solution can be found in the corresponding solution folders.
In this part of the lab, we will perform unit testing on DoFns and PTransforms for a batch pipeline computing statistics from weather sensors. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
TestPipeline
.Create
transform to create a PCollection
of your input data.PCollection
and save the resulting PCollection
.PAssert
and its subclasses to verify that the output PCollection
contains the elements that you expect.TestPipeline
is a special class included in the Beam SDK specifically for testing your transforms and pipeline logic.
TestPipeline
instead of Pipeline
when you create the pipeline object:The Create
transfrom takes an in-memory collection of objects (a Java iterable) and creates a PCollection
from this collection. The goal is to have a small set of test input data, for which we know the expected output PCollection
, from our PTransforms
.
Finally, we want to check that the output PCollection matches the expected output. We use the PAssert
class to verify this. For example, we can use the containsInAnyOrder
method to verify that the output PCollection has the correct elements:
This directory contains a pom.xml
file for defining dependecies, and the src
folder, which contains two subdirectories. The src/main
folder contains the pipeline package code and the src/test
folder will contain our testing code.
This file contains the definition of the WeatherRecord
class we will use in our pipeline. The WeatherRecord
class has an associated schema, and the steps for defining the schema using the @DefaultSchema
annotation should be familiar. However, note that we must override the equals
method in defining the class.
Why is this? PAssert
will use the equals
method to verify membership in the output PCollection
. However, the default equals method for a POJO (Plain Old Java Object) only compares the addresses of the objects. We want to be sure that we are instead comparing the contents of the objects. This is simple to do as shown above.
This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:
DoFns
ConvertCsvToWeatherRecord
(starting on line 65) and ConvertTempUnits
(starting on line 81). We will be performing unit testing on these DoFns
later.PTransform
ComputeStatistics
(starting on line 103). This is an example of a composite transform that we will be able to test in the same manner as a DoFn
.PTransform
WeatherStatsTransform
(starting on line 123. This PTransform
contains the processing logic for our entire pipeline (minus the source and sink transforms) so that we can perform a small pipeline integration test on synthetic data created by a Create
transform.If you notice a logical error in the processing code, do not fix it yet! Later we will see how to narrow down the error via testing.
We need to add a few dependecies for testing. Any Beam Java code for testing must link in JUnit
and Hamcrest
. In Maven, we simply need to update the pom.xml
file.
pom.xml
file where noted in a comment:Note that the scope for these dependencies is "test". We will need these packages when we run a test with mvn test
, but not when executing the main pipeline.
This file contains the code for our DoFn
and PTransform
unit tests. The code is mostly commented out now, but we will be uncommenting the code as we go along.
We will start by exploring a DoFn
unit test for our ConvertCsvToWeatherRecord DoFn
(starting on line 43).
TestPipeline
object:We will use this TestPipeline
object in all of the following tests, though we will not need to worry about side effects of reusing the same object due to the transient
keyword when creating the object.
We annotate the method we will use to test our pipeline with the @Test
annotation. We create a single test input (testInput
) representing a line of a CSV file (the expected input format for our pipeline) and put it into a List
object input
.
There are a couple of pieces missing in the rest of the code for the test.
To complete this task, first add the Create
transform to convert input
into a PCollection
.
Second, include a PAssert
statement using the containsInAnyOrder
method to compare input
with testOutput
.
If you get stuck, you can refer to later commented tests or the solutions.
Now we're ready to run our test.
If you correctly completed the previous task, you should see the following in your terminal after the test completes (the exact time elapsed will differ):
This test ensures that the ConvertTempUnits()
DoFn
is working as intended.
Save WeatherStatisticsPipelineTest.java
and return to your terminal.
Once again, execute the follow command to run the tests:
The test fails this time! If we scroll through the output, we can find the following information about the test failure:
At first glance, this may not seem like the most useful error message. However, we can see that the expected WeatherRecord
in testOutput
was not matched. Maybe there is something wrong with how we performed the temperature conversion.
Return to 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java and scroll down to the definition of ConvertTempUnits
(around line 81).
To complete this task, find the error in the DoFn
processing logic and rerun the mvn test
command to confirm that the test now succeeds. As a reminder, the formula for converting degrees Celsius to Farenheit is given below:
If you get stuck, you can refer to the solutions.
The first test we just uncommented is testing the composit PTransform
ComputeStatistics
. A truncated form of the code is presented below for reference:
Notice that this looks very similar to our earlier DoFn
unit tests! The only real difference (other than different test inputs and outputs) is that we are applying the PTransform
rather than ParDo(new DoFn())
.
The final test is for the end-to-end pipeline. In the pipeline code (WeatherStatisticsPipeline.java) the entire end-to-end pipeline minus the source and sink has been included in a single PTransform
WeatherStatsTransform
.
PTransform
instead:If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:
Click Check my progress to verify the objective.
In this part of the lab, we will perform unit testing for a streaming pipeline computing windowed counts of taxi rides. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
TestPipeline
.TestStream
class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.PAssert
and its subclasses to verify that the output PCollection
contains the elements that you expect in specific windows.When executing a pipeline that reads from a TestStream
, the read waits for all of the consequences of each event to complete before moving on to the next event, including when processing time advances and the appropriate triggers fire. TestStream
allows for the effect of triggering and allowed lateness to be observed and tested on a pipeline. This includes logic around late triggers and dropped data due to lateness.
This directory contains a pom.xml
file for defining dependecies, and the src
folder, which contains two subdirectories. The src/main
folder contains the pipeline package code and the src/test
folder will contain our testing code.
This file contains the definition of the TaxiRide
class we will use in our pipeline. The TaxiRide
class has an associated schema and the steps for defining the schema using the @DefaultSchema
annotation should be familiar.
This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:
DoFn
JsonToTaxiRide
(starting on line 94) used to convert the incoming Pub/Sub messages into objects of the TaxiRide
class.PTransform
TaxiCountTransform
(starting on line 113). This PTransform
contains the main counting and windowing logic of the pipeline. Our tests will focus on this PTransform
.The output of the TaxiCountTransform
should be a count of all of the recorded taxi rides per window. However, we will have multiple events per ride (pickup, dropoff, etc.).
ride_status
property to ensure that we only count each ride once. We will do this by only keeping elements with ride_status
equal to "pickup":Zooming in a bit more, the windowing logic being used in our pipeline is included below:
We will window into fixed windows of 60 seconds' length. We do not have an early trigger, but will emit results after the watermark passes the end of the window. We include late firings with every new element that comes in, but will only do so with an allowed lateness of 1 minute. Finally, we will accumulate state in windows until the allowed lateness has passed.
The first goal will be to understand the usage of TestStream
in our testing code. Recall that the TestStream
class allows us to simulate a real-time stream of message while having control over the progression of processing time and the watermark.
The code from the first test (starting on line 66) is included below:
We create a new TestStream
using the create
method while also specifying the coder. We will pass in the JSON message as a string, so we can use the StringUtf8Coder
. What is the above TestStream
doing?
The TestStream
is performing the following tasks:
startTime
(Instant(0)
).startTime
. Two of these events will be counted (ride_status = "pickup"
) and the other will not.startTime
.startTime
, triggering the first window.startTime
.TestStream
instead of the Create
transform:In the above code, we define out output PCollection
(outputCount
) by creating the TestStream
and applying the TaxiCountTransform
PTransform
. We use the InvervalWindow
class to define the windows we wish to check, and then use PAssert
with the inWindow
method to verify results per window.
You should see the following output after the test (though the elapsed time may differ):
In this task you will write code for a TestStream
to test logic around handling late data.
Return to 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java and scroll down to where the testTaxiRideLateData
method is commented out (around line 104).
Uncomment the code for this test, as we will be completing the code for this task:
The code for the test is complete outside of the creating of the TestStream
.
TestStream
object that performs the following tasks:startTime
.TimestampedValue
s with value json.format(json, "pickup")
and timestame startTime
.startTime
.TimestamedValue
with value json.format(json, "pickup")
and timestame startTime
.startTime
.TimestamedValue
with value json.format(json, "pickup")
and timestame startTime
.This will create a TestStream
with four elements belonging to our first window. The first two elements are on-time, the second element is late (but within the allowed lateness), and the final element is late and beyond the allowed lateness. Since we are accumulating fired panes, the first trigger should count two events and the final trigger should count three events. The fourth event should not be included. We can check this by using the inOnTimePane
and inFinalPane
methods of the PAssert
class.
If you get stuck, you can refer to the solutions.
If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:
Click Check my progress to verify the objective.
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
Questi contenuti non sono al momento disponibili
Ti invieremo una notifica via email quando sarà disponibile
Bene.
Ti contatteremo via email non appena sarà disponibile
One lab at a time
Confirm to end all existing labs and start this one