Una guía de codificación para comprender cómo los reintentos desencadenan cascadas de fallas en RPC y arquitecturas basadas en eventos

En este tutorial, creamos una comparación práctica entre un sistema síncrono basado en RPC y una arquitectura asíncrona basada en eventos para comprender cómo se comportan los sistemas distribuidos reales bajo carga y falla. Simulamos servicios descendentes con latencia variable, condiciones de sobrecarga y errores transitorios, y luego impulsamos ambas arquitecturas utilizando patrones de tráfico en ráfagas. Al observar métricas como la latencia de cola, los reintentos, las fallas y las colas de mensajes no entregados, examinamos cómo el estrecho acoplamiento de RPC amplifica las fallas y cómo los diseños asíncronos impulsados ​​por eventos intercambian consistencia inmediata por resiliencia. A lo largo del tutorial, nos centramos en mecanismos prácticos, reintentos, retrocesos exponenciales, disyuntores, mamparos y colas que los ingenieros utilizan para controlar fallas en cascada en los sistemas de producción. Consulta los CÓDIGOS COMPLETOS aquí.

importar asyncio, aleatorio, tiempo, matemáticas, estadísticas de clases de datos importar clase de datos, campo de colecciones importar deque def now_ms(): devolver time.perf_counter() * 1000.0 def pctl(xs, p): si no xs: devolver Ninguno xs2 = ordenado(xs) k = (len(xs2) – 1) * p f = math.floor(k) c = math.ceil(k) if f == c: devolver xs2[int(k)]
volver xs2[f] + (xs2[c] – xs2[f]) * (k – f) @dataclass class Estadísticas: latencias_ms: lista = campo(default_factory=list) ok: int = 0 falla: int = 0 descartado: int = 0 reintentos: int = 0 tiempos de espera: int = 0 cb_open: int = 0 dlq: int = 0 def resumen(self, nombre): l = self.latencies_ms return { “nombre”: nombre, “ok”: self.ok, “fail”: self.fail, “dropped”: self.dropped, “retries”: self.retries, “timeouts”: self.timeouts, “cb_open”: self.cb_open, “dlq”: self.dlq, “lat_p50_ms”: round(pctl(l, 0.50), 2) si l else Ninguno, “lat_p95_ms”: round(pctl(l, 0.95), 2) if l else Ninguno, “lat_p99_ms”: round(pctl(l, 0.99), 2) if l else Ninguno, “lat_mean_ms”: round(statistics.mean(l), 2) if l else Ninguno, }

Definimos las utilidades principales y las estructuras de datos utilizadas a lo largo del tutorial. Establecemos ayudas de sincronización, cálculos de percentiles y un contenedor de métricas unificado para realizar un seguimiento de la latencia, los reintentos, las fallas y el comportamiento de cola. Nos brinda una forma consistente de medir y comparar RPC y ejecuciones basadas en eventos. Consulta los CÓDIGOS COMPLETOS aquí.

@dataclass clase FailureModel: base_latency_ms: float = 8.0 jitter_ms: float = 6.0 fail_prob: float = 0.05 sobrecarga_fail_prob: float = 0.40 sobrecarga_latency_ms: float = 50.0 def sample(self, load_factor: float): base = self.base_latency_ms + random.random() * self.jitter_ms if load_factor > 1.0: base += (load_factor – 1.0) * self.overload_latency_ms fail_p = min(0.95, self.fail_prob + (load_factor – 1.0) * self.overload_fail_prob) else: fail_p = self.fail_prob base de retorno, (random.random() < fail_p) clase CircuitBreaker: def __init__(self, umbral_fallo=8, ventana=20, open_ms=500): self.umbral_fail = umbral_fallo self.window = ventana self.open_ms = open_ms self.events = deque(maxlen=ventana) self.open_until_ms = 0.0 def permitir(self): return now_ms() >= self.open_until_ms def record(self, ok: bool): self.events.append(no está bien) if len(self.events) >= self.window y sum(self.events) >= self.fail_threshold: self.open_until_ms = now_ms() + self.open_ms class Bulkhead: def __init__(self, limit): self.sem = asyncio.Semaphore(limit) async def __aenter__(self): await self.sem.acquire() async def __aexit__(self, exc_type, exc, tb): self.sem.release() def exp_backoff(intento, base_ms=20, cap_ms=400): devuelve random.random() * min(cap_ms, base_ms * (2 ** (intento – 1)))

Modelamos el comportamiento de falla y las primitivas de resiliencia que dan forma a la estabilidad del sistema. Simulamos fallas y latencia sensibles a sobrecargas, e introducimos disyuntores, mamparos y retroceso exponencial para controlar los efectos en cascada. Estos componentes nos permiten experimentar con configuraciones de sistemas distribuidos seguras versus inseguras. Consulta los CÓDIGOS COMPLETOS aquí.

clase DownstreamService: def __init__(self, fm: FailureModel, capacidad_rps=250): self.fm = fm self.capacity_rps = capacidad_rps self._inflight = 0 async def handle(self, payload: dict): self._inflight += 1 intento: load_factor = max(0.5, self._inflight / (self.capacity_rps / 10)) lat, debería_fail = self.fm.sample(load_factor) await asyncio.sleep(lat / 1000.0) si debería_fail: elevar RuntimeError(“downstream_error”) return {“status”: “ok”} finalmente: self._inflight -= 1 async def rpc_call( svc, req, stats, timeout_ms=120, max_retries=0, cb=Ninguno, Bulkhead=None,): t0 = now_ms() si cb y no cb.allow(): stats.cb_open += 1 stats.fail += 1 retorno Intento falso = 0 mientras que Verdadero: intento += 1 intento: si Bulkhead: asíncrono con Bulkhead: await asyncio.wait_for(svc.handle(req), timeout=timeout_ms / 1000.0) else: await asyncio.wait_for(svc.handle(req), timeout=timeout_ms / 1000.0) stats.latencies_ms.append(now_ms() – t0) stats.ok += 1 si cb: cb.record(True) devuelve True excepto asyncio.TimeoutError: stats.timeouts += 1 excepto Excepción: pasar stats.fail += 1 si cb: cb.record(False) si intento <= max_retries: stats.retries += 1 await asyncio.sleep(exp_backoff(intento) / 1000.0) continuar devolver False

Implementamos la ruta RPC síncrona y su interacción con los servicios posteriores. Observamos cómo los tiempos de espera, los reintentos y la carga en vuelo afectan directamente la latencia y la propagación de fallas. También destaca cómo el acoplamiento estrecho en RPC puede amplificar los problemas transitorios en condiciones de tráfico intenso. Consulta los CÓDIGOS COMPLETOS aquí.

@dataclass clase Evento: id: int intentos: int = 0 clase EventBus: def __init__(self, max_queue=5000): self.q = asyncio.Queue(maxsize=max_queue) async def publicar(self, e: Evento): try: self.q.put_nowait(e) return True excepto asyncio.QueueFull: return False async def event_consumer( bus, svc, stats, stop, max_retries=0, dlq=None, Bulkhead=None, timeout_ms=200,): mientras no stop.is_set() o no bus.q.empty(): intente: e = await asyncio.wait_for(bus.q.get(), timeout=0.2) excepto asyncio.TimeoutError: continúe t0 = now_ms() e.tries += 1 intento: si mamparo: asíncrono con mamparo: espere asyncio.wait_for(svc.handle({“id”: e.id}), timeout=timeout_ms / 1000.0) más: espere asyncio.wait_for(svc.handle({“id”: e.id}), timeout=timeout_ms / 1000.0) stats.ok += 1 stats.latencies_ms.append(now_ms() – t0) excepto Excepción: stats.fail += 1 si e.tries <= max_retries: stats.retries += 1 await asyncio.sleep(exp_backoff(e.tries) / 1000.0) await bus.publish(e) else: stats.dlq += 1 si dlq no es Ninguno: dlq.append(e) finalmente: bus.q.task_done()

Construimos la canalización asincrónica basada en eventos utilizando una cola y consumidores en segundo plano. Procesamos eventos independientemente del envío de solicitudes, aplicamos lógica de reintento y enrutamos mensajes irrecuperables a una cola de mensajes no entregados. Demuestra cómo el desacoplamiento mejora la resiliencia al tiempo que introduce nuevas consideraciones operativas. Consulta los CÓDIGOS COMPLETOS aquí.

async def generate_requests(total=2000, burst=350, gap_ms=80): reqs = []
deshacerse = 0 mientras que deshacerse < total: n = min(ráfaga, total - deshacerse) para _ en el rango (n): reqs.append(rid) deshacerse += 1 await asyncio.sleep(gap_ms / 1000.0) return reqs async def main(): random.seed(7) fm = FailureModel() svc = DownstreamService(fm) ids = await generate_requests() rpc_stats = Estadísticas() cb = CircuitBreaker() masivo = Mamparo(40) espera asyncio.gather(*[ rpc_call(svc, {"id": i}, rpc_stats, max_retries=3, cb=cb, bulkhead=bulk) for i in ids ]) bus = EventBus() ev_stats = Estadísticas() parada = asyncio.Event() dlq = [] consumidores = [ asyncio.create_task(event_consumer(bus, svc, ev_stats, stop, max_retries=3, dlq=dlq)) for _ in range(16) ] para i en ids: await bus.publish(Event(i)) await bus.q.join() stop.set() para c en consumidores: c.cancel() print(rpc_stats.summary("RPC")) print(ev_stats.summary("EventDriven")) print("DLQ size:", len(dlq)) await main()

Impulsamos ambas arquitecturas con cargas de trabajo en ráfagas y organizamos el experimento completo. Recopilamos métricas, finalizamos limpiamente a los consumidores y comparamos resultados entre RPC y ejecuciones basadas en eventos. El paso final une la latencia, el rendimiento y el comportamiento de falla en una comparación coherente a nivel del sistema.

En conclusión, vimos claramente las ventajas y desventajas entre RPC y las arquitecturas basadas en eventos en sistemas distribuidos. Observamos que RPC ofrece una latencia más baja cuando las dependencias están en buen estado, pero se vuelve frágil cuando se satura, donde los reintentos y los tiempos de espera se convierten rápidamente en fallas en todo el sistema. Por el contrario, el enfoque basado en eventos desacopla a los productores de los consumidores, absorbe las ráfagas mediante el almacenamiento en búfer y localiza las fallas, pero requiere un manejo cuidadoso de los reintentos, la contrapresión y las colas de mensajes fallidos para evitar sobrecargas ocultas y colas ilimitadas. A través de este tutorial, demostramos que la resiliencia en los sistemas distribuidos no proviene de elegir una única arquitectura, sino de combinar el modelo de comunicación correcto con patrones disciplinados de manejo de fallas y un diseño consciente de la capacidad.

Consulta los CÓDIGOS COMPLETOS aquí. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 100.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.

Michal Sutter es un profesional de la ciencia de datos con una Maestría en Ciencias de Datos de la Universidad de Padua. Con una base sólida en análisis estadístico, aprendizaje automático e ingeniería de datos, Michal se destaca en transformar conjuntos de datos complejos en conocimientos prácticos.