Serverless Data Processing with Dataflow - Testing with Apache Beam (Java)
- Overview
- Setup and requirements
- Lab part 1. Performing unit tests for DoFns and PTransforms
- Task 1. Explore the main pipeline code
- Task 2. Add dependencies for testing
- Task 3. Write first DoFn unit test in Apache Beam
- Task 4. Run first DoFn unit test
- Task 5. Run second DoFn unit test and debug pipeline
- Task 6. Run PTransform unit test and test end-to-end pipeline
- Lab part 2. Testing stream processing logic with TestStream
- Task 1. Explore the main pipeline code
- Task 2. Explore TestStream usage and run first test
- Task 3. Create TestStream to test late data handling
- Task 4. Run test for late data handling
- End your lab
Overview
In this lab, you:
- Write unit tests for DoFns and PTransforms using testing tools in Apache Beam.
- Perform a pipeline integration test.
- Use the TestStream class to test windowing behavior for a streaming pipeline.
Testing your pipeline is a particularly important step in developing an effective data-processing solution. The indirect nature of the Beam model can make debugging failed runs a non-trivial task.
In this lab, we will explore how to perform unit tests locally with tools in the testing package of the Beam SDK, using the DirectRunner.
Setup and requirements
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
- Sign in to Qwiklabs using an incognito window.
- Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time. There is no pause feature. You can restart if needed, but you have to start at the beginning.
- When ready, click Start lab.
- Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
- Click Open Google Console.
- Click Use another account and copy/paste credentials for this lab into the prompts. If you use other credentials, you'll receive errors or incur charges.
- Accept the terms and skip the recovery resource page.
Check project permissions
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
- In the Google Cloud console, on the Navigation menu, select IAM & Admin > IAM.
- Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
- In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
- Copy the project number (e.g. 729328892908).
- On the Navigation menu, select IAM & Admin > IAM.
- At the top of the roles table, below View by Principals, click Grant Access.
- For New principals, type {project-number}-compute@developer.gserviceaccount.com, replacing {project-number} with your project number.
- For Role, select Project (or Basic) > Editor.
- Click Save.
Setting up your IDE
For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is Java language server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud command-line tool, similar to Cloud Shell.
- To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost into a new tab.
The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.
- Click on the File Explorer button to look around.
You can also create multiple terminals in this environment, just as you would with Cloud Shell.
You can verify, by running gcloud auth list in the terminal, that you're logged in as a provided service account, which has the exact same permissions as your lab user account.
If at any point your environment stops working, you can try resetting the VM hosting your IDE from the Compute Engine console.
The lab code is split between two folders: 8a_Batch_Testing_Pipeline/lab and 8b_Stream_Testing_Pipeline/lab. If you get stuck at any point, the solution can be found in the corresponding solution folders.
Lab part 1. Performing unit tests for DoFns and PTransforms
In this part of the lab, we will perform unit testing on DoFns and PTransforms for a batch pipeline computing statistics from weather sensors. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
- Create a TestPipeline.
- Create some test input data and use the Create transform to create a PCollection of your input data.
- Apply your transform to the input PCollection and save the resulting PCollection.
- Use PAssert and its subclasses to verify that the output PCollection contains the elements that you expect.
TestPipeline is a special class included in the Beam SDK specifically for testing your transforms and pipeline logic.
- When testing, use TestPipeline instead of Pipeline when you create the pipeline object:
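For example:

```java
// Create the pipeline object with TestPipeline instead of Pipeline.
TestPipeline p = TestPipeline.create();
```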
The Create transform takes an in-memory collection of objects (a Java iterable) and creates a PCollection from this collection. The goal is to have a small set of test input data, for which we know the expected output PCollection, from our PTransforms.
Finally, we want to check that the output PCollection matches the expected output. We use the PAssert class to verify this. For example, we can use the containsInAnyOrder method to verify that the output PCollection has the correct elements:
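Putting the pattern together, a test body looks roughly like the following sketch (the MyUppercase transform and the element values are hypothetical placeholders):

```java
// Build a PCollection from a small, in-memory set of test inputs.
PCollection<String> input = p.apply(Create.of("a", "b", "c"));

// Apply the transform under test (MyUppercase is a hypothetical PTransform).
PCollection<String> output = input.apply(new MyUppercase());

// Verify the output contains exactly the expected elements, in any order.
PAssert.that(output).containsInAnyOrder("A", "B", "C");

// Run the TestPipeline so the assertion is actually checked.
p.run().waitUntilFinish();
```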
Task 1. Explore the main pipeline code
- Navigate to 8a_Batch_Testing_Pipeline/lab in your IDE.
This directory contains a pom.xml file for defining dependencies, and the src folder, which contains two subdirectories. The src/main folder contains the pipeline package code and the src/test folder will contain our testing code.
- First open 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.
This file contains the definition of the WeatherRecord class we will use in our pipeline. The WeatherRecord class has an associated schema, and the steps for defining the schema using the @DefaultSchema annotation should be familiar. However, note that we must override the equals method in defining the class.
Why is this? PAssert will use the equals method to verify membership in the output PCollection. However, the default equals method for a POJO (Plain Old Java Object) only compares the addresses of the objects. We want to be sure that we are instead comparing the contents of the objects. This is simple to do, as shown in the class definition.
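For reference, a content-based equals override typically looks like the following sketch (the field names here are placeholders rather than the actual WeatherRecord fields, and java.util.Objects provides the null-safe comparisons):

```java
@Override
public boolean equals(Object other) {
  if (this == other) {
    return true;
  }
  if (!(other instanceof WeatherRecord)) {
    return false;
  }
  WeatherRecord that = (WeatherRecord) other;
  // Compare field contents rather than object references.
  // (locId, date, and temp are placeholder field names.)
  return Objects.equals(this.locId, that.locId)
      && Objects.equals(this.date, that.date)
      && Double.compare(this.temp, that.temp) == 0;
}
```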
- Now open 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.
This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:
- The DoFns ConvertCsvToWeatherRecord (starting on line 65) and ConvertTempUnits (starting on line 81). We will be performing unit testing on these DoFns later.
- The PTransform ComputeStatistics (starting on line 103). This is an example of a composite transform that we will be able to test in the same manner as a DoFn.
- The PTransform WeatherStatsTransform (starting on line 123). This PTransform contains the processing logic for our entire pipeline (minus the source and sink transforms) so that we can perform a small pipeline integration test on synthetic data created by a Create transform.
If you notice a logical error in the processing code, do not fix it yet! Later we will see how to narrow down the error via testing.
Task 2. Add dependencies for testing
- Now open 8a_Batch_Testing_Pipeline/lab/pom.xml.
We need to add a few dependencies for testing. Any Beam Java code for testing must link in JUnit and Hamcrest. In Maven, we simply need to update the pom.xml file.
- To complete this task, copy and paste the following XML into the pom.xml file where noted in a comment:
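The dependencies should look something like the following sketch (the version numbers are assumptions; use the versions referenced by your lab's pom.xml if they differ):

```xml
<!-- Test-scoped dependencies for Beam unit testing -->
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.13.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.hamcrest</groupId>
  <artifactId>hamcrest-core</artifactId>
  <version>2.1</version>
  <scope>test</scope>
</dependency>
```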
Note that the scope for these dependencies is "test". We will need these packages when we run a test with mvn test, but not when executing the main pipeline.
Task 3. Write first DoFn unit test in Apache Beam
- Now navigate to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.
This file contains the code for our DoFn and PTransform unit tests. The code is mostly commented out now, but we will be uncommenting the code as we go along.
We will start by exploring a DoFn unit test for our ConvertCsvToWeatherRecord DoFn (starting on line 43).
- First we create a class for testing our pipeline and create a TestPipeline object:
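The class setup looks roughly like this sketch (the runner annotation and field name may differ slightly from the lab file):

```java
@RunWith(JUnit4.class)
public class WeatherStatisticsPipelineTest {

  // A single TestPipeline used by the tests in this class.
  // Marked transient to avoid side effects when the object is reused (see note below).
  @Rule
  public final transient TestPipeline p = TestPipeline.create();

  // Individual @Test methods follow.
}
```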
We will use this TestPipeline object in all of the following tests, though we will not need to worry about side effects of reusing the same object due to the transient keyword when creating the object.
- Now look at the (incomplete) code for our first test:
We annotate the method we will use to test our pipeline with the @Test annotation. We create a single test input (testInput) representing a line of a CSV file (the expected input format for our pipeline) and put it into a List object input.
There are a couple of pieces missing in the rest of the code for the test.
- To complete this task, first add the Create transform to convert input into a PCollection.
- Second, include a PAssert statement using the containsInAnyOrder method to compare the resulting output PCollection with testOutput.
If you get stuck, you can refer to later commented tests or the solutions.
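If it helps to see the shape of the finished test, here is a minimal sketch (the CSV contents and variable values are placeholders, and the solution folder is authoritative):

```java
@Test
public void testConvertCsvToWeatherRecord() {
  // A single CSV line in the format the pipeline expects (placeholder values).
  String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1";
  List<String> input = Arrays.asList(testInput);

  // 1. Convert the in-memory input into a PCollection with Create.
  PCollection<String> lines = p.apply(Create.of(input));

  // 2. Apply the DoFn under test.
  PCollection<WeatherRecord> output = lines.apply(ParDo.of(new ConvertCsvToWeatherRecord()));

  // 3. Compare against the expected record (testOutput is the WeatherRecord
  //    built from the same values, constructed as in the lab code).
  PAssert.that(output).containsInAnyOrder(testOutput);

  p.run().waitUntilFinish();
}
```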
Task 4. Run first DoFn unit test
- Create a new terminal in your IDE environment, if you have not already, and paste the following command:
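At minimum, you need to be in the batch lab directory; a sketch of the command, assuming the repo layout described above (your lab panel may also include additional setup):

```bash
# Change into the batch testing lab directory.
cd 8a_Batch_Testing_Pipeline/lab
```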
Now we're ready to run our test.
- To do so, simply run the following command in your terminal:
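The tests are run with Maven:

```bash
mvn test
```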
If you correctly completed the previous task, the test should pass and Maven will report a successful build (the exact time elapsed will differ).
Task 5. Run second DoFn unit test and debug pipeline
- Go back to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java and uncomment the code for the second unit test (around lines 67-80). You can do this by highlighting the code and pressing Ctrl + / (or Cmd + / on macOS). Review the uncommented code before continuing.
This test ensures that the ConvertTempUnits() DoFn is working as intended.
- Save WeatherStatisticsPipelineTest.java and return to your terminal.
- Once again, run the mvn test command to run the tests.
The test fails this time! If we scroll through the output, we can find information about the test failure. At first glance, this may not seem like the most useful error message. However, we can see that the expected WeatherRecord in testOutput was not matched. Maybe there is something wrong with how we performed the temperature conversion.
- Return to 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java and scroll down to the definition of ConvertTempUnits (around line 81).
- To complete this task, find the error in the DoFn processing logic and rerun the mvn test command to confirm that the test now succeeds. As a reminder, the formula for converting degrees Celsius to Fahrenheit is given below:
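tempF = (tempC × 9/5) + 32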
If you get stuck, you can refer to the solutions.
Task 6. Run PTransform unit test and test end-to-end pipeline
- Return to 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java and uncomment the code for the final two tests (starting around line 84).
The first test we just uncommented is testing the composite PTransform ComputeStatistics. Take a moment to review the uncommented code.
Notice that this looks very similar to our earlier DoFn unit tests! The only real difference (other than different test inputs and outputs) is that we are applying the PTransform rather than ParDo(new DoFn()).
The final test is for the end-to-end pipeline. In the pipeline code (WeatherStatisticsPipeline.java) the entire end-to-end pipeline, minus the source and sink, has been included in a single PTransform, WeatherStatsTransform.
- To test the end-to-end pipeline, we can repeat something similar to what we did above, but using that PTransform instead.
- Now return to your terminal and run the mvn test command one more time.
If you have completed the previous tasks successfully, all tests should pass and Maven will report a successful build.
Click Check my progress to verify the objective.
Lab part 2. Testing stream processing logic with TestStream
In this part of the lab, we will perform unit testing for a streaming pipeline computing windowed counts of taxi rides. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
- Create a TestPipeline.
- Use the TestStream class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.
- Use PAssert and its subclasses to verify that the output PCollection contains the elements that you expect in specific windows.
When executing a pipeline that reads from a TestStream, the read waits for all of the consequences of each event to complete before moving on to the next event, including when processing time advances and the appropriate triggers fire. TestStream allows for the effect of triggering and allowed lateness to be observed and tested on a pipeline. This includes logic around late triggers and dropped data due to lateness.
Task 1. Explore the main pipeline code
- Navigate to 8b_Stream_Testing_Pipeline/lab in your IDE.
This directory contains a pom.xml file for defining dependencies, and the src folder, which contains two subdirectories. The src/main folder contains the pipeline package code and the src/test folder will contain our testing code.
- First, open 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.
This file contains the definition of the TaxiRide class we will use in our pipeline. The TaxiRide class has an associated schema, and the steps for defining the schema using the @DefaultSchema annotation should be familiar.
- Now open 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.
This is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:
- The DoFn JsonToTaxiRide (starting on line 94) used to convert the incoming Pub/Sub messages into objects of the TaxiRide class.
- The PTransform TaxiCountTransform (starting on line 113). This PTransform contains the main counting and windowing logic of the pipeline. Our tests will focus on this PTransform.
The output of the TaxiCountTransform should be a count of all of the recorded taxi rides per window. However, we will have multiple events per ride (pickup, dropoff, etc.).
- We will filter on the ride_status property to ensure that we only count each ride once. We will do this by only keeping elements with ride_status equal to "pickup".
Zooming in a bit more, the windowing logic being used in our pipeline is described below.
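A sketch of the corresponding windowing configuration (the transform labels and exact code in the lab file may differ):

```java
PCollection<TaxiRide> windowedRides =
    pickups.apply(
        Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60)))
            // No early firings; fire when the watermark passes the end of the window,
            // then again for every late element that arrives.
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withLateFirings(AfterPane.elementCountAtLeast(1)))
            // Accept late data up to one minute after the end of the window.
            .withAllowedLateness(Duration.standardMinutes(1))
            // Keep previously fired elements in later panes (accumulate, don't discard).
            .accumulatingFiredPanes());
```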
We will window into fixed windows of 60 seconds' length. We do not have an early trigger, but will emit results after the watermark passes the end of the window. We include late firings with every new element that comes in, but will only do so with an allowed lateness of 1 minute. Finally, we will accumulate state in windows until the allowed lateness has passed.
Task 2. Explore TestStream usage and run first test
- Now open 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.
The first goal will be to understand the usage of TestStream in our testing code. Recall that the TestStream class allows us to simulate a real-time stream of messages while having control over the progression of processing time and the watermark.
The first test (starting on line 66) builds a TestStream of JSON taxi ride messages; its structure is sketched below.
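This sketch is reconstructed from the description that follows; the variable names (createEvents, json, startTime) and the non-pickup status "enroute" are assumptions, and the lab file is authoritative:

```java
TestStream<String> createEvents =
    TestStream.create(StringUtf8Coder.of())
        // Initial watermark at startTime (Instant(0)).
        .advanceWatermarkTo(startTime)
        // Three elements at startTime: two "pickup" events and one that won't be counted.
        .addElements(
            TimestampedValue.of(String.format(json, "pickup"), startTime),
            TimestampedValue.of(String.format(json, "pickup"), startTime),
            TimestampedValue.of(String.format(json, "enroute"), startTime))
        // Another "pickup" event, one minute after startTime.
        .addElements(
            TimestampedValue.of(
                String.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1))))
        // Advance the watermark past the end of the first window.
        .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1)))
        // Another "pickup" event, two minutes after startTime.
        .addElements(
            TimestampedValue.of(
                String.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2))))
        // Close all windows.
        .advanceWatermarkToInfinity();
```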
We create a new TestStream using the create method while also specifying the coder. We will pass in the JSON message as a string, so we can use the StringUtf8Coder. What is the above TestStream doing?
The TestStream is performing the following tasks:
- Set the initial watermark to the variable startTime (Instant(0)).
- Add three elements to the stream with an event timestamp of startTime. Two of these events will be counted (ride_status = "pickup") and the other will not.
- Add another "pickup" event, but with an event timestamp of one minute after startTime.
- Advance the watermark to one minute past startTime, triggering the first window.
- Add another "pickup" event, but with an event timestamp of two minutes after startTime.
- Advance the watermark to "infinity". This means that all windows will now be closed and any new data will be beyond any allowed lateness.
- The rest of the code for the first test is similar to the previous batch example, but we now use the TestStream instead of the Create transform.
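A sketch of that part of the test (variable names follow the description below, and the Long count type is an assumption about TaxiCountTransform's output):

```java
// Apply the TestStream and the transform under test.
PCollection<Long> outputCount =
    p.apply(createEvents).apply(new TaxiCountTransform());

// The first fixed window: [startTime, startTime + 1 minute).
IntervalWindow window1 =
    new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1)));

// Two "pickup" events fall in the first window.
PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L);

p.run().waitUntilFinish();
```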
In the above code, we define our output PCollection (outputCount) by creating the TestStream and applying the TaxiCountTransform PTransform. We use the IntervalWindow class to define the windows we wish to check, and then use PAssert with the inWindow method to verify results per window.
- Now return to the terminal in your IDE (or open a new terminal) and run the following commands to relocate to the correct directory and install dependencies:
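Something like the following, assuming the repo layout described earlier (the exact commands shown in your lab panel are authoritative):

```bash
# Change into the streaming testing lab directory and resolve Maven dependencies.
cd 8b_Stream_Testing_Pipeline/lab
mvn clean dependency:resolve
```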
- Now run the test above by executing the following command:
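As in part 1, the tests are run with Maven:

```bash
mvn test
```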
The test should pass, and Maven will report a successful build (though the elapsed time may differ).
Task 3. Create TestStream to test late data handling
In this task you will write code for a TestStream to test logic around handling late data.
- Return to 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java and scroll down to where the testTaxiRideLateData method is commented out (around line 104).
- Uncomment the code for this test, as we will be completing the code for this task.
The code for the test is complete except for the creation of the TestStream.
- To complete this task, create a TestStream object that performs the following tasks (one possible sketch follows the explanation below):
- Advances the watermark to startTime.
- Adds two TimestampedValues with value json.format(json, "pickup") and timestamp startTime.
- Advances the watermark to one minute after startTime.
- Adds another TimestampedValue with value json.format(json, "pickup") and timestamp startTime.
- Advances the watermark to two minutes after startTime.
- Adds another TimestampedValue with value json.format(json, "pickup") and timestamp startTime.
- Advances the watermark to infinity.
This will create a TestStream with four elements belonging to our first window. The first two elements are on time, the third element is late (but within the allowed lateness), and the final element is late and beyond the allowed lateness. Since we are accumulating fired panes, the first trigger should count two events and the final trigger should count three events. The fourth event should not be included. We can check this by using the inOnTimePane and inFinalPane methods of the PAssert class.
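One possible sketch of that TestStream (variable names such as createEvents, json, and startTime come from the existing test code; the element value expression should match what the test already uses):

```java
TestStream<String> createEvents =
    TestStream.create(StringUtf8Coder.of())
        // Start with the watermark at startTime.
        .advanceWatermarkTo(startTime)
        // Two on-time "pickup" events with timestamp startTime.
        .addElements(
            TimestampedValue.of(String.format(json, "pickup"), startTime),
            TimestampedValue.of(String.format(json, "pickup"), startTime))
        // Advance the watermark past the end of the first window.
        .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1)))
        // A late element, still within the one-minute allowed lateness.
        .addElements(TimestampedValue.of(String.format(json, "pickup"), startTime))
        // Advance the watermark beyond the allowed lateness for the first window.
        .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(2)))
        // A late element now beyond the allowed lateness; it should be dropped.
        .addElements(TimestampedValue.of(String.format(json, "pickup"), startTime))
        // Close all windows.
        .advanceWatermarkToInfinity();
```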
If you get stuck, you can refer to the solutions.
Task 4. Run test for late data handling
- Now return to your terminal and run the mvn test command one more time.
If you have completed the previous tasks successfully, all tests should pass and Maven will report a successful build.
Click Check my progress to verify the objective.
End your lab
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
- 1 star = Very dissatisfied
- 2 stars = Dissatisfied
- 3 stars = Neutral
- 4 stars = Satisfied
- 5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.