arrow_back

Processamento de dados sem servidor com o Dataflow: como criar um pipeline ETL usando Apache Beam e Cloud Dataflow (Java)

Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Processamento de dados sem servidor com o Dataflow: como criar um pipeline ETL usando Apache Beam e Cloud Dataflow (Java)

Laboratório 1 hora 30 minutos universal_currency_alt 5 créditos show_chart Intermediário
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Informações gerais

Neste laboratório, você vai:

  • criar um pipeline do tipo Extrair-Transformar-Carregar (ETL) em lote no Apache Beam, que grava os dados brutos do Google Cloud Storage no Google BigQuery;
  • executar o pipeline do Apache Beam no Cloud Dataflow;
  • parametrizar a execução do pipeline.

Pré-requisitos:

  • Noções básicas sobre o Java.

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Ative o Google Cloud Shell

O Google Cloud Shell é uma máquina virtual com ferramentas de desenvolvimento. Ele tem um diretório principal permanente de 5 GB e é executado no Google Cloud.

O Cloud Shell oferece acesso de linha de comando aos recursos do Google Cloud.

  1. No console do Cloud, clique no botão "Abrir o Cloud Shell" na barra de ferramentas superior direita.

    Ícone do Cloud Shell em destaque

  2. Clique em Continuar.

O provisionamento e a conexão do ambiente podem demorar um pouco. Quando você estiver conectado, já estará autenticado, e o projeto estará definido com seu PROJECT_ID. Exemplo:

ID do projeto em destaque no terminal do Cloud Shell

A gcloud é a ferramenta de linha de comando do Google Cloud. Ela vem pré-instalada no Cloud Shell e aceita preenchimento com tabulação.

  • Para listar o nome da conta ativa, use este comando:
gcloud auth list

Saída:

Credentialed accounts: - @.com (active)

Exemplo de saída:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Para listar o ID do projeto, use este comando:
gcloud config list project

Saída:

[core] project =

Exemplo de saída:

[core] project = qwiklabs-gcp-44776a13dea667a6 Observação: a documentação completa da gcloud está disponível no guia com informações gerais sobre a gcloud CLI .

Verifique as permissões do projeto

Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).

  1. No console do Google Cloud, em Menu de navegação (Ícone do menu de navegação), selecione IAM e administrador > IAM.

  2. Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.

Nome da conta de serviço padrão e status do editor do Compute Engine destacados na página com a guia "Permissões"

Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
  1. No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
  2. Copie o número do projeto, por exemplo, 729328892908.
  3. Em Menu de navegação, clique em IAM e administrador > IAM.
  4. Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
  5. Em Novos principais, digite:
{project-number}-compute@developer.gserviceaccount.com
  1. Substitua {project-number} pelo número do seu projeto.
  2. Em Papel, selecione Projeto (ou Básico) > Editor.
  3. Clique em Save.

Como configurar o ambiente de desenvolvimento integrado

Neste laboratório, você vai usar principalmente a versão da Web do ambiente de desenvolvimento integrado Theia. Ele é hospedado no Google Compute Engine e contém o repositório do laboratório pré-clonado. Além disso, o Theia oferece suporte de servidor à linguagem Java e um terminal para acesso programático às APIs do Google Cloud com a ferramenta de linha de comando gcloud, similar ao Cloud Shell.

Para acessar o ambiente de desenvolvimento integrado Theia, copie e cole o link exibido no Qwiklabs em uma nova guia.

OBSERVAÇÃO: mesmo depois que o URL aparecer, talvez você precise esperar de três a cinco minutos para o ambiente ser totalmente provisionado. Até isso acontecer, uma mensagem de erro será exibida no navegador.

ide_url

O repositório do laboratório foi clonado para seu ambiente. Cada laboratório é dividido em uma pasta labs com códigos que você vai concluir e uma pasta solution com um exemplo totalmente funcional para consulta, caso você enfrente dificuldades. Clique no botão File explorer para conferir:

file_explorer

Também é possível criar vários terminais nesse ambiente, como você faria com o Cloud Shell:

new_terminal

Outra opção de visualização é executar gcloud auth list no terminal em que você fez login com uma conta de serviço fornecida. Ela tem as mesmas permissões que a sua conta de usuário do laboratório:

gcloud_auth

Se em algum momento o ambiente parar de funcionar, tente redefinir a VM que hospeda o ambiente de desenvolvimento integrado no Console do GCE, conforme este exemplo:

gce_reset

Apache Beam e Cloud Dataflow

Aproximadamente 5 minutos

O Cloud Dataflow é um serviço do Google Cloud Platform totalmente gerenciado para executar pipelines de processamento de dados em lote e por streaming do Apache Beam.

Apache Beam é um modelo avançado de programação de processamento de dados portátil, unificado e de código aberto. Com ele, os usuários finais podem definir pipelines de processamento paralelo de dados em lote e streaming usando Java, Python ou Go. Além disso, os pipelines do Apache Beam podem ser executados na máquina de desenvolvimento local em pequenos conjuntos de dados e em escala no Cloud Dataflow. No entanto, como o Apache Beam tem código aberto, há outros executores, portanto, é possível executar pipelines do Beam no Apache Flink e no Apache Spark, entre outros.

Diagrama da arquitetura do modelo dBeam.

Laboratório 1: como criar um pipeline ETL do zero

Introdução

Nesta seção, você irá criar um pipeline do tipo Extrair-Transformar-Carregar no Apache Beam do zero.

Revisão do conjunto de dados e do caso de uso

Para cada laboratório desta Quest, os dados de entrada são semelhantes aos registros do servidor da Web no formato de registro comum (link em inglês), junto a outros dados que um servidor da Web pode conter. Neste primeiro laboratório, os dados são tratados como uma fonte em lote; nos próximos, eles serão tratados como uma fonte em streaming. Sua tarefa é ler, analisar e gravar os dados no BigQuery, que é um armazenamento de dados sem servidor, para análise posterior.

Abra o laboratório apropriado

  • Crie um novo terminal no ambiente de desenvolvimento integrado, caso ainda não tenha feito isso. Depois copie e cole este comando:
# Change directory into the lab cd 1_Basic_ETL/labs export BASE_DIR=$(pwd)

Como modificar o arquivo pom.xml

Antes de começar a editar o código do pipeline real, adicione as dependências necessárias.

  1. Adicione as seguintes dependências ao arquivo pom.xml, localizado em 1_Basic_ETL/labs, na tag de dependências:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> <version>${beam.version}</version> </dependency>
  1. Uma tag <beam.version> já foi adicionada a pom.xml para indicar qual versão será instalada. Salve o arquivo.

  2. Por fim, faça o download dessas dependências para uso no pipeline:

# Download dependencies listed in pom.xml mvn clean dependency:resolve

Gravar seu primeiro pipeline

1 hora

Tarefa 1: gerar dados sintéticos

  1. Execute o seguinte comando no shell para clonar um repositório com scripts para gerar registros sintéticos do servidor da Web:
# Change to the directory containing the relevant code cd $BASE_DIR/../.. # Create GCS buckets and BQ dataset source create_batch_sinks.sh # Run a script to generate a batch of web server log events bash generate_batch_events.sh # Examine some sample events head events.json

O script cria um arquivo chamado events.json com linhas semelhantes às seguintes:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Em seguida, ele copia automaticamente o arquivo para seu bucket do Google Cloud Storage em gs://<YOUR-PROJECT-ID>/events.json.

  1. Acesse o Google Cloud Storage (em inglês) e veja se o bucket de armazenamento tem um arquivo chamado events.json.

Clique em Verificar meu progresso para conferir o objetivo. Gerar dados sintéticos

Tarefa 2: ler dados da sua origem

Se você tiver dúvidas nesta seção ou nas próximas, consulte a solução (em inglês).

  1. Abra MyPipeline.java no seu ambiente de desenvolvimento integrado, que pode ser encontrado em 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline. Verifique se os pacotes a seguir foram importados:
import com.google.gson.Gson; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  1. Role para baixo até o método run(). No momento, esse método contém um pipeline que não faz nada. Observe como um objeto Pipeline é criado usando um objeto PipelineOptions (links em inglês) e a linha final do método executa o pipeline.
Pipeline pipeline = Pipeline.create(options); // Do stuff pipeline.run();

Todos os dados nos pipelines do Apache Beam residem em PCollections. Para criar a PCollection inicial do pipeline, aplique uma transformação de raiz ao objeto de pipeline. A transformação raiz cria uma PCollection a partir de uma fonte de dados externa ou de alguns dados locais que você especificar.

Há dois tipos de transformações raiz nos SDKs do Beam: leitura e criação. Leitura: as transformações leem dados de uma fonte externa, como um arquivo de texto ou uma tabela de banco de dados Criação: as transformações criam uma PCollection a partir de uma java.util.Collection na memória e são especialmente úteis para testes.

O código de exemplo a seguir mostra como aplicar uma transformação raiz TextIO.Read para ler dados de um arquivo de texto. A transformação é aplicada a um objeto p do Pipeline e retorna um conjunto de dados de pipeline na forma de uma PCollection<String>. "ReadLines" é o nome da transformação que será útil quando você estiver trabalhando com pipelines maiores:

PCollection<String> lines = pipeline.apply("ReadLines", TextIO.read().from("gs://path/to/input.txt"));
  1. No método run(), crie uma constante de string chamada "input" e defina o valor dela como gs://<YOUR-PROJECT-ID>/events.json. Em um laboratório futuro, você usará parâmetros de linha de comando para transmitir essas informações.

  2. Crie uma PCollection de strings de todos os eventos no events.json chamando a transformação TextIO.read().

  3. Adicione as instruções de importação apropriadas ao topo do MyPipeline.java. Neste caso, import org.apache.beam.sdk.values.PCollection;

Tarefa 3: executar o pipeline para verificar se ele funciona

  • Volte ao terminal, acesse a pasta $BASE_DIR e execute o comando mvn compile exec:java:
cd $BASE_DIR # Set up environment variables export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} Observação: caso haja falha no build, execute o comando mvn clean install

No momento, o pipeline apenas lê dados. No entanto, a execução dele demonstra um fluxo de trabalho útil. Nele, você verifica o pipeline localmente e de maneira econômica usando o DirectRunner (link em inglês) em execução na máquina local antes de fazer cálculos mais caros. Para executar o pipeline usando o Google Cloud Dataflow, mude runner para DataflowRunner (link em inglês).

Tarefa 4: adicionar em uma transformação

Se tiver dificuldades, veja a solução (em inglês).

As transformações são o que mudam seus dados. No Apache Beam, as transformações são feitas pela classe PTransform (link em inglês). No ambiente de execução, essas operações serão realizadas em vários workers independentes. A entrada e a saída para cada PTransform são uma PCollection. Na verdade, talvez você não tenha percebido, mas já usou uma PTransform ao ler dados do Google Cloud Storage. Uma PCollection de strings foi criada, independentemente da atribuição a uma variável.

Como o Beam usa um método de aplicação genérico para PCollections, é possível encadear transformações de maneira sequencial, por exemplo, para criar um pipeline sequencial como este:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform]);

Nesta tarefa, use um novo tipo de transformação: ParDo. ParDo é uma transformação do Beam para processamento paralelo genérico. O paradigma de processamento de ParDo é semelhante à fase “Map” de um algoritmo no estilo Mapear/Embaralhar/Reduzir: uma transformação ParDo considera cada elemento na PCollection de entrada, realiza alguma função de processamento (seu código de usuário) nesse elemento e emite zero, um ou vários elementos a uma PCollection de saída.

ParDo é útil para várias operações de processamento de dados comum, incluindo as seguintes:

  • Filtrar um conjunto de dados. Use ParDo para considerar cada elemento de uma PCollection e enviar o elemento para uma nova coleção ou descartá-lo.
  • Formatação ou conversão do tipo de cada elemento de um conjunto de dados: Se a PCollection de entrada tiver elementos de um tipo ou formato diferente do que você quer, use ParDo para realizar uma conversão em cada elemento e gerar o resultado em uma nova PCollection.
  • Extração de partes de cada elemento de um conjunto de dados: Se você tem uma PCollection de registros com vários campos, por exemplo, é possível usar umParDo para analisar apenas os campos que você quer considerar em uma nova PCollection.
  • Cálculos de cada elemento de um conjunto de dados: É possível usar ParDo para realizar cálculos simples ou complexos em cada elemento (ou determinados elementos) de uma PCollection e gerar os resultados como uma nova PCollection.
  1. Para concluir essa tarefa, grave uma transformação ParDo que leia uma string JSON representando um único evento, analise-a usando Gson e gere o objeto personalizado retornado por Gson.

As funções ParDo podem ser implementadas de duas maneiras: in-line ou como uma classe estática. Escreva funções ParDo in-line desta forma:

pCollection.apply(ParDo.of(new DoFn<T1, T2>() { @ProcessElement public void processElement(@Element T1 i, OutputReceiver<T2> r) { // Do something r.output(0); } }));

Como alternativa, elas podem ser implementadas como classes estáticas que ampliam DoFn, o que facilita a integração com frameworks de teste:

static class MyDoFn extends DoFn<T1, T2> { @ProcessElement public void processElement(@Element T1 json, OutputReceiver<T2> r) throws Exception { // Do something r.output(0); } }

Em seguida, no próprio pipeline:

[Initial Input PCollection].apply(ParDo.of(new MyDoFn());
  1. Para usar o Gson, você precisa criar uma classe interna dentro de MyPipeline. Para aproveitar os esquemas do Beam, adicione a anotação @DefaultSchema. Falaremos mais sobre ela depois. Confira um exemplo de como usar o Gson:
// Elsewhere @DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; String field2; } // Within the DoFn Gson gson = new Gson(); MyClass myClass = gson.fromJson(jsonString, MyClass.class);
  1. Nomeie a classe interna como CommonLog. Para construir essa classe interna com as variáveis de estado corretas, consulte a amostra JSON acima: a classe precisa ter uma variável de estado para cada chave no JSON de entrada. Essa variável precisa indicar o tipo e o nome do valor e da chave.

  2. Por enquanto, use String para "timestamp", Long para "INTEGER" (o INTEGER do BigQuery é INT64), Double para "FLOAT" (o FLOAT do BigQuery é FLOAT64), e combine com o seguinte esquema do BigQuery:

Página em guias do esquema CommonLog, que inclui as informações de registro como user_id, timestamp e num_bytes.

Se tiver dificuldades, confira a solução (link em inglês).

Tarefa 5: gravar em um coletor

Neste momento, o pipeline lê um arquivo do Google Cloud Storage, analisa cada linha e gera um CommonLog para cada elemento. A próxima etapa é gravar esses objetos CommonLog em uma tabela do BigQuery.

Embora seja possível instruir o pipeline a criar uma tabela do BigQuery quando necessário, você precisará criar o conjunto de dados com antecedência. Isso já foi feito pelo script generate_batch_events.sh.

Para examinar o conjunto de dados:

# Examine dataset bq ls # No tables yet bq ls logs

Para gerar as PCollections finais do pipeline, aplique uma transformação Write a essa PCollection. As transformações Write podem gerar elementos de uma PCollection em um coletor de dados externo, como uma tabela de banco de dados. É possível usar Write para gerar uma PCollection a qualquer momento no pipeline, embora o normal seja gravar os dados no final do pipeline.

O código de exemplo a seguir mostra como aplicar uma transformação TextIO.Write para gravar uma PCollection de string em um arquivo de texto:

PCollection<String> pCollection = ...; pCollection.apply("WriteMyFile", TextIO.write().to("gs://path/to/output"));
  1. Nesse caso, em vez de usar TextIO.write(), use BigQueryIO.write() (em inglês).

Essa função exige vários fatores, como a tabela específica a ser gravada e o esquema dela. Como alternativa, especifique se você pretende anexar a uma tabela, recriar tabelas (útil na iteração inicial do pipeline) ou criar a tabela. Por padrão, essa transformação cria tabelas que não existem e não grava em tabelas que não estejam vazias (ambos em inglês).

Devido à adição de esquemas do Beam ao SDK, é possível instruir a transformação para inferir o esquema da tabela do objeto passado usando .useBeamSchema() e marcando o tipo de entrada. Também é possível fornecer explicitamente o esquema com .withSchema(), mas é necessário criar um objeto TableSchema do BigQuery para a transmissão. Como você anotou a classe CommonLog com @DefaultSchema(JavaFieldSchema.class), cada transformação sabe os nomes e tipos dos campos no objeto, incluindo BigQueryIO.write().

  1. Analise as diversas alternativas na seção "Writing" do BigQueryIO (em inglês). Nesse caso, como você anotou o objeto CommonLog, use .useBeamSchema() e segmente a tabela <YOUR-PROJECT-ID>:logs.logs desta forma:
pCollection.apply(BigQueryIO.<MyObject>write() .to("my-project:output_dataset.output_table") .useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) ); Note: WRITE_TRUNCATE sempre irá excluir e recriar a tabela. Isso é útil na iteração inicial do pipeline, especialmente à medida que você faz iterações no esquema. No entanto, isso pode causar problemas não intencionais na produção com facilidade. As opções WRITE_APPEND ou WRITE_EMPTY são mais seguras.

O conjunto de todos os tipos disponíveis em "Esquemas do Beam" pode ser encontrado na documentação Esquema. FieldType (em inglês). Todos os possíveis tipos de dados do BigQuery no SQL padrão que podem ser usados na documentação do setType. Se quiser saber mais sobre isso, inspecione o esquema do Beam até a conversão no BigQuery (todos em inglês).

Tarefa 6: executar o pipeline

Volte ao terminal, altere o valor da variável de ambiente RUNNER para DataflowRunner e execute o pipeline com o mesmo comando usado anteriormente. Depois de começar, acesse a página do produto Dataflow e observe a organização do pipeline. Se você tiver nomeado as transformações, os nomes serão exibidos. Clique em cada um para revelar em tempo real o número de elementos sendo processados a cada segundo.

A forma geral precisa ter um único caminho, começando pela transformação "Read" e terminando com a transformação "Write". À medida que o pipeline é executado, os workers são adicionados automaticamente, enquanto o serviço determina as necessidades do pipeline, e desaparecem quando não são mais necessários. É possível observar esse processo ao acessar o Compute Engine (link em inglês) e conferir as máquinas virtuais criadas pelo serviço Dataflow.

Observação: se o pipeline estiver sendo criado e você encontrar muitos erros devido ao código ou à configuração incorreta no serviço do Dataflow, defina RUNNER novamente como "DirectRunner" para executá-lo localmente e receber feedback mais rápido. Nesse caso, essa abordagem funciona porque o conjunto de dados é pequeno e você não está usando recursos incompatíveis com o DirectRunner. # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export PIPELINE_FOLDER=gs://${PROJECT_ID} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER}"
  • Quando o pipeline terminar, volte à janela do navegador do BigQuery e consulte a tabela.

Se o código não estiver funcionando conforme o esperado e você não souber o que fazer, confira a solução (em inglês).

Clique em Verificar meu progresso para conferir o objetivo. Execute o pipeline

Laboratório parte 2: parametrização de ETL básico

Aproximadamente 20 minutos

Grande parte do trabalho dos engenheiros de dados é previsível, como jobs recorrentes, ou é semelhante a outros trabalhos. No entanto, o processo para executar pipelines exige experiência em engenharia. Veja as etapas que você acabou de concluir:

  1. Você criou um ambiente de desenvolvimento e desenvolveu um pipeline. O ambiente incluiu o SDK do Apache Beam e outras dependências.
  2. Você executou o pipeline do ambiente de desenvolvimento. O SDK do Apache Beam preparou arquivos no Cloud Storage, criou um arquivo de solicitação de job e enviou o arquivo ao serviço do Cloud Dataflow.

Seria muito melhor se houvesse uma maneira de iniciar um job por uma chamada de API ou sem ter que configurar um ambiente de desenvolvimento, já que usuários sem conhecimentos técnicos não conseguiriam realizar esse processo. Com isso, também seria possível executar pipelines.

Os modelos do Dataflow tentam resolver esse problema mudando a representação criada quando um pipeline é compilado para parametrização. Infelizmente, esse processo não é tão simples quanto a exposição de parâmetros de linha de comando, mas isso será feito em outros laboratórios. Com os modelos do Dataflow, o fluxo de trabalho acima fica da seguinte forma:

  1. Os desenvolvedores criam um ambiente de desenvolvimento e elaboram o pipeline. O ambiente inclui o SDK do Apache Beam e outras dependências.
  2. Os desenvolvedores executam o pipeline e criam um modelo. O SDK do Apache Beam organiza arquivos no Cloud Storage, cria um arquivo de modelo (semelhante ao de solicitação de job) e o salva no Cloud Storage.
  3. Usuários que não são desenvolvedores ou outras ferramentas de fluxo de trabalho, como o Airflow, podem executar jobs facilmente com o Console do Google Cloud, a ferramenta de linha de comando gcloud ou a API REST para enviar solicitações de execução de arquivos de modelo para o serviço do Cloud Dataflow.

Neste laboratório, você vai praticar o uso de um dos diversos modelos do Dataflow criados pelo Google para realizar a mesma tarefa do pipeline criado na parte 1.

Tarefa 1: criar um arquivo de esquema JSON

Embora você não tivesse que transmitir um objeto TableSchema para a transformação BigQueryIO.writeTableRows() devido ao uso de .usedBeamSchema(), é necessário transmitir ao modelo do Dataflow um arquivo JSON que represente o esquema neste exemplo.

  1. Abra o terminal e volte ao diretório principal. Execute o seguinte comando para capturar o esquema da tabela logs.logs:
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. Agora, capture esse resultado em um arquivo e faça upload para o GCS. Os comandos sed extras são para criar um objeto JSON completo esperado pelo DataFlow.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

Clique em Verificar meu progresso para conferir o objetivo. Como criar um arquivo de esquema JSON

Tarefa 2: escrever uma função JavaScript definida pelo usuário

O modelo do Dataflow do Cloud Storage para o BigQuery exige uma função JavaScript para converter o texto bruto em JSON válido. Nesse caso, cada linha de texto é um JSON válido, e a função é um pouco trivial.

Para concluir a tarefa, use o ambiente de desenvolvimento integrado para criar um arquivo .js com o conteúdo abaixo e copiá-lo para o Google Cloud Storage.

  1. Copie a função abaixo para seu próprio arquivo transform.js na pasta 1_Basic_ETL/:
function transform(line) { return line; }
  1. Em seguida, execute o comando a seguir para copiar o arquivo no Google Cloud Storage:
cd 1_Basic_ETL/ export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

Clique em Verificar meu progresso para conferir o objetivo. Escrever uma função JavaScript definida pelo usuário

Tarefa 3: executar um modelo do Dataflow

  1. Acesse a IU da Web do Cloud Dataflow (link em inglês).
  2. Clique em CRIAR JOB A PARTIR DO MODELO.
  3. Em job name, insira um nome para o job do Cloud Dataflow.
  4. No modelo do Cloud Dataflow, selecione o modelo de Arquivos de texto no Cloud Storage para o BigQuery na seção Processar dados em massa (lote). NÃO use a seção de streaming.
  5. No Caminho da UDF em JavaScript no Cloud Storage, digite o caminho para .js no formato gs://<YOUR-PROJECT-ID>/transform.js.
  6. No Caminho JSON, grave o caminho no schema.json no formato gs://<YOUR-PROJECT-ID>/schema.json.
  7. No Nome da UDF em JavaScript, digite transform.
  8. Na Tabela de saída do BigQuery, digite o código do projeto do GCP seguido de :iotlab.sensordata
  9. No Caminho de entrada do Cloud Storage, insira o caminho para events.json no formato gs://<YOUR-PROJECT-ID>/events.json.
  10. No Diretório temporário do BigQuery, insira uma nova pasta no mesmo bucket, que será criada pelo job.
  11. No Local temporário, insira uma segunda pasta nova nesse bucket.
  12. Deixe a Criptografia como Chave gerenciada pelo Google.
  13. Clique no botão Executar job.

Durante a execução do job, é possível inspecioná-lo na IU da Web do Dataflow.

Clique em Verificar meu progresso para conferir o objetivo. Executar um modelo do Dataflow

Tarefa 4: inspecionar o código do modelo do Dataflow

  1. Faça o recall do código do modelo do Dataflow que você acabou de usar.

  2. Role para baixo até encontrar o método principal. O código precisa ser semelhante ao pipeline que você criou.

  • Ele começa com um objeto Pipeline, criado usando um objeto PipelineOptions.
  • Ele consiste em uma cadeia de PTransforms, começando com uma transformação TextIO.read().
  • A PTransform fica um pouco diferente após a transformação de leitura (link em inglês). Ela permite usar JavaScript para transformar as strings de entrada se, por exemplo, o formato de origem não estiver bem alinhado com o formato da tabela do BigQuery. Para conferir a documentação sobre como usar esse recurso, consulte esta página.
  • Depois da UDF em JavaScript, a PTransform usa uma função de biblioteca para converter o JSON em um tablerow. Saiba mais sobre esse código nesta página (links em inglês).
  • O método Write PTransform (link em inglês) é um pouco diferente porque, em vez de usar um esquema conhecido no momento da compilação do gráfico, o código serve para aceitar parâmetros que só serão conhecidos no ambiente de execução. Isso só é possível devido à classe NestedValueProvider. Ela também usa um esquema definido explicitamente, em vez de um sistema inferido pelo esquema do Beam, usando .useBeamSchema(), como você fez.
  1. Confira o próximo laboratório, que aborda a criação de pipelines que não são simplesmente cadeias de PTransforms e como é possível adaptar um pipeline que você criou para ser um modelo personalizado do Dataflow.

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.

Este conteúdo não está disponível no momento

Você vai receber uma notificação por e-mail quando ele estiver disponível

Ótimo!

Vamos entrar em contato por e-mail se ele ficar disponível