¿Cómo diseñar un sistema autónomo de estrategia de infraestructura y datos de múltiples agentes utilizando modelos Qwen livianos para una inteligencia de canalización eficiente?

En este tutorial, construimos un sistema de estrategia de infraestructura y datos agentes utilizando el modelo liviano Qwen2.5-0.5B-Instruct para una ejecución eficiente. Comenzamos creando un marco de agentes LLM flexible y luego desarrollamos agentes especializados que manejan diferentes capas de gestión de datos, desde la ingesta y el análisis de calidad hasta la optimización de la infraestructura. Integramos a estos agentes en un orquestador que coordina sus interacciones, garantizando una colaboración fluida entre múltiples agentes en todo el proceso de datos. A través de ejemplos prácticos como el comercio electrónico y los canales de IoT, exploramos cómo la toma de decisiones autónoma puede optimizar operaciones de datos complejas. Consulta los CÓDIGOS COMPLETOS aquí.

!pip install -q transformadores antorcha acelerar conjuntos de datos huggingface_hub importar antorcha desde transformadores importar AutoModelForCausalLM, AutoTokenizer importar json, tiempo desde escribir importar Lista, Dict, Cualquiera de clases de datos importar clase de datos desde fecha y hora importar fecha y hora importar pandas como clase pd LightweightLLMAgent: def __init__(self, role: str, model_name: str = “Qwen/Qwen2.5-0.5B-Instruct”): self.role = rol self.model_name = nombre_modelo self.device = “cuda” if torch.cuda.is_available() else “cpu” print(f”Cargando {nombre_modelo} para {rol} agente en {self.dispositivo}…”) self.tokenizer = AutoTokenizer.from_pretrained(nombre_modelo) self.model = AutoModelForCausalLM.from_pretrained( nombre_modelo, torch_dtype=torch.float16 if self.device == “cuda” else torch.float32, device_map=”auto” ) self.conversation_history = []

def generar_respuesta(self, mensaje: str, max_tokens: int = 150) -> str: mensajes = [
{“role”: “system”, “content”: f”You are a {self.role} agent in a data infrastructure system.”},
{“role”: “user”, “content”: prompt}
]
texto = self.tokenizer.apply_chat_template(mensajes, tokenize=False, add_generación_prompt=True) model_inputs = self.tokenizer([text]return_tensors=”pt”).to(self.device) con torch.no_grad(): generate_ids = self.model.generate( model_inputs.input_ids, max_new_tokens=max_tokens, temperatura=0.7, do_sample=True, top_p=0.95 ) generate_ids = [output_ids[len(input_ids):] para input_ids, output_ids en zip(model_inputs.input_ids, generate_ids)]respuesta = self.tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
self.conversation_history.append({“prompt”: mensaje, “respuesta”: respuesta}) devolver respuesta

Comenzamos configurando la infraestructura liviana del agente LLM utilizando el modelo Qwen2.5-0.5B-Instruct. Cargamos el modelo y el tokenizador, y definimos una clase de agente base capaz de manejar conversaciones contextuales y generar respuestas inteligentes. Esto constituye la base fundamental sobre la cual nuestros agentes especializados operan de manera eficiente dentro de Colab. Consulta los CÓDIGOS COMPLETOS aquí.

class DataIngestionAgent(LightweightLLMAgent): def __init__(self): super().__init__(role=”Especialista en ingesta de datos”) def analyse_data_source(self, source_info: Dict) -> Dict: Prompt = f”””Analice esta fuente de datos y proporcione una estrategia de ingesta: Tipo de fuente: {source_info.get(‘type’, ‘unknown’)} Volumen: {source_info.get(‘volume’, ‘unknown’)} Frecuencia: {source_info.get(‘frequency’, ‘unknown’)} Proporcione una breve estrategia centrada en: 1) Método de ingesta, 2) Consideraciones clave.””” estrategia = self.generate_response(prompt, max_tokens=100) return {“source”: source_info, “strategy”: estrategia, “timestamp”: datetime.now().isoformat()} class DataQualityAgent(LightweightLLMAgent): def __init__(self): super().__init__(role=”Analista de calidad de datos”) def evalua_data_quality(self, data_sample: Dict) -> Dict: Prompt = f”””Evaluar la calidad de los datos para esta muestra: Integridad: {data_sample.get(‘completeness’, ‘N/A’)}% de coherencia: {data_sample.get(‘consistency’, ‘N/A’)}% de problemas encontrados: {data_sample.get(‘issues’, 0)} Proporcione una breve evaluación de calidad y las 2 recomendaciones principales.””” Assessment = self.generate_response(prompt, max_tokens=100) return {“assessment”: evaluación, “severity”: self._calculate_severity(data_sample), “timestamp”: datetime.now().isoformat()} def _calculate_severity(self, data_sample: Dict) -> str: integridad = data_sample.get(‘integridad’, 100) consistencia = data_sample.get(‘consistencia’, 100) avg_score = (integridad + consistencia) / 2 si avg_score >= 90: devuelve “BAJO” elif avg_score >= 70: devuelve “MEDIO” más: devuelve “ALTO”

Diseñamos los agentes de ingesta y calidad de datos para centrarnos en el análisis estructurado de los canales de datos. Dejamos que el agente de ingesta determine el mejor enfoque para el flujo de datos, mientras que el agente de calidad evalúa la integridad, la coherencia y los problemas de los datos para proporcionar información útil. Juntos, establecen las dos primeras capas de gestión autónoma de datos. Consulta los CÓDIGOS COMPLETOS aquí.

class InfrastructureOptimizationAgent(LightweightLLMAgent): def __init__(self): super().__init__(role=”Especialista en optimización de infraestructura”) def optimizar_resources(self, metrics: Dict) -> Dict: Prompt = f”””Analizar métricas de infraestructura y sugerir optimizaciones: Uso de CPU: {metrics.get(‘cpu_usage’, 0)}% de uso de memoria: {metrics.get(‘memory_usage’, 0)}% Almacenamiento: {metrics.get(‘storage_used’, 0)}GB / {metrics.get(‘storage_total’, 0)}GB Latencia de consulta: {metrics.get(‘query_latency’, 0)}ms Proporcionar 2 recomendaciones de optimización.””” recomendaciones = self.generate_response(prompt, max_tokens=100) return {“current_metrics”: métricas, “recomendaciones”: recomendaciones, “prioridad”: self._calculate_priority(métricas), “marca de tiempo”: datetime.now().isoformat()} def _calculate_priority(self, métricas: Dict) -> str: cpu = metrics.get(‘cpu_usage’, 0) memoria = metrics.get(‘memory_usage’, 0) si cpu > 85 o memoria > 85: devuelve “CRÍTICO” elif cpu > 70 o memoria > 70: devuelve “ALTO” en caso contrario: devuelve “NORMAL”

Desarrollamos el Agente de optimización de infraestructura para analizar continuamente métricas clave como la utilización de CPU, memoria y almacenamiento. Lo utilizamos para generar sugerencias de optimización inteligentes, lo que nos ayuda a mantener un alto rendimiento y eficiencia de recursos. Este agente garantiza que nuestra infraestructura siga siendo receptiva y escalable durante las operaciones de datos. Consulta los CÓDIGOS COMPLETOS aquí.

clase AgenticDataOrchestrator: def __init__(self): print(“\n” + “=”*70) print(“Inicializando el sistema de infraestructura de datos Agentic”) print(“=”*70 + “\n”) self.ingestion_agent = DataIngestionAgent() self.quality_agent = DataQualityAgent() self.optimization_agent = InfrastructureOptimizationAgent() self.execution_log = []
def Process_data_pipeline(self, pipeline_config: Dict) -> Dict: resultados = {“pipeline_id”: pipeline_config.get(“id”, “unknown”), “start_time”: datetime.now().isoformat(), “etapas”: []} imprimir(“\n[Stage 1] Análisis de ingesta de datos”) ingestion_result = self.ingestion_agent.analyze_data_source(pipeline_config.get(“source”, {})) print(f”Estrategia: {ingestion_result[‘strategy’][:150]}…”) resultados[“stages”].append({“etapa”: “ingestión”, “resultado”: ingestión_result}) print(“\n[Stage 2] Evaluación de la calidad de los datos”) resultado_calidad = self.quality_agent.assess_data_quality(pipeline_config.get(“quality_metrics”, {})) print(f”Evaluación: {resultado_calidad[‘assessment’][:150]}…”) print(f”Severidad: {calidad_resultado[‘severity’]}”) resultados[“stages”].append({“etapa”: “calidad”, “resultado”: calidad_resultado}) print(“\n[Stage 3] Optimización de infraestructura”) optimización_result = self.optimization_agent.optimize_resources(pipeline_config.get(“infrastructure_metrics”, {})) print(f”Recomendaciones: {optimization_result[‘recommendations’][:150]}…”) print(f”Prioridad: {resultado_optimización[‘priority’]}”) resultados[“stages”].append({“etapa”: “optimización”, “resultado”: optimización_resultado}) resultados[“end_time”] = datetime.now().isoformat() resultados[“status”] = “completado” self.execution_log.append(resultados) devuelve resultados def generate_summary_report(self) -> pd.DataFrame: si no, self.execution_log: devuelve pd.DataFrame() resumen_data = []
para iniciar sesión self.execution_log: resumen_data.append({“ID de canalización”: registro[“pipeline_id”]”Hora de inicio”: registro[“start_time”]”Estado”: iniciar sesión[“status”]”Etapas completadas”: len(log[“stages”])}) devolver pd.DataFrame(summary_data)

Construimos un Agentic Data Orchestrator para coordinar a todos los agentes especializados bajo un flujo de trabajo unificado. Lo usamos para gestionar la ejecución de la canalización de un extremo a otro, activando la ingesta, los controles de calidad y la optimización de forma secuencial. Al hacer esto, aportamos estructura, colaboración y automatización a todo el sistema multiagente. Consulta los CÓDIGOS COMPLETOS aquí.

def main(): Orchestrator = AgenticDataOrchestrator() print(“\n” + “=”*70) print(“EJEMPLO 1: Canalización de datos de comercio electrónico”) print(“=”*70) ecommerce_pipeline = { “id”: “ecommerce_pipeline_001”, “source”: {“type”: “REST API”, “volume”: “10GB/day”, “frequency”: “real-time”}, “quality_metrics”: {“integridad”: 87, “consistencia”: 92, “problemas”: 15}, “infrastructure_metrics”: {“cpu_usage”: 78, “memory_usage”: 82, “storage_used”: 450, “storage_total”: 1000, “query_latency”: 250} } resultado1 = Orchestrator.process_data_pipeline(ecommerce_pipeline) print(“\n\n” + “=”*70) print(“EJEMPLO 2: Canalización de datos del sensor IoT”) print(“=”*70) iot_pipeline = { “id”: “iot_pipeline_002”, “source”: {“type”: “Message Queue (Kafka)”, “volume”: “50GB/day”, “frequency”: “streaming”}, “quality_metrics”: {“integridad”: 95, “consistencia”: 88, “problemas”: 8}, “infrastructure_metrics”: {“cpu_usage”: 65, “memory_usage”: 71, “storage_used”: 780, “storage_total”: 2000, “query_latency”: 180} } resultado2 = Orchestrator.process_data_pipeline(iot_pipeline) print(“\n\n” + “=”*70) print(“INFORME DE RESUMEN DE EJECUCIÓN”) print(“=”*70 + “\n”) resumen_df = Orchestrator.generate_summary_report() print(summary_df.to_string(index=False)) print(“\n” + “=”*70) print(“¡Tutorial completo!”) print(“=”*70) print(“\nConceptos clave demostrados:”) print(” ✓ Arquitectura ligera de agentes LLM”) print(” ✓ Agentes especializados para diferentes tareas de datos”) print(” ✓ Orquestación de múltiples agentes”) print(” ✓ Monitoreo y optimización de infraestructura”) print(” ✓ Toma de decisiones autónoma en canalizaciones de datos”) if __name__ == “__main__”: main()

Demostramos nuestro sistema completo a través de dos ejemplos del mundo real: un comercio electrónico y un canal de datos de IoT. Observamos cómo cada agente desempeña su rol de forma autónoma mientras contribuye a un objetivo compartido. Finalmente, generamos un informe resumido que confirma la eficiencia de la orquestación y el poder de la inteligencia agente ligera.

En conclusión, diseñamos y ejecutamos un marco de infraestructura de datos inteligente y multiagente impulsado por un modelo compacto de código abierto. Somos testigos de cómo agentes independientes pero cooperativos pueden analizar, evaluar y optimizar de forma autónoma sistemas de datos del mundo real. Toda la configuración demuestra cómo los LLM livianos pueden manejar de manera eficiente la inteligencia de infraestructura, al mismo tiempo que resalta cómo la orquestación agente transforma los flujos de trabajo de datos tradicionales en sistemas adaptables y autooptimizados listos para aplicaciones empresariales escalables.

Consulta los CÓDIGOS COMPLETOS aquí. No dude en consultar nuestra página de GitHub para tutoriales, códigos y cuadernos. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 100.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.

Asif Razzaq es el director ejecutivo de Marktechpost Media Inc.. Como emprendedor e ingeniero visionario, Asif está comprometido a aprovechar el potencial de la inteligencia artificial para el bien social. Su esfuerzo más reciente es el lanzamiento de una plataforma de medios de inteligencia artificial, Marktechpost, que se destaca por su cobertura en profundidad del aprendizaje automático y las noticias sobre aprendizaje profundo que es técnicamente sólida y fácilmente comprensible para una amplia audiencia. La plataforma cuenta con más de 2 millones de visitas mensuales, lo que ilustra su popularidad entre el público.

🙌 Siga MARKTECHPOST: agréguenos como fuente preferida en Google.