En este cuaderno, demostramos cómo construir una tubería de “alerta de sensor” totalmente en la memoria en Google Colab usando Faststreamun marco de procesamiento de flujo-nativo de alto rendimiento de alto rendimiento, y su integración con RabbitMQ. Al aprovechar el rabbitbroker y TestRabbitbroker de rabbit. Orquestamos cuatro etapas distintas: ingestión y validación, normalización, monitoreo y generación de alerta, y archivando, cada una definida como modelos pydánticos (rawsensordata, identificación normalizada, alertas) para garantizar la calidad de los datos y la seguridad de los tipos. Bajo el capó, el Asyncio de Python impulsa el flujo de mensajes asíncronos, mientras que Nest_asyncio permite bucles de eventos anidados en Colab. También empleamos el módulo de registro estándar para la ejecución rastreable de la tubería y los pandas para la inspección de resultados finales, lo que facilita la visualización de alertas archivadas en un marco de datos.
!pip install -q faststream[rabbit] nest_asyncio
Instalamos Faststream con su integración de RabbitMQ, proporcionando el marco de procesamiento de flujo central y los conectores de corredores, así como el paquete Nest_asyncio, que permite bucles de eventos de Asyncio anidados en entornos como Colab. Todo esto se logra mientras mantiene la salida mínima con el indicador -Q.
import nest_asyncio, asyncio, logging
nest_asyncio.apply()
Importamos los módulos Nest_asyncio, Asyncio y de registro, luego aplicamos nest_asyncio.apply () al bucle de eventos de parche de Python para que pueda ejecutar tareas asíncronas anidadas dentro de entornos de colab o cuadernos Jupyter sin errores. La importación de registro lo prepara para instrumentar su tubería con registros detallados de tiempo de ejecución.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("sensor_pipeline")
Configuramos el registro incorporado de Python para emitir mensajes de nivel de información (y arriba) con un prefijo de marca de tiempo y gravedad, luego creamos un registrador dedicado llamado “Sensor_pipeline” para emitir registros estructurados dentro de su tubería de transmisión.
from faststream import FastStream
from faststream.rabbit import RabbitBroker, TestRabbitBroker
from pydantic import BaseModel, Field, validator
import pandas as pd
from typing import List
Traemos la clase Core Faststream de Faststream junto con sus conectores RabbitMQ (Rabbitbroker para corredores reales y TestRabbitbriter para pruebas intermedio), BaseModel, campo y validador de Pydantic para la validación de datos declarativos, PANDAS para la inspección de resultados tabulares y el tipo de Python Tipo para anotar nuestros archivos In -Memory.
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)
Instanciamos una instancia de un Rabbitbroker señaló a un servidor de RabbitMQ (local) utilizando la URL AMQP, luego creamos una aplicación FastStream vinculada a ese corredor, configurando la columna vertebral de mensajes para sus etapas de tuberías.
class RawSensorData(BaseModel):
sensor_id: str = Field(..., examples=["sensor_1"])
reading_celsius: float = Field(..., ge=-50, le=150, examples=[23.5])
@validator("sensor_id")
def must_start_with_sensor(cls, v):
if not v.startswith("sensor_"):
raise ValueError("sensor_id must start with 'sensor_'")
return v
class NormalizedData(BaseModel):
sensor_id: str
reading_kelvin: float
class AlertData(BaseModel):
sensor_id: str
reading_kelvin: float
alert: bool
Estos modelos pydánticos definen el esquema para cada etapa: Rawsensordata aplica la validez de entrada (por ejemplo, el rango de lectura y un prefijo sensor), los datos normalizados convierten Celsius en Kelvin y alertdata encapsula la carga útil de alerta final (incluida una bandera booleana), asegurando un flujo de datos tipográfico a lo largo de la tubería.
archive: List[AlertData] = []
@broker.subscriber("sensor_input")
@broker.publisher("normalized_input")
async def ingest_and_validate(raw: RawSensorData) -> dict:
logger.info(f"Ingested raw data: {raw.json()}")
return raw.dict()
@broker.subscriber("normalized_input")
@broker.publisher("sensor_alert")
async def normalize(data: dict) -> dict:
norm = NormalizedData(
sensor_id=data["sensor_id"],
reading_kelvin=data["reading_celsius"] + 273.15
)
logger.info(f"Normalized to Kelvin: {norm.json()}")
return norm.dict()
ALERT_THRESHOLD_K = 323.15
@broker.subscriber("sensor_alert")
@broker.publisher("archive_topic")
async def monitor(data: dict) -> dict:
alert_flag = data["reading_kelvin"] > ALERT_THRESHOLD_K
alert = AlertData(
sensor_id=data["sensor_id"],
reading_kelvin=data["reading_kelvin"],
alert=alert_flag
)
logger.info(f"Monitor result: {alert.json()}")
return alert.dict()
@broker.subscriber("archive_topic")
async def archive_data(payload: dict):
rec = AlertData(**payload)
archive.append(rec)
logger.info(f"Archived: {rec.json()}")
Una lista de archivos en memoria recopila todas las alertas finalizadas, mientras que cuatro funciones asíncronas, conectadas a través de @Broker.SubsCriber/ @Broker.Publisher, forman las etapas de la tubería. Estas funciones ingieren y validan las entradas del sensor sin procesar, convierten Celsius en Kelvin, verifiquen un umbral de alerta y finalmente archiven cada registro de datos de alertas, emitiendo registros en cada paso para obtener una trazabilidad total.
async def main():
readings = [
{"sensor_id": "sensor_1", "reading_celsius": 45.2},
{"sensor_id": "sensor_2", "reading_celsius": 75.1},
{"sensor_id": "sensor_3", "reading_celsius": 50.0},
]
async with TestRabbitBroker(broker) as tb:
for r in readings:
await tb.publish(r, "sensor_input")
await asyncio.sleep(0.1)
df = pd.DataFrame([a.dict() for a in archive])
print("\nFinal Archived Alerts:")
display(df)
asyncio.run(main())
Finalmente, el Coroutine principal publica un conjunto de lecturas de sensores de muestra en el testRabbitbroker en memoria, se detiene brevemente para permitir que cada etapa de tubería se ejecute, y luego recopila los registros de alertas resultantes del archivo en un marco de datos de Pandas para una fácil visualización y verificación del flujo de alerta de extremo a extremo. Al final, Asyncio.run (Main ()) inicia toda la demostración de Async en Colab.
En conclusión, este tutorial demuestra cómo Faststream, combinado con las abstracciones de RabbitMQ y las pruebas en memoria a través de TestRabbitBroker, puede acelerar el desarrollo de tuberías de datos en tiempo real sin la sobrecarga de la implementación de corredores externos. Con la validación de esquema de manejo de pydantic, Asyncio que administra la concurrencia y los pandas que habilitan el análisis de datos rápidos, este patrón proporciona una base robusta para el monitoreo de sensores, las tareas ETL o los flujos de trabajo impulsados por los eventos. Puede hacer una transición sin problemas de esta demostración interna a la producción cambiando en una URL de corredor en vivo (RabbitMQ, Kafka, Nats o Redis) y ejecutando Faststream Run bajo Uvicorn o su servidor ASGI preferido, desbloqueando el procesamiento de flujo escalable y mantenible en cualquier entorno de pitón.
Aquí está el Cuaderno de colab. Además, no olvides seguirnos Gorjeo y únete a nuestro Canal de telegrama y LinkedIn GRsalpicar. No olvides unirte a nuestro 90k+ ml de subreddit.
Sana Hassan, una pasante de consultoría en MarktechPost y estudiante de doble grado en IIT Madras, le apasiona aplicar tecnología e IA para abordar los desafíos del mundo real. Con un gran interés en resolver problemas prácticos, aporta una nueva perspectiva a la intersección de la IA y las soluciones de la vida real.