arrow_back

Procesamiento de datos sin servidores con Dataflow: Cómo escribir una canalización de ETL mediante Apache Beam y Cloud Dataflow (Java)

Acceder Unirse
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Procesamiento de datos sin servidores con Dataflow: Cómo escribir una canalización de ETL mediante Apache Beam y Cloud Dataflow (Java)

Lab 1 hora 30 minutos universal_currency_alt 5 créditos show_chart Intermedio
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Crear una canalización de extracción, transformación y carga por lotes en Apache Beam, que utiliza datos sin procesar de Google Cloud Storage y los escribe en Google BigQuery
  • Ejecutar la canalización de Apache Beam en Cloud Dataflow
  • Parametrizar la ejecución de la canalización

Requisitos previos:

  • Conocimientos básicos sobre Java

Configuración y requisitos

En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Qwiklabs desde una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usa otras credenciales, se generarán errores o incurrirá en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Activa Google Cloud Shell

Google Cloud Shell es una máquina virtual que cuenta con herramientas para desarrolladores. Ofrece un directorio principal persistente de 5 GB y se ejecuta en Google Cloud.

Google Cloud Shell proporciona acceso de línea de comandos a tus recursos de Google Cloud.

  1. En la consola de Cloud, en la barra de herramientas superior derecha, haz clic en el botón Abrir Cloud Shell.

    Ícono de Cloud Shell destacado

  2. Haz clic en Continuar.

El aprovisionamiento y la conexión al entorno demorarán unos minutos. Cuando te conectes, habrás completado la autenticación, y el proyecto estará configurado con tu PROJECT_ID. Por ejemplo:

ID del proyecto destacado en la terminal de Cloud Shell

gcloud es la herramienta de línea de comandos de Google Cloud. Viene preinstalada en Cloud Shell y es compatible con el completado de línea de comando.

  • Puedes solicitar el nombre de la cuenta activa con este comando:
gcloud auth list

Resultado:

Credentialed accounts: - @.com (active)

Resultado de ejemplo:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Puedes solicitar el ID del proyecto con este comando:
gcloud config list project

Resultado:

[core] project =

Resultado de ejemplo:

[core] project = qwiklabs-gcp-44776a13dea667a6 Nota: La documentación completa de gcloud está disponible en la guía de descripción general de gcloud CLI .

Verifica los permisos del proyecto

Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).

  1. En la consola de Google Cloud, en el Menú de navegación (Ícono del menú de navegación), selecciona IAM y administración > IAM.

  2. Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com, y que tenga asignado el rol Editor. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.

El nombre de la cuenta de servicio predeterminada de Compute Engine y el estado del editor destacados en la página de pestañas Permisos

Nota: Si la cuenta no aparece en IAM o no tiene asignado el rol Editor, sigue los pasos que se indican a continuación para asignar el rol necesario.
  1. En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
  2. Copia el número del proyecto (p. ej., 729328892908).
  3. En el Menú de navegación, selecciona IAM y administración > IAM.
  4. En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
  5. En Principales nuevas, escribe lo siguiente:
{project-number}-compute@developer.gserviceaccount.com
  1. Reemplaza {número-del-proyecto} por el número de tu proyecto.
  2. En Rol, selecciona Proyecto (o Básico) > Editor.
  3. Haz clic en Guardar.

Configura tu IDE

Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud, similar a Cloud Shell.

Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.

NOTA: Es posible que deba esperar entre 3 y 5 minutos para que se aprovisione por completo el entorno, incluso después de que aparezca la URL. Hasta ese momento, se mostrará un error en el navegador.

ide_url

El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs con un código que debe completar y una carpeta solution con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer para ver lo siguiente:

file_explorer

También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:

new_terminal

Para verificarlo, ejecute gcloud auth list en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:

gcloud_auth

Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:

gce_reset

Apache Beam y Cloud Dataflow

Aproximadamente 5 minutos

Cloud Dataflow es un servicio de Google Cloud completamente administrado que se usa para ejecutar canalizaciones de procesamiento de datos por lotes y de transmisión de Apache Beam.

Apache Beam es un modelo de programación de procesamiento de datos portátil, de código abierto, unificado y avanzado que permite a los usuarios finales definir canalizaciones de procesamiento paralelo de datos por lotes y de transmisión mediante Java, Python o Go. Las canalizaciones de Apache Beam se pueden ejecutar en tu máquina de desarrollo local, en conjuntos de datos pequeños, y a gran escala en Cloud Dataflow. Sin embargo, debido a que Apache Beam es de código abierto, existen otros ejecutores, por lo que puedes ejecutar canalizaciones de Beam en Apache Flink y Apache Spark, entre otras.

Diagrama de la arquitectura de modelos de dBeam.

Parte 1 del Lab. Escribe una canalización de ETL desde cero

Introducción

En esta sección, escribirás una canalización de extracción, transformación y carga (ETL) de Apache Beam.

Revisión de casos de uso y conjuntos de datos

En cada lab de esta Quest, los datos de entrada están diseñados para ser similares a los registros del servidor web en formato de registro común junto con otros datos que pueda contener un servidor web. En este primer lab, los datos se tratan como una fuente por lotes. En labs posteriores, los datos se tratarán como una fuente de transmisión. Tu tarea es leer los datos, analizarlos y, luego, escribirlos en BigQuery, un almacén de datos sin servidores, para su análisis posterior.

Abre el lab adecuado

  • Si aún no lo has hecho, crea una terminal nueva en tu entorno de IDE y, luego, copia y pega el siguiente comando:
# Change directory into the lab cd 1_Basic_ETL/labs export BASE_DIR=$(pwd)

Modifica el archivo pom.xml

Antes de comenzar a editar el código real de la canalización, debes agregar las dependencias necesarias.

  1. Agrega las siguientes dependencias a tu archivo pom.xml, ubicado en 1_Basic_ETL/labs, dentro de la etiqueta de dependencias:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> <version>${beam.version}</version> </dependency>
  1. Ya se agregó una etiqueta <beam.version> en pom.xml para indicar qué versión de Beam se debe instalar. Guarda el archivo.

  2. Por último, descarga estas dependencias para usarlas en tu canalización:

# Download dependencies listed in pom.xml mvn clean dependency:resolve

Escribe tu primera canalización

1 hora

Tarea 1. Genera datos sintéticos

  1. Ejecuta el siguiente comando en la shell para clonar un repositorio que contenga secuencias de comandos para generar registros sintéticos del servidor web:
# Change to the directory containing the relevant code cd $BASE_DIR/../.. # Create GCS buckets and BQ dataset source create_batch_sinks.sh # Run a script to generate a batch of web server log events bash generate_batch_events.sh # Examine some sample events head events.json

La secuencia de comandos crea un archivo llamado events.json que contiene líneas similares a los siguientes ejemplos:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Luego, copia este archivo automáticamente en tu bucket de Google Cloud Storage, en gs://<ID-del-proyecto>/events.json.

  1. Navega a Google Cloud Storage y confirma que tu bucket de almacenamiento contenga un archivo llamado events.json.

Haz clic en Revisar mi progreso para verificar el objetivo. Generar datos sintéticos

Tarea 2. Lee datos desde tu fuente

Si no puedes avanzar en esta sección o en secciones posteriores, consulta la solución.

  1. Abre MyPipeline.java en tu IDE, que se puede encontrar en 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline. Asegúrate de que se importaron los siguientes paquetes:
import com.google.gson.Gson; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  1. Desplázate hacia abajo hasta el método run(). Actualmente, este método contiene una canalización que no realiza ninguna acción. Observa cómo se crea un objeto Pipeline con un objeto PipelineOptions, y cómo la línea final del método ejecuta la canalización.
Pipeline pipeline = Pipeline.create(options); // Do stuff pipeline.run();

Todos los datos en las canalizaciones de Apache Beam residen en PCollections. Para crear la PCollection inicial de tu canalización, aplica una transformación raíz a tu objeto de canalización. Una transformación raíz crea una PCollection a partir de una fuente de datos externa o algunos datos locales que especifiques.

Existen dos tipos de transformaciones raíz en los SDK de Beam: Read y Create. Las transformaciones Read leen datos de una fuente externa, como un archivo de texto o una tabla de base de datos. Las transformaciones Create crean una PCollection a partir de una java.util.Collection en la memoria y son especialmente útiles para las pruebas.

El siguiente código de ejemplo muestra cómo aplicar una transformación raíz TextIO.Read para leer datos desde un archivo de texto. La transformación se aplica a un objeto p de Pipeline y devuelve un conjunto de datos de canalización en el formato de una PCollection<String>. “ReadLines” es tu nombre para la transformación, que será útil más adelante cuando trabajes con canalizaciones más grandes:

PCollection<String> lines = pipeline.apply("ReadLines", TextIO.read().from("gs://path/to/input.txt"));
  1. Dentro del método run(), crea una constante de cadena llamada “input” y establece su valor en gs://<ID-del-proyecto>/events.json. En un lab futuro, utilizarás parámetros de línea de comandos para pasar esta información.

  2. Crea una PCollection de strings de todos los eventos en events.json mediante una llamada a la transformación TextIO.read().

  3. Agrega las sentencias de importación correspondientes a la parte superior de MyPipeline.java, en este caso, import org.apache.beam.sdk.values.PCollection;

Tarea 3. Ejecuta la canalización para verificar que funcione

  • Regresa a la terminal, selecciona la carpeta $BASE_DIR y ejecuta el comando mvn compile exec:java:
cd $BASE_DIR # Set up environment variables export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} Nota: En caso de que la compilación falle, ejecuta el comando mvn clean install.

Por el momento, tu canalización no hace nada; solo lee datos. Sin embargo, cuando se ejecuta muestra un flujo de trabajo útil en el que puedes verificar la canalización de forma local y económica con DirectRunner, que se ejecuta en tu máquina local antes de realizar cálculos más costosos. Para ejecutar la canalización con Google Cloud Dataflow, puedes cambiar el runner a DataflowRunner.

Tarea 4. Agrega una transformación

Si no puedes avanzar, consulta la solución.

Las transformaciones son las que cambian sus datos. En Apache Beam, estas se realizan con la clase PTransform. En el entorno de ejecución, estas operaciones se realizarán para varios trabajadores independientes. La entrada y salida de cada PTransform es una PCollection. De hecho, aunque no te hayas dado cuenta, ya usaste PTransform cuando leíste datos de Google Cloud Storage. Ya sea que lo hayas asignado o no a una variable, esto creó una PCollection de strings.

Debido a que Beam usa un método de aplicación genérico para las PCollection, puedes encadenar las transformaciones de forma secuencial. Por ejemplo, puedes encadenar transformaciones para crear una canalización secuencial como la siguiente:

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform]);

Para esta tarea, usa un tipo nuevo de transformación: una ParDo. ParDo es una transformación de Beam para el procesamiento paralelo genérico. El paradigma de procesamiento ParDo es similar a la fase “Map” de un algoritmo de estilo Map/Shuffle/Reduce. Una transformación ParDo considera cada elemento de la PCollection de entrada, realiza funciones de procesamiento (tu código de usuario) en ese elemento y emite cero, uno o varios elementos a una PCollection de salida.

ParDo es útil para una variedad de operaciones de procesamiento de datos, como las siguientes opciones:

  • Filtrar un conjunto de datos. Puedes usar ParDo para considerar cada elemento en una PCollection y enviar ese elemento a una colección nueva o descartarlo.
  • Formatear o convertir tipos de cada elemento en un conjunto de datos. Si tu PCollection de entrada contiene elementos que son de un tipo o formato diferente al que deseas, puedes usar ParDo para realizar una conversión en cada elemento y mostrar el resultado en una PCollection nueva.
  • Extraer partes de cada elemento en un conjunto de datos. Si tienes una PCollection de los registros con varios campos, por ejemplo, puedes usar una ParDo para analizar solo los campos que quieras tener en cuenta en una nueva PCollection.
  • Realizar procesamientos sobre cada elemento en un conjunto de datos. Puede usar ParDo para realizar cálculos simples o complejos en cada elemento, o en ciertos elementos de una PCollection, y mostrar los resultados como una PCollection nueva.
  1. Para completar esta tarea, escribe una transformación ParDo que se lea en una string JSON que represente un solo evento, la analice con Gson y emita el objeto personalizado que muestra Gson.

Las funciones de ParDo se pueden implementar de dos maneras, ya sea intercaladas o como una clase estática. Escriba funciones de ParDo intercaladas como esta:

pCollection.apply(ParDo.of(new DoFn<T1, T2>() { @ProcessElement public void processElement(@Element T1 i, OutputReceiver<T2> r) { // Do something r.output(0); } }));

De forma alternativa, se pueden implementar como clases estáticas que extienden DoFn. De esta manera, se pueden integrar con mayor facilidad en los frameworks de prueba:

static class MyDoFn extends DoFn<T1, T2> { @ProcessElement public void processElement(@Element T1 json, OutputReceiver<T2> r) throws Exception { // Do something r.output(0); } }

Luego, ejecuta este código dentro de la canalización:

[Initial Input PCollection].apply(ParDo.of(new MyDoFn());
  1. Para usar Gson, deberás crear una clase interna dentro de MyPipeline. Para aprovechar los esquemas de Beam, agregue la anotación @DefaultSchema. Luego hablaremos sobre ese tema. Este es un ejemplo de cómo usar Gson:
// Elsewhere @DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; String field2; } // Within the DoFn Gson gson = new Gson(); MyClass myClass = gson.fromJson(jsonString, MyClass.class);
  1. Asígnale el nombre CommmonLog a tu clase interna. Para construir esta clase interna con las variables de estado correctas, consulta el ejemplo de JSON anterior: la clase debe tener una variable de estado para cada clave del JSON entrante, y esta variable debe aceptar el tipo y el nombre con el valor y la clave.

  2. Por ahora, usa String para la “marca de tiempo”, Long para “NÚMERO ENTERO” (el número entero de BigQuery es INT64), Double para “NÚMERO DE PUNTO FLOTANTE” (el NÚMERO DE PUNTO FLOTANTE de BigQuery es FLOAT64) y haz que esto coincida con el siguiente esquema de BigQuery:

Página con pestañas de Esquema CommonLog, que incluye información de registro como user_id, timestamp y num_bytes.

Recuerda que, si tienes problemas, puedes consultar la solución.

Tarea 5. Escribe en un receptor

En este punto, tu canalización lee un archivo de Google Cloud Storage, analiza cada línea y emite un CommonLog para cada elemento. El siguiente paso es escribir estos objetos de CommonLog en una tabla de BigQuery.

Si bien puedes indicarle a tu canalización que cree una tabla de BigQuery, si es necesario, primero debes crear el conjunto de datos. La secuencia de comandos generate_batch_events.sh ya lo hizo,

así que puedes examinarlo, como se muestra a continuación:

# Examine dataset bq ls # No tables yet bq ls logs

Para entregar los resultados de la PCollection final de tsu canalización, aplica una transformación Write a esa PCollection. Las transformaciones Write pueden enviar los elementos de una PCollection a un receptor de datos externo, como una tabla de base de datos. Puedes usar Write para entregar una PCollection en cualquier momento en tu canalización, aunque, por lo general, debes escribir datos al final de la canalización.

En el siguiente código de ejemplo, se muestra cómo aplicar una transformación TextIO.Write para escribir una PCollection de String en un archivo de texto:

PCollection<String> pCollection = ...; pCollection.apply("WriteMyFile", TextIO.write().to("gs://path/to/output"));
  1. En este caso, en lugar de usar TextIO.write(), usa BigQueryIO.write().

Esta función requiere que se especifiquen algunos aspectos, como la tabla en la que se debe escribir, el esquema de esta tabla. De manera opcional, puedes especificar si deseas adjuntar a una tabla existente, volver a crear las tablas actuales (útil en las primeras iteraciones de la canalización) o crear una si no existe. De forma predeterminada, esta transformación creará tablas que no existen y no escribirá en una tabla que no esté en blanco..

Desde la adición de esquemas de Beam al SDK, puedes indicar a la transformación que deduzca el esquema de la tabla del objeto que se le pasó. Para ello, usa .useBeamSchema() y marca el tipo de entrada. De manera alternativa, puedes proporcionar el esquema explícitamente con .withSchema(), pero necesitarías crear un objeto TableSchema de BigQuery para pasar. Como se indicó en la clase CommonLog con @DefaultSchema(JavaFieldSchema.class), cada transformación reconoce los nombres y tipos de los campos del objeto, incluido BigQueryIO.write().

  1. Examina las diversas alternativas en la sección “Escritura” de BigQueryIO. En este caso, dado que anotaste tu objeto CommonLog, usa .useBeamSchema() y orienta la tabla <ID-del-proyecto>:logs.logs de la siguiente forma:
pCollection.apply(BigQueryIO.<MyObject>write() .to("my-project:output_dataset.output_table") .useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) ); Nota: WRITE_TRUNCATE borrará y volverá a crear la tabla todas las veces. Esto es útil en las iteraciones iniciales de la canalización, especialmente durante la iteración de tu esquema, pero puede causar fácilmente problemas inesperados en la producción. WRITE_APPEND o WRITE_EMPTY son más seguros.

El conjunto de todos los tipos disponibles en esquemas de Beam se puede encontrar en la documentación Schema. FieldType. Todos los posibles tipos de datos de BigQuery en SQL estándar que se pueden usar se encuentran en la documentación de setType y, si quieres, inspecciona la conversión del esquema de Beam a BigQuery.

Tarea 6. Ejecuta tu canalización

Regresa a la terminal, cambia el valor de la variable de entorno RUNNER por DataflowRunner y ejecuta tu canalización con el mismo comando de antes. Una vez iniciada, navega a la página del producto Dataflow y observa la disposición de tu canalización. Se mostrarán los nombres que hayas asignado a tus transformaciones. Si haces clic en cada uno, se mostrará en tiempo real la cantidad de elementos que se procesan por segundo.

La forma general debe consistir en una sola ruta de acceso que comience con la transformación Read y finalice con la transformación Write. Desde que se ejecute la canalización, los trabajadores se agregarán de forma automática, y el servicio determinará las necesidades de esta, por lo que los trabajadores desaparecerán cuando ya no sean necesarios. Para observar esto, navega a Compute Engine, donde deberías ver máquinas virtuales creadas por el servicio de Dataflow.

Nota: Si tu canalización se crea correctamente, pero observas muchos errores debido a un código o a una configuración incorrecta en el servicio de Dataflow, puedes configurar RUNNER de nuevo en “DirectRunner” para ejecutarlo de forma local y recibir resultados más rápido. Este enfoque funciona en este caso porque el conjunto de datos es pequeño y no estás usando ninguna función que no sea compatible con DirectRunner. # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export PIPELINE_FOLDER=gs://${PROJECT_ID} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER}"
  • Una vez finalizada la canalización, regresa a la ventana del navegador de BigQuery y consulta tu tabla.

Si el código no funciona como se espera y no sabes qué hacer, consulta la solución.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar tu canalización

Parte 2 del lab. Parametrización del proceso de ETL básico

Aproximadamente 20 minutos

Gran parte del trabajo de los ingenieros de datos es predecible, como los trabajos recurrentes, o es similar a otros trabajos. Sin embargo, se requiere experiencia en ingeniería para el proceso de ejecutar canalizaciones. Piensa en los pasos que acabas de completar:

  1. Creaste un entorno de desarrollo y desarrollaste una canalización. El entorno incluyó el SDK de Apache Beam y otras dependencias.
  2. Ejecutaste la canalización desde el entorno de desarrollo. El SDK de Apache Beam almacenó archivos en etapa intermedia en Cloud Storage, creó un archivo de solicitud de trabajo y envió el archivo al servicio de Cloud Dataflow.

Sería mucho mejor si existiera una forma de iniciar un trabajo a través de una llamada a la API o sin tener que configurar un entorno de desarrollo (que los usuarios que no son técnicos no podrían hacer). Esto también te permitiría ejecutar canalizaciones.

Con las plantillas de Dataflow se busca resolver este problema cambiando la representación que se genera cuando se crea una canalización para que se pueda parametrizar. Desafortunadamente, no es tan simple como exponer parámetros de la línea de comandos, a pesar de que esto último se hace en un lab posterior. Con las plantillas de Dataflow, el flujo de trabajo anterior se convierte en lo siguiente:

  1. Los desarrolladores crean un entorno de desarrollo y desarrollan su canalización. El entorno incluye el SDK de Apache Beam y otras dependencias.
  2. Los desarrolladores ejecutan la canalización y crean una plantilla. El SDK de Apache Beam almacena archivos en etapa intermedia en Cloud Storage, crea un archivo de plantilla (similar al de solicitud de trabajo) y guarda el archivo de plantilla en Cloud Storage.
  3. Los usuarios que no son desarrolladores y otras herramientas de flujo de trabajo, como Airflow, pueden ejecutar trabajos con facilidad mediante la consola de Google Cloud, la herramienta de línea de comandos de gcloud o la API de REST para enviar solicitudes de ejecución de archivos de plantillas al servicio de Cloud Dataflow.

En este lab, practicarás con una de las múltiples plantillas de Dataflow creadas por Google para realizar la misma tarea que la canalización que creaste en la Parte 1.

Tarea 1. Crea un archivo de esquema JSON

Aunque no tuviste que pasar un objeto TableSchema a la transformación BigQueryIO.writeTableRows() porque usaste .usedBeamSchema(), debes pasar a la plantilla de Dataflow un archivo JSON que represente el esquema en este ejemplo.

  1. Abre la terminal y vuelve al directorio principal. Ejecuta el siguiente comando para obtener el esquema de tu tabla logs.logs existente:
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. Ahora, capta este resultado en un archivo y súbelo a GCS. Los comandos sed adicionales permiten compilar un objeto JSON completo que Dataflow esperará.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gsutil cp schema.json gs://${PROJECT_ID}/

Haz clic en Revisar mi progreso para verificar el objetivo. Crear un archivo de esquema JSON

Tarea 2. Escribe una función de JavaScript definida por el usuario

La plantilla de Dataflow de Cloud Storage a BigQuery requiere una función de JavaScript para convertir el texto sin procesar en un archivo JSON válido. En este caso, cada línea de texto tiene un formato JSON válido, por lo que la función es trivial.

Para completar esta tarea, usa el IDE para crear un archivo .js con el contenido que se muestra a continuación y, luego, cópialo en Google Cloud Storage.

  1. Copia la siguiente función en su propio archivo transform.js en la carpeta 1_Basic_ETL/:
function transform(line) { return line; }
  1. Luego, ejecuta el siguiente comando para copiar el archivo en Google Cloud Storage:
cd 1_Basic_ETL/ export PROJECT_ID=$(gcloud config get-value project) gsutil cp *.js gs://${PROJECT_ID}/

Haz clic en Revisar mi progreso para verificar el objetivo. Escribir una función de JavaScript definida por el usuario

Tarea 3. Ejecuta una plantilla de Dataflow

  1. Ve a la IU web de Cloud Dataflow.
  2. Haz clic en CREAR UN TRABAJO A PARTIR DE UNA PLANTILLA.
  3. Ingresa un nombre para tu trabajo de Cloud Dataflow.
  4. En la plantilla de Cloud Dataflow, selecciona la plantilla Text Files on Cloud Storage to BigQuery en la sección Process Data in Bulk (batch), NO en la sección Streaming.
  5. En Ruta de acceso de UDF de JavaScript en Cloud Storage, ingresa la ruta de acceso a .js, con el formato gs://<ID-del-proyecto>/transform.js.
  6. En Ruta de JSON, escribe la ruta a tu archivo schema.json, con el formato gs://<ID-del-proyecto>/schema.json.
  7. En JavaScript UDF name, ingresa transform.
  8. En BigQuery output table, ingresa <myprojectid>:logs.logs.
  9. En Cloud Storage input path, ingresa la ruta de acceso a events.json con el formato gs://<ID-del-proyecto>/events.json.
  10. En Temporary BigQuery directory, ingresa una carpeta nueva dentro de este mismo bucket. El trabajo lo creará por ti.
  11. En Ubicación temporal, ingresa una segunda carpeta nueva dentro de este mismo bucket.
  12. Deja Encriptación en Clave administrada por Google.
  13. Haz clic en el botón Ejecutar trabajo.

Mientras se ejecuta tu trabajo, puedes inspeccionarlo desde la IU web de Dataflow.

Haz clic en Revisar mi progreso para verificar el objetivo. Ejecutar una plantilla de Dataflow

Tarea 4. Inspecciona el código de la plantilla de Dataflow

  1. Recupera el código de la plantilla de Dataflow que acabas de usar.

  2. Desplázate hacia abajo hasta el método principal. El código debería parecer conocido para la canalización que autorizaste.

  • Comienza con un objeto Pipeline, creado con un objeto PipelineOptions.
  • Consiste en una cadena de PTransform que comience con una transformación TextIO.read().
  • La PTransform después de la transformación de Read es un poco diferente. Permite usar JavaScript para transformar las cadenas de entrada (por ejemplo, si el formato de origen no se adapta bien al de la tabla de BigQuery). Para obtener documentación sobre cómo usar esta función, consulta esta página.
  • PTransform después de la UDF de JavaScript usa una función de biblioteca para convertir el JSON en una fila de tabla. Consulta ese código aquí.
  • La PTransform write se ve un poco diferente porque, en lugar de usar un esquema conocido en el tiempo de compilación del gráfico, el código pretende aceptar parámetros que solo se conocerán en el tiempo de ejecución. Esto es posible gracias a la clase NestedValueProvider. También usa un esquema establecido explícitamente, en lugar de uno inferido de un esquema de Beam con .useBeamSchema() como lo hiciste tú.
  1. Asegúrate de revisar el siguiente lab, en el que se explicará cómo crear canalizaciones que no sean únicamente cadenas de PTransform, y cómo adaptar una canalización que creaste para que sea una plantilla de Dataflow personalizada.

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible