arrow_back

Procesamiento de datos sin servidores con Dataflow: cómo realizar pruebas con Apache Beam (Java)

Acceder Unirse
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Procesamiento de datos sin servidores con Dataflow: cómo realizar pruebas con Apache Beam (Java)

Lab 1 hora 30 minutos universal_currency_alt 7 créditos show_chart Avanzado
info Es posible que este lab incorpore herramientas de IA para facilitar tu aprendizaje.
Pon a prueba tus conocimientos y compártelos con nuestra comunidad
done
Obtén acceso a más de 700 labs prácticos, insignias de habilidad y cursos

Descripción general

En este lab, aprenderás a hacer lo siguiente:

  • Escribir pruebas de unidades para DoFn y PTransform mediante las herramientas de prueba de Apache Beam
  • Realizar una prueba de integración de la canalización
  • Utilizar la clase TestStream para probar el comportamiento del sistema de ventanas en una canalización de transmisión

La prueba de tu canalización es un paso importante en el desarrollo de una solución de procesamiento de datos efectiva. Debido a la naturaleza indirecta del modelo de Beam, la depuración de ejecuciones con errores puede convertirse en una tarea importante.

En este lab, aprenderemos a realizar pruebas de unidades de forma local con herramientas del paquete de pruebas del SDK de Beam usando DirectRunner.

Configuración y requisitos

En cada lab, recibirá un proyecto de Google Cloud y un conjunto de recursos nuevos por tiempo limitado y sin costo adicional.

  1. Accede a Qwiklabs desde una ventana de incógnito.

  2. Ten en cuenta el tiempo de acceso del lab (por ejemplo, 1:15:00) y asegúrate de finalizarlo en el plazo asignado.
    No existe una función de pausa. Si lo necesita, puede reiniciar el lab, pero deberá hacerlo desde el comienzo.

  3. Cuando esté listo, haga clic en Comenzar lab.

  4. Anote las credenciales del lab (el nombre de usuario y la contraseña). Las usarás para acceder a la consola de Google Cloud.

  5. Haga clic en Abrir Google Console.

  6. Haga clic en Usar otra cuenta, copie las credenciales para este lab y péguelas en el mensaje emergente que aparece.
    Si usa otras credenciales, se generarán errores o incurrirá en cargos.

  7. Acepta las condiciones y omite la página de recursos de recuperación.

Verifica los permisos del proyecto

Antes de comenzar a trabajar en Google Cloud, asegúrate de que tu proyecto tenga los permisos correctos en Identity and Access Management (IAM).

  1. En la consola de Google Cloud, en el Menú de navegación (Ícono del menú de navegación), selecciona IAM y administración > IAM.

  2. Confirma que aparezca la cuenta de servicio predeterminada de Compute {número-del-proyecto}-compute@developer.gserviceaccount.com, y que tenga asignado el rol Editor. El prefijo de la cuenta es el número del proyecto, que puedes encontrar en el Menú de navegación > Descripción general de Cloud > Panel.

El nombre de la cuenta de servicio predeterminada de Compute Engine y el estado del editor destacados en la página de pestañas Permisos

Nota: Si la cuenta no aparece en IAM o no tiene asignado el rol Editor, sigue los pasos que se indican a continuación para asignar el rol necesario.
  1. En la consola de Google Cloud, en el Menú de navegación, haz clic en Descripción general de Cloud > Panel.
  2. Copia el número del proyecto (p. ej., 729328892908).
  3. En el Menú de navegación, selecciona IAM y administración > IAM.
  4. En la parte superior de la tabla de funciones, debajo de Ver por principales, haz clic en Otorgar acceso.
  5. En Principales nuevas, escribe lo siguiente:
{project-number}-compute@developer.gserviceaccount.com
  1. Reemplaza {número-del-proyecto} por el número de tu proyecto.
  2. En Rol, selecciona Proyecto (o Básico) > Editor.
  3. Haz clic en Guardar.

Configura el IDE

Para los fines de este lab, usará principalmente un IDE web de Theia alojado en Google Compute Engine. El IDE tiene el repositorio de labs clonado previamente. Se ofrece compatibilidad con el servidor de lenguaje Java y una terminal para el acceso programático a las API de Google Cloud mediante la herramienta de línea de comandos de gcloud, similar a Cloud Shell.

Para acceder al IDE de Theia, copie y pegue en una pestaña nueva el vínculo que se muestra en Qwiklabs.

NOTA: Es posible que deba esperar entre 3 y 5 minutos para que se aprovisione por completo el entorno, incluso después de que aparezca la URL. Hasta ese momento, se mostrará un error en el navegador.

ide_url

El repositorio del lab se clonó en su entorno. Cada lab se divide en una carpeta labs con un código que debe completar y una carpeta solution con un ejemplo viable que puede consultar como referencia si no sabe cómo continuar. Haga clic en el botón File Explorer para ver lo siguiente:

file_explorer

También puede crear varias terminales en este entorno, como lo haría con Cloud Shell:

new_terminal

Para verificarlo, ejecute gcloud auth list en la terminal con la que accedió como cuenta de servicio proporcionada, que tiene exactamente los mismos permisos que su cuenta de usuario del lab:

gcloud_auth

Si en algún momento su entorno deja de funcionar, intente restablecer la VM en la que se aloja el IDE desde la consola de GCE de la siguiente manera:

gce_reset

El código del lab se divide en dos carpetas: 8a_Batch_Testing_Pipeline/lab y 8b_Stream_Testing_Pipeline/lab. Si en algún momento tienes problemas, puedes consultar la solución en las carpetas solution correspondientes.

Parte 1 del lab: Realiza pruebas de unidades para DoFn y PTransform

En esta parte del lab, realizaremos pruebas de unidades en DoFn y PTransform para obtener estadísticas de procesamiento de una canalización por lotes a partir de sensores meteorológicos. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:

  • Crea una TestPipeline.
  • Crea algunos datos de entrada de prueba y usa la transformación Create para crear una PCollection de tus datos de entrada.
  • Aplica tu transformación a la PCollection de entrada y guarda la PCollection resultante.
  • Usa PAssert y sus subclases para verificar que la PCollection resultante contenga los elementos que esperas.

TestPipeline es una clase especial incluida en el SDK de Beam específicamente para probar la lógica de las transformaciones y canalizaciones.

  • Cuando realices pruebas, utiliza TestPipeline en lugar de Pipeline para crear el objeto de canalización:
TestPipeline p = TestPipeline.create();

La transformación Create toma una colección de objetos en la memoria (un iterable de Java) y crea una PCollection a partir de ella. El objetivo es tener un pequeño conjunto de datos de entrada de prueba (de los cuales conozcamos la PCollection resultante esperada) a partir de nuestras PTransforms.

List<String> input = Arrays.asList(testInput); // Código para crear una TestPipeline p outputPColl = p.apply(Create.of(input).apply(...);

Por último, debemos comprobar que la PCollection resultante coincida con la salida prevista. Para verificarlo, usamos la clase PAssert. Por ejemplo, podemos usar el método containsInAnyOrder para verificar que la PCollection resultante tenga los elementos correctos:

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

Tarea 1: Explora el código de canalización principal

  1. Navega a 8a_Batch_Testing_Pipeline/lab en el IDE.

Este directorio contiene un archivo pom.xml para definir dependencias y la carpeta src, que contiene dos subdirectorios. La carpeta src/main contiene el código del paquete de canalización, y la carpeta src/test contendrá el código de prueba.

  1. Primero, abre 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.

Este archivo contiene la definición de la clase WeatherRecord que usaremos en nuestra canalización. La clase WeatherRecord tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema deberían resultarte conocidos. Sin embargo, ten en cuenta que debemos anular el método equals para definir la clase.

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

¿A qué se debe esto? PAssert usará el método equals para verificar la membresía en el resultado PCollection. Sin embargo, el método equals predeterminado de un POJO (objetos antiguos y sin formato basados en Java) solo compara las direcciones de los objetos. Es importante que nos aseguremos de comparar el contenido de los objetos. Esto es fácil de hacer, como se muestra más arriba.

  1. Ahora abre 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.

Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:

  • Las DoFn ConvertCsvToWeatherRecord (a partir de la línea 65) y ConvertTempUnits (a partir de la línea 81). Realizaremos pruebas de unidades en estas DoFn más adelante.
  • La PTransform ComputeStatistics (a partir de la línea 103). Este es un ejemplo de una transformación compuesta que podremos probar de la misma manera que una DoFn.
  • La PTransform WeatherStatsTransform (a partir de la línea 123). Esta PTransform contiene la lógica de procesamiento de toda nuestra canalización (menos las transformaciones de origen y receptor) para que podamos realizar una prueba de integración de canalización pequeña con datos sintéticos creados mediante una transformación Create.

Si observas un error lógico en el código de procesamiento, no lo corrijas todavía. Más adelante, veremos cómo identificarlo mediante pruebas.

Tarea 2: Agrega dependencias para las pruebas

  1. Ahora abre 8a_Batch_Testing_Pipeline/lab/pom.xml.

Debemos agregar algunas dependencias para las pruebas. Cualquier código de Java de Beam para las pruebas debe vincularse en JUnit y Hamcrest. En Maven, solo necesitamos actualizar el archivo pom.xml.

  1. Para completar esta tarea, copia y pega el siguiente XML en el archivo pom.xml, en el lugar que se indica en un comentario:
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

Ten en cuenta que el alcance de estas dependencias es “test”. Necesitaremos estos paquetes cuando ejecutemos una prueba con mvn test, pero no cuando se ejecute la canalización principal.

Tarea 3: Escribe la primera prueba de unidades de DoFn en Apache Beam

  1. Ahora, navega a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.

Este archivo contiene el código de las pruebas de unidades de DoFn y PTransform. Por ahora, el código está generalmente marcado como comentario, pero quitaremos el comentario a medida que avancemos.

Comenzaremos explorando una prueba de unidades de DoFn para nuestra ConvertCsvToWeatherRecord DoFn (a partir de la línea 43).

  1. Primero, crearemos una clase para probar nuestra canalización y crearemos un objeto TestPipeline:
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

Usaremos este objeto TestPipeline en todas las siguientes pruebas, aunque no tendremos que preocuparnos por los efectos secundarios de reutilizar el mismo objeto debido a la palabra clave transient cuando se crea el elemento.

  1. Ahora, observa el código (incompleto) de nuestra primera prueba:
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* Create PCollection from in-memory object */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // Include PAssert statement to check for correct results p.run().waitUntilFinish(); }

Anotamos el método que usaremos para probar nuestra canalización con la anotación @Test. Creamos una sola entrada de prueba (testInput) que representa una línea de un archivo CSV (el formato de entrada esperado para nuestra canalización) y lo colocamos en un input de un objeto List.

Hay partes que faltan en el resto del código para la prueba.

  1. Para completar esta tarea, primero agrega la transformación Create y convierte input en una PCollection.

  2. En segundo lugar, incluye una sentencia PAssert con el método containsInAnyOrder para comparar input con testOutput.

Si no puedes continuar, consulta las pruebas con comentarios más recientes o las soluciones.

Tarea 4: Ejecuta la primera prueba de unidades de DoFn

  1. Si aún no lo has hecho, crea una terminal nueva en tu entorno de IDE y, luego, pega el siguiente comando:
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

Ya tenemos todo listo para ejecutar la prueba.

  1. Para ello, simplemente ejecuta el siguiente comando en la terminal:
mvn test

Si realizaste la tarea anterior correctamente, deberías ver lo siguiente en la terminal después de que se complete la prueba (el tiempo exacto transcurrido será distinto):

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Tarea 5: Ejecuta la segunda prueba de unidades de DoFn y la canalización de depuración

  1. Regresa a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java y quita el comentario del código de la segunda prueba de unidades (alrededor de las líneas 67 a 80). Para ello, destaca el código y presiona Ctrl + / (o Cmd + / en MacOS). A continuación, se muestra el código a modo de referencia:
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

Esta prueba garantiza que la DoFn ConvertTempUnits() funcione según lo previsto.

  1. Guarda WeatherStatisticsPipelineTest.java y regresa a tu terminal.

  2. Ejecuta nuevamente el siguiente comando para ejecutar las pruebas:

mvn test

La prueba falló esta vez. Si nos desplazamos por el resultado, encontraremos la siguiente información sobre la prueba con errores:

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>

A simple vista, podría parecer que este no es un mensaje de error útil. Sin embargo, podemos ver que no coincide el WeatherRecord esperado en testOutput. Tal vez haya un problema con la conversión de temperatura.

  1. Regresa a 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java y desplázate hacia abajo, hasta la definición de ConvertTempUnits (alrededor de la línea 81).

  2. Para completar esta tarea, busca el error en la lógica de procesamiento de DoFn y vuelve a ejecutar el comando mvn test para confirmar que la prueba se complete correctamente ahora. A modo de recordatorio, esta es la fórmula para convertir grados Celsius en grados Fahrenheit:

tempF = tempC * 1.8 + 32.0

Si no puedes continuar, consulta las soluciones.

Tarea 6: Ejecuta la prueba de unidades de PTransform y prueba la canalización de extremo a extremo

  1. Regresa a 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java y quita el comentario del código para las dos pruebas finales (a partir de la línea 84, aproximadamente).

La primera prueba, a la que acabamos de quitar el comentario, corresponde a la prueba de la PTransform compuesta ComputeStatistics. A modo de referencia, este es un código con formato truncado:

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //Definir las entradas de prueba (se omite aquí) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

Ten en cuenta que esto es muy similar a las pruebas de unidades de DoFn que realizamos anteriormente. La única diferencia real (aparte de las entradas y las salidas diferentes de las pruebas) es que aplicamos PTransform en lugar de ParDo(new DoFn()).

La prueba final es para la canalización de extremo a extremo. En el código de canalización (WeatherStatisticsPipeline.java), se incluyó toda la canalización de extremo a extremo (menos la fuente y el receptor) en una sola PTransform WeatherStatsTransform.

  1. Para probar la canalización de extremo a extremo, podemos repetir lo que hicimos anteriormente, pero con PTransform:
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //Define Testing Inputs (Omitted here) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. Ahora, regresa a la terminal y ejecuta el siguiente comando para ejecutar las pruebas una vez más:
mvn test

Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Haz clic en Revisar mi progreso para verificar el objetivo. Realizar pruebas de unidades para DoFn y PTransform

Parte 2 del lab: Prueba la lógica de procesamiento de transmisión con TestStream

En esta parte del lab, realizaremos pruebas de unidades para una canalización de transmisión que procesa recuentos de los recorridos de un taxi mediante un sistema de ventanas. Para probar las transformaciones que creaste, puedes usar el siguiente patrón y las transformaciones que proporciona Beam:

  • Crea una TestPipeline.
  • Utiliza la clase TestStream para generar datos de transmisión. Esto incluye generar una serie de eventos y avanzar la marca de agua y el tiempo de procesamiento.
  • Usa PAssert y sus subclases para verificar que la PCollection resultante contenga los elementos que esperas en ventanas específicas.

Cuando se ejecuta una canalización que lee de TestStream, la operación de lectura espera a que se completen todas las consecuencias de cada evento antes de pasar al siguiente, incluso cuando se avanza el tiempo de procesamiento y se activan los activadores correspondientes. TestStream permite que el efecto de la activación y el retraso permitido se observen y prueben en una canalización. Esto incluye la lógica en torno a los activadores tardíos y los datos descartados debido a un retraso.

Tarea 1: Explora el código de canalización principal

  1. Navega a 8b_Stream_Testing_Pipeline/lab en el IDE.

Este directorio contiene un archivo pom.xml para definir dependencias y la carpeta src, que contiene dos subdirectorios. La carpeta src/main contiene el código del paquete de canalización, y la carpeta src/test contendrá el código de prueba.

  1. Primero, abre 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.

Este archivo contiene la definición de la clase TaxiRide que usaremos en nuestra canalización. La clase TaxiRide tiene un esquema asociado, y los pasos para definir el esquema con la anotación @DefaultSchema deberían resultarle conocidos.

  1. Ahora abre 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.

Este es el código principal de nuestra canalización. Si bien la mayoría de los conceptos de esta canalización se abordaron en labs anteriores, asegúrate de explorar con mayor detalle los que se indican a continuación:

  • La DoFn JsonToTaxiRide (a partir de la línea 94) usada para convertir los mensajes entrantes de Pub/Sub en objetos de la clase TaxiRide.
  • La PTransform TaxiCountTransform (a partir de la línea 113). Esta PTransform contiene la lógica principal de recuento y sistema de ventanas de la canalización. Nuestras pruebas se enfocarán en esta PTransform.

El resultado de la TaxiCountTransform debería ser un recuento de todos los recorridos registrados de taxi por ventana. Sin embargo, tendremos múltiples eventos por recorrido (partida, destino, etcétera).

  1. Filtraremos por la propiedad ride_status para garantizar que contemos cada recorrido solo una vez. Para ello, solo mantendremos los elementos cuyo ride_status sea igual a “pickup”:
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

Si lo analizamos un poco más en detalle, verás que se utilizó la siguiente lógica de renderización en ventanas en nuestra canalización:

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

Estableceremos grupos de ventanas fijas de 60 segundos de duración. No contamos con un activador anticipado, por lo que emitiremos los resultados después de que la marca de agua pase el final de la ventana. Incluiremos activaciones tardías con cada elemento nuevo que ingrese, pero con un retraso permitido de solo 1 minuto. Por último, acumularemos el estado en ventanas hasta que haya pasado el retraso permitido.

Tarea 2: Explora el uso de TestStream y ejecuta la primera prueba

  1. Ahora abre 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.

El primer objetivo será comprender el uso de TestStream en nuestro código de prueba. Recuerda que la clase TestStream nos permite simular un flujo de mensajes en tiempo real y controlar el progreso del tiempo de procesamiento y la marca de agua.

Este es el código de la primera prueba (a partir de la línea 66):

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

Creamos un TestStream nuevo con el método create y, al mismo tiempo, especificamos el codificador. Pasaremos el mensaje JSON como una string, por lo que podemos usar StringUtf8Coder. ¿Qué hace el TestStream anterior?

El TestStream realiza las siguientes tareas:

  • Establecer la marca de agua inicial en la variable startTime (Instant(0))
  • Agregar tres elementos a la string con la marca de tiempo del evento startTime; se contarán solo dos de estos eventos (ride_status = "pickup")
  • Agregar otro evento “pickup”, pero con una marca de tiempo de evento correspondiente a un minuto después de startTime
  • Avanzar la marca de agua a un minuto después de startTime, lo que activa la primera ventana
  • Agregar otro evento “pickup”, pero con una marca de tiempo de evento de dos minutos después de startTime
  • Avanzar la marca de agua al “infinito”; esto hará que todas las ventanas se cierren y permitirá que los datos nuevos superen el retraso permitido
  1. El resto del código de la primera prueba es similar al ejemplo por lotes anterior, pero ahora usamos TestStream en lugar de la transformación Create:
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

En el código anterior, definimos nuestra PCollection de salida (outputCount) creando el TestStream y aplicando la PTransform TaxiCountTransform. Usamos la clase InvervalWindow para definir las ventanas que queremos verificar y, luego, usamos PAssert con el método inWindow para verificar los resultados por ventana.

  1. Ahora, regresa a la terminal en el IDE (o abre una nueva terminal) y ejecuta los siguientes comandos para cambiar la ubicación al directorio correcto y, luego, instalar las dependencias:
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Ahora ejecuta la prueba anterior con el siguiente comando:
mvn test

Deberías ver el siguiente resultado después de la prueba (aunque el tiempo transcurrido puede variar):

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

Tarea 3: Crea TestStream para probar el manejo de datos tardíos

En esta tarea, escribirás código para un TestStream para probar la lógica relacionada con el manejo de los datos tardíos.

  1. Regresa a 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java y desplázate hasta el punto en que el método testTaxiRideLateData está marcado como comentario (alrededor de la línea 104).

  2. Quita el comentario del código de esta prueba, ya que completaremos el código para esta tarea:

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

El código de la prueba se completará fuera de la creación de TestStream.

  1. Para completar esta tarea, crea un objeto TestStream que realice las siguientes tareas:
  • Avanzar la marca de agua a startTime
  • Agregar dos TimestampedValue con el valor json.format(json, "pickup") y la marca de tiempo startTime
  • Avanzar la marca de agua a un minuto después de startTime
  • Agregar otro TimestampedValue con el valor json.format(json, "pickup") y la marca de tiempo startTime
  • Avanzar la marca de agua a dos minutos después del startTime
  • Agregar otro TimestampedValue con el valor json.format(json, "pickup") y la marca de tiempo startTime
  • Avanzar la marca de agua al infinito

Se creará un TestStream con cuatro elementos que pertenecen a la primera ventana. Los primeros dos elementos son puntuales, el segundo está retrasado (pero dentro del retraso permitido) y el último está retrasado y supera el retraso permitido. Debido a que acumulamos paneles activados, el primer activador debe contar dos eventos, mientras que el activador final debe contar tres. No se debe incluir el cuarto evento. Podemos verificarlo con los métodos inOnTimePane y inFinalPane de la clase PAssert.

Si no puedes continuar, consulta las soluciones.

Tarea 4: Ejecuta pruebas para el manejo de datos tardíos

  • Ahora, regresa a la terminal y ejecuta el siguiente comando para realizar las pruebas una vez más:
mvn test

Si completaste correctamente las tareas anteriores, deberías ver lo siguiente en la terminal una vez que finalicen las pruebas:

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

Haz clic en Revisar mi progreso para verificar el objetivo. Probar la lógica de procesamiento de transmisión con TestStream

Finalice su lab

Cuando haya completado el lab, haga clic en Finalizar lab. Google Cloud Skills Boost quitará los recursos que usó y limpiará la cuenta.

Tendrá la oportunidad de calificar su experiencia en el lab. Seleccione la cantidad de estrellas que corresponda, ingrese un comentario y haga clic en Enviar.

La cantidad de estrellas indica lo siguiente:

  • 1 estrella = Muy insatisfecho
  • 2 estrellas = Insatisfecho
  • 3 estrellas = Neutral
  • 4 estrellas = Satisfecho
  • 5 estrellas = Muy satisfecho

Puede cerrar el cuadro de diálogo si no desea proporcionar comentarios.

Para enviar comentarios, sugerencias o correcciones, use la pestaña Asistencia.

Copyright 2020 Google LLC. All rights reserved. Google y el logotipo de Google son marcas de Google LLC. Los demás nombres de productos y empresas pueden ser marcas de las respectivas empresas a las que estén asociados.

Este contenido no está disponible en este momento

Te enviaremos una notificación por correo electrónico cuando esté disponible

¡Genial!

Nos comunicaremos contigo por correo electrónico si está disponible