
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
This lab is scored on the following checkpoints:
- Create Vertex AI Platform Notebooks instance and clone course repo (15 points)
- Generate synthetic data (15 points)
- Run your pipeline (15 points)
- Create a JSON schema file (10 points)
- Write a JavaScript user-defined function (15 points)
- Run a Dataflow Template (10 points)
In this lab, you will learn how to:
- Build a batch Extract-Transform-Load (ETL) pipeline in Apache Beam that reads raw data from Google Cloud Storage and writes it to BigQuery
- Run the Apache Beam pipeline on Dataflow
- Run a Google-provided Dataflow Template to accomplish the same task
Prerequisites: Basic familiarity with Python.
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
In the Google Cloud console, on the Navigation menu (), select IAM & Admin > IAM.
Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard, and copy the project number (for example, 729328892908).
On the Navigation menu, select IAM & Admin > IAM.
At the top of the roles table, below View by Principals, click Grant Access.
For New principals, type {project-number}-compute@developer.gserviceaccount.com, replacing {project-number} with your project number.
For Role, select Project (or Basic) > Editor, then click Save.
For this lab, you will be running all commands in a terminal from your notebook.
In the Google Cloud Console, on the Navigation Menu, click Vertex AI > Workbench.
Click Enable Notebooks API.
On the Workbench page, select USER-MANAGED NOTEBOOKS and click CREATE NEW.
In the New instance dialog box that appears, set the region to the region assigned to your lab.
For Environment, select Apache Beam.
Click CREATE at the bottom of the dialog box.
Next, download the code repository for use in this lab by cloning it from the terminal in your notebook environment.
Once the clone completes, you will notice the training-data-analyst repo added in the file browser on the left panel of your notebook environment.
Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code to be completed by you, and a solution sub-folder with a fully workable example to reference if you get stuck.
Click Check my progress to verify the objective.
About 5 minutes
Dataflow is a fully-managed Google Cloud service for running batch and streaming Apache Beam data processing pipelines.
Apache Beam is an open source, advanced, unified, and portable data processing programming model that allows end users to define both batch and streaming data-parallel processing pipelines using Java, Python, or Go. Apache Beam pipelines can be executed on your local development machine on small datasets, and at scale on Dataflow. However, because Apache Beam is open source, other runners exist — you can run Beam pipelines on Apache Flink and Apache Spark, among others.
In this section, you write an Apache Beam Extract-Transform-Load (ETL) pipeline from scratch.
For each lab in this quest, the input data is intended to resemble web server logs in Common Log format along with other data that a web server might contain. For this first lab, the data is treated as a batch source; in later labs, the data will be treated as a streaming source. Your task is to read the data, parse it, and then write it to BigQuery, a serverless data warehouse, for later data analysis.
Before you can begin editing the actual pipeline code, you need to ensure that you have installed the necessary dependencies.
1 hour
The script creates a file called events.json containing JSON records that resemble web server log entries in Common Log format. It then automatically copies this file to your Google Cloud Storage bucket at gs://<YOUR-PROJECT-ID>/events.json.
Click Check my progress to verify the objective.
If you get stuck in this or later sections, you can refer to the solution.
In the file browser of your notebook, navigate to 1_Basic_ETL/lab and click my_pipeline.py. This will open the file in an editor panel. Make sure the following packages are imported:
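If you are unsure what that list should contain, a minimal set of imports for this lab might look like the following sketch (the exact list in the starter my_pipeline.py may differ):

```python
import argparse
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
```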
Then scroll down to the run() method. This method currently contains a pipeline that doesn't do anything; note how a Pipeline object is created using a PipelineOptions object, and the final line of the method runs the pipeline:
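A simplified sketch of that structure (the starter file contains additional argument handling and options setup):

```python
def run():
    # Pipeline options hold the pipeline's configuration.
    options = PipelineOptions()

    # The Pipeline object is created from the PipelineOptions object.
    p = beam.Pipeline(options=options)

    # Transforms will be applied to p in the steps that follow.

    # The final line of the method runs the pipeline.
    p.run()
```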
All data in Apache Beam pipelines resides in PCollections. To create your pipeline's initial PCollection, you will need to apply a root transform to your pipeline object. A root transform creates a PCollection from either an external data source or some local data you specify.
There are two kinds of root transforms in the Beam SDKs: Read and Create. Read transforms read data from an external source, such as a text file or a database table. Create transforms create a PCollection
from an in-memory list
and are especially useful for testing.
The following example code shows how to apply a ReadFromText root transform to read data from a text file. The transform is applied to a Pipeline object, p, and returns a pipeline dataset in the form of a PCollection[str] (using notation coming from parameterized type hints). "ReadLines" is your name for the transform, which will be helpful later when working with larger pipelines.
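A sketch of that example (the Cloud Storage path is a placeholder):

```python
# Apply a ReadFromText root transform to the Pipeline object p.
# The result is a PCollection[str] containing one element per line of the file.
lines = p | "ReadLines" >> beam.io.ReadFromText("gs://some/inputData.txt")
```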
Inside the run() method, create a string constant called "input" and set its value to gs://<YOUR-PROJECT-ID>/events.json. In a future lab, you will use command-line parameters to pass this information.
Create a PCollection of strings of all the events in events.json by calling the textio.ReadFromText transform (a sketch of these steps follows below).
Add any appropriate import statements to the top of my_pipeline.py.
To save your work, click on File and select Save in the top navigation menu.
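If you want to sanity-check your approach, one possible way to complete these steps looks like the following sketch (it assumes import apache_beam as beam at the top of the file; replace <YOUR-PROJECT-ID> with your project ID):

```python
# Inside run(), after the Pipeline object p has been created:
input = "gs://<YOUR-PROJECT-ID>/events.json"              # step 1: input location
lines = p | "ReadFromGCS" >> beam.io.ReadFromText(input)  # step 2: PCollection of strings
```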
Return to the terminal, navigate to the $BASE_DIR folder, and run your pipeline from the command line. Be sure to set the PROJECT_ID environment variable before running the pipeline.
At the moment, your pipeline doesn't actually do anything; it simply reads in data. However, running it demonstrates a useful workflow, in which you verify the pipeline locally and cheaply using DirectRunner running on your local machine before doing more expensive computations. To run the pipeline using Google Dataflow, you may change runner to DataflowRunner.
If you get stuck, refer to the solution.
Transforms are what change your data. In Apache Beam, transforms are represented by the PTransform class. At runtime, these operations will be performed on a number of independent workers.
The input to and output from every PTransform is a PCollection. In fact, though you may not have realized it, you have already used a PTransform when you read in data from Google Cloud Storage. Whether or not you assigned it to a variable, this created a PCollection of strings.
Because Beam uses a generic apply method for PCollections, represented by the pipe operator | in Python, you can chain transforms sequentially. For example, you can chain transforms to create a sequential pipeline, like this one:
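A sketch of such a chained pipeline (transform names, paths, and the parse_line callable are illustrative):

```python
(p
 | "ReadLines" >> beam.io.ReadFromText("gs://some/inputData.txt")
 | "ParseLines" >> beam.Map(parse_line)
 | "WriteOutput" >> beam.io.WriteToText("gs://some/outputPrefix"))
```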
For this task, you will use a new sort of transform, a ParDo. ParDo is a Beam transform for generic parallel processing.
The ParDo processing paradigm is similar to the "Map" phase of a Map/Shuffle/Reduce-style algorithm: a ParDo transform considers each element in the input PCollection, performs some processing function (your user code) on that element, and emits zero, one, or multiple elements to an output PCollection.
ParDo is useful for a variety of common data processing operations; however, there are special PTransforms in Python to make the process simpler, including:
- Filter: consider each element in a PCollection and either output that element to a new PCollection or discard it, depending on the output of a Python callable that returns a boolean value.
- Map: if your PCollection contains elements that are of a different type or format than you want, you can use Map to perform a conversion on each element and output the result to a new PCollection.
- Map or FlatMap: if your PCollection consists of records with multiple fields, for example, you can also use Map or FlatMap to parse out just the fields you want to consider into a new PCollection.
- ParDo, Map, or FlatMap: perform simple or complex computations on every element, or certain elements, of a PCollection and output the results as a new PCollection.

To complete this task, you need to write a Map transform that reads in a JSON string representing a single event, parses it using the Python json package, and outputs the dictionary returned by json.loads.
Map functions can be implemented in two ways, either inline or via a predefined callable. You write inline Map functions like this:
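For example, a sketch of the inline form (it assumes lines is the PCollection of strings read earlier and that json is imported):

```python
parsed = lines | "ParseJson" >> beam.Map(lambda line: json.loads(line))
```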
Alternatively, beam.Map can be used with a Python callable defined earlier in the script:
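A sketch of the same transform using a named callable:

```python
def parse_json(line):
    # Convert one JSON string into a Python dictionary.
    return json.loads(line)

parsed = lines | "ParseJson" >> beam.Map(parse_json)
```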
If you need more flexibility than beam.Map (and other lightweight DoFns) offers, you can implement ParDo with custom DoFns that subclass DoFn. This also allows them to be more easily integrated with testing frameworks.
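A sketch of the equivalent ParDo with a custom DoFn (the class name is illustrative):

```python
class ParseJson(beam.DoFn):
    def process(self, line):
        # process() may emit zero, one, or many elements; here it emits exactly one.
        yield json.loads(line)

parsed = lines | "ParseJson" >> beam.ParDo(ParseJson())
```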
Remember, if you get stuck, refer to the solution.
At this point, your pipeline reads a file from Google Cloud Storage, parses each line, and emits a Python dictionary for each element. The next step is to write these objects into a BigQuery table.
A BigQuery dataset called logs was created for you by the generate_batch_events.sh script; you can examine it in the BigQuery section of the Cloud console or with the bq command-line tool.
To output your pipeline's final PCollections, you apply a Write transform to that PCollection. Write transforms can output the elements of a PCollection to an external data sink, such as a database table. You can use Write to output a PCollection at any time in your pipeline, although you'll typically write out data at the end of your pipeline.
The following example code shows how to apply a WriteToText transform to write a PCollection of strings to a text file:
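A sketch of that example (the output path is a placeholder):

```python
# Write each string element of the output PCollection to text files under the given prefix.
output | "WriteMyFile" >> beam.io.WriteToText("gs://some/outputData")
```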
For this task, instead of WriteToText, use WriteToBigQuery. This transform requires a number of things to be specified, including the specific table to write to and the schema of this table. You can optionally specify whether to append to an existing table, recreate existing tables (helpful in early pipeline iteration), or create the table if it doesn't exist. By default, this transform will create tables that don't exist and won't write to a non-empty table.
For example, suppose the target table has three fields: name (of type str), ID (of type int), and balance (of type float). Then we can specify the schema in a single line, or specify it as JSON; both forms are sketched below.
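A sketch of both forms for that hypothetical table:

```python
# Single-string form: comma-separated "field:TYPE" pairs.
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

# JSON (dictionary) form, which also lets you set the mode of each field.
table_schema = {
    "fields": [
        {"name": "name", "type": "STRING"},
        {"name": "id", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "balance", "type": "FLOAT", "mode": "REQUIRED"},
    ]
}
```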
In the first case (the single string), all fields are assumed to be NULLABLE. We can specify the mode if we use the JSON approach instead.
Note that using WRITE_TRUNCATE will delete and recreate your table each and every time. This is helpful in early pipeline iteration, especially as you are iterating on your schema, but it can easily cause unintended issues in production; WRITE_APPEND or WRITE_EMPTY are safer.
Remember to define the table schema and add the BigQuery sink to your pipeline. Remember, if you get stuck, refer to the solution.
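One possible shape for the sink, as a sketch (the table reference, schema variable, and dispositions are illustrative; see the note above before using WRITE_TRUNCATE in production):

```python
(parsed
 | "WriteToBQ" >> beam.io.WriteToBigQuery(
     "<YOUR-PROJECT-ID>:logs.logs",
     schema=table_schema,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
```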
Return to the terminal and run your pipeline again, this time setting runner to DataflowRunner to run the pipeline on Dataflow.
Once the job starts, you can open it from the Dataflow page of the Cloud console. The overall shape of the job graph should be a single path, starting with the Read transform and ending with the Write transform. As your pipeline runs, workers will be added automatically, as the service determines the needs of your pipeline, and then disappear when they are no longer needed. You can observe this by navigating to Compute Engine, where you should see virtual machines created by the Dataflow service.
If you want faster feedback while you iterate, you can set runner back to DirectRunner to run the pipeline locally. This approach works in this case because the dataset is small and you are not using any features that aren't supported by DirectRunner.
If your code isn’t performing as expected and you don’t know what to do, check out the solution.
Click Check my progress to verify the objective.
Approximately 20 minutes
Much of the work of data engineers is either predictable, like recurring jobs, or similar to other work. However, the process for running pipelines requires engineering expertise. Think back to the steps that you just completed: you created a development environment (a notebook with the Apache Beam SDK and other dependencies installed), developed a pipeline, and then executed that pipeline from that environment, both locally and on Dataflow.
It would be much better if there were a way to initiate a job through an API call or without having to set up a development environment (which non-technical users would be unable to do). This would also allow you to schedule pipelines.
Dataflow Templates seek to solve this problem by changing the representation that is created when a pipeline is compiled so that it is parameterizable. Unfortunately, it is not as simple as exposing command-line parameters, although that is something you do in a later lab. With Dataflow Templates, a developer stages the pipeline once, and jobs can then be launched from that template without setting up a development environment.
In this lab, you will practice using one of the many Google-created Dataflow Templates to accomplish the same task as the pipeline that you built in Part 1.
Just like before, you must pass the Dataflow Template a JSON file representing the schema, in this example the schema of the logs.logs table. The sed commands in this step are used to build the full JSON object that Dataflow expects.
Click Check my progress to verify the objective.
The Cloud Storage to BigQuery Dataflow Template requires a JavaScript function to convert the raw text into valid JSON. In this case, each line of text is valid JSON, so the function is somewhat trivial.
To complete this task, create a new file in the dataflow_python folder in the file explorer of your IDE: click File >> New >> Text File.
Right-click the new file and rename it transform.js, then click the file to open it in the editor panel.
Copy the function below to the transform.js file and save it:
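Since each input line is already valid JSON, a minimal pass-through UDF is sufficient; a sketch:

```javascript
function transform(line) {
  // Each input line is already a valid JSON string, so return it unchanged.
  return line;
}
```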
Click Check my progress to verify the objective.
When you launch the template, you will need to supply several parameters, including:
- the Cloud Storage path to events.json, in the form gs://<YOUR-PROJECT-ID>/events.json
- the Cloud Storage path to the schema.json file, in the form gs://<YOUR-PROJECT-ID>/schema.json
- the Cloud Storage path to the .js file, in the form gs://<YOUR-PROJECT-ID>/transform.js
- the name of the JavaScript function, in this case transform

While your job is running, you may inspect it from within the Dataflow Web UI.
Click Check my progress to verify the objective.
The code for the Dataflow Template you just used is located in this TextIOToBigQuery guide.
Scroll down to the main method. The code should look familiar; it is similar to the pipeline you authored!
- It has a Pipeline object, created using a PipelineOptions object.
- It consists of a chain of PTransforms, beginning with a TextIO.read() transform.

Make sure to check out the next lab, which will cover making pipelines that are not simply chains of PTransforms, and how you can adapt a pipeline you've built to be a custom Dataflow Template.
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.