Cómo crear un protocolo estilo MCP sin estado, seguro y asíncrono para flujos de trabajo de agentes escalables

En este tutorial, creamos una demostración limpia y avanzada del diseño MCP moderno centrándonos en tres ideas centrales: comunicación sin estado, validación estricta a nivel de SDK y operaciones asincrónicas de larga duración. Implementamos un protocolo mínimo similar a MCP utilizando sobres estructurados, solicitudes firmadas y herramientas validadas por Pydantic para mostrar cómo los agentes y servicios pueden interactuar de forma segura sin depender de sesiones persistentes. Consulta los CÓDIGOS COMPLETOS aquí.

importar asyncio, time, json, uuid, hmac, hashlib desde clases de datos importar clase de datos desde escribir importar Cualquiera, Dict, Opcional, Literal, Lista desde pydantic importar BaseModel, Campo, ValidationError, ConfigDict def _now_ms(): return int(time.time() * 1000) def _uuid(): return str(uuid.uuid4()) def _canonical_json(obj): return json.dumps(obj, separators=(“,”, “:”), sort_keys=True).encode() def _hmac_hex(secreto, carga útil): return hmac.new(secret, _canonical_json(carga útil), hashlib.sha256).hexdigest()

Configuramos las utilidades principales necesarias en todo el sistema, incluidos asistentes de tiempo, generación de UUID, serialización JSON canónica y firma criptográfica. Nos aseguramos de que todas las solicitudes y respuestas puedan firmarse y verificarse de manera determinista mediante HMAC. Consulta los CÓDIGOS COMPLETOS aquí.

clase MCPEnvelope(BaseModel): model_config = ConfigDict(extra=”prohibir”) v: Literal[“mcp/0.1”] = “mcp/0.1″ request_id: str = Campo(default_factory=_uuid) ts_ms: int = Campo(default_factory=_now_ms) client_id: str server_id: str herramienta: str args: Dict[str, Any] = Campo(default_factory=dict) nonce: str = Campo(default_factory=_uuid) firma: str clase MCPResponse(BaseModel): model_config = ConfigDict(extra=”prohibir”) v: Literal[“mcp/0.1”] = “mcp/0.1” request_id: str ts_ms: int = Field(default_factory=_now_ms) ok: bool server_id: str estado: Literal[“ok”, “accepted”, “running”, “done”, “error”]
resultado: Opcional[Dict[str, Any]]= Ninguno error: Opcional[str] = Ninguna firma: str

Definimos el sobre estructurado de MCP y los formatos de respuesta que sigue cada interacción. Aplicamos esquemas estrictos utilizando Pydantic para garantizar que los campos con formato incorrecto o inesperados se rechacen anticipadamente. Garantiza contratos coherentes entre clientes y servidores, lo cual es fundamental para la estandarización del SDK. Consulta los CÓDIGOS COMPLETOS aquí.

clase ServerIdentityOut(BaseModel): model_config = ConfigDict(extra=”forbid”) server_id: str huella digital: str capacidades: Dict[str, Any]

clase BatchSumIn(BaseModel): model_config = ConfigDict(extra=”prohibir”) números: Lista[float] = Campo(min_length=1) clase BatchSumOut(BaseModel): model_config = ConfigDict(extra=”prohibir”) recuento: int total: clase flotante StartLongTaskIn(BaseModel): model_config = ConfigDict(extra=”prohibir”) segundos: int = Campo(ge=1, le=20) carga útil: Dict[str, Any] = Campo(default_factory=dict) clase PollJobIn(BaseModel): model_config = ConfigDict(extra=”prohibir”) job_id: str

Declaramos los modelos de entrada y salida validados para cada herramienta expuesta por el servidor. Usamos restricciones de Pydantic para expresar claramente lo que cada herramienta acepta y devuelve. Hace que el comportamiento de la herramienta sea predecible y seguro, incluso cuando la invocan agentes basados ​​en LLM. Consulta los CÓDIGOS COMPLETOS aquí.

@dataclass clase JobState: job_id: str estado: str resultado: Opcional[Dict[str, Any]]= Ninguno error: Opcional[str] = Ninguna clase MCPServer: def __init__(self, server_id, secret): self.server_id = server_id self.secret = secret self.jobs = {} self.tasks = {} def _fingerprint(self): return hashlib.sha256(self.secret).hexdigest()[:16]

async def handle(self, env_dict, client_secret): env = MCPEnvelope(**env_dict) carga útil = env.model_dump() sig = payload.pop(“signature”) if _hmac_hex(client_secret, payload) != sig: return {“error”: “firma incorrecta”} if env.tool == “server_identity”: out = ServerIdentityOut( server_id=self.server_id, huella digital=self._fingerprint(), capacidades={“async”: True, “stateless”: True}, ) resp = MCPResponse( request_id=env.request_id, ok=True, server_id=self.server_id, status=”ok”, resultado=out.model_dump(), firma=””, ) elif env.tool == “batch_sum”: args = BatchSumIn(**env.args) out = BatchSumOut(count=len(args.numbers), total=sum(args.numbers)) resp = MCPResponse( request_id=env.request_id, ok=True, server_id=self.server_id, status=”ok”, resultado=out.model_dump(), firma=””, ) elif env.tool == “start_long_task”: args = StartLongTaskIn(**env.args) jid = _uuid() self.jobs[jid] = JobState(jid, “ejecutando”) async def run(): espera asyncio.sleep(args.segundos) self.jobs[jid].status = “hecho” self.jobs[jid].resultado = args.payload self.tareas[jid] = asyncio.create_task(run()) resp = MCPResponse( request_id=env.request_id, ok=True, server_id=self.server_id, status=”aceptado”, resultado={“job_id”: jid}, firma=””, ) elif env.tool == “poll_job”: args = PollJobIn(**env.args) trabajo = self.jobs[args.job_id]
resp = MCPResponse( request_id=env.request_id, ok=True, server_id=self.server_id, status=job.status, result=job.result, firma=””, ) carga útil = resp.model_dump() resp.signature = _hmac_hex(self.secret, carga útil) return resp.model_dump()

Implementamos el servidor MCP sin estado junto con su lógica de administración de tareas asíncronas. Manejamos la verificación de solicitudes, el envío de herramientas y la ejecución de trabajos de larga duración sin depender del estado de la sesión. Al devolver identificadores de trabajo y permitir el sondeo, demostramos una ejecución de tareas escalable y sin bloqueo. Consulta los CÓDIGOS COMPLETOS aquí.

clase MCPClient: def __init__(self, client_id, secret, server): self.client_id = client_id self.secret = secret self.server = servidor asíncrono def call(self, herramienta, args=Ninguno): env = MCPEnvelope( client_id=self.client_id, server_id=self.server.server_id, herramienta=herramienta, args=args o {}, firma=””, ).entorno model_dump()[“signature”] = _hmac_hex(self.secret, {k: v for k, v in env.items() if k != “signature”}) return await self.server.handle(env, self.secret) async def demo(): server_secret = b”server_secret” client_secret = b”client_secret” server = MCPServer(“mcp-server-001”, server_secret) client = MCPClient(“cliente-001”, client_secret, server) print(await client.call(“server_identity”)) print(await client.call(“batch_sum”, {“números”: [1, 2, 3]})) inicio = await client.call(“start_long_task”, {“segundos”: 2, “carga útil”: {“tarea”: ​​”demo”}}) jid = inicio[“result”][“job_id”]

mientras que Verdadero: encuesta = await client.call(“poll_job”, {“job_id”: jid}) si la encuesta[“status”] == “hecho”: imprimir (encuesta) pausa en espera asyncio.sleep (0.5) en espera de demostración ()

Construimos un cliente ligero sin estado que firma cada solicitud e interactúa con el servidor a través de sobres estructurados. Demostramos llamadas sincrónicas, fallas de validación de entradas y sondeo de tareas asincrónicas en un solo flujo. Muestra cómo los clientes pueden consumir de manera confiable servicios estilo MCP en canales de agentes reales.

En conclusión, mostramos cómo MCP evoluciona desde una simple interfaz de llamada de herramientas a un protocolo robusto adecuado para sistemas del mundo real. Iniciamos tareas de forma asincrónica y sondeamos resultados sin bloquear la ejecución, aplicamos contratos claros mediante la validación del esquema y confiamos en mensajes firmados y sin estado para preservar la seguridad y la flexibilidad. En conjunto, estos patrones demuestran cómo los sistemas modernos de estilo MCP admiten flujos de trabajo de agentes confiables y listos para la empresa, sin dejar de ser simples, transparentes y fáciles de ampliar.

Consulta los CÓDIGOS COMPLETOS aquí. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 100.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.

Asif Razzaq es el director ejecutivo de Marktechpost Media Inc.. Como empresario e ingeniero visionario, Asif está comprometido a aprovechar el potencial de la inteligencia artificial para el bien social. Su esfuerzo más reciente es el lanzamiento de una plataforma de medios de inteligencia artificial, Marktechpost, que se destaca por su cobertura en profundidad del aprendizaje automático y las noticias sobre aprendizaje profundo que es técnicamente sólida y fácilmente comprensible para una amplia audiencia. La plataforma cuenta con más de 2 millones de visitas mensuales, lo que ilustra su popularidad entre el público.