arrow_back

Processamento de dados sem servidor com o Dataflow: como fazer testes usando o Apache Beam (Java)

Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Processamento de dados sem servidor com o Dataflow: como fazer testes usando o Apache Beam (Java)

Laboratório 1 hora 30 minutos universal_currency_alt 7 créditos show_chart Avançado
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Visão geral

Neste laboratório, você irá:

  • gravar testes de unidade para DoFns e PTransforms usando ferramentas de teste no Apache Beam;
  • realizar um teste de integração de pipeline;
  • usar a classe TestStream para testar o comportamento do janelamento de um pipeline de streaming.

Testar o pipeline é uma etapa particularmente importante no desenvolvimento de uma solução eficaz de processamento de dados. A natureza indireta do modelo do Beam pode fazer com que as execuções de depuração com falhas sejam tarefas incomuns.

Neste laboratório, vamos ver como realizar testes de unidade localmente com ferramentas no pacote de testes (link em inglês) do SDK do Beam usando o DirectRunner.

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Verifique as permissões do projeto

Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).

  1. No console do Google Cloud, em Menu de navegação (Ícone do menu de navegação), selecione IAM e administrador > IAM.

  2. Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.

Nome da conta de serviço padrão e status do editor do Compute Engine destacados na página com a guia "Permissões"

Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
  1. No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
  2. Copie o número do projeto, por exemplo, 729328892908.
  3. Em Menu de navegação, clique em IAM e administrador > IAM.
  4. Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
  5. Em Novos principais, digite:
{project-number}-compute@developer.gserviceaccount.com
  1. Substitua {project-number} pelo número do seu projeto.
  2. Em Papel, selecione Projeto (ou Básico) > Editor.
  3. Clique em Save.

Configuração do ambiente de desenvolvimento integrado

Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud, similar ao Cloud Shell.

Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.

OBSERVAÇÃO: mesmo depois que o URL aparecer, talvez você precise esperar de três a cinco minutos para o ambiente ser totalmente provisionado. Até isso acontecer, uma mensagem de erro será exibida no navegador.

ide_url

O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs com códigos que você vai concluir e uma pasta solution com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer para conferir:

file_explorer

Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:

new_terminal

Outra opção de visualização é executar gcloud auth list no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:

gcloud_auth

Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:

gce_reset

O código do laboratório é dividido entre duas pastas: 8a_Batch_Testing_Pipeline/lab e 8b_Stream_Testing_Pipeline/lab. Caso haja problemas em algum momento, a solução poderá ser encontrada nas pastas de solução correspondentes.

Parte 1 do laboratório: como realizar testes de unidade para DoFns e PTransforms

Nesta etapa do laboratório, vamos realizar testes de unidade em DoFns e PTransforms para um pipeline em lote calculando estatísticas de sensores climáticos. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:

  • Crie um TestPipeline (link em inglês).
  • Crie alguns dados de entrada de teste e use a transformação Create (link em inglês) para criar uma PCollection com esses dados.
  • Aplique a transformação à PCollection de entrada e salve a PCollection resultante.
  • Use PAssert (link em inglês) e suas subclasses para verificar se a PCollection de saída contém os elementos esperados.

O elemento TestPipeline é uma classe especial incluída no SDK do Beam especificamente para testar a lógica de pipeline e transformações.

  • Ao fazer os testes, use TestPipeline em vez de Pipeline ao criar o objeto de pipeline:
TestPipeline p = TestPipeline.create();

A transformação Create usa uma coleção de objetos na memória (um iterável em Java) e cria uma PCollection dessa coleção. A meta é ter um pequeno conjunto de dados de entrada de teste das PTransforms, em que sabemos a PCollection de saída esperada.

List<String> input = Arrays.asList(testInput); // Some code to create a TestPipeline p outputPColl = p.apply(Create.of(input).apply(...);

Por fim, queremos verificar se a PCollection de saída corresponde à saída esperada. Usamos a classe PAssert para verificar isso. Por exemplo, podemos usar o método containsInAnyOrder para verificar se a PCollection de saída tem os elementos corretos:

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

Tarefa 1: analisar o código do pipeline principal

  1. Navegue até 8a_Batch_Testing_Pipeline/lab no seu ambiente de desenvolvimento integrado.

Esse diretório contém um arquivo pom.xml para definir dependências e a pasta src, que contém dois subdiretórios. A pasta src/main contém o código do pacote de pipeline, e a pasta src/test contém o código de teste.

  1. Primeiro, abra 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.

Esse arquivo contém a definição da classe WeatherRecord que será usada no pipeline. A classe WeatherRecord tem um esquema associado, e é preciso que as etapas para defini-lo usando a anotação @DefaultSchema já sejam conhecidas. No entanto, precisamos sobrepor o método equals ao definir a classe.

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

Por quê? O elemento PAssert vai usar o método equals para verificar a associação na PCollection de saída. No entanto, o método "equals" padrão para um objeto Java simples (POJO, na sigla em inglês) apenas compara os endereços dos objetos. Em vez disso, queremos ter certeza de que estamos comparando o conteúdo dos objetos. Conforme visto acima, isso é simples.

  1. Agora abra 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.

Esse é o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:

  • As DoFns ConvertCsvToWeatherRecord (a partir da linha 65) e ConvertTempUnits (a partir da linha 81). Em breve, vamos realizar um teste de unidade nessas DoFns.
  • A PTransform ComputeStatistics (a partir da linha 103). Esse é um exemplo de uma transformação composta que será possível testar da mesma maneira que uma DoFn.
  • A PTransform WeatherStatsTransform (a partir da linha 123). Essa PTransform contém a lógica de processamento de todo o pipeline, exceto as transformações de origem e de coletor, para podermos realizar um pequeno teste de integração de pipeline em dados sintéticos criados por uma transformação Create.

Se você encontrar um erro lógico no código de processamento, não o corrija ainda. Mais tarde, vamos abordar como limitar o erro realizando testes.

Tarefa 2: adicionar dependências para testes

  1. Abra 8a_Batch_Testing_Pipeline/lab/pom.xml.

Precisamos adicionar algumas dependências para realizar testes. Qualquer código em Java do Beam para testes precisa estar vinculado ao JUnit e ao Hamcrest. No Maven, só precisamos atualizar o arquivo pom.xml.

  1. Para concluir esta tarefa, copie e cole a seguinte XML no arquivo pom.xml, indicado em um comentário:
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

O escopo dessas dependências é "test". Esses pacotes serão necessários ao executar um teste com o mvn test, mas não ao executar o pipeline principal.

Tarefa 3: gravar o primeiro teste de unidade de DoFn no Apache Beam

  1. Acesse 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.

Esse arquivo contém o código dos testes de unidade de DoFn e PTransform. Na maioria das vezes, o código será comentado, mas vamos remover a marca de comentário à medida que continuarmos.

Vamos começar explorando um teste de unidade de DoFn para a DoFn de ConvertCsvToWeatherRecord (a partir da linha 43).

  1. Primeiro, criamos uma classe para testar o pipeline e criamos um objeto TestPipeline:
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

Vamos usar o objeto TestPipeline em todos os testes a seguir, embora não precisemos nos preocupar com os efeitos colaterais de reutilizar o mesmo objeto devido à palavra-chave transient ao criá-lo.

  1. Agora confira o código (incompleto) do primeiro teste:
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* Create PCollection from in-memory object */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // Include PAssert statement to check for correct results p.run().waitUntilFinish(); }

Anotamos o método que vamos usar para testar o pipeline com a anotação @Test. Criamos uma única entrada de teste (testInput) que representa uma linha de um arquivo CSV (o formato de entrada esperado para nosso pipeline) e a colocamos em uma entrada de objeto da lista (List object input).

Faltam algumas partes no restante do código para realizar o teste.

  1. Para concluir a tarefa, primeiro adicione a transformação Create para converter input em uma PCollection.

  2. Em segundo lugar, inclua uma instrução PAssert usando o método containsInAnyOrder para comparar os elementos input e testOutput.

Se você não souber o que fazer, consulte os testes comentados posteriormente ou as soluções.

Tarefa 4: executar o primeiro teste de unidade de DoFn

  1. Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso, e cole o comando a seguir:
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

Agora já podemos executar o teste.

  1. Para fazer isso, execute este comando no seu terminal:
mvn test

Se você concluiu a tarefa anterior corretamente, a mensagem a seguir será exibida no terminal após a conclusão do teste (o tempo exato decorrido será diferente):

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Tarefa 5: executar o segundo teste de unidade de DoFn e o pipeline de depuração

  1. Volte para 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java e remova a marca de comentário do código do segundo teste de unidade (perto das linhas 67 a 80). É possível fazer isso destacando o código e pressionando Ctrl + / (ou Cmd + / no MacOS). O código é exibido abaixo para referência:
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

Esse teste garante que a DoFn ConvertTempUnits() esteja funcionando conforme o esperado.

  1. Salve WeatherStatisticsPipelineTest.java e volte ao terminal.

  2. Mais uma vez, execute o comando a seguir para executar os testes:

mvn test

Desta vez, o teste falhou. Se rolarmos pela saída, vamos encontrar as seguintes informações sobre essa falha:

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>

À primeira vista, talvez essa não seja a mensagem de erro mais útil. No entanto, vemos que não houve correspondência para o WeatherRecord esperado em testOutput. Talvez haja algo de errado com a conversão de temperatura.

  1. Volte para 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java e role para baixo até a definição de ConvertTempUnits (perto da linha 81).

  2. Para concluir a tarefa, encontre o erro na lógica de processamento da DoFn e execute novamente o comando mvn test para confirmar se o teste foi concluído. Como lembrete, a fórmula para converter graus Celsius em Farenheit é fornecida abaixo:

tempF = tempC * 1.8 + 32.0

Se você não souber o que fazer, consulte as soluções.

Tarefa 6: executar o teste de unidade de PTransform e o pipeline completo

  1. Volte para 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java e remova a marca de comentário do código dos dois testes finais (começando na linha 84, mais ou menos).

O primeiro teste em que removemos a marca de comentário foi o de testar a PTransform composta ComputeStatistics. Veja a seguir uma forma truncada do código para referência:

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //Define Testing Inputs (Omitted here) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

Observe que isso é muito semelhante aos testes de unidade de DoFn anteriores. A única diferença real, além das entradas e saídas diferentes de testes, é que estamos aplicando PTransform em vez de ParDo(new DoFn()).

O teste final é para o pipeline completo. No código de pipeline (WeatherStatisticsPipeline.java), o pipeline completo, exceto a origem e o coletor, foi incluído em uma única PTransform WeatherStatsTransform.

  1. Para testar o pipeline completo, podemos repetir algo semelhante ao que fizemos acima, mas usando aquela PTransform:
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //Define Testing Inputs (Omitted here) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. Volte ao terminal e execute o comando a seguir para fazer os testes mais uma vez:
mvn test

Se você concluiu as tarefas anteriores, a mensagem a seguir será exibida no terminal após o término dos testes:

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Clique em Verificar meu progresso para ver o objetivo. Execute testes de unidade para DoFns e PTransforms

Parte 2 do laboratório: como testar a lógica do processamento de stream com o TestStream

Nesta etapa do laboratório, vamos realizar testes de unidade para um pipeline de streaming calculando contagens de janelas de corridas de táxi. Para testar as transformações criadas, use o padrão e as transformações a seguir fornecidos pelo Beam:

  • Crie um TestPipeline (link em inglês).
  • Use a classe TestStream (link em inglês) para gerar dados de streaming. Isso inclui gerar uma série de eventos, avançar a marca-d'água e melhorar o tempo de processamento.
  • Use PAssert (link em inglês) e as subclasses para verificar se a PCollection de saída contém os elementos esperados em janelas específicas.

Durante a execução de um pipeline que lê dados de um TestStream, a leitura aguarda a conclusão de todas as consequências de cada evento antes de passar para o próximo, inclusive quando o tempo de processamento avança e os gatilhos apropriados são acionados. O elemento TestStream permite que o efeito do acionamento e a lentidão permitida sejam observados e testados em um pipeline. Isso inclui a lógica sobre gatilhos atrasados e dados descartados devido a atrasos.

Tarefa 1: analisar o código do pipeline principal

  1. Acesse 8b_Stream_Testing_Pipeline/lab no seu ambiente de desenvolvimento integrado.

Esse diretório contém um arquivo pom.xml para definir dependências e a pasta src, que contém dois subdiretórios. A pasta src/main contém o código do pacote de pipeline, e a pasta src/test contém o código de teste.

  1. Primeiro, abra 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.

Esse arquivo contém a definição da classe TaxiRide que será usada no pipeline. A classe TaxiRide tem um esquema associado, e é preciso que as etapas para defini-lo usando a anotação @DefaultSchema já sejam conhecidas.

  1. Abra 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.

Esse é o código principal do pipeline. Os conceitos desse pipeline foram abordados principalmente em laboratórios anteriores, mas não se esqueça de analisar os itens a seguir com mais atenção:

  • A DoFn JsonToTaxiRide (a partir da linha 94) usada para converter as mensagens recebidas do Pub/Sub em objetos da classe TaxiRide.
  • A PTransform TaxiCountTransform (a partir da linha 113). Essa PTransform contém a lógica principal de contagem e janelamento do pipeline. Os testes serão focados nessa PTransform.

A saída de TaxiCountTransform precisa ser uma contagem de todas as corridas de táxi registradas por janela. No entanto, haverá vários eventos por viagem (embarques, desembarques etc.).

  1. Vamos filtrar a propriedade ride_status para garantir a contagem de cada viagem apenas uma vez. Para isso, vamos manter apenas os elementos com ride_status igual a "pickup":
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

Analisando de modo mais detalhado, a lógica de janelamento usada no pipeline está incluída abaixo:

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

Vamos fazer o janelamento em janelas fixas com 60 segundos de duração. Não temos um gatilho antecipado, mas vamos gerar resultados após a marca d'água transmitir o fim da janela. Incluímos disparos atrasados em cada novo elemento recebido, mas isso só será feito com um atraso permitido de um minuto. Por fim, vamos acumular o estado nas janelas até que o atraso permitido tenha sido transmitido.

Tarefa 2: analisar o uso do TestStream e executar o primeiro teste

  1. Abra 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.

A primeira meta é entender o uso do TestStream no código de teste. Vale lembrar que a classe TestStream permite simular um fluxo de mensagens em tempo real enquanto controla a progressão do tempo de processamento e a marca d'água.

O código do primeiro teste (a partir da linha 66) está incluído abaixo:

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

Criamos um novo TestStream usando o método create e, ao mesmo tempo, especificamos o codificador. Vamos transmitir a mensagem JSON como uma string para que possamos usar o elemento StringUtf8Coder. O que o TestStream acima está fazendo?

O TestStream está realizando estas tarefas:

  • definindo a marca-d'água inicial como a variável startTime (Instant(0));
  • adicionando três elementos à string com um carimbo de data/hora do evento de startTime. Dois desses eventos vçao ser contados (ride_status = "pickup"). O outro, não;
  • adicionando outro evento "pickup", mas com um carimbo de data/hora de um minuto após o startTime;
  • avançando a marca d'água para um minuto após o startTime, o que vai acionar a primeira janela;
  • adicionando outro evento "pickup", mas com um carimbo de data/hora de dois minutos após o startTime;
  • avançando a marca d'água para "infinity". Isso significa que todas as janelas serão fechadas, e os novos dados vão estar além de qualquer atraso permitido.
  1. O restante do código do primeiro teste é semelhante ao exemplo de lote anterior, mas agora o TestStream está sendo usado em vez da transformação Create:
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

No código acima, definimos a PCollection de saída (outputCount) criando o TestStream e aplicando a PTransform TaxiCountTransform. Usamos a classe InvervalWindow para definir as janelas que queremos verificar e, em seguida, usamos PAssert com o método inWindow para verificar os resultados por janela.

  1. Volte para o terminal no ambiente de desenvolvimento integrado (ou abra um novo terminal). Depois execute os comandos a seguir para fazer o realocamento para o diretório correto e instalar as dependências:
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Agora execute o teste acima usando este comando:
mvn test

A saída a seguir será exibida após a conclusão do teste (embora o tempo decorrido possa ser diferente):

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

Tarefa 3: criar o TestStream para testar o processamento de dados com atraso

Nesta tarefa, você criará um código para um TestStream, o que vai permitir testar a lógica de processamento de dados atrasados.

  1. Volte para 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java e role para baixo até onde o método testTaxiRideLateData é comentado (perto da linha 104).

  2. Remova a marca de comentário do código do teste em questão, já que vamos concluir o código desta tarefa:

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

O código do teste foi concluído fora do processo de criação do TestStream.

  1. Para concluir a tarefa, crie um objeto TestStream que execute estas ações:
  • avança a marca d'água para startTime;
  • adiciona dois TimestampedValues com o valor json.format(json, "pickup") e com o carimbo de data/hora startTime;
  • avança a marca d'água em um minuto após o startTime;
  • adiciona outro TimestamedValue com o valor json.format(json, "pickup") e com o carimbo de data/hora startTime;
  • avança a marca d'água em dois minutos após o startTime;
  • adiciona outro TimestamedValue com o valor json.format(json, "pickup") e com o carimbo de data/hora startTime;
  • avança a marca d'água até o infinito.

Isso vai criar um TestStream com quatro elementos que pertencem à primeira janela. Os dois primeiros elementos são pontuais, o segundo está atrasado (mas dentro do atraso permitido) e o elemento final está atrasado e ultrapassa o atraso permitido. Como estamos acumulando painéis disparados, o primeiro gatilho precisa contar dois eventos, e o gatilho final, três. O quarto evento não precisa ser incluído. Isso pode ser verificado com os métodos inOnTimePane e inFinalPane da classe PAssert.

Se você não souber o que fazer, consulte as soluções.

Tarefa 4: executar um teste para processar dados atrasados

  • Volte ao terminal e use o comando a seguir para executar os testes mais uma vez:
mvn test

Se você concluiu as tarefas anteriores, a mensagem a seguir será exibida no terminal após o término dos testes:

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

Clique em Verificar meu progresso para ver o objetivo. Teste a lógica de processamento de stream com o TestStream

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.

Este conteúdo não está disponível no momento

We will notify you via email when it becomes available

Ótimo!

We will contact you via email if it becomes available