Una implementación de codificación para crear una canalización unificada de Apache Beam que demuestra el procesamiento por lotes y transmisiones con ventanas de tiempo de eventos mediante DirectRunner

En este tutorial, demostramos cómo crear una canalización de Apache Beam unificada que funcione perfectamente tanto en modo por lotes como en modo de transmisión utilizando DirectRunner. Generamos datos sintéticos que tienen en cuenta la hora del evento y aplicamos ventanas fijas con activadores y retrasos permitidos para demostrar cómo Apache Beam maneja consistentemente eventos puntuales y tardíos. Al cambiar solo la fuente de entrada, mantenemos idéntica la lógica de agregación central, lo que nos ayuda a comprender claramente cómo se comportan el modelo de tiempo de eventos, las ventanas y los paneles de Beam sin depender de una infraestructura de transmisión externa. Consulta los CÓDIGOS COMPLETOS aquí.

!pip -q install -U “grpcio>=1.71.2” “grpcio-status>=1.71.2” !pip -q install -U apache-beam crcmod importar apache_beam como haz desde apache_beam.options.pipeline_options importar PipelineOptions, StandardOptions desde apache_beam.transforms.window importar FixWindows desde apache_beam.transforms.trigger importar AfterWatermark, AfterProcessingTime, AccumulationMode desde apache_beam.testing.test_stream importar TestStream importar json desde fecha y hora importar fecha y hora, zona horaria

Instalamos las dependencias requeridas y aseguramos la compatibilidad de versiones para que Apache Beam. Importamos las API principales de Beam junto con las ventanas, los activadores y las utilidades TestStream que se necesitarán más adelante en el proceso. También incorporamos módulos estándar de Python para el manejo del tiempo y el formato JSON. Consulta los CÓDIGOS COMPLETOS aquí.

MODE = “stream” WINDOW_SIZE_SECS = 60 ALLOWED_LATENESS_SECS = 120 def make_event(user_id, event_type, cantidad, event_time_epoch_s): return {“user_id”: user_id, “event_type”: event_type, “cantidad”: float(cantidad), “event_time”: int(event_time_epoch_s)} base = datetime.now(timezone.utc).replace(microsegundo=0) t0 = int(base.timestamp()) BATCH_EVENTS = [
make_event(“u1”, “purchase”, 20, t0 + 5),
make_event(“u1”, “purchase”, 15, t0 + 20),
make_event(“u2”, “purchase”, 8, t0 + 35),
make_event(“u1”, “refund”, -5, t0 + 62),
make_event(“u2”, “purchase”, 12, t0 + 70),
make_event(“u3”, “purchase”, 9, t0 + 75),
make_event(“u2”, “purchase”, 3, t0 + 50),
]

Definimos la configuración global que controla el tamaño de la ventana, el retraso y el modo de ejecución. Creamos eventos sintéticos con marcas de tiempo explícitas para que el comportamiento de las ventanas sea determinista y fácil de razonar. Preparamos un pequeño conjunto de datos que incluye intencionalmente eventos fuera de orden y tardíos para observar la semántica de tiempo de evento de Beam. Consulta los CÓDIGOS COMPLETOS aquí.

def format_joined_record(kv): user_id, d = kv return { “user_id”: user_id, “count”: int(d[“count”][0]) si d[“count”] demás 0, “cantidad_suma”: flotante(d[“sum_amount”][0]) si d[“sum_amount”] else 0.0, } clase WindowedUserAgg(beam.PTransform): def expandir(self, pcoll): estampado = pcoll | haz.Map(lambda e: haz.ventana.TimestampedValue(e, e[“event_time”])) con ventana = estampado | haz.WindowInto( Ventanas fijas(WINDOW_SIZE_SECS), latencia_permitida=ALLOWED_LATENESS_SECS, disparador=AfterWatermark( temprano=AfterProcessingTime(10), tarde=AfterProcessingTime(10), ), acumulacion_mode=AccumulationMode.ACCUMULATING, ) con clave = en ventana | haz.Map(lambda e: (e[“user_id”]mi[“amount”])) recuentos = con clave | beam.combiners.Count.PerKey() sumas = con clave | haz.CombinePerKey(suma) return ( {“count”: recuentos, “sum_amount”: sumas} | haz.CoGroupByKey() | haz.Map(format_joined_record) )

Construimos un Beam PTransform reutilizable que encapsula toda la lógica de agregación en ventanas. Aplicamos ventanas fijas, activadores y reglas de acumulación, luego agrupamos eventos por usuario y calculamos recuentos y sumas. Mantenemos esta transformación independiente de la fuente de datos, por lo que se aplica la misma lógica tanto a las entradas por lotes como a las de streaming. Consulta los CÓDIGOS COMPLETOS aquí.

clase AddWindowInfo(beam.DoFn): def proceso(self, elemento, ventana=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam): ws = float(window.start) nosotros = float(window.end) rendimiento { **elemento, “window_start_utc”: datetime.fromtimestamp(ws, tz=timezone.utc).strftime(“%H:%M:%S”), “window_end_utc”: datetime.fromtimestamp(nosotros, tz=timezone.utc).strftime(“%H:%M:%S”), “pane_timing”: str(pane_info.timing), “pane_is_first”: pane_info.is_first, “pane_is_last”: pane_info.is_last, } def build_test_stream(): retorno ( TestStream() .advance_watermark_to(t0) .add_elements([
beam.window.TimestampedValue(make_event(“u1”, “purchase”, 20, t0 + 5), t0 + 5),
beam.window.TimestampedValue(make_event(“u1”, “purchase”, 15, t0 + 20), t0 + 20),
beam.window.TimestampedValue(make_event(“u2”, “purchase”, 8, t0 + 35), t0 + 35),
]) .advance_processing_time(5) .advance_watermark_to(t0 + 61) .add_elements([
beam.window.TimestampedValue(make_event(“u1”, “refund”, -5, t0 + 62), t0 + 62),
beam.window.TimestampedValue(make_event(“u2”, “purchase”, 12, t0 + 70), t0 + 70),
beam.window.TimestampedValue(make_event(“u3”, “purchase”, 9, t0 + 75), t0 + 75),
]) .advance_processing_time(5) .add_elements([
beam.window.TimestampedValue(make_event(“u2”, “purchase”, 3, t0 + 50), t0 + 50),
]) .advance_watermark_to(t0 + 121) .advance_watermark_to_infinity() )

Enriquecemos cada registro agregado con metadatos de ventanas y paneles para que podamos ver claramente cuándo y por qué se emiten los resultados. Convertimos las marcas de tiempo internas de Beam en horas UTC legibles por humanos para mayor claridad. También definimos un TestStream que simula el comportamiento de transmisión real utilizando marcas de agua, avances en el tiempo de procesamiento y datos tardíos. Consulta los CÓDIGOS COMPLETOS aquí.

def run_batch(): con haz.Pipeline(opciones=PipelineOptions([])) como p: ( p | haz.Create(BATCH_EVENTS) | WindowedUserAgg() | haz.ParDo(AddWindowInfo()) | haz.Map(json.dumps) | haz.Map(imprimir) ) def run_stream(): opts = PipelineOptions([]) opts.view_as(StandardOptions).streaming = True con beam.Pipeline(options=opts) como p: ( p | build_test_stream() | WindowedUserAgg() | beam.ParDo(AddWindowInfo()) | beam.Map(json.dumps) | beam.Map(print) ) run_stream() if MODE == “stream” else run_batch()

Conectamos todo en canalizaciones ejecutables en forma de lotes y flujos. Alternamos entre modos cambiando un solo indicador mientras reutilizamos la misma transformación de agregación. Ejecutamos el proceso e imprimimos los resultados en ventana directamente, lo que hace que el flujo de ejecución y los resultados sean fáciles de inspeccionar.

En conclusión, demostramos que la misma canalización de Beam puede procesar tanto datos por lotes limitados como datos ilimitados similares a flujos, al tiempo que conserva una semántica de agregación y ventanas idéntica. Observamos cómo las marcas de agua, los activadores y los modos de acumulación influyen en el momento en que se emiten los resultados y cómo las actualizaciones tardías de datos computaban previamente las ventanas. Además, nos centramos en las bases conceptuales del modelo unificado de Beam, proporcionando una base sólida para luego escalar el mismo diseño a corredores de transmisión y entornos de producción reales.

Consulta los CÓDIGOS COMPLETOS aquí. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 100.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.

Consulte nuestra última versión de ai2025.dev, una plataforma de análisis centrada en 2025 que convierte los lanzamientos de modelos, los puntos de referencia y la actividad del ecosistema en un conjunto de datos estructurado que puede filtrar, comparar y exportar.

Michal Sutter es un profesional de la ciencia de datos con una Maestría en Ciencias de Datos de la Universidad de Padua. Con una base sólida en análisis estadístico, aprendizaje automático e ingeniería de datos, Michal se destaca en transformar conjuntos de datos complejos en conocimientos prácticos.