Comunicación de múltiples agentes con el A2A Python SDK

Si bajo una roca y trabaja con IA, probablemente haya escuchado sobre el protocolo de Agent2Agent (A2A), “Un estándar abierto diseñado para permitir la comunicación y la colaboración entre Agentes de IA“. Todavía es bastante nuevo, pero ya está recibiendo mucho zumbido. Dado que juega muy bien con MCP (que parece que se está convirtiendo en el estándar de la industria), A2A se está perfilando para ser el Estándar de referencia para la comunicación de múltiples agentes en la industria.

Cuando Google dejó caer por primera vez el Especificación de protocolomi primera reacción fue básicamente: “Está bien, genial … pero ¿qué se supone que debo hacer con esto?” Afortunadamente, esta semana lanzaron el oficial Pitón SDK para el protocolo, así que ahora finalmente habla un idioma que entiendo.

En este artículo vamos a sumergirnos en cómo el protocolo realmente establece la comunicación entre agentes y clientes. Spoiler: todo está de manera orientada a tareas. Para que las cosas sean menos abstractas, construamos un pequeño ejemplo de juguete juntos.

Comunicación entre el agente detector de eventos y un cliente A2A

En nuestros sistemas tenemos un Agente de IA del detector de eventos (responsable de detectar eventos) y un ALERTO DE ALERTA AGENTE DE AI (Responsable de alertar al usuario de los eventos). Como me estoy centrando en el protocolo A2A aquí, ambos agentes se burlan de los simples métodos de Python que devuelven cadenas. Pero en la vida real puede construir a sus agentes con cualquier marco que desee (Langgraph, Google ADK, Crewai, etc.).

Tenemos tres personajes en nuestro sistema, el usuarioel Agente de eventos y el Agente de alerta. Todos se comunican usando Messages. A Message representa un solo giro de comunicación en el protocolo A2A. Envolvemos a los agentes en Servidores A2A. Los servidores exponen un punto final HTTP que implementa el protocolo. Cada servidor A2A tiene Event queues Ese actúa como un búfer entre la ejecución asincrónica del agente y el manejo de respuesta del servidor.

El Cliente A2A inicia la comunicación, y si dos agentes necesitan comunicar un Servidor A2A También puede desempeñar el papel de un Cliente A2A. El siguiente diagrama muestra cómo se comunican un cliente y un servidor dentro del protocolo.

Imagen del autor

El EventQueue víveres Messages, Tasks, TaskStatusUpdateEvent, TaskArtifactUpdateEvent, A2AError y JSONRPCError objetos. El Task Podría ser el objeto más importante para comprender cómo construir sistemas de múltiples agentes con A2A. Según el Documentación de A2A:

  • Cuando un cliente envía un mensaje a un agente, el agente podría determinar que cumplir con la solicitud requiere que se complete una tarea con estado (por ejemplo, “generar un informe”, “reservar un vuelo”, “Responda una pregunta”).
  • Cada tarea tiene una identificación única definida por el agente y progresa a través de un ciclo de vida definido (por ejemplo, submitted, working, input-required, completed, failed).
  • Las tareas son con estado y pueden involucrar múltiples intercambios (mensajes) entre el cliente y el servidor.

Piense en un Task como algo en su sistema de múltiples agentes que tiene un claro y único meta. Tenemos dos tareas en nuestro sistema:

  1. Detectar un evento
  2. Alerta al usuario

Cada agente hace lo suyo (tarea). Construyamos el servidor A2A para el agente de eventos para que las cosas se vuelvan más tangibles.

Construyendo el servidor A2A para el agente de eventos

Primero: el Tarjeta de agente. La tarjeta de agente es el documento JSON utilizado para conocer a otros agentes disponibles. :

  • Identidad del servidor
  • Capacidades
  • Habilidades
  • Punto final de servicio
  • Url
  • Cómo los clientes deben autenticarse e interactuar con el agente

Primero definamos la tarjeta de agente para el agente de IA del detector de eventos (he definido las habilidades basadas en este ejemplo de Google):

agent_card = AgentCard(  
    name='Event Detection Agent',  
    description='Detects relevant events and alerts the user',  
    url='http://localhost:10008/',  
    version='1.0.0',  
    defaultInputModes=['text'],  
    defaultOutputModes=['text'],  
    capabilities=AgentCapabilities(streaming=False),  
    authentication={ "schemes": ["basic"] },  
    skills=[  
        AgentSkill(  
            id='detect_events',  
            name='Detect Events',  
            description='Detects events and alert the user',  
            tags=['event'],  
        ),  
    ],  
)

Puede obtener más información sobre la estructura del objeto de la tarjeta de agente aquí: https://google.github.io/a2a/specification/#55-agentcard-object-structure

El agente en sí será un servidor UVicorn, así que construamos el main() Método para ponerlo en funcionamiento. Toda la solicitud será manejada por el DefaultRequestHandler del A2A-Python SDK. El controlador necesita un TaskStore para almacenar las tareas y un AgentExecutor que tiene la implementación de la lógica central del agente (construiremos el EventAgentExecutor en un minuto).

El último componente del main() El método es el A2AStarletteApplicationque es el Starlette Aplicación que implementa los puntos finales del servidor de protocolo A2A. Necesitamos proporcionar el Agent Card y el DefaultRequestHandler para inicializarlo. Ahora el último paso es ejecutar la aplicación usando Uvicorn. Aquí está el código completo del main() método:

import click  
import uvicorn  
from a2a.types import (  
    AgentCard, AgentCapabilities, AgentSkill
) 
from a2a.server.request_handlers import DefaultRequestHandler  
from a2a.server.tasks import InMemoryTaskStore  
from a2a.server.apps import A2AStarletteApplication 
 
@click.command()  
@click.option('--host', default='localhost')  
@click.option('--port', default=10008)  
def main(host: str, port: int):  
    agent_executor = EventAgentExecutor()

    agent_card = AgentCard(  
        name='Event Detection Agent',  
        description='Detects relevant events and alerts the user',  
        url='http://localhost:10008/',  
        version='1.0.0',  
        defaultInputModes=['text'],  
        defaultOutputModes=['text'],  
        capabilities=AgentCapabilities(streaming=False),  
        authentication={ "schemes": ["basic"] },  
        skills=[              AgentSkill(                  id='detect_events',                  name='Detect Events',                  description='Detects events and alert the user',                  tags=['event'],  
            ),  
        ],  
    )
      
    request_handler = DefaultRequestHandler(  
        agent_executor=agent_executor,  
        task_store=InMemoryTaskStore()  
    ) 
 
    a2a_app = A2AStarletteApplication(  
        agent_card=agent_card,  
        http_handler=request_handler  
    )  

    uvicorn.run(a2a_app.build(), host=host, port=port)

Creación del EventAgentExecutor

Ahora es el momento de construir el núcleo de nuestro agente y finalmente ver cómo usar las tareas para que los agentes interactúen entre sí. El EventAgentExecutor La clase hereda de AgentExecutor interfaz y, por lo tanto, necesitamos implementar el execute() y el cancel() métodos. Ambos toman un RequestContext y un EventQueue objeto como parámetros. El RequestContext contiene información sobre la solicitud actual procesada por el servidor y el EventQueue actúa como un búfer entre la ejecución asincrónica del agente y el manejo de respuesta del servidor.

Nuestro agente simplemente verificará si la cadena “event“Está en el mensaje que el usuario ha enviado (BESO ✨). Si el “event“¿Está ahí, entonces deberíamos llamar al agente de alerta. Lo haremos enviando un Message a este otro agente de alerta. Este es el Configuración directa Estrategia, lo que significa que configuraremos el agente con una URL para obtener la tarjeta de agente del agente de alerta. Para hacer eso, nuestro agente de eventos actuará como un cliente A2A.

Construyamos el ejecutor paso a paso. Primero creemos la tarea principal (la tarea para detectar los eventos). Necesitamos instanciar un TaskUpdater objeto (una clase de ayuda para que los agentes publiquen actualizaciones en la cola de eventos de una tarea), luego envíe la tarea y anuncie que estamos trabajando en ello con el start_work() método:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

El mensaje que el usuario enviará al agente se verá así:

send_message_payload = {  
        'message': {  
            'role': 'user',  
            'parts': [{'type': 'text', 'text': f'it has an event!'}],  
            'messageId': uuid4().hex,  
        }  
    }

A Part representa una pieza de contenido distinta dentro de un Messagerepresentando contenido exportable como cualquiera TextPart, FileParto DataPart. Usaremos un TextPart Por lo tanto, necesitamos desenvolverlo en el ejecutor:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

Es hora de crear la lógica súper avanzada de nuestro agente. Si el mensaje no tiene la cadena “event“No necesitamos llamar al agente de alerta y la tarea está realizada:

from a2a.server.agent_execution import AgentExecutor

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )

Creación de un cliente A2A para el usuario

Creemos un cliente A2A para que podamos probar el agente tal como está. El cliente usa el get_client_from_agent_card_url() método de A2AClient clase para (adivina qué) obtenga la tarjeta de agente. Luego envolvemos el mensaje en un SendMessageRequest objeto y envíelo al agente usando el send_message() Método del cliente. Aquí está el código completo:

import httpx  
import asyncio  
from a2a.client import A2AClient  
from a2a.types import SendMessageRequest, MessageSendParams  
from uuid import uuid4  
from pprint import pprint
  
async def main():    
    send_message_payload = {  
        'message': {  
            'role': 'user',  
            'parts': [{'type': 'text', 'text': f'nothing happening here'}],  
            'messageId': uuid4().hex,  
        }  
    }  

    async with httpx.AsyncClient() as httpx_client:  
        client = await A2AClient.get_client_from_agent_card_url(  
            httpx_client, 'http://localhost:10008'  
        )  
        request = SendMessageRequest(  
            params=MessageSendParams(**send_message_payload)  
        )  
        response = await client.send_message(request)  
        pprint(response.model_dump(mode='json', exclude_none=True))  
  
if __name__ == "__main__":  
    asyncio.run(main())

Esto es lo que sucede en el terminal que ejecuta el servidor EventAgent:

Imagen del autor

Y este es el mensaje que ve el cliente:

Imagen del autor

Se creó la tarea para detectar el evento y no se detectó ningún evento, ¡agradable! Pero el objetivo de A2A es hacer que los agentes se comuniquen entre sí, así que hagamos que el agente del evento hable con el agente de alerta.

Hacer que el agente del evento hable con el agente de alerta

Para que el agente del evento hable con el agente de alerta, el agente del evento también actuará como cliente:

from a2a.server.agent_execution import AgentExecutor

ALERT_AGENT_URL = "http://localhost:10009/" 

class EventAgentExecutor(AgentExecutor):  
    async def execute(self, context: RequestContext, event_queue: EventQueue):  
        task_updater = TaskUpdater(event_queue, context.task_id, context.context_id)  
        task_updater.submit()  
        task_updater.start_work()

        await asyncio.sleep(1) #let's pretend we're actually doing something

        user_message = context.message.parts[0].root.text # unwraping the TextPart

        if "event" not in user_message:  
            task_updater.update_status(  
                TaskState.completed,  
                message=task_updater.new_agent_message(parts=[TextPart(text=f"No event detected")]),
            )
        else:
            alert_message = task_updater.new_agent_message(parts=[TextPart(text="Event detected!")])

            send_alert_payload = SendMessageRequest(  
                params=MessageSendParams(  
                    message=alert_message  
                )  
            )  

            async with httpx.AsyncClient() as client:  
                alert_agent = A2AClient(httpx_client=client, url=ALERT_AGENT_URL)  
                response = await alert_agent.send_message(send_alert_payload)  

                if hasattr(response.root, "result"):  
                    alert_task = response.root.result  
                    # Polling until the task is done
                    while alert_task.status.state not in (  
                        TaskState.completed, TaskState.failed, TaskState.canceled, TaskState.rejected  
                    ):  
                        await asyncio.sleep(0.5)  
                        get_resp = await alert_agent.get_task(  
                            GetTaskRequest(params=TaskQueryParams(id=alert_task.id))  
                        )  
                        if isinstance(get_resp.root, GetTaskSuccessResponse):  
                            alert_task = get_resp.root.result  
                        else:  
                            break  
  
                    # Complete the original task  
                    if alert_task.status.state == TaskState.completed:  
                        task_updater.update_status(  
                            TaskState.completed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text="Event detected and alert sent!")]),  
                        )  
                    else:  
                        task_updater.update_status(  
                            TaskState.failed,  
                            message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to send alert: {alert_task.status.state}")]),  
                        )  
                else:  
                    task_updater.update_status(  
                        TaskState.failed,  
                        message=task_updater.new_agent_message(parts=[TextPart(text=f"Failed to create alert task")]),  
                    )

Llamamos al agente de alerta tal como llamamos al agente de eventos como el usuario, y cuando se realiza la tarea del agente de alerta, completamos la tarea del agente de eventos original. Vamos a llamar al agente del evento nuevamente, pero esta vez con un evento:

Imagen del autor

La belleza aquí es que simplemente llamamos al agente de alerta y no necesitamos saber nada sobre cómo alerta al usuario. Simplemente le enviamos un mensaje y esperamos a que termine.

El agente de alerta es muy similar al agente del evento. Puede consultar el código completo aquí: https://github.com/dmesquita/multi-agent-communication-a2a-python

Pensamientos finales

Comprender cómo construir sistemas de múltiples agentes con A2A podría ser desalentador al principio, pero al final solo Enviar mensajes para que los agentes hagan lo suyo. Todo lo que necesita hacer para integrar a sus agentes con A2A es crear una clase con la lógica del agente que herede del AgentExecutor y ejecute el agente como servidor.

Espero que este artículo te haya ayudado en tu viaje A2A, ¡gracias por leer!

Referencias

[1] Padgham, Lin y Michael Winikoff. Desarrollo de sistemas de agentes inteligentes: una guía práctica. John Wiley & Sons, 2005.

[2] https://github.com/google/a2a-python

[3] https://github.com/google/a2a-python/tree/main/examples/google_adk

[4] https://developers.googleblog.com/en/agents-adk-agent-ingine-a2a-enhancements-google-io/