Puntos de control
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
Procesamiento de datos sin servidores con Dataflow: canalización avanzada de análisis de transmisiones con Cloud Dataflow (Java)
- Descripción general
- Configuración y requisitos
- Parte 1 del lab. Cómo abordar los datos retrasados
- Tarea 1. Prepara el entorno
- Tarea 2. Configura el retraso permitido
- Tarea 3. Configura un activador
- Parte 2 del lab. Cómo abordar los datos con errores de formato
- Tarea 1. Recopila datos con errores de formato
- Tarea 2. Haz que el código sea más modular con una transformación compuesta
- Tarea 3. Escribe los datos con formato incorrecto para su análisis posterior
- Tarea 4. Ejecuta tu canalización
- Tarea 5. Prueba tu canalización
- Finalice su lab
Descripción general
En este lab, aprenderás a hacer lo siguiente:
- Manejar los datos con retraso
- Manejar los datos con errores de formato de las siguientes maneras:
- Escribir una transformación compuesta para crear un código más modular
- Escribir una transformación que emita varias salidas de distintos tipos
- Recopilar datos con errores de formato y escribirlos en una ubicación en la que se los pueda examinar
Al final del lab anterior, se presentó un tipo de desafío con el que deben lidiar las canalizaciones en tiempo real: la brecha entre el momento en que transcurren los eventos y el momento en que se procesan. Esto se denomina retraso. En este lab, se presentan conceptos de Apache Beam que permiten a los creadores de canalizaciones especificar cómo estas deben abordar el retraso de manera formal.
Sin embargo, el retraso no es el único problema que pueden enfrentar las canalizaciones en un contexto de transmisión: siempre que la entrada provenga del exterior del sistema, existe la posibilidad de que tenga un formato incorrecto. En este lab, también se presentan técnicas que pueden usarse para tratar esas entradas.
La canalización final de este lab se parece a la que se muestra en la imagen a continuación. Ten en cuenta que contiene una rama.
Configuración y requisitos
En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.
-
Accede a Qwiklabs desde una ventana de incógnito.
-
Ten en cuenta el tiempo de acceso del lab (por ejemplo,
1:15:00
) y asegúrate de finalizarlo en el plazo asignado.
No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo. -
Cuando esté listo, haga clic en Comenzar lab.
-
Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.
-
Haga clic en Abrir Google Console.
-
Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
Si usa otras credenciales, se generarán errores o incurrirá en cargos. -
Acepta las condiciones y omite la página de recursos de recuperación.
Verifica los permisos del proyecto
Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).
-
En la consola de Google Cloud, en el Menú de navegación (), selecciona IAM y administración > IAM.
-
Confirma que aparezca la cuenta de servicio predeterminada de Compute
{número-del-proyecto}-compute@developer.gserviceaccount.com
, y que tenga asignado el rolEditor
. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.
Editor
, sigue los pasos que se indican a continuación para asignar el rol necesario.- En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
- Copia el número del proyecto (p. ej.,
729328892908
). - En el Menú de navegación, selecciona IAM y administración > IAM.
- En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
- En Principales nuevas, escribe lo siguiente:
- Reemplaza
{número-del-proyecto}
por el número de tu proyecto. - En Rol, selecciona Proyecto (o Básico) > Editor.
- Haz clic en Guardar.
Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud
, similar a Cloud Shell.
Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.
El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs
con un código que debe completar y una carpeta solution
con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer
para ver lo siguiente:
También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:
Para verificarlo, ejecute gcloud auth list
en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:
Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:
Parte 1 del lab. Cómo abordar los datos retrasados
En los labs anteriores, escribiste código que dividía los elementos en ventanas de ancho fijo según el tiempo del evento, con un código similar al siguiente:
Sin embargo, como observaste al final del último lab que no es de SQL, las transmisiones de datos suelen tener retrasos. El lapso de tiempo es problemático cuando se emplea el sistema de ventanas por tiempo del evento (en lugar del tiempo de procesamiento) porque genera incertidumbre: ¿llegaron o no todos los eventos que corresponden un tiempo de evento específico?
Claramente, para generar resultados, la canalización escrita debe tomar una decisión al respecto. Para ello, se utilizó un concepto llamado marca de agua. Una marca de agua es la noción del sistema basada en la heurística sobre cuándo se puede esperar que todos los datos generados hasta un tiempo de evento determinado hayan llegado a la canalización. Una vez que la marca de agua avanza más allá del final de una ventana, cualquier elemento adicional que llegue con una marca de tiempo en esa ventana se considera un dato retrasado y, simplemente, se descarta. Por lo tanto, el comportamiento predeterminado de la renderización en ventanas es emitir un resultado único, y (ojalá) completo cuando el sistema cree que llegaron todos los datos.
Apache Beam utiliza varios métodos heurísticos para hacer una conjetura de cuál es la marca de agua. Sin embargo, esto no es más que heurística. Además, es una heurística de uso general, que puede no ser adecuada para todos los casos de uso. En lugar de usar una heurística de uso general, los diseñadores de canalizaciones deben considerar cuidadosamente las siguientes preguntas para determinar qué aspectos vale la pena sacrificar:
- Integridad: ¿Qué tan importante es contar con todos los datos antes de calcular el resultado?
- Latencia: ¿Cuánto tiempo estás dispuesto a esperar a que lleguen los datos? Por ejemplo, ¿esperas a tener todos los datos para procesarlos o los procesas a medida que llegan?
- Costo: ¿Cuánto dinero y capacidad de procesamiento estás dispuesto a invertir para reducir la latencia?
Con las respuestas a estas preguntas, es posible usar formalismos de Apache Beam para escribir código que sacrifique los aspectos correctos.
Retraso permitido
El retraso permitido controla por cuánto tiempo debe retenerse el estado de una ventana. Cuando la marca de agua alcanza el final del período de retraso permitido, se descarta todo el estado. Si bien sería fantástico poder mantener todo nuestro estado persistente por un tiempo ilimitado, cuando se trata de una fuente de datos no delimitada, por lo general no es práctico conservar el estado de una ventana determinada de manera indefinida, ya que corremos el riesgo de quedaremos sin espacio en el disco.
Como resultado, cualquier sistema que deba procesar datos reales desordenados necesita tener un método para limitar la duración de las ventanas de procesamiento. Una forma clara y concisa de hacer esto es definir un umbral para el retraso permitido dentro del sistema, es decir, establecer un límite sobre qué tan tarde puede llegar cualquier registro individual (con relación a la marca de agua) para que valga la pena que el sistema lo procese. Los datos que lleguen después de este umbral simplemente se descartarán. Cuando se limita el retraso máximo de cualquier dato individual, también se establece con exactitud el tiempo por el que se debe conservar el estado de las ventanas (hasta que la marca de agua supere el umbral de retraso para el final de la ventana).
Tarea 1. Prepara el entorno
Al igual que en los labs anteriores, el primer paso es generar datos para que la canalización los procese. Abre el entorno del lab y genera los datos como lo hiciste anteriormente.
Abre el lab adecuado
- Si aún no lo has hecho, crea una terminal nueva en tu IDE y, luego, copia y pega el siguiente comando:
- Configura el entorno de datos:
Haz clic en Revisar mi progreso para verificar el objetivo.
Tarea 2. Configura el retraso permitido
En Apache Beam, el retraso permitido se establece con el método withAllowedLateness(), como se muestra en este siguiente ejemplo:
- Para completar esta tarea, examina la transformación de la renderización en ventanas, agrega una llamada a
.withAllowedLateness()
y pasa unaDuration
válida creada a partir del parámetro de línea de comandos adecuado. Usa tu criterio para decidir cuál sería un valor razonable y actualiza la línea de comandos para que refleje las unidades correctas.
Activadores
Los diseñadores de canalizaciones también pueden elegir a su criterio cuándo emitir resultados preliminares. Por ejemplo, supongamos que aún no se alcanzó la marca de agua para el final de una ventana, pero ya llegó el 75% de los datos esperados. En muchos casos, se puede suponer que esta muestra es representativa, por lo que tiene sentido mostrar un resultado a los usuarios finales.
Los Triggers
determinan en qué momento del tiempo de procesamiento se materializarán los resultados. A cada resultado específico de una ventana se le conoce como un panel de la ventana. Los activadores activan un panel cuando se cumplen sus condiciones. En Apache Beam, esas condiciones incluyen el progreso de la marca de agua, el progreso del tiempo de procesamiento (que avanza de manera uniforme, independiente de la cantidad de datos que llegue realmente), el recuento de elementos (como cuando llega una cantidad determinada de datos nuevos) y los activadores dependientes de los datos (como cuando se llega al final de un archivo).
Las condiciones de un activador pueden provocar que este active un panel muchas veces. Por lo tanto, también es necesario especificar cómo acumular estos resultados. Actualmente, Apache Beam admite dos modos de acumulación, uno que agrupa resultados y otro que solo devuelve las partes del resultado que son nuevas desde que se activó el último panel.
Tarea 3. Configura un activador
Cuando se configura una función de renderización en ventanas para una PCollection
con la transformación Window
, también se puede especificar un activador.
Debes invocar el método .triggering()
en el resultado de la transformación Window.into()
para configurar el activador de PCollection
. Window.triggering() acepta un activador como su argumento. Apache Beam incluye varios activadores:
- AfterWatermark, que realiza la activación cuando la marca de agua pasa una marca de tiempo determinada desde el final de la ventana o desde la llegada del primer elemento de un panel
- AfterProcessingTime, que realiza la activación cuando transcurre determinado tiempo de procesamiento (por lo general, desde la llegada del primer elemento de un panel).
- AfterPane, que activa una propiedad de los elementos en el panel actual, como la cantidad de elementos que se asignaron al panel actual.
En esta muestra de código, se configura un activador basado en el tiempo para una PCollection
que emite resultados un minuto después de que se procesa el primer elemento de la ventana. La última línea de la muestra de código, .discardingFiredPanes()
, establece el modo de acumulación de la ventana:
- Para completar esta tarea, agrega una llamada a
Window.triggering()
dentro de la transformación de renderización en ventanas y pasa unTrigger
válido. Cuando diseñes tu activador, ten en cuenta este caso de uso, en el que los datos se organizan en ventanas de un minuto y los datos pueden llegar tarde.
Si quieres un ejemplo de un activador, consulta la solución.
Parte 2 del lab. Cómo abordar los datos con errores de formato
Según cómo configures tu Trigger
, si ejecutaras la canalización en este momento y la compararas con la del lab anterior, tal vez notarías que la canalización nueva presenta resultados antes. También es posible que sus resultados tengan una exactitud mayor si la heurística no era la ideal para predecir el comportamiento de la transmisión y si el retraso permitido era mejor.
Sin embargo, aunque la canalización actual es más sólida en lo que respecta a los retrasos, sigue siendo vulnerable a los datos con formato incorrecto. Si ejecutas la canalización y se publica un mensaje que contenga algo distinto de una cadena JSON con el formato correcto que pueda analizarse como un CommonLog
, la canalización generará un error. Si bien existen herramientas, como Cloud Logging, que facilitan la lectura de esos errores, una canalización mejor diseñada puede almacenarlos en una ubicación predefinida para inspeccionarlos en otro momento.
En esta sección, agregarás componentes a la canalización que la harán más modular y sólida.
Tarea 1. Recopila datos con errores de formato
Para que la canalización sea más sólida ante los datos con formato incorrecto, necesita una forma de filtrarlos y ramificarse para procesarlos de manera diferente. Anteriormente, aprendiste que una forma de generar una rama en una canalización es hacer que una PCollection
sea la entrada de múltiples transformaciones.
Esta forma de ramificación es muy eficaz. Sin embargo, hay algunos casos de uso en los que esta estrategia es ineficiente. Por ejemplo, supongamos que quieres crear dos subconjuntos diferentes de la misma PCollection
. Con el método de transformación múltiple, podrías crear una transformación de filtro para cada subconjunto y aplicar ambas a la PCollection
original. Sin embargo, esto implicaría procesar cada elemento dos veces.
Otro método para producir una canalización con ramas es hacer que una única transformación produzca múltiples resultados, pero que procese la PCollection
de entrada solo una vez. En esta tarea, escribirás una transformación que produzca dos resultados: uno de los datos con formato correcto y otro de los elementos con formato incorrecto del flujo de entrada original.
Para emitir múltiples resultados y seguir creando una sola PCollection
, Apache Beam usa una clase llamada PCollectionTuple
. Una PCollectionTuple es una tupla inmutable de la PCollection
de tipo heterogéneo y en la que se usa TupleTag
como clave.
Este es un ejemplo de una creación de una instancia de PCollectionTuple
con dos tipos de PCollection diferentes. Luego, esas PCollection
se recuperan con el método PCollectionTuple.get()
:
Para usar este método en el contexto de una PTransform
, puedes escribir código como el del siguiente ejemplo, en el que se asigna una TupleTag
a un elemento según su contenido:
- Para completar esta tarea, declara dos constantes de
TupleTag
en la parte superior de la clase y cambia la transformaciónJsonToCommonLog
para que muestre unaPCollectionTuple
y etiquete elementos sin analizar con una etiqueta y elementos analizados con la otra. En lugar de un bloqueif/then/else
, utiliza una sentenciatry/catch
.
Tarea 2. Haz que el código sea más modular con una transformación compuesta
Las transformaciones pueden tener una estructura anidada en la que una transformación compleja realiza múltiples transformaciones más simples (como más de una ParDo
, Combine
, GroupByKey
o, incluso, otras transformaciones compuestas). Estas transformaciones se llaman transformaciones compuestas. Anidar múltiples transformaciones dentro de una única transformación compuesta hace que tu código sea más modular y más fácil de comprender.
- Para crear tu propia transformación compuesta, crea una subclase de la clase
PTransform
y anula el método de expansión para especificar la lógica de procesamiento real. En los parámetros del tipo de clasePTransform
, debes pasar los tiposPCollection
que tu transformación toma como entrada y produce como salida.
En la siguiente muestra de código, se indica cómo declarar una PTransform
que acepte una PCollection
de strings como entrada y genere una PCollection
de números enteros:
#TODO: JsonToRow
- Dentro de tu subclase
PTransform
, deberás anular el método de expansión. El método de expansión es el que usas para agregar la lógica de procesamiento de laPTransform
. La anulación del método de expansión debe aceptar el tipo apropiado dePCollection
de entrada como un parámetro, y debe especificar laPCollection
de salida como el valor de devolución.
- Para invocar la transformación, usa
PCollection.apply()
en laPCollection
y pasa una instancia de la transformación compuesta:
- Para completar esta tarea, toma la transformación
JsonToCommonLog
que acabas de modificar y conviértela en una transformación compuesta. Ten en cuenta que esto generará un problema con la transformación de escritura actual, que espera instancias deCommonLog
. Guarda los resultados de la transformación compuesta en unaPCollectionTuple
nueva y usa.get()
para recuperar laPCollection
que espera la transformación de escritura.
Tarea 3. Escribe los datos con formato incorrecto para su análisis posterior
Para solucionar el problema de una etapa anterior que produce datos con formato incorrecto, es importante poder analizar esos datos. Para ello, es necesario materializarlos en alguna parte. En esta tarea, escribirás datos con formato incorrecto en Google Cloud Storage. Podemos llamar a este patrón mediante el almacenamiento de mensajes no entregados.
En labs anteriores, escribiste directamente desde una fuente delimitada (por lotes) en Cloud Storage mediante TextIO.write()
. Sin embargo, cuando se escribe desde una fuente no delimitada (de transmisión), este enfoque debe modificarse levemente.
En primer lugar, en un momento anterior a la transformación de escritura, es necesario usar un Trigger
para especificar en qué momento del procesamiento debe realizarse la escritura. De lo contrario, se aplica la configuración predeterminada y los datos no se escriben nunca. De forma predeterminada, cada evento pertenece a la ventana global. Cuando se trabaja por lotes, esto es correcto porque se conoce el conjunto de datos completo en el momento de la ejecución. Sin embargo, con las fuentes no delimitadas, el tamaño completo del conjunto de datos es desconocido, por lo que los paneles de la ventana global nunca se activan, ya que nunca están completos.
Debido a que usarás un Trigger
, también deberás usar una Window
. Sin embargo, no siempre es necesario cambiar esta última. En labs y tareas anteriores, utilizaste transformaciones de sistemas de ventanas para reemplazar la ventana global por una ventana de duración fija en el tiempo del evento. En este caso, determinar qué elementos se agrupan no es tan importante como el hecho de que los resultados se materialicen de forma útil y a una tasa útil.
En el siguiente ejemplo, la ventana activa el panel de la ventana global cada 10 segundos de tiempo de procesamiento, pero solo escribe los eventos nuevos:
Una vez que hayas configurado un Trigger
, debes modificar la llamada a TextIO.write()
para realizar las operaciones de escritura. Cuando realices operaciones de escritura después de una transformación de renderización en ventanas, encadena una llamada a withWindowedWrites()
y especifica una cantidad de shards, de modo que las operaciones de escritura se puedan realizar en paralelo:
- Para completar esta tarea, crea una transformación nueva con
.get()
en laPCollectionTuple
para recuperar los datos con formato incorrecto. Usa tu criterio y conocimiento sobre los activadores para definir las condiciones de activación adecuadas para este activador.
Tarea 4. Ejecuta tu canalización
- Para ejecutar tu canalización, crea un comando similar al siguiente ejemplo. Ten en cuenta que deberás modificarlo para que refleje los nombres de las opciones de línea de comandos que incluiste.
El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub.
- Para completar esta tarea y comenzar a publicar mensajes, abre una nueva terminal al lado de la actual y ejecuta la siguiente secuencia de comandos. Se seguirán publicando mensajes hasta que finalices la secuencia de comandos. Asegúrate de estar en la carpeta
training-data-analyst/quests/dataflow
.
true
agrega eventos retrasados a la transmisión.Haz clic en Revisar mi progreso para verificar el objetivo.
Tarea 5. Prueba tu canalización
- Navega a Pub/Sub > Temas y haz clic en el tema
my_topic
. - Haz clic en la pestaña Mensajes y, luego, en el botón Publicar mensaje.
- En la página siguiente, ingresa el mensaje que deseas publicar.
El mensaje debería llegar pronto al bucket de Cloud Storage destinado a los mensajes no entregados, siempre y cuando no se ajuste perfectamente a la especificación JSON CommonLog
. Para hacer un seguimiento de su ruta en la canalización, regresa a la ventana de supervisión de la canalización y haz clic en un nodo de la rama responsable de manejar los mensajes no analizados.
- Cuando veas un elemento agregado a esta rama, podrás navegar a Cloud Storage y verificar que el mensaje se haya escrito en el disco:
Haz clic en Revisar mi progreso para verificar el objetivo.
Finalice su lab
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
- 1 estrella = Muy insatisfecho
- 2 estrellas = Insatisfecho
- 3 estrellas = Neutral
- 4 estrellas = Satisfecho
- 5 estrellas = Muy satisfecho
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.