Puntos de control
Perform Unit Tests for DoFns and PTransforms
/ 10
Test Stream Processing Logic with TestStream
/ 10
Procesamiento de datos sin servidores con Dataflow: cómo realizar pruebas con Apache Beam (Java)
- Descripción general
- Configuración y requisitos
- Parte 1 del lab: Realiza pruebas de unidades para DoFn y PTransform
- Tarea 1: Explora el código de canalización principal
- Tarea 2: Agrega dependencias para las pruebas
- Tarea 3: Escribe la primera prueba de unidades de DoFn en Apache Beam
- Tarea 4: Ejecuta la primera prueba de unidades de DoFn
- Tarea 5: Ejecuta la segunda prueba de unidades de DoFn y la canalización de depuración
- Tarea 6: Ejecuta la prueba de unidades de PTransform y prueba la canalización de extremo a extremo
- Parte 2 del lab: Prueba la lógica de procesamiento de transmisión con TestStream
- Tarea 1: Explora el código de canalización principal
- Tarea 2: Explora el uso de TestStream y ejecuta la primera prueba
- Tarea 3: Crea TestStream para probar el manejo de datos tardíos
- Tarea 4: Ejecuta pruebas para el manejo de datos tardíos
- Finalice su lab
Descripción general
En este lab, aprenderás a hacer lo siguiente:
- Escribir pruebas de unidades para
DoFn
yPTransform
mediante las herramientas de prueba de Apache Beam - Realizar una prueba de integración de la canalización
- Utilizar la clase
TestStream
para probar el comportamiento del sistema de ventanas en una canalización de transmisión
La prueba de tu canalización es un paso importante en el desarrollo de una solución de procesamiento de datos efectiva. Debido a la naturaleza indirecta del modelo de Beam, la depuración de ejecuciones con errores puede convertirse en una tarea importante.
En este lab, aprenderemos a realizar pruebas de unidades de forma local con herramientas del paquete de pruebas del SDK de Beam usando DirectRunner
.
Configuración y requisitos
En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.
-
Accede a Qwiklabs desde una ventana de incógnito.
-
Ten en cuenta el tiempo de acceso del lab (por ejemplo,
1:15:00
) y asegúrate de finalizarlo en el plazo asignado.
No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo. -
Cuando esté listo, haga clic en Comenzar lab.
-
Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.
-
Haga clic en Abrir Google Console.
-
Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
Si usa otras credenciales, se generarán errores o incurrirá en cargos. -
Acepta las condiciones y omite la página de recursos de recuperación.
Verifica los permisos del proyecto
Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).
-
En la consola de Google Cloud, en el Menú de navegación (), selecciona IAM y administración > IAM.
-
Confirma que aparezca la cuenta de servicio predeterminada de Compute
{número-del-proyecto}-compute@developer.gserviceaccount.com
, y que tenga asignado el rolEditor
. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.
Editor
, sigue los pasos que se indican a continuación para asignar el rol necesario.- En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
- Copia el número del proyecto (p. ej.,
729328892908
). - En el Menú de navegación, selecciona IAM y administración > IAM.
- En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
- En Principales nuevas, escribe lo siguiente:
- Reemplaza
{número-del-proyecto}
por el número de tu proyecto. - En Rol, selecciona Proyecto (o Básico) > Editor.
- Haz clic en Guardar.
Configura el IDE
Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud
, similar a Cloud Shell.
Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.
El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs
con un código que debe completar y una carpeta solution
con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer
para ver lo siguiente:
También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:
Para verificarlo, ejecute gcloud auth list
en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:
Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:
El código del lab se divide en dos carpetas: 8a_Batch_Testing_Pipeline/lab y 8b_Stream_Testing_Pipeline/lab. Si en algún momento tienes problemas, puedes consultar la solución en las carpetas solution correspondientes.
Parte 1 del lab: Realiza pruebas de unidades para DoFn y PTransform
En esta parte del lab, realizaremos pruebas de unidades en DoFn y PTransform para obtener estadísticas de procesamiento de una canalización por lotes a partir de sensores meteorológicos. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:
- Crea una
TestPipeline
. - Crea algunos datos de entrada de prueba y usa la transformación
Create
para crear unaPCollection
de tus datos de entrada. - Aplica tu transformación a la
PCollection
de entrada y guarda laPCollection
resultante. - Usa
PAssert
y sus subclases para verificar que laPCollection
resultante contenga los elementos que esperas.
TestPipeline
es una clase especial incluida en el SDK de Beam específicamente para probar la lógica de las transformaciones y canalizaciones.
- Cuando realices pruebas, utiliza
TestPipeline
en lugar dePipeline
para crear el objeto de canalización:
La transformación Create
toma una colección de objetos en la memoria (un iterable de Java) y crea una PCollection
a partir de ella. El objetivo es tener un pequeño conjunto de datos de entrada de prueba (de los cuales conozcamos la PCollection
resultante esperada) a partir de nuestras PTransforms
.
Por último, debemos comprobar que la PCollection resultante coincida con la salida prevista. Para verificarlo, usamos la clase PAssert
. Por ejemplo, podemos usar el método containsInAnyOrder
para verificar que la PCollection resultante tenga los elementos correctos:
Tarea 1: Explora el código de canalización principal
- Navega a 8a_Batch_Testing_Pipeline/lab en el IDE.
Este directorio contiene un archivo pom.xml
para definir dependencias y la carpeta src
, que contiene dos subdirectorios. La carpeta src/main
contiene el código del paquete de canalización, y la carpeta src/test
contendrá el código de prueba.
- Primero, abre 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.
Este archivo contiene la definición de la clase WeatherRecord
que usaremos en nuestra canalización. La clase WeatherRecord
tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema
deberían resultarte conocidos. Sin embargo, ten en cuenta que debemos anular el método equals
para definir la clase.
¿A qué se debe esto? PAssert
usará el método equals
para verificar la membresía en el resultado PCollection
. Sin embargo, el método equals predeterminado de un POJO (objetos antiguos y sin formato basados en Java) solo compara las direcciones de los objetos. Es importante que nos aseguremos de comparar el contenido de los objetos. Esto es fácil de hacer, como se muestra más arriba.
- Ahora abre 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.
Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:
- Las
DoFn
ConvertCsvToWeatherRecord
(a partir de la línea 65) yConvertTempUnits
(a partir de la línea 81). Realizaremos pruebas de unidades en estasDoFn
más adelante. - La
PTransform
ComputeStatistics
(a partir de la línea 103). Este es un ejemplo de una transformación compuesta que podremos probar de la misma manera que unaDoFn
. - La
PTransform
WeatherStatsTransform
(a partir de la línea 123). EstaPTransform
contiene la lógica de procesamiento de toda nuestra canalización (menos las transformaciones de origen y receptor) para que podamos realizar una prueba de integración de canalización pequeña con datos sintéticos creados mediante una transformaciónCreate
.
Si observas un error lógico en el código de procesamiento, no lo corrijas todavía. Más adelante, veremos cómo identificarlo mediante pruebas.
Tarea 2: Agrega dependencias para las pruebas
- Ahora abre 8a_Batch_Testing_Pipeline/lab/pom.xml.
Debemos agregar algunas dependencias para las pruebas. Cualquier código de Java de Beam para las pruebas debe vincularse en JUnit
y Hamcrest
. En Maven, solo necesitamos actualizar el archivo pom.xml
.
- Para completar esta tarea, copia y pega el siguiente XML en el archivo
pom.xml
, en el lugar que se indica en un comentario:
Ten en cuenta que el alcance de estas dependencias es “test”. Necesitaremos estos paquetes cuando ejecutemos una prueba con mvn test
, pero no cuando se ejecute la canalización principal.
Tarea 3: Escribe la primera prueba de unidades de DoFn en Apache Beam
- Ahora, navega a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.
Este archivo contiene el código de las pruebas de unidades de DoFn
y PTransform
. Por ahora, el código está generalmente marcado como comentario, pero quitaremos el comentario a medida que avancemos.
Comenzaremos explorando una prueba de unidades de DoFn
para nuestra ConvertCsvToWeatherRecord DoFn
(a partir de la línea 43).
- Primero, crearemos una clase para probar nuestra canalización y crearemos un objeto
TestPipeline
:
Usaremos este objeto TestPipeline
en todas las siguientes pruebas, aunque no tendremos que preocuparnos por los efectos secundarios de reutilizar el mismo objeto debido a la palabra clave transient
cuando se crea el elemento.
- Ahora, observa el código (incompleto) de nuestra primera prueba:
Anotamos el método que usaremos para probar nuestra canalización con la anotación @Test
. Creamos una sola entrada de prueba (testInput
) que representa una línea de un archivo CSV (el formato de entrada esperado para nuestra canalización) y lo colocamos en un input
de un objeto List
.
Hay partes que faltan en el resto del código para la prueba.
-
Para completar esta tarea, primero agrega la transformación
Create
y convierteinput
en unaPCollection
. -
En segundo lugar, incluye una sentencia
PAssert
con el métodocontainsInAnyOrder
para compararinput
contestOutput
.
Si no puedes continuar, consulta las pruebas con comentarios más recientes o las soluciones.
Tarea 4: Ejecuta la primera prueba de unidades de DoFn
- Si aún no lo has hecho, crea una terminal nueva en tu entorno de IDE y, luego, pega el siguiente comando:
Ya tenemos todo listo para ejecutar la prueba.
- Para ello, simplemente ejecuta el siguiente comando en la terminal:
Si realizaste la tarea anterior correctamente, deberías ver lo siguiente en la terminal después de que se complete la prueba (el tiempo exacto transcurrido será distinto):
Tarea 5: Ejecuta la segunda prueba de unidades de DoFn y la canalización de depuración
- Regresa a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java y quita el comentario del código de la segunda prueba de unidades (alrededor de las líneas 67 a 80). Para ello, destaca el código y presiona Ctrl + / (o Cmd + / en MacOS). A continuación, se muestra el código a modo de referencia:
Esta prueba garantiza que la DoFn
ConvertTempUnits()
funcione según lo previsto.
-
Guarda
WeatherStatisticsPipelineTest.java
y regresa a tu terminal. -
Ejecuta nuevamente el siguiente comando para ejecutar las pruebas:
La prueba falló esta vez. Si nos desplazamos por el resultado, encontraremos la siguiente información sobre la prueba con errores:
A simple vista, podría parecer que este no es un mensaje de error útil. Sin embargo, podemos ver que no coincide el WeatherRecord
esperado en testOutput
. Tal vez haya un problema con la conversión de temperatura.
-
Regresa a 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java y desplázate hacia abajo, hasta la definición de
ConvertTempUnits
(alrededor de la línea 81). -
Para completar esta tarea, busca el error en la lógica de procesamiento de
DoFn
y vuelve a ejecutar el comandomvn test
para confirmar que la prueba se complete correctamente ahora. A modo de recordatorio, esta es la fórmula para convertir grados Celsius en grados Fahrenheit:
Si no puedes continuar, consulta las soluciones.
Tarea 6: Ejecuta la prueba de unidades de PTransform y prueba la canalización de extremo a extremo
- Regresa a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java y quita el comentario del código para las dos pruebas finales (a partir de la línea 84, aproximadamente).
La primera prueba, a la que acabamos de quitar el comentario, corresponde a la prueba de la PTransform
compuesta ComputeStatistics
. A modo de referencia, este es un código con formato truncado:
Ten en cuenta que esto es muy similar a las pruebas de unidades de DoFn
que realizamos anteriormente. La única diferencia real (aparte de las entradas y las salidas diferentes de las pruebas) es que aplicamos PTransform
en lugar de ParDo(new DoFn())
.
La prueba final es para la canalización de extremo a extremo. En el código de canalización (WeatherStatisticsPipeline.java), se incluyó toda la canalización de extremo a extremo (menos la fuente y el receptor) en una sola PTransform
WeatherStatsTransform
.
- Para probar la canalización de extremo a extremo, podemos repetir lo que hicimos anteriormente, pero con
PTransform
:
- Ahora, regresa a la terminal y ejecuta el siguiente comando para ejecutar las pruebas una vez más:
Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:
Haz clic en Revisar mi progreso para verificar el objetivo.
Parte 2 del lab: Prueba la lógica de procesamiento de transmisión con TestStream
En esta parte del lab, realizaremos pruebas de unidades para una canalización de transmisión que procesa recuentos de los recorridos de un taxi mediante un sistema de ventanas. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:
- Crea una
TestPipeline
. - Utiliza la clase
TestStream
para generar datos de transmisión. Esto incluye generar una serie de eventos y avanzar la marca de agua y el tiempo de procesamiento. - Usa
PAssert
y sus subclases para verificar que laPCollection
resultante contenga los elementos que esperas en ventanas específicas.
Cuando se ejecuta una canalización que lee de TestStream
, la operación de lectura espera a que se completen todas las consecuencias de cada evento antes de pasar al siguiente, incluso cuando se avanza el tiempo de procesamiento y se activan los activadores correspondientes. TestStream
permite que el efecto de la activación y el retraso permitido se observen y prueben en una canalización. Esto incluye la lógica en torno a los activadores tardíos y los datos descartados debido a un retraso.
Tarea 1: Explora el código de canalización principal
- Navega a 8b_Stream_Testing_Pipeline/lab en el IDE.
Este directorio contiene un archivo pom.xml
para definir dependencias y la carpeta src
, que contiene dos subdirectorios. La carpeta src/main
contiene el código del paquete de canalización, y la carpeta src/test
contendrá el código de prueba.
- Primero, abre 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.
Este archivo contiene la definición de la clase TaxiRide
que usaremos en nuestra canalización. La clase TaxiRide
tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema
deberían resultarle conocidos.
- Ahora abre 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.
Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:
- La
DoFn
JsonToTaxiRide
(a partir de la línea 94) usada para convertir los mensajes entrantes de Pub/Sub en objetos de la claseTaxiRide
. - La
PTransform
TaxiCountTransform
(a partir de la línea 113). EstaPTransform
contiene la lógica principal de recuento y sistema de ventanas de la canalización. Nuestras pruebas se enfocarán en estaPTransform
.
El resultado de la TaxiCountTransform
debería ser un recuento de todos los recorridos registrados de taxi por ventana. Sin embargo, tendremos múltiples eventos por recorrido (partida, destino, etcétera).
- Filtraremos por la propiedad
ride_status
para garantizar que contemos cada recorrido solo una vez. Para ello, solo mantendremos los elementos cuyoride_status
sea igual a “pickup”:
Si lo analizamos un poco más en detalle, verás que se utilizó la siguiente lógica de renderización en ventanas en nuestra canalización:
Estableceremos grupos de ventanas fijas de 60 segundos de duración. No contamos con un activador anticipado, por lo que emitiremos los resultados después de que la marca de agua pase el final de la ventana. Incluiremos activaciones tardías con cada elemento nuevo que ingrese, pero con un retraso permitido de solo 1 minuto. Por último, acumularemos el estado en ventanas hasta que haya pasado el retraso permitido.
Tarea 2: Explora el uso de TestStream y ejecuta la primera prueba
- Ahora abre 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.
El primer objetivo será comprender el uso de TestStream
en nuestro código de prueba. Recuerda que la clase TestStream
nos permite simular un flujo de mensajes en tiempo real y controlar el progreso del tiempo de procesamiento y la marca de agua.
Este es el código de la primera prueba (a partir de la línea 66):
Creamos un TestStream
nuevo con el método create
y, al mismo tiempo, especificamos el codificador. Pasaremos el mensaje JSON como una string, por lo que podemos usar StringUtf8Coder
. ¿Qué hace el TestStream
anterior?
El TestStream
realiza las siguientes tareas:
- Establecer la marca de agua inicial en la variable
startTime
(Instant(0)
) - Agregar tres elementos a la string con la marca de tiempo del evento
startTime
; se contarán solo dos de estos eventos (ride_status = "pickup"
) - Agregar otro evento “pickup”, pero con una marca de tiempo de evento correspondiente a un minuto después de
startTime
- Avanzar la marca de agua a un minuto después de
startTime
, lo que activa la primera ventana - Agregar otro evento “pickup”, pero con una marca de tiempo de evento de dos minutos después de
startTime
- Avanzar la marca de agua al “infinito”; esto hará que todas las ventanas se cierren y permitirá que los datos nuevos superen el retraso permitido
- El resto del código de la primera prueba es similar al ejemplo por lotes anterior, pero ahora usamos
TestStream
en lugar de la transformaciónCreate
:
En el código anterior, definimos nuestra PCollection
de salida (outputCount
) creando el TestStream
y aplicando la PTransform
TaxiCountTransform
. Usamos la clase InvervalWindow
para definir las ventanas que queremos verificar y, luego, usamos PAssert
con el método inWindow
para verificar los resultados por ventana.
- Ahora, regresa a la terminal en el IDE (o abre una nueva terminal) y ejecuta los siguientes comandos para cambiar la ubicación al directorio correcto y, luego, instalar las dependencias:
- Ahora ejecuta la prueba anterior con el siguiente comando:
Deberías ver el siguiente resultado después de la prueba (aunque el tiempo transcurrido puede variar):
Tarea 3: Crea TestStream para probar el manejo de datos tardíos
En esta tarea, escribirás código para un TestStream
para probar la lógica relacionada con el manejo de los datos tardíos.
-
Regresa a 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java y desplázate hasta el punto en que el método
testTaxiRideLateData
está marcado como comentario (alrededor de la línea 104). -
Quita el comentario del código de esta prueba, ya que completaremos el código para esta tarea:
El código de la prueba se completará fuera de la creación de TestStream
.
- Para completar esta tarea, crea un objeto
TestStream
que realice las siguientes tareas:
- Avanzar la marca de agua a
startTime
- Agregar dos
TimestampedValue
con el valorjson.format(json, "pickup")
y la marca de tiempostartTime
- Avanzar la marca de agua a un minuto después de
startTime
- Agregar otro
TimestampedValue
con el valorjson.format(json, "pickup")
y la marca de tiempostartTime
- Avanzar la marca de agua a dos minutos después del
startTime
- Agregar otro
TimestampedValue
con el valorjson.format(json, "pickup")
y la marca de tiempostartTime
- Avanzar la marca de agua al infinito
Se creará un TestStream
con cuatro elementos que pertenecen a la primera ventana. Los primeros dos elementos son puntuales, el segundo está retrasado (pero dentro del retraso permitido) y el último está retrasado y supera el retraso permitido. Debido a que acumulamos paneles activados, el primer activador debe contar dos eventos, mientras que el activador final debe contar tres. No se debe incluir el cuarto evento. Podemos verificarlo con los métodos inOnTimePane
y inFinalPane
de la clase PAssert
.
Si no puedes continuar, consulta las soluciones.
Tarea 4: Ejecuta pruebas para el manejo de datos tardíos
- Ahora, regresa a la terminal y ejecuta el siguiente comando para realizar las pruebas una vez más:
Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:
Haz clic en Revisar mi progreso para verificar el objetivo.
Finalice su lab
Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.
Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.
La cantidad de estrellas indica lo siguiente:
- 1 estrella = Muy insatisfecho
- 2 estrellas = Insatisfecho
- 3 estrellas = Neutral
- 4 estrellas = Satisfecho
- 5 estrellas = Muy satisfecho
Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.
Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.
Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.