
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
Neste laboratório, você vai fazer o seguinte:
Ao final laboratório anterior, vimos um desafio comum para os pipelines em tempo real, que é a lacuna entre a ocorrência e o processamento dos eventos, também conhecida como atraso. Este laboratório apresenta conceitos do Apache Beam que os designers de pipelines podem usar para definir formalmente como lidar com esse atraso.
Além do atraso, existem outros problemas que podem afetar os pipelines em um contexto de streaming, como erros de formatação em entradas de fora do sistema. Também vamos ver técnicas para processar essas entradas.
Quando este laboratório terminar, o pipeline final vai ficar parecido com o da imagem abaixo. Observe que ele tem uma ramificação.
Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.
Faça login no Qwiklabs em uma janela anônima.
Confira o tempo de acesso do laboratório (por exemplo, 1:15:00
) e finalize todas as atividades nesse prazo.
Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.
Quando tudo estiver pronto, clique em Começar o laboratório.
Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.
Clique em Abrir Console do Google.
Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.
Aceite os termos e pule a página de recursos de recuperação.
Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).
No console do Google Cloud, em Menu de navegação (), selecione IAM e administrador > IAM.
Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com
está na lista e recebeu o papel de editor
. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.
editor
, siga as etapas abaixo.729328892908
.{project-number}
pelo número do seu projeto.Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud
, similar ao Cloud Shell.
Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.
O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs
com códigos que você vai concluir e uma pasta solution
com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer
para conferir:
Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:
Outra opção de visualização é executar gcloud auth list
no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:
Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:
Nos laboratórios anteriores, você escreveu um código que dividia os elementos pela hora do evento em janelas de largura fixa. O código era parecido com este:
No entanto, como você viu ao final do último laboratório non-SQL, o atraso é comum em streams de dados. Isso pode ser um problema quando as janelas são criadas com base na hora do evento, e não no tempo de processamento dele, porque não é possível afirmar que os eventos chegaram naquele momento específico.
Para gerar resultados, seu pipeline precisou tomar uma decisão e fez isso usando um conceito chamado marca-d'água. Uma marca d'água é a noção do sistema baseada em heurística de quando todos os dados, até um certo ponto no horário do evento, podem ter chegado ao pipeline. Quando a marca d'água passar do fim de uma janela, qualquer outro elemento que chegar com um carimbo de data/hora dessa janela é considerado um dado atrasado, sendo simplesmente descartado. Portanto, o comportamento padrão do janelamento é emitir um único resultado completo quando o sistema tiver certeza de que recebeu todos os dados.
O Apache Beam usa várias heurísticas para tomar uma decisão embasada sobre o que é a marca d'água. De toda forma, ainda são heurísticas. Em outras palavras, essas heurísticas são de uso geral e podem não ser relevantes para alguns casos. Em vez de empregar heurísticas de uso geral, os designers de pipelines precisam considerar as seguintes perguntas para determinar quais compensações são apropriadas:
Com base nas respostas, é possível usar os formalismos do Apache Beam para escrever um código que faça a compensação certa.
O atraso permitido define por quanto tempo uma janela deve manter um estado. Quando a marca d'água atinge o fim do período de atraso permitido, o estado é descartado. Seria ótimo poder manter o estado de uma janela indefinidamente, mas não é prático fazer isso com uma fonte de dados ilimitada porque o espaço em disco um dia acaba.
Logo, qualquer sistema de processamento real e fora de ordem precisa oferecer uma forma de vincular os ciclos de vida das janelas que está processando. Uma forma simples e rápida de fazer isso é definir um limite para o atraso permitido dentro do sistema. Por exemplo, indicar para o sistema até quando processar registros em relação à marca d'água. Os dados que chegarem depois desse limite estabelecido serão descartados. Ao definir o limite de atraso de dados individuais, você também estabelece com precisão por quanto tempo o estado das janelas precisa ser mantido: até que a marca d'água exceda o horizonte de atraso para o fim da janela.
Assim como nos laboratórios anteriores, a primeira coisa a se fazer é gerar os dados que o pipeline vai processar. Abra o ambiente do laboratório e gere os dados como antes.
Clique em Verificar meu progresso para conferir o objetivo.
No Apache Beam, o atraso permitido é definido pelo método withAllowedLateness(), como no exemplo abaixo:
.withAllowedLateness()
, transmitindo um valor válido para Duration
, determinado com o parâmetro de linha de comando apropriado. Escolha um valor razoável e atualize a linha de comando para refletir as unidades certas.Designers de pipeline também podem escolher quando gerar resultados preliminares. Por exemplo, se a marca d'água do final de uma janela não tiver sido alcançada, mas 75% dos dados esperados já tiverem chegado. Em muitos casos, essa porcentagem seria representativa o bastante para mostrar os resultados aos usuários finais.
Um Trigger
(gatilho) determina em que ponto do tempo de processamento os resultados são mostrados. Cada resposta gerada para uma janela específica é chamada de painel da janela. Quando as condições do gatilho são cumpridas, ele dispara um painel. No Apache Beam, essas condições incluem o progresso da marca d'água, o tempo de processamento (que avança de maneira uniforme sem importar quantos dados foram recebidos), contagens de elementos (quando um determinado número de dados novos chega) e gatilhos dependentes de dados (por exemplo, para indicar que o fim de um arquivo foi atingido).
Dependendo das condições definidas, um gatilho pode disparar um painel várias vezes. Portanto, também é necessário especificar como os resultados serão acumulados. Atualmente, o Apache Beam oferece suporte a dois modos de acumulação. O primeiro acumula resultados, e o outro retorna apenas as partes do resultado que são novas desde que o último painel foi disparado.
Quando você define uma função de janelamento para uma PCollection
usando a transformação Window
, também pode especificar um gatilho.
Para definir os gatilhos de uma PCollection
, chame o método .triggering()
no resultado da transformação Window.into()
. Em seguida Window.triggering() aceita um gatilho como o próprio argumento. O Apache Beam vem com alguns gatilhos:
Este exemplo de código define um gatilho baseado em tempo para uma PCollection
que emite resultados um minuto depois do primeiro elemento da janela ser processado. A última linha no exemplo de código, .discardingFiredPanes()
, define o modo de acumulação da janela:
Window.triggering()
na transformação de janelamento e transmitindo um Trigger
válido. Quando criar um gatilho, considere que os dados podem ser colocados em janelas de um minuto e talvez cheguem atrasados.Se quiser ver um exemplo de gatilho, consulte a solução aqui.
Dependendo de como você configurou Trigger
, se executar o pipeline agora vai observar que os resultados chegam antes do que os do pipeline do laboratório anterior. Também é possível que os resultados agora sejam mais precisos, caso a heurística tenha feito uma previsão falha do comportamento de streaming, e o atraso permitido seja uma estratégia melhor.
Embora o pipeline atual esteja mais preparado para os atrasos, ele ainda precisa lidar com dados mal formatados. Se você executasse o pipeline e publicasse uma mensagem só com uma string JSON correta e que pudesse ser analisada por CommonLog
, o pipeline retornaria um erro. Ferramentas como o Cloud Logging facilitam a leitura desses erros, mas um pipeline bem projetado armazenaria os erros em um local predefinido para análise posterior.
Nesta seção, você vai adicionar componentes que deixam o pipeline mais modular e preparado para lidar com problemas.
Para lidar melhor com dados mal formatados, o pipeline precisa ter uma forma de filtrar e encaminhar esses dados para outro tipo de processamento. Você já viu como adicionar uma ramificação a um pipeline tornando uma determinada PCollection
a entrada de várias transformações.
Essa forma de ramificação é muito eficiente, mas não funciona em alguns casos. Por exemplo, se você quiser criar dois subconjuntos com a mesma PCollection
. Usando o método de várias transformações, você criaria uma transformação de filtro para cada subconjunto e as aplicaria à PCollection
original. O problema é que os elementos seriam processados duas vezes.
Outra forma de criar um pipeline com ramificações é configurar uma única transformação para gerar várias saídas processando a PCollection
de entrada uma só vez. Nesta tarefa, você vai criar uma transformação que gera duas saídas: uma com o resultado do processamento dos dados formatados corretamente e outra com os elementos mal formatados do stream da entrada original.
O Apache Beam usa a classe PCollectionTuple
para gerar mais de um resultado criando só uma PCollection
. A PCollectionTuple é uma tupla imutável que contém tipos de PCollection
, cada um deles representado por uma TupleTag
.
Confira abaixo um exemplo de PCollectionTuple
sendo instanciada com dois tipos de PCollections. Para recuperar essas duas PCollections
, usamos o método PCollectionTuple.get()
:
Se quiser usar esse método com uma PTransform
, você pode escrever um código, como o exemplo a seguir, que atribui uma TupleTag
a um elemento com base no conteúdo dele:
TupleTag
na parte de cima da classe e altere a transformação JsonToCommonLog
para retornar uma PCollectionTuple
e marcar os elementos não analisados com uma tag e os elementos analisados com a outra. Em vez de um bloco if/then/else
, use uma instrução try/catch
.As transformações podem ter uma estrutura aninhada, em que uma transformação complexa executa várias transformações mais simples. Por exemplo, mais de uma ParDo
, Combine
, AdRequest
ou até mesmo outras transformações compostas. Essas são as transformações compostas. Aninhar várias transformações em uma única transformação composta pode deixar o código mais modular e fácil de entender.
PTransform
e modifique o método de expansão para especificar a lógica real de processamento. Para os parâmetros da classe PTransform
, transmita os tipos de PCollection
que a transformação usa como entrada e gera como saída.O exemplo de código a seguir mostra como declarar uma PTransform
que aceita uma PCollection
de strings como entrada e retorna uma PCollection
de números inteiros como saída:
#TODO: JsonToRow
PTransform
que você criou, será necessário substituir o método de expansão, onde a lógica de processamento da PTransform
é adicionada. A substituição do método de expansão precisa aceitar o tipo de PCollection
de entrada como um parâmetro e especificar a PCollection
de saída como o valor a ser retornado.PCollection.apply()
na PCollection
e transmita uma instância da transformação composta:JsonToCommonLog
que você acabou de modificar. Veja que isso vai causar um problema com a transformação de gravação atual, que espera instâncias de CommonLog
. Salve os resultados da transformação composta em uma nova PCollectionTuple
e use .get()
para recuperar a PCollection
que a transformação de gravação espera.Para corrigir o problema de upstream que está produzindo dados malformados, é importante saber analisar esses dados. Para isso, é necessário armazenar os dados em algum lugar. Nesta tarefa, você vai gravar dados com erros de formatação no Google Cloud Storage. Chamamos esse padrão usando o armazenamento de mensagens inativas.
Nos laboratórios anteriores, você gravou os dados de uma origem limitada (em lote) no Cloud Storage usando TextIO.write()
. Para gravar dados de uma fonte ilimitada (streaming), essa abordagem precisa ser ajustada.
No upstream da transformação de gravação, use um Trigger
para especificar quando a gravação deve ser feita durante o tempo de processamento. Se você mantiver os padrões, a gravação nunca será feita. Por padrão, todos os eventos pertencem à janela global. No processamento em lote, isso não é um problema porque o conjunto completo dos dados é conhecido no momento da execução. No entanto, com fontes ilimitadas, o tamanho total do conjunto de dados é desconhecido. Sendo assim, os painéis da janela global não são disparados porque jamais são concluídos.
Como você está usando um gatilho (Trigger
), também precisa de uma janela (Window
). No entanto, não é necessário alterar a janela. Nos laboratórios e tarefas anteriores, você usou transformações de janelamento para substituir a janela global por uma janela de duração fixa na hora do evento. Nesse caso, é mais importante receber os resultados de maneira útil e a uma taxa eficiente do que saber quais elementos foram agrupados.
No exemplo abaixo, a janela dispara o painel da janela global a cada 10 segundos durante o processamento, mas só grava eventos novos:
Depois de configurar um Trigger
altere a chamada para TextIO.write()
que executa as gravações. Ao gravar o downstream de uma transformação de janelamento, encadeie uma chamada para withWindowedWrites()
e especifique vários fragmentos para fazer a gravação em paralelo:
.get()
na PCollectionTuple
para recuperar os dados com erros de formatação. Aplique tudo o que você sabe sobre gatilhos e tenha bom senso ao definir as condições de disparo para este gatilho.O código desta Quest inclui um script para publicar eventos JSON usando o Pub/Sub.
training-data-analyst/quests/dataflow
.true
adiciona eventos atrasados ao stream.Clique em Verificar meu progresso para conferir o objetivo.
my_topic
.A mensagem deve chegar logo ao bucket do Cloud Storage, desde que não esteja em conformidade perfeita com a especificação JSON de CommonLog
. Para acompanhar o trajeto da mensagem, volte à janela de monitoramento do pipeline e clique em um nó da ramificação que processa mensagens não analisadas.
Clique em Verificar meu progresso para conferir o objetivo.
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.
Este conteúdo não está disponível no momento
Você vai receber uma notificação por e-mail quando ele estiver disponível
Ótimo!
Vamos entrar em contato por e-mail se ele ficar disponível
One lab at a time
Confirm to end all existing labs and start this one