
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
En este lab, aprenderás a hacer lo siguiente:
Al final del lab anterior, se presentó un tipo de desafío con el que deben lidiar las canalizaciones en tiempo real: la brecha entre el momento en que transcurren los eventos y el momento en que se procesan. Esto se denomina retraso. En este lab, se presentan conceptos de Apache Beam que permiten a los creadores de canalizaciones especificar cómo estas deben abordar el retraso de manera formal.
Sin embargo, el retraso no es el único problema que pueden enfrentar las canalizaciones en un contexto de transmisión: siempre que la entrada provenga del exterior del sistema, existe la posibilidad de que tenga un formato incorrecto. En este lab, también se presentan técnicas que pueden usarse para tratar esas entradas.
La canalización final de este lab se parece a la que se muestra en la imagen a continuación. Ten en cuenta que contiene una rama.
En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.
Accede a Qwiklabs desde una ventana de incógnito.
Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00
) y asegúrate de finalizarlo en el plazo asignado.
No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.
Cuando esté listo, haga clic en Comenzar lab.
Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.
Haga clic en Abrir Google Console.
Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
Si usa otras credenciales, se generarán errores o incurrirá en cargos.
Acepta las condiciones y omite la página de recursos de recuperación.
Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).
En la consola de Google Cloud, en el Menú de navegación (), selecciona IAM y administración > IAM.
Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com
, y que tenga asignado el rol Editor
. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.
Editor
, sigue los pasos que se indican a continuación para asignar el rol necesario.729328892908
).{número-del-proyecto}
por el número de tu proyecto.Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud
, similar a Cloud Shell.
Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.
El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs
con un código que debe completar y una carpeta solution
con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer
para ver lo siguiente:
También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:
Para verificarlo, ejecute gcloud auth list
en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:
Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:
En los labs anteriores, escribiste código que dividía los elementos en ventanas de ancho fijo según el tiempo del evento, con un código similar al siguiente:
Sin embargo, como observaste al final del último lab que no es de SQL, las transmisiones de datos suelen tener retrasos. El lapso de tiempo es problemático cuando se emplea el sistema de ventanas por tiempo del evento (en lugar del tiempo de procesamiento) porque genera incertidumbre: ¿llegaron o no todos los eventos que corresponden un tiempo de evento específico?
Claramente, para generar resultados, la canalización escrita debe tomar una decisión al respecto. Para ello, se utilizó un concepto llamado marca de agua. Una marca de agua es la noción del sistema basada en la heurística sobre cuándo se puede esperar que todos los datos generados hasta un tiempo de evento determinado hayan llegado a la canalización. Una vez que la marca de agua avanza más allá del final de una ventana, cualquier elemento adicional que llegue con una marca de tiempo en esa ventana se considera un dato retrasado y, simplemente, se descarta. Por lo tanto, el comportamiento predeterminado de la renderización en ventanas es emitir un resultado único, y (ojalá) completo cuando el sistema cree que llegaron todos los datos.
Apache Beam utiliza varios métodos heurísticos para hacer una conjetura de cuál es la marca de agua. Sin embargo, esto no es más que heurística. Además, es una heurística de uso general, que puede no ser adecuada para todos los casos de uso. En lugar de usar una heurística de uso general, los diseñadores de canalizaciones deben considerar cuidadosamente las siguientes preguntas para determinar qué aspectos vale la pena sacrificar:
Con las respuestas a estas preguntas, es posible usar formalismos de Apache Beam para escribir código que sacrifique los aspectos correctos.
El retraso permitido controla por cuánto tiempo debe retenerse el estado de una ventana. Cuando la marca de agua alcanza el final del período de retraso permitido, se descarta todo el estado. Si bien sería fantástico poder mantener todo nuestro estado persistente por un tiempo ilimitado, cuando se trata de una fuente de datos no delimitada, por lo general no es práctico conservar el estado de una ventana determinada de manera indefinida, ya que corremos el riesgo de quedaremos sin espacio en el disco.
Como resultado, cualquier sistema que deba procesar datos reales desordenados necesita tener un método para limitar la duración de las ventanas de procesamiento. Una forma clara y concisa de hacer esto es definir un umbral para el retraso permitido dentro del sistema, es decir, establecer un límite sobre qué tan tarde puede llegar cualquier registro individual (con relación a la marca de agua) para que valga la pena que el sistema lo procese. Los datos que lleguen después de este umbral simplemente se descartarán. Cuando se limita el retraso máximo de cualquier dato individual, también se establece con exactitud el tiempo por el que se debe conservar el estado de las ventanas (hasta que la marca de agua supere el umbral de retraso para el final de la ventana).
Al igual que en los labs anteriores, el primer paso es generar datos para que la canalización los procese. Abre el entorno del lab y genera los datos como lo hiciste anteriormente.
Haz clic en Revisar mi progreso para verificar el objetivo.
En Apache Beam, el retraso permitido se establece con el método withAllowedLateness(), como se muestra en este siguiente ejemplo:
.withAllowedLateness()
y pasa una Duration
válida creada a partir del parámetro de línea de comandos adecuado. Usa tu criterio para decidir cuál sería un valor razonable y actualiza la línea de comandos para que refleje las unidades correctas.Los diseñadores de canalizaciones también pueden elegir a su criterio cuándo emitir resultados preliminares. Por ejemplo, supongamos que aún no se alcanzó la marca de agua para el final de una ventana, pero ya llegó el 75% de los datos esperados. En muchos casos, se puede suponer que esta muestra es representativa, por lo que tiene sentido mostrar un resultado a los usuarios finales.
Los Triggers
determinan en qué momento del tiempo de procesamiento se materializarán los resultados. A cada resultado específico de una ventana se le conoce como un panel de la ventana. Los activadores activan un panel cuando se cumplen sus condiciones. En Apache Beam, esas condiciones incluyen el progreso de la marca de agua, el progreso del tiempo de procesamiento (que avanza de manera uniforme, independiente de la cantidad de datos que llegue realmente), el recuento de elementos (como cuando llega una cantidad determinada de datos nuevos) y los activadores dependientes de los datos (como cuando se llega al final de un archivo).
Las condiciones de un activador pueden provocar que este active un panel muchas veces. Por lo tanto, también es necesario especificar cómo acumular estos resultados. Actualmente, Apache Beam admite dos modos de acumulación, uno que agrupa resultados y otro que solo devuelve las partes del resultado que son nuevas desde que se activó el último panel.
Cuando se configura una función de renderización en ventanas para una PCollection
con la transformación Window
, también se puede especificar un activador.
Debes invocar el método .triggering()
en el resultado de la transformación Window.into()
para configurar el activador de PCollection
. Window.triggering() acepta un activador como su argumento. Apache Beam incluye varios activadores:
En esta muestra de código, se configura un activador basado en el tiempo para una PCollection
que emite resultados un minuto después de que se procesa el primer elemento de la ventana. La última línea de la muestra de código, .discardingFiredPanes()
, establece el modo de acumulación de la ventana:
Window.triggering()
dentro de la transformación de renderización en ventanas y pasa un Trigger
válido. Cuando diseñes tu activador, ten en cuenta este caso de uso, en el que los datos se organizan en ventanas de un minuto y los datos pueden llegar tarde.Si quieres un ejemplo de un activador, consulta la solución.
Según cómo configures tu Trigger
, si ejecutaras la canalización en este momento y la compararas con la del lab anterior, tal vez notarías que la canalización nueva presenta resultados antes. También es posible que sus resultados tengan una exactitud mayor si la heurística no era la ideal para predecir el comportamiento de la transmisión y si el retraso permitido era mejor.
Sin embargo, aunque la canalización actual es más sólida en lo que respecta a los retrasos, sigue siendo vulnerable a los datos con formato incorrecto. Si ejecutas la canalización y se publica un mensaje que contenga algo distinto de una cadena JSON con el formato correcto que pueda analizarse como un CommonLog
, la canalización generará un error. Si bien existen herramientas, como Cloud Logging, que facilitan la lectura de esos errores, una canalización mejor diseñada puede almacenarlos en una ubicación predefinida para inspeccionarlos en otro momento.
En esta sección, agregarás componentes a la canalización que la harán más modular y sólida.
Para que la canalización sea más sólida ante los datos con formato incorrecto, necesita una forma de filtrarlos y ramificarse para procesarlos de manera diferente. Anteriormente, aprendiste que una forma de generar una rama en una canalización es hacer que una PCollection
sea la entrada de múltiples transformaciones.
Esta forma de ramificación es muy eficaz. Sin embargo, hay algunos casos de uso en los que esta estrategia es ineficiente. Por ejemplo, supongamos que quieres crear dos subconjuntos diferentes de la misma PCollection
. Con el método de transformación múltiple, podrías crear una transformación de filtro para cada subconjunto y aplicar ambas a la PCollection
original. Sin embargo, esto implicaría procesar cada elemento dos veces.
Otro método para producir una canalización con ramas es hacer que una única transformación produzca múltiples resultados, pero que procese la PCollection
de entrada solo una vez. En esta tarea, escribirás una transformación que produzca dos resultados: uno de los datos con formato correcto y otro de los elementos con formato incorrecto del flujo de entrada original.
Para emitir múltiples resultados y seguir creando una sola PCollection
, Apache Beam usa una clase llamada PCollectionTuple
. Una PCollectionTuple es una tupla inmutable de la PCollection
de tipo heterogéneo y en la que se usa TupleTag
como clave.
Este es un ejemplo de una creación de una instancia de PCollectionTuple
con dos tipos de PCollection diferentes. Luego, esas PCollection
se recuperan con el método PCollectionTuple.get()
:
Para usar este método en el contexto de una PTransform
, puedes escribir código como el del siguiente ejemplo, en el que se asigna una TupleTag
a un elemento según su contenido:
TupleTag
en la parte superior de la clase y cambia la transformación JsonToCommonLog
para que muestre una PCollectionTuple
y etiquete elementos sin analizar con una etiqueta y elementos analizados con la otra. En lugar de un bloque if/then/else
, utiliza una sentencia try/catch
.Las transformaciones pueden tener una estructura anidada en la que una transformación compleja realiza múltiples transformaciones más simples (como más de una ParDo
, Combine
, GroupByKey
o, incluso, otras transformaciones compuestas). Estas transformaciones se llaman transformaciones compuestas. Anidar múltiples transformaciones dentro de una única transformación compuesta hace que tu código sea más modular y más fácil de comprender.
PTransform
y anula el método de expansión para especificar la lógica de procesamiento real. En los parámetros del tipo de clase PTransform
, debes pasar los tipos PCollection
que tu transformación toma como entrada y produce como salida.En la siguiente muestra de código, se indica cómo declarar una PTransform
que acepte una PCollection
de strings como entrada y genere una PCollection
de números enteros:
#TODO: JsonToRow
PTransform
, deberás anular el método de expansión. El método de expansión es el que usas para agregar la lógica de procesamiento de la PTransform
. La anulación del método de expansión debe aceptar el tipo apropiado de PCollection
de entrada como un parámetro, y debe especificar la PCollection
de salida como el valor de devolución.PCollection.apply()
en la PCollection
y pasa una instancia de la transformación compuesta:JsonToCommonLog
que acabas de modificar y conviértela en una transformación compuesta. Ten en cuenta que esto generará un problema con la transformación de escritura actual, que espera instancias de CommonLog
. Guarda los resultados de la transformación compuesta en una PCollectionTuple
nueva y usa .get()
para recuperar la PCollection
que espera la transformación de escritura.Para solucionar el problema de una etapa anterior que produce datos con formato incorrecto, es importante poder analizar esos datos. Para ello, es necesario materializarlos en alguna parte. En esta tarea, escribirás datos con formato incorrecto en Google Cloud Storage. Podemos llamar a este patrón mediante el almacenamiento de mensajes no entregados.
En labs anteriores, escribiste directamente desde una fuente delimitada (por lotes) en Cloud Storage mediante TextIO.write()
. Sin embargo, cuando se escribe desde una fuente no delimitada (de transmisión), este enfoque debe modificarse levemente.
En primer lugar, en un momento anterior a la transformación de escritura, es necesario usar un Trigger
para especificar en qué momento del procesamiento debe realizarse la escritura. De lo contrario, se aplica la configuración predeterminada y los datos no se escriben nunca. De forma predeterminada, cada evento pertenece a la ventana global. Cuando se trabaja por lotes, esto es correcto porque se conoce el conjunto de datos completo en el momento de la ejecución. Sin embargo, con las fuentes no delimitadas, el tamaño completo del conjunto de datos es desconocido, por lo que los paneles de la ventana global nunca se activan, ya que nunca están completos.
Debido a que usarás un Trigger
, también deberás usar una Window
. Sin embargo, no siempre es necesario cambiar esta última. En labs y tareas anteriores, utilizaste transformaciones de sistemas de ventanas para reemplazar la ventana global por una ventana de duración fija en el tiempo del evento. En este caso, determinar qué elementos se agrupan no es tan importante como el hecho de que los resultados se materialicen de forma útil y a una tasa útil.
En el siguiente ejemplo, la ventana activa el panel de la ventana global cada 10 segundos de tiempo de procesamiento, pero solo escribe los eventos nuevos:
Una vez que hayas configurado un Trigger
, debes modificar la llamada a TextIO.write()
para realizar las operaciones de escritura. Cuando realices operaciones de escritura después de una transformación de renderización en ventanas, encadena una llamada a withWindowedWrites()
y especifica una cantidad de shards, de modo que las operaciones de escritura se puedan realizar en paralelo:
.get()
en la PCollectionTuple
para recuperar los datos con formato incorrecto. Usa tu criterio y conocimiento sobre los activadores para definir las condiciones de activación adecuadas para este activador.El código de esta Quest incluye una secuencia de comandos para publicar eventos JSON con Pub/Sub.
training-data-analyst/quests/dataflow
.true
agrega eventos retrasados a la transmisión.Haz clic en Revisar mi progreso para verificar el objetivo.
my_topic
.El mensaje debería llegar pronto al bucket de Cloud Storage destinado a los mensajes no entregados, siempre y cuando no se ajuste perfectamente a la especificación JSON CommonLog
. Para hacer un seguimiento de su ruta en la canalización, regresa a la ventana de supervisión de la canalización y haz clic en un nodo de la rama responsable de manejar los mensajes no analizados.
Haz clic en Revisar mi progreso para verificar el objetivo.
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.
Este contenido no está disponible en este momento
Te enviaremos una notificación por correo electrónico cuando esté disponible
¡Genial!
Nos comunicaremos contigo por correo electrónico si está disponible
One lab at a time
Confirm to end all existing labs and start this one