arrow_back

Processamento de dados sem servidor com o Dataflow: como criar um Pipeline ETL usando Apache Beam e Cloud Dataflow (Python)

Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Processamento de dados sem servidor com o Dataflow: como criar um Pipeline ETL usando Apache Beam e Cloud Dataflow (Python)

Laboratório 1 hora 30 minutos universal_currency_alt 5 créditos show_chart Intermediário
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Informações gerais

Neste laboratório, vamos mostrar como:

  • criar um pipeline do tipo Extrair-Transformar-Carregar (ETL) em lote no Apache Beam, que grava os dados brutos do Google Cloud Storage no Google BigQuery;
  • executar o pipeline do Apache Beam no Cloud Dataflow;
  • parametrizar a execução do pipeline.

Pré-requisitos:

  • Conhecimento básico do Python

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Verifique as permissões do projeto

Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).

  1. No console do Google Cloud, em Menu de navegação (Ícone do menu de navegação), selecione IAM e administrador > IAM.

  2. Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.

Nome da conta de serviço padrão e status do editor do Compute Engine destacados na página com a guia "Permissões"

Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
  1. No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
  2. Copie o número do projeto, por exemplo, 729328892908.
  3. Em Menu de navegação, clique em IAM e administrador > IAM.
  4. Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
  5. Em Novos principais, digite:
{project-number}-compute@developer.gserviceaccount.com
  1. Substitua {project-number} pelo número do seu projeto.
  2. Em Papel, selecione Projeto (ou Básico) > Editor.
  3. Clique em Save.

Configuração do ambiente de desenvolvimento baseado em notebook do Jupyter

Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.

  1. No console do Google Cloud, abra o Menu de navegação e selecione Vertex AI > Workbench.

  2. Ative a API Notebooks.

  3. Na página "Workbench", clique em CRIAR NOVA.

  4. Na caixa de diálogo Nova instância, defina a região como e a zona como .

  5. Em "Ambiente", selecione Apache Beam.

  6. Clique em CRIAR na parte de baixo da caixa de diálogo.

Observação: pode levar de três a cinco minutos para que o ambiente seja totalmente provisionado. Aguarde até a conclusão dessa etapa. Observação: clique em Ativar API Notebooks para fazer isso.
  1. Depois, clique no link ABRIR O JUPYTERLAB ao lado do nome do seu notebook para abrir o ambiente em uma nova guia do seu navegador.

IDE_link

  1. Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.

Abrir terminal

Faça o download do repositório de código

Agora você precisa dele para usar neste laboratório.

  1. Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. No painel à esquerda do ambiente do notebook, no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.

  2. Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.

Opção "Explorador" destacada no menu "Visualização" expandido

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.

Clique em Verificar meu progresso para conferir o objetivo. Crie uma instância de notebook e clone uma repo de curso

Apache Beam e Cloud Dataflow

Aproximadamente 5 minutos

O Cloud Dataflow é um serviço do Google Cloud Platform totalmente gerenciado para executar pipelines de processamento de dados em lote e por streaming do Apache Beam.

Apache Beam é um avançado modelo de programação de processamento de dados portátil, unificado e de código aberto. Com ele, os usuários finais podem definir pipelines de processamento paralelo de dados em lote e streaming usando Java, Python ou Go. Além disso, os pipelines do Apache Beam podem ser executados na máquina de desenvolvimento local em pequenos conjuntos de dados e em escala no Cloud Dataflow. No entanto, como o Apache Beam tem código aberto, há outros executores. Portanto, é possível executar pipelines do Beam no Apache Flink e no Apache Spark, entre outros.

Diagrama da arquitetura da rede do laboratório

Laboratório 1. como criar um pipeline ETL do zero

Introdução

Nesta seção, você irá criar um pipeline do tipo Extrair-Transformar-Carregar no Apache Beam do zero.

Revisão do conjunto de dados e do caso de uso

Para cada laboratório desta Quest, os dados de entrada são semelhantes aos registros do servidor da Web no formato de registro comum (link em inglês), junto a outros dados que um servidor da Web pode conter. Neste primeiro laboratório, os dados são tratados como uma fonte em lote: nos próximos, eles serão tratados como uma fonte em streaming. Sua tarefa é ler os dados, analisá-los e gravá-los no BigQuery, que é um armazenamento de dados sem servidor, para análise posterior.

Abrir o laboratório apropriado

  • Volte ao terminal no seu ambiente de desenvolvimento integrado e copie e cole o seguinte comando:
cd 1_Basic_ETL/lab export BASE_DIR=$(pwd)

Configurar o ambiente virtual e as dependências

Antes de começar a editar o código do pipeline real, você precisa verificar se instalou as dependências necessárias.

  1. No terminal, crie um ambiente virtual para realizar o trabalho deste laboratório:
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Por fim, verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com

Gravar seu primeiro pipeline

1 hora

Tarefa 1: gerar dados sintéticos

  1. Execute o seguinte comando no terminal para clonar um repositório com scripts para gerar registros sintéticos do servidor da Web:
cd $BASE_DIR/../.. source create_batch_sinks.sh bash generate_batch_events.sh head events.json

O script cria um arquivo chamado events.json com linhas semelhantes a estas:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Em seguida, ele copia automaticamente esse arquivo para o bucket do Google Cloud Storage em .

  1. Em outra guia do navegador, acesse o Google Cloud Storage (link em inglês) e veja se o bucket de armazenamento contém o arquivo events.json.

Clique em Verificar meu progresso para conferir o objetivo. Gerar dados sintéticos

Tarefa 2: ler dados da sua origem

Encontre soluções para dúvidas sobre esta ou outras seções neste link (link em inglês).

  1. No seu Explorador de arquivos, acesse a pasta do laboratório 1_Basic_ETL/lab e clique em my_pipeline.py. O arquivo vai ser aberto em um painel de edição. Verifique se os pacotes a seguir foram importados:
import argparse import time import logging import json import apache_beam as beam from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners import DataflowRunner, DirectRunner
  1. Role para baixo até o método run(). No momento, esse método contém um pipeline que não faz nada. Observe como um objeto Pipeline é criado usando um objeto PipelineOptions (links em inglês) e a linha final do método executa o pipeline:
options = PipelineOptions() # Set options p = beam.Pipeline(options=options) # Do stuff p.run()
  • Todos os dados nos pipelines do Apache Beam residem em PCollections (link em inglês). Para criar a PCollection inicial do pipeline, você precisará aplicar uma transformação de raiz ao objeto de pipeline. A transformação raiz cria uma PCollection de uma fonte de dados externa ou de alguns dados locais que você especificar.

  • Há dois tipos de transformações raiz nos SDKs do Beam: leitura e criação. Leitura: as transformações leem dados de uma fonte externa, como um arquivo de texto ou uma tabela de banco de dados Criação: as transformações criam uma PCollection de uma lista na memória e são especialmente úteis para testes.

O código de exemplo a seguir mostra como aplicar uma transformação raiz ReadFromText para ler dados de um arquivo de texto. A transformação é aplicada a um objeto p do Pipeline e retorna um conjunto de dados de pipeline na forma de uma PCollection[str] usando a notação de dicas de tipo parametrizado (link em inglês). "ReadLines" é o nome da transformação. Ela será útil quando você trabalhar com pipelines maiores:

lines = p | "ReadLines" >> beam.io.ReadFromText("gs://path/to/input.txt")
  1. No método run(), crie uma constante de string chamada "input" e defina o valor dela como gs://<YOUR-PROJECT-ID>/events.json. Em um laboratório futuro, você usará parâmetros de linha de comando para transmitir essas informações.

  2. Crie uma PCollection de strings de todos os eventos no events.json chamando a transformação textio.ReadFromText (link em inglês).

  3. Adicione as instruções de importação adequadas ao topo do my_pipeline.py.

  4. Para salvar seu trabalho, clique em Arquivo e selecione Salvar no menu de navegação superior.

Tarefa 3: executar o pipeline para verificar se ele funciona

  • Volte ao terminal, acesse a pasta $BASE_DIR e execute os seguintes comandos. Antes de executar o pipeline, defina a variável de ambiente PROJECT_ID:
cd $BASE_DIR # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) # Run the pipeline python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DirectRunner

No momento, o pipeline apenas lê dados.

No entanto, a execução dele demonstra um fluxo de trabalho útil. Nele, você verifica o pipeline localmente e de maneira econômica usando o DirectRunner (link em inglês) em execução na máquina local antes de fazer cálculos mais caros. Para executar o pipeline usando o Google Cloud Dataflow, mude runner para DataflowRunner (link em inglês).

Tarefa 4: adicionar a uma transformação

Se tiver dificuldades, veja a solução (link em inglês).

As transformações são o que mudam seus dados. No Apache Beam, as transformações são feitas pela classe PTransform (link em inglês). No ambiente de execução, essas operações serão realizadas em vários workers independentes.

A entrada e a saída para cada PTransform são uma PCollection. Na verdade, talvez você não tenha percebido, mas já usou uma PTransform ao ler dados do Google Cloud Storage. Uma PCollection de strings foi criada, independentemente da atribuição a uma variável.

Como o Beam usa um método de aplicação genérico para PCollections, representado pelo operador de barra vertical | no Python, é possível encadear transformações de maneira sequencial. Por exemplo, é possível encadear transformações para criar um pipeline sequencial como este:

[Output_PCollection] = ([Input_PCollection] | [First Transform] | [Second Transform] | [Third Transform])

Nesta tarefa, você usará um novo tipo de transformação: ParDo (link em inglês). ParDo é uma transformação do Beam para processamento paralelo genérico.

O paradigma de processamento de ParDo é semelhante à fase "Map" de um algoritmo no estilo Mapear/Embaralhar/Reduzir: uma transformação ParDo considera cada elemento na PCollection de entrada, realiza alguma função de processamento (seu código de usuário) nesse elemento e emite zero, um ou vários elementos a uma PCollection de saída.

O ParDo é útil para várias operações comuns de processamento de dados. No entanto, há PTransforms especiais no Python para tornar o processo mais simples, incluindo os seguintes:

  • Filtragem de um conjunto de dados. É possível usar Filter para considerar cada elemento em uma PCollection e enviar o elemento para uma nova PCollection, ou descartá-lo dependendo do resultado de um Python chamável que retorna um valor booleano.
  • Formatação ou conversão do tipo de cada elemento de um conjunto de dados. Se a PCollection de entrada tiver elementos de um tipo ou formato diferente do que você quer, use Map para realizar uma conversão em cada elemento e enviar o resultado para uma nova PCollection.
  • Extração de partes de cada elemento de um conjunto de dados. Se você tem uma PCollection de registros com vários campos, por exemplo, também é possível usar Map ou FlatMap para analisar apenas os campos que quer considerar em uma nova PCollection.
  • Cálculos de cada elemento de um conjunto de dados. É possível usar ParDo, Map ou FlatMap para executar cálculos simples ou complexos em cada elemento (ou determinados elementos) de uma PCollection e gerar os resultados como uma nova PCollection.

Para concluir esta tarefa, você precisa escrever uma transformação Map que leia em uma string JSON representando um evento único, analise usando o pacote json de Python e gere os resultados que o dicionário retornou em json.loads.

As funções Map podem ser implementadas de duas maneiras: in-line ou com uma chamada predefinida. Escreva funções Map in-line desta forma:

p | beam.Map(lambda x : something(x))

Como alternativa, é possível usar beam.Map com uma função Python chamável definida anteriormente no script:

def something(x): y = # Do something! return y p | beam.Map(something)

Se você precisar de mais flexibilidade do que a oferecida por beam.Map (e outros DoFns leves), implemente ParDo com DoFns personalizados que são da subclasse DoFn (link em inglês). Isso facilita a integração com frameworks de teste.

class MyDoFn(beam.DoFn): def process(self, element): output = #Do Something! yield output p | beam.ParDo(MyDoFn())

Se tiver dificuldades, confira a solução (link em inglês).

Tarefa 5: escrever para um coletor

Neste ponto, o pipeline lê um arquivo do Google Cloud Storage, analisa cada linha e gera um dicionário Python para cada elemento. O próximo passo é gravar esses objetos em uma tabela do BigQuery.

  1. Embora seja possível instruir o pipeline a criar uma tabela do BigQuery quando necessário, você precisará criar o conjunto de dados com antecedência. Isso já foi feito pelo script generate_batch_events.sh. É possível examinar o conjunto de dados com o seguinte código:
# Examine dataset bq ls # No tables yet bq ls logs

Para gerar as PCollections finais do pipeline, aplique uma transformação Write a essa PCollection. As transformações Write podem gerar elementos de uma PCollection​ em um coletor de dados externo, como uma tabela de banco de dados. É possível usar Write para gerar uma PCollection a qualquer momento no pipeline, embora o normal seja gravar os dados no final do pipeline.

O código de exemplo a seguir mostra como aplicar uma transformação WriteToText para gravar uma PCollection de string em um arquivo de texto:

p | "WriteMyFile" >> beam.io.WriteToText("gs://path/to/output")
  1. Nesse caso, em vez de usar WriteToText, use WriteToBigQuery (link em inglês).

Essa função exige vários fatores, como a tabela específica a ser gravada e o esquema dela. Como alternativa, especifique se você pretende anexar a uma tabela, recriar tabelas (útil na iteração inicial do pipeline) ou criar a tabela. Por padrão, essa transformação cria tabelas que não existem e não grava em tabelas que não estejam vazias.

  1. No entanto, precisamos especificar nosso esquema. Há duas maneiras de fazer isso. É possível especificar o esquema como uma única string ou no formato JSON. Por exemplo, suponha que o dicionário tenha três campos: nome (do tipo str), ID (do tipo int) e saldo (do tipo float). Em seguida, podemos especificar o esquema em uma única linha:
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

Ou especifique como JSON:

table_schema = { "fields": [ { "name": "name", "type": "STRING" }, { "name": "id", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "balance", "type": "FLOAT", "mode": "REQUIRED" } ] }

No primeiro caso (a string única), todos os campos são considerados NULLABLE. Ao usar a abordagem JSON, é possível especificar o modo.

  1. Depois de definir o esquema da tabela, podemos adicionar o coletor ao DAG:
p | 'WriteToBQ' >> beam.io.WriteToBigQuery( 'project:dataset.table', schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) Observação: WRITE_TRUNCATE sempre exclui e recria sua tabela. Isso é útil na iteração inicial do pipeline, especialmente à medida que você faz iterações no esquema. No entanto, isso pode causar problemas não intencionais na produção com facilidade. As opções WRITE_APPEND ou WRITE_EMPTY são mais seguras.

Lembre-se de definir o esquema da tabela e adicionar o coletor do BigQuery ao seu pipeline. Se tiver dificuldades, confira a solução (link em inglês).

Tarefa 6: executar o pipeline

  1. Volte ao terminal e execute o pipeline com o mesmo comando usado anteriormente. No entanto, agora use o DataflowRunner para executar o pipeline no Cloud Dataflow.
# Set up environment variables cd $BASE_DIR export PROJECT_ID=$(gcloud config get-value project) # Run the pipelines python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DataflowRunner

A forma geral precisa ter um único caminho, começando pela transformação Read e terminando com a transformação Write. À medida que o pipeline é executado, os workers são adicionados automaticamente, enquanto o serviço determina as necessidades do pipeline, e desaparecem quando não são mais necessários. É possível encontrar esse processo ao acessar o Compute Engine (link em inglês) e acessar as máquinas virtuais criadas pelo serviço Dataflow.

Observação: se o pipeline estiver sendo criado e você notar muitos erros devido ao código ou à configuração incorreta no serviço do Dataflow, defina runner novamente como DirectRunner para que ele seja executado no local e receba feedback mais rápido. Nesse caso, essa abordagem funciona porque o conjunto de dados é pequeno e você não está usando recursos incompatíveis com o DirectRunner.
  1. Quando o pipeline terminar, volte à janela do navegador do BigQuery e consulte a tabela.

Se o código não estiver funcionando conforme o esperado e você não souber o que fazer, confira a solução (link em inglês).

Clique em Verificar meu progresso para conferir o objetivo. Execute o pipeline

Laboratório parte 2: parametrização de ETL básico

Aproximadamente 20 minutos

Grande parte do trabalho dos engenheiros de dados é previsível, como jobs recorrentes, ou é semelhante a outros trabalhos. No entanto, o processo para executar pipelines exige experiência em engenharia. Veja as etapas que você acabou de concluir:

  1. Você criou um ambiente de desenvolvimento e desenvolveu um pipeline. O ambiente incluiu o SDK do Apache Beam e outras dependências.
  2. Você executou o pipeline do ambiente de desenvolvimento. O SDK do Apache Beam preparou arquivos no Cloud Storage, criou um arquivo de solicitação de job e enviou o arquivo ao serviço do Cloud Dataflow.

Seria muito melhor se houvesse uma maneira de iniciar um job por uma chamada de API ou sem ter que configurar um ambiente de desenvolvimento, já que usuários sem conhecimentos técnicos não conseguiriam realizar esse processo. Com isso, também seria possível executar pipelines.

Os modelos do Dataflow tentam resolver esse problema mudando a representação criada quando um pipeline é compilado para parametrização. Infelizmente, esse processo não é tão simples quanto a exposição de parâmetros de linha de comando, mas isso será feito em outros laboratórios. Com os modelos do Dataflow, o fluxo de trabalho acima fica da seguinte forma:

  1. Os desenvolvedores criam um ambiente de desenvolvimento e elaboram o pipeline. O ambiente inclui o SDK do Apache Beam e outras dependências.
  2. Os desenvolvedores executam o pipeline e criam um modelo. O SDK do Apache Beam organiza arquivos no Cloud Storage, cria um arquivo de modelo (semelhante ao de solicitação de job) e o salva no Cloud Storage.
  3. Usuários que não são desenvolvedores ou outras ferramentas de fluxo de trabalho, como o Airflow, podem executar jobs facilmente com o Console do Google Cloud, a ferramenta de linha de comando gcloud ou a API REST para enviar solicitações de execução de arquivos de modelo para o serviço do Cloud Dataflow.

Neste laboratório, você vai praticar o uso de um dos diversos modelos do Dataflow criados pelo Google para realizar a mesma tarefa do pipeline criado na parte 1.

Tarefa 1: criar um arquivo do esquema JSON

Assim como antes, você precisa passar um modelo Dataflow para um arquivo JSON que represente o esquema neste exemplo.

  1. Volte ao terminal no seu ambiente de desenvolvimento integrado. Execute os comandos a seguir para voltar ao diretório principal. Em seguida, copie o esquema da sua tabela logs.logs:
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. Agora capture esse resultado em um arquivo e faça upload para o GCS. Os comandos sed extras são para criar um objeto JSON completo esperado pelo Dataflow.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

Clique em Verificar meu progresso para conferir o objetivo. Como criar um arquivo de esquema JSON

Tarefa 2: escrever uma função definida pelo usuário com JavaScript

O modelo do Dataflow do Cloud Storage para o BigQuery exige uma função JavaScript para converter o texto bruto em um JSON válido. Nesse caso, cada linha de texto é um JSON válido, e a função não tem tanta importância.

  1. Para concluir esta tarefa, crie um novo arquivo na pasta dataflow_python no Explorador de arquivos do seu ambiente de desenvolvimento integrado.

  2. Para criar um Novo arquivo, clique em Arquivo >> Novo >> Arquivo de texto.

  3. Clique com o botão direito do mouse para renomear o arquivo como transform.js.

  4. Abra o arquivo transform.js no painel de edição e clique nele para abri-lo.

  5. Copie a função abaixo para seu próprio arquivo transform.js e salve:

function transform(line) { return line; }
  1. Depois, execute o comando a seguir para copiar o arquivo no Google Cloud Storage:
export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

Clique em Verificar meu progresso para conferir o objetivo. Escreva uma função JavaScript definida pelo usuário em um arquivo JavaScript

Tarefa 3: executar um modelo do Dataflow

  1. Acesse a IU da Web do Cloud Dataflow (link em inglês).
  2. Clique em CRIAR JOB A PARTIR DO MODELO.
  3. Digite um nome para o job do Cloud Dataflow.
  4. Em Modelo do Dataflow, selecione o modelo de Arquivos de texto no Cloud Storage para o BigQuery na seção Processar dados em massa (lote). NÃO use a seção de streaming.
  5. No arquivo de entrada do Cloud Storage, insira o caminho para events.json no formato
  6. No local do Cloud Storage do seu arquivo de esquema do BigQuery, grave o caminho para o arquivo schema.json no formato
  7. Em Tabela de saída do BigQuery, digite
  8. Em Diretório temporário do BigQuery, insira uma nova pasta no mesmo bucket. Ela será criada pelo job.
  9. Em Local temporário, insira uma segunda pasta nova nesse bucket.
  10. Deixe a Criptografia como Chave de criptografia gerenciada pelo Google.
  11. Clique para abrir Parâmetros opcionais.
  12. No Caminho da UDF em JavaScript no Cloud Storage, digite o caminho para .js no formato
  13. Em Nome da UDF em JavaScript, digite transform.
  14. Clique no botão Executar job.

Durante a execução do job, é possível inspecioná-lo na IU da Web do Dataflow.

Clique em Verificar meu progresso para conferir o objetivo. Executar um modelo do Dataflow

Tarefa 4: inspecionar o código do modelo do Dataflow

O código do modelo do Dataflow que você acabou de usar está neste guia TextIOToBigQuery (link em inglês).

  • Role para baixo até encontrar o método principal. O código precisa ser semelhante ao pipeline que você criou.

    • Ele começa com um objeto Pipeline, que foi criado usando um objeto PipelineOptions.
    • Ele consiste em uma cadeia de PTransforms, começando com uma transformação TextIO.read().
    • A PTransform fica um pouco diferente após a transformação de leitura (link em inglês). Ela permite usar JavaScript para transformar as strings de entrada se, por exemplo, o formato de origem não estiver bem alinhado com o formato da tabela do BigQuery. Para acessar a documentação sobre como usar esse recurso, consulte esta página (em inglês).
    • Depois da UDF em JavaScript, a PTransform usa uma função de biblioteca para converter o JSON em um tablerow. Saiba mais sobre esse código nesta página (links em inglês).
    • O método Write PTransform (link em inglês) é um pouco diferente porque, em vez de usar um esquema conhecido no momento da compilação do gráfico, o código serve para aceitar parâmetros que só serão conhecidos no ambiente de execução. Isso só é possível devido à classe NestedValueProvider (link em inglês).

Confira o próximo laboratório, que aborda a criação de pipelines que não são simplesmente cadeias de PTransforms e como adaptar um pipeline que você criou para ser um modelo personalizado do Dataflow.

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.

Este conteúdo não está disponível no momento

We will notify you via email when it becomes available

Ótimo!

We will contact you via email if it becomes available