Serverless Data Processing with Dataflow - Testing with Apache Beam (Python)

Lab, 2 hours, 5 Credits, Advanced
This lab may incorporate AI tools to support your learning.

Overview

In this lab, you:

  • Write unit tests for DoFns and PTransforms using testing tools in Apache Beam.
  • Perform a pipeline integration test.
  • Use the TestStream class to test windowing behavior for a streaming pipeline.

Testing your pipeline is a particularly important step in developing an effective data-processing solution. The indirect nature of the Beam model can make debugging failed runs a non-trivial task.

In this lab, we will explore how to perform unit tests locally with tools in the testing package of the Beam SDK, using the DirectRunner.

Setup and requirements

Before you click the Start Lab button

Note: Read these instructions.

Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This Qwiklabs hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

What you need

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
  • Time to complete the lab.
Note: If you already have your own personal Google Cloud account or project, do not use it for this lab.
Note: If you are using a Pixelbook, open an Incognito window to run this lab.

How to start your lab and sign in to the Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.

  2. Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Choose an account page.

    Note: Open the tabs in separate windows, side-by-side.
  3. On the Choose an account page, click Use Another Account. The Sign in page opens.

  4. Paste the username that you copied from the Connection Details panel. Then copy and paste the password.

Note: You must use the credentials from the Connection Details panel. Do not use your Google Cloud Skills Boost credentials. If you have your own Google Cloud account, do not use it for this lab, so that you avoid incurring charges.
  5. Click through the subsequent pages:
  • Accept the terms and conditions.
  • Do not add recovery options or two-factor authentication (because this is a temporary account).
  • Do not sign up for free trials.

After a few moments, the Cloud console opens in this tab.

Note: You can view the menu with a list of Google Cloud Products and Services by clicking the Navigation menu at the top-left.

Jupyter notebook-based development environment setup

For this lab, you will be running all commands in a terminal from your notebook.

  1. In the Google Cloud Console, on the Navigation Menu, click Vertex AI > Workbench.

  2. Click Enable Notebooks API.

  3. On the Workbench page, select USER-MANAGED NOTEBOOKS and click CREATE NEW.

  4. In the New instance dialog box that appears, set the region and zone to the values assigned to your lab.

  5. For Environment, select Apache Beam.

  6. Click CREATE at the bottom of the dialog box.

Note: The environment may take 3 - 5 minutes to be fully provisioned. Please wait until the step is complete.
Note: Click Enable Notebooks API to enable the Notebooks API.
  7. Once the environment is ready, click the OPEN JUPYTERLAB link next to your Notebook name. This will open up your environment in a new tab in your browser.

  8. Next, click Terminal. This will open up a terminal where you can run all the commands in this lab.

Download Code Repository

Next you will download a code repository for use in this lab.

  1. In the terminal you just opened, enter the following:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  2. On the left panel of your notebook environment, in the file browser, you will notice the training-data-analyst repo added.

  3. Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code to be completed by you, and a solution sub-folder with a fully workable example to reference if you get stuck.

Note: To open a file for editing purposes, simply navigate to the file and click on it. This will open the file, where you can add or modify code.

The lab code is split between two folders: 8a_Batch_Testing_Pipeline/lab and 8b_Stream_Testing_Pipeline/lab. If you get stuck at any point, the solution can be found in the corresponding solution folders.

Click Check my progress to verify the objective. Create notebook instance and clone course repo

Lab part 1: Performing unit tests for DoFns and PTransforms

Task 1. Prepare environment

In this part of the lab, we will perform unit testing on DoFns and PTransforms for a batch pipeline computing statistics from weather sensors. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:

  • Create a TestPipeline.
  • Create some test input data and use the Create transform to create a PCollection of your input data.
  • Apply your transform to the input PCollection and save the resulting PCollection.
  • Use the assert_that method from the testing.util module and its other methods to verify that the output PCollection contains the elements that you expect.

TestPipeline is a special class included in the Beam SDK specifically for testing your transforms and pipeline logic. When testing, use TestPipeline instead of Pipeline when you create the pipeline object. The Create transform takes an in-memory collection of objects (a Python iterable, such as a list) and creates a PCollection from this collection. The goal is to have a small set of test input data for which we know the expected output PCollection from our PTransforms.

with TestPipeline() as p:
    INPUTS = [fake_input_1, fake_input_2]
    test_output = p | beam.Create(INPUTS) | # Transforms to be tested

Finally, we want to check that the output PCollection matches the expected output. We use the assert_that method to verify this. For example, we can use the equal_to method to verify that the output PCollection has the correct elements:

assert_that(test_output, equal_to(EXPECTED_OUTPUTS))

As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:

Open the appropriate lab

  • In the terminal in your IDE, run the following commands to change to the directory you will use for this lab:
# Change directory into the lab
cd 8a_Batch_Testing_Pipeline/lab
export BASE_DIR=$(pwd)

Setting up virtual environment and dependencies

Before you can begin editing the actual pipeline code, you need to ensure you have installed the necessary dependencies.

  1. Execute the following to create a virtual environment for your work in this lab:
sudo apt-get install -y python3-venv
# Create and activate virtual environment
python3 -m venv df-env
source df-env/bin/activate
  2. Next, install the packages you will need to execute your pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
  3. Ensure that the Dataflow API is enabled:
gcloud services enable dataflow.googleapis.com
  4. Finally, create a storage bucket:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage buckets create gs://$PROJECT_ID --location=US

Click Check my progress to verify the objective. Prepare environment

Task 2. Explore the main pipeline code

  1. In the file explorer, navigate to 8a_Batch_Testing_Pipeline/lab. This directory contains two files: weather_statistics_pipeline.py containing our main pipeline code, and weather_statistics_pipeline_test.py containing our testing code.

  2. First, open 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py.

We will explore our pipeline code briefly before working on our unit tests. First we see the WeatherRecord class (starting at line 6). This is a subclass of typing.NamedTuple, so we can use schema-aware transforms to work with objects of this class. We will also be defining in-memory collections of objects of this class for our tests later.

class WeatherRecord(typing.NamedTuple):
    loc_id: str
    lat: float
    lng: float
    date: str
    low_temp: float
    high_temp: float
    precip: float
  3. Now scroll down to where we start defining our DoFns and PTransforms (line 17).

The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:

  • The DoFns ConvertCsvToWeatherRecord (starting on line 17) and ConvertTempUnits (starting on line 27). We will be performing unit testing on these DoFns later.
  • The PTransform ComputeStatistics (starting on line 41). This is an example of a composite transform that we will be able to test in the same manner as a DoFn.
  • The PTransform WeatherStatsTransform (starting on line 55). This PTransform contains the processing logic for our entire pipeline (minus the source and sink transforms) so that we can perform a small pipeline integration test on synthetic data created by a Create transform.
Note: If you notice a logical error in the processing code, do not fix it yet! Later we will see how to narrow down the error via testing.

Task 3. Add dependencies for testing

  1. Now open the 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py in your file explorer.

We need to add a few dependencies for testing. We will be taking advantage of testing utilities included in Apache Beam and the unittest package in Python.

  2. To complete this task, add the following import statements where prompted at the top of weather_statistics_pipeline_test.py:
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to

If you get stuck, you can refer to the solutions.

Task 4. Write first DoFn unit test in Apache Beam

Notice that the 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py file contains the code for our DoFn and PTransform unit tests. The code is mostly commented out now, but we will be uncommenting the code as we go along.

Before exploring our Beam code, note that we have defined a custom main method to manage running our tests and writing the test output to a text file. This way we have a record of the tests to refer to after the current terminal session has ended. This could also be managed using the logging module, for example.

def main(out=sys.stderr, verbosity=2):
    loader = unittest.TestLoader()

    suite = loader.loadTestsFromModule(sys.modules[__name__])
    unittest.TextTestRunner(out, verbosity=verbosity).run(suite)

# Testing code omitted

if __name__ == '__main__':
    with open('testing.out', 'w') as f:
        main(f)
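The runner pattern above can be tried in isolation, without Beam. The sketch below is a minimal, hypothetical variant: ExampleTest and run_tests are illustrative names that do not appear in the lab code, and the suite is loaded from a single test case rather than from the whole module. It writes the report to an in-memory buffer instead of the testing.out file.

```python
import io
import unittest


class ExampleTest(unittest.TestCase):
    """Hypothetical stand-in for the lab's Beam test classes."""

    def test_addition(self):
        self.assertEqual(1 + 1, 2)


def run_tests(out, verbosity=2):
    """Run ExampleTest and write the unittest report to the stream `out`."""
    loader = unittest.TestLoader()
    suite = loader.loadTestsFromTestCase(ExampleTest)
    return unittest.TextTestRunner(out, verbosity=verbosity).run(suite)


# An in-memory buffer plays the role of the testing.out file here.
buffer = io.StringIO()
result = run_tests(buffer)
report = buffer.getvalue()
print(report)
```

Passing any writable text stream works the same way, which is why the lab can hand an open file object to main.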
  1. We will start by exploring a DoFn unit test for our ConvertCsvToWeatherRecord DoFn (starting on line 43). First we create a class for testing our pipeline and create a TestPipeline object:
class ConvertToWeatherRecordTest(unittest.TestCase):

    def test_convert_to_csv(self):

        with TestPipeline() as p:
            ...
  2. Now look at the (incomplete) code for our first test:
LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]

input_lines = p | # TASK 4: Create PCollection from LINES

output = input_lines | beam.ParDo(ConvertCsvToWeatherRecord())

# TASK 4: Write assert_that statement

We create a single test input (LINES) representing a line of a CSV file (the expected input format for our pipeline) and put it into a list. We also define the expected output (EXPECTED_OUTPUT) as a list of WeatherRecords.
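To see why a list of WeatherRecord objects works as an expected output, it can help to look at the conversion outside of Beam. The sketch below is plain Python and only an assumption about what the DoFn does: parse_csv_line is a hypothetical helper, not the lab's ConvertCsvToWeatherRecord implementation. Because NamedTuple instances compare field by field, the parsed record equals the hand-written expected record.

```python
import typing


class WeatherRecord(typing.NamedTuple):
    loc_id: str
    lat: float
    lng: float
    date: str
    low_temp: float
    high_temp: float
    precip: float


def parse_csv_line(line: str) -> WeatherRecord:
    """Hypothetical helper: split one CSV line and cast the numeric fields."""
    loc_id, lat, lng, date, low_temp, high_temp, precip = line.split(',')
    return WeatherRecord(loc_id, float(lat), float(lng), date,
                         float(low_temp), float(high_temp), float(precip))


LINES = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
EXPECTED_OUTPUT = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]

# Field-by-field equality is what makes the comparison succeed.
parsed = [parse_csv_line(line) for line in LINES]
print(parsed == EXPECTED_OUTPUT)
```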

There are a couple of pieces missing in the rest of the code for the test.

  3. To complete this task, first add the Create transform to convert LINES into a PCollection.

  4. Second, include an assert_that statement using the equal_to method to compare output with EXPECTED_OUTPUT.

If you get stuck, you can refer to later commented tests or the solutions.

Task 5. Run first DoFn unit test

  • Return to your terminal and run the following command:
python3 weather_statistics_pipeline_test.py
cat testing.out

The output from the tests will be written to the testing.out file. We can view the contents of this file by executing cat testing.out in our terminal. If the previous task was completed correctly, the contents of the testing.out file should be the following (the time elapsed may differ):

test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 0.918s

OK
Note: The test above could have been run using the command "python3 -m unittest test_script.py" instead. We ran the Python script directly to access the main method, but the approach mentioned here tends to be more common in practice.

Task 6. Run second DoFn unit test and debug pipeline

  1. Go back to 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py and uncomment the code for the second unit test (around lines 33-50). You can do this by highlighting the code and pressing Ctrl + / (or Cmd + / on macOS). The code is shown below for reference:
class ConvertTempUnitsTest(unittest.TestCase):

    def test_convert_temp_units(self):

        with TestPipeline() as p:

            RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1),
                       WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)]

            EXPECTED_RECORDS = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
                                WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)]

            input_records = p | beam.Create(RECORDS)

            output = input_records | beam.ParDo(ConvertTempUnits())

            assert_that(output, equal_to(EXPECTED_RECORDS))

This test ensures that the ConvertTempUnits() DoFn is working as intended. Save weather_statistics_pipeline_test.py and return to your terminal.

  2. Execute the following commands to run the tests and view the output:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out

The test fails this time! If we scroll through the output, we can find the following information about the test failure:

test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ERROR
...
apache_beam.testing.util.BeamAssertException: Failed assert: [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] == [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], unexpected elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=32.8, high_temp=34.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=25.6, high_temp=29.2, precip=0.3)], missing elements [WeatherRecord(loc_id='x', lat=0.0, lng=0.0, date='2/2/2021', low_temp=33.8, high_temp=35.6, precip=0.1), WeatherRecord(loc_id='y', lat=0.0, lng=0.0, date='2/2/2021', low_temp=26.6, high_temp=30.2, precip=0.3)] [while running 'assert_that/Match']

Looking a little more closely at the BeamAssertException, we can see that the values for low_temp and high_temp are incorrect. Something is wrong in the processing logic of the ConvertTempUnits DoFn!
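The "unexpected elements" and "missing elements" parts of the message can be read as an order-insensitive difference between the expected and actual lists. The sketch below reproduces that bookkeeping in plain Python. It is an illustration only: diff_unordered is a hypothetical helper, not Beam's implementation, and the records are shortened to (loc_id, low_temp, high_temp) tuples for brevity.

```python
def diff_unordered(expected, actual):
    """Return (unexpected, missing) without regard to element order."""
    missing = list(expected)   # expected items not yet matched
    unexpected = []            # actual items with no expected counterpart
    for item in actual:
        if item in missing:
            missing.remove(item)
        else:
            unexpected.append(item)
    return unexpected, missing


# Temperature values taken from the failure message above.
expected = [('x', 33.8, 35.6), ('y', 26.6, 30.2)]
actual = [('y', 25.6, 29.2), ('x', 32.8, 34.6)]

unexpected, missing = diff_unordered(expected, actual)
print('unexpected:', unexpected)
print('missing:', missing)
```

When every actual element is unexpected and every expected element is missing, as here, the transform is producing wrong values for all inputs rather than dropping or duplicating elements.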

  3. Return to 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline.py and scroll down to the definition of ConvertTempUnits (around line 32). To complete this task, find the error in the DoFn processing logic and rerun the following commands to confirm that the test now succeeds:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out

As a reminder, the formula for converting degrees Celsius to degrees Fahrenheit is given below:

temp_f = temp_c * 1.8 + 32.0
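As a quick sanity check, the formula can be applied to the test inputs by hand. The sketch below is plain Python, not the lab's DoFn: celsius_to_fahrenheit is a hypothetical helper name, and the comparison uses math.isclose to sidestep floating-point rounding. The input and expected values are the low_temp and high_temp fields from RECORDS and EXPECTED_RECORDS in the test.

```python
import math


def celsius_to_fahrenheit(temp_c: float) -> float:
    """Convert degrees Celsius to degrees Fahrenheit."""
    return temp_c * 1.8 + 32.0


# (low_temp, high_temp) pairs from RECORDS and EXPECTED_RECORDS in the test.
celsius = [(1.0, 2.0), (-3.0, -1.0)]
expected_fahrenheit = [(33.8, 35.6), (26.6, 30.2)]

converted = [(celsius_to_fahrenheit(low), celsius_to_fahrenheit(high))
             for low, high in celsius]

# Compare each converted value with its expected counterpart.
all_close = all(
    math.isclose(got, want)
    for got_pair, want_pair in zip(converted, expected_fahrenheit)
    for got, want in zip(got_pair, want_pair)
)
print(all_close)
```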

If you get stuck, you can refer to the solutions.

Task 7. Run PTransform unit test and test end-to-end pipeline

  1. Return to 8a_Batch_Testing_Pipeline/lab/weather_statistics_pipeline_test.py and uncomment the code for the final two tests (starting around line 53).

The first test we just uncommented is testing the composite PTransform ComputeStatistics. A truncated form of the code is presented below for reference:

def test_compute_statistics(self):

    with TestPipeline() as p:

        INPUT_RECORDS = # Test input omitted here

        EXPECTED_STATS = # Expected output omitted here

        inputs = p | beam.Create(INPUT_RECORDS)

        output = inputs | ComputeStatistics()

        assert_that(output, equal_to(EXPECTED_STATS))

Notice that this looks very similar to our earlier DoFn unit tests! The only real difference (other than different test inputs and outputs) is that we are applying the PTransform rather than beam.ParDo(DoFn()).

The final test is for the end-to-end pipeline. In the pipeline code (weather_statistics_pipeline.py) the entire end-to-end pipeline minus the source and sink has been included in a single PTransform WeatherStatsTransform. To test the end-to-end pipeline, we can repeat something similar to what we did above, but using that PTransform instead.

  2. Now return to your terminal and execute the following commands to run the tests one more time:
rm testing.out
python3 weather_statistics_pipeline_test.py
cat testing.out

If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:

test_compute_statistics (__main__.ComputeStatsTest) ... ok
test_convert_temp_units (__main__.ConvertTempUnitsTest) ... ok
test_convert_to_csv (__main__.ConvertToWeatherRecordTest) ... ok
test_weather_stats_transform (__main__.WeatherStatsTransformTest) ... ok

----------------------------------------------------------------------
Ran 4 tests in 2.295s

OK
  3. Now, copy the testing.out file to the storage bucket:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/

Click Check my progress to verify the objective. Performing unit tests for DoFns and PTransforms

Lab part 2: Testing stream processing logic with TestStream

In this part of the lab, we will perform unit testing for a streaming pipeline computing windowed counts of taxi rides. To test transforms that you have created, you can use the following pattern and transforms provided by Beam:

  • Create a TestPipeline.
  • Use the TestStream class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.
  • Use the assert_that method from the testing.util module and its other methods to verify that the output PCollection contains the elements that you expect.

When executing a pipeline that reads from a TestStream, the read waits for all of the consequences of each event to complete before moving on to the next event, including when processing time advances and the appropriate triggers fire. TestStream allows for the effect of triggering and allowed lateness to be observed and tested on a pipeline. This includes logic around late triggers and dropped data due to lateness.

Task 1. Explore the main pipeline code

  1. In file explorer, navigate to 8b_Stream_Testing_Pipeline/lab.

This directory contains two files: taxi_streaming_pipeline.py containing our main pipeline code, and taxi_streaming_pipeline_test.py containing our testing code.

  2. First, open 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline.py.

We will explore our pipeline code briefly before working on our unit tests. First we see the TaxiRide class (starting at line 6). This is a subclass of typing.NamedTuple, so we can use schema-aware transforms to work with objects of this class. We will also be defining in-memory collections of objects of this class for our tests later.

class TaxiRide(typing.NamedTuple):
    ride_id: str
    point_idx: int
    latitude: float
    longitude: float
    timestamp: str
    meter_reading: float
    meter_increment: float
    ride_status: str
    passenger_count: int

Next is the main code for our pipeline. The concepts in this pipeline have mostly been covered in previous labs, but be sure to explore the following pieces more carefully:

  • The DoFn JsonToTaxiRide (starting on line 22) used to convert the incoming Pub/Sub messages into objects of the TaxiRide class.
  • The PTransform TaxiCountTransform (starting on line 36). This PTransform contains the main counting and windowing logic of the pipeline. Our tests will focus on this PTransform.

The output of the TaxiCountTransform should be a count of all of the recorded taxi rides per window. However, we will have multiple events per ride (pickup, dropoff, etc.). We will filter on the ride_status property to ensure that we only count each ride once. We will do this by only keeping elements with ride_status equal to "pickup":

... | "FilterForPickups" >> beam.Filter(lambda x : x.ride_status == 'pickup')
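The effect of this filter can be illustrated without Beam. The sketch below is plain Python under stated assumptions: json_to_taxi_ride and make_message are hypothetical helpers (the lab's JsonToTaxiRide DoFn may be implemented differently), and the messages are synthetic, using the same field names as the TaxiRide class.

```python
import json
import typing


class TaxiRide(typing.NamedTuple):
    ride_id: str
    point_idx: int
    latitude: float
    longitude: float
    timestamp: str
    meter_reading: float
    meter_increment: float
    ride_status: str
    passenger_count: int


def json_to_taxi_ride(message: str) -> TaxiRide:
    """Hypothetical helper: build a TaxiRide from a JSON-encoded message."""
    return TaxiRide(**json.loads(message))


def make_message(ride_status: str) -> str:
    """Hypothetical helper: synthetic JSON message with the TaxiRide fields."""
    return json.dumps({
        'ride_id': 'x', 'point_idx': 1, 'latitude': 0.0, 'longitude': 0.0,
        'timestamp': '00:00:00', 'meter_reading': 1.0, 'meter_increment': 0.1,
        'ride_status': ride_status, 'passenger_count': 1,
    })


messages = [make_message('pickup'), make_message('enroute'),
            make_message('dropoff'), make_message('pickup')]
rides = [json_to_taxi_ride(m) for m in messages]

# Same predicate as the FilterForPickups step.
pickups = [ride for ride in rides if ride.ride_status == 'pickup']
print(len(pickups))
```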

Zooming in a bit more, the windowing logic being used in our pipeline is included below:

... | "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60),
                          trigger=AfterWatermark(late=AfterCount(1)),
                          allowed_lateness=60,
                          accumulation_mode=AccumulationMode.ACCUMULATING)

We will window into fixed windows of 60 seconds' length. We do not have an early trigger, but will emit results after the watermark passes the end of the window. We include late firings with every new element that comes in, but will only do so with an allowed lateness of 60 seconds. Finally, we will accumulate state in windows until the allowed lateness has passed.
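The assignment of an element to a fixed window depends only on its event timestamp. The sketch below shows that arithmetic in plain Python, assuming windows aligned to multiples of the window size (no offset); fixed_window is a hypothetical helper, not a Beam function. Each window is a half-open interval that includes its start and excludes its end.

```python
from typing import Tuple

WINDOW_SIZE = 60  # seconds, matching FixedWindows(60)


def fixed_window(timestamp: int, size: int = WINDOW_SIZE) -> Tuple[int, int]:
    """Return the [start, end) bounds of the fixed window containing timestamp."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


# Timestamps at and around the window boundaries.
for ts in (0, 59, 60, 120):
    print(ts, fixed_window(ts))
```

An element with event timestamp 60 therefore belongs to the window starting at 60, not to the window ending at 60.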

Task 2. Explore TestStream usage and run first test

  1. Now open 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py.

The first goal will be to understand the usage of TestStream in our testing code. Recall that the TestStream class allows us to simulate a real-time stream of messages while having control over the progression of processing time and the watermark. The code from the first test (starting on line 66) is included below:

test_stream = TestStream().advance_watermark_to(0).add_elements([
    TimestampedValue(base_json_pickup, 0),
    TimestampedValue(base_json_pickup, 0),
    TimestampedValue(base_json_enroute, 0),
    TimestampedValue(base_json_pickup, 60)
]).advance_watermark_to(60).advance_processing_time(60).add_elements([
    TimestampedValue(base_json_pickup, 120)
]).advance_watermark_to_infinity()
  2. We create a new TestStream object, and then we will pass in the JSON message as a string (named base_json_pickup or base_json_enroute, depending on the ride status). What is the above TestStream doing?

The TestStream is performing the following tasks:

  • Set the initial watermark to time 0 (all timestamps are in seconds).
  • Add three elements to the stream via the add_elements method with an event timestamp of 0. Two of these events will be counted (ride_status = "pickup") and the other will not.
  • Add another "pickup" event, but with an event timestamp 60.
  • Advance the watermark and processing time to 60, triggering the first window.
  • Add another "pickup" event, but with an event timestamp 120.
  • Advance the watermark to "infinity". This means that all windows will now be closed and any new data will be beyond any allowed lateness.
  3. The rest of the code for the first test is similar to the previous batch example, but we now use the TestStream instead of the Create transform:
taxi_counts = (p | test_stream
                 | TaxiCountTransform()
              )

EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [3],
                          IntervalWindow(60,120): [1],
                          IntervalWindow(120,180): [1]}

assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
            reify_windows=True)

In the above code, we define our output PCollection (taxi_counts) by creating the TestStream and applying the TaxiCountTransform PTransform. We use the IntervalWindow class to define the windows we wish to check, and then use assert_that with the equal_to_per_window method to verify results per window.

  4. Save the file, return to your terminal and run the following commands to relocate to the correct directory:
# Change directory into the lab
cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab
export BASE_DIR=$(pwd)
  5. Now run the test above and view the output by executing the following commands:
python3 taxi_streaming_pipeline_test.py
cat testing.out

You should see the following output after the test (though the elapsed time may differ):

test_windowing_behavior (__main__.TaxiWindowingTest) ... ok

----------------------------------------------------------------------
Ran 1 test in 1.113s

OK

Task 3. Create TestStream to test late data handling

In this task you will write code for a TestStream to test logic around handling late data.

  1. Return to 8b_Stream_Testing_Pipeline/lab/taxi_streaming_pipeline_test.py and scroll down to where the test_late_data_behavior method is commented out (around line 60). Uncomment the code for this test, as we will be completing the code for this task.
class TaxiLateDataTest(unittest.TestCase):

    def test_late_data_behavior(self):

        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True

        with TestPipeline(options=options) as p:

            base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                "\"ride_status\":\"pickup\",\"passenger_count\":1}"

            test_stream = # TASK 3: Create TestStream Object

            EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]}  #On Time and Late Result

            taxi_counts = (p | test_stream
                             | TaxiCountTransform()
                          )

            assert_that(taxi_counts, equal_to(EXPECTED_RESULTS))

Note that EXPECTED_RESULTS contains two results for IntervalWindow(0,60). These represent the results from on-time and late triggers for this window.

The code for the test is complete except for the creation of the TestStream.

  2. To complete this task, create a TestStream object that performs the following tasks:

    1. Advances the watermark to 0 (all timestamps are in seconds).
    2. Adds two TimestampedValues with value base_json_pickup and timestamp 0.
    3. Advances the watermark and processing time to 60.
    4. Adds another TimestampedValue with value base_json_pickup and timestamp 0.
    5. Advances the watermark and processing time to 300.
    6. Adds another TimestampedValue with value base_json_pickup and timestamp 0.
    7. Advances the watermark to infinity.

This will create a TestStream with four elements belonging to our first window. The first two elements are on time, the third element is late (but within the allowed lateness), and the final element is late and beyond the allowed lateness. Since we are accumulating fired panes, the first trigger should count two events and the final trigger should count three events. The fourth event should not be included.
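The expected counts of 2 and then 3 can be reasoned through with a small simulation. The sketch below is a deliberately simplified model in plain Python, not Beam code and not the TestStream you are asked to write. It assumes that the on-time pane fires when the watermark reaches the end of the window, that each late element within the allowed lateness fires a pane with the accumulated count, and that elements arriving once the watermark has reached the window end plus the allowed lateness are dropped. The name simulate_first_window is hypothetical.

```python
from typing import List, Tuple

WINDOW_SIZE = 60        # seconds, matching FixedWindows(60)
ALLOWED_LATENESS = 60   # seconds, matching allowed_lateness=60


def simulate_first_window(steps: List[Tuple[str, float]]) -> List[int]:
    """Return the counts emitted for window [0, 60) under the simplified model.

    Each step is ("element", event_time) or ("watermark", new_watermark).
    """
    window_end = WINDOW_SIZE
    watermark = 0.0
    count = 0
    fired_on_time = False
    panes = []
    for kind, value in steps:
        if kind == "watermark":
            watermark = value
            # On-time firing: the watermark reaches the end of the window.
            if not fired_on_time and watermark >= window_end and count > 0:
                panes.append(count)
                fired_on_time = True
        elif kind == "element" and 0 <= value < window_end:
            if watermark >= window_end + ALLOWED_LATENESS:
                continue  # beyond the allowed lateness: dropped
            count += 1
            if fired_on_time:
                panes.append(count)  # late firing with the accumulated count
    return panes


# The sequence of events described in the task list above.
steps = [
    ("watermark", 0), ("element", 0), ("element", 0),
    ("watermark", 60), ("element", 0),
    ("watermark", 300), ("element", 0),
    ("watermark", float("inf")),
]
panes = simulate_first_window(steps)
print(panes)
```

With accumulation, the late pane reports the running total for the window rather than only the newly arrived element, which is why the second value is 3 and not 1.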

If you get stuck, you can refer to the solutions.

Task 4. Run test for late data handling

  1. Now return to your terminal and execute the following commands to run the tests one more time:
rm testing.out
python3 taxi_streaming_pipeline_test.py
cat testing.out

If you have completed the previous tasks successfully, you should see the following in the terminal after the tests complete:

test_late_data_behavior (__main__.TaxiLateDataTest) ... ok
test_windowing_behavior (__main__.TaxiWindowingTest) ... ok

----------------------------------------------------------------------
Ran 2 tests in 2.225s

OK
  2. Now, copy the testing.out file to the storage bucket:
export PROJECT_ID=$(gcloud config get-value project)
gcloud storage cp testing.out gs://$PROJECT_ID/8b_Stream_Testing_Pipeline/

Click Check my progress to verify the objective. Testing stream processing logic with TestStream

End your lab

When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
