arrow_back

Procesamiento de datos sin servidores con Dataflow: Cómo escribir una canalización de ETL con Apache Beam y Cloud Dataflow (Python)

Acceder Unirse
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Procesamiento de datos sin servidores con Dataflow: Cómo escribir una canalización de ETL con Apache Beam y Cloud Dataflow (Python)

Lab 1 hora 30 minutos universal_currency_alt 5 créditos show_chart Intermedio
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Crear una canalización de extracción, transformación y carga por lotes en Apache Beam, que utiliza datos sin procesar de Google Cloud Storage y los escribe en Google BigQuery
  • Ejecutar la canalización de Apache Beam en Cloud Dataflow
  • Parametrizar la ejecución de la canalización

Requisitos previos:

  • Conocimientos básicos sobre Python

Configuración y requisitos

En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Qwiklabs desde una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usa otras credenciales, se generarán errores o incurrirá en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Verifica los permisos del proyecto

Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).

  1. En la consola de Google Cloud, en el Menú de navegación (Ícono del menú de navegación), selecciona IAM y administración > IAM.

  2. Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com, y que tenga asignado el rol Editor. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.

El nombre de la cuenta de servicio predeterminada de Compute Engine y el estado del editor destacados en la página de pestañas Permisos

Nota: Si la cuenta no aparece en IAM o no tiene asignado el rol Editor, sigue los pasos que se indican a continuación para asignar el rol necesario.
  1. En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
  2. Copia el número del proyecto (p. ej., 729328892908).
  3. En el Menú de navegación, selecciona IAM y administración > IAM.
  4. En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
  5. En Principales nuevas, escribe lo siguiente:
{project-number}-compute@developer.gserviceaccount.com
  1. Reemplaza {número-del-proyecto} por el número de tu proyecto.
  2. En Rol, selecciona Proyecto (o Básico) > Editor.
  3. Haz clic en Guardar.

Configuración del entorno de desarrollo basado en notebooks de Jupyter

En este lab, ejecutarás todos los comandos en una terminal del notebook.

  1. En el Menú de navegación de la consola de Google Cloud, haz clic en Vertex AI > Workbench.

  2. Habilita la API de Notebooks.

  3. En la página Workbench, haz clic en CREAR NUEVO.

  4. En el cuadro de diálogo Instancia nueva que se muestra, establece la región en y la zona en .

  5. En Entorno, selecciona Apache Beam.

  6. Haz clic en CREAR en la parte inferior del cuadro de diálogo.

Nota: El aprovisionamiento completo del entorno tarda de 3 a 5 minutos. Espera hasta que se complete este paso. Nota: Haz clic en Habilitar API de Notebooks para habilitarla.
  1. Cuando el entorno esté listo, haz clic en el vínculo ABRIR JUPYTERLAB que se encuentra junto al nombre del notebook. Esto abrirá tu entorno en una nueva pestaña del navegador.

IDE_link

  1. Luego, haz clic en Terminal. Esto abrirá una terminal en la que podrás ejecutar todos los comandos del lab.

Abre la terminal

Descarga el repositorio de código

A continuación, descargarás un repositorio de código que usarás en este lab.

  1. En la terminal que acabas de abrir, ingresa lo siguiente:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.

  2. Navega al repo clonado /training-data-analyst/quests/dataflow_python/. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab con un código que debes completar y una subcarpeta solution con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.

Opción Explorador destacada en el menú Ver expandido

Nota: Para abrir un archivo y editarlo, simplemente debes navegar al archivo y hacer clic en él. Se abrirá el archivo, en el que puedes agregar o modificar código.

Haz clic en Revisar mi progreso para verificar el objetivo. Crear una instancia de notebook y clonar el repo del curso

Apache Beam y Cloud Dataflow

Aproximadamente 5 minutos

Cloud Dataflow es un servicio de Google Cloud Platform completamente administrado que se usa para ejecutar canalizaciones de procesamiento de datos por lotes y de transmisión de Apache Beam.

Apache Beam es un modelo de programación de procesamiento de datos portátil, de código abierto, unificado y avanzado que permite a los usuarios finales definir canalizaciones de procesamiento paralelo por lotes y de datos de transmisión mediante Java, Python o Go. Las canalizaciones de Apache Beam se pueden ejecutar en conjuntos de datos pequeños en tu máquina de desarrollo local y a gran escala en Cloud Dataflow. Sin embargo, debido a que Apache Beam es de código abierto, existen otros ejecutores, por lo que puedes ejecutar canalizaciones de Beam en Apache Flink y Apache Spark, entre otras opciones.

Diagrama de la arquitectura de red del lab

Parte 1 del Lab. Escribe una canalización de ETL desde cero

Introducción

En esta sección, escribirás una canalización de extracción, transformación y carga (ETL) de Apache Beam.

Revisión de casos de uso y conjuntos de datos

En cada lab de esta Quest, los datos de entrada están diseñados para ser similares a los registros del servidor web en formato de registro común junto con otros datos que pueda contener un servidor web. En este primer lab, los datos se tratan como una fuente por lotes. En los labs posteriores, se tratarán como una fuente de transmisión. Tu tarea es leer los datos, analizarlos y, luego, escribirlos en BigQuery, un almacén de datos sin servidores, para analizarlos.

Abre el lab adecuado

  • Regresa a la terminal en tu IDE y copia y pega el siguiente comando:
cd 1_Basic_ETL/lab export BASE_DIR=$(pwd)

Configura el entorno virtual y las dependencias

Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.

  1. En la terminal, crea un entorno virtual para tu trabajo en este lab:
sudo apt-get update && sudo apt-get install -y python3-venv python3 -m venv df-env source df-env/bin/activate
  1. Luego, instala los paquetes que necesitarás para ejecutar tu canalización:
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Por último, asegúrate de que la API de Dataflow esté habilitada:
gcloud services enable dataflow.googleapis.com

Escribe tu primera canalización

1 hora

Tarea 1. Genera datos sintéticos

  1. Ejecuta el siguiente comando en la terminal para clonar un repositorio que contenga secuencias de comandos para generar registros sintéticos del servidor web:
cd $BASE_DIR/../.. source create_batch_sinks.sh bash generate_batch_events.sh head events.json

La secuencia de comandos crea un archivo llamado events.json que contiene líneas similares a las siguientes:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Luego, copiará automáticamente este archivo en tu bucket de Google Cloud Storage en .

  1. En otra pestaña del navegador, ve a Google Cloud Storage y comprueba que tu bucket de almacenamiento contenga un archivo llamado events.json.

Haz clic en Revisar mi progreso para verificar el objetivo. Generar datos sintéticos

Tarea 2. Lee datos desde tu fuente

Si no puedes avanzar en esta sección o en las secciones posteriores, puedes consultar la solución.

  1. En tu explorador de archivos, navega a la carpeta 1_Basic_ETL/lab del lab y haz clic en my_pipeline.py. Se abrirá el archivo en un panel de edición. Asegúrate de que se importaron los siguientes paquetes:
import argparse import time import logging import json import apache_beam as beam from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners import DataflowRunner, DirectRunner
  1. Desplázate hacia abajo hasta el método run(). Actualmente, este método contiene una canalización que no realiza ninguna acción. Observa cómo se crea un objeto Pipeline a través de un objeto PipelineOptions, y cómo la línea final del método ejecuta la canalización:
options = PipelineOptions() # Set options p = beam.Pipeline(options=options) # Do stuff p.run()
  • Todos los datos de las canalizaciones de Apache Beam se encuentran en PCollections. Para crear la PCollection inicial de tu canalización, tendrás que aplicar una transformación raíz a tu objeto de canalización. Una transformación raíz crea una PCollection a partir de una fuente de datos externa o algunos datos locales que especifiques.

  • Existen dos tipos de transformaciones raíz en los SDK de Beam: Read y Create. Las transformaciones Read leen datos de una fuente externa, como un archivo de texto o una tabla de base de datos. Las transformaciones Create crean una PCollection a partir de una list en la memoria y son especialmente útiles para las pruebas.

El siguiente código de ejemplo muestra cómo aplicar una transformación raíz ReadFromText para leer datos de un archivo de texto. La transformación se aplica a un objeto p de Pipeline y devuelve un conjunto de datos de canalización en el formato de una PCollection[str] (con la notación que proviene de sugerencias de tipos parametrizadas). “ReadLines” es el nombre que utilizarás para la transformación, que será útil más adelante cuando trabajes con canalizaciones más grandes:

lines = p | "ReadLines" >> beam.io.ReadFromText("gs://path/to/input.txt")
  1. Dentro del método run(), crea una constante de cadena llamada “input” y establece su valor en gs://<YOUR-PROJECT-ID>/events.json. En un lab futuro, usarás parámetros de línea de comandos para pasar esta información.

  2. Cree una PCollection de cadenas de todos los eventos en events.json realizando una llamada a la transformación textio.ReadFromText.

  3. Agrega las sentencias de importación apropiadas a la parte superior de my_pipeline.py.

  4. Para guardar tu trabajo, haz clic en Archivo y selecciona Guardar en el menú de navegación de la parte superior.

Tarea 3. Ejecuta la canalización para verificar que funcione

  • Regresa a la terminal, selecciona la carpeta $BASE_DIR y ejecuta los siguientes comandos. Asegúrate de configurar la variable de entorno PROJECT_ID antes de ejecutar la canalización:
cd $BASE_DIR # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) # Run the pipeline python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DirectRunner

Por el momento, tu canalización no hace nada; solo lee datos.

Sin embargo, cuando se ejecuta muestra un flujo de trabajo útil en el que puedes verificar la canalización de forma local y económica con DirectRunner, que se ejecuta en tu máquina local antes de realizar cálculos más costosos. Para ejecutar la canalización con Google Cloud Dataflow, puedes cambiar el runner a DataflowRunner.

Tarea 4. Agrega una transformación

Si no puedes avanzar, consulta la solución.

Las transformaciones son las que cambian tus datos. En Apache Beam, estas se realizan con la clase PTransform. En el entorno de ejecución, estas operaciones se realizarán para varios trabajadores independientes.

La entrada y salida de cada PTransform es una PCollection. De hecho, aunque no te hayas dado cuenta, ya usaste PTransform cuando leíste datos de Google Cloud Storage. Independientemente de si lo asignaste o no a una variable, esto creó una PCollection de cadenas.

Debido a que Beam usa un método de aplicación genérico para PCollection, representadas por el operador de canalización | en Python, puedes encadenar transformaciones de manera secuencial. Por ejemplo, puedes encadenar transformaciones para crear una canalización secuencial como la siguiente:

[Output_PCollection] = ([Input_PCollection] | [First Transform] | [Second Transform] | [Third Transform])

Para esta tarea, usarás un tipo nuevo de transformación: ParDo. ParDo es una transformación de Beam para el procesamiento paralelo genérico.

El paradigma de procesamiento ParDo es similar a la fase “Map” de un algoritmo de estilo Map/Shuffle/Reduce. Una transformación ParDo considera cada elemento de la PCollection de entrada, realiza una función de procesamiento (tu código de usuario) en ese elemento y emite cero elementos, uno solo o varios a una PCollection de salida.

ParDo sirve para una variedad de operaciones comunes de procesamiento de datos. Sin embargo, existen PTransform especiales en Python para simplificar el proceso, incluidas las siguientes:

  • Filtrar un conjunto de datos. Puedes usar Filter para considerar cada elemento de una PCollection y enviarlo a una PCollection nueva, o descartarlo según la salida de una función de Python que admita llamadas y devuelva un valor booleano.
  • Formatear o realizar conversión de tipos a cada elemento en un conjunto de datos. Si tu PCollection de entrada contiene elementos que son de un tipo o formato diferente del que quieres, puedes usar Map para realizar una conversión en cada elemento y mostrar el resultado una nueva PCollection.
  • Extraer partes de cada elemento en un conjunto de datos. Si tienes una PCollection de registros con varios campos, por ejemplo, también puedes usar Map o FlatMap para analizar solo los campos que quieras considerar en una nueva PCollection.
  • Realizar procesamientos sobre cada elemento en un conjunto de datos. Puedes usar ParDo, Map o FlatMap para realizar cálculos simples o complejos en cada elemento o en elementos específicos de una PCollection, y generar los resultados como una PCollection nueva.

Para completar esta tarea, debes escribir una transformación Map que lea en una cadena de JSON que represente un evento único, lo analice con el paquete json de Python y genere el diccionario devuelto por json.loads.

Las funciones de Map se pueden implementar de dos maneras, ya sea intercaladas o a través de una función predefinida que admita llamadas. Escribe funciones de Map intercaladas de la siguiente manera:

p | beam.Map(lambda x : something(x))

Como alternativa, beam.Map se puede usar con una función de Python definida anteriormente en la secuencia de comandos que admita llamadas:

def something(x): y = # Do something! return y p | beam.Map(something)

Si necesitas más flexibilidad de la que ofrece beam.Map (y otras opciones ligeras de DoFn), puedes implementar ParDo con DoFn personalizadas que sean subclases de DoFn. De esta manera, se integrarán en los frameworks de prueba con mayor facilidad:

class MyDoFn(beam.DoFn): def process(self, element): output = #Do Something! yield output p | beam.ParDo(MyDoFn())

Recuerda que, si tienes problemas, puedes consultar la solución.

Tarea 5. Escribe en un receptor

En este punto, la canalización lee un archivo de Google Cloud Storage, analiza cada línea y emite un diccionario de Python para cada elemento. El siguiente paso es escribir estos objetos en una tabla de BigQuery.

  1. Si bien puedes indicarle a tu canalización que cree una tabla de BigQuery si es necesario, primero deberás crear el conjunto de datos. La secuencia de comandos generate_batch_events.sh ya lo hizo. Puedes examinar el conjunto de datos con el siguiente código:
# Examine dataset bq ls # No tables yet bq ls logs

Para entregar los resultados de la PCollection final de tu canalización, aplica una transformación Write a esa PCollection. Las transformaciones Write pueden entregar los elementos de una PCollection a un receptor de datos externo, como una tabla de base de datos. Puedes usar Write para generar una PCollection en cualquier momento en tu canalización, aunque, por lo general, debes escribir datos al final de tu canalización.

En el siguiente código de ejemplo, se muestra cómo aplicar una transformación WriteToText para escribir una PCollection de cadena en un archivo de texto:

p | "WriteMyFile" >> beam.io.WriteToText("gs://path/to/output")
  1. En este caso, en lugar de usar WriteToText, utiliza WriteToBigQuery.

Esta función requiere que se determinen varios elementos, incluida la tabla específica en la que se escribirá y el esquema de esta. De manera opcional, puedes especificar si deseas adjuntar a una tabla existente, volver a crear las tablas existentes (útil en las iteraciones iniciales de la canalización) o crear la tabla si no existe. Según la configuración predeterminada, esta transformación creará tablas que no existen y no escribirá en una tabla que no esté vacía.

  1. Sin embargo, debemos especificar nuestro esquema. Existen dos maneras de hacerlo. Podemos especificarlo como una sola cadena o en formato JSON. Por ejemplo, supongamos que nuestro diccionario tiene tres campos: nombre (de tipo str), ID (de tipo int) y saldo (de tipo float). Luego, podemos especificar el esquema en una sola línea:
table_schema = 'name:STRING,id:INTEGER,balance:FLOAT'

O bien, especifícalo como JSON:

table_schema = { "fields": [ { "name": "name", "type": "STRING" }, { "name": "id", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "balance", "type": "FLOAT", "mode": "REQUIRED" } ] }

En el primer caso (con la cadena única), se supone que todos los campos son NULLABLE. Podemos especificar el modo si usamos el enfoque JSON.

  1. Una vez que definamos el esquema de la tabla, podremos agregar el receptor a nuestro DAG:
p | 'WriteToBQ' >> beam.io.WriteToBigQuery( 'project:dataset.table', schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE ) Nota: WRITE_TRUNCATE borrará y volverá a crear tu tabla todas las veces. Esto es útil en las iteraciones iniciales de la canalización, especialmente durante la iteración de tu esquema, pero puede causar fácilmente problemas inesperados en la producción. WRITE_APPEND o WRITE_EMPTY son más seguros.

Recuerda definir el esquema de la tabla y agregar el receptor de BigQuery a tu canalización. No olvides que, si tienes problemas, puedes consultar la solución.

Tarea 6. Ejecuta tu canalización

  1. Regresa a la terminal y ejecuta tu canalización con un comando casi igual al de antes. Sin embargo, ahora usa DataflowRunner para ejecutar la canalización en Cloud Dataflow.
# Set up environment variables cd $BASE_DIR export PROJECT_ID=$(gcloud config get-value project) # Run the pipelines python3 my_pipeline.py \ --project=${PROJECT_ID} \ --region={{{project_0.startup_script.lab_region|Region}}} \ --stagingLocation=gs://$PROJECT_ID/staging/ \ --tempLocation=gs://$PROJECT_ID/temp/ \ --runner=DataflowRunner

La forma general debe consistir en una sola ruta de acceso que comience con la transformación Read y finalice con la transformación Write. Desde que se ejecute la canalización, los trabajadores se agregarán de forma automática, y el servicio determinará las necesidades de esta, por lo que los trabajadores desaparecerán cuando ya no sean necesarios. Para observar esto, navega a Compute Engine, donde deberías ver máquinas virtuales creadas por el servicio de Dataflow.

Nota: Si tu canalización se crea correctamente, pero observas muchos errores debido a código o a una configuración incorrecta en el servicio de Dataflow, puedes volver a configurar runner en DirectRunner para ejecutarlo de forma local y recibir comentarios más rápido. Este enfoque funciona en este caso porque el conjunto de datos es pequeño y no estás usando ninguna función que no sea compatible con DirectRunner.
  1. Una vez finalizada la canalización, regresa a la ventana del navegador de BigQuery y consulta tu tabla.

Si el código no funciona como se espera y no sabes qué hacer, consulta la solución.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar tu canalización

Parte 2 del lab: Parametrización de ETL básico

Aproximadamente 20 minutos

Gran parte del trabajo de los ingenieros de datos es predecible, como los trabajos recurrentes, o es similar a otros trabajos. Sin embargo, se requiere experiencia en ingeniería para el proceso de ejecutar canalizaciones. Piensa en los pasos que acabas de completar:

  1. Creaste un entorno de desarrollo y desarrollaste una canalización. El entorno incluyó el SDK de Apache Beam y otras dependencias.
  2. Ejecutaste la canalización desde el entorno de desarrollo. El SDK de Apache Beam almacenó archivos en etapa intermedia en Cloud Storage, creó un archivo de solicitud de trabajo y envió el archivo al servicio de Cloud Dataflow.

Sería mucho mejor si hubiera una manera de iniciar un trabajo con una llamada a la API o sin tener que configurar un entorno de desarrollo (una tarea que los usuarios que no son técnicos no podrían realizar). Esto también te permitiría ejecutar canalizaciones.

Con las plantillas de Dataflow se busca resolver este problema cambiando la representación que se genera cuando se crea una canalización para que se pueda parametrizar. Desafortunadamente, no es tan simple como exponer parámetros de la línea de comandos, a pesar de que esto último se hace en un lab posterior. Con las plantillas de Dataflow, el flujo de trabajo anterior se convierte en lo siguiente:

  1. Los desarrolladores crean un entorno de desarrollo y desarrollan su canalización. El entorno incluye el SDK de Apache Beam y otras dependencias.
  2. Los desarrolladores ejecutan la canalización y crean una plantilla. El SDK de Apache Beam almacena archivos en etapa intermedia en Cloud Storage, crea un archivo de plantilla (similar al de solicitud de trabajo) y guarda el archivo de plantilla en Cloud Storage.
  3. Los usuarios que no son desarrolladores y otras herramientas de flujo de trabajo, como Airflow, pueden ejecutar trabajos con facilidad mediante la consola de Google Cloud, la herramienta de línea de comandos de gcloud o la API de REST para enviar solicitudes de ejecución de archivos de plantillas al servicio de Cloud Dataflow.

En este lab, practicarás con una de las múltiples plantillas de Dataflow creadas por Google para realizar la misma tarea que la canalización que creaste en la Parte 1.

Tarea 1. Crea un archivo de esquema de JSON

Al igual que antes, debes pasar a la plantilla de Dataflow un archivo JSON que represente el esquema de este ejemplo.

  1. Regresa a la terminal en tu IDE. Ejecuta los siguientes comandos para regresar al directorio principal. Luego, toma el esquema de tu tabla logs.logs existente:
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. Ahora, capta este resultado en un archivo y súbelo a GCS. Los comandos sed adicionales permiten compilar un objeto JSON completo que Dataflow esperará.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

Haz clic en Revisar mi progreso para verificar el objetivo. Crear un archivo de esquema JSON

Tarea 2. Escribe una función de JavaScript definida por el usuario

La plantilla de Dataflow de Cloud Storage a BigQuery requiere una función de JavaScript para convertir el texto sin procesar en un archivo JSON válido. En este caso, cada línea de texto tiene un formato JSON válido, por lo que la función es trivial.

  1. Para completar esta tarea, crea un archivo nuevo en la carpeta dataflow_python del explorador de archivos en tu IDE.

  2. Para crear un archivo nuevo, haz clic en Archivo >> Nuevo >> Archivo de texto.

  3. Cámbiale el nombre al archivo a transform.js. Para hacerlo, haz clic con el botón derecho sobre él.

  4. Abre transform.js file en el panel de edición y haz clic en el archivo para abrirlo.

  5. Copia la siguiente función en el archivo transform.js y guárdalo:

function transform(line) { return line; }
  1. Luego, ejecuta el siguiente comando para copiar y pegar el archivo en Google Cloud Storage:
export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

Haz clic en Revisar mi progreso para verificar el objetivo. Escribir una función definida por el usuario de JavaScript en un archivo de JavaScript

Tarea 3. Ejecuta una plantilla de Dataflow

  1. Ve a la IU web de Cloud Dataflow.
  2. Haz clic en CREAR TRABAJO A PARTIR DE UNA PLANTILLA.
  3. Ingresa un nombre para tu trabajo de Cloud Dataflow.
  4. En Plantilla de Dataflow, selecciona la plantilla Text Files on Cloud Storage to BigQuery en la sección Process Data in Bulk (batch), NO en la sección de transmisión.
  5. En Cloud Storage input file, ingresa la ruta de acceso a events.json en el formato .
  6. En Cloud Storage location of your BigQuery schema file, escribe la ruta de acceso a tu archivo schema.json en el formato .
  7. En BigQuery output table, ingresa .
  8. En Temporary BigQuery directory, ingresa una carpeta nueva dentro de este mismo bucket. El trabajo lo creará por ti.
  9. En Ubicación temporal, ingrese una segunda carpeta nueva dentro de este mismo bucket.
  10. Deja Encriptación en Google-managed encryption key.
  11. Haz clic para abrir Parámetros opcionales.
  12. En Ruta de acceso de UDF de JavaScript en Cloud Storage, ingresa la ruta de acceso a .js con el formato .
  13. En JavaScript UDF name, ingresa transform.
  14. Haz clic en el botón Ejecutar trabajo.

Mientras se ejecuta tu trabajo, puedes inspeccionarlo desde la IU web de Dataflow.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar una plantilla de Dataflow

Tarea 4. Inspecciona el código de la plantilla de Dataflow

El código de la plantilla de Dataflow que acabas de usar se encuentra en esta guía TextIOToBigQuery.

  • Desplázate hacia abajo hasta el método principal. El código debería parecer conocido para la canalización que autorizaste.

    • Comienza con un objeto Pipeline, creado con un objeto PipelineOptions.
    • Consiste en una cadena de PTransform que comienza con una transformación TextIO.read().
    • La PTransform después de la transformación de Read es un poco diferente. Permite usar JavaScript para transformar las cadenas de entrada (por ejemplo, si el formato de origen no se adapta bien al de la tabla de BigQuery). Para obtener documentación sobre cómo usar esta función, consulta esta página.
    • La PTransform después de la UDF de JavaScript usa una función de biblioteca para convertir el archivo JSON en una fila de tabla. Puedes consultar ese código aquí.
    • La PTransform Read se ve un poco diferente porque, en lugar de usar un esquema conocido en el tiempo de compilación del gráfico, el código pretende aceptar parámetros que solo se conocerán en el tiempo de ejecución. Esto es posible gracias a la clase NestedValueProvider.

Asegúrate de consultar el siguiente lab, en el que se explicará cómo crear canalizaciones que no sean únicamente cadenas de PTransform, y cómo adaptar una canalización que creaste para que sea una plantilla de Dataflow personalizada.

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible