Serverless Data Processing with Dataflow - Advanced Streaming Analytics Pipeline with Dataflow (Java)
- Overview
- Setup and requirements
- Lab part 1. Dealing with late data
- Task 1. Prepare the environment
- Task 2. Set allowed lateness
- Task 3. Set a trigger
- Lab part 2. Dealing with malformed data
- Task 1. Collect malformed data
- Task 2. Make code more modular with a composite transform
- Task 3. Write malformed data for later analysis
- Task 4. Run your pipeline
- Task 5. Test your pipeline
- End your lab
Overview
In this lab, you:
- Deal with late data.
- Deal with malformed data by:
- Writing a composite transform for more modular code.
- Writing a transform that emits multiple outputs of different types.
- Collecting malformed data and writing it to a location where it can be examined.
The end of the previous lab introduces one sort of challenge that real-time pipelines must contend with: the gap between when events transpire and when they are processed, also known as lag. This lab introduces Apache Beam concepts that allow pipeline creators to specify how their pipelines should deal with lag in a formal way.
But lag isn’t the only sort of problem that pipelines are likely to encounter in a streaming context: whenever input comes from outside the system, there is always the possibility that it will be malformed in some way. This lab also introduces techniques that can be used to deal with such inputs.
The final pipeline in this lab resembles the picture below. Note that it contains a branch.
Setup and requirements
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
- Sign in to Qwiklabs using an incognito window.
- Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time. There is no pause feature. You can restart if needed, but you have to start at the beginning.
- When ready, click Start lab.
- Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
- Click Open Google Console.
- Click Use another account and copy/paste credentials for this lab into the prompts. If you use other credentials, you'll receive errors or incur charges.
- Accept the terms and skip the recovery resource page.
Check project permissions
Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).
- In the Google Cloud console, on the Navigation menu, select IAM & Admin > IAM.
- Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.
If the service account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
- In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
- Copy the project number (e.g. 729328892908).
- On the Navigation menu, select IAM & Admin > IAM.
- At the top of the roles table, below View by Principals, click Grant Access.
- For New principals, type {project-number}-compute@developer.gserviceaccount.com, replacing {project-number} with your project number.
- For Role, select Project (or Basic) > Editor.
- Click Save.
For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is Java language server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud command-line tool, similar to Cloud Shell.
- To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost into a new tab.
The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.
- Click on the File Explorer button to view the repo:
You can also create multiple terminals in this environment, just as you would with Cloud Shell:
You can confirm that you are logged in as the provided service account, which has the exact same permissions as your lab user account, by running gcloud auth list in the terminal:
If at any point your environment stops working, you can try resetting the VM hosting your IDE from the Compute Engine console like this:
Lab part 1. Dealing with late data
In the previous labs, you wrote code that divided elements by event time into windows of fixed width, using code that looked like the following:
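For reference, a minimal sketch of that kind of fixed-window transform, assuming one-minute windows and the CommonLog element type used in this quest (both are placeholders for whatever your earlier pipeline used):

```java
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// commonLogs is the PCollection<CommonLog> produced upstream in your pipeline.
// Assign each element to a one-minute window based on its event-time timestamp.
PCollection<CommonLog> windowed = commonLogs.apply("WindowByMinute",
    Window.<CommonLog>into(FixedWindows.of(Duration.standardMinutes(1))));
```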
However, as you saw at the end of the last non-SQL lab, streams of data often have lag. Lag is problematic when windowing using event time (as opposed to processing time) because it introduces uncertainty: have all of the events for a particular point in event time actually arrived, or haven’t they?
Clearly, in order to output results, the pipeline you wrote needed to make a decision in this respect. It did so using a concept called a watermark. A watermark is the system’s heuristic-based notion of when all data up to a certain point in event time can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data and is simply dropped. So, the default windowing behavior is to emit a single, hopefully complete result when the system is confident that it has all of the data.
Apache Beam uses a number of heuristics to make an educated guess about what the watermark is. However, these are still heuristics. More to the point, those heuristics are general-purpose and are not suitable for all use cases. Instead of using general-purpose heuristics, pipeline designers need to thoughtfully consider the following questions in order to determine what tradeoffs are appropriate:
- Completeness: How important is it to have all of your data before you compute your result?
- Latency: How long do you want to wait for data? For example, do you wait until you think you have all data, or do you process data as it arrives?
- Cost: How much compute power and money are you willing to spend to lower the latency?
Armed with those answers, it’s possible to use Apache Beam’s formalisms to write code that makes the right tradeoff.
Allowed lateness
Allowed lateness controls how long a window should retain its state; once the watermark reaches the end of the allowed lateness period, all state is dropped. While it would be great to be able to keep all of our persistent state around until the end of time, in reality, when dealing with an unbounded data source, it’s often not practical to keep a given window's state indefinitely; we’ll eventually run out of disk space.
As a result, any real-world, out-of-order processing system needs to provide some way to bound the lifetimes of the windows it’s processing. A clean and concise way of doing this is by defining a horizon on the allowed lateness within the system, i.e. placing a bound on how late any given record may be (relative to the watermark) for the system to bother processing it; any data that arrive after this horizon are simply dropped. Once you’ve bounded how late individual data may be, you’ve also established precisely how long the state for windows must be kept around: until the watermark exceeds the lateness horizon for the end of the window.
Task 1. Prepare the environment
As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before.
Open the appropriate lab
- Create a new terminal in your IDE environment, if you haven't already, and copy and paste the following command:
- Set up the data environment:
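The lab console supplies the exact commands for both of these steps. As a rough, hedged sketch (the script name and exact steps here are assumptions based on this quest's repo layout), they do something like the following:

```bash
# Change into the lab directory and resolve the Maven dependencies
cd 7_Advanced_Streaming_Analytics/labs
mvn clean dependency:resolve
export BASE_DIR=$(pwd)

# Set up the data environment (script name is a hypothetical placeholder):
# it creates the Cloud Storage bucket, BigQuery dataset, and Pub/Sub topic
# used by the pipeline, then returns to the lab directory
cd $BASE_DIR/../..
source create_streaming_sinks.sh
cd $BASE_DIR
```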
Click Check my progress to verify the objective.
Task 2. Set allowed lateness
- In your IDE, open StreamingMinuteTrafficPipeline.java, which is located in 7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline.
In Apache Beam, allowed lateness is set using the withAllowedLateness() method, as in the example below:
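A minimal sketch, assuming one-minute fixed windows and a one-minute lateness horizon (both durations are placeholders):

```java
// Keep each window's state for one extra minute past the watermark,
// so data arriving up to one minute late is still added to its window
PCollection<CommonLog> windowed = commonLogs.apply("WindowWithLateness",
    Window.<CommonLog>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(1)));
```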
- To complete this task, examine the windowing transform and add a call to .withAllowedLateness(), passing in a valid Duration constructed from the appropriate command-line parameter. Use your judgment as to what a reasonable value should be and update the command line to reflect the right units.
Triggers
Pipeline designers also have discretion over when to emit preliminary results. For example, say that the watermark for the end of a window has not yet been reached but 75% of the expected data have already arrived. In many cases, such a sample can be assumed to be representative, making it worth showing to end users.
Triggers determine at what point during processing time results will be materialized. Each specific output for a window is referred to as a pane of the window. Triggers fire panes when a trigger's conditions are met. In Apache Beam, those conditions include watermark progress, processing time progress (which will progress uniformly, regardless of how much data has actually arrived), element counts (such as when a certain amount of new data arrives), and data-dependent triggers, like when the end of a file is reached.
A trigger’s conditions may lead it to fire a pane many times. Consequently, it’s also necessary to specify how to accumulate these results. Apache Beam currently supports two accumulation modes, one which accumulates results together and the other which returns only the portions of the result that are new since the last pane fired.
Task 3. Set a trigger
When you set a windowing function for a PCollection by using the Window transform, you can also specify a trigger.
You set the trigger(s) for a PCollection by invoking the method .triggering() on the result of your Window.into() transform. Window.triggering() accepts a trigger as its argument. Apache Beam comes with a number of provided triggers:
- AfterWatermark for firing when the watermark passes a timestamp determined from either the end of the window or the arrival of the first element in a pane.
- AfterProcessingTime for firing after some amount of processing time has elapsed (typically since the first element in a pane).
- AfterPane for firing based on a property of the elements in the current pane, such as the number of elements that have been assigned to the current pane.
This code sample sets a time-based trigger for a PCollection, which emits results one minute after the first element in that window has been processed. The last line in the code sample, .discardingFiredPanes(), sets the window's accumulation mode:
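A sketch of such a transform, assuming the CommonLog element type and placeholder durations (note that Beam requires .withAllowedLateness() whenever a trigger is specified):

```java
commonLogs.apply("WindowWithTrigger",
    Window.<CommonLog>into(FixedWindows.of(Duration.standardMinutes(1)))
        // Emit a pane one minute of processing time after the
        // first element in the window has been processed
        .triggering(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardMinutes(1))
        // Each pane contains only the data that arrived since the last firing
        .discardingFiredPanes());
```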
- To complete this task, add a call to Window.triggering() inside the windowing transform, passing in a valid Trigger. When designing your trigger, keep in mind this use case, in which data are windowed into one-minute windows and data can arrive late.
If you want an example of a trigger, take a look at the solution.
Lab part 2. Dealing with malformed data
Depending on how you set up your Trigger, if you were to run the pipeline right now and compare it to the pipeline from the prior lab, you might notice that the new pipeline presents results earlier. Its results might also be more accurate, if the watermark heuristics did a poor job of predicting streaming behavior and your allowed lateness setting captures late data that would otherwise have been dropped.
However, while the current pipeline is more robust to lateness, it is still vulnerable to malformed data. If you were to run the pipeline and publish a message containing anything but a well-formed JSON string that could be parsed into a CommonLog, the pipeline would generate an error. Although tools like Cloud Logging make it straightforward to read those errors, a better-designed pipeline will store them in a predefined location for later inspection.
In this section, you add components to the pipeline that make it both more modular and more robust.
Task 1. Collect malformed data
In order to be more robust to malformed data, the pipeline needs a way of filtering out this data and branching to process it differently. You have already seen one way to introduce a branch into a pipeline: by making one PCollection the input for multiple transforms.
This form of branching is powerful. However, there are some use cases where this strategy is inefficient. For example, say you want to create two different subsets of the same PCollection. Using the multiple transform method, you would create one filter transform for each subset and apply them both to the original PCollection. However, this would process each element twice.
An alternative method for producing a branching pipeline is to have a single transform produce multiple outputs while processing the input PCollection one time. In this task, you write a transform that produces multiple outputs, the first of which are the results obtained from well-formed data and the second of which are the malformed elements from the original input stream.
In order to emit multiple results while still creating only a single PCollection, Apache Beam uses a class called PCollectionTuple. A PCollectionTuple is an immutable tuple of heterogeneously typed PCollections, "keyed" by TupleTags.
Here is an example of a PCollectionTuple being instantiated with two different types of PCollections. Those PCollections are then retrieved using the PCollectionTuple.get() method:
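A minimal sketch (the pipeline variable, transform names, and element values are illustrative):

```java
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

// Tags identify each collection; the anonymous subclasses capture the element types
TupleTag<String> stringTag = new TupleTag<String>() {};
TupleTag<Integer> integerTag = new TupleTag<Integer>() {};

PCollection<String> strings = pipeline.apply("Strings", Create.of("a", "b", "c"));
PCollection<Integer> integers = pipeline.apply("Integers", Create.of(1, 2, 3));

// Bundle two differently typed PCollections into a single PCollectionTuple...
PCollectionTuple tuple =
    PCollectionTuple.of(stringTag, strings).and(integerTag, integers);

// ...and retrieve each one by its tag
PCollection<String> retrievedStrings = tuple.get(stringTag);
PCollection<Integer> retrievedIntegers = tuple.get(integerTag);
```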
To use this method in the context of a PTransform, you can write code like the following example, which assigns a TupleTag to an element based on its contents:
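A sketch of a ParDo with tagged outputs, loosely following the Beam programming guide, where input is a PCollection<String>. The routing rule here (splitting strings by length) is illustrative only; in the lab you will instead route on whether the JSON parses:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<String> shortStringsTag = new TupleTag<String>() {};
final TupleTag<String> longStringsTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply("SplitByLength",
    ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(@Element String element, MultiOutputReceiver out) {
            if (element.length() <= 10) {
              // Short strings go to the main output...
              out.get(shortStringsTag).output(element);
            } else {
              // ...everything else goes to the additional, tagged output
              out.get(longStringsTag).output(element);
            }
          }
        })
        // The first argument is the main output tag; extra outputs go in the TupleTagList
        .withOutputTags(shortStringsTag, TupleTagList.of(longStringsTag)));

PCollection<String> shortStrings = results.get(shortStringsTag);
PCollection<String> longStrings = results.get(longStringsTag);
```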
- To complete this task, declare two TupleTag constants at the top of the class, and change the JsonToCommonLog transform so that it returns a PCollectionTuple and tags unparsed elements with one tag and parsed elements with the other. Instead of an if/then/else block, use a try/catch statement.
Task 2. Make code more modular with a composite transform
Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.
- To create your own composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic. For the PTransform class type parameters, you pass the PCollection types that your transform takes as input and produces as output.
The following code sample shows how to declare a PTransform that accepts a PCollection of strings for input, and outputs a PCollection of integers:
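A minimal sketch; the class name and logic are illustrative, and the expand override described in the next step is already included:

```java
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Type parameters: the input PCollection type and the output PCollection type
static class ComputeWordLengths
    extends PTransform<PCollection<String>, PCollection<Integer>> {

  @Override
  public PCollection<Integer> expand(PCollection<String> lines) {
    // The composite transform's processing logic lives in expand():
    // here, map each string to its length
    return lines.apply("StringToLength",
        MapElements.into(TypeDescriptors.integers())
            .via((String line) -> line.length()));
  }
}
```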
- Within your PTransform subclass, you'll need to override the expand method. The expand method is where you add the processing logic for the PTransform. Your override of the expand method must accept the appropriate type of input PCollection as a parameter, and specify the output PCollection as the return value.
- To invoke your transform, use PCollection.apply() on the PCollection and pass an instance of the composite transform:
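For example, applying the hypothetical ComputeWordLengths transform sketched above, where lines is a PCollection<String>:

```java
// A composite transform is applied exactly like any built-in transform
PCollection<Integer> wordLengths =
    lines.apply("ComputeWordLengths", new ComputeWordLengths());
```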
- To complete this task, take the JsonToCommonLog transform that you just modified and convert it into a composite transform. Note that this will cause an issue with the current write transform, which expects instances of CommonLog. Save the results of the composite transform in a new PCollectionTuple and use .get() to retrieve the PCollection that the write transform expects.
Task 3. Write malformed data for later analysis
In order to fix the upstream problem that is producing malformed data, it is important to be able to analyze the malformed data. Doing so requires materializing it somewhere. In this task, you write malformed data to Google Cloud Storage. This pattern is called using dead-letter storage.
In previous labs, you wrote directly from a bounded source (batch) to Cloud Storage using TextIO.write(). However, when writing from an unbounded source (streaming), this approach needs to be modified slightly.
First, upstream of the write transform, you need to use a Trigger to specify when, in processing time, to write. Otherwise, if the defaults are left in place, the write will never take place. By default, every event belongs to the Global Window. When operating in batch, this is fine because the full data set is known at run time. However, with unbounded sources, the full dataset size is unknown, and so Global Window panes never fire, as they are never complete.
Because you are using a Trigger, you also need to use a Window. However, you don't necessarily need to change the window. In previous labs and tasks, you have used windowing transforms to replace the global window with a window of fixed duration in event time. In this case, which elements are grouped together is not as important as ensuring that the results are materialized in a useful manner and at a useful rate.
In the example below, the window fires the Global Window pane after every 10 seconds of processing time but only writes new events:
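A sketch of that rewindowing step, assuming string elements and an illustrative transform name:

```java
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

unparsedMessages.apply("FireEvery10s",
    Window.<String>into(new GlobalWindows())
        // Fire the Global Window pane 10 seconds of processing time after
        // new data arrives in the pane, and keep doing so forever
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        // Each firing emits only the events that arrived since the previous pane
        .discardingFiredPanes());
```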
Once you've set a Trigger, you need to modify the call to TextIO.write() to perform the writes. When writing downstream of a windowing transform, chain a call to withWindowedWrites() and specify a number of shards so that writing can be done in parallel:
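A sketch of the modified write, where getDeadletterBucket() is a hypothetical pipeline option holding a Cloud Storage path:

```java
import org.apache.beam.sdk.io.TextIO;

unparsedMessages.apply("WriteDeadLetter",
    TextIO.write()
        // Hypothetical option; substitute whatever command-line parameter your pipeline defines
        .to(options.getDeadletterBucket() + "/deadletter/unparsed")
        // Required when writing an unbounded, windowed PCollection
        .withWindowedWrites()
        // A fixed shard count lets the writes be parallelized
        .withNumShards(10));
```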
- To complete this task, create a new transform using .get() on the PCollectionTuple to retrieve the malformed data. Use your judgment and knowledge of triggers to set appropriate firing conditions for this trigger.
Task 4. Run your pipeline
- To run your pipeline, construct a command resembling the example below. Note that it will need to be modified to reflect the names of any command-line options that you have included.
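The exact command depends on the names of the command-line options you defined. As a hedged sketch (the option names and values below are assumptions, not the lab's definitive flags):

```bash
export PROJECT_ID=$(gcloud config get-value project)
export REGION=us-central1
export BUCKET=gs://${PROJECT_ID}
export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline

cd $BASE_DIR
mvn compile exec:java \
  -Dexec.mainClass=${MAIN_CLASS_NAME} \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --stagingLocation=${BUCKET}/staging \
  --tempLocation=${BUCKET}/temp \
  --runner=DataflowRunner \
  --inputTopic=<your Pub/Sub topic> \
  --outputTableName=<your BigQuery table> \
  --deadletterBucket=<your dead-letter bucket> \
  --allowedLateness=<lateness, in the units your pipeline expects>"
```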
The code for this quest includes a script for publishing JSON events using Pub/Sub.
- To complete this task and start publishing messages, open a new terminal side-by-side with your current one and run the following script. It will keep publishing messages until you kill the script. Make sure you are in the training-data-analyst/quests/dataflow folder.
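The script name below is a placeholder based on this quest's repo layout; use the command shown in the lab console if it differs:

```bash
bash generate_streaming_events.sh true
```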
The true flag adds late events to the stream.
Click Check my progress to verify the objective.
Task 5. Test your pipeline
- On the Google Cloud console title bar, type Pub/Sub in the Search field, then click Pub/Sub in the Products & Pages section.
- Click Topics, and then click on the topic my_topic.
- Click on the Messages tab, and then click on the Publish Message button.
- On the following page, enter a message to be published.
So long as it doesn't conform perfectly to the CommonLog JSON spec, it should arrive in the dead-letter Cloud Storage bucket shortly. You can trace its path through the pipeline by returning to the pipeline monitoring window and clicking on a node in the branch responsible for handling unparsed messages.
- Once you see an element added to this branch, you can then navigate to Cloud Storage and verify that the message has been written to disk:
Click Check my progress to verify the objective.
End your lab
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
- 1 star = Very dissatisfied
- 2 stars = Dissatisfied
- 3 stars = Neutral
- 4 stars = Satisfied
- 5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.