Checkpoints
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
Processamento de dados sem servidor com o Dataflow: pipeline de análise avançada de streaming com o Cloud Dataflow (Java)
- Visão geral
- Configuração e requisitos
- Parte 1 do laboratório: como lidar com dados atrasados
- Tarefa 1: prepare o ambiente
- Tarefa 2: defina o atraso permitido
- Tarefa 3: defina um gatilho
- Parte 2 do laboratório: como lidar com dados mal formatados
- Tarefa 1: colete dados mal formatados
- Tarefa 2: deixe o código mais modular com uma transformação composta
- Tarefa 3: grave os dados com erro de formatação para análise posterior
- Tarefa 4: executar o pipeline
- Tarefa 5. Teste o pipeline
- Finalize o laboratório
Visão geral
Neste laboratório, você vai fazer o seguinte:
- Lidar com dados atrasados
- Corrigir dados mal formatados:
- escrevendo uma transformação composta para criar um código mais modular;
- escrevendo uma transformação que gere respostas de vários tipos;
- coletar dados com erro de formatação e gravá-los em um local onde poderão ser examinados.
Ao final laboratório anterior, vimos um desafio comum para os pipelines em tempo real, que é a lacuna entre a ocorrência e o processamento dos eventos, também conhecida como atraso. Este laboratório apresenta conceitos do Apache Beam que os designers de pipelines podem usar para definir formalmente como lidar com esse atraso.
Além do atraso, existem outros problemas que podem afetar os pipelines em um contexto de streaming, como erros de formatação em entradas de fora do sistema. Também vamos ver técnicas para processar essas entradas.
Quando este laboratório terminar, o pipeline final vai ficar parecido com o da imagem abaixo. Observe que ele tem uma ramificação.
Configuração e requisitos
Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.
-
Faça login no Qwiklabs em uma janela anônima.
-
Confira o tempo de acesso do laboratório (por exemplo,
1:15:00
) e finalize todas as atividades nesse prazo.
Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas. -
Quando tudo estiver pronto, clique em Começar o laboratório.
-
Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.
-
Clique em Abrir Console do Google.
-
Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
Se você usar outras credenciais, vai receber mensagens de erro ou cobranças. -
Aceite os termos e pule a página de recursos de recuperação.
Verifique as permissões do projeto
Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).
-
No console do Google Cloud, em Menu de navegação (), selecione IAM e administrador > IAM.
-
Confira se a conta de serviço padrão do Compute
{project-number}-compute@developer.gserviceaccount.com
está na lista e recebeu o papel deeditor
. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.
editor
, siga as etapas abaixo.- No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
- Copie o número do projeto, por exemplo,
729328892908
. - Em Menu de navegação, clique em IAM e administrador > IAM.
- Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
- Em Novos principais, digite:
- Substitua
{project-number}
pelo número do seu projeto. - Em Papel, selecione Projeto (ou Básico) > Editor.
- Clique em Save.
Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud
, similar ao Cloud Shell.
Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.
O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs
com códigos que você vai concluir e uma pasta solution
com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer
para conferir:
Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:
Outra opção de visualização é executar gcloud auth list
no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:
Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:
Parte 1 do laboratório: como lidar com dados atrasados
Nos laboratórios anteriores, você escreveu um código que dividia os elementos pela hora do evento em janelas de largura fixa. O código era parecido com este:
No entanto, como você viu ao final do último laboratório non-SQL, o atraso é comum em streams de dados. Isso pode ser um problema quando as janelas são criadas com base na hora do evento, e não no tempo de processamento dele, porque não é possível afirmar que os eventos chegaram naquele momento específico.
Para gerar resultados, seu pipeline precisou tomar uma decisão e fez isso usando um conceito chamado marca-d'água. Uma marca d'água é a noção do sistema baseada em heurística de quando todos os dados, até um certo ponto no horário do evento, podem ter chegado ao pipeline. Quando a marca d'água passar do fim de uma janela, qualquer outro elemento que chegar com um carimbo de data/hora dessa janela é considerado um dado atrasado, sendo simplesmente descartado. Portanto, o comportamento padrão do janelamento é emitir um único resultado completo quando o sistema tiver certeza de que recebeu todos os dados.
O Apache Beam usa várias heurísticas para tomar uma decisão embasada sobre o que é a marca d'água. De toda forma, ainda são heurísticas. Em outras palavras, essas heurísticas são de uso geral e podem não ser relevantes para alguns casos. Em vez de empregar heurísticas de uso geral, os designers de pipelines precisam considerar as seguintes perguntas para determinar quais compensações são apropriadas:
- Totalidade: qual é a importância de ter todos os dados antes de calcular o resultado?
- Latência: quanto tempo você quer esperar pelos dados? Por exemplo, você vai esperar por todos eles ou prefere processar os dados assim que chegarem?
- Custo: quanto você se dispõe a gastar em dinheiro e capacidade de computação para reduzir a latência?
Com base nas respostas, é possível usar os formalismos do Apache Beam para escrever um código que faça a compensação certa.
Atraso permitido
O atraso permitido define por quanto tempo uma janela deve manter um estado. Quando a marca d'água atinge o fim do período de atraso permitido, o estado é descartado. Seria ótimo poder manter o estado de uma janela indefinidamente, mas não é prático fazer isso com uma fonte de dados ilimitada porque o espaço em disco um dia acaba.
Logo, qualquer sistema de processamento real e fora de ordem precisa oferecer uma forma de vincular os ciclos de vida das janelas que está processando. Uma forma simples e rápida de fazer isso é definir um limite para o atraso permitido dentro do sistema. Por exemplo, indicar para o sistema até quando processar registros em relação à marca d'água. Os dados que chegarem depois desse limite estabelecido serão descartados. Ao definir o limite de atraso de dados individuais, você também estabelece com precisão por quanto tempo o estado das janelas precisa ser mantido: até que a marca d'água exceda o horizonte de atraso para o fim da janela.
Tarefa 1: prepare o ambiente
Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes.
Abra o laboratório apropriado
- Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso. Depois copie e cole este comando:
- Configure o ambiente de dados:
Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 2: defina o atraso permitido
No Apache Beam, o atraso permitido é definido pelo método withAllowedLateness(), como no exemplo abaixo:
- Para concluir essa tarefa, analise a transformação do janelamento e adicione uma chamada para
.withAllowedLateness()
, transmitindo um valor válido paraDuration
, determinado com o parâmetro de linha de comando apropriado. Escolha um valor razoável e atualize a linha de comando para refletir as unidades certas.
Gatilhos
Designers de pipeline também podem escolher quando gerar resultados preliminares. Por exemplo, se a marca d'água do final de uma janela não tiver sido alcançada, mas 75% dos dados esperados já tiverem chegado. Em muitos casos, essa porcentagem seria representativa o bastante para mostrar os resultados aos usuários finais.
Um Trigger
(gatilho) determina em que ponto do tempo de processamento os resultados são mostrados. Cada resposta gerada para uma janela específica é chamada de painel da janela. Quando as condições do gatilho são cumpridas, ele dispara um painel. No Apache Beam, essas condições incluem o progresso da marca d'água, o tempo de processamento (que avança de maneira uniforme sem importar quantos dados foram recebidos), contagens de elementos (quando um determinado número de dados novos chega) e gatilhos dependentes de dados (por exemplo, para indicar que o fim de um arquivo foi atingido).
Dependendo das condições definidas, um gatilho pode disparar um painel várias vezes. Portanto, também é necessário especificar como os resultados serão acumulados. Atualmente, o Apache Beam oferece suporte a dois modos de acumulação. O primeiro acumula resultados, e o outro retorna apenas as partes do resultado que são novas desde que o último painel foi disparado.
Tarefa 3: defina um gatilho
Quando você define uma função de janelamento para uma PCollection
usando a transformação Window
, também pode especificar um gatilho.
Para definir os gatilhos de uma PCollection
, chame o método .triggering()
no resultado da transformação Window.into()
. Em seguida Window.triggering() aceita um gatilho como o próprio argumento. O Apache Beam vem com alguns gatilhos:
- AfterWatermark: dispara quando a marca-d'água transmite um carimbo de data/hora determinado pelo final da janela ou pela chegada do primeiro elemento em um painel.
- AfterProcessingTime: dispara depois de um tempo de processamento específico, normalmente depois do primeiro elemento em um painel.
- AfterPane: dispara uma propriedade dos elementos no painel atual, como o número de elementos atribuídos ao painel atual.
Este exemplo de código define um gatilho baseado em tempo para uma PCollection
que emite resultados um minuto depois do primeiro elemento da janela ser processado. A última linha no exemplo de código, .discardingFiredPanes()
, define o modo de acumulação da janela:
- Conclua esta tarefa adicionando uma chamada para
Window.triggering()
na transformação de janelamento e transmitindo umTrigger
válido. Quando criar um gatilho, considere que os dados podem ser colocados em janelas de um minuto e talvez cheguem atrasados.
Se quiser ver um exemplo de gatilho, consulte a solução aqui.
Parte 2 do laboratório: como lidar com dados mal formatados
Dependendo de como você configurou Trigger
, se executar o pipeline agora vai observar que os resultados chegam antes do que os do pipeline do laboratório anterior. Também é possível que os resultados agora sejam mais precisos, caso a heurística tenha feito uma previsão falha do comportamento de streaming, e o atraso permitido seja uma estratégia melhor.
Embora o pipeline atual esteja mais preparado para os atrasos, ele ainda precisa lidar com dados mal formatados. Se você executasse o pipeline e publicasse uma mensagem só com uma string JSON correta e que pudesse ser analisada por CommonLog
, o pipeline retornaria um erro. Ferramentas como o Cloud Logging facilitam a leitura desses erros, mas um pipeline bem projetado armazenaria os erros em um local predefinido para análise posterior.
Nesta seção, você vai adicionar componentes que deixam o pipeline mais modular e preparado para lidar com problemas.
Tarefa 1: colete dados mal formatados
Para lidar melhor com dados mal formatados, o pipeline precisa ter uma forma de filtrar e encaminhar esses dados para outro tipo de processamento. Você já viu como adicionar uma ramificação a um pipeline tornando uma determinada PCollection
a entrada de várias transformações.
Essa forma de ramificação é muito eficiente, mas não funciona em alguns casos. Por exemplo, se você quiser criar dois subconjuntos com a mesma PCollection
. Usando o método de várias transformações, você criaria uma transformação de filtro para cada subconjunto e as aplicaria à PCollection
original. O problema é que os elementos seriam processados duas vezes.
Outra forma de criar um pipeline com ramificações é configurar uma única transformação para gerar várias saídas processando a PCollection
de entrada uma só vez. Nesta tarefa, você vai criar uma transformação que gera duas saídas: uma com o resultado do processamento dos dados formatados corretamente e outra com os elementos mal formatados do stream da entrada original.
O Apache Beam usa a classe PCollectionTuple
para gerar mais de um resultado criando só uma PCollection
. A PCollectionTuple é uma tupla imutável que contém tipos de PCollection
, cada um deles representado por uma TupleTag
.
Confira abaixo um exemplo de PCollectionTuple
sendo instanciada com dois tipos de PCollections. Para recuperar essas duas PCollections
, usamos o método PCollectionTuple.get()
:
Se quiser usar esse método com uma PTransform
, você pode escrever um código, como o exemplo a seguir, que atribui uma TupleTag
a um elemento com base no conteúdo dele:
- Para concluir essa tarefa, declare duas constantes de
TupleTag
na parte de cima da classe e altere a transformaçãoJsonToCommonLog
para retornar umaPCollectionTuple
e marcar os elementos não analisados com uma tag e os elementos analisados com a outra. Em vez de um blocoif/then/else
, use uma instruçãotry/catch
.
Tarefa 2: deixe o código mais modular com uma transformação composta
As transformações podem ter uma estrutura aninhada, em que uma transformação complexa executa várias transformações mais simples. Por exemplo, mais de uma ParDo
, Combine
, AdRequest
ou até mesmo outras transformações compostas. Essas são as transformações compostas. Aninhar várias transformações em uma única transformação composta pode deixar o código mais modular e fácil de entender.
- Para ter sua própria transformação composta, crie uma subclasse da classe
PTransform
e modifique o método de expansão para especificar a lógica real de processamento. Para os parâmetros da classePTransform
, transmita os tipos dePCollection
que a transformação usa como entrada e gera como saída.
O exemplo de código a seguir mostra como declarar uma PTransform
que aceita uma PCollection
de strings como entrada e retorna uma PCollection
de números inteiros como saída:
#TODO: JsonToRow
- Na subclasse
PTransform
que você criou, será necessário substituir o método de expansão, onde a lógica de processamento daPTransform
é adicionada. A substituição do método de expansão precisa aceitar o tipo dePCollection
de entrada como um parâmetro e especificar aPCollection
de saída como o valor a ser retornado.
- Para invocar sua transformação, use
PCollection.apply()
naPCollection
e transmita uma instância da transformação composta:
- Para concluir esta tarefa, converta em uma transformação composta a transformação
JsonToCommonLog
que você acabou de modificar. Veja que isso vai causar um problema com a transformação de gravação atual, que espera instâncias deCommonLog
. Salve os resultados da transformação composta em uma novaPCollectionTuple
e use.get()
para recuperar aPCollection
que a transformação de gravação espera.
Tarefa 3: grave os dados com erro de formatação para análise posterior
Para corrigir o problema de upstream que está produzindo dados malformados, é importante saber analisar esses dados. Para isso, é necessário armazenar os dados em algum lugar. Nesta tarefa, você vai gravar dados com erros de formatação no Google Cloud Storage. Chamamos esse padrão usando o armazenamento de mensagens inativas.
Nos laboratórios anteriores, você gravou os dados de uma origem limitada (em lote) no Cloud Storage usando TextIO.write()
. Para gravar dados de uma fonte ilimitada (streaming), essa abordagem precisa ser ajustada.
No upstream da transformação de gravação, use um Trigger
para especificar quando a gravação deve ser feita durante o tempo de processamento. Se você mantiver os padrões, a gravação nunca será feita. Por padrão, todos os eventos pertencem à janela global. No processamento em lote, isso não é um problema porque o conjunto completo dos dados é conhecido no momento da execução. No entanto, com fontes ilimitadas, o tamanho total do conjunto de dados é desconhecido. Sendo assim, os painéis da janela global não são disparados porque jamais são concluídos.
Como você está usando um gatilho (Trigger
), também precisa de uma janela (Window
). No entanto, não é necessário alterar a janela. Nos laboratórios e tarefas anteriores, você usou transformações de janelamento para substituir a janela global por uma janela de duração fixa na hora do evento. Nesse caso, é mais importante receber os resultados de maneira útil e a uma taxa eficiente do que saber quais elementos foram agrupados.
No exemplo abaixo, a janela dispara o painel da janela global a cada 10 segundos durante o processamento, mas só grava eventos novos:
Depois de configurar um Trigger
altere a chamada para TextIO.write()
que executa as gravações. Ao gravar o downstream de uma transformação de janelamento, encadeie uma chamada para withWindowedWrites()
e especifique vários fragmentos para fazer a gravação em paralelo:
- Para concluir esta tarefa, crie uma nova transformação usando
.get()
naPCollectionTuple
para recuperar os dados com erros de formatação. Aplique tudo o que você sabe sobre gatilhos e tenha bom senso ao definir as condições de disparo para este gatilho.
Tarefa 4: executar o pipeline
- Para executar o pipeline, crie um comando semelhante ao exemplo abaixo. Faça alterações para refletir os nomes das opções de linha de comando que você adicionou.
O código desta Quest inclui um script para publicar eventos JSON usando o Pub/Sub.
- Para concluir a tarefa e começar a publicar mensagens, abra um novo terminal ao lado do atual e execute o script a seguir. Ele só vai parar de publicar mensagens quando for encerrado. Confirme que a pasta é a
training-data-analyst/quests/dataflow
.
true
adiciona eventos atrasados ao stream.Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 5. Teste o pipeline
- Navegue até Pub/Sub > Tópicos e selecione
my_topic
. - Clique na guia Mensagens e depois no botão Publicar mensagem.
- Na página a seguir, digite uma mensagem a ser publicada.
A mensagem deve chegar logo ao bucket do Cloud Storage, desde que não esteja em conformidade perfeita com a especificação JSON de CommonLog
. Para acompanhar o trajeto da mensagem, volte à janela de monitoramento do pipeline e clique em um nó da ramificação que processa mensagens não analisadas.
- Ao ver um elemento adicionado a essa ramificação, acesse o Cloud Storage e verifique se a mensagem foi gravada em disco:
Clique em Verificar meu progresso para conferir o objetivo.
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.