
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Create Vertex AI Platform Notebooks instance and clone course repo
/ 15
Generate synthetic data
/ 15
Run your pipeline
/ 15
Creating a JSON schema file
/ 10
Write a JavaScript User-Defined Function in Javascript file
/ 15
Running a Dataflow Template
/ 10
En este lab, aprenderás a hacer lo siguiente:
Requisitos previos:
En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.
Accede a Qwiklabs desde una ventana de incógnito.
Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00
) y asegúrate de finalizarlo en el plazo asignado.
No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.
Cuando esté listo, haga clic en Comenzar lab.
Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.
Haga clic en Abrir Google Console.
Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
Si usa otras credenciales, se generarán errores o incurrirá en cargos.
Acepta las condiciones y omite la página de recursos de recuperación.
Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).
En la consola de Google Cloud, en el Menú de navegación (), selecciona IAM y administración > IAM.
Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com
, y que tenga asignado el rol Editor
. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.
Editor
, sigue los pasos que se indican a continuación para asignar el rol necesario.729328892908
).{número-del-proyecto}
por el número de tu proyecto.En este lab, ejecutarás todos los comandos en una terminal del notebook.
En el menú de navegación de la consola de Google Cloud, haz clic en Vertex AI > Workbench.
Haz clic en Habilitar API de Notebooks.
En la página de Workbench, selecciona NOTEBOOKS ADMINISTRADOS POR EL USUARIO y haz clic en CREAR NUEVO.
En el cuadro de diálogo Instancia nueva que se muestra, establece la región en
En Entorno, selecciona Apache Beam.
Haz clic en CREAR en la parte inferior del cuadro de diálogo.
A continuación, descargarás un repositorio de código que usarás en este lab.
En el panel izquierdo de tu entorno de notebook, en el navegador de archivos, verás que se agregó el repo training-data-analyst.
Navega al repo clonado /training-data-analyst/quests/dataflow_python/
. Verás una carpeta para cada lab. Cada una de ellas se divide en una subcarpeta lab
con un código que debes completar y una subcarpeta solution
con un ejemplo viable que puedes consultar como referencia si no sabes cómo continuar.
Haz clic en Revisar mi progreso para verificar el objetivo.
Aproximadamente 5 minutos
Cloud Dataflow es un servicio de Google Cloud Platform completamente administrado que se usa para ejecutar canalizaciones de procesamiento de datos por lotes y de transmisión de Apache Beam.
Apache Beam es un modelo de programación de procesamiento de datos portátil, de código abierto, unificado y avanzado que permite a los usuarios finales definir canalizaciones de procesamiento paralelo por lotes y de datos de transmisión mediante Java, Python o Go. Las canalizaciones de Apache Beam se pueden ejecutar en conjuntos de datos pequeños en tu máquina de desarrollo local y a gran escala en Cloud Dataflow. Sin embargo, debido a que Apache Beam es de código abierto, existen otros ejecutores, por lo que puedes ejecutar canalizaciones de Beam en Apache Flink y Apache Spark, entre otras opciones.
En esta sección, escribirás una canalización de extracción, transformación y carga (ETL) de Apache Beam.
En cada lab de esta Quest, los datos de entrada están diseñados para ser similares a los registros del servidor web en formato de registro común junto con otros datos que pueda contener un servidor web. En este primer lab, los datos se tratan como una fuente por lotes. En los labs posteriores, se tratarán como una fuente de transmisión. Tu tarea es leer los datos, analizarlos y, luego, escribirlos en BigQuery, un almacén de datos sin servidores, para analizarlos.
Antes de comenzar a editar el código de la canalización en sí, debes asegurarte de haber instalado las dependencias necesarias.
1 hora
La secuencia de comandos crea un archivo llamado events.json
que contiene líneas similares a las siguientes:
Luego, copiará automáticamente este archivo en tu bucket de Google Cloud Storage en
events.json
.Haz clic en Revisar mi progreso para verificar el objetivo.
Si no puedes avanzar en esta sección o en las secciones posteriores, puedes consultar la solución.
1_Basic_ETL/lab
del lab y haz clic en my_pipeline.py. Se abrirá el archivo en un panel de edición. Asegúrate de que se importaron los siguientes paquetes:run()
. Actualmente, este método contiene una canalización que no realiza ninguna acción. Observa cómo se crea un objeto Pipeline a través de un objeto PipelineOptions, y cómo la línea final del método ejecuta la canalización:Todos los datos de las canalizaciones de Apache Beam se encuentran en PCollections. Para crear la PCollection
inicial de tu canalización, tendrás que aplicar una transformación raíz a tu objeto de canalización. Una transformación raíz crea una PCollection
a partir de una fuente de datos externa o algunos datos locales que especifiques.
Existen dos tipos de transformaciones raíz en los SDK de Beam: Read y Create. Las transformaciones Read leen datos de una fuente externa, como un archivo de texto o una tabla de base de datos. Las transformaciones Create crean una PCollection
a partir de una list
en la memoria y son especialmente útiles para las pruebas.
El siguiente código de ejemplo muestra cómo aplicar una transformación raíz ReadFromText
para leer datos de un archivo de texto. La transformación se aplica a un objeto p
de Pipeline
y devuelve un conjunto de datos de canalización en el formato de una PCollection[str]
(con la notación que proviene de sugerencias de tipos parametrizadas). “ReadLines” es el nombre que utilizarás para la transformación, que será útil más adelante cuando trabajes con canalizaciones más grandes:
Dentro del método run()
, crea una constante de cadena llamada “input” y establece su valor en gs://<YOUR-PROJECT-ID>/events.json
. En un lab futuro, usarás parámetros de línea de comandos para pasar esta información.
Cree una PCollection
de cadenas de todos los eventos en events.json
realizando una llamada a la transformación textio.ReadFromText
.
Agrega las sentencias de importación apropiadas a la parte superior de my_pipeline.py
.
Para guardar tu trabajo, haz clic en Archivo y selecciona Guardar en el menú de navegación de la parte superior.
$BASE_DIR
y ejecuta los siguientes comandos. Asegúrate de configurar la variable de entorno PROJECT_ID
antes de ejecutar la canalización:Por el momento, tu canalización no hace nada; solo lee datos.
Sin embargo, cuando se ejecuta muestra un flujo de trabajo útil en el que puedes verificar la canalización de forma local y económica con DirectRunner, que se ejecuta en tu máquina local antes de realizar cálculos más costosos. Para ejecutar la canalización con Google Cloud Dataflow, puedes cambiar el runner
a DataflowRunner.
Si no puedes avanzar, consulta la solución.
Las transformaciones son las que cambian tus datos. En Apache Beam, estas se realizan con la clase PTransform. En el entorno de ejecución, estas operaciones se realizarán para varios trabajadores independientes.
La entrada y salida de cada PTransform
es una PCollection
. De hecho, aunque no te hayas dado cuenta, ya usaste PTransform
cuando leíste datos de Google Cloud Storage. Independientemente de si lo asignaste o no a una variable, esto creó una PCollection
de cadenas.
Debido a que Beam usa un método de aplicación genérico para PCollection
, representadas por el operador de canalización |
en Python, puedes encadenar transformaciones de manera secuencial. Por ejemplo, puedes encadenar transformaciones para crear una canalización secuencial como la siguiente:
Para esta tarea, usarás un tipo nuevo de transformación: ParDo. ParDo
es una transformación de Beam para el procesamiento paralelo genérico.
El paradigma de procesamiento ParDo
es similar a la fase “Map” de un algoritmo de estilo Map/Shuffle/Reduce. Una transformación ParDo
considera cada elemento de la PCollection
de entrada, realiza una función de procesamiento (tu código de usuario) en ese elemento y emite cero elementos, uno solo o varios a una PCollection
de salida.
ParDo
sirve para una variedad de operaciones comunes de procesamiento de datos. Sin embargo, existen PTransform
especiales en Python para simplificar el proceso, incluidas las siguientes:
Filter
para considerar cada elemento de una PCollection
y enviarlo a una PCollection
nueva, o descartarlo según la salida de una función de Python que admita llamadas y devuelva un valor booleano.PCollection
de entrada contiene elementos que son de un tipo o formato diferente del que quieres, puedes usar Map
para realizar una conversión en cada elemento y mostrar el resultado una nueva PCollection
.PCollection
de registros con varios campos, por ejemplo, también puedes usar Map
o FlatMap
para analizar solo los campos que quieras considerar en una nueva PCollection
.ParDo
, Map
o FlatMap
para realizar cálculos simples o complejos en cada elemento o en elementos específicos de una PCollection
, y generar los resultados como una PCollection
nueva.Para completar esta tarea, debes escribir una transformación Map
que lea en una cadena de JSON que represente un evento único, lo analice con el paquete json
de Python y genere el diccionario devuelto por json.loads
.
Las funciones de Map
se pueden implementar de dos maneras, ya sea intercaladas o a través de una función predefinida que admita llamadas. Escribe funciones de Map
intercaladas de la siguiente manera:
Como alternativa, beam.Map
se puede usar con una función de Python definida anteriormente en la secuencia de comandos que admita llamadas:
Si necesitas más flexibilidad de la que ofrece beam.Map
(y otras opciones ligeras de DoFn
), puedes implementar ParDo
con DoFn
personalizadas que sean subclases de DoFn. De esta manera, se integrarán en los frameworks de prueba con mayor facilidad:
Recuerda que, si tienes problemas, puedes consultar la solución.
En este punto, la canalización lee un archivo de Google Cloud Storage, analiza cada línea y emite un diccionario de Python para cada elemento. El siguiente paso es escribir estos objetos en una tabla de BigQuery.
generate_batch_events.sh
ya lo hizo. Puedes examinar el conjunto de datos con el siguiente código:Para entregar los resultados de la PCollection
final de tu canalización, aplica una transformación Write a esa PCollection
. Las transformaciones Write pueden entregar los elementos de una PCollection
a un receptor de datos externo, como una tabla de base de datos. Puedes usar Write para generar una PCollection
en cualquier momento en tu canalización, aunque, por lo general, debes escribir datos al final de tu canalización.
En el siguiente código de ejemplo, se muestra cómo aplicar una transformación WriteToText
para escribir una PCollection
de cadena en un archivo de texto:
WriteToText
, utiliza WriteToBigQuery
.Esta función requiere que se determinen varios elementos, incluida la tabla específica en la que se escribirá y el esquema de esta. De manera opcional, puedes especificar si deseas adjuntar a una tabla existente, volver a crear las tablas existentes (útil en las iteraciones iniciales de la canalización) o crear la tabla si no existe. Según la configuración predeterminada, esta transformación sí creará tablas que no existen y no escribirá en una tabla que no esté vacía.
str
), ID (de tipo int
) y saldo (de tipo float
). Luego, podemos especificar el esquema en una sola línea:O bien, especifícalo como JSON:
En el primer caso (con la cadena única), se supone que todos los campos son NULLABLE
. Podemos especificar el modo si usamos el enfoque JSON.
WRITE_TRUNCATE
borrará y volverá a crear tu tabla todas las veces. Esto es útil en las iteraciones iniciales de la canalización, especialmente durante la iteración de tu esquema, pero puede causar fácilmente problemas inesperados en la producción. WRITE_APPEND
o WRITE_EMPTY
son más seguros.Recuerda definir el esquema de la tabla y agregar el receptor de BigQuery a tu canalización. No olvides que, si tienes problemas, puedes consultar la solución.
DataflowRunner
para ejecutar la canalización en Cloud Dataflow.La forma general debe consistir en una sola ruta de acceso que comience con la transformación Read
y finalice con la transformación Write
. Desde que se ejecute la canalización, los trabajadores se agregarán de forma automática, y el servicio determinará las necesidades de esta, por lo que los trabajadores desaparecerán cuando ya no sean necesarios. Para observar esto, navega a Compute Engine, donde deberías ver máquinas virtuales creadas por el servicio de Dataflow.
runner
en DirectRunner
para ejecutarlo de forma local y recibir comentarios más rápido. Este enfoque funciona en este caso porque el conjunto de datos es pequeño y no estás usando ninguna función que no sea compatible con DirectRunner
.
Si el código no funciona como se espera y no sabes qué hacer, consulta la solución.
Haz clic en Revisar mi progreso para verificar el objetivo.
Aproximadamente 20 minutos
Gran parte del trabajo de los ingenieros de datos es predecible, como los trabajos recurrentes, o es similar a otros trabajos. Sin embargo, se requiere experiencia en ingeniería para el proceso de ejecutar canalizaciones. Piensa en los pasos que acabas de completar:
Sería mucho mejor si hubiera una manera de iniciar un trabajo con una llamada a la API o sin tener que configurar un entorno de desarrollo (una tarea que los usuarios que no son técnicos no podrían realizar). Esto también te permitiría ejecutar canalizaciones.
Con las plantillas de Dataflow se busca resolver este problema cambiando la representación que se genera cuando se crea una canalización para que se pueda parametrizar. Desafortunadamente, no es tan simple como exponer parámetros de la línea de comandos, a pesar de que esto último se hace en un lab posterior. Con las plantillas de Dataflow, el flujo de trabajo anterior se convierte en lo siguiente:
En este lab, practicarás con una de las múltiples plantillas de Dataflow creadas por Google para realizar la misma tarea que la canalización que creaste en la Parte 1.
Al igual que antes, debes pasar a la plantilla de Dataflow un archivo JSON que represente el esquema de este ejemplo.
logs.logs
existente:sed
adicionales permiten compilar un objeto JSON completo que Dataflow esperará.Haz clic en Revisar mi progreso para verificar el objetivo.
La plantilla de Dataflow de Cloud Storage a BigQuery requiere una función de JavaScript para convertir el texto sin procesar en un archivo JSON válido. En este caso, cada línea de texto tiene un formato JSON válido, por lo que la función es trivial.
Para completar esta tarea, crea un archivo nuevo en la carpeta dataflow_python del explorador de archivos en tu IDE.
Para crear un archivo nuevo, haz clic en Archivo >> Nuevo >> Archivo de texto.
Cámbiale el nombre al archivo a transform.js. Para hacerlo, haz clic con el botón derecho sobre él.
Abre transform.js file en el panel de edición y haz clic en el archivo para abrirlo.
Copia la siguiente función en el archivo transform.js y guárdalo:
Haz clic en Revisar mi progreso para verificar el objetivo.
events.json
en el formato schema.json
en el formato .js
con el formato transform
.Mientras se ejecuta tu trabajo, puedes inspeccionarlo desde la IU web de Dataflow.
Haz clic en Revisar mi progreso para verificar el objetivo.
El código de la plantilla de Dataflow que acabas de usar se encuentra en esta guía TextIOToBigQuery.
Desplázate hacia abajo hasta el método principal. El código debería parecer conocido para la canalización que autorizaste.
Pipeline
, creado con un objeto PipelineOptions
.PTransform
que comienza con una transformación TextIO.read().Asegúrate de consultar el siguiente lab, en el que se explicará cómo crear canalizaciones que no sean únicamente cadenas de PTransform
, y cómo adaptar una canalización que creaste para que sea una plantilla de Dataflow personalizada.
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.
Este contenido no está disponible en este momento
Te enviaremos una notificación por correo electrónico cuando esté disponible
¡Genial!
Nos comunicaremos contigo por correo electrónico si está disponible
One lab at a time
Confirm to end all existing labs and start this one