Una guía de implementación para diseñar flujos de trabajo paralelos inteligentes en PARSL para la ejecución de agentes de IA múltiples

En este tutorial, implementamos una tubería de agente de IA utilizando Parslaprovechando sus capacidades de ejecución paralela para ejecutar múltiples tareas computacionales como aplicaciones de Python independientes. Configuramos un TreadPoolExecutor local para la concurrencia, definimos herramientas especializadas como el cálculo de Fibonacci, el conteo principal, la extracción de palabras clave y las llamadas de API simuladas, y coordinamos a través de un planificador liviano que mapea un objetivo de usuario para las invocaciones de tareas. Las salidas de todas las tareas se agregan y pasan a través de un modelo de generación de texto de la cara abrazada para producir un resumen coherente y legible por el humano. Mira el Códigos completos aquí.

!pip install -q parsl transformers accelerate


import math, json, time, random
from typing import List, Dict, Any
import parsl
from parsl.config import Config
from parsl.executors import ThreadPoolExecutor
from parsl import python_app


parsl.load(Config(executors=[ThreadPoolExecutor(label="local", max_threads=8)]))

Comenzamos instalando las bibliotecas requeridas e importando todos los módulos necesarios para nuestro flujo de trabajo. Luego configuramos PARSL con un ThreadPoolExecutor local para ejecutar tareas simultáneamente y cargamos esta configuración para que podamos ejecutar nuestras aplicaciones de Python en paralelo. Mira el Códigos completos aquí.

@python_app
def calc_fibonacci(n: int) -> Dict[str, Any]:
   def fib(k):
       a, b = 0, 1
       for _ in range(k): a, b = b, a + b
       return a
   t0 = time.time(); val = fib(n); dt = time.time() - t0
   return {"task": "fibonacci", "n": n, "value": val, "secs": round(dt, 4)}


@python_app
def extract_keywords(text: str, k: int = 8) -> Dict[str, Any]:
   import re, collections
   words = [w.lower() for w in re.findall(r"[a-zA-Z][a-zA-Z0-9\-]+", text)]
   stop = set("the a an and or to of is are was were be been in on for with as by from at this that it its if then else not no".split())
   cand = [w for w in words if w not in stop and len(w) > 3]
   freq = collections.Counter(cand)
   scored = sorted(freq.items(), key=lambda x: (x[1], len(x[0])), reverse=True)[:k]
   return {"task":"keywords","keywords":[w for w,_ in scored]}


@python_app
def simulate_tool(name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
   time.sleep(0.3 + random.random()*0.5)
   return {"task": name, "payload": payload, "status": "ok", "timestamp": time.time()}

Definimos cuatro funciones parsl @python_app que se ejecutan asincrónicamente como parte del flujo de trabajo de nuestro agente. Creamos una calculadora Fibonacci, una rutina de conteo primo, un extractor de palabras clave para el procesamiento de texto y una herramienta simulada que imita las llamadas de API externos con retrasos aleatorios. Estas aplicaciones modulares nos permiten realizar diversos cálculos en paralelo, formando los bloques de construcción para nuestro agente de IA múltiple. Mira el Códigos completos aquí.

def tiny_llm_summary(bullets: List[str]) -> str:
   from transformers import pipeline
   gen = pipeline("text-generation", model="sshleifer/tiny-gpt2")
   prompt = "Summarize these agent results clearly:\n- " + "\n- ".join(bullets) + "\nConclusion:"
   out = gen(prompt, max_length=160, do_sample=False)[0]["generated_text"]
   return out.split("Conclusion:", 1)[-1].strip()

Implementamos una función Tiny_LLM_SUMMARY que utiliza la tubería de Hugging Face con el modelo liviano Sshleifer/Tiny-GPT2 para generar resúmenes concisos de los resultados de nuestro agente. Formatea las salidas de tareas recolectadas como puntos de bala, agrega una “conclusión”: “y extrae solo la conclusión final generada para un resumen limpio y legible por humanos. Mira el Códigos completos aquí.

def plan(user_goal: str) -> List[Dict[str, Any]]:
   intents = []
   if "fibonacci" in user_goal.lower():
       intents.append({"tool":"calc_fibonacci", "args":{"n":35}})
   if "primes" in user_goal.lower():
       intents.append({"tool":"count_primes", "args":{"limit":100_000}})
   intents += [
       {"tool":"simulate_tool", "args":{"name":"vector_db_search","payload":{"q":user_goal}}},
       {"tool":"simulate_tool", "args":{"name":"metrics_fetch","payload":{"kpi":"latency_ms"}}},
       {"tool":"extract_keywords", "args":{"text":user_goal}}
   ]
   return intents

Definimos la función del plan para asignar el objetivo de un usuario en una lista estructurada de invocaciones de herramientas. Verifica el texto de la meta para palabras clave como “fibonacci” o “primos” para activar tareas computacionales específicas, luego agrega acciones predeterminadas como consultas API simuladas, recuperación de métricas y extracción de palabras clave, formando el plan de ejecución para nuestro agente AI. Mira el Códigos completos aquí.

def run_agent(user_goal: str) -> Dict[str, Any]:
   tasks = plan(user_goal)
   futures = []
   for t in tasks:
       if t["tool"]=="calc_fibonacci": futures.append(calc_fibonacci(**t["args"]))
       elif t["tool"]=="count_primes": futures.append(count_primes(**t["args"]))
       elif t["tool"]=="extract_keywords": futures.append(extract_keywords(**t["args"]))
       elif t["tool"]=="simulate_tool": futures.append(simulate_tool(**t["args"]))
   raw = [f.result() for f in futures]


   bullets = []
   for r in raw:
       if r["task"]=="fibonacci":
           bullets.append(f"Fibonacci({r['n']}) = {r['value']} computed in {r['secs']}s.")
       elif r["task"]=="count_primes":
           bullets.append(f"{r['count']} primes found ≤ {r['limit']}.")
       elif r["task"]=="keywords":
           bullets.append("Top keywords: " + ", ".join(r["keywords"]))
       else:
           bullets.append(f"Tool {r['task']} responded with status={r['status']}.")


   narrative = tiny_llm_summary(bullets)
   return {"goal": user_goal, "bullets": bullets, "summary": narrative, "raw": raw}

En la función run_agent, ejecutamos el flujo de trabajo del agente completo generando primero un plan de tareas desde el objetivo del usuario, luego enviando cada herramienta como una aplicación PARSL para ejecutarse en paralelo. Una vez que todos los futuros están completos, convertimos sus resultados en puntos de bala claros y los alimentamos a nuestra función Tiny_LLM_SUMMARY para crear una narrativa concisa. La función devuelve un diccionario estructurado que contiene el objetivo original, los puntos detallados de la bala, el resumen generado por LLM y las salidas de herramientas sin procesar. Mira el Códigos completos aquí.

if __name__ == "__main__":
   goal = ("Analyze fibonacci(35) performance, count primes under 100k, "
           "and prepare a concise executive summary highlighting insights for planning.")
   result = run_agent(goal)
   print("\n=== Agent Bullets ===")
   for b in result["bullets"]: print("•", b)
   print("\n=== LLM Summary ===\n", result["summary"])
   print("\n=== Raw JSON ===\n", json.dumps(result["raw"], indent=2)[:800], "...")

En el bloque de ejecución principal, definimos un objetivo de muestra que combina el cálculo numérico, el conteo principal y la generación de resumen. Ejecutamos el agente en este objetivo, imprimimos los puntos de bala generados, mostramos el resumen artesanal de LLM y previsimos la salida de JSON sin procesar para verificar los resultados estructurados y legibles por humanos.

En conclusión, esta implementación demuestra cómo el modelo de aplicaciones asíncronos de Parsl puede orquestar eficientemente diversas cargas de trabajo en paralelo, lo que permite a un agente de IA combinar análisis numéricos, procesamiento de texto y servicios externos simulados en una tubería unificada. Al integrar un pequeño LLM en la etapa final, transformamos los resultados estructurados en lenguaje natural, ilustrando cómo se pueden combinar los modelos paralelos de computación y IA para crear agentes receptivos y extensibles adecuados para tareas en tiempo real o a gran escala.


Mira el Códigos completos aquí. No dude en ver nuestro Página de Github para tutoriales, códigos y cuadernos. Además, siéntete libre de seguirnos Gorjeo Y no olvides unirte a nuestro Subreddit de 100k+ ml y suscribirse a Nuestro boletín.


Asif Razzaq es el CEO de MarktechPost Media Inc .. Como empresario e ingeniero visionario, ASIF se compromete a aprovechar el potencial de la inteligencia artificial para el bien social. Su esfuerzo más reciente es el lanzamiento de una plataforma de medios de inteligencia artificial, MarktechPost, que se destaca por su cobertura profunda de noticias de aprendizaje automático y de aprendizaje profundo que es técnicamente sólido y fácilmente comprensible por una audiencia amplia. La plataforma cuenta con más de 2 millones de vistas mensuales, ilustrando su popularidad entre el público.