
Serverless Data Processing with Dataflow - Advanced Streaming Analytics Pipeline with Cloud Dataflow (Java)

Lab · 1 hour 30 minutes · 7 Credits · Advanced

Overview

In this lab, you:

  • Deal with late data.
  • Deal with malformed data by:
    1. Writing a composite transform for more modular code.
    2. Writing a transform that emits multiple outputs of different types.
    3. Collecting malformed data and writing it to a location where it can be examined.

The end of the previous lab introduces one sort of challenge that real-time pipelines must contend with: the gap between when events transpire and when they are processed, also known as lag. This lab introduces Apache Beam concepts that allow pipeline creators to specify how their pipelines should deal with lag in a formal way.

But lag isn’t the only sort of problem that pipelines are likely to encounter in a streaming context: whenever input comes from outside the system, there is always the possibility that it will be malformed in some way. This lab also introduces techniques that can be used to deal with such inputs.

The final pipeline in this lab resembles the picture below. Note that it contains a branch.

The final pipeline architecture diagram

Setup and requirements

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Qwiklabs using an incognito window.

  2. Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
    There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.

  5. Click Open Google Console.

  6. Click Use another account and copy/paste credentials for this lab into the prompts.
    If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Check project permissions

Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu (Navigation menu icon), select IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud Overview > Dashboard.

Compute Engine default service account name and editor status highlighted on the Permissions tabbed page

Note: If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.
  1. In the Google Cloud console, on the Navigation menu, click Cloud Overview > Dashboard.
  2. Copy the project number (e.g. 729328892908).
  3. On the Navigation menu, select IAM & Admin > IAM.
  4. At the top of the roles table, below View by Principals, click Grant Access.
  5. For New principals, type:
{project-number}-compute@developer.gserviceaccount.com
  6. Replace {project-number} with your project number.
  7. For Role, select Project (or Basic) > Editor.
  8. Click Save.

For the purposes of this lab, you will mainly be using a Theia Web IDE hosted on Google Compute Engine. It has the lab repo pre-cloned. There is Java language server support, as well as a terminal for programmatic access to Google Cloud APIs via the gcloud command-line tool, similar to Cloud Shell.

  1. To access your Theia IDE, copy and paste the link shown in Google Cloud Skills Boost to a new tab.
Note: You may need to wait 3-5 minutes for the environment to be fully provisioned, even after the URL appears. Until then, you will see an error in the browser.

Credentials pane displaying the ide_url

The lab repo has been cloned to your environment. Each lab is divided into a labs folder with code to be completed by you, and a solution folder with a fully workable example to reference if you get stuck.

  1. Click on the File Explorer button to view the folder structure:

Expanded File Explorer menu with the labs folder highlighted

You can also create multiple terminals in this environment, just as you would with Cloud Shell:

New Terminal option highlighted in the Terminal menu

You can verify by running gcloud auth list in the terminal that you're logged in as a provided service account, which has the exact same permissions as your lab user account:

Terminal displaying the gcloud auth list command

If at any point your environment stops working, you can try resetting the VM hosting your IDE from the GCE console like this:

Both the Reset button and VM instance name highlighted on the VM instances page

Lab part 1. Dealing with late data

In the previous labs, you wrote code that divided elements by event time into windows of fixed width, using code that looked like the following:

commonLogs
    .apply("WindowCommonLogs",
        Window.into(
            FixedWindows.of(
                Duration.standardMinutes(options.getWindowDuration()))))
    .apply("CountEventsPerWindow",
        Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults());

However, as you saw at the end of the last non-SQL lab, streams of data often have lag. Lag is problematic when windowing using event time (as opposed to processing time) because it introduces uncertainty: have all of the events for a particular point in event time actually arrived, or haven’t they?

Clearly, in order to output results, the pipeline you wrote needed to make a decision in this respect. It did so using a concept called a watermark. A watermark is the system’s heuristic-based notion of when all data up to a certain point in event time can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data and is simply dropped. So, the default windowing behavior is to emit a single, hopefully complete result when the system is confident that it has all of the data.

Apache Beam uses a number of heuristics to make an educated guess about what the watermark is. However, these are still heuristics. More to the point, those heuristics are general-purpose and are not suitable for all use cases. Instead of using general-purpose heuristics, pipeline designers need to thoughtfully consider the following questions in order to determine what tradeoffs are appropriate:

  • Completeness: How important is it to have all of your data before you compute your result?
  • Latency: How long do you want to wait for data? For example, do you wait until you think you have all data, or do you process data as it arrives?
  • Cost: How much compute power and money are you willing to spend to lower the latency?

Armed with those answers, it’s possible to use Apache Beam’s formalisms to write code that makes the right tradeoff.

Allowed lateness

Allowed lateness controls how long a window should retain its state; once the watermark reaches the end of the allowed lateness period, all state is dropped. While it would be great to be able to keep all of our persistent state around until the end of time, in reality, when dealing with an unbounded data source, it’s often not practical to keep a given window's state indefinitely; we’ll eventually run out of disk space.

As a result, any real-world, out-of-order processing system needs to provide some way to bound the lifetimes of the windows it’s processing. A clean and concise way of doing this is by defining a horizon on the allowed lateness within the system, i.e. placing a bound on how late any given record may be (relative to the watermark) for the system to bother processing it; any data that arrive after this horizon are simply dropped. Once you’ve bounded how late individual data may be, you’ve also established precisely how long the state for windows must be kept around: until the watermark exceeds the lateness horizon for the end of the window.

Task 1. Prepare the environment

As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before.

Open the appropriate lab

  1. Create a new terminal in your IDE environment, if you haven't already, and copy and paste the following commands:
# Change directory into the lab
cd 7_Advanced_Streaming_Analytics/labs
# Download dependencies
mvn clean dependency:resolve
export BASE_DIR=$(pwd)
  2. Set up the data environment:
# Create GCS buckets, BQ dataset, and Pubsub Topic
cd $BASE_DIR/../..
source create_streaming_sinks.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR

Click Check my progress to verify the objective. Prepare the environment

Task 2. Set allowed lateness

In Apache Beam, allowed lateness is set using the withAllowedLateness() method, as in the example below:

PCollection<String> items = ...;
PCollection<String> windowed_items = items.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
        .withAllowedLateness(Duration.standardDays(1)));
  • To complete this task, examine the windowing transform and add a call to .withAllowedLateness(), passing in a valid Duration constructed from the appropriate command-line parameter. Use your judgment as to what a reasonable value should be, and update the command line to reflect the right units. A sketch follows below.
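
For orientation, here is a minimal sketch of what the modified transform might look like, assuming a command-line option named getAllowedLateness() that returns a number of minutes (the option name and units are assumptions, not prescribed by the lab); Task 3 adds the trigger and accumulation mode to this same transform:
commonLogs
    .apply("WindowCommonLogs",
        Window.<CommonLog>into(
                FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
            // Keep window state around long enough to admit late-arriving elements.
            .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness())))
    .apply("CountEventsPerWindow",
        Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults());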

Triggers

Pipeline designers also have discretion over when to emit preliminary results. For example, say that the watermark for the end of a window has not yet been reached but 75% of the expected data have already arrived. In many cases, such a sample can be assumed to be representative, making it worth showing to end users.

Triggers determine at what point during processing time results will be materialized. Each specific output for a window is referred to as a pane of the window. Triggers fire panes when a trigger's conditions are met. In Apache Beam, those conditions include watermark progress, processing time progress (which will progress uniformly, regardless of how much data has actually arrived), element counts (such as when a certain amount of new data arrives), and data-dependent triggers, like when the end of a file is reached.

A trigger’s conditions may lead it to fire a pane many times. Consequently, it’s also necessary to specify how to accumulate these results. Apache Beam currently supports two accumulation modes, one which accumulates results together and the other which returns only the portions of the result that are new since the last pane fired.
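
As an illustration of the difference (a sketch only, not part of the lab code; the trigger and lateness values here are arbitrary), the accumulation mode is simply the final method chained onto the windowing transform:
PCollection<String> pc = ...;

// Accumulating mode: every pane that fires contains all elements seen so far in the window.
pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardMinutes(30))
    .accumulatingFiredPanes());

// Discarding mode: each pane contains only the elements that arrived since the previous firing.
pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    .withAllowedLateness(Duration.standardMinutes(30))
    .discardingFiredPanes());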

Task 3. Set a trigger

When you set a windowing function for a PCollection by using the Window transform, you can also specify a trigger.

You set the trigger(s) for a PCollection by invoking the method .triggering() on the result of your Window.into() transform. Window.triggering() accepts a trigger as its argument. Apache Beam comes with a number of provided triggers:

  • AfterWatermark for firing when the watermark passes a timestamp determined from either the end of the window or the arrival of the first element in a pane.
  • AfterProcessingTime for firing after some amount of processing time has elapsed (typically since the first element in a pane).
  • AfterPane for firing based on a property of the elements in the current pane, such as the number of elements that have been assigned to the current pane.

This code sample sets a time-based trigger for a PCollection which emits results one minute after the first element in that window has been processed. The last line in the code sample, .discardingFiredPanes(), sets the window’s accumulation mode:

PCollection<String> pc = ...;
pc.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardMinutes(1)))
    .discardingFiredPanes());
  • To complete this task, add a call to Window.triggering() inside the windowing transform, passing in a valid Trigger. When designing your trigger, keep in mind this use case, in which data are windowed into one-minute windows and data can arrive late.

If you want an example of a trigger, take a look at the solution or at the sketch below.
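
Here is a hedged sketch of one possible trigger configuration, again assuming getWindowDuration() and getAllowedLateness() options expressed in minutes; the exact trigger you choose is a judgment call, not a single required answer:
commonLogs
    .apply("WindowCommonLogs",
        Window.<CommonLog>into(
                FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
            // Emit an on-time pane when the watermark passes the end of the window,
            // then one additional pane per late element within the allowed lateness.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
            // Accumulate so that late panes contain the corrected, complete count.
            .accumulatingFiredPanes())
    .apply("CountEventsPerWindow",
        Combine.globally(Count.<CommonLog>combineFn()).withoutDefaults());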

Lab part 2. Dealing with malformed data

Depending on how you set up your Trigger, if you were to run the pipeline right now and compare it to the pipeline from the prior lab, you might notice that the new pipeline presents results earlier. Its results might also be more accurate, if the watermark heuristics predicted streaming behavior poorly and your allowed lateness captures late data that would otherwise have been dropped.

However, while the current pipeline is more robust to lateness, it is still vulnerable to malformed data. If you were to run the pipeline and publish a message containing anything but a well-formed JSON string that could be parsed into a CommonLog, the pipeline would generate an error. Although tools like Cloud Logging make it straightforward to read those errors, a better-designed pipeline will store these in a pre-defined location for later inspection.

In this section, you add components to the pipeline that make it both more modular and more robust.

Task 1. Collect malformed data

In order to be more robust to malformed data, the pipeline needs a way of filtering out this data and branching to process it differently. You have already seen one way to introduce a branch into a pipeline: by making one PCollection the input for multiple transforms.

This form of branching is powerful. However, there are some use cases where this strategy is inefficient. For example, say you want to create two different subsets of the same PCollection. Using the multiple transform method, you would create one filter transform for each subset and apply them both to the original PCollection. However, this would process each element twice.

An alternative method for producing a branching pipeline is to have a single transform produce multiple outputs while processing the input PCollection one time. In this task, you write a transform that produces multiple outputs: the first contains the results obtained from well-formed data, and the second contains the malformed elements from the original input stream.

In order for a single transform to emit multiple results, Apache Beam uses a class called PCollectionTuple. A PCollectionTuple is an immutable tuple of heterogeneously typed PCollections, "keyed" by TupleTags.

Here is an example of a PCollectionTuple being instantiated with two different types of PCollections. Then, those PCollections are retrieved using the PCollectionTuple.get() method:

PCollection<String> pc1 = ...;
PCollection<Integer> pc2 = ...;
TupleTag<String> tag1 = new TupleTag<>();
TupleTag<Integer> tag2 = new TupleTag<>();
PCollectionTuple pcs =
    PCollectionTuple.of(tag1, pc1)
        .and(tag2, pc2);
PCollection<String> pcX = pcs.get(tag1);
PCollection<Integer> pcY = pcs.get(tag2);

To use this method in the context of a PTransform, you can write code like the following example, which assigns a TupleTag to an element based on its contents:

final TupleTag<String> aTag = new TupleTag<String>(){};
final TupleTag<String> bTag = new TupleTag<String>(){};

PCollectionTuple mixedCollection =
    input.apply(ParDo
        .of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().startsWith("A")) {
              // Emit to main output, which is the output with tag aTag.
              c.output(c.element());
            } else if (c.element().startsWith("B")) {
              // Emit to output with tag bTag.
              c.output(bTag, c.element());
            }
          }
        })
        // Specify the main output. In this example, it is the output with tag aTag.
        .withOutputTags(aTag,
            // Specify the additional output with tag bTag, as a TupleTagList.
            TupleTagList.of(bTag)));

// Get the subset of the output with tag aTag.
mixedCollection.get(aTag).apply(...);

// Get the subset of the output with tag bTag.
mixedCollection.get(bTag).apply(...);
  • To complete this task, declare two TupleTag constants at the top of the class, and change the JsonToCommonLog transform so that it returns a PCollectionTuple and tags unparsed elements with one tag and parsed elements with the other. Instead of an if/then/else block, use a try/catch statement, as sketched below.
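
A minimal sketch of the shape this could take, assuming the Gson-based parsing used in earlier labs; the tag names parsedMessages and unparsedMessages, and the messages PCollection of raw Pub/Sub strings, are illustrative assumptions rather than required names:
// Illustrative tag names; the empty braces preserve generic type information at runtime.
static final TupleTag<CommonLog> parsedMessages = new TupleTag<CommonLog>() {};
static final TupleTag<String> unparsedMessages = new TupleTag<String>() {};

// Inside the transform that converts JSON strings into CommonLog objects:
PCollectionTuple transformOut = messages
    .apply("JsonToCommonLog", ParDo.of(new DoFn<String, CommonLog>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            String json = context.element();
            try {
              // Well-formed JSON is emitted to the main (parsed) output.
              CommonLog commonLog = new Gson().fromJson(json, CommonLog.class);
              context.output(parsedMessages, commonLog);
            } catch (JsonSyntaxException e) {
              // Anything that fails to parse goes to the side output for later inspection.
              context.output(unparsedMessages, json);
            }
          }
        })
        // The first tag is the main output; additional outputs go in the TupleTagList.
        .withOutputTags(parsedMessages, TupleTagList.of(unparsedMessages)));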

Task 2. Make code more modular with a composite transform

Transforms can have a nested structure, where a complex transform performs multiple simpler transforms (such as more than one ParDo, Combine, GroupByKey, or even other composite transforms). These transforms are called composite transforms. Nesting multiple transforms inside a single composite transform can make your code more modular and easier to understand.

  1. To create your own composite transform, create a subclass of the PTransform class and override the expand method to specify the actual processing logic. For the PTransform class type parameters, you pass the PCollection types that your transform takes as input, and produces as output.

The following code sample shows how to declare a PTransform that accepts a PCollection of strings for input, and outputs a PCollection of integers:


static class MyCompositeTransform
    extends PTransform<PCollection<String>, PCollection<Integer>> {
  ...
}
  2. Within your PTransform subclass, you’ll need to override the expand method. The expand method is where you add the processing logic for the PTransform. Your override of the expand method must accept the appropriate type of input PCollection as a parameter, and specify the output PCollection as the return value.
static class MyCompositeTransform
    extends PTransform<PCollection<String>, PCollection<Integer>> {
  @Override
  public PCollection<Integer> expand(PCollection<String> input) {
    ...
    // transform logic goes here
    ...
  }
}
  3. To invoke your transform, use PCollection.apply() on the PCollection and pass an instance of the composite transform:
PCollection<Integer> i = stringPColl.apply("CompositeTransform", new MyCompositeTransform());
  4. To complete this task, take the JsonToCommonLog transform that you just modified and convert it into a composite transform. Note that this will cause an issue with the current write transform, which expects instances of CommonLog. Save the results of the composite transform in a new PCollectionTuple and use .get() to retrieve the PCollection that the write transform expects. A sketch of the overall shape follows below.
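
A hedged sketch of how the composite transform and its call site might fit together, reusing the illustrative tag names and the assumed messages PCollection from the previous task:
// The composite transform wraps the multi-output ParDo and returns the whole PCollectionTuple.
static class JsonToCommonLog extends PTransform<PCollection<String>, PCollectionTuple> {
  @Override
  public PCollectionTuple expand(PCollection<String> input) {
    return input.apply("JsonToCommonLog", ParDo.of(new DoFn<String, CommonLog>() {
          @ProcessElement
          public void processElement(ProcessContext context) {
            try {
              context.output(parsedMessages,
                  new Gson().fromJson(context.element(), CommonLog.class));
            } catch (JsonSyntaxException e) {
              context.output(unparsedMessages, context.element());
            }
          }
        })
        .withOutputTags(parsedMessages, TupleTagList.of(unparsedMessages)));
  }
}

// At the call site, retrieve the branch that the existing write transform expects:
PCollectionTuple transformOut =
    messages.apply("ConvertMessageToCommonLog", new JsonToCommonLog());
PCollection<CommonLog> commonLogs = transformOut.get(parsedMessages);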

Task 3. Write malformed data for later analysis

In order to fix the upstream problem that is producing malformed data, it is important to be able to analyze the malformed data. Doing so requires materializing it somewhere. In this task, you write malformed data to Google Cloud Storage. This pattern is called using dead-letter storage.

In previous labs, you wrote directly from a bounded source (batch) to Cloud Storage using TextIO.write(). However, when writing from an unbounded source (streaming), this approach needs to be modified slightly.

Firstly, upstream of the write transform, you need to use a Trigger to specify when, in processing time, to write. Otherwise, if the defaults are left, the write will never take place. By default, every event belongs to the Global Window. When operating in batch, this is fine because the full data set is known at run time. However, with unbounded sources, the full dataset size is unknown and so Global Window panes never fire, as they are never complete.

Because you are using a Trigger, you also need to use a Window. However, you don’t necessarily need to change the window. In previous labs and tasks, you have used windowing transforms to replace the global window with a window of fixed duration in event time. In this case, which elements are grouped together is not as important as that the results be materialized in a useful manner and at a useful rate.

In the example below, the window fires the Global Window pane after every 10 seconds of processing time but only writes new events:

pCollection.apply("FireEvery10s", Window.<String>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .discardingFiredPanes())

Once you’ve set a Trigger, you need to modify the call to TextIO.write() to perform the writes. When writing downstream of a windowing transform, chain a call to withWindowedWrites() and specify a number of shards so that writing can be done in parallel:

fixedWindowedItems.apply("WriteWindowedPCollection", TextIO
    .write()
    .to("gs://path/to/somewhere")
    .withWindowedWrites()
    .withNumShards(NUM_SHARDS));
  • To complete this task, create a new transform using .get() on the PCollectionTuple to retrieve the malformed data. Use your judgment and knowledge of triggers to set appropriate firing conditions for this trigger. A sketch follows below.
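
Putting the pieces above together, here is a minimal sketch of the dead-letter branch, assuming a deadletterBucket command-line option and a ten-second processing-time firing interval (both are assumptions; pick values that make sense for your pipeline):
transformOut
    // Retrieve the malformed elements from the PCollectionTuple.
    .get(unparsedMessages)
    // Fire the Global Window pane periodically in processing time so writes actually happen.
    .apply("FireEvery10s", Window.<String>configure()
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))))
        .discardingFiredPanes())
    // Write each fired pane to Cloud Storage for later inspection.
    .apply("WriteDeadletterStorage", TextIO.write()
        .to(options.getDeadletterBucket() + "/deadletter/*")
        .withWindowedWrites()
        .withNumShards(10));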

Task 4. Run your pipeline

  1. To run your pipeline, construct a command resembling the example below. Note that it will need to be modified to reflect the names of any command-line options that you have included.
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{ project_0.default_region | "REGION" }}}
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export ALLOWED_LATENESS=1
export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic
export DEADLETTER_BUCKET=${BUCKET}
cd $BASE_DIR
mvn compile exec:java \
-Dexec.mainClass=${MAIN_CLASS_NAME} \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=${PROJECT_ID} \
--region=${REGION} \
--stagingLocation=${PIPELINE_FOLDER}/staging \
--tempLocation=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--inputTopic=${PUBSUB_TOPIC} \
--windowDuration=${WINDOW_DURATION} \
--allowedLateness=${ALLOWED_LATENESS} \
--outputTableName=${OUTPUT_TABLE_NAME} \
--deadletterBucket=${DEADLETTER_BUCKET}"

The code for this quest includes a script for publishing JSON events using Pub/Sub.

  2. To complete this task and start publishing messages, open a new terminal side-by-side with your current one and run the following script. It will keep publishing messages until you kill the script. Make sure you are in the training-data-analyst/quests/dataflow folder.
Note: The true flag adds late events to the stream.
bash generate_streaming_events.sh true

Click Check my progress to verify the objective. Run your pipeline

Task 5. Test your pipeline

  1. Navigate to Pub/Sub > Topics and click on the topic my_topic.
  2. Click on the Messages tab and then click the Publish Message button.
  3. On the following page, enter a message to be published.

So long as it doesn’t conform perfectly to the CommonLog JSON spec, it should arrive in the dead-letter Cloud Storage bucket shortly. You can trace its path through the pipeline by returning to the pipeline monitoring window and clicking on a node in the branch responsible for handling unparsed messages.

  4. Once you see an element added to this branch, you can then navigate to Cloud Storage and verify that the message has been written to disk:
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{ project_0.default_region | "REGION" }}}
export BUCKET=gs://${PROJECT_ID}/deadletter
gsutil ls $BUCKET
gsutil cat $BUCKET/*/*

Click Check my progress to verify the objective. Test your pipeline

End your lab

When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.