Deje de crear malos DAG: optimice su entorno de flujo de aire mejorando su código Python | por Álvaro Leandro Cavalcante Carneiro | Enero de 2025

Apache Airflow es una de las herramientas de orquestación más populares en el campo de datos, impulsando los flujos de trabajo para empresas de todo el mundo. Sin embargo, cualquiera que ya haya trabajado con Airflow en un entorno de producción, especialmente en uno complejo, sabe que ocasionalmente puede presentar algunos problemas y errores extraños.

Entre los muchos aspectos que necesita administrar en un entorno de flujo de aire, una métrica crítica a menudo vuela bajo el radar: Tiempo de análisis de DAG. Monitorear y optimizar el tiempo de análisis es esencial para evitar cuellos de botella de rendimiento y garantizar el funcionamiento correcto de sus orquestaciones, como exploraremos en este artículo.

Dicho esto, este tutorial tiene como objetivo introducir airflow-parse-benchuna herramienta de código abierto que desarrollé para ayudar a los ingenieros de datos a monitorear y optimizar sus entornos de flujo de aire, proporcionando información para reducir la complejidad del código y el tiempo de análisis.

Con respecto al flujo de aire, el tiempo de análisis de DAG es a menudo un métrica pasada por alto. El análisis ocurre cada vez que el flujo de aire procesa sus archivos de Python para construir dinámicamente los DAG.

Por defecto, todos sus DAG se analizan cada 30 segundos, una frecuencia controlada por la variable de configuración min_file_process_interval. Esto significa que cada 30 segundos, todo el código de Python que está presente en su dags La carpeta se lee, se importa y procesa para generar objetos DAG que contienen las tareas que se programarán. Luego se agregan archivos procesados ​​con éxito al Mochila.

Dos componentes de flujo de aire clave manejan este proceso:

Juntos, ambos componentes (comúnmente conocidos como el Procesador DAG) son ejecutados por el flujo de aire Planificadorasegurando que sus objetos DAG se actualicen antes de ser activados. Sin embargo, por la escalabilidad y las razones de seguridad, también es posible ejecutar su procesador DAG como un componente separado en su clúster.

Si su entorno solo tiene unas pocas docenas de DAG, es poco probable que el proceso de análisis cause algún tipo de problema. Sin embargo, es común encontrar entornos de producción con cientos o incluso miles de DAG. En este caso, si su tiempo de análisis es demasiado alto, puede conducir a:

  • Retraso de programación DAG.
  • Aumentar la utilización de recursos.
  • Entorno problemas de latidos del corazón.
  • Fallos de programador.
  • CPU excesivo y uso de memoria, desperdicio de recursos.

Ahora, imagine tener un entorno con cientos de DAG que contienen lógica de análisis innecesariamente compleja. Las pequeñas ineficiencias pueden convertirse rápidamente en problemas significativos, afectando la estabilidad y el rendimiento de toda su configuración de flujo de aire.

Al escribir DAG de flujo de aire, hay algunas mejores prácticas importantes a tener en cuenta para crear un código optimizado. Aunque puede encontrar muchos tutoriales sobre cómo mejorar sus DAG, resumiré algunos de los principios clave que pueden mejorar significativamente su rendimiento de DAG.

Limite el código de nivel superior

Una de las causas más comunes de los tiempos de análisis DAG altos es el código de nivel superior ineficiente o complejo. El código de nivel superior en un archivo DAG de flujo de aire se ejecuta cada vez que el planificador analiza el archivo. Si este código incluye operaciones intensivas en recursos, como consultas de bases de datos, llamadas de API o generación de tareas dinámicas, puede afectar significativamente el rendimiento del análisis.

El siguiente código muestra un ejemplo de un DAG no optimizado:

En este caso, cada vez que el planificador analiza el archivo, se ejecuta el código de nivel superior, haciendo una solicitud de API y procesando el marco de datos, lo que puede afectar significativamente el tiempo de análisis.

Otro factor importante que contribuye al análisis lento son las importaciones de alto nivel. Cada biblioteca importada en el nivel superior se carga en la memoria durante el análisis, lo que puede llevar mucho tiempo. Para evitar esto, puede mover las importaciones a funciones o definiciones de tareas.

El siguiente código muestra una mejor versión del mismo DAG:

Evite XCOM y variables en código de nivel superior

Todavía hablar sobre el mismo tema, es particularmente interesante para evitar usar XCOM y variables en su código de nivel superior. Como se indica por Documentación de Google:

Si está utilizando variable.get () en el código de nivel superior, cada vez que se analiza el archivo .py, Airflow ejecuta una variable.get () que abre una sesión a la DB. Esto puede ralentizar drásticamente los tiempos de análisis.

Para abordar esto, considere usar un Diccionario JSON Para recuperar múltiples variables en una sola consulta de base de datos, en lugar de hacer múltiples Variable.get() llamadas. Alternativamente, use Plantillas de jinjacomo las variables recuperadas de esta manera solo se procesan durante la ejecución de la tarea, no durante el análisis de DAG.

Eliminar los DAG innecesarios

Aunque parece obvio, siempre es importante recordar limpiar periódicamente DAG y archivos innecesarios de su entorno:

  • Eliminar los DAG no utilizados: Revisa tu dags carpeta y eliminar cualquier archivo que ya no sea necesario.
  • Usar .airflowignore: Especifique que el flujo de aire de archivos debe ignorar intencionalmente, saltando el análisis.
  • Revisión de DAG pausados: Los DAG pausados ​​todavía están analizados por el planificador, consumiendo recursos. Si ya no son necesarios, considere eliminarlos o archivarlos.

Cambiar las configuraciones de flujo de aire

Por último, puede cambiar algunas configuraciones de flujo de aire para reducir el uso de recursos del planificador:

  • min_file_process_interval: Esta configuración controla con qué frecuencia (en segundos) el flujo de aire analiza sus archivos DAG. Aumentarlo de los 30 segundos predeterminados puede reducir la carga del planificador a costa de las actualizaciones DAG más lentas.
  • dag_dir_list_interval: Esto determina con qué frecuencia (en segundos) el flujo de aire escanea el dags Directorio para nuevos DAG. Si implementa nuevos DAG con poca frecuencia, considere aumentar este intervalo para reducir el uso de la CPU.

Hemos discutido mucho sobre la importancia de crear DAG optimizados para mantener un entorno de flujo de aire saludable. Pero, ¿cómo se mide realmente el tiempo de análisis de sus DAG? Afortunadamente, hay varias formas de hacer esto, dependiendo de su implementación de flujo de aire u sistema operativo.

Por ejemplo, si tienes un Compositor de nubes Implementación, puede recuperar fácilmente un informe de PARSE DAG ejecutando el siguiente comando en Google CLI:

gcloud composer environments run $ENVIRONMENT_NAME \
— location $LOCATION \
dags report

Si bien recuperar las métricas de análisis es sencilla, medir la efectividad de sus optimizaciones de código puede ser menos. Cada vez que modifica su código, debe redistribuir el archivo de Python actualizado a su proveedor de la nube, esperar a que se analice el DAG y luego extraer un nuevo informe, un proceso lento y lento.

Otro enfoque posible, si está en Linux o Mac, es ejecutar este comando para medir el tiempo de análisis localmente en su máquina:

time python airflow/example_dags/example.py

Sin embargo, aunque simple, este enfoque no es práctico para medir y comparar sistemáticamente los tiempos de análisis de múltiples DAG.

Para abordar estos desafíos, creé el airflow-parse-benchuna biblioteca de Python que simplifica la medición y la comparación de los tiempos de análisis de sus DAG utilizando el método de análisis nativo de Airflow.

El airflow-parse-bench La herramienta facilita el almacenamiento de los tiempos de análisis, comparar los resultados y estandarizar las comparaciones en sus DAG.

Instalación de la biblioteca

Antes de la instalación, se recomienda usar un virtualenv para evitar conflictos de biblioteca. Una vez configurado, puede instalar el paquete ejecutando el siguiente comando:

pip install airflow-parse-bench

Nota: Este comando solo instala las dependencias esenciales (relacionadas con el flujo de aire y los proveedores de flujo de aire). Debe instalar manualmente cualquier biblioteca adicional de las que dependan sus DAG.

Por ejemplo, si un DAG usa boto3 Para interactuar con AWS, asegúrese de que boto3 está instalado en su entorno. De lo contrario, encontrará errores de análisis.

Después de eso, es necesario inicializar su base de datos de flujo de aire. Esto se puede hacer ejecutando el siguiente comando:

airflow db init

Además, si sus DAG usan Variables de flujo de airedebes definirlos localmente también. Sin embargo, no es necesario poner valores reales en sus variables, ya que los valores reales no son necesarios para fines de análisis:

airflow variables set MY_VARIABLE 'ANY TEST VALUE'

Sin esto, encontrarás un error como:

error: 'Variable MY_VARIABLE does not exist'

Usando la herramienta

Después de instalar la biblioteca, puede comenzar a medir los tiempos de análisis. Por ejemplo, suponga que tiene un archivo DAG con nombre dag_test.py que contiene el código DAG no optimizado utilizado en el ejemplo anterior.

Para medir su tiempo de análisis, simplemente ejecute:

airflow-parse-bench --path dag_test.py

Esta ejecución produce la siguiente salida:

Resultado de ejecución. Imagen del autor.

Como se observó, nuestro DAG presentó un tiempo de análisis de 0.61 segundos. Si ejecuto el comando nuevamente, veré algunas pequeñas diferencias, ya que los tiempos de análisis pueden variar ligeramente a través de las ejecuciones debido al sistema y los factores ambientales:

Resultado de otra ejecución del mismo DAG. Imagen del autor.

Para presentar un número más conciso, es posible agregar ejecuciones múltiples especificando el número de iteraciones:

airflow-parse-bench --path dag_test.py --num-iterations 5

Aunque lleva un poco más tiempo terminar, esto calcula el Tiempo de análisis promedio en cinco ejecuciones.

Ahora, para evaluar el impacto de las optimizaciones antes mencionadas, reemplacé el código en midag_test.py con la versión optimizada compartida anteriormente. Después de ejecutar el mismo comando, obtuve el siguiente resultado:

Resultado de análisis del código optimizado. Imagen del autor.

Como se indicó, solo aplicar algunas buenas prácticas era capaz de reducir casi 0.5 segundos ¡En el tiempo de análisis de DAG, destacando la importancia de los cambios que hicimos!

Hay otras características interesantes que creo que es relevante para compartir.

Como recordatorio, si tiene dudas o problemas para usar la herramienta, puede acceder a la documentación completa en Github.

Además de eso, para ver todos los parámetros admitidos por la biblioteca, simplemente ejecute:

airflow-parse-bench --help

Prueba de múltiples DAG

En la mayoría de los casos, es probable que tenga docenas de DAG para probar los tiempos de análisis. Para abordar este caso de uso, creé una carpeta llamada dags y coloque cuatro archivos de Python dentro de él.

Para medir los tiempos de análisis para todos los DAG en una carpeta, es necesario especificar la ruta de la carpeta en el --path parámetro:

airflow-parse-bench --path my_path/dags

Ejecutar este comando produce una tabla que resume los tiempos de análisis para todos los DAG en la carpeta:

Prueba del tiempo de análisis de múltiples DAG. Imagen del autor.

Por defecto, la tabla se clasifica desde la DAG más rápida a la más lenta. Sin embargo, puede revertir el pedido utilizando el --order parámetro:

airflow-parse-bench --path my_path/dags --order desc
Orden de clasificación invertida. Imagen del autor.

Omitir Dags sin cambios

El --skip-unchanged El parámetro puede ser especialmente útil durante el desarrollo. Como su nombre indica, esta opción omite la ejecución de análisis para DAG que no se han modificado desde la última ejecución:

airflow-parse-bench --path my_path/dags --skip-unchanged

Como se muestra a continuación, cuando los DAG permanecen sin cambios, la salida no refleja diferencias en los tiempos de análisis:

Salida sin diferencia para archivos sin cambios. Imagen del autor.

Restablecer la base de datos

Toda la información de DAG, incluidas las métricas y la historia, se almacena en una base de datos SQLite local. Si desea borrar todos los datos almacenados y comenzar de nuevo, use el --reset-db bandera:

airflow-parse-bench --path my_path/dags --reset-db

Este comando restablece la base de datos y procesa los DAG como si fuera la primera ejecución.

El tiempo de análisis es una métrica importante para mantener entornos de flujo de aire escalables y eficientes, especialmente a medida que sus requisitos de orquestación se vuelven cada vez más complejos.

Por esta razón, el airflow-parse-bench La biblioteca puede ser una herramienta importante para ayudar a los ingenieros de datos a crear mejores DAG. Al probar el tiempo de análisis de sus DAG localmente, puede encontrar su cuello de botella de código y rápidamente, lo que hace que sus DAG sean más rápidos y más desempeñados.

Dado que el código se ejecuta localmente, el tiempo de análisis producido no será el mismo que el presente en su clúster de flujo de aire. Sin embargo, si puede reducir el tiempo de análisis en su máquina local, lo mismo podría reproducirse en su entorno de nubes.

¡Finalmente, este proyecto está abierto para la colaboración! Si tiene sugerencias, ideas o mejoras, no dude en contribuir en Github.