La mayoría de los tutoriales de aprendizaje automático se centran en el servicio sincrónico en tiempo real, lo que permite respuestas inmediatas a las solicitudes de predicción. Sin embargo, este enfoque puede tener problemas con picos de tráfico y no es ideal para tareas de larga duración. También requiere máquinas más potentes para responder rápidamente y, si el cliente o el servidor fallan, el resultado de la predicción generalmente se pierde.
En esta publicación del blog, demostraremos cómo ejecutar un modelo de aprendizaje automático como un trabajador asincrónico utilizando Celery y Redis. Usaremos el modelo base Florence 2, un modelo de lenguaje Vision conocido por su impresionante rendimiento. Este tutorial proporcionará un ejemplo mínimo pero funcional que puede adaptar y ampliar para sus propios casos de uso.
Puedes ver una demostración de la aplicación aquí: https://caption-app-dfmj3maizq-ew.a.run.app/
El núcleo de nuestra solución se basa en Celery, una biblioteca de Python que implementa esta lógica de cliente/trabajador para nosotros. Nos permite distribuir el trabajo de cómputo entre muchos trabajadores, lo que mejora la escalabilidad de su caso de uso de inferencia de ML para cargas altas e impredecibles.
El proceso funciona de la siguiente manera:
- El cliente envía una tarea con algunos parámetros a una cola administrada por el broker (Redis en nuestro ejemplo).
- Un trabajador (o varios) supervisa continuamente la cola y toma las tareas a medida que llegan. Luego las ejecuta y guarda el resultado en el almacenamiento del backend.
- El cliente puede obtener el resultado de la tarea usando su identificación, ya sea consultando el backend o suscribiéndose al canal de la tarea.
Comencemos con un ejemplo simplificado:
Primero, ejecute Redis:
docker run -p 6379:6379 redis
Aquí está el código del trabajador:
from celery import Celery
# Configure Celery to use Redis as the broker and backend
app = Celery(
"tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# Define a simple task
@app.task
def add(x, y):
return x + y
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info"])
Y el código del cliente:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
# Send a task to the worker
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
# Get the result
print(f"Result: {result.result}")
Esto da el resultado que esperamos: “Resultado: 10”
Ahora, pasemos al caso de uso real: servir a Florence 2.
Desarrollaremos una aplicación de subtitulado de imágenes con múltiples contenedores que utiliza Redis para la cola de tareas, Celery para la distribución de tareas y un volumen local o Google Cloud Storage para el posible almacenamiento de imágenes. La aplicación está diseñada con algunos componentes básicos: inferencia de modelos, distribución de tareas, interacción con el cliente y almacenamiento de archivos.
Descripción general de la arquitectura:
- Cliente: Inicia solicitudes de subtítulos de imágenes enviándolas al trabajador (a través del intermediario).
- Obrero: Recibe solicitudes, descarga imágenes, realiza inferencias utilizando el modelo previamente entrenado y devuelve resultados.
- Redis: Actúa como un intermediario de mensajes facilitando la comunicación entre el cliente y el trabajador.
- Almacenamiento de archivos:Almacenamiento temporal para archivos de imagen
Desglose de componentes:
1. Inferencia del modelo (model.py):
- Dependencias e inicialización:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
model = Florence2ForConditionalGeneration.from_pretrained(
"microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
- Importa las bibliotecas necesarias para el procesamiento de imágenes, solicitudes web, interacción con Google Cloud Storage y registro.
- Inicializa el modelo Florence-2 previamente entrenado y el procesador para la generación de títulos de imágenes.
- Descargar imagen (download_image):
def download_image(url):
if url.startswith("http://") or url.startswith("https://"):
# Handle HTTP/HTTPS URLs
# ... (code to download image from URL) ...
elif url.startswith("gs://"):
# Handle Google Cloud Storage paths
# ... (code to download image from GCS) ...
else:
# Handle local file paths
# ... (code to open image from local path) ...
- Descarga la imagen de la URL proporcionada.
- Admite URL HTTP/HTTPS y rutas de Google Cloud Storage (
gs://) y rutas de archivos locales. - Ejecución de inferencia (run_inference):
def run_inference(url, task_prompt):
# ... (code to download image using download_image function) ...
try:
# ... (code to open and process the image) ...
inputs = processor(text=task_prompt, images=image, return_tensors="pt")
except ValueError:
# ... (error handling) ...
# ... (code to generate captions using the model) ...
generated_ids = model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
# ... (model generation parameters) ...
)
# ... (code to decode generated captions) ...
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
# ... (code to post-process generated captions) ...
parsed_answer = processor.post_process_generation(
generated_text, task=task_prompt, image_size=(image.width, image.height)
)
return parsed_answer
Orquesta el proceso de subtitulado de imágenes:
- Descarga la imagen usando
download_image. - Prepara la imagen y la solicitud de tarea para el modelo.
- Genera subtítulos utilizando el modelo Florence-2 cargado.
- Decodifica y posprocesa los subtítulos generados.
- Devuelve el título final.
2. Distribución de tareas (worker.py):
import os
from celery import Celery
# ... other imports ...
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery configurations) ...
- Configura Celery para utilizar Redis como agente de mensajes para la distribución de tareas.
- Definición de tarea (inference_task):
@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
# ... (logging and error handling) ...
return run_inference(url, task_prompt)
- Define el
inference_taskque será ejecutado por los trabajadores de Celery. - Esta tarea llama al
run_inferencefunción demodel.py. - Ejecución del trabajador:
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
- Inicia un trabajador Celery que escucha y ejecuta tareas.
3. Interacción con el cliente (client.py):
import os
from celery import Celery
# Get Redis URL from environment variable or use default
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Configure Celery to use Redis as the broker and backend
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
- Establece una conexión con Celery utilizando Redis como agente de mensajes.
- Envío de tarea (send_inference_task):
def send_inference_task(url, task_prompt):
task = inference_task.delay(url, task_prompt)
print(f"Task sent with ID: {task.id}")
# Wait for the result
result = task.get(timeout=120)
return result
- Envía una tarea de subtitulado de imágenes (
inference_task) al trabajador de Apio. - Espera a que el trabajador complete la tarea y recupera el resultado.
Integración de Docker (docker-compose.yml):
- Define una configuración de múltiples contenedores mediante Docker Compose:
- rediseño: Ejecuta el servidor Redis para la intermediación de mensajes.
- modelo: Construye e implementa el trabajador de inferencia del modelo.
- aplicación: Construye e implementa la aplicación cliente.
- flor: Ejecuta una herramienta de monitoreo de tareas de Celery basada en web.
Puedes ejecutar la pila completa usando:
docker-compose up
¡Y ahí lo tienes! Acabamos de explorar una guía completa para crear un sistema de inferencia de aprendizaje automático asincrónico con Celery, Redis y Florence 2. Este tutorial demostró cómo usar Celery de manera eficaz para la distribución de tareas, Redis para la intermediación de mensajes y Florence 2 para el subtitulado de imágenes. Al adoptar flujos de trabajo asincrónicos, puede manejar grandes volúmenes de solicitudes, mejorar el rendimiento y mejorar la resiliencia general de sus aplicaciones de inferencia de aprendizaje automático. La configuración de Docker Compose proporcionada le permite ejecutar todo el sistema por su cuenta con un solo comando.
¿Listo para el siguiente paso? Implementar esta arquitectura en la nube puede presentar sus propios desafíos. ¡Cuéntame en los comentarios si te gustaría ver una publicación de seguimiento sobre la implementación en la nube!
Código: https://github.com/CVxTz/celery_ml_deploy
Manifestación: https://caption-app-dfmj3maizq-ew.a.run.app/