
Before you begin
- Labs create a Google Cloud project and resources for a fixed time
- Labs have a time limit and no pause feature. If you end the lab, you'll have to restart from the beginning.
- On the top left of your screen, click Start lab to begin
Perform Unit Tests for DoFns and PTransforms
/ 10
Test Stream Processing Logic with TestStream
/ 10
En este lab, aprenderás a hacer lo siguiente:
DoFn
y PTransform
mediante las herramientas de prueba de Apache BeamTestStream
para probar el comportamiento del sistema de ventanas en una canalización de transmisiónLa prueba de tu canalización es un paso importante en el desarrollo de una solución de procesamiento de datos efectiva. Debido a la naturaleza indirecta del modelo de Beam, la depuración de ejecuciones con errores puede convertirse en una tarea importante.
En este lab, aprenderemos a realizar pruebas de unidades de forma local con herramientas del paquete de pruebas del SDK de Beam usando DirectRunner
.
En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.
Accede a Qwiklabs desde una ventana de incógnito.
Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00
) y asegúrate de finalizarlo en el plazo asignado.
No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.
Cuando esté listo, haga clic en Comenzar lab.
Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.
Haga clic en Abrir Google Console.
Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
Si usa otras credenciales, se generarán errores o incurrirá en cargos.
Acepta las condiciones y omite la página de recursos de recuperación.
Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).
En la consola de Google Cloud, en el Menú de navegación (), selecciona IAM y administración > IAM.
Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com
, y que tenga asignado el rol Editor
. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.
Editor
, sigue los pasos que se indican a continuación para asignar el rol necesario.729328892908
).{número-del-proyecto}
por el número de tu proyecto.Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud
, similar a Cloud Shell.
Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.
El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs
con un código que debe completar y una carpeta solution
con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer
para ver lo siguiente:
También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:
Para verificarlo, ejecute gcloud auth list
en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:
Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:
El código del lab se divide en dos carpetas: 8a_Batch_Testing_Pipeline/lab y 8b_Stream_Testing_Pipeline/lab. Si en algún momento tienes problemas, puedes consultar la solución en las carpetas solution correspondientes.
En esta parte del lab, realizaremos pruebas de unidades en DoFn y PTransform para obtener estadísticas de procesamiento de una canalización por lotes a partir de sensores meteorológicos. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:
TestPipeline
.Create
para crear una PCollection
de tus datos de entrada.PCollection
de entrada y guarda la PCollection
resultante.PAssert
y sus subclases para verificar que la PCollection
resultante contenga los elementos que esperas.TestPipeline
es una clase especial incluida en el SDK de Beam específicamente para probar la lógica de las transformaciones y canalizaciones.
TestPipeline
en lugar de Pipeline
para crear el objeto de canalización:La transformación Create
toma una colección de objetos en la memoria (un iterable de Java) y crea una PCollection
a partir de ella. El objetivo es tener un pequeño conjunto de datos de entrada de prueba (de los cuales conozcamos la PCollection
resultante esperada) a partir de nuestras PTransforms
.
Por último, debemos comprobar que la PCollection resultante coincida con la salida prevista. Para verificarlo, usamos la clase PAssert
. Por ejemplo, podemos usar el método containsInAnyOrder
para verificar que la PCollection resultante tenga los elementos correctos:
Este directorio contiene un archivo pom.xml
para definir dependencias y la carpeta src
, que contiene dos subdirectorios. La carpeta src/main
contiene el código del paquete de canalización, y la carpeta src/test
contendrá el código de prueba.
Este archivo contiene la definición de la clase WeatherRecord
que usaremos en nuestra canalización. La clase WeatherRecord
tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema
deberían resultarte conocidos. Sin embargo, ten en cuenta que debemos anular el método equals
para definir la clase.
¿A qué se debe esto? PAssert
usará el método equals
para verificar la membresía en el resultado PCollection
. Sin embargo, el método equals predeterminado de un POJO (objetos antiguos y sin formato basados en Java) solo compara las direcciones de los objetos. Es importante que nos aseguremos de comparar el contenido de los objetos. Esto es fácil de hacer, como se muestra más arriba.
Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:
DoFn
ConvertCsvToWeatherRecord
(a partir de la línea 65) y ConvertTempUnits
(a partir de la línea 81). Realizaremos pruebas de unidades en estas DoFn
más adelante.PTransform
ComputeStatistics
(a partir de la línea 103). Este es un ejemplo de una transformación compuesta que podremos probar de la misma manera que una DoFn
.PTransform
WeatherStatsTransform
(a partir de la línea 123). Esta PTransform
contiene la lógica de procesamiento de toda nuestra canalización (menos las transformaciones de origen y receptor) para que podamos realizar una prueba de integración de canalización pequeña con datos sintéticos creados mediante una transformación Create
.Si observas un error lógico en el código de procesamiento, no lo corrijas todavía. Más adelante, veremos cómo identificarlo mediante pruebas.
Debemos agregar algunas dependencias para las pruebas. Cualquier código de Java de Beam para las pruebas debe vincularse en JUnit
y Hamcrest
. En Maven, solo necesitamos actualizar el archivo pom.xml
.
pom.xml
, en el lugar que se indica en un comentario:Ten en cuenta que el alcance de estas dependencias es “test”. Necesitaremos estos paquetes cuando ejecutemos una prueba con mvn test
, pero no cuando se ejecute la canalización principal.
Este archivo contiene el código de las pruebas de unidades de DoFn
y PTransform
. Por ahora, el código está generalmente marcado como comentario, pero quitaremos el comentario a medida que avancemos.
Comenzaremos explorando una prueba de unidades de DoFn
para nuestra ConvertCsvToWeatherRecord DoFn
(a partir de la línea 43).
TestPipeline
:Usaremos este objeto TestPipeline
en todas las siguientes pruebas, aunque no tendremos que preocuparnos por los efectos secundarios de reutilizar el mismo objeto debido a la palabra clave transient
cuando se crea el elemento.
Anotamos el método que usaremos para probar nuestra canalización con la anotación @Test
. Creamos una sola entrada de prueba (testInput
) que representa una línea de un archivo CSV (el formato de entrada esperado para nuestra canalización) y lo colocamos en un input
de un objeto List
.
Hay partes que faltan en el resto del código para la prueba.
Para completar esta tarea, primero agrega la transformación Create
y convierte input
en una PCollection
.
En segundo lugar, incluye una sentencia PAssert
con el método containsInAnyOrder
para comparar input
con testOutput
.
Si no puedes continuar, consulta las pruebas con comentarios más recientes o las soluciones.
Ya tenemos todo listo para ejecutar la prueba.
Si realizaste la tarea anterior correctamente, deberías ver lo siguiente en la terminal después de que se complete la prueba (el tiempo exacto transcurrido será distinto):
Esta prueba garantiza que la DoFn
ConvertTempUnits()
funcione según lo previsto.
Guarda WeatherStatisticsPipelineTest.java
y regresa a tu terminal.
Ejecuta nuevamente el siguiente comando para ejecutar las pruebas:
La prueba falló esta vez. Si nos desplazamos por el resultado, encontraremos la siguiente información sobre la prueba con errores:
A simple vista, podría parecer que este no es un mensaje de error útil. Sin embargo, podemos ver que no coincide el WeatherRecord
esperado en testOutput
. Tal vez haya un problema con la conversión de temperatura.
Regresa a 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java y desplázate hacia abajo, hasta la definición de ConvertTempUnits
(alrededor de la línea 81).
Para completar esta tarea, busca el error en la lógica de procesamiento de DoFn
y vuelve a ejecutar el comando mvn test
para confirmar que la prueba se complete correctamente ahora. A modo de recordatorio, esta es la fórmula para convertir grados Celsius en grados Fahrenheit:
Si no puedes continuar, consulta las soluciones.
La primera prueba, a la que acabamos de quitar el comentario, corresponde a la prueba de la PTransform
compuesta ComputeStatistics
. A modo de referencia, este es un código con formato truncado:
Ten en cuenta que esto es muy similar a las pruebas de unidades de DoFn
que realizamos anteriormente. La única diferencia real (aparte de las entradas y las salidas diferentes de las pruebas) es que aplicamos PTransform
en lugar de ParDo(new DoFn())
.
La prueba final es para la canalización de extremo a extremo. En el código de canalización (WeatherStatisticsPipeline.java), se incluyó toda la canalización de extremo a extremo (menos la fuente y el receptor) en una sola PTransform
WeatherStatsTransform
.
PTransform
:Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:
Haz clic en Revisar mi progreso para verificar el objetivo.
En esta parte del lab, realizaremos pruebas de unidades para una canalización de transmisión que procesa recuentos de los recorridos de un taxi mediante un sistema de ventanas. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:
TestPipeline
.TestStream
para generar datos de transmisión. Esto incluye generar una serie de eventos y avanzar la marca de agua y el tiempo de procesamiento.PAssert
y sus subclases para verificar que la PCollection
resultante contenga los elementos que esperas en ventanas específicas.Cuando se ejecuta una canalización que lee de TestStream
, la operación de lectura espera a que se completen todas las consecuencias de cada evento antes de pasar al siguiente, incluso cuando se avanza el tiempo de procesamiento y se activan los activadores correspondientes. TestStream
permite que el efecto de la activación y el retraso permitido se observen y prueben en una canalización. Esto incluye la lógica en torno a los activadores tardíos y los datos descartados debido a un retraso.
Este directorio contiene un archivo pom.xml
para definir dependencias y la carpeta src
, que contiene dos subdirectorios. La carpeta src/main
contiene el código del paquete de canalización, y la carpeta src/test
contendrá el código de prueba.
Este archivo contiene la definición de la clase TaxiRide
que usaremos en nuestra canalización. La clase TaxiRide
tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema
deberían resultarle conocidos.
Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:
DoFn
JsonToTaxiRide
(a partir de la línea 94) usada para convertir los mensajes entrantes de Pub/Sub en objetos de la clase TaxiRide
.PTransform
TaxiCountTransform
(a partir de la línea 113). Esta PTransform
contiene la lógica principal de recuento y sistema de ventanas de la canalización. Nuestras pruebas se enfocarán en esta PTransform
.El resultado de la TaxiCountTransform
debería ser un recuento de todos los recorridos registrados de taxi por ventana. Sin embargo, tendremos múltiples eventos por recorrido (partida, destino, etcétera).
ride_status
para garantizar que contemos cada recorrido solo una vez. Para ello, solo mantendremos los elementos cuyo ride_status
sea igual a “pickup”:Si lo analizamos un poco más en detalle, verás que se utilizó la siguiente lógica de renderización en ventanas en nuestra canalización:
Estableceremos grupos de ventanas fijas de 60 segundos de duración. No contamos con un activador anticipado, por lo que emitiremos los resultados después de que la marca de agua pase el final de la ventana. Incluiremos activaciones tardías con cada elemento nuevo que ingrese, pero con un retraso permitido de solo 1 minuto. Por último, acumularemos el estado en ventanas hasta que haya pasado el retraso permitido.
El primer objetivo será comprender el uso de TestStream
en nuestro código de prueba. Recuerda que la clase TestStream
nos permite simular un flujo de mensajes en tiempo real y controlar el progreso del tiempo de procesamiento y la marca de agua.
Este es el código de la primera prueba (a partir de la línea 66):
Creamos un TestStream
nuevo con el método create
y, al mismo tiempo, especificamos el codificador. Pasaremos el mensaje JSON como una string, por lo que podemos usar StringUtf8Coder
. ¿Qué hace el TestStream
anterior?
El TestStream
realiza las siguientes tareas:
startTime
(Instant(0)
)startTime
; se contarán solo dos de estos eventos (ride_status = "pickup"
)startTime
startTime
, lo que activa la primera ventanastartTime
TestStream
en lugar de la transformación Create
:En el código anterior, definimos nuestra PCollection
de salida (outputCount
) creando el TestStream
y aplicando la PTransform
TaxiCountTransform
. Usamos la clase InvervalWindow
para definir las ventanas que queremos verificar y, luego, usamos PAssert
con el método inWindow
para verificar los resultados por ventana.
Deberías ver el siguiente resultado después de la prueba (aunque el tiempo transcurrido puede variar):
En esta tarea, escribirás código para un TestStream
para probar la lógica relacionada con el manejo de los datos tardíos.
Regresa a 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java y desplázate hasta el punto en que el método testTaxiRideLateData
está marcado como comentario (alrededor de la línea 104).
Quita el comentario del código de esta prueba, ya que completaremos el código para esta tarea:
El código de la prueba se completará fuera de la creación de TestStream
.
TestStream
que realice las siguientes tareas:startTime
TimestampedValue
con el valor json.format(json, "pickup")
y la marca de tiempo startTime
startTime
TimestampedValue
con el valor json.format(json, "pickup")
y la marca de tiempo startTime
startTime
TimestampedValue
con el valor json.format(json, "pickup")
y la marca de tiempo startTime
Se creará un TestStream
con cuatro elementos que pertenecen a la primera ventana. Los primeros dos elementos son puntuales, el segundo está retrasado (pero dentro del retraso permitido) y el último está retrasado y supera el retraso permitido. Debido a que acumulamos paneles activados, el primer activador debe contar dos eventos, mientras que el activador final debe contar tres. No se debe incluir el cuarto evento. Podemos verificarlo con los métodos inOnTimePane
y inFinalPane
de la clase PAssert
.
Si no puedes continuar, consulta las soluciones.
Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:
Haz clic en Revisar mi progreso para verificar el objetivo.
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.
Este contenido no está disponible en este momento
Te enviaremos una notificación por correo electrónico cuando esté disponible
¡Genial!
Nos comunicaremos contigo por correo electrónico si está disponible
One lab at a time
Confirm to end all existing labs and start this one