arrow_back

Processamento de dados sem servidor com o Dataflow: pipeline de análise avançada de streaming com o Cloud Dataflow (Java)

Test and share your knowledge with our community!
done
Get access to over 700 hands-on labs, skill badges, and courses

Processamento de dados sem servidor com o Dataflow: pipeline de análise avançada de streaming com o Cloud Dataflow (Java)

Lab 1 hora 30 minutos universal_currency_alt 7 créditos show_chart Avançado
Test and share your knowledge with our community!
done
Get access to over 700 hands-on labs, skill badges, and courses

Visão geral

Neste laboratório, você vai fazer o seguinte:

  • Lidar com dados atrasados
  • Corrigir dados mal formatados:
    1. escrevendo uma transformação composta para criar um código mais modular;
    2. escrevendo uma transformação que gere respostas de vários tipos;
    3. coletar dados com erro de formatação e gravá-los em um local onde poderão ser examinados.

Ao final laboratório anterior, vimos um desafio comum para os pipelines em tempo real, que é a lacuna entre a ocorrência e o processamento dos eventos, também conhecida como atraso. Este laboratório apresenta conceitos do Apache Beam que os designers de pipelines podem usar para definir formalmente como lidar com esse atraso.

Além do atraso, existem outros problemas que podem afetar os pipelines em um contexto de streaming, como erros de formatação em entradas de fora do sistema. Também vamos ver técnicas para processar essas entradas.

Quando este laboratório terminar, o pipeline final vai ficar parecido com o da imagem abaixo. Observe que ele tem uma ramificação.

O diagrama final da arquitetura do pipeline

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Verifique as permissões do projeto

Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).

  1. No console do Google Cloud, em Menu de navegação (Ícone do menu de navegação), selecione IAM e administrador > IAM.

  2. Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.

Nome da conta de serviço padrão e status do editor do Compute Engine destacados na página com a guia "Permissões"

Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
  1. No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
  2. Copie o número do projeto, por exemplo, 729328892908.
  3. Em Menu de navegação, clique em IAM e administrador > IAM.
  4. Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
  5. Em Novos principais, digite:
{project-number}-compute@developer.gserviceaccount.com
  1. Substitua {project-number} pelo número do seu projeto.
  2. Em Papel, selecione Projeto (ou Básico) > Editor.
  3. Clique em Save.

Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud, similar ao Cloud Shell.

Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.

OBSERVAÇÃO: mesmo depois que o URL aparecer, talvez você precise esperar de três a cinco minutos para o ambiente ser totalmente provisionado. Até isso acontecer, uma mensagem de erro será exibida no navegador.

ide_url

O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs com códigos que você vai concluir e uma pasta solution com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer para conferir:

file_explorer

Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:

new_terminal

Outra opção de visualização é executar gcloud auth list no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:

gcloud_auth

Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:

gce_reset

Parte 1 do laboratório: como lidar com dados atrasados

Nos laboratórios anteriores, você escreveu um código que dividia os elementos pela hora do evento em janelas de largura fixa. O código era parecido com este:

commonLogs .apply("WindowCommonLogs", Window.into( FixWindows.of( Duration.standardMinutes( options.getWindowDuration())) ))) .apply("CountEventsPerWindow", Combine.globally( Count.<CommonLog>combineFn()).withoutDefaults());

No entanto, como você viu ao final do último laboratório non-SQL, o atraso é comum em streams de dados. Isso pode ser um problema quando as janelas são criadas com base na hora do evento, e não no tempo de processamento dele, porque não é possível afirmar que os eventos chegaram naquele momento específico.

Para gerar resultados, seu pipeline precisou tomar uma decisão e fez isso usando um conceito chamado marca-d'água. Uma marca d'água é a noção do sistema baseada em heurística de quando todos os dados, até um certo ponto no horário do evento, podem ter chegado ao pipeline. Quando a marca d'água passar do fim de uma janela, qualquer outro elemento que chegar com um carimbo de data/hora dessa janela é considerado um dado atrasado, sendo simplesmente descartado. Portanto, o comportamento padrão do janelamento é emitir um único resultado completo quando o sistema tiver certeza de que recebeu todos os dados.

O Apache Beam usa várias heurísticas para tomar uma decisão embasada sobre o que é a marca d'água. De toda forma, ainda são heurísticas. Em outras palavras, essas heurísticas são de uso geral e podem não ser relevantes para alguns casos. Em vez de empregar heurísticas de uso geral, os designers de pipelines precisam considerar as seguintes perguntas para determinar quais compensações são apropriadas:

  • Totalidade: qual é a importância de ter todos os dados antes de calcular o resultado?
  • Latência: quanto tempo você quer esperar pelos dados? Por exemplo, você vai esperar por todos eles ou prefere processar os dados assim que chegarem?
  • Custo: quanto você se dispõe a gastar em dinheiro e capacidade de computação para reduzir a latência?

Com base nas respostas, é possível usar os formalismos do Apache Beam para escrever um código que faça a compensação certa.

Atraso permitido

O atraso permitido define por quanto tempo uma janela deve manter um estado. Quando a marca d'água atinge o fim do período de atraso permitido, o estado é descartado. Seria ótimo poder manter o estado de uma janela indefinidamente, mas não é prático fazer isso com uma fonte de dados ilimitada porque o espaço em disco um dia acaba.

Logo, qualquer sistema de processamento real e fora de ordem precisa oferecer uma forma de vincular os ciclos de vida das janelas que está processando. Uma forma simples e rápida de fazer isso é definir um limite para o atraso permitido dentro do sistema. Por exemplo, indicar para o sistema até quando processar registros em relação à marca d'água. Os dados que chegarem depois desse limite estabelecido serão descartados. Ao definir o limite de atraso de dados individuais, você também estabelece com precisão por quanto tempo o estado das janelas precisa ser mantido: até que a marca d'água exceda o horizonte de atraso para o fim da janela.

Tarefa 1: prepare o ambiente

Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes.

Abra o laboratório apropriado

  1. Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso. Depois copie e cole este comando:
# Change directory into the lab cd 7_Advanced_Streaming_Analytics/labs # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Configure o ambiente de dados:
# Create GCS buckets, BQ dataset, and Pubsub Topic cd $BASE_DIR/../.. source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Clique em Verificar meu progresso para conferir o objetivo. Prepare o ambiente

Tarefa 2: defina o atraso permitido

No Apache Beam, o atraso permitido é definido pelo método withAllowedLateness(), como no exemplo abaixo:

PCollection<String> items = ...; PCollection<String> windowed_items = items.apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(1)));
  • Para concluir essa tarefa, analise a transformação do janelamento e adicione uma chamada para .withAllowedLateness(), transmitindo um valor válido para Duration, determinado com o parâmetro de linha de comando apropriado. Escolha um valor razoável e atualize a linha de comando para refletir as unidades certas.

Gatilhos

Designers de pipeline também podem escolher quando gerar resultados preliminares. Por exemplo, se a marca d'água do final de uma janela não tiver sido alcançada, mas 75% dos dados esperados já tiverem chegado. Em muitos casos, essa porcentagem seria representativa o bastante para mostrar os resultados aos usuários finais.

Um Trigger (gatilho) determina em que ponto do tempo de processamento os resultados são mostrados. Cada resposta gerada para uma janela específica é chamada de painel da janela. Quando as condições do gatilho são cumpridas, ele dispara um painel. No Apache Beam, essas condições incluem o progresso da marca d'água, o tempo de processamento (que avança de maneira uniforme sem importar quantos dados foram recebidos), contagens de elementos (quando um determinado número de dados novos chega) e gatilhos dependentes de dados (por exemplo, para indicar que o fim de um arquivo foi atingido).

Dependendo das condições definidas, um gatilho pode disparar um painel várias vezes. Portanto, também é necessário especificar como os resultados serão acumulados. Atualmente, o Apache Beam oferece suporte a dois modos de acumulação. O primeiro acumula resultados, e o outro retorna apenas as partes do resultado que são novas desde que o último painel foi disparado.

Tarefa 3: defina um gatilho

Quando você define uma função de janelamento para uma PCollection usando a transformação Window, também pode especificar um gatilho.

Para definir os gatilhos de uma PCollection, chame o método .triggering() no resultado da transformação Window.into(). Em seguida Window.triggering() aceita um gatilho como o próprio argumento. O Apache Beam vem com alguns gatilhos:

  • AfterWatermark: dispara quando a marca-d'água transmite um carimbo de data/hora determinado pelo final da janela ou pela chegada do primeiro elemento em um painel.
  • AfterProcessingTime: dispara depois de um tempo de processamento específico, normalmente depois do primeiro elemento em um painel.
  • AfterPane: dispara uma propriedade dos elementos no painel atual, como o número de elementos atribuídos ao painel atual.

Este exemplo de código define um gatilho baseado em tempo para uma PCollection que emite resultados um minuto depois do primeiro elemento da janela ser processado. A última linha no exemplo de código, .discardingFiredPanes(), define o modo de acumulação da janela:

PCollection<String> pc = ...; pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1)) .discardingFiredPanes());
  • Conclua esta tarefa adicionando uma chamada para Window.triggering() na transformação de janelamento e transmitindo um Trigger válido. Quando criar um gatilho, considere que os dados podem ser colocados em janelas de um minuto e talvez cheguem atrasados.

Se quiser ver um exemplo de gatilho, consulte a solução aqui.

Parte 2 do laboratório: como lidar com dados mal formatados

Dependendo de como você configurou Trigger, se executar o pipeline agora vai observar que os resultados chegam antes do que os do pipeline do laboratório anterior. Também é possível que os resultados agora sejam mais precisos, caso a heurística tenha feito uma previsão falha do comportamento de streaming, e o atraso permitido seja uma estratégia melhor.

Embora o pipeline atual esteja mais preparado para os atrasos, ele ainda precisa lidar com dados mal formatados. Se você executasse o pipeline e publicasse uma mensagem só com uma string JSON correta e que pudesse ser analisada por CommonLog, o pipeline retornaria um erro. Ferramentas como o Cloud Logging facilitam a leitura desses erros, mas um pipeline bem projetado armazenaria os erros em um local predefinido para análise posterior.

Nesta seção, você vai adicionar componentes que deixam o pipeline mais modular e preparado para lidar com problemas.

Tarefa 1: colete dados mal formatados

Para lidar melhor com dados mal formatados, o pipeline precisa ter uma forma de filtrar e encaminhar esses dados para outro tipo de processamento. Você já viu como adicionar uma ramificação a um pipeline tornando uma determinada PCollection a entrada de várias transformações.

Essa forma de ramificação é muito eficiente, mas não funciona em alguns casos. Por exemplo, se você quiser criar dois subconjuntos com a mesma PCollection. Usando o método de várias transformações, você criaria uma transformação de filtro para cada subconjunto e as aplicaria à PCollection original. O problema é que os elementos seriam processados duas vezes.

Outra forma de criar um pipeline com ramificações é configurar uma única transformação para gerar várias saídas processando a PCollection de entrada uma só vez. Nesta tarefa, você vai criar uma transformação que gera duas saídas: uma com o resultado do processamento dos dados formatados corretamente e outra com os elementos mal formatados do stream da entrada original.

O Apache Beam usa a classe PCollectionTuple para gerar mais de um resultado criando só uma PCollection. A PCollectionTuple é uma tupla imutável que contém tipos de PCollection, cada um deles representado por uma TupleTag.

Confira abaixo um exemplo de PCollectionTuple sendo instanciada com dois tipos de PCollections. Para recuperar essas duas PCollections, usamos o método PCollectionTuple.get():

PCollection<String> pc1 = ...; PCollection<Integer> pc2 = ...; TupleTag<String> tag1 = new TupleTag<>(); TupleTag<Integer> tag2 = new TupleTag<>(); PCollectionTuple pcs = PCollectionTuple.of(tag1, pc1) .and(tag2, pc2); PCollection<Integer> pcX = pcs.get(tag1); PCollection<String> pcY = pcs.get(tag2);

Se quiser usar esse método com uma PTransform, você pode escrever um código, como o exemplo a seguir, que atribui uma TupleTag a um elemento com base no conteúdo dele:

final TupleTag<String> aTag = new TupleTag<String>(){}; final TupleTag<String> bTag = new TupleTag<String>(){}; PCollectionTuple mixedCollection = input.apply(ParDo .of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element().startsWith("A")) { // Emit to main output, which is the output with tag aTag. c.output(c.element()); } else if(c.element().startsWith("B")) { // Emit to output with tag bTag. c.output(bTag, c.element()); } } }) // Specify main output. In this example, it is the output // with tag startsWithATag. .withOutputTags(aTag, // Specify the output with tag bTag, as a TupleTagList. TupleTagList.of(bTag))); // Get subset of the output with tag bTag. mixedCollection.get(aTag).apply(...); // Get subset of the output with tag startsWithBTag. mixedCollection.get(bTag).apply(...);
  • Para concluir essa tarefa, declare duas constantes de TupleTag na parte de cima da classe e altere a transformação JsonToCommonLog para retornar uma PCollectionTuple e marcar os elementos não analisados com uma tag e os elementos analisados com a outra. Em vez de um bloco if/then/else, use uma instrução try/catch.

Tarefa 2: deixe o código mais modular com uma transformação composta

As transformações podem ter uma estrutura aninhada, em que uma transformação complexa executa várias transformações mais simples. Por exemplo, mais de uma ParDo, Combine, AdRequest ou até mesmo outras transformações compostas. Essas são as transformações compostas. Aninhar várias transformações em uma única transformação composta pode deixar o código mais modular e fácil de entender.

  1. Para ter sua própria transformação composta, crie uma subclasse da classe PTransform e modifique o método de expansão para especificar a lógica real de processamento. Para os parâmetros da classe PTransform, transmita os tipos de PCollection que a transformação usa como entrada e gera como saída.

O exemplo de código a seguir mostra como declarar uma PTransform que aceita uma PCollection de strings como entrada e retorna uma PCollection de números inteiros como saída:

#TODO: JsonToRow

static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
  1. Na subclasse PTransform que você criou, será necessário substituir o método de expansão, onde a lógica de processamento da PTransform é adicionada. A substituição do método de expansão precisa aceitar o tipo de PCollection de entrada como um parâmetro e especificar a PCollection de saída como o valor a ser retornado.
static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // transform logic goes here ... } }
  1. Para invocar sua transformação, use PCollection.apply() na PCollection e transmita uma instância da transformação composta:
PCollection<Integer> i = stringPColl.apply("CompositeTransform", new MyCompositeTransform());
  1. Para concluir esta tarefa, converta em uma transformação composta a transformação JsonToCommonLog que você acabou de modificar. Veja que isso vai causar um problema com a transformação de gravação atual, que espera instâncias de CommonLog. Salve os resultados da transformação composta em uma nova PCollectionTuple e use .get() para recuperar a PCollection que a transformação de gravação espera.

Tarefa 3: grave os dados com erro de formatação para análise posterior

Para corrigir o problema de upstream que está produzindo dados malformados, é importante saber analisar esses dados. Para isso, é necessário armazenar os dados em algum lugar. Nesta tarefa, você vai gravar dados com erros de formatação no Google Cloud Storage. Chamamos esse padrão usando o armazenamento de mensagens inativas.

Nos laboratórios anteriores, você gravou os dados de uma origem limitada (em lote) no Cloud Storage usando TextIO.write(). Para gravar dados de uma fonte ilimitada (streaming), essa abordagem precisa ser ajustada.

No upstream da transformação de gravação, use um Trigger para especificar quando a gravação deve ser feita durante o tempo de processamento. Se você mantiver os padrões, a gravação nunca será feita. Por padrão, todos os eventos pertencem à janela global. No processamento em lote, isso não é um problema porque o conjunto completo dos dados é conhecido no momento da execução. No entanto, com fontes ilimitadas, o tamanho total do conjunto de dados é desconhecido. Sendo assim, os painéis da janela global não são disparados porque jamais são concluídos.

Como você está usando um gatilho (Trigger), também precisa de uma janela (Window). No entanto, não é necessário alterar a janela. Nos laboratórios e tarefas anteriores, você usou transformações de janelamento para substituir a janela global por uma janela de duração fixa na hora do evento. Nesse caso, é mais importante receber os resultados de maneira útil e a uma taxa eficiente do que saber quais elementos foram agrupados.

No exemplo abaixo, a janela dispara o painel da janela global a cada 10 segundos durante o processamento, mas só grava eventos novos:

pCollection.apply("FireEvery10s", Window.<String>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .discardingFiredPanes())

Depois de configurar um Trigger altere a chamada para TextIO.write() que executa as gravações. Ao gravar o downstream de uma transformação de janelamento, encadeie uma chamada para withWindowedWrites() e especifique vários fragmentos para fazer a gravação em paralelo:

fixedWindowedItems.apply( "WriteWindowedPCollection", TextIO .write() .to("gs://path/to/somewhere") .withWindowedWrites() .withNumShards(NUM_SHARDS));
  • Para concluir esta tarefa, crie uma nova transformação usando .get() na PCollectionTuple para recuperar os dados com erros de formatação. Aplique tudo o que você sabe sobre gatilhos e tenha bom senso ao definir as condições de disparo para este gatilho.

Tarefa 4: executar o pipeline

  1. Para executar o pipeline, crie um comando semelhante ao exemplo abaixo. Faça alterações para refletir os nomes das opções de linha de comando que você adicionou.
export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --allowedLateness=${ALLOWED_LATENESS} \ --outputTableName=${OUTPUT_TABLE_NAME} \ --deadletterBucket=${DEADLETTER_BUCKET}"

O código desta Quest inclui um script para publicar eventos JSON usando o Pub/Sub.

  1. Para concluir a tarefa e começar a publicar mensagens, abra um novo terminal ao lado do atual e execute o script a seguir. Ele só vai parar de publicar mensagens quando for encerrado. Confirme que a pasta é a training-data-analyst/quests/dataflow.
Observação: a sinalização true adiciona eventos atrasados ao stream. bash generate_streaming_events.sh true

Clique em Verificar meu progresso para conferir o objetivo. Execute o pipeline

Tarefa 5. Teste o pipeline

  1. Navegue até Pub/Sub > Tópicos e selecione my_topic.
  2. Clique na guia Mensagens e depois no botão Publicar mensagem.
  3. Na página a seguir, digite uma mensagem a ser publicada.

A mensagem deve chegar logo ao bucket do Cloud Storage, desde que não esteja em conformidade perfeita com a especificação JSON de CommonLog. Para acompanhar o trajeto da mensagem, volte à janela de monitoramento do pipeline e clique em um nó da ramificação que processa mensagens não analisadas.

  1. Ao ver um elemento adicionado a essa ramificação, acesse o Cloud Storage e verifique se a mensagem foi gravada em disco:
export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export BUCKET=gs://${PROJECT_ID}/deadletter gsutil ls $BUCKET gsutil cat $BUCKET/*/*

Clique em Verificar meu progresso para conferir o objetivo. Teste o pipeline

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.