Una guía de codificación para crear un sistema de procesamiento de tareas en segundo plano de nivel de producción utilizando Huey con SQLite, programación, reintentos, canalizaciones y control de concurrencia
consumidor = huey.create_consumer( trabajadores=4, tipo_trabajador=THREAD_TRABAJADOR, periódico=Verdadero, retraso_inicial=0.1, retroceso=1.15, retraso_máximo=2.0, intervalo_programador=1, check_worker_health=Verdadero, intervalo_comprobación_salud=10, flux_locks=False,) hilo_consumidor = threading.Thread(target=consumer.run, daemon=True) consumer_thread.start() print(“Consumidor iniciado (enhebrado).”) print(“\nConceptos básicos de puesta en cola…”) r1 = quick_add(10, 32) r2 = slow_io(0.75) print(“quick_add result:”, r1(blocking=True, timeout=5)) print(“resultado de slow_io:”, r2(bloqueo=True, tiempo de espera=5)) print(“\nReintentos + demostración de prioridad (tarea inestable)…”) rf = flaky_network_call(p_fail=0.7) try: print(“resultado de Flaky_network_call:”, rf(bloqueo=True, tiempo de espera=10)) excepto excepción como e: print(“flaky_network_call falló incluso después de reintentos:”, repr(e)) print(“\nTarea de contexto (ID de tarea dentro de la carga útil)…”) rp = cpu_pi_estimate(samples=150_000) print(“carga útil de pi:”, rp(blocking=True, timeout=20)) print(“\nDemostración de bloqueos: poner en cola varios trabajos bloqueados rápidamente (debe serializarse)…”) lock_results = [locked_sync_job(tag=f”run{i}”) for i in range(3)]
imprimir([res(blocking=True, timeout=10) for res in locked_results]) print(“\nDemostración de programación: ejecute slow_io en ~3 segundos…”) rs = slow_io.schedule(args=(0.25,), delay=3) print(“control programado:”, rs) print(“resultado de slow_io programado:”, rs(blocking=True, timeout=10)) print(“\nRevocar demostración: programar una tarea en 5 segundos y luego revocarla antes se ejecuta…”) rv = slow_io.schedule(args=(0.1,), delay=5) rv.revoke() time.sleep(6) try: out = rv(blocking=False) print(“salida de la tarea revocada:”, out) excepto excepción como e: print(“la tarea revocada no produjo el resultado (esperado):”, type(e).__name__, str(e)[:120]) print(“\nDemostración de canalización…”) pipeline = ( fetch_number.s(123) .then(transform_number, 5) .then(store_result) ) pipe_res = huey.enqueue(pipeline) print(“resultado final de canalización:”, pipe_res(blocking=True, timeout=10)) print(“\nIniciando demostración de latidos de 15 segundos durante ~40 segundos…”) start_segundos_heartbeat(interval_sec=15) time.sleep(40) stop_segundos_heartbeat() print(“Se detuvo la demostración de latidos de 15 segundos.”) print_latest_events(12) print(“\nDeteniendo al consumidor con gracia…”) consumer.stop(graceful=True) consumer_thread.join(timeout=5) print(“Consumidor detenido.”)