Cómo dockericé Apache Flink, Kafka y PostgreSQL para la transmisión de datos en tiempo real |  de Augusto de Nevrezé |  junio de 2024

Me embarqué en la misión de integrar Apache Flink con Kafka y PostgreSQL usando Docker. Lo que hace que este esfuerzo sea particularmente emocionante es el uso de pyFlink, la versión Python de Flink, que es poderoso y relativamente raro. Esta configuración tiene como objetivo manejar de manera eficiente el procesamiento y almacenamiento de datos en tiempo real. En las siguientes secciones, demostraré cómo lo logré, analizando los desafíos encontrados y cómo los superé. Concluiré con una guía paso a paso para que usted mismo pueda crear y experimentar con este canal de transmisión.

La infraestructura que construiremos se ilustra a continuación. Externamente, hay un módulo de publicación que simula mensajes de sensores de IoT, similar a lo que se discutió en un Publicación anterior. Dentro del contenedor Docker, crearemos dos temas de Kafka. El primer tema, sensores, almacenará los mensajes entrantes de los dispositivos IoT en tiempo real. Luego, una aplicación Flink consumirá los mensajes de este tema, filtrará aquellos con temperaturas superiores a 30 °C y los publicará en un segundo tema. alertas. Además, la aplicación Flink insertará los mensajes consumidos en una tabla PostgreSQL creada específicamente para este propósito. Esta configuración nos permite conservar los datos de los sensores en un formato tabular estructurado, lo que brinda oportunidades para una mayor transformación y análisis. Se pueden conectar herramientas de visualización como Tableau o Power BI a estos datos para realizar gráficos y paneles en tiempo real.

Además, otros clientes pueden consumir el tema de alertas para iniciar acciones basadas en los mensajes que contiene, como activar sistemas de aire acondicionado o activar protocolos de seguridad contra incendios.

Servicios incluidos en el contenedor Docker – imagen del autor

Para seguir el tutorial, puedes clonar lo siguiente repositorio. Se coloca un docker-compose.yml en la raíz del proyecto para que pueda inicializar la aplicación de contenedores múltiples. Además, puede encontrar instrucciones detalladas en el archivo README.

Problemas con los puertos Kafka en docker-compose.yml

Inicialmente, encontré problemas con la configuración del puerto de Kafka cuando usaba la imagen confluente de Kafka Docker, una opción popular para este tipo de configuraciones. Este problema se hizo evidente a través de los registros, enfatizando la importancia de no ejecutar docker-compose up en modo independiente (-d) durante las fases iniciales de configuración y solución de problemas.

El motivo del fallo fue que los hosts internos y externos estaban usando el mismo puerto, lo que provocó problemas de conectividad. Solucioné esto cambiando el puerto interno a 19092. Encontré este publicación de blog bastante esclarecedora.

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092

Configurar Flink en modo sesión

Para ejecutar Flink en modo de sesión (permitiendo múltiples trabajos en un solo clúster), estoy usando las siguientes directivas en docker-compose.yml.

Imagen de Docker personalizada para PyFlink

Dadas las limitaciones de la imagen predeterminada de Apache Flink Docker, que no incluye soporte para Python, creé una imagen de Docker personalizada para pyFlink. Esta imagen personalizada garantiza que Flink pueda ejecutar trabajos de Python e incluye las dependencias necesarias para la integración con Kafka y PostgreSQL. El Dockerfile utilizado para esto se encuentra en el subdirectorio pyflink.

  1. Imagen base: Empezamos con la imagen oficial de Flink.
  2. Instalación de Python: Python y pip están instalados, actualizando pip a la última versión.
  3. Gestión de dependencias: Las dependencias se instalan a través de requisitos.txt. Alternativamente, se comentan líneas para demostrar cómo instalar manualmente dependencias desde archivos locales, lo que resulta útil para la implementación en entornos sin acceso a Internet.
  4. Bibliotecas de conectores: Los conectores para Kafka y PostgreSQL se descargan directamente en el directorio lib de Flink. Esto permite a Flink interactuar con Kafka y PostgreSQL durante la ejecución del trabajo.
  5. Copia de guiones: Los scripts del repositorio se copian en el directorio /opt/flink para que los ejecute el administrador de tareas de Flink.

Con esta imagen de Docker personalizada, garantizamos que pyFlink pueda ejecutarse correctamente dentro del contenedor de Docker, equipado con las bibliotecas necesarias para interactuar con Kafka y PostgreSQL sin problemas. Este enfoque proporciona flexibilidad y es adecuado tanto para entornos de desarrollo como de producción.

Nota: Asegúrese de que cualquier consideración de red o seguridad para descargar conectores y otras dependencias se aborden de acuerdo con las políticas de su entorno de implementación.

Integrando PostgreSQL

Para conectar Apache Flink a la base de datos PostgreSQL, se requiere un conector JDBC adecuado. La imagen de Docker personalizada para pyFlink descarga el conector JDBC para PostgreSQL, que es compatible con PostgreSQL 16.

Para simplificar este proceso, se incluye un script download_libs.sh en el repositorio, que refleja las acciones realizadas en el contenedor Flink Docker. Este script automatiza la descarga de las bibliotecas necesarias, garantizando la coherencia entre Docker y los entornos locales.

Nota: Los conectores suelen tener dos versiones. En este caso particular, como estoy usando Flink 1.18, la última versión estable disponible, descargué 3.1.2–1.18. Supongo que la primera versión rastrea la implementación de JDBC para varias bases de datos. Están disponibles en el directorio maven.

env.add_jars(
f"file://{current_dir}/flink-connector-jdbc-3.1.2–1.18.jar",
f"file://{current_dir}/postgresql-42.7.3.jar"
)

Definición del sumidero JDBC

En nuestra tarea de Flink, hay una función crucial llamada configure_postgre_sink ubicada en el archivo usr_jobs/postgres_sink.py. Esta función es responsable de configurar un receptor PostgreSQL genérico. Para utilizarlo de forma eficaz, debe proporcionar la declaración del lenguaje de manipulación de datos (DML) SQL y los tipos de valores correspondientes. Los tipos utilizados en los datos de transmisión se definen como TYPE_INFO… me tomó un tiempo encontrar la declaración correcta 😅.

Observe también que JdbcSink tiene un parámetro opcional para definir ExecutionOptions. Para este caso particular, usaré un intervalo de actualización de 1 segundo y limitaré la cantidad de filas a 200. Puede encontrar más información en documentación oficial. Sí, lo has adivinado, dado que estoy definiendo un intervalo, esto puede considerarse un ETL de microlotes. Sin embargo, debido al paralelismo de Flink, puede manejar múltiples transmisiones a la vez en un script simple que, al mismo tiempo, es fácil de seguir.

Nota: No olvide crear la tabla raw_sensors_data en Postgres, donde se recibirán los datos sin procesar provenientes de los sensores de IoT. Esto se trata en la guía paso a paso en las secciones siguientes.

Hundir datos a Kafka

He cubierto cómo consumir datos de un tema de Kafka en una discusión previa. Sin embargo, todavía no he configurado un receptor y eso es lo que haremos. La configuración tiene algunas complejidades y está definida en una función, de manera similar al receptor de Postgres. Además, debe definir el tipo de flujo de datos antes de enviarlo a Kafka. Observe que la secuencia alarms_data se convierte correctamente como una cadena con output_type=Types.STRING() antes de enviarla a Kafka, ya que declaré el serializador como SimpleStringSchema().

Le mostraré cómo recuperar datos del tema de alertas en los siguientes pasos.

Configuración local o en contenedores

Una de las mejores cosas de esta configuración de Docker es que puede ejecutar Flink desde local o dentro del contenedor como una tarea administrada. La configuración local de Flink se muestra en la siguiente figura, donde puede ver nuestra aplicación Flink separada del contenedor acoplable. Esto puede ayudar a solucionar problemas de Flink, que no tiene un buen conjunto de herramientas de observabilidad nativas. En realidad, nos gustaría intentarlo datorios herramientas para Flink, son muy prometedoras para fines de monitoreo.

Ejecutar aplicaciones Flink en local con otros servicios ejecutándose dentro del contenedor – imagen del autor

Si desea probar la aplicación Flink localmente, debe definir correctamente los hosts y puertos utilizados por el script, que en realidad son dos constantes en el archivo usr_jobs/postgres_sink.py:

Para ejecutar contenedores, utilice:

KAFKA_HOST = "kafka:19092"
POSTGRES_HOST = "postgres:5432"

Para ejecución local, utilice:

KAFKA_HOST = "localhost:9092"
POSTGRES_HOST = "localhost:5432"

De forma predeterminada, el repositorio configura la aplicación Flink para que se ejecute dentro del contenedor. Puede monitorear los trabajos que se ejecutan usando la interfaz de usuario web, accediendo desde http://localhost:8081. No podrá verlo si elige ejecutar el trabajo localmente.

Captura de pantalla de la interfaz de usuario web de Flink con el trabajo en ejecución: imagen del autor

Nota: Si ejecuta el trabajo localmente, debe instalar las dependencias de Flink ubicadas en requisitos.txt. También se proporciona un archivo pyproject.toml si desea configurar el entorno con poesía.