Checkpoints
Create Vertex AI Platform Notebooks instance and clone course repo
/ 15
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
Processamento de dados sem servidor com o Dataflow: como criar um Pipeline ETL usando Apache Beam e Cloud Dataflow (Python)
- Informações gerais
- Configuração e requisitos
- Laboratório 1. como criar um pipeline ETL do zero
- Tarefa 1: gerar dados sintéticos
- Tarefa 2: ler dados da sua origem
- Tarefa 3: executar o pipeline para verificar se ele funciona
- Tarefa 4: adicionar a uma transformação
- Tarefa 5: escrever para um coletor
- Tarefa 6: executar o pipeline
- Laboratório parte 2: parametrização de ETL básico
- Tarefa 1: criar um arquivo do esquema JSON
- Tarefa 2: escrever uma função definida pelo usuário com JavaScript
- Tarefa 3: executar um modelo do Dataflow
- Tarefa 4: inspecionar o código do modelo do Dataflow
- Finalize o laboratório
Informações gerais
Neste laboratório, vamos mostrar como:
- criar um pipeline do tipo Extrair-Transformar-Carregar (ETL) em lote no Apache Beam, que grava os dados brutos do Google Cloud Storage no Google BigQuery;
- executar o pipeline do Apache Beam no Cloud Dataflow;
- parametrizar a execução do pipeline.
Pré-requisitos:
- Conhecimento básico do Python
Configuração e requisitos
Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.
-
Faça login no Qwiklabs em uma janela anônima.
-
Confira o tempo de acesso do laboratório (por exemplo,
1:15:00
) e finalize todas as atividades nesse prazo.
Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas. -
Quando tudo estiver pronto, clique em Começar o laboratório.
-
Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.
-
Clique em Abrir Console do Google.
-
Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
Se você usar outras credenciais, vai receber mensagens de erro ou cobranças. -
Aceite os termos e pule a página de recursos de recuperação.
Verifique as permissões do projeto
Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).
-
No console do Google Cloud, em Menu de navegação (), selecione IAM e administrador > IAM.
-
Confira se a conta de serviço padrão do Compute
{project-number}-compute@developer.gserviceaccount.com
está na lista e recebeu o papel deeditor
. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.
editor
, siga as etapas abaixo.- No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
- Copie o número do projeto, por exemplo,
729328892908
. - Em Menu de navegação, clique em IAM e administrador > IAM.
- Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
- Em Novos principais, digite:
- Substitua
{project-number}
pelo número do seu projeto. - Em Papel, selecione Projeto (ou Básico) > Editor.
- Clique em Save.
Configuração do ambiente de desenvolvimento baseado em notebook do Jupyter
Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.
-
No console do Google Cloud, abra o Menu de navegação e selecione Vertex AI > Workbench.
-
Ative a API Notebooks.
-
Na página "Workbench", clique em CRIAR NOVA.
-
Na caixa de diálogo Nova instância, defina a região como
e a zona como . -
Em "Ambiente", selecione Apache Beam.
-
Clique em CRIAR na parte de baixo da caixa de diálogo.
- Depois, clique no link ABRIR O JUPYTERLAB ao lado do nome do seu notebook para abrir o ambiente em uma nova guia do seu navegador.
- Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.
Faça o download do repositório de código
Agora você precisa dele para usar neste laboratório.
- Insira este comando no terminal que você abriu:
-
No painel à esquerda do ambiente do notebook, no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.
-
Acesse o repositório clonado
/training-data-analyst/quests/dataflow_python/
. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas:lab
, que contém o código que precisa ser concluído, esolution
, que inclui um exemplo prático caso você precise de ajuda.
Clique em Verificar meu progresso para conferir o objetivo.
Apache Beam e Cloud Dataflow
Aproximadamente 5 minutos
O Cloud Dataflow é um serviço do Google Cloud Platform totalmente gerenciado para executar pipelines de processamento de dados em lote e por streaming do Apache Beam.
Apache Beam é um avançado modelo de programação de processamento de dados portátil, unificado e de código aberto. Com ele, os usuários finais podem definir pipelines de processamento paralelo de dados em lote e streaming usando Java, Python ou Go. Além disso, os pipelines do Apache Beam podem ser executados na máquina de desenvolvimento local em pequenos conjuntos de dados e em escala no Cloud Dataflow. No entanto, como o Apache Beam tem código aberto, há outros executores. Portanto, é possível executar pipelines do Beam no Apache Flink e no Apache Spark, entre outros.
Laboratório 1. como criar um pipeline ETL do zero
Introdução
Nesta seção, você irá criar um pipeline do tipo Extrair-Transformar-Carregar no Apache Beam do zero.
Revisão do conjunto de dados e do caso de uso
Para cada laboratório desta Quest, os dados de entrada são semelhantes aos registros do servidor da Web no formato de registro comum (link em inglês), junto a outros dados que um servidor da Web pode conter. Neste primeiro laboratório, os dados são tratados como uma fonte em lote: nos próximos, eles serão tratados como uma fonte em streaming. Sua tarefa é ler os dados, analisá-los e gravá-los no BigQuery, que é um armazenamento de dados sem servidor, para análise posterior.
Abrir o laboratório apropriado
- Volte ao terminal no seu ambiente de desenvolvimento integrado e copie e cole o seguinte comando:
Configurar o ambiente virtual e as dependências
Antes de começar a editar o código do pipeline real, você precisa verificar se instalou as dependências necessárias.
- No terminal, crie um ambiente virtual para realizar o trabalho deste laboratório:
- Em seguida, instale os pacotes necessários para executar seu pipeline:
- Por fim, verifique se a API Dataflow está ativada:
Gravar seu primeiro pipeline
1 hora
Tarefa 1: gerar dados sintéticos
- Execute o seguinte comando no terminal para clonar um repositório com scripts para gerar registros sintéticos do servidor da Web:
O script cria um arquivo chamado events.json
com linhas semelhantes a estas:
Em seguida, ele copia automaticamente esse arquivo para o bucket do Google Cloud Storage em
- Em outra guia do navegador, acesse o Google Cloud Storage (link em inglês) e veja se o bucket de armazenamento contém o arquivo
events.json
.
Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 2: ler dados da sua origem
Encontre soluções para dúvidas sobre esta ou outras seções neste link (link em inglês).
- No seu Explorador de arquivos, acesse a pasta do laboratório
1_Basic_ETL/lab
e clique em my_pipeline.py. O arquivo vai ser aberto em um painel de edição. Verifique se os pacotes a seguir foram importados:
- Role para baixo até o método
run()
. No momento, esse método contém um pipeline que não faz nada. Observe como um objeto Pipeline é criado usando um objeto PipelineOptions (links em inglês) e a linha final do método executa o pipeline:
-
Todos os dados nos pipelines do Apache Beam residem em PCollections (link em inglês). Para criar a
PCollection
inicial do pipeline, você precisará aplicar uma transformação de raiz ao objeto de pipeline. A transformação raiz cria umaPCollection
de uma fonte de dados externa ou de alguns dados locais que você especificar. -
Há dois tipos de transformações raiz nos SDKs do Beam: leitura e criação. Leitura: as transformações leem dados de uma fonte externa, como um arquivo de texto ou uma tabela de banco de dados Criação: as transformações criam uma
PCollection
de umalista
na memória e são especialmente úteis para testes.
O código de exemplo a seguir mostra como aplicar uma transformação raiz ReadFromText
para ler dados de um arquivo de texto. A transformação é aplicada a um objeto p
do Pipeline
e retorna um conjunto de dados de pipeline na forma de uma PCollection[str]
usando a notação de dicas de tipo parametrizado (link em inglês). "ReadLines" é o nome da transformação. Ela será útil quando você trabalhar com pipelines maiores:
-
No método
run()
, crie uma constante de string chamada "input" e defina o valor dela comogs://<YOUR-PROJECT-ID>/events.json
. Em um laboratório futuro, você usará parâmetros de linha de comando para transmitir essas informações. -
Crie uma
PCollection
de strings de todos os eventos noevents.json
chamando a transformaçãotextio.ReadFromText
(link em inglês). -
Adicione as instruções de importação adequadas ao topo do
my_pipeline.py
. -
Para salvar seu trabalho, clique em Arquivo e selecione Salvar no menu de navegação superior.
Tarefa 3: executar o pipeline para verificar se ele funciona
- Volte ao terminal, acesse a pasta
$BASE_DIR
e execute os seguintes comandos. Antes de executar o pipeline, defina a variável de ambientePROJECT_ID
:
No momento, o pipeline apenas lê dados.
No entanto, a execução dele demonstra um fluxo de trabalho útil. Nele, você verifica o pipeline localmente e de maneira econômica usando o DirectRunner (link em inglês) em execução na máquina local antes de fazer cálculos mais caros. Para executar o pipeline usando o Google Cloud Dataflow, mude runner
para DataflowRunner (link em inglês).
Tarefa 4: adicionar a uma transformação
Se tiver dificuldades, veja a solução (link em inglês).
As transformações são o que mudam seus dados. No Apache Beam, as transformações são feitas pela classe PTransform (link em inglês). No ambiente de execução, essas operações serão realizadas em vários workers independentes.
A entrada e a saída para cada PTransform
são uma PCollection
. Na verdade, talvez você não tenha percebido, mas já usou uma PTransform
ao ler dados do Google Cloud Storage. Uma PCollection
de strings foi criada, independentemente da atribuição a uma variável.
Como o Beam usa um método de aplicação genérico para PCollection
s, representado pelo operador de barra vertical |
no Python, é possível encadear transformações de maneira sequencial. Por exemplo, é possível encadear transformações para criar um pipeline sequencial como este:
Nesta tarefa, você usará um novo tipo de transformação: ParDo (link em inglês). ParDo
é uma transformação do Beam para processamento paralelo genérico.
O paradigma de processamento de ParDo
é semelhante à fase "Map" de um algoritmo no estilo Mapear/Embaralhar/Reduzir: uma transformação ParDo
considera cada elemento na PCollection
de entrada, realiza alguma função de processamento (seu código de usuário) nesse elemento e emite zero, um ou vários elementos a uma PCollection
de saída.
O ParDo
é útil para várias operações comuns de processamento de dados. No entanto, há PTransform
s especiais no Python para tornar o processo mais simples, incluindo os seguintes:
-
Filtragem de um conjunto de dados. É possível usar
Filter
para considerar cada elemento em umaPCollection
e enviar o elemento para uma novaPCollection
, ou descartá-lo dependendo do resultado de um Python chamável que retorna um valor booleano. -
Formatação ou conversão do tipo de cada elemento de um conjunto de dados. Se a
PCollection
de entrada tiver elementos de um tipo ou formato diferente do que você quer, useMap
para realizar uma conversão em cada elemento e enviar o resultado para uma novaPCollection
. -
Extração de partes de cada elemento de um conjunto de dados. Se você tem uma
PCollection
de registros com vários campos, por exemplo, também é possível usarMap
ouFlatMap
para analisar apenas os campos que quer considerar em uma novaPCollection
. -
Cálculos de cada elemento de um conjunto de dados. É possível usar
ParDo
,Map
ouFlatMap
para executar cálculos simples ou complexos em cada elemento (ou determinados elementos) de umaPCollection
e gerar os resultados como uma novaPCollection
.
Para concluir esta tarefa, você precisa escrever uma transformação Map
que leia em uma string JSON representando um evento único, analise usando o pacote json
de Python e gere os resultados que o dicionário retornou em json.loads
.
As funções Map
podem ser implementadas de duas maneiras: in-line ou com uma chamada predefinida. Escreva funções Map
in-line desta forma:
Como alternativa, é possível usar beam.Map
com uma função Python chamável definida anteriormente no script:
Se você precisar de mais flexibilidade do que a oferecida por beam.Map
(e outros DoFn
s leves), implemente ParDo
com DoFn
s personalizados que são da subclasse DoFn (link em inglês). Isso facilita a integração com frameworks de teste.
Se tiver dificuldades, confira a solução (link em inglês).
Tarefa 5: escrever para um coletor
Neste ponto, o pipeline lê um arquivo do Google Cloud Storage, analisa cada linha e gera um dicionário Python para cada elemento. O próximo passo é gravar esses objetos em uma tabela do BigQuery.
- Embora seja possível instruir o pipeline a criar uma tabela do BigQuery quando necessário, você precisará criar o conjunto de dados com antecedência. Isso já foi feito pelo script
generate_batch_events.sh
. É possível examinar o conjunto de dados com o seguinte código:
Para gerar as PCollection
s finais do pipeline, aplique uma transformação Write a essa PCollection
. As transformações Write podem gerar elementos de uma PCollection
em um coletor de dados externo, como uma tabela de banco de dados. É possível usar Write para gerar uma PCollection
a qualquer momento no pipeline, embora o normal seja gravar os dados no final do pipeline.
O código de exemplo a seguir mostra como aplicar uma transformação WriteToText
para gravar uma PCollection
de string em um arquivo de texto:
- Nesse caso, em vez de usar
WriteToText
, useWriteToBigQuery
(link em inglês).
Essa função exige vários fatores, como a tabela específica a ser gravada e o esquema dela. Como alternativa, especifique se você pretende anexar a uma tabela, recriar tabelas (útil na iteração inicial do pipeline) ou criar a tabela. Por padrão, essa transformação cria tabelas que não existem e não grava em tabelas que não estejam vazias.
- No entanto, precisamos especificar nosso esquema. Há duas maneiras de fazer isso. É possível especificar o esquema como uma única string ou no formato JSON. Por exemplo, suponha que o dicionário tenha três campos: nome (do tipo
str
), ID (do tipoint
) e saldo (do tipofloat
). Em seguida, podemos especificar o esquema em uma única linha:
Ou especifique como JSON:
No primeiro caso (a string única), todos os campos são considerados NULLABLE
. Ao usar a abordagem JSON, é possível especificar o modo.
- Depois de definir o esquema da tabela, podemos adicionar o coletor ao DAG:
WRITE_TRUNCATE
sempre exclui e recria sua tabela. Isso é útil na iteração inicial do pipeline, especialmente à medida que você faz iterações no esquema. No entanto, isso pode causar problemas não intencionais na produção com facilidade. As opções WRITE_APPEND
ou WRITE_EMPTY
são mais seguras.Lembre-se de definir o esquema da tabela e adicionar o coletor do BigQuery ao seu pipeline. Se tiver dificuldades, confira a solução (link em inglês).
Tarefa 6: executar o pipeline
- Volte ao terminal e execute o pipeline com o mesmo comando usado anteriormente. No entanto, agora use o
DataflowRunner
para executar o pipeline no Cloud Dataflow.
A forma geral precisa ter um único caminho, começando pela transformação Read
e terminando com a transformação Write
. À medida que o pipeline é executado, os workers são adicionados automaticamente, enquanto o serviço determina as necessidades do pipeline, e desaparecem quando não são mais necessários. É possível encontrar esse processo ao acessar o Compute Engine (link em inglês) e acessar as máquinas virtuais criadas pelo serviço Dataflow.
runner
novamente como DirectRunner
para que ele seja executado no local e receba feedback mais rápido. Nesse caso, essa abordagem funciona porque o conjunto de dados é pequeno e você não está usando recursos incompatíveis com o DirectRunner
.
- Quando o pipeline terminar, volte à janela do navegador do BigQuery e consulte a tabela.
Se o código não estiver funcionando conforme o esperado e você não souber o que fazer, confira a solução (link em inglês).
Clique em Verificar meu progresso para conferir o objetivo.
Laboratório parte 2: parametrização de ETL básico
Aproximadamente 20 minutos
Grande parte do trabalho dos engenheiros de dados é previsível, como jobs recorrentes, ou é semelhante a outros trabalhos. No entanto, o processo para executar pipelines exige experiência em engenharia. Veja as etapas que você acabou de concluir:
- Você criou um ambiente de desenvolvimento e desenvolveu um pipeline. O ambiente incluiu o SDK do Apache Beam e outras dependências.
- Você executou o pipeline do ambiente de desenvolvimento. O SDK do Apache Beam preparou arquivos no Cloud Storage, criou um arquivo de solicitação de job e enviou o arquivo ao serviço do Cloud Dataflow.
Seria muito melhor se houvesse uma maneira de iniciar um job por uma chamada de API ou sem ter que configurar um ambiente de desenvolvimento, já que usuários sem conhecimentos técnicos não conseguiriam realizar esse processo. Com isso, também seria possível executar pipelines.
Os modelos do Dataflow tentam resolver esse problema mudando a representação criada quando um pipeline é compilado para parametrização. Infelizmente, esse processo não é tão simples quanto a exposição de parâmetros de linha de comando, mas isso será feito em outros laboratórios. Com os modelos do Dataflow, o fluxo de trabalho acima fica da seguinte forma:
- Os desenvolvedores criam um ambiente de desenvolvimento e elaboram o pipeline. O ambiente inclui o SDK do Apache Beam e outras dependências.
- Os desenvolvedores executam o pipeline e criam um modelo. O SDK do Apache Beam organiza arquivos no Cloud Storage, cria um arquivo de modelo (semelhante ao de solicitação de job) e o salva no Cloud Storage.
- Usuários que não são desenvolvedores ou outras ferramentas de fluxo de trabalho, como o Airflow, podem executar jobs facilmente com o Console do Google Cloud, a ferramenta de linha de comando gcloud ou a API REST para enviar solicitações de execução de arquivos de modelo para o serviço do Cloud Dataflow.
Neste laboratório, você vai praticar o uso de um dos diversos modelos do Dataflow criados pelo Google para realizar a mesma tarefa do pipeline criado na parte 1.
Tarefa 1: criar um arquivo do esquema JSON
Assim como antes, você precisa passar um modelo Dataflow para um arquivo JSON que represente o esquema neste exemplo.
- Volte ao terminal no seu ambiente de desenvolvimento integrado. Execute os comandos a seguir para voltar ao diretório principal. Em seguida, copie o esquema da sua tabela
logs.logs
:
- Agora capture esse resultado em um arquivo e faça upload para o GCS. Os comandos
sed
extras são para criar um objeto JSON completo esperado pelo Dataflow.
Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 2: escrever uma função definida pelo usuário com JavaScript
O modelo do Dataflow do Cloud Storage para o BigQuery exige uma função JavaScript para converter o texto bruto em um JSON válido. Nesse caso, cada linha de texto é um JSON válido, e a função não tem tanta importância.
-
Para concluir esta tarefa, crie um novo arquivo na pasta dataflow_python no Explorador de arquivos do seu ambiente de desenvolvimento integrado.
-
Para criar um Novo arquivo, clique em Arquivo >> Novo >> Arquivo de texto.
-
Clique com o botão direito do mouse para renomear o arquivo como transform.js.
-
Abra o arquivo transform.js no painel de edição e clique nele para abri-lo.
-
Copie a função abaixo para seu próprio arquivo transform.js e salve:
- Depois, execute o comando a seguir para copiar o arquivo no Google Cloud Storage:
Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 3: executar um modelo do Dataflow
- Acesse a IU da Web do Cloud Dataflow (link em inglês).
- Clique em CRIAR JOB A PARTIR DO MODELO.
- Digite um nome para o job do Cloud Dataflow.
- Em Modelo do Dataflow, selecione o modelo de Arquivos de texto no Cloud Storage para o BigQuery na seção Processar dados em massa (lote). NÃO use a seção de streaming.
- No arquivo de entrada do Cloud Storage, insira o caminho para
events.json
no formato - No local do Cloud Storage do seu arquivo de esquema do BigQuery, grave o caminho para o arquivo
schema.json
no formato - Em Tabela de saída do BigQuery, digite
- Em Diretório temporário do BigQuery, insira uma nova pasta no mesmo bucket. Ela será criada pelo job.
- Em Local temporário, insira uma segunda pasta nova nesse bucket.
- Deixe a Criptografia como Chave de criptografia gerenciada pelo Google.
- Clique para abrir Parâmetros opcionais.
- No Caminho da UDF em JavaScript no Cloud Storage, digite o caminho para
.js
no formato - Em Nome da UDF em JavaScript, digite
transform
. - Clique no botão Executar job.
Durante a execução do job, é possível inspecioná-lo na IU da Web do Dataflow.
Clique em Verificar meu progresso para conferir o objetivo.
Tarefa 4: inspecionar o código do modelo do Dataflow
O código do modelo do Dataflow que você acabou de usar está neste guia TextIOToBigQuery (link em inglês).
-
Role para baixo até encontrar o método principal. O código precisa ser semelhante ao pipeline que você criou.
- Ele começa com um objeto
Pipeline
, que foi criado usando um objetoPipelineOptions
. - Ele consiste em uma cadeia de
PTransform
s, começando com uma transformação TextIO.read(). - A PTransform fica um pouco diferente após a transformação de leitura (link em inglês). Ela permite usar JavaScript para transformar as strings de entrada se, por exemplo, o formato de origem não estiver bem alinhado com o formato da tabela do BigQuery. Para acessar a documentação sobre como usar esse recurso, consulte esta página (em inglês).
- Depois da UDF em JavaScript, a PTransform usa uma função de biblioteca para converter o JSON em um tablerow. Saiba mais sobre esse código nesta página (links em inglês).
- O método Write PTransform (link em inglês) é um pouco diferente porque, em vez de usar um esquema conhecido no momento da compilação do gráfico, o código serve para aceitar parâmetros que só serão conhecidos no ambiente de execução. Isso só é possível devido à classe NestedValueProvider (link em inglês).
- Ele começa com um objeto
Confira o próximo laboratório, que aborda a criação de pipelines que não são simplesmente cadeias de PTransform
s e como adaptar um pipeline que você criou para ser um modelo personalizado do Dataflow.
Finalize o laboratório
Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.
Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.
O número de estrelas indica o seguinte:
- 1 estrela = muito insatisfeito
- 2 estrelas = insatisfeito
- 3 estrelas = neutro
- 4 estrelas = satisfeito
- 5 estrelas = muito satisfeito
Feche a caixa de diálogo se não quiser enviar feedback.
Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.
Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.