
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
This lab checks your progress on the following tasks:
- Create Vertex AI Platform Notebooks instance and clone course repo
- Prepare Environment
- Performing Unit Tests for DoFns and PTransforms
- Testing Stream Processing Logic with TestStream
In this lab, you:
- Write unit tests for DoFns and PTransforms using testing tools in Apache Beam.
- Use the TestStream class to test windowing behavior for a streaming pipeline.

Testing your pipeline is a particularly important step in developing an effective data-processing solution. The indirect nature of the Beam model can make debugging failed runs a non-trivial task.

In this lab, we will explore how to perform unit tests locally with tools in the testing package of the Beam SDK, using the DirectRunner.
This Qwiklabs hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.
To complete this lab, you need:
- Access to a standard internet browser (Chrome browser recommended).
- Time to complete the lab.
Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.
Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Choose an account page.
On the Choose an account page, click Use Another Account. The Sign in page opens.
Paste the username that you copied from the Connection Details panel. Then copy and paste the password.
After a few moments, the Cloud console opens in this tab.
For this lab, you will be running all commands in a terminal from your notebook.
In the Google Cloud Console, on the Navigation Menu, click Vertex AI > Workbench.
Click Enable Notebooks API.
On the Workbench page, select USER-MANAGED NOTEBOOKS and click CREATE NEW.
In the New instance dialog box that appears, set the region to the region assigned to your lab.
For Environment, select Apache Beam.
Click CREATE at the bottom of the dialog box.
Next you will download a code repository for use in this lab.
On the left panel of your notebook environment, in the file browser, you will notice the training-data-analyst repo added.
Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code to be completed by you, and a solution sub-folder with a fully workable example to reference if you get stuck.
The lab code is split between two folders: 8a_Batch_Testing_Pipeline/lab and 8b_Stream_Testing_Pipeline/lab. If you get stuck at any point, the solution can be found in the corresponding solution folders.
Click Check my progress to verify the objective.
In this part of the lab, we will perform unit testing on DoFns and PTransforms for a batch pipeline computing statistics from weather sensors. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
- Create a TestPipeline.
- Use the Create transform to create a PCollection of your input data.
- Apply your transform to the input PCollection and save the resulting PCollection.
- Use the assert_that method from the testing.util module and its other methods to verify that the output PCollection contains the elements that you expect.

TestPipeline is a special class included in the Beam SDK specifically for testing your transforms and pipeline logic. When testing, use TestPipeline instead of Pipeline when you create the pipeline object. The Create transform takes an in-memory collection of objects (an iterable, such as a Python list) and creates a PCollection from this collection. The goal is to have a small set of test input data, for which we know the expected output PCollection, from our PTransforms.

Finally, we want to check that the output PCollection matches the expected output. We use the assert_that method to verify this. For example, we can use the equal_to method to verify that the output PCollection has the correct elements:
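As a minimal self-contained sketch of this pattern (not the lab's code; a trivial doubling transform stands in for a real one):

```python
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

with TestPipeline() as p:
    output = (p
              | beam.Create([1, 2, 3])        # build an input PCollection
              | beam.Map(lambda x: x * 2))    # transform under test
    assert_that(output, equal_to([2, 4, 6]))  # verify the output elements
```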
As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:
Before you can begin editing the actual pipeline code, you need to ensure you have installed the necessary dependencies.
Click Check my progress to verify the objective.
In the file explorer, navigate to 8a_Batch_Testing_Pipeline/lab. This directory contains two files: weather_statistics_pipeline.py, containing our main pipeline code, and weather_statistics_pipeline_test.py, containing our testing code.
First, open 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py.
We will explore our pipeline code briefly before working on our unit tests. First we see the WeatherRecord class (starting at line 6). This is a subclass of typing.NamedTuple, so we can use schema-aware transforms to work with objects of this class. We will also be defining in-memory collections of objects of this class for our tests later.
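As a rough sketch of what such a class looks like (the field list here is illustrative; check weather_statistics_pipeline.py for the lab's actual fields):

```python
import typing

# Hypothetical schema-aware record type; only low_temp and high_temp are
# referenced later in this lab, the other fields shown are illustrative.
class WeatherRecord(typing.NamedTuple):
    loc_id: str
    low_temp: float
    high_temp: float
    precip: float
```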
Next is the main code for our pipeline, containing our DoFns and PTransforms (starting at line 17). The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:

- The DoFns ConvertCsvToWeatherRecord (starting on line 17) and ConvertTempUnits (starting on line 27). We will be performing unit testing on these DoFns later.
- The PTransform ComputeStatistics (starting on line 41). This is an example of a composite transform that we will be able to test in the same manner as a DoFn.
- The PTransform WeatherStatsTransform (starting on line 55). This PTransform contains the processing logic for our entire pipeline (minus the source and sink transforms) so that we can perform a small pipeline integration test on synthetic data created by a Create transform.

We need to add a few dependencies for testing. We will be taking advantage of testing utilities included in Apache Beam and the unittest package in Python.
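The imports typically involved are along these lines (an illustrative list; follow the lab's instructions for the exact statements to add):

```python
import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
```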
If you get stuck, you can refer to the solutions.
Notice that the 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py file contains the code for our DoFn and PTransform unit tests. The code is mostly commented out now, but we will be uncommenting the code as we go along.
Before exploring our Beam code, note that we have defined a custom main method to manage running our tests and writing the test output to a text file. This way we have a record of the tests to refer to after the current terminal session has ended. This could also be managed using the logging module, for example.
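A minimal sketch of such a main method, assuming the output file is named testing.out (as referenced later in the lab):

```python
import unittest

# Run the test suite and write the unittest report to testing.out so the
# results survive the terminal session.
if __name__ == '__main__':
    with open('testing.out', 'w') as f:
        runner = unittest.TextTestRunner(stream=f, verbosity=2)
        unittest.main(testRunner=runner, exit=False)
```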
We will start with the first DoFn unit test, for our ConvertCsvToWeatherRecord DoFn (starting on line 43). First we create a class for testing our pipeline and create a TestPipeline object.

We create a single test input (LINES) representing a line of a CSV file (the expected input format for our pipeline) and put it into a list. We also define the expected output (EXPECTED_OUTPUT) as a list of WeatherRecords.
There are a couple of pieces missing in the rest of the code for the test.

To complete this task, first add the Create transform to convert LINES into a PCollection. Second, include an assert_that statement using the equal_to method to compare output with EXPECTED_OUTPUT.
If you get stuck, you can refer to later commented tests or the solutions.
The output from the tests will be written to the testing.out file. We can view the contents of this file by executing cat testing.out in our terminal. If the previous task was completed correctly, the contents of the testing.out file should be the following (the time elapsed may differ):
This test ensures that the ConvertTempUnits() DoFn is working as intended. Save weather_statistics_pipeline_test.py and return to your terminal.
The test fails this time! If we scroll through the output, we can find the following information about the test failure:
Looking a little more closely at the BeamAssertException, we can see that the values for low_temp and high_temp are incorrect. Something is wrong in the processing logic of the ConvertTempUnits DoFn!
Open weather_statistics_pipeline.py and scroll to the ConvertTempUnits DoFn (around line 32). To complete this task, find the error in the DoFn processing logic, then rerun the tests to confirm that the test now succeeds. As a reminder, the formula for converting degrees Celsius to degrees Fahrenheit is given below:
If you get stuck, you can refer to the solutions.
The first test we just uncommented is testing the composite PTransform ComputeStatistics. A truncated form of the code is presented below for reference:
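A sketch of what that test looks like (illustrative; INPUT_RECORDS and EXPECTED_STATS stand in for the test data defined in the lab's test file):

```python
class ComputeStatisticsTest(unittest.TestCase):  # illustrative name

    def test_compute_statistics(self):
        with TestPipeline() as p:
            input_records = p | beam.Create(INPUT_RECORDS)
            output = input_records | ComputeStatistics()
            assert_that(output, equal_to(EXPECTED_STATS))
```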
Notice that this looks very similar to our earlier DoFn unit tests! The only real difference (other than different test inputs and outputs) is that we are applying the PTransform rather than beam.ParDo(DoFn()).
The final test is for the end-to-end pipeline. In the pipeline code (weather_statistics_pipeline.py) the entire end-to-end pipeline, minus the source and sink, has been included in a single PTransform, WeatherStatsTransform. To test the end-to-end pipeline, we can repeat something similar to what we did above, but using that PTransform instead.
If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:
Copy the testing.out file to the storage bucket:

Click Check my progress to verify the objective.
In this part of the lab, we will perform unit testing for a streaming pipeline computing windowed counts of taxi rides. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:
- Create a TestPipeline.
- Use the TestStream class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.
- Use the assert_that method from the testing.util module and its other methods to verify that the output PCollection contains the elements that you expect.

When executing a pipeline that reads from a TestStream, the read waits for all of the consequences of each event to complete before moving on to the next event, including when processing time advances and the appropriate triggers fire. TestStream allows for the effect of triggering and allowed lateness to be observed and tested on a pipeline. This includes logic around late triggers and dropped data due to lateness.
In the file explorer, navigate to 8b_Stream_Testing_Pipeline/lab. This directory contains two files: taxi_streaming_pipeline.py, containing our main pipeline code, and taxi_streaming_pipeline_test.py, containing our testing code.
We will explore our pipeline code briefly before working on our unit tests. First we see the TaxiRide class (starting at line 6). This is a subclass of typing.NamedTuple, so we can use schema-aware transforms to work with objects of this class. We will also be defining in-memory collections of objects of this class for our tests later.
Next is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:
- The DoFn JsonToTaxiRide (starting on line 22), used to convert the incoming Pub/Sub messages into objects of the TaxiRide class.
- The PTransform TaxiCountTransform (starting on line 36). This PTransform contains the main counting and windowing logic of the pipeline. Our tests will focus on this PTransform.

The output of the TaxiCountTransform should be a count of all of the recorded taxi rides per window. However, we will have multiple events per ride (pickup, dropoff, etc.). We will filter on the ride_status property to ensure that we only count each ride once. We will do this by only keeping elements with ride_status equal to "pickup":
Zooming in a bit more, the windowing logic being used in our pipeline is included below:
We will window into fixed windows of 60 seconds' length. We do not have an early trigger, but will emit results after the watermark passes the end of the window. We include late firings with every new element that comes in, but will only do so with an allowed lateness of 60 seconds. Finally, we will accumulate state in windows until the allowed lateness has passed.
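A sketch consistent with that description (here, rides is a placeholder for the upstream PCollection; the lab's code may be organized differently):

```python
import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)

windowed = rides | 'WindowByMinute' >> beam.WindowInto(
    beam.window.FixedWindows(60),                # 60-second fixed windows
    trigger=AfterWatermark(late=AfterCount(1)),  # fire at the watermark, then on every late element
    allowed_lateness=60,                         # accept data up to 60 seconds late
    accumulation_mode=AccumulationMode.ACCUMULATING)
```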
The first goal will be to understand the usage of TestStream in our testing code. Recall that the TestStream class allows us to simulate a real-time stream of messages while having control over the progression of processing time and the watermark. The code from the first test (starting on line 66) is included below:
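Roughly, a TestStream of the kind this test builds looks like the following sketch (illustrative; see taxi_streaming_pipeline_test.py for the lab's actual code):

```python
from apache_beam.testing.test_stream import TestStream

# Placeholder JSON payloads; in the lab these are full taxi ride messages
# defined in taxi_streaming_pipeline_test.py.
base_json_pickup = '{"ride_status": "pickup"}'
base_json_enroute = '{"ride_status": "enroute"}'

test_stream = (
    TestStream()
    .advance_watermark_to(0)
    .add_elements([base_json_pickup, base_json_pickup, base_json_enroute])
    .advance_processing_time(60)
    .advance_watermark_to(60)
    .advance_watermark_to(120)
    .advance_watermark_to_infinity())
```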
We create the TestStream object, and then we will pass in the JSON message as a string (named base_json_pickup or base_json_enroute, depending on the ride status). What is the above TestStream doing?

The TestStream is performing the following tasks:

- Advances the watermark to 0 (all timestamps are in seconds).
- Adds three elements via the add_elements method with an event timestamp of 0. Two of these events will be counted (ride_status = "pickup") and the other will not.
- Advances the processing time by 60.
- Advances the watermark to 60, triggering the first window.
- Advances the watermark to 120.

The rest of the test then applies our PTransform to the TestStream instead of the Create transform:
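The remainder of the first test looks roughly like this sketch (names such as EXPECTED_RESULTS follow the lab's test file; the exact values there differ):

```python
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to_per_window
from apache_beam.transforms.window import IntervalWindow

# Hypothetical expected counts keyed by window; the lab defines its own.
EXPECTED_RESULTS = {IntervalWindow(0, 60): [2]}

with TestPipeline() as p:
    taxi_counts = (p
                   | test_stream              # the TestStream built above
                   | TaxiCountTransform())    # transform under test
    assert_that(taxi_counts,
                equal_to_per_window(EXPECTED_RESULTS),
                reify_windows=True)
```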
In the above code, we define our output PCollection (taxi_counts) by creating the TestStream and applying the TaxiCountTransform PTransform. We use the IntervalWindow class to define the windows we wish to check, and then use assert_that with the equal_to_per_window method to verify results per window.
You should see the following output after the test (though the elapsed time may differ):
In this task you will write code for a TestStream to test logic around handling late data.

The code for the test_late_data_behavior method is commented out (around line 60). Uncomment the code for this test, as we will be completing the code for this task. Note that EXPECTED_RESULTS contains two results for IntervalWindow(0,60). These represent the results from on-time and late triggers for this window.
The code for the test is complete except for the creation of the TestStream.

To complete this task, create a TestStream object that performs the following tasks:

- Advances the watermark to 0 (all timestamps are in seconds).
- Adds two TimestampedValues with value base_json_pickup and timestamp 0.
- Advances the watermark to 60.
- Adds another TimestampedValue with value base_json_pickup and timestamp 0.
- Advances the watermark to 300.
- Adds a final TimestampedValue with value base_json_pickup and timestamp 0.

This will create a TestStream with four elements belonging to our first window. The first two elements are on time, the third element is late (but within the allowed lateness), and the final element is late and beyond the allowed lateness. Since we are accumulating fired panes, the first trigger should count two events and the final trigger should count three events. The fourth event should not be included.
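One possible shape for such a TestStream (a sketch only; compare with the code in the solution folder, which may also advance the processing time):

```python
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue

# base_json_pickup is the pickup JSON string defined in the lab's test file.
test_stream = (
    TestStream()
    .advance_watermark_to(0)
    .add_elements([TimestampedValue(base_json_pickup, 0),
                   TimestampedValue(base_json_pickup, 0)])  # two on-time pickups
    .advance_watermark_to(60)
    .add_elements([TimestampedValue(base_json_pickup, 0)])  # late, within allowed lateness
    .advance_watermark_to(300)
    .add_elements([TimestampedValue(base_json_pickup, 0)])  # late, beyond allowed lateness
    .advance_watermark_to_infinity())
```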
If you get stuck, you can refer to the solutions.
If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:
Copy the testing.out file to the storage bucket:

Click Check my progress to verify the objective.
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:

- 1 star = Very dissatisfied
- 2 stars = Dissatisfied
- 3 stars = Neutral
- 4 stars = Satisfied
- 5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.