arrow_back

Processamento de dados sem servidor com o Dataflow - Pipelines com ramificações (Python)

Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Processamento de dados sem servidor com o Dataflow - Pipelines com ramificações (Python)

Laboratório 2 horas universal_currency_alt 5 créditos show_chart Avançado
info Este laboratório pode incorporar ferramentas de IA para ajudar no seu aprendizado.
Teste e compartilhe seu conhecimento com nossa comunidade.
done
Tenha acesso a mais de 700 laboratórios, selos de habilidade e cursos

Informações gerais

Neste laboratório, você vai:

  • implementar um pipeline com ramificações;
  • filtrar os dados antes da gravação;
  • adicionar parâmetros de linha de comando personalizados a um pipeline.

Pré-requisitos:

  • Conhecimento básico do Python

No laboratório anterior, você criou um pipeline básico sequencial de extração, transformação e carga, e usou um modelo equivalente do Dataflow para ingerir o armazenamento de dados em lote no Google Cloud Storage. Esse pipeline se baseia em uma sequência de transformações:

Diagrama de fluxo do pipeline com elementos que fluem na seguinte ordem: entrada, transformação, PCollection, transformação, PCollection, transformação, saída.

No entanto, muitos pipelines não vão exibir uma estrutura tão simples. Neste laboratório, você vai criar um pipeline mais sofisticado e não sequencial.

O caso de uso aqui é otimizar o consumo de recursos. Os produtos variam de acordo com a maneira que consomem recursos. Além disso, nem todos os dados são usados do mesmo modo em uma empresa. Algumas informações serão consultadas regularmente, por exemplo, em cargas de trabalho analíticas, e outras serão usadas apenas para recuperação. Neste laboratório, você vai otimizar o pipeline do primeiro laboratório para o consumo de recursos ao armazenar apenas dados que os analistas usarão no BigQuery e, ao mesmo tempo, arquivar outros dados em um serviço de armazenamento altamente durável e de baixo custo, o Coldline Storage do Google Cloud Storage.

Configuração e requisitos

Para cada laboratório, você recebe um novo projeto do Google Cloud e um conjunto de recursos por um determinado período e sem custos financeiros.

  1. Faça login no Qwiklabs em uma janela anônima.

  2. Confira o tempo de acesso do laboratório (por exemplo, 1:15:00) e finalize todas as atividades nesse prazo.
    Não é possível pausar o laboratório. Você pode reiniciar o desafio, mas vai precisar refazer todas as etapas.

  3. Quando tudo estiver pronto, clique em Começar o laboratório.

  4. Anote as credenciais (Nome de usuário e Senha). É com elas que você vai fazer login no Console do Google Cloud.

  5. Clique em Abrir Console do Google.

  6. Clique em Usar outra conta, depois copie e cole as credenciais deste laboratório nos locais indicados.
    Se você usar outras credenciais, vai receber mensagens de erro ou cobranças.

  7. Aceite os termos e pule a página de recursos de recuperação.

Verifique as permissões do projeto

Antes de começar a trabalhar no Google Cloud, veja se o projeto tem as permissões corretas no Identity and Access Management (IAM).

  1. No console do Google Cloud, em Menu de navegação (Ícone do menu de navegação), selecione IAM e administrador > IAM.

  2. Confira se a conta de serviço padrão do Compute {project-number}-compute@developer.gserviceaccount.com está na lista e recebeu o papel de editor. O prefixo da conta é o número do projeto, que está no Menu de navegação > Visão geral do Cloud > Painel.

Nome da conta de serviço padrão e status do editor do Compute Engine destacados na página com a guia "Permissões"

Observação: se a conta não estiver no IAM ou não tiver o papel de editor, siga as etapas abaixo.
  1. No console do Google Cloud, em Menu de navegação, clique em Visão geral do Cloud > Painel.
  2. Copie o número do projeto, por exemplo, 729328892908.
  3. Em Menu de navegação, clique em IAM e administrador > IAM.
  4. Clique em Permitir acesso, logo abaixo de Visualizar por principais na parte de cima da tabela de papéis.
  5. Em Novos principais, digite:
{project-number}-compute@developer.gserviceaccount.com
  1. Substitua {project-number} pelo número do seu projeto.
  2. Em Papel, selecione Projeto (ou Básico) > Editor.
  3. Clique em Save.

Configuração do ambiente de desenvolvimento baseado em notebook do Jupyter

Neste laboratório, você vai executar todos os comandos em um terminal usando seu notebook.

  1. No console do Google Cloud, abra o Menu de navegação e selecione Vertex AI > Workbench.

  2. Ative a API Notebooks.

  3. Na página "Workbench", clique em CRIAR NOVA.

  4. Na caixa de diálogo Nova instância, defina a região como e a zona como .

  5. Em "Ambiente", selecione Apache Beam.

  6. Clique em CRIAR na parte de baixo da caixa de diálogo.

Observação: pode levar de três a cinco minutos para que o ambiente seja totalmente provisionado. Aguarde até a conclusão dessa etapa. Observação: clique em Ativar API Notebooks para fazer isso.
  1. Depois, clique no link ABRIR O JUPYTERLAB ao lado do nome do seu notebook para abrir o ambiente em uma nova guia do seu navegador.

IDE_link

  1. Em seguida, clique em Terminal. Nele, é possível executar todos os comandos deste laboratório.

Abrir terminal

Faça o download do repositório de código

Agora você precisa dele para usar neste laboratório.

  1. Insira este comando no terminal que você abriu:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. No painel à esquerda do ambiente do notebook, no navegador de arquivos, você vai notar que o repositório training-data-analyst foi adicionado.

  2. Acesse o repositório clonado /training-data-analyst/quests/dataflow_python/. Nele, você vai encontrar uma pasta para cada laboratório com duas subpastas: lab, que contém o código que precisa ser concluído, e solution, que inclui um exemplo prático caso você precise de ajuda.

Opção "Explorador" destacada no menu "Visualização" expandido

Observação: caso você queira editar um arquivo, é só clicar nele. O arquivo será aberto, e você poderá adicionar ou modificar o código.

Clique em Verificar meu progresso para conferir o objetivo. Crie uma instância de notebook e clone uma repo de curso

Várias transformações processam a mesma PCollection

Neste laboratório, você vai criar um pipeline ramificado que grava dados no Google Cloud Storage e no BigQuery.

Uma maneira de escrever um pipeline com ramificações é aplicar duas transformações diferentes à mesma PCollection, resultando em duas PCollections diferentes:

[PCollection1] = [Initial Input PCollection] | [A Transform] [PCollection2] = [Initial Input PCollection] | [A Different Transform]

Como implementar um pipeline com ramificações

Se você tiver dúvidas nesta seção ou nas próximas, a solução está disponível na página de treinamento de analista de dados do Google Cloud.

Tarefa 1: adicionar uma ramificação para fazer gravações no Cloud Storage

Para concluir esta tarefa, mude um pipeline atual adicionando uma ramificação que faça gravações no Cloud Storage.

A ordem do pipeline: entrada, transformação, PCollection, transformação, PCollection, transformação, saída. Uma ramificação começa na primeira instância de PCollection e segue para a transformação e a saída

Abrir o laboratório indicado

  • No terminal do ambiente de desenvolvimento integrado, execute os seguintes comandos:
# Altere o diretório no laboratório cd 2_Branching_Pipelines/lab export BASE_DIR=$(pwd)

Como configurar o ambiente virtual e as dependências

Antes de começar a editar o código do pipeline real, é preciso verificar se instalou as dependências necessárias.

  1. No terminal do seu ambiente de desenvolvimento integrado, execute os comandos abaixo para criar um ambiente virtual que você vai usar neste laboratório:
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. Em seguida, instale os pacotes necessários para executar seu pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Por fim, verifique se a API Dataflow está ativada:
gcloud services enable dataflow.googleapis.com

Configurar o ambiente de dados

# Crie buckets do GCS e um conjunto de dados do BQ cd $BASE_DIR/../.. source create_batch_sinks.sh # Gere um evento dataflow source generate_batch_events.sh # Mude para o diretório que contém a versão de prática do código cd $BASE_DIR
  1. Abra my_pipeline.py no seu ambiente de desenvolvimento integrado, disponível em training-data-analyst/quests/dataflow_python/2_Branching_Pipelines/labs/.

  2. Role para baixo até o método "run()", em que o corpo do pipeline está definido. Atualmente, o formato se parece com este:

(p | 'ReadFromGCS' >> beam.io.ReadFromText(input) | 'ParseJson' >> beam.Map(parse_json) | 'WriteToBQ' >> beam.io.WriteToBigQuery( output, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) )
  1. Mude esse código adicionando uma nova transformação com ramificação que grava no Cloud Storage. Para isso, use textio.WriteToText antes de cada elemento que será convertido de json para dict.

Se você tiver dúvidas nesta seção ou nas próximas, consulte a solução na página de treinamento de analista de dados do Google Cloud.

Clique em Verificar meu progresso para conferir o objetivo. Configure o ambiente de dados

Tarefa 2: filtrar dados por campo

No momento, o novo pipeline não consome menos recursos, já que todos os dados são armazenados duas vezes. Para começar a otimizar o consumo de recursos, precisamos reduzir a quantidade de informações duplicadas.

O bucket do Google Cloud Storage funciona como um recurso de arquivamento e backup. Por isso, é importante que todos os dados sejam armazenados nele. No entanto, nem todas as informações precisam ser enviadas ao BigQuery.

  1. Vamos supor que as pessoas responsáveis pela análise de dados geralmente analisam quais recursos os usuários acessam no site e como esses padrões de acesso diferem em função da região geográfica e do tempo. Apenas um subconjunto dos campos seria necessário. Como analisamos os elementos JSON em dicionários, podemos usar facilmente o método pop para soltar um campo de dentro de uma chamável do Python:
def drop_field(element): element.pop('field_name') return element
  1. Para concluir esta tarefa, use uma chamável do Python com beam.Map para soltar o campo user_agent, que não será usado pelos nossos analistas no BigQuery.

Tarefa 3: filtrar dados por elemento

Há muitas maneiras de filtrar no Apache Beam. Como estamos trabalhando com uma PCollection de dicionários Python, a maneira mais fácil é usar uma função lambda (anônima) como filtro, uma função que retorna um valor booleano, com beam.Filter. Exemplo:

purchases | beam.Filter(lambda element : element['cost_cents'] > 20*100)
  • Para concluir esta tarefa, adicione uma transformação beam.Filter ao pipeline. É possível filtrar por qualquer critério, mas recomendamos tentar eliminar linhas em que num_bytes seja maior ou igual a 120.

Tarefa 4: adicionar parâmetros personalizados de linha de comando

O pipeline tem vários parâmetros codificados, incluindo o caminho para a entrada e o local da tabela no BigQuery. No entanto, o pipeline seria mais útil se pudesse ler qualquer arquivo JSON no Cloud Storage. Para adicionar esse recurso, é preciso adicionar um conjunto de parâmetros de linha de comando.

No momento, usamos um ArgumentParser para ler e analisar argumentos de linha de comando. Em seguida, transmitimos esses argumentos para o objeto PipelineOptions() que especificamos ao criar nosso pipeline:

parser = argparse.ArgumentParser(description='...') # Definir e analisar argumentos options = PipelineOptions() # Definir valores de opções p = beam.Pipeline(options=options)

O PipelineOptions é usado para interpretar as opções lidas pelo ArgumentParser. Para adicionar um novo argumento de linha de comando ao analisador, podemos usar a sintaxe:

parser.add_argument('--argument_name', required=True, help='Argument description')

Para acessar um parâmetro de linha de comando no código, analise os argumentos e consulte o campo no dicionário resultante:

opts = parser.parse_args() arg_value = opts.arg_name
  • Para concluir esta tarefa, adicione parâmetros de linha de comando para o caminho de entrada, o caminho de saída do Google Cloud Storage e o nome da tabela do BigQuery. Em seguida, atualize o código do pipeline para acessar esses parâmetros em vez de constantes.

Tarefa 5: adicionar campos NULLABLE ao pipeline

Você deve ter notado que a tabela do BigQuery criada no último laboratório tinha um esquema com todos os campos REQUIRED como este:

A página de registros do BigQuery, aberta na guia "Esquema", que mostra várias linhas de dados abaixo dos cabeçalhos das colunas: Nome do campo, Tipo, Modo, Tags de políticas e Descrição

Talvez seja melhor criar um esquema do Apache Beam com campos NULLABLE em que os dados estejam ausentes, tanto para a execução do pipeline quanto para a tabela resultante do BigQuery.

Podemos atualizar o esquema JSON do BigQuery adicionando um novo modo de propriedade para um campo que queremos anular:

{ "name": "field_name", "type": "STRING", "mode": "NULLABLE" }
  • Para concluir esta tarefa, marque os campos lat e lon como anuláveis no esquema do BigQuery.

Tarefa 6: executar o pipeline na linha de comando

  • Para finalizar a tarefa, execute o pipeline na linha de comando e transmita os parâmetros adequados. Além disso, anote o esquema resultante do BigQuery para campos NULLABLE. O código vai ficar assim:
# Configure as variáveis de ambiente export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export COLDLINE_BUCKET=${BUCKET}-coldline export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export OUTPUT_PATH=${PIPELINE_FOLDER}-coldline/pipeline_output export TABLE_NAME=${PROJECT_ID}:logs.logs_filtered cd $BASE_DIR python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --outputPath=${OUTPUT_PATH} \ --tableName=${TABLE_NAME} Observação: se o pipeline estiver sendo criado, mas você perceber que há vários erros devido a problemas no código ou configuração incorreta no serviço Dataflow, defina "runner" novamente como "DirectRunner", o que faz com que o pipeline seja executado localmente e receba feedback mais rápido. Nesse caso, essa abordagem funciona porque o conjunto de dados é pequeno e você não está usando recursos incompatíveis com o DirectRunner.

Tarefa 7: verificar os resultados do pipeline

  1. Acesse a página de jobs do Dataflow e veja o job enquanto ele está em execução. O gráfico será semelhante a este:

Diagrama de fluxo que mostra fluxos de jobs de ReadFrom GS para DropInputs com vários pontos intermediários e diretamente de ReadFrom GS para WriteToGCS

  1. Clique no nó que representa a função Filter, que na imagem acima é chamada de FilterFn. No painel do lado direito, você vai perceber que mais elementos foram adicionados como entradas do que gravados como saídas.

  2. Agora clique no nó que representa a gravação no Cloud Storage. Como todos os elementos foram gravados, esse número precisa estar de acordo com o número de elementos na entrada para a função "Filter".

  3. Quando o processo do pipeline terminar, examine os resultados no BigQuery consultando sua tabela. O número de registros na tabela precisa estar de acordo com o número de elementos gerados pela função "Filter".

Clique em Verificar meu progresso para conferir o objetivo. Execute o pipeline na linha de comando

Finalize o laboratório

Clique em Terminar o laboratório após a conclusão. O Google Cloud Ensina remove os recursos usados e limpa a conta por você.

Você vai poder avaliar sua experiência no laboratório. Basta selecionar o número de estrelas, digitar um comentário e clicar em Enviar.

O número de estrelas indica o seguinte:

  • 1 estrela = muito insatisfeito
  • 2 estrelas = insatisfeito
  • 3 estrelas = neutro
  • 4 estrelas = satisfeito
  • 5 estrelas = muito satisfeito

Feche a caixa de diálogo se não quiser enviar feedback.

Para enviar seu feedback, fazer sugestões ou correções, use a guia Suporte.

Copyright 2020 Google LLC. Todos os direitos reservados. Google e o logotipo do Google são marcas registradas da Google LLC. Todos os outros nomes de produtos e empresas podem ser marcas registradas das respectivas empresas a que estão associados.

Este conteúdo não está disponível no momento

Você vai receber uma notificação por e-mail quando ele estiver disponível

Ótimo!

Vamos entrar em contato por e-mail se ele ficar disponível