arrow_back

Procesamiento de datos sin servidores con Dataflow: Canalizaciones con ramas (Python)

Acceder Unirse
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Procesamiento de datos sin servidores con Dataflow: Canalizaciones con ramas (Python)

Lab 2 horas universal_currency_alt 5 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Implementar una canalización que tenga ramas
  • Filtrar datos antes de escribirlos
  • Agregar parámetros de línea de comandos personalizados a una canalización

Requisitos previos:

  • Conocimientos básicos sobre Python

En el lab anterior, creaste una canalización secuencial de extracción, transformación y carga básica, y utilizaste una plantilla de Dataflow equivalente para transferir el almacenamiento de datos por lotes en Google Cloud Storage. Esta canalización consta de una secuencia de transformaciones:

Diagrama de flujo de la canalización con elementos que fluyen en el siguiente orden: Entrada, Transformación, PCollection, Transformación, PCollection, Transformación, Salida.

Sin embargo, muchas canalizaciones no mostrarán una estructura tan simple. En este lab, crearás una canalización no secuencial más sofisticada.

El caso de uso aquí es optimizar el consumo de recursos. Los productos varían según la forma en que consumen los recursos. Además, no todos los datos se utilizan de la misma manera dentro de una empresa. Algunos datos se consultarán con regularidad (por ejemplo, dentro de cargas de trabajo analíticas) y, otros, solo se usarán para la recuperación. En este lab, optimizarás la canalización del primer lab en pos del consumo de recursos. Para ello, almacenarás solo los datos que los analistas usarán en BigQuery y archivarás los demás datos en un servicio de almacenamiento muy duradero y de bajo costo, Coldline Storage en Google Cloud Storage.

Configuración y requisitos

En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Qwiklabs desde una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usa otras credenciales, se generarán errores o incurrirá en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Verifica los permisos del proyecto

Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).

  1. En la consola de Google Cloud, en el Menú de navegación (Ícono del menú de navegación), selecciona IAM y administración > IAM.

  2. Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com, y que tenga asignado el rol Editor. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.

El nombre de la cuenta de servicio predeterminada de Compute Engine y el estado del editor destacados en la página de pestañas Permisos

Nota: Si la cuenta no aparece en IAM o no tiene asignado el rol Editor, sigue los pasos que se indican a continuación para asignar el rol necesario.
  1. En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
  2. Copia el número del proyecto (p. ej., 729328892908).
  3. En el Menú de navegación, selecciona IAM y administración > IAM.
  4. En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
  5. En Principales nuevas, escribe lo siguiente:
{project-number}-compute@developer.gserviceaccount.com
  1. Reemplaza {número-del-proyecto} por el número de tu proyecto.
  2. En Rol, selecciona Proyecto (o Básico) > Editor.
  3. Haz clic en Guardar.

Configuración del entorno de desarrollo basado en notebooks de Jupyter

En este lab, ejecutarás todos los comandos en una terminal del notebook.

  1. En el Menú de navegación de la consola de Google Cloud, haz clic en Vertex AI > Workbench.

  2. Habilita la API de Notebooks.

  3. En la página Workbench, haz clic en CREAR NUEVO.

  4. En el cuadro de diálogo Instancia nueva que se muestra, establece la región en y la zona en .

  5. En Entorno, selecciona Apache Beam.

  6. Haz clic en CREAR en la parte inferior del cuadro de diálogo.

Nota: El aprovisionamiento completo del entorno tarda de 3 a 5 minutos. Espera hasta que se complete este paso. Nota: Haz clic en Habilitar API de Notebooks para habilitarla.
  1. Cuando el entorno esté listo, haz clic en el vínculo ABRIR JUPYTERLAB que se encuentra junto al nombre del notebook. Esto abrirá tu entorno en una nueva pestaña del navegador.

IDE_link

  1. Luego, haz clic en Terminal. Esto abrirá una terminal en la que podrás ejecutar todos los comandos del lab.

Abre la terminal

Descarga el repositorio de código

A continuación, descargarás un repositorio de código que usarás en este lab.

  1. En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.

  2. Navega al repo clonado /training-data-analyst/quests/dataflow_python/. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab con un código que debes completar y una subcarpeta solution con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Opción Explorador destacada en el menú Ver expandido

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.

Haz clic en Revisar mi progreso para verificar el objetivo. Crear una instancia de notebook y clonar el repo del curso

Las transformaciones múltiples procesan la misma PCollection

En este lab, escribirás una canalización con ramas que, a su vez, escribirá datos en Google Cloud Storage y en BigQuery.

Una forma de escribir una canalización con ramas es aplicar dos transformaciones diferentes a la misma PCollection, lo que da como resultado dos PCollections diferentes:

[PCollection1] = [Initial Input PCollection] | [A Transform] [PCollection2] = [Initial Input PCollection] | [A Different Transform]

Implementa una canalización con ramas

Si no puedes avanzar en esta sección o en secciones posteriores, consulta la solución que está disponible en la página de training-data-analyst de Google Cloud.

Tarea 1. Agrega una rama para escribir en Cloud Storage

Para completar esta tarea, modifica una canalización existente agregando una rama que escriba en Cloud Storage.

El orden de la canalización: Entrada, Transformación, PCollection, Transformación, PCollection, Transformación, Salida. Una rama comienza en la primera instancia de la PCollection, luego, fluye a la Transformación y a la Salida.

Abre el lab adecuado

  • En la terminal de tu IDE, ejecuta los siguientes comandos:
# Change directory into the lab cd 2_Branching_Pipelines/lab export BASE_DIR=$(pwd)

Configura el entorno virtual y las dependencias

Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.

  1. En la terminal de tu IDE, ejecuta los siguientes comandos para crear un entorno virtual para tu trabajo en este lab:
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Por último, asegúrate de que la API de Dataflow esté habilitada:
gcloud services enable dataflow.googleapis.com

Configura el entorno de datos

# Create GCS buckets and BQ dataset cd $BASE_DIR/../.. source create_batch_sinks.sh # Generate event dataflow source generate_batch_events.sh # Change to the directory containing the practice version of the code cd $BASE_DIR
  1. Abre my_pipeline.py en tu IDE, que puedes encontrar en training-data-analyst/quests/dataflow_python/2_Branching_Pipelines/labs/.

  2. Desplázate hacia abajo hasta el método run(), en el que se define el cuerpo de la canalización. Actualmente, se ve de la siguiente manera:

(p | 'ReadFromGCS' >> beam.io.ReadFromText(input) | 'ParseJson' >> beam.Map(parse_json) | 'WriteToBQ' >> beam.io.WriteToBigQuery( output, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) )
  1. Modifica este código incorporando una transformación de ramificación nueva que escriba en Cloud Storage con textio.WriteToText antes de que cada elemento se convierta de json a dict.

Si no puedes avanzar en esta sección o en secciones posteriores, puedes consultar la solución en la página training-data-analyst de Google Cloud.

Haz clic en Revisar mi progreso para verificar el objetivo. Configurar el entorno de datos

Tarea 2. Filtra datos por campo

Por el momento, la canalización nueva no consume menos recursos, puesto que todos los datos se almacenan dos veces. Para comenzar a mejorar el consumo de recursos, debemos reducir la cantidad de datos duplicados.

El bucket de Google Cloud Storage está diseñado para funcionar como almacenamiento de archivos y copias de seguridad, por lo que es importante que todos los datos se guarden allí. Sin embargo, no es necesario que todos los datos se envíen a BigQuery.

  1. Supongamos que los analistas de datos suelen observar a qué recursos acceden los usuarios en el sitio web y cómo esos patrones de acceso difieren en función de la ubicación geográfica y el tiempo. Solo se necesitaría un subconjunto de los campos. Dado que analizamos los elementos JSON en diccionarios, podemos usar fácilmente el método pop para colocar un campo desde una función Python que admita llamadas:
def drop_field(element): element.pop('field_name') return element
  1. Para completar esta tarea, usa una función Python que admita llamadas con beam.Map para descartar el campo user_agent, que nuestros analistas no usarán en BigQuery.

Tarea 3. Filtra datos por elemento

Existen muchas formas de filtrar en Apache Beam. Como estamos trabajando con una PCollection de diccionarios de Python, la forma más fácil será usar una función lambda (anónima) como nuestro filtro, una función que muestra un valor booleano, con beam.Filter. Por ejemplo:

purchases | beam.Filter(lambda element : element['cost_cents'] > 20*100)
  • Para completar esta tarea, agrega una transformación beam.Filter a la canalización. Puedes filtrar según los criterios que desees, pero como sugerencia intenta quitar las filas en las que num_bytes sea mayor o igual que 120.

Tarea 4. Agrega parámetros personalizados de la línea de comandos

Actualmente, la canalización tiene una serie de parámetros hard-coded, incluida la ruta de acceso a la entrada y la ubicación de la tabla en BigQuery. Sin embargo, la canalización sería más útil si pudiera leer cualquier archivo JSON en Cloud Storage. Para incluir esta función, es necesario agregar elementos al conjunto de parámetros de la línea de comandos.

Actualmente, usamos un ArgumentParser para leer y analizar los argumentos de la línea de comandos. Luego, pasamos estos argumentos al objeto PipelineOptions() que especificamos cuando creamos nuestra canalización:

parser = argparse.ArgumentParser(description='...') # Define and parse arguments options = PipelineOptions() # Set options values from options p = beam.Pipeline(options=options)

Las PipelineOptions se usan para interpretar las opciones que lee ArgumentParser. Para agregar un argumento de línea de comandos nuevo al analizador, podemos usar la sintaxis:

parser.add_argument('--argument_name', required=True, help='Argument description')

Para acceder a un parámetro de la línea de comandos en el código, analiza los argumentos y consulta el campo en el diccionario resultante:

opts = parser.parse_args() arg_value = opts.arg_name
  • Para completar esta tarea, agrega parámetros de línea de comandos para la ruta de entrada, la ruta de salida de Google Cloud Storage y el nombre de la tabla de BigQuery, y actualiza el código de la canalización para acceder a esos parámetros en lugar de constantes.

Tarea 5. Agrega campos nullable a tu canalización

Probablemente notaste que la tabla de BigQuery creada en el último lab tenía un esquema con todos los campos REQUIRED como el siguiente:

La página de registros de BigQuery, abierta en la pestaña Esquema, en la que se muestran varias filas de datos debajo de los encabezados de las columnas: Nombre del campo, Tipo, Modo, Etiquetas de política y Descripción

Es posible que quieras crear un esquema de Apache Beam con campos NULLABLE en los que falten datos, tanto para la ejecución de la canalización, como para una tabla de BigQuery resultante.

Podemos actualizar el esquema JSON de BigQuery si agregamos un nuevo mode de propiedad para el campo que queremos que sea nulo:

{ "name": "field_name", "type": "STRING", "mode": "NULLABLE" }
  • Para completar esta tarea, marca los campos lat y lon como anulables en el esquema de BigQuery.

Tarea 6. Ejecuta tu canalización desde la línea de comandos

  • Para completar esta tarea, ejecuta tu canalización desde la línea de comandos y pasa los parámetros adecuados. Recuerda tomar nota del esquema resultante de BigQuery para los campos NULLABLE. El código debería ser similar al siguiente:
# Set up environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION={{{project_0.startup_script.lab_region|Region}}} export BUCKET=gs://${PROJECT_ID} export COLDLINE_BUCKET=${BUCKET}-coldline export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export OUTPUT_PATH=${PIPELINE_FOLDER}-coldline/pipeline_output export TABLE_NAME=${PROJECT_ID}:logs.logs_filtered cd $BASE_DIR python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --outputPath=${OUTPUT_PATH} \ --tableName=${TABLE_NAME} Nota: Si tu canalización se compila correctamente, pero observas muchos errores debido a código o a una configuración incorrecta en el servicio de Dataflow, puedes configurar “runner” de nuevo como “DirectRunner” para ejecutarlo de forma local y recibir resultados más rápido. Este enfoque funciona en este caso porque el conjunto de datos es pequeño y no estás usando ninguna función que no sea compatible con DirectRunner.

Tarea 7: Verifica los resultados de la canalización

  1. Navega a la página Trabajos de Dataflow y observa el trabajo mientras se ejecuta. El gráfico debería ser similar al siguiente:

Diagrama de flujo en el que se muestran flujos de trabajo desde ReadFrom GS hasta DropInputs con varios puntos en medio y directamente desde ReadFrom GS hasta WriteToGCS

  1. Haz clic en el nodo que representa tu función Filter que, en la imagen anterior, se llama FilterFn. En el panel que aparece en el lado derecho, deberías ver que se agregaron más elementos como entradas que los que se escribieron como salidas.

  2. Ahora, haz clic en el nodo que representa la escritura en Cloud Storage. Dado que se escribieron todos los elementos, esta cantidad debe coincidir con la cantidad de elementos de la entrada de la función de filtro.

  3. Una vez finalizada la canalización, consulta tu tabla para examinar los resultados en BigQuery. Ten en cuenta que la cantidad de registros en la tabla debe coincidir con la cantidad de elementos que generó la función de filtro.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecuta tu canalización desde la línea de comandos

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible