Checkpoints
Perform Unit Tests for DoFns and PTransforms
/ 10
Test Stream Processing Logic with TestStream
/ 10
Processamento de dados sem servidor com o Dataflow: como fazer testes usando o Apache Beam (Java)
- Visão geral
- Configuração e requisitos
- Parte 1 do laboratório: como realizar testes de unidade para DoFns e PTransforms
- Tarefa 1: analisar o código do pipeline principal
- Tarefa 2: adicionar dependências para testes
- Tarefa 3: gravar o primeiro teste de unidade de DoFn no Apache Beam
- Tarefa 4: executar o primeiro teste de unidade de DoFn
- Tarefa 5: executar o segundo teste de unidade de DoFn e o pipeline de depuração
- Tarefa 6: executar o teste de unidade de PTransform e o pipeline completo
- Parte 2 do laboratório: como testar a lógica do processamento de stream com o TestStream
- Tarefa 1: analisar o código do pipeline principal
- Tarefa 2: analisar o uso do TestStream e executar o primeiro teste
- Tarefa 3: criar o TestStream para testar o processamento de dados com atraso
- Tarefa 4: executar um teste para processar dados atrasados
- Finalize o laboratório
Visão geral
Neste laboratório, você irá:
- gravar testes de unidade para
DoFns
ePTransforms
usando ferramentas de teste no Apache Beam; - realizar um teste de integração de pipeline;
- usar a classe
TestStream
para testar o comportamento do janelamento de um pipeline de streaming.
Testar o pipeline é uma etapa particularmente importante no desenvolvimento de uma solução eficaz de processamento de dados. A natureza indireta do modelo do Beam pode fazer com que as execuções de depuração com falhas sejam tarefas incomuns.
Neste laboratório, vamos ver como realizar testes de unidade localmente com ferramentas no pacote de testes (link em inglês) do SDK do Beam usando o DirectRunner
.
Configuração e requisitos
Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.
-
Faça login no Qwiklabs em uma janela anônima.
-
Confira o tempo de acesso do laboratório (por exemplo,
1:15:00
) e finalize todas as atividades nesse prazo.
Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas. -
Quando tudo estiver pronto, clique em Começar o laboratório.
-
Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.
-
Clique em Abrir Console do Google.
-
Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
Se você usar outras credenciais, vai receber mensagens de erro ou cobranças. -
Aceite os termos e pule a página de recursos de recuperação.
Verifique as permissões do projeto
Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).
-
No console do Google Cloud, em Menu de navegação (), selecione IAM e administrador > IAM.
-
Confira se a conta de serviço padrão do Compute
{project-number}-compute@developer.gserviceaccount.com
está na lista e recebeu o papel deeditor
. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.
editor
, siga as etapas abaixo.- No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
- Copie o número do projeto, por exemplo,
729328892908
. - Em Menu de navegação, clique em IAM e administrador > IAM.
- Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
- Em Novos principais, digite:
- Substitua
{project-number}
pelo número do seu projeto. - Em Papel, selecione Projeto (ou Básico) > Editor.
- Clique em Save.
Configuração do ambiente de desenvolvimento integrado
Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud
, similar ao Cloud Shell.
Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.
O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs
com códigos que você vai concluir e uma pasta solution
com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer
para conferir:
Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:
Outra opção de visualização é executar gcloud auth list
no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:
Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:
O código do laboratório é dividido entre duas pastas: 8a_Batch_Testing_Pipeline/lab e 8b_Stream_Testing_Pipeline/lab. Caso haja problemas em algum momento, a solução poderá ser encontrada nas pastas de solução correspondentes.
Parte 1 do laboratório: como realizar testes de unidade para DoFns e PTransforms
Nesta etapa do laboratório, vamos realizar testes de unidade em DoFns e PTransforms para um pipeline em lote calculando estatísticas de sensores climáticos. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:
- Crie um
TestPipeline
(link em inglês). - Crie alguns dados de entrada de teste e use a transformação
Create
(link em inglês) para criar umaPCollection
com esses dados. - Aplique a transformação à
PCollection
de entrada e salve aPCollection
resultante. - Use
PAssert
(link em inglês) e suas subclasses para verificar se aPCollection
de saída contém os elementos esperados.
O elemento TestPipeline
é uma classe especial incluída no SDK do Beam especificamente para testar a lógica de pipeline e transformações.
- Ao fazer os testes, use
TestPipeline
em vez dePipeline
ao criar o objeto de pipeline:
A transformação Create
usa uma coleção de objetos na memória (um iterável em Java) e cria uma PCollection
dessa coleção. A meta é ter um pequeno conjunto de dados de entrada de teste das PTransforms
, em que sabemos a PCollection
de saída esperada.
Por fim, queremos verificar se a PCollection de saída corresponde à saída esperada. Usamos a classe PAssert
para verificar isso. Por exemplo, podemos usar o método containsInAnyOrder
para verificar se a PCollection de saída tem os elementos corretos:
Tarefa 1: analisar o código do pipeline principal
- Navegue até 8a_Batch_Testing_Pipeline/lab no seu ambiente de desenvolvimento integrado.
Esse diretório contém um arquivo pom.xml
para definir dependências e a pasta src
, que contém dois subdiretórios. A pasta src/main
contém o código do pacote de pipeline, e a pasta src/test
contém o código de teste.
- Primeiro, abra 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.
Esse arquivo contém a definição da classe WeatherRecord
que será usada no pipeline. A classe WeatherRecord
tem um esquema associado, e é preciso que as etapas para defini-lo usando a anotação @DefaultSchema
já sejam conhecidas. No entanto, precisamos sobrepor o método equals
ao definir a classe.
Por quê? O elemento PAssert
vai usar o método equals
para verificar a associação na PCollection
de saída. No entanto, o método "equals" padrão para um objeto Java simples (POJO, na sigla em inglês) apenas compara os endereços dos objetos. Em vez disso, queremos ter certeza de que estamos comparando o conteúdo dos objetos. Conforme visto acima, isso é simples.
- Agora abra 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.
Esse é o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:
- As
DoFns
ConvertCsvToWeatherRecord
(a partir da linha 65) eConvertTempUnits
(a partir da linha 81). Em breve, vamos realizar um teste de unidade nessasDoFns
. - A
PTransform
ComputeStatistics
(a partir da linha 103). Esse é um exemplo de uma transformação composta que será possível testar da mesma maneira que umaDoFn
. - A
PTransform
WeatherStatsTransform
(a partir da linha 123). EssaPTransform
contém a lógica de processamento de todo o pipeline, exceto as transformações de origem e de coletor, para podermos realizar um pequeno teste de integração de pipeline em dados sintéticos criados por uma transformaçãoCreate
.
Se você encontrar um erro lógico no código de processamento, não o corrija ainda. Mais tarde, vamos abordar como limitar o erro realizando testes.
Tarefa 2: adicionar dependências para testes
- Abra 8a_Batch_Testing_Pipeline/lab/pom.xml.
Precisamos adicionar algumas dependências para realizar testes. Qualquer código em Java do Beam para testes precisa estar vinculado ao JUnit
e ao Hamcrest
. No Maven, só precisamos atualizar o arquivo pom.xml
.
- Para concluir esta tarefa, copie e cole a seguinte XML no arquivo
pom.xml
, indicado em um comentário:
O escopo dessas dependências é "test". Esses pacotes serão necessários ao executar um teste com o mvn test
, mas não ao executar o pipeline principal.
Tarefa 3: gravar o primeiro teste de unidade de DoFn no Apache Beam
- Acesse 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.
Esse arquivo contém o código dos testes de unidade de DoFn
e PTransform
. Na maioria das vezes, o código será comentado, mas vamos remover a marca de comentário à medida que continuarmos.
Vamos começar explorando um teste de unidade de DoFn
para a DoFn de ConvertCsvToWeatherRecord
(a partir da linha 43).
- Primeiro, criamos uma classe para testar o pipeline e criamos um objeto
TestPipeline
:
Vamos usar o objeto TestPipeline
em todos os testes a seguir, embora não precisemos nos preocupar com os efeitos colaterais de reutilizar o mesmo objeto devido à palavra-chave transient
ao criá-lo.
- Agora confira o código (incompleto) do primeiro teste:
Anotamos o método que vamos usar para testar o pipeline com a anotação @Test
. Criamos uma única entrada de teste (testInput
) que representa uma linha de um arquivo CSV (o formato de entrada esperado para nosso pipeline) e a colocamos em uma entrada de objeto da lista (List
object input
).
Faltam algumas partes no restante do código para realizar o teste.
-
Para concluir a tarefa, primeiro adicione a transformação
Create
para converterinput
em umaPCollection
. -
Em segundo lugar, inclua uma instrução
PAssert
usando o métodocontainsInAnyOrder
para comparar os elementosinput
etestOutput
.
Se você não souber o que fazer, consulte os testes comentados posteriormente ou as soluções.
Tarefa 4: executar o primeiro teste de unidade de DoFn
- Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso, e cole o comando a seguir:
Agora já podemos executar o teste.
- Para fazer isso, execute este comando no seu terminal:
Se você concluiu a tarefa anterior corretamente, a mensagem a seguir será exibida no terminal após a conclusão do teste (o tempo exato decorrido será diferente):
Tarefa 5: executar o segundo teste de unidade de DoFn e o pipeline de depuração
- Volte para 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java e remova a marca de comentário do código do segundo teste de unidade (perto das linhas 67 a 80). É possível fazer isso destacando o código e pressionando Ctrl + / (ou Cmd + / no MacOS). O código é exibido abaixo para referência:
Esse teste garante que a DoFn
ConvertTempUnits()
esteja funcionando conforme o esperado.
-
Salve
WeatherStatisticsPipelineTest.java
e volte ao terminal. -
Mais uma vez, execute o comando a seguir para executar os testes:
Desta vez, o teste falhou. Se rolarmos pela saída, vamos encontrar as seguintes informações sobre essa falha:
À primeira vista, talvez essa não seja a mensagem de erro mais útil. No entanto, vemos que não houve correspondência para o WeatherRecord
esperado em testOutput
. Talvez haja algo de errado com a conversão de temperatura.
-
Volte para 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java e role para baixo até a definição de
ConvertTempUnits
(perto da linha 81). -
Para concluir a tarefa, encontre o erro na lógica de processamento da
DoFn
e execute novamente o comandomvn test
para confirmar se o teste foi concluído. Como lembrete, a fórmula para converter graus Celsius em Farenheit é fornecida abaixo:
Se você não souber o que fazer, consulte as soluções.
Tarefa 6: executar o teste de unidade de PTransform e o pipeline completo
- Volte para 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java e remova a marca de comentário do código dos dois testes finais (começando na linha 84, mais ou menos).
O primeiro teste em que removemos a marca de comentário foi o de testar a PTransform
composta ComputeStatistics
. Veja a seguir uma forma truncada do código para referência:
Observe que isso é muito semelhante aos testes de unidade de DoFn
anteriores. A única diferença real, além das entradas e saídas diferentes de testes, é que estamos aplicando PTransform
em vez de ParDo(new DoFn())
.
O teste final é para o pipeline completo. No código de pipeline (WeatherStatisticsPipeline.java), o pipeline completo, exceto a origem e o coletor, foi incluído em uma única PTransform
WeatherStatsTransform
.
- Para testar o pipeline completo, podemos repetir algo semelhante ao que fizemos acima, mas usando aquela
PTransform
:
- Volte ao terminal e execute o comando a seguir para fazer os testes mais uma vez:
Se você concluiu as tarefas anteriores, a mensagem a seguir será exibida no terminal após o término dos testes:
Clique em Verificar meu progresso para ver o objetivo.
Parte 2 do laboratório: como testar a lógica do processamento de stream com o TestStream
Nesta etapa do laboratório, vamos realizar testes de unidade para um pipeline de streaming calculando contagens de janelas de corridas de táxi. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:
- Crie um
TestPipeline
(link em inglês). - Use a classe
TestStream
(link em inglês) para gerar dados de streaming. Isso inclui gerar uma série de eventos, avançar a marca-d'água e melhorar o tempo de processamento. - Use
PAssert
(link em inglês) e as subclasses para verificar se aPCollection
de saída contém os elementos esperados em janelas específicas.
Durante a execução de um pipeline que lê dados de um TestStream
, a leitura aguarda a conclusão de todas as consequências de cada evento antes de passar para o próximo, inclusive quando o tempo de processamento avança e os gatilhos apropriados são acionados. O elemento TestStream
permite que o efeito do acionamento e a lentidão permitida sejam observados e testados em um pipeline. Isso inclui a lógica sobre gatilhos atrasados e dados descartados devido a atrasos.
Tarefa 1: analisar o código do pipeline principal
- Acesse 8b_Stream_Testing_Pipeline/lab no seu ambiente de desenvolvimento integrado.
Esse diretório contém um arquivo pom.xml
para definir dependências e a pasta src
, que contém dois subdiretórios. A pasta src/main
contém o código do pacote de pipeline, e a pasta src/test
contém o código de teste.
- Primeiro, abra 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.
Esse arquivo contém a definição da classe TaxiRide
que será usada no pipeline. A classe TaxiRide
tem um esquema associado, e é preciso que as etapas para defini-lo usando a anotação @DefaultSchema
já sejam conhecidas.
- Abra 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.
Esse é o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:
- A
DoFn
JsonToTaxiRide
(a partir da linha 94) usada para converter as mensagens recebidas do Pub/Sub em objetos da classeTaxiRide
. - A
PTransform
TaxiCountTransform
(a partir da linha 113). EssaPTransform
contém a lógica principal de contagem e janelamento do pipeline. Os testes serão focados nessaPTransform
.
A saída de TaxiCountTransform
precisa ser uma contagem de todas as corridas de táxi registradas por janela. No entanto, haverá vários eventos por viagem (embarques, desembarques etc.).
- Vamos filtrar a propriedade
ride_status
para garantir a contagem de cada viagem apenas uma vez. Para isso, vamos manter apenas os elementos comride_status
igual a "pickup":
Analisando de modo mais detalhado, a lógica de janelamento usada no pipeline está incluída abaixo:
Vamos fazer o janelamento em janelas fixas com 60 segundos de duração. Não temos um gatilho antecipado, mas vamos gerar resultados após a marca d'água transmitir o fim da janela. Incluímos disparos atrasados em cada novo elemento recebido, mas isso só será feito com um atraso permitido de um minuto. Por fim, vamos acumular o estado nas janelas até que o atraso permitido tenha sido transmitido.
Tarefa 2: analisar o uso do TestStream e executar o primeiro teste
- Abra 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.
A primeira meta é entender o uso do TestStream
no código de teste. Vale lembrar que a classe TestStream
permite simular um fluxo de mensagens em tempo real enquanto controla a progressão do tempo de processamento e a marca d'água.
O código do primeiro teste (a partir da linha 66) está incluído abaixo:
Criamos um novo TestStream
usando o método create
e, ao mesmo tempo, especificamos o codificador. Vamos transmitir a mensagem JSON como uma string para que possamos usar o elemento StringUtf8Coder
. O que o TestStream
acima está fazendo?
O TestStream
está realizando estas tarefas:
- definindo a marca-d'água inicial como a variável
startTime
(Instant(0)
); - adicionando três elementos à string com um carimbo de data/hora do evento de
startTime
. Dois desses eventos vçao ser contados (ride_status = "pickup"
). O outro, não; - adicionando outro evento "pickup", mas com um carimbo de data/hora de um minuto após o
startTime
; - avançando a marca d'água para um minuto após o
startTime
, o que vai acionar a primeira janela; - adicionando outro evento "pickup", mas com um carimbo de data/hora de dois minutos após o
startTime
; - avançando a marca d'água para "infinity". Isso significa que todas as janelas serão fechadas, e os novos dados vão estar além de qualquer atraso permitido.
- O restante do código do primeiro teste é semelhante ao exemplo de lote anterior, mas agora o
TestStream
está sendo usado em vez da transformaçãoCreate
:
No código acima, definimos a PCollection
de saída (outputCount
) criando o TestStream
e aplicando a PTransform
TaxiCountTransform
. Usamos a classe InvervalWindow
para definir as janelas que queremos verificar e, em seguida, usamos PAssert
com o método inWindow
para verificar os resultados por janela.
- Volte para o terminal no ambiente de desenvolvimento integrado (ou abra um novo terminal). Depois execute os comandos a seguir para fazer o realocamento para o diretório correto e instalar as dependências:
- Agora execute o teste acima usando este comando:
A saída a seguir será exibida após a conclusão do teste (embora o tempo decorrido possa ser diferente):
Tarefa 3: criar o TestStream para testar o processamento de dados com atraso
Nesta tarefa, você criará um código para um TestStream
, o que vai permitir testar a lógica de processamento de dados atrasados.
-
Volte para 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java e role para baixo até onde o método
testTaxiRideLateData
é comentado (perto da linha 104). -
Remova a marca de comentário do código do teste em questão, já que vamos concluir o código desta tarefa:
O código do teste foi concluído fora do processo de criação do TestStream
.
- Para concluir a tarefa, crie um objeto
TestStream
que execute estas ações:
- avança a marca d'água para
startTime
; - adiciona dois
TimestampedValue
s com o valorjson.format(json, "pickup")
e com o carimbo de data/horastartTime
; - avança a marca d'água em um minuto após o
startTime
; - adiciona outro
TimestamedValue
com o valorjson.format(json, "pickup")
e com o carimbo de data/horastartTime
; - avança a marca d'água em dois minutos após o
startTime
; - adiciona outro
TimestamedValue
com o valorjson.format(json, "pickup")
e com o carimbo de data/horastartTime
; - avança a marca d'água até o infinito.
Isso vai criar um TestStream
com quatro elementos que pertencem à primeira janela. Os dois primeiros elementos são pontuais, o segundo está atrasado (mas dentro do atraso permitido) e o elemento final está atrasado e ultrapassa o atraso permitido. Como estamos acumulando painéis disparados, o primeiro gatilho precisa contar dois eventos, e o gatilho final, três. O quarto evento não precisa ser incluído. Isso pode ser verificado com os métodos inOnTimePane
e inFinalPane
da classe PAssert
.
Se você não souber o que fazer, consulte as soluções.
Tarefa 4: executar um teste para processar dados atrasados
- Volte ao terminal e use o comando a seguir para executar os testes mais uma vez:
Se você concluiu as tarefas anteriores, a mensagem a seguir será exibida no terminal após o término dos testes:
Clique em Verificar meu progresso para ver o objetivo.
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.