Introducción de eventos de servidor en Python | Hacia la ciencia de los datos

Un desarrollador, siempre estoy buscando formas de hacer que mis aplicaciones sean más dinámicas e interactivas. Los usuarios de hoy esperan funciones en tiempo real, como notificaciones en vivo, actualizaciones de transmisión y paneles que se actualizan automáticamente. La herramienta que a menudo me viene a la mente para los desarrolladores web al considerar este tipo de aplicaciones es WebSockets, y es increíblemente poderosa.

Sin embargo, hay momentos en que WebSockets puede ser excesivo, y su funcionalidad completa a menudo no es necesaria. Proporcionan un canal de comunicación bidireccional complejo, pero muchas veces, todo lo que necesito es que el servidor presione actualizaciones a el cliente. Para estos escenarios comunes, una solución más directa y elegante que se construye directamente en las plataformas web modernas se conoce como eventos de servidor (SSE).

En este artículo, te presentaré eventos de Servidor. Discutiremos cuáles son, cómo se comparan con WebSockets y por qué a menudo son la herramienta perfecta para el trabajo. Luego, nos sumergiremos en una serie de ejemplos prácticos, utilizando Python y Fastapi Framework para crear aplicaciones en tiempo real que sean sorprendentemente simples pero potentes.

¿Qué son los eventos de servidor (SSE)?

Servidor-Sent Events es un estándar de tecnología web que permite que un servidor empuja los datos a un cliente de manera asincrónica una vez que se haya establecido una conexión de cliente inicial. Proporciona una secuencia de datos unidireccional de servidor a cliente a través de una sola conexión HTTP de larga vida. El cliente, generalmente un navegador web, se suscribe a esta transmisión y puede reaccionar a los mensajes que recibe.

Algunos aspectos clave de los eventos del servidor incluyen:

  • Protocolo simple. SSE es un protocolo sencillo basado en texto. Los eventos son solo fragmentos de texto enviados a través de HTTP, lo que hace que sean fáciles de depurar con herramientas estándar como Curl.
  • HTTP estándar. SSE trabaja sobre http/https regular. Esto significa que generalmente es más compatible con los firewalls y los servidores proxy existentes.
  • Reconexión automática. Esta es una característica asesina. Si se pierde la conexión con el servidor, la API eventsurce del navegador intentará automáticamente reconectarse. Obtiene esta resiliencia de forma gratuita, sin escribir ningún código JavaScript adicional.
  • Comunicación unidireccional. SSE es estrictamente para los datos de datos de servidor a cliente. Si necesita comunicación Full-Duplex, de cliente a servidor, los websockets son la opción más apropiada.
  • Soporte nativo del navegador. Todos los navegadores web modernos tienen soporte incorporado para eventos de servidor (SSE) a través de la interfaz EventsOurce, eliminando la necesidad de bibliotecas del lado del cliente.

Por qué SSE es importante/casos de uso común

La principal ventaja de SSE es su simplicidad. Para una gran clase de problemas en tiempo real, proporciona toda la funcionalidad necesaria con una fracción de la complejidad de WebSockets, tanto en el servidor como en el cliente. Esto significa un desarrollo más rápido, un mantenimiento más fácil y menos cosas que pueden salir mal.

SSE es perfecto para cualquier escenario en el que el servidor necesite iniciar comunicación y enviar actualizaciones al cliente. Por ejemplo …

  • Sistemas de notificación en vivo. Empujando las notificaciones a un usuario cuando llega un nuevo mensaje o se produce un evento importante.
  • La actividad en tiempo real se alimenta. Transmisión de actualizaciones de la actividad de actividad de un usuario, similar a una línea de tiempo de Twitter o Facebook.
  • Paneles de datos en vivo. Enviar actualizaciones continuas para tickers de stock, puntajes deportivos o métricas de monitoreo a un tablero en vivo.
  • Salidas de registro de transmisión. Mostrando la salida del registro en vivo desde un proceso de fondo de larga duración directamente en el navegador del usuario.
  • Actualizaciones de progreso. Mostrando el progreso en tiempo real de una carga de archivo, un trabajo de procesamiento de datos o cualquier otra tarea de larga duración iniciada por el usuario.

Esa es suficiente teoría; Veamos cuán fácil es implementar estas ideas con Python.

Configuración del entorno de desarrollo

Utilizaremos Fastapi, un marco web de Python moderno y de alto rendimiento. Su soporte nativo para Asyncio y las respuestas de transmisión lo convierte en una coincidencia perfecta para implementar eventos de servidor. También necesitará el servidor ASGI UVICORN para ejecutar la aplicación.

Como de costumbre, estableceremos un entorno de desarrollo para mantener nuestros proyectos separados. Sugiero usar Miniconda para esto, pero no dude en usar la herramienta a la que esté acostumbrado.

# Create and activate a new virtual environment
(base) $ conda create -n sse-env python=3.13 -y
(base) $ activate sse-env

Ahora, instale las bibliotecas externas que necesitamos.

# Install FastAPI and Uvicorn
(sse-env) $ pip install fastapi uvicorn

Esa es toda la configuración que necesitamos. Ahora podemos comenzar a codificar.

Código Ejemplo 1 – El backend de Python. Un punto final de SSE simple

Creemos nuestro primer punto final de SSE. Enviará un mensaje con la hora actual al cliente cada segundo.

Cree un archivo llamado App.py y escriba lo siguiente en él.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import time

app = FastAPI()

# Allow requests from http://localhost:8080 (where index.html is served)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

def event_stream():
    while True:
        yield f"data: The time is {time.strftime('%X')}\n\n"
        time.sleep(1)

@app.get("/stream-time")
def stream():
    return StreamingResponse(event_stream(), media_type="text/event-stream")

Espero que esté de acuerdo en que este código es sencillo.

  1. Definimos una función event_stream (). Este bucle se repite sin cesar, produciendo una cadena cada segundo.
  2. La cadena producida se formatea de acuerdo con la especificación de SSE: debe comenzar con datos: y termina con dos nuevas líneas (\ n \ n).
  3. Nuestro punto final /tiempo de transmisión devuelve una respuesta de transmisión, transmitiendo nuestro generador a él y configurando el Media_Type en Text /Event-Stream. Fastapi maneja el resto, manteniendo la conexión abierta y el envío de cada uno arrojó un fragmento al cliente.

Para ejecutar el código, no use el estándar Python App.py comando como lo haría normalmente. En cambio, haz esto.

(sse-env)$ uvicorn app:app --reload

INFO:     Will watch for changes in these directories: ['/home/tom']
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [4109269] using WatchFiles
INFO:     Started server process [4109271]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

Ahora, escriba esta dirección en su navegador …

http://127.0.0.1:8000/stream-time

… y deberías ver algo como esto.

Imagen del autor

La pantalla debe mostrar un registro de tiempo actualizado cada segundo.

Código Ejemplo 2. Panel de monitoreo del sistema en tiempo real

En este ejemplo, monitorearemos la CPU de nuestra PC o portátil y el uso de la memoria en tiempo real.

Aquí está el código App.py que necesita.

import asyncio
import json
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import datetime

# Define app FIRST
app = FastAPI()

# Then add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

async def system_stats_generator(request: Request):
    while True:
        if await request.is_disconnected():
            print("Client disconnected.")
            break

        cpu_usage = psutil.cpu_percent()
        memory_info = psutil.virtual_memory()

        stats = {
            "cpu_percent": cpu_usage,
            "memory_percent": memory_info.percent,
            "memory_used_mb": round(memory_info.used / (1024 * 1024), 2),
            "memory_total_mb": round(memory_info.total / (1024 * 1024), 2)
        }

        yield f"data: {json.dumps(stats)}\n\n"
        await asyncio.sleep(1)

@app.get("/system-stats")
async def stream_system_stats(request: Request):
    return StreamingResponse(system_stats_generator(request), media_type="text/event-stream")

@app.get("/", response_class=HTMLResponse)
async def read_root():
    with open("index.html") as f:
        return HTMLResponse(content=f.read())

Este código construye un servicio de monitoreo del sistema en tiempo real utilizando el marco web de Fastapi. Crea un servidor web que rastrea y transmite continuamente la CPU y el uso de memoria de la máquina host a cualquier cliente web conectado.

Primero, inicializa una aplicación FastAPI y configura el middleware de intercambio de recursos de origen cruzado (CORS). Este middleware es una característica de seguridad que se configura explícitamente aquí para permitir una página web servida de http: // localhost: 8080 para hacer solicitudes a este servidor, que es un requisito común cuando el interfaz y el backend se desarrollan por separado.

El núcleo de la aplicación es la función asincrónica System_stats_Generator. Esta función se ejecuta en un bucle infinito, y en cada iteración, utiliza la biblioteca Psutil para obtener el porcentaje de utilización actual de la CPU y las estadísticas de memoria detalladas, incluido el porcentaje utilizado, los megabytes utilizados y los megabytes totales. Empaca esta información en un diccionario, la convierte en una cadena JSON y luego la produce en el formato específico “Texto/transmisión de eventos” (datos: … \ n \ n).

El uso de Asyncio.sleep (1) introduce una pausa de un segundo entre las actualizaciones, evitando que el bucle consuma recursos excesivos. La función también está diseñada para detectar cuándo un cliente ha desconectado y deja de enviar datos para ese cliente.

El script define dos puntos finales web. El punto final @App.get (“/System-Stats”) crea una respuesta de transmisión que inicia el sistema_stats_generator. Cuando un cliente solicita a esta URL, establece una conexión persistente y el servidor comienza a transmitir las estadísticas del sistema cada segundo. El segundo punto final, @app.get (“/”), sirve un archivo HTML estático llamado index.html como la página principal. Este archivo HTML generalmente contendría el código JavaScript necesario para conectarse a la transmisión /System-Stats y mostrar dinámicamente los datos de rendimiento entrantes en la página web.

Ahora, aquí está el código frontal actualizado (index.html).

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>System Monitor</title>
    <style>
        body { font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; background-color: #f0f2f5; color: #333; display: flex; justify-content: center; align-items: center; height: 100vh; margin: 0; }
        .dashboard { background-color: white; padding: 2rem; border-radius: 8px; box-shadow: 0 4px 12px rgba(0,0,0,0.1); width: 400px; text-align: center; }
        h1 { margin-top: 0; }
        .metric { margin-bottom: 1.5rem; }
        .metric-label { font-weight: bold; font-size: 1.2rem; margin-bottom: 0.5rem; }
        .progress-bar { width: 100%; background-color: #e9ecef; border-radius: 4px; overflow: hidden; }
        .progress-bar-fill { height: 20px; background-color: #007bff; width: 0%; transition: width 0.5s ease-in-out; }
        .metric-value { margin-top: 0.5rem; font-size: 1rem; color: #555; }
    </style>
</head>
<body>
    <div class="dashboard">
        <h1>Real-Time Server Monitor</h1>
        <div class="metric">
            <div class="metric-label">CPU Usage</div>
            <div class="progress-bar">
                <div id="cpu-progress" class="progress-bar-fill"></div>
            </div>
            <div id="cpu-value" class="metric-value">0%</div>
        </div>
        <div class="metric">
            <div class="metric-label">Memory Usage</div>
            <div class="progress-bar">
                <div id="mem-progress" class="progress-bar-fill" style="background-color: #28a745;"></div>
            </div>
            <div id="mem-value" class="metric-value">0% (0 / 0 MB)</div>
        </div>
    </div>
    <script>
        const cpuProgress = document.getElementById('cpu-progress');
        const cpuValue = document.getElementById('cpu-value');
        const memProgress = document.getElementById('mem-progress');
        const memValue = document.getElementById('mem-value');

        const eventSource = new EventSource('http://localhost:8000/system-stats');

        eventSource.onmessage = function(event) {
            // Parse the JSON data from the server
            const stats = JSON.parse(event.data);

            // Update CPU elements
            cpuProgress.style.width = stats.cpu_percent + '%';
            cpuValue.textContent = stats.cpu_percent.toFixed(2) + '%';

            // Update Memory elements
            memProgress.style.width = stats.memory_percent + '%';
            memValue.textContent = `${stats.memory_percent.toFixed(2)}% (${stats.memory_used_mb} / ${stats.memory_total_mb} MB)`;
        };

        eventSource.onerror = function(err) {
            console.error("EventSource failed:", err);
            cpuValue.textContent = "Connection Error";
            memValue.textContent = "Connection Error";
        };
    </script>
</body>
</html>

Ejecute la aplicación usando Uvicorn, como lo hicimos en el Ejemplo 1. Luego, en una ventana de comando separada, escriba lo siguiente para iniciar un servidor Python.

python3 -m http.server 8080

Ahora, abra la URL http: // localhost: 8080/index.html en su navegador, y verá la salida, que debería actualizarse continuamente.

Imagen del autor

Código Ejemplo 3 – Barra de progreso de la tarea de fondo

En este ejemplo, iniciamos una tarea y mostramos una barra que indica el progreso de la tarea.

App.py actualizado

import asyncio
import json
import psutil
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import datetime

# Define app FIRST
app = FastAPI()

# Then add middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

async def training_progress_generator(request: Request):
    """
    Simulates a long-running AI training task and streams progress.
    """
    total_epochs = 10
    steps_per_epoch = 100

    for epoch in range(1, total_epochs + 1):
        # Simulate some initial processing for the epoch
        await asyncio.sleep(0.5)

        for step in range(1, steps_per_epoch + 1):
            # Check if client has disconnected
            if await request.is_disconnected():
                print("Client disconnected, stopping training task.")
                return

            # Simulate work
            await asyncio.sleep(0.02)

            progress = (step / steps_per_epoch) * 100
            simulated_loss = (1 / epoch) * (1 - (step / steps_per_epoch)) + 0.1

            progress_data = {
                "epoch": epoch,
                "total_epochs": total_epochs,
                "progress_percent": round(progress, 2),
                "loss": round(simulated_loss, 4)
            }

            # Send a named event "progress"
            yield f"event: progress\ndata: {json.dumps(progress_data)}\n\n"

    # Send a final "complete" event
    yield f"event: complete\ndata: Training complete!\n\n"

@app.get("/stream-training")
async def stream_training(request: Request):
    """SSE endpoint to stream training progress."""
    return StreamingResponse(training_progress_generator(request), media_type="text/event-stream")

@app.get("/", response_class=HTMLResponse)
async def read_root():
    """Serves the main HTML page."""
    with open("index.html") as f:
        return HTMLResponse(content=f.read())

El código actualizado index.html es este.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Live Task Progress</title>
    <style>
        body { font-family: sans-serif; text-align: center; padding-top: 50px; }
        .progress-container { width: 80%; max-width: 700px; margin: auto; }
        #start-btn { font-size: 1.2rem; padding: 10px 20px; cursor: pointer; }
        .progress-bar-outer { border: 1px solid #ccc; padding: 3px; border-radius: 5px; margin-top: 20px; }
        .progress-bar-inner { background-color: #4CAF50; width: 0%; height: 30px; text-align: center; line-height: 30px; color: white; border-radius: 3px; transition: width 0.1s linear; }
        #status-text { margin-top: 10px; font-size: 1rem; color: #555; height: 2em; }
    </style>
</head>
<body>
    <h1>AI Model Training Simulation</h1>
    <div class="progress-container">
        <button id="start-btn">Start Training</button>
        <div id="progress-bar-outer" class="progress-bar-outer" style="display: none;">
            <div id="progress-bar-inner" class="progress-bar-inner">0%</div>
        </div>
        <div id="status-text"></div>
    </div>
    <script>
        const startBtn = document.getElementById('start-btn');
        const progressBarOuter = document.getElementById('progress-bar-outer');
        const progressBarInner = document.getElementById('progress-bar-inner');
        const statusText = document.getElementById('status-text');

        let eventSource;

        startBtn.addEventListener('click', () => {
            startBtn.disabled = true;
            progressBarOuter.style.display = 'block';
            statusText.textContent = 'Initializing...';

            // Close any existing connection
            if (eventSource) {
                eventSource.close();
            }

            // Start a new SSE connection
            eventSource = new EventSource('http://localhost:8000/stream-training');

            eventSource.addEventListener('progress', (e) => {
                const data = JSON.parse(e.data);
                const percent = data.progress_percent;
                progressBarInner.style.width = percent + '%';
                progressBarInner.textContent = percent.toFixed(0) + '%';
                statusText.textContent = `Epoch: ${data.epoch}/${data.total_epochs} | Loss: ${data.loss}`;
            });

            eventSource.addEventListener('complete', (e) => {
                statusText.textContent = e.data;
                progressBarInner.style.backgroundColor = '#007bff';
                eventSource.close(); // Close the connection
                startBtn.disabled = false;
            });

            eventSource.onerror = () => {
                statusText.textContent = 'Connection error. Please try again.';
                eventSource.close();
                startBtn.disabled = false;
            };
        });
    </script>
</body>
</html>

Detenga sus procesos existentes de servidor Uvicorn y Python si todavía se están ejecutando, y luego reinicie ambos.

Ahora, cuando abre la página index.html, debería ver una pantalla con un botón. Al presionar el botón, iniciará una tarea ficticia, y una barra en movimiento mostrará el progreso de la tarea.

Imagen del autor

Código Ejemplo 4: un ticker de acciones financieras en tiempo real

Para nuestro último ejemplo, crearemos un ticker de stock simulado. El servidor generará actualizaciones de precios aleatorios para varios símbolos de stock y los enviará usando eventos con nombre, donde el nombre del evento corresponde al símbolo de stock (por ejemplo, evento: AAPL, Event: Googl). Este es un patrón poderoso para multiplexar diferentes tipos de datos en una sola conexión SSE, lo que permite al cliente manejar cada transmisión de forma independiente.

Aquí está el código APP.py actualizado que necesitará.

import asyncio
import json
import random
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware

# Step 1: Create app first
app = FastAPI()

# Step 2: Add CORS to allow requests from http://localhost:8080
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:8080"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

# Step 3: Simulated stock prices
STOCKS = {
    "AAPL": 150.00,
    "GOOGL": 2800.00,
    "MSFT": 300.00,
}

# Step 4: Generator to simulate updates
async def stock_ticker_generator(request: Request):
    while True:
        if await request.is_disconnected():
            break

        symbol = random.choice(list(STOCKS.keys()))
        change = random.uniform(-0.5, 0.5)
        STOCKS[symbol] = max(0, STOCKS[symbol] + change)

        update = {
            "symbol": symbol,
            "price": round(STOCKS[symbol], 2),
            "change": round(change, 2)
        }

        # Send named events so the browser can listen by symbol
        yield f"event: {symbol}\ndata: {json.dumps(update)}\n\n"
        await asyncio.sleep(random.uniform(0.5, 1.5))

# Step 5: SSE endpoint
@app.get("/stream-stocks")
async def stream_stocks(request: Request):
    return StreamingResponse(stock_ticker_generator(request), media_type="text/event-stream")

Y el índice actualizado.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Live Stock Ticker</title>
    <style>
        body { font-family: sans-serif; display: flex; justify-content: center; padding-top: 50px; }
        .ticker { display: flex; gap: 20px; }
        .stock { border: 1px solid #ccc; padding: 15px; border-radius: 5px; width: 150px; text-align: center; }
        .symbol { font-weight: bold; font-size: 1.5rem; }
        .price { font-size: 2rem; margin: 10px 0; }
        .change { font-size: 1rem; }
        .up { color: green; }
        .down { color: red; }
    </style>
</head>
<body>
    <div class="ticker">
        <div id="AAPL" class="stock">
            <div class="symbol">AAPL</div>
            <div class="price">--.--</div>
            <div class="change">-.--</div>
        </div>
        <div id="GOOGL" class="stock">
            <div class="symbol">GOOGL</div>
            <div class="price">--.--</div>
            <div class="change">-.--</div>
        </div>
        <div id="MSFT" class="stock">
            <div class="symbol">MSFT</div>
            <div class="price">--.--</div>
            <div class="change">-.--</div>
        </div>
    </div>

    <script>
        const eventSource = new EventSource('http://localhost:8000/stream-stocks');

        function updateStock(data) {
            const stock = document.getElementById(data.symbol);
            if (!stock) return;

            const priceEl = stock.querySelector('.price');
            const changeEl = stock.querySelector('.change');

            priceEl.textContent = data.price.toFixed(2);
            changeEl.textContent = data.change.toFixed(2);

            const className = data.change >= 0 ? 'up' : 'down';
            priceEl.className = 'price ' + className;
            changeEl.className = 'change ' + className;
        }

        ['AAPL', 'GOOGL', 'MSFT'].forEach(symbol => {
            eventSource.addEventListener(symbol, e => {
                const stockData = JSON.parse(e.data);
                updateStock(stockData);
            });
        });

        eventSource.onerror = function(err) {
            console.error("EventSource failed:", err);
        };
    </script>
</body>
</html>

Detente luego reinicie los procesos Uvicorn y Python como antes. Esta vez, cuando abre http: // localhost: 8080/index.html En su navegador, debe ver una pantalla como esta, que actualizará continuamente los precios ficticios de las tres acciones.

Imagen del autor

Resumen

En este artículo, demostré que para muchos casos de uso en tiempo real, los eventos orientados al servidor ofrecen una alternativa más simple a WebSockets. Discutimos los principios centrales de SSE, incluido su modelo de comunicación unidireccional y sus capacidades de reconexión automática. A través de una serie de ejemplos prácticos que usan Python y Fastapi, vimos lo fácil que es construir características potentes en tiempo real. Cubrimos:

  • Un simple punto de final de Python Back-end y SSE
  • Un sistema en vivo que monitorea los datos JSON estructurados de transmisión del tablero.
  • Una barra de progreso en tiempo real para una tarea de fondo simulada de larga duración.
  • Un ticker de stock multiplexado que usa eventos con nombre para administrar diferentes flujos de datos.

La próxima vez que necesite empujar los datos de su servidor a un cliente, le animo a que haga una pausa antes de buscar WebSockets. Pregúntese si realmente necesita una comunicación bidireccional. Si la respuesta es no, entonces los eventos de servidor es probable que sea la solución más sencilla, más rápida y más robusta que haya estado buscando.