Construyendo un flujo de trabajo Ollama Langchain acelerado con GPU con agentes de trapo, monitoreo de rendimiento de chat de múltiples sesiones

En este tutorial, construimos una pila LLM local con capacidad de GPU que unifica a Ollama y Langchain. Instalamos las bibliotecas requeridas, lanzamos el servidor Ollama, extraemos un modelo y lo envolvemos en un Langchain LLM personalizado, lo que nos permite controlar la temperatura, los límites del token y el contexto. Agregamos una capa de generación de recuperación aumentada que ingiere PDF o texto, los fragmenta, los incrusta con transformadores de oraciones y atiende respuestas fundamentadas. Administramos la memoria de chat de múltiples sesiones, las herramientas de registro (búsqueda web + consulta de RAG) y giramos a un agente que razona cuándo llamarlos.

import os
import sys
import subprocess
import time
import threading
import queue
import json
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor


def install_packages():
    """Install required packages for Colab environment"""
    packages = [
        "langchain",
        "langchain-community",
        "langchain-core",
        "chromadb",
        "sentence-transformers",
        "faiss-cpu",
        "pypdf",
        "python-docx",
        "requests",
        "psutil",
        "pyngrok",
        "gradio"
    ]
   
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])


install_packages()


import requests
import psutil
import threading
from queue import Queue
from langchain.llms.base import LLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.schema import BaseMessage, HumanMessage, AIMessage, SystemMessage
from langchain.memory import ConversationBufferWindowMemory, ConversationSummaryBufferMemory
from langchain.chains import ConversationChain, RetrievalQA
from langchain.prompts import PromptTemplate, ChatPromptTemplate
from langchain.document_loaders import PyPDFLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS, Chroma
from langchain.agents import AgentType, initialize_agent, Tool
from langchain.tools import DuckDuckGoSearchRun

Importamos las utilidades de Python necesarias en Colab para la concurrencia, las llamadas del sistema y el manejo de JSON. Definimos y ejecutamos install_packages () para extraer langchain, incrustaciones, tiendas vectoriales, cargadores de documentos, monitoreo y dependencias de la interfaz de usuario. Luego importamos Langchain LLM, memoria, recuperación y herramientas de agentes (incluida la búsqueda de Duckduckgo) para construir un flujo de trabajo de trapo y agente extensible.

[Download the full codes with notebook here]

@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4

Definimos un dataclass Ollamaconfig, por lo que mantenemos todas las configuraciones de tiempo de ejecución de Ollama en un lugar limpio. Establecemos el nombre del modelo y el punto final de la API local, así como el comportamiento de generación (max_tokens, temperatura y context_window). Controlamos el rendimiento con GPU_LAYERS (‑1 = cargamos todo a GPU cuando sea posible), Batch_Size y Hils para el paralelismo.

@dataclass
class OllamaConfig:
    """Configuration for Ollama setup"""
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    max_tokens: int = 2048
    temperature: float = 0.7
    gpu_layers: int = -1  
    context_window: int = 4096
    batch_size: int = 512
    threads: int = 4
We define an OllamaConfig dataclass so we keep all Ollama runtime settings in one clean place. We set the model name and local API endpoint, as well as the generation behavior (max_tokens, temperature, and context_window). We control performance with gpu_layers (‑1 = load all to GPU when possible), batch_size, and threads for parallelism.

class OllamaManager:
    """Advanced Ollama manager for Colab environment"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.process = None
        self.is_running = False
        self.models_cache = {}
        self.performance_monitor = PerformanceMonitor()
       
    def install_ollama(self):
        """Install Ollama in Colab environment"""
        try:
            subprocess.run([
                "curl", "-fsSL", "https://ollama.com/install.sh", "-o", "/tmp/install.sh"
            ], check=True)
           
            subprocess.run(["bash", "/tmp/install.sh"], check=True)
            print("✅ Ollama installed successfully")
           
        except subprocess.CalledProcessError as e:
            print(f"❌ Failed to install Ollama: {e}")
            raise
   
    def start_server(self):
        """Start Ollama server with GPU support"""
        if self.is_running:
            print("Ollama server is already running")
            return
           
        try:
            env = os.environ.copy()
            env["OLLAMA_NUM_PARALLEL"] = str(self.config.threads)
            env["OLLAMA_MAX_LOADED_MODELS"] = "3"
           
            self.process = subprocess.Popen(
                ["ollama", "serve"],
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE
            )
           
            time.sleep(5)
           
            if self.health_check():
                self.is_running = True
                print("✅ Ollama server started successfully")
                self.performance_monitor.start()
            else:
                raise Exception("Server failed to start properly")
               
        except Exception as e:
            print(f"❌ Failed to start Ollama server: {e}")
            raise
   
    def health_check(self) -> bool:
        """Check if Ollama server is healthy"""
        try:
            response = requests.get(f"{self.config.base_url}/api/tags", timeout=10)
            return response.status_code == 200
        except:
            return False
   
    def pull_model(self, model_name: str) -> bool:
        """Pull a model from Ollama registry"""
        try:
            print(f"🔄 Pulling model: {model_name}")
            result = subprocess.run(
                ["ollama", "pull", model_name],
                capture_output=True,
                text=True,
                timeout=1800  
            )
           
            if result.returncode == 0:
                print(f"✅ Model {model_name} pulled successfully")
                self.models_cache[model_name] = True
                return True
            else:
                print(f"❌ Failed to pull model {model_name}: {result.stderr}")
                return False
               
        except subprocess.TimeoutExpired:
            print(f"❌ Timeout pulling model {model_name}")
            return False
        except Exception as e:
            print(f"❌ Error pulling model {model_name}: {e}")
            return False
   
    def list_models(self) -> List[str]:
        """List available local models"""
        try:
            result = subprocess.run(
                ["ollama", "list"],
                capture_output=True,
                text=True
            )
           
            models = []
            for line in result.stdout.split('\n')[1:]:
                if line.strip():
                    model_name = line.split()[0]
                    models.append(model_name)
                   
            return models
           
        except Exception as e:
            print(f"❌ Error listing models: {e}")
            return []
   
    def stop_server(self):
        """Stop Ollama server"""
        if self.process:
            self.process.terminate()
            self.process.wait()
            self.is_running = False
            self.performance_monitor.stop()
            print("✅ Ollama server stopped")

Creamos la clase Ollamamanager para instalar, iniciar, monitorear y administrar el servidor Ollama en el entorno Colab. Establecemos variables de entorno para el paralelismo de GPU, ejecutamos el servidor en segundo plano y verificamos que se presente con una verificación de salud. Puse modelos a pedido, almacenamos en caché, enumeramos los disponibles localmente y cerramos con gracia el servidor cuando la tarea esté completa, todo mientras rastrea el rendimiento.

[Download the full codes with notebook here]

class PerformanceMonitor:
    """Monitor system performance and resource usage"""
   
    def __init__(self):
        self.monitoring = False
        self.stats = {
            "cpu_usage": [],
            "memory_usage": [],
            "gpu_usage": [],
            "inference_times": []
        }
        self.monitor_thread = None
   
    def start(self):
        """Start performance monitoring"""
        self.monitoring = True
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
   
    def stop(self):
        """Stop performance monitoring"""
        self.monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join()
   
    def _monitor_loop(self):
        """Main monitoring loop"""
        while self.monitoring:
            try:
                cpu_percent = psutil.cpu_percent(interval=1)
                memory = psutil.virtual_memory()
               
                self.stats["cpu_usage"].append(cpu_percent)
                self.stats["memory_usage"].append(memory.percent)
               
                for key in ["cpu_usage", "memory_usage"]:
                    if len(self.stats[key]) > 100:
                        self.stats[key] = self.stats[key][-100:]
               
                time.sleep(5)
               
            except Exception as e:
                print(f"Monitoring error: {e}")
   
    def get_stats(self) -> Dict[str, Any]:
        """Get current performance statistics"""
        return {
            "avg_cpu": sum(self.stats["cpu_usage"][-10:]) / max(len(self.stats["cpu_usage"][-10:]), 1),
            "avg_memory": sum(self.stats["memory_usage"][-10:]) / max(len(self.stats["memory_usage"][-10:]), 1),
            "total_inferences": len(self.stats["inference_times"]),
            "avg_inference_time": sum(self.stats["inference_times"]) / max(len(self.stats["inference_times"]), 1)
        }

Definimos una clase de performemonitor para rastrear la CPU, la memoria y los tiempos de inferencia en tiempo real mientras se ejecuta el servidor Ollama. Lanzamos un hilo de fondo para recopilar estadísticas cada pocos segundos, almacenar métricas recientes y proporcionar resúmenes de uso promedio. Esto nos ayuda a monitorear la carga del sistema y optimizar el rendimiento durante la inferencia del modelo.

[Download the full codes with notebook here]

class OllamaLLM(LLM):
    """Custom LangChain LLM for Ollama"""
   
    model_name: str = "llama2"
    base_url: str = "http://localhost:11434"
    temperature: float = 0.7
    max_tokens: int = 2048
    performance_monitor: Optional[PerformanceMonitor] = None
   
    @property
    def _llm_type(self) -> str:
        return "ollama"
   
    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Make API call to Ollama"""
        start_time = time.time()
       
        try:
            payload = {
                "model": self.model_name,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": self.temperature,
                    "num_predict": self.max_tokens,
                    "stop": stop or []
                }
            }
           
            response = requests.post(
                f"{self.base_url}/api/generate",
                json=payload,
                timeout=120
            )
           
            response.raise_for_status()
            result = response.json()
           
            inference_time = time.time() - start_time
           
            if self.performance_monitor:
                self.performance_monitor.stats["inference_times"].append(inference_time)
           
            return result.get("response", "")
           
        except Exception as e:
            print(f"❌ Ollama API error: {e}")
            return f"Error: {str(e)}"

Envolvemos la API Ollama dentro de una clase Ollamallm personalizada compatible con la interfaz LLM de Langchain. Definimos cómo se envían las indicaciones al servidor Ollama y grabamos cada hora de inferencia para el seguimiento de rendimiento. Esto nos permite enchufar ollama directamente a las cadenas de langchain, los agentes y los componentes de la memoria mientras monitoreamos la eficiencia.

class RAGSystem:
    """Retrieval-Augmented Generation system"""
   
    def __init__(self, llm: OllamaLLM, embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.llm = llm
        self.embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        self.vector_store = None
        self.qa_chain = None
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
   
    def add_documents(self, file_paths: List[str]):
        """Add documents to the vector store"""
        documents = []
       
        for file_path in file_paths:
            try:
                if file_path.endswith('.pdf'):
                    loader = PyPDFLoader(file_path)
                else:
                    loader = TextLoader(file_path)
               
                docs = loader.load()
                documents.extend(docs)
               
            except Exception as e:
                print(f"❌ Error loading {file_path}: {e}")
       
        if documents:
            splits = self.text_splitter.split_documents(documents)
           
            if self.vector_store is None:
                self.vector_store = FAISS.from_documents(splits, self.embeddings)
            else:
                self.vector_store.add_documents(splits)
           
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vector_store.as_retriever(search_kwargs={"k": 3}),
                return_source_documents=True
            )
           
            print(f"✅ Added {len(splits)} document chunks to vector store")
   
    def query(self, question: str) -> Dict[str, Any]:
        """Query the RAG system"""
        if not self.qa_chain:
            return {"answer": "No documents loaded. Please add documents first."}
       
        try:
            result = self.qa_chain({"query": question})
            return {
                "answer": result["result"],
                "sources": [doc.metadata for doc in result.get("source_documents", [])]
            }
        except Exception as e:
            return {"answer": f"Error: {str(e)}"}

Utilizamos ConversationManager para administrar la memoria de la sesión múltiple, permitiendo historias de chat basadas en búfer y sumarias para cada sesión. Luego, en OllamalangChainsystem, reunimos todos los componentes, servidor, LLM, trapo, memoria, herramientas y agentes, en una interfaz unificada. Configuramos el sistema para instalar Ollama, extraer modelos, construir agentes con herramientas como búsqueda web y trapo, y exponer el chat, la carga de documentos y las capacidades de cambio de modelo para una interacción sin costura.

class ConversationManager:
    """Manage conversation history and memory"""
   
    def __init__(self, llm: OllamaLLM, memory_type: str = "buffer"):
        self.llm = llm
        self.conversations = {}
        self.memory_type = memory_type
       
    def get_conversation(self, session_id: str) -> ConversationChain:
        """Get or create conversation for session"""
        if session_id not in self.conversations:
            if self.memory_type == "buffer":
                memory = ConversationBufferWindowMemory(k=10)
            elif self.memory_type == "summary":
                memory = ConversationSummaryBufferMemory(
                    llm=self.llm,
                    max_token_limit=1000
                )
            else:
                memory = ConversationBufferWindowMemory(k=10)
           
            self.conversations[session_id] = ConversationChain(
                llm=self.llm,
                memory=memory,
                verbose=True
            )
       
        return self.conversations[session_id]
   
    def chat(self, session_id: str, message: str) -> str:
        """Chat with specific session"""
        conversation = self.get_conversation(session_id)
        return conversation.predict(input=message)
   
    def clear_session(self, session_id: str):
        """Clear conversation history for session"""
        if session_id in self.conversations:
            del self.conversations[session_id]


class OllamaLangChainSystem:
    """Main system integrating all components"""
   
    def __init__(self, config: OllamaConfig):
        self.config = config
        self.manager = OllamaManager(config)
        self.llm = None
        self.rag_system = None
        self.conversation_manager = None
        self.tools = []
        self.agent = None
       
    def setup(self):
        """Complete system setup"""
        print("🚀 Setting up Ollama + LangChain system...")
       
        self.manager.install_ollama()
        self.manager.start_server()
       
        if not self.manager.pull_model(self.config.model_name):
            print("❌ Failed to pull default model")
            return False
       
        self.llm = OllamaLLM(
            model_name=self.config.model_name,
            base_url=self.config.base_url,
            temperature=self.config.temperature,
            max_tokens=self.config.max_tokens,
            performance_monitor=self.manager.performance_monitor
        )
       
        self.rag_system = RAGSystem(self.llm)
       
        self.conversation_manager = ConversationManager(self.llm)
       
        self._setup_tools()
       
        print("✅ System setup complete!")
        return True
   
    def _setup_tools(self):
        """Setup tools for the agent"""
        search = DuckDuckGoSearchRun()
       
        self.tools = [
            Tool(
                name="Search",
                func=search.run,
                description="Search the internet for current information"
            ),
            Tool(
                name="RAG_Query",
                func=lambda q: self.rag_system.query(q)["answer"],
                description="Query loaded documents using RAG"
            )
        ]
       
        self.agent = initialize_agent(
            tools=self.tools,
            llm=self.llm,
            agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
            verbose=True
        )
   
    def chat(self, message: str, session_id: str = "default") -> str:
        """Simple chat interface"""
        return self.conversation_manager.chat(session_id, message)
   
    def rag_chat(self, question: str) -> Dict[str, Any]:
        """RAG-based chat"""
        return self.rag_system.query(question)
   
    def agent_chat(self, message: str) -> str:
        """Agent-based chat with tools"""
        return self.agent.run(message)
   
    def switch_model(self, model_name: str) -> bool:
        """Switch to different model"""
        if self.manager.pull_model(model_name):
            self.llm.model_name = model_name
            print(f"✅ Switched to model: {model_name}")
            return True
        return False
   
    def load_documents(self, file_paths: List[str]):
        """Load documents into RAG system"""
        self.rag_system.add_documents(file_paths)
   
    def get_performance_stats(self) -> Dict[str, Any]:
        """Get system performance statistics"""
        return self.manager.performance_monitor.get_stats()
   
    def cleanup(self):
        """Clean up resources"""
        self.manager.stop_server()
        print("✅ System cleanup complete")

Utilizamos el ConversationManager para mantener sesiones de chat separadas, cada una con su tipo de memoria, ya sea basado en búfer o basado en un resumen, lo que nos permite preservar o resumir el contexto según sea necesario. En el sistema OllamalangChains, integramos todo: instalamos y lanzamos Ollama, extraemos el modelo deseado, lo envolvemos en un LLM compatible con Langchain, conectamos un sistema de trapo, inicializamos la memoria de chat e registra herramientas externas como la búsqueda web.

def main():
    """Main function demonstrating the system"""
   
    config = OllamaConfig(
        model_name="llama2",
        temperature=0.7,
        max_tokens=2048
    )
   
    system = OllamaLangChainSystem(config)
   
    try:
        if not system.setup():
            return
       
        print("\n🗣️ Testing basic chat:")
        response = system.chat("Hello! How are you?")
        print(f"Response: {response}")
       
        print("\n🔄 Testing model switching:")
        models = system.manager.list_models()
        print(f"Available models: {models}")
       
       
        print("\n🤖 Testing agent:")
        agent_response = system.agent_chat("What's the current weather like?")
        print(f"Agent Response: {agent_response}")
       
        print("\n📊 Performance Statistics:")
        stats = system.get_performance_stats()
        print(json.dumps(stats, indent=2))
       
    except KeyboardInterrupt:
        print("\n⏹️ Interrupted by user")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        system.cleanup()


def create_gradio_interface(system: OllamaLangChainSystem):
    """Create a Gradio interface for easy interaction"""
    try:
        import gradio as gr
       
        def chat_interface(message, history, mode):
            if mode == "Basic Chat":
                response = system.chat(message)
            elif mode == "RAG Chat":
                result = system.rag_chat(message)
                response = result["answer"]
            elif mode == "Agent Chat":
                response = system.agent_chat(message)
            else:
                response = "Unknown mode"
           
            history.append((message, response))
            return "", history
       
        def upload_docs(files):
            if files:
                file_paths = [f.name for f in files]
                system.load_documents(file_paths)
                return f"Loaded {len(file_paths)} documents into RAG system"
            return "No files uploaded"
       
        def get_stats():
            stats = system.get_performance_stats()
            return json.dumps(stats, indent=2)
       
        with gr.Blocks(title="Ollama + LangChain System") as demo:
            gr.Markdown("# 🦙 Ollama + LangChain Advanced System")
           
            with gr.Tab("Chat"):
                chatbot = gr.Chatbot()
                mode = gr.Dropdown(
                    ["Basic Chat", "RAG Chat", "Agent Chat"],
                    value="Basic Chat",
                    label="Chat Mode"
                )
                msg = gr.Textbox(label="Message")
                clear = gr.Button("Clear")
               
                msg.submit(chat_interface, [msg, chatbot, mode], [msg, chatbot])
                clear.click(lambda: ([], ""), outputs=[chatbot, msg])
           
            with gr.Tab("Document Upload"):
                file_upload = gr.File(file_count="multiple", label="Upload Documents")
                upload_btn = gr.Button("Upload to RAG System")
                upload_status = gr.Textbox(label="Status")
               
                upload_btn.click(upload_docs, file_upload, upload_status)
           
            with gr.Tab("Performance"):
                stats_btn = gr.Button("Get Performance Stats")
                stats_output = gr.Textbox(label="Performance Statistics")
               
                stats_btn.click(get_stats, outputs=stats_output)
       
        return demo
       
    except ImportError:
        print("Gradio not installed. Skipping interface creation.")
        return None


if __name__ == "__main__":
    print("🚀 Ollama + LangChain System for Google Colab")
    print("=" * 50)
   
    main()
   
    # Or create a system instance for interactive use
    # config = OllamaConfig(model_name="llama2")
    # system = OllamaLangChainSystem(config)
    # system.setup()
   
    # # Create Gradio interface
    # demo = create_gradio_interface(system)
    # if demo:
    #     demo.launch(share=True)  # share=True for public link

Envolvemos todo en la función principal para ejecutar una demostración completa, configurar el sistema, probar el chat, las herramientas de agentes, la lista de modelos y las estadísticas de rendimiento. Luego, en create_gradio_interface (), creamos una aplicación de Gradio fácil de usar con pestañas para chatear, cargar documentos al sistema RAG y monitorear el rendimiento. Finalmente, llamamos a Main () en el bloque __main__ para la ejecución directa de Colab, o opcionalmente lanzamos la interfaz de usuario de Gradio para la exploración interactiva y el intercambio público.

En conclusión, tenemos un patio de recreo flexible: cambiamos los modelos Ollama, Converse con memoria amortiguada o resumida, cuestionamos nuestros propios documentos, llegamos a la búsqueda cuando faltan contexto y monitoreamos las estadísticas básicas de recursos para permanecer dentro de los límites de Colab. El código es modular, lo que nos permite extender la lista de herramientas, opciones de inferencia de ajuste (temperatura, tokens máximos, concurrencia) en Ollamaconfig, o adaptar la tubería RAG a corpus más grandes o modelos de incrustación diferentes. Lanzamos la aplicación Gradio con Share = True para colaborar o incrustar estos componentes en nuestros proyectos. Ahora poseemos una plantilla extensible para la experimentación LLM local rápida.


Mira el Codos. Todo el crédito por esta investigación va a los investigadores de este proyecto. Suscríbete ahora a nuestro boletín de IA


Asif Razzaq es el CEO de MarktechPost Media Inc .. Como empresario e ingeniero visionario, ASIF se compromete a aprovechar el potencial de la inteligencia artificial para el bien social. Su esfuerzo más reciente es el lanzamiento de una plataforma de medios de inteligencia artificial, MarktechPost, que se destaca por su cobertura profunda de noticias de aprendizaje automático y de aprendizaje profundo que es técnicamente sólido y fácilmente comprensible por una audiencia amplia. La plataforma cuenta con más de 2 millones de vistas mensuales, ilustrando su popularidad entre el público.