Cómo diseñar un sistema de comunicación multiagente de nivel de producción utilizando el bus de mensajes estructurados LangGraph, el registro ACP y la arquitectura de estado compartido persistente

En este tutorial, construimos un sistema avanzado de comunicación multiagente utilizando una arquitectura de bus de mensajes estructurados impulsada por LangGraph y Pydantic. Definimos un estricto esquema de mensajes estilo ACP que permite a los agentes comunicarse a través de un estado compartido en lugar de llamarse entre sí directamente, lo que permite modularidad, trazabilidad y orquestación de nivel de producción. Implementamos tres agentes especializados, un Planificador, un Ejecutor y un Validador, que se coordinan a través de mensajes estructurados, estado persistente y lógica de enrutamiento. También integramos la persistencia basada en SQLite para proporcionar memoria duradera entre ejecuciones y visualizar el flujo de comunicación del agente para comprender cómo se propagan los mensajes a través del sistema.

!pip -q install -U “pydantic==2.12.3” !pip -q install -U langgraph langchain-core networkx matplotlib !pip -q install -U langgraph-checkpoint-sqlite importar os importar json importar uuid importar sqlite3 desde fecha y hora importar fecha y hora, zona horaria desde escribir importar Cualquiera, Dict, Lista, Literal, Opcional, Tupla de pydantic importar BaseModel, Importar campo networkx como nx importar matplotlib.pyplot como plt desde langgraph.graph importar StateGraph, END desde langgraph.checkpoint.sqlite importar SqliteSaver Role = Literal[“planner”, “executor”, “validator”, “user”, “system”]
Tipo de mensaje = Literal[“task”, “plan”, “result”, “validation”, “error”, “control”]

clase ACPMessage(BaseModel): msg_id: str = Field(default_factory=lambda: str(uuid.uuid4())) ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat().replace(“+00:00”, “Z”)) remitente: Rol receptor: Rol msg_type: MsgType contenido: str meta: Dict[str, Any] = Seguimiento de campo (default_factory = dict): Dict[str, Any] = Campo(default_factory=dict) def acp_log_path() -> str: os.makedirs(“acp_logs”, exist_ok=True) return os.path.join(“acp_logs”, “acp_messages.jsonl”) def append_acp_log(m: ACPMessage) -> Ninguno: con open(acp_log_path(), “a”, codificación=”utf-8″) como f: f.write(m.model_dump_json() + “\n”)

Instalamos e importamos todas las bibliotecas necesarias para construir un sistema estructurado de comunicación multiagente. Definimos el esquema de mensajes estilo ACP utilizando Pydantic, lo que nos permite aplicar un formato estricto y estructurado para la comunicación de los agentes. También implementamos registros estructurados para conservar cada mensaje intercambiado entre agentes, lo que permite la trazabilidad y observabilidad del sistema.

clase BusState(BaseModel): objetivo: str = “” hecho: bool = False errores: Lista[str] = Campo(default_factory=lista) buzón: Lista[ACPMessage] = Campo(default_factory=lista) bordes: Lista[Tuple[str, str, str]]= Campo(default_factory=list) active_role: Rol = “usuario” paso: int = 0 def bus_update( estado: BusState, remitente: Rol, receptor: Rol, msg_type: MsgType, contenido: str, meta: Opcional[Dict[str, Any]]= Ninguno, seguimiento: Opcional[Dict[str, Any]]= Ninguno, ) -> Dict[str, Any]: m = ACPMessage( remitente=remitente, receptor=receptor, msg_type=msg_type, content=content, meta=meta o {}, trace=trace o {}, ) append_acp_log(m) return { “meta”: estado.meta, “hecho”: estado.hecho, “errores”: estado.errors, “buzón”: estado.buzón + [m]”bordes”: estado.bordes + [(sender, receiver, msg_type)]”active_role”: receptor, “paso”: estado.paso + 1, }

Definimos la estructura de estado compartido que actúa como bus de mensajes centralizado para todos los agentes. Implementamos la clase BusState para almacenar el objetivo, el buzón, la información de enrutamiento y el progreso de la ejecución. También creamos la función bus_update, que nos permite generar mensajes estructurados, actualizar el estado compartido y conservar registros de mensajes de manera consistente.

def agente_planificador(state_dict: Dict[str, Any]) -> Dictar[str, Any]: estado = BusState.model_validate(state_dict) objetivo = state.goal.strip() si no es objetivo: devolver bus_update(state, “planner”, “validator”, “error”, “No se proporcionó objetivo.”, meta={“reason”: “empty_goal”}) plan = [
“Interpret the goal and extract requirements.”,
“Decide an execution strategy with clear outputs.”,
“Ask Executor to produce the result.”,
“Ask Validator to check correctness + completeness.”,
]
plan_text = “\n”.join([f”{i+1}. {p}” for i, p in enumerate(plan)]) return bus_update( estado, “planificador”, “ejecutor”, “plan”, plan_text, meta={“meta”: meta, “plan_steps”: len(plan)}, trace={“policy”: “deterministic_planner_v1”}, ) def executor_agent(state_dict: Dict[str, Any]) -> Dictar[str, Any]: estado = BusState.model_validate(state_dict) objetivo = estado.goal.strip() último_plan = Ninguno para m en reverso(estado.mailbox): if m.receiver == “ejecutor” y m.msg_type == “plan”: último_plan = m.content break resultado = { “objetivo”: objetivo, “supuestos”: [
“We can produce a concise, actionable output.”,
“We can validate via rule-based checks.”,
]”salida”: f”Tarea ejecutada para el objetivo: {objetivo}”, “entregables”: [
“A clear summary”,
“A step-by-step action list”,
“Any constraints and edge cases”,
]”plan_seen”: bool(latest_plan), } result_text = json.dumps(resultado, sangría=2) return bus_update( estado, “ejecutor”, “validador”, “resultado”, result_text, meta={“artifact_type”: “json”, “bytes”: len(result_text.encode(“utf-8”))}, trace={“policy”: “deterministic_executor_v1”}, )

Implementamos los agentes Planificador y Ejecutor, que se encargan de la planificación y ejecución de tareas. Diseñamos el agente Planner para interpretar el objetivo y generar un plan de ejecución estructurado, que luego pasa a través del bus de mensajes. Implementamos el agente Ejecutor para leer el plan, ejecutarlo y producir un artefacto de resultado estructurado que los agentes posteriores puedan validar.

def validator_agent(state_dict: Dict[str, Any]) -> Dictar[str, Any]: estado = BusState.model_validate(state_dict) objetivo = estado.goal.strip() último_resultado = Ninguno para m en reverso(estado.mailbox): if m.receiver == “validador” y m.msg_type in (“resultado”, “error”): último_resultado = m break si último_resultado es Ninguno: upd = bus_update(estado, “validador”, “planificador”, “error”, “Sin resultado para validar.”, meta={“razón”: “missing_result”}) actualizar[“done”] = Actualización verdadera[“errors”] = estado.errores + [“missing_result”]
return upd if last_result.msg_type == “error”: upd = bus_update( state, “validator”, “planner”, “validation”, f”La validación falló porque ocurrió un error ascendente: {latest_result.content}”, meta={“status”: “fail”}, ) upd[“done”] = Actualización verdadera[“errors”] = estado.errores + [latest_result.content]
devolver intento de actualización: parsed = json.loads(latest_result.content) excepto Excepción como e: upd = bus_update( state, “validator”, “planner”, “validation”, f”El resultado no es válido JSON: {e}”, meta={“status”: “fail”}, ) upd[“done”] = Actualización verdadera[“errors”] = estado.errores + [f”invalid_json: {e}”]
devolver problemas actualizados = []
if parsed.get(“meta”)! = objetivo: issues.append(“Result.goal no coincide con el objetivo de entrada.”) si los “entregables” no están analizados o no son instancia (analizado[“deliverables”]lista) o len(analizado[“deliverables”]) == 0: issues.append(“Lista de entregables faltantes o vacías.”) if issues: upd = bus_update( state, “validator”, “planner”, “validation”, “Validación fallida:\n- ” + “\n- “.join(issues), meta={“status”: “fail”, “issues”: issues}, ) upd[“done”] = Actualización verdadera[“errors”] = state.errors + problemas devuelven upd upd = bus_update( estado, “validador”, “usuario”, “validación”, “Validación aprobada ✅ El resultado parece consistente y completo.”, meta={“status”: “pass”}, ) upd[“done”] = Actualización verdadera[“errors”] = state.errors return upd def route_next(state_dict: Dict[str, Any]) -> str: if state_dict.get(“hecho”, False): devuelve END rol = state_dict.get(“active_role”, “usuario”) si rol == “planificador”: devuelve “planificador” si rol == “ejecutor”: devuelve “ejecutor” si rol == “validador”: devuelve “validador” devuelve END

Implementamos el agente Validador y la lógica de enrutamiento que controla el flujo de ejecución del agente. Diseñamos el Validador para inspeccionar los resultados de la ejecución, verificar la corrección y generar resultados de validación a través de controles estructurados. También implementamos la función de enrutamiento que determina dinámicamente qué agente debe ejecutarse a continuación, lo que permite una orquestación coordinada de múltiples agentes.

gráfico = StateGraph(dict) Graph.add_node(“planner”, planificador_agent) Graph.add_node(“ejecutor”, executor_agent) Graph.add_node(“validator”, validator_agent) Graph.set_entry_point(“planner”) Graph.add_conditional_edges(“planner”, route_next, {“planner”: “planner”, “executor”: “ejecutor”, “validator”: “validator”, END: END}) graph.add_conditional_edges(“executor”, route_next, {“planner”: “planner”, “executor”: “executor”, “validator”: “validator”, END: END}) graph.add_conditional_edges(“validator”, route_next, {“planner”: “planner”, “executor”: “ejecutor”, “validator”: “validator”, END: END}) os.makedirs(“checkpoints”, exist_ok=True) db_path = “checkpoints/langgraph_bus.sqlite” conn = sqlite3.connect(db_path, check_same_thread=False) checkpointer = SqliteSaver(conn) app = graph.compile(checkpointer=checkpointer) def run_thread(goal: str, thread_id: str) -> BusState: init = BusState(meta=meta, active_role=”planner”, done=False).model_dump() final_state_dict = app.invoke(init, config={“configurable”: {“thread_id”: thread_id}}) return BusState.model_validate(final_state_dict) thread_id = “demo-thread-001” meta = “Diseñe un bus de mensajes estilo ACP donde el planificador/ejecutor/validador se coordine a través del estado compartido”. final_state = run_thread(meta, thread_id) print(“Listo:”, final_state.done) print(“Pasos:”, final_state.step) print(“Errores:”, final_state.errors) print(“\nÚltimos 5 mensajes:”) para m en final_state.mailbox[-5:]: imprimir(f”- [{m.msg_type}] {m.remitente} -> {m.receptor}: {m.contenido[:80]}”) instantánea = checkpointer.get_tuple({“configurable”: {“thread_id”: thread_id}}) cp = snapshot.checkpoint o {} cv = cp.get(“channel_values”, {}) o {} sv = cp.get(“state”, {}) o {} vals = cv if isinstance(cv, dict) y len(cv) else sv if isinstance(sv, dict) else {} print(“\nClaves de punto de control:”, list(cp.keys())) if isinstance(cv, dict): print(“claves de valores_canal:”, list(cv.keys())[:30]) si esinstancia(sv, dict): print(“claves de estado:”, lista(sv.keys())[:30]) print(“\nPaso persistente (mejor esfuerzo):”, vals.get(“paso”, “NOT_FOUND”)) print(“Active_role persistente (mejor esfuerzo):”, vals.get(“active_role”, “NOT_FOUND”)) print(“\nRegistros ACP:”, acp_log_path()) print(“Checkpoint DB:”, db_path) G = nx.DiGraph() G.add_edge(“planificador”, “ejecutor”) G.add_edge(“ejecutor”, “validador”) G.add_edge(“validador”, “usuario”) plt.figure(figsize=(6, 4)) pos = nx.spring_layout(G, semilla=7) nx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, flechas=True) plt.title(“Gráfico de orquestación: Planificador → Ejecutor → Validador”) plt.show() comm = nx.MultiDiGraph() para (s, r, t) en final_state.edges: comm.add_edge(s, r, label=t) plt.figure(figsize=(8, 5)) pos2 = nx.spring_layout(comm, seed=11) nx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, flechas=True) plt.title(“Gráfico de comunicación desde el bus de mensajes estructurado (bordes en tiempo de ejecución)”) plt.show() def tail_jsonl(ruta: str, n: int = 8) -> Lista[Dict[str, Any]]: si no os.path.exists(ruta): regresar []
con open(ruta, “r”, codificación=”utf-8″) como f: líneas = f.readlines()[-n:]
devolver [json.loads(x) for x in lines]

print(“\nÚltimas entradas del registro ACP:”) para la fila en tail_jsonl(acp_log_path(), 6): print(f”{fila[‘msg_type’]:>10} | {fila[‘sender’]} -> {fila[‘receiver’]} | {fila[‘ts’]}”)

Construimos el gráfico de estado de LangGraph, habilitamos la persistencia basada en SQLite y ejecutamos el flujo de trabajo de múltiples agentes. Usamos un identificador de subproceso para garantizar que el estado del agente se pueda guardar y recuperar de manera confiable en todas las ejecuciones. También visualizamos los gráficos de orquestación y comunicación e inspeccionamos los registros persistentes, lo que nos permite comprender cómo interactúan los agentes a través del bus de mensajes estructurados.

En este tutorial, diseñamos e implementamos con éxito un marco de comunicación estructurado de múltiples agentes utilizando la arquitectura de estado compartido de LangGraph y los principios del bus de mensajes estilo ACP. Permitimos que los agentes operaran de forma independiente mientras se comunicaban a través de mensajes estructurados y persistentes, lo que mejora la confiabilidad, la observabilidad y la escalabilidad. Registramos cada interacción, mantuvimos el estado del agente en las ejecuciones y visualizamos patrones de comunicación para obtener una visión profunda de la coordinación de los agentes. Esta arquitectura nos permite construir sistemas multiagente robustos, modulares y listos para producción que se pueden ampliar con agentes adicionales, razonamiento LLM, sistemas de memoria y estrategias de enrutamiento complejas.

Consulte los códigos completos aquí. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 120.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.