¿Qué podemos hacer cuando la memoria se convierte en el nuevo cuello de botella en la ingeniería de datos?

Sin duda, la memoria se ha convertido en un recurso fundamental. A medida que la demanda de infraestructura de memoria y almacenamiento impulsada por el auge de la IA ha alcanzado máximos históricos, empresas como Micron Technology y Sandisk han atraído una atención sin precedentes y han aumentado los precios de los productos al ejercer un fuerte poder de fijación de precios. Pero esto no es una buena noticia para las empresas que crean aplicaciones con uso intensivo de datos, dependen del almacenamiento de alta capacidad para el entrenamiento de IA, implementan análisis a gran escala u operan con márgenes ajustados en servicios en la nube.

Para los ingenieros de datos, esto no es sólo una noticia del mercado. Más bien, es una limitación diaria. Cuando la RAM y la memoria flash se vuelven cada vez más caras, el viejo reflejo de “agregar más capacidad” ya no funciona. Los presupuestos no crecen con el volumen de datos y las facturas de la nube están bajo escrutinio. Como ingenieros de datos, ¿qué podemos hacer cuando nuestro conjunto de datos se duplica pero el clúster no? Debemos ser creativos.

En este artículo, comenzaré con un desafío ETL del mundo real que requiere completar la transformación de datos de más de 6 millones de publicaciones en redes sociales con campos de datos de tipos mixtos dentro de una capacidad informática limitada. Explicaré algunas soluciones, incluidas las clásicas y las de vanguardia, para mantener su proceso de ETL en funcionamiento sin necesidad de una actualización de hardware o de la nube.

El problema: un conjunto de datos de 6,2 millones de filas que no cabía en la memoria

La historia comienza a partir de una nueva canalización ETL que vas a crear. Los datos sin procesar son 6,2 millones de publicaciones de una plataforma de redes sociales. El conjunto de datos se extrae de la API de redes sociales y se convierte en más de 200 columnas después del aplanamiento JSON, y la parte más problemática es: una gran cantidad de campos de datos son tipos de datos mixtos.

A continuación se muestran algunos ejemplos de estas columnas en el formato JSON original:

{ “reaction_count”: 1250 } { “reaction_count”: “1250” } { “reaction_count”: null } { “hashtags”: [“AI”, “Python”]
} { “hashtags”: “AI” } { “hashtags”: nulo }

Debido a que PySpark requiere un esquema consistente y el esquema de API de redes sociales cambia de vez en cuando, considere usar Pandas para abordar las columnas de tipos mixtos. A diferencia de PySpark, Pandas almacena estas columnas como objetos de forma predeterminada y no requiere que cada fila se ajuste al mismo esquema.

importar pandas como pd def normalize_mixed_columns(df, Mixed_columns): “”” Convierte columnas de tipo mixto en cadenas. “”” clean_df = df.copy() para la columna en Mixed_columns: clean_df[column] = (limpiado_df[column]
.dónde(limpiado_df[column].isna(), clean_df[column].astype(str)) ) return clean_df social_posts_clean = normalize_mixed_columns( social_posts_df, Mixed_columns )

Sencillo y directo. Sin embargo, cuando intentó ejecutar este código, el proceso de ejecución finalizó porque el uso de la memoria excedió los recursos disponibles. El trabajo fracasó.

Una solución clásica: reducir el pico de memoria con procesamiento basado en fragmentos

El cuello de botella son los 6,2 millones de filas. El tamaño del conjunto de datos es de aproximadamente 30 GB, que es mayor que la instancia de memoria de trabajador en la nube estándar. Excede la memoria de trabajo disponible durante las transformaciones de marcos de datos intermedios. Entonces, en lugar de realizar conversiones de tipos de datos para toda la columna, lo que obliga a Pandas a materializar grandes objetos temporales en la memoria, la técnica de fragmentación divide cada columna en fragmentos. En este caso, es adecuado establecer el tamaño de fragmentación en 250.000. Por lo tanto, Pandas solo necesita avanzar 250.000 filas a la vez, luego liberar la memoria y pasar al siguiente fragmento.

importar gc def normalize_mixed_columns_chunked( df, Mixed_columns, chunk_size=250000 ): clean_df = df.copy() para la columna en Mixed_columns: col_idx = clean_df.columns.get_loc(columna) para comenzar en rango(0, len(cleaned_df), chunk_size): end = min(start + chunk_size, len(cleaned_df)) fragmento = clean_df.iloc[start:end, col_idx]

máscara = trozo.notna() si máscara.cualquiera(): trozo = trozo.astype(objeto) trozo.loc[mask] = ( trozo.loc[mask]
.astype(str) .values ​​) clean_df.iloc[start:end, col_idx] = fragmento.valores del fragmento de la máscara gc.collect() return clean_df social_posts_clean = normalize_mixed_columns_chunked( social_posts_df, Mixed_columns )

Una vez que la memoria máxima se vuelve mucho más pequeña, la transformación de datos se completa con éxito y la canalización se vuelve estable. Sin embargo, el tiempo de ejecución se vuelve mucho más largo. Esto no es sorprendente, ya que la técnica de fragmentación es fundamentalmente una compensación. Cambia la velocidad de ejecución por la confiabilidad de la canalización.

De la fragmentación manual a la ejecución paralela automatizada

Además de la fragmentación manual en Pandas, Dask divide automáticamente un DataFrame en varias particiones más pequeñas y resuelve la falla de la memoria durante la transformación de datos. Pero su mecánica de ejecución interna es diferente a la fragmentación. Cuando configuro chunk_size en Pandas, lee un fragmento, ejecuta mi código en él, lo elimina de la RAM y luego pasa al siguiente. Utiliza un núcleo de CPU a la vez, por lo que para los servicios en la nube que proporcionan múltiples núcleos de CPU, no aprovechó todas las capacidades. Además, tengo que escribir manualmente un bucle para agregar los resultados, lo que hace que el código sea complejo y largo.

Dask divide el conjunto de datos en fragmentos automáticamente. Dask construye un gráfico de tareas y programa particiones en los núcleos de CPU disponibles; esto reduce significativamente el tiempo de ejecución.

Sin embargo, no podemos ignorar el problema de los tipos de datos mixtos en Dask. Debido a que un Dask DataFrame se compone de múltiples particiones Pandas DataFrame, al leer archivos CSV o JSON, Dask infiere tipos de datos a partir de una muestra de los datos. Si una columna contiene valores inconsistentes, por ejemplo, cadenas vacías (“”), Ninguno, enteros y cadenas, Dask probablemente generará un ValueError, TypeError o un error de inferencia de metadatos. Esto sucede porque Dask infiere el tipo de datos de una columna a partir de una muestra inicial de datos y supone que la columna es un número entero. Pero si luego encuentra una cadena en uno de los siguientes fragmentos, Dask genera un error.

Para resolver este problema, debemos especificar explícitamente los tipos de datos de columna esperados en lugar de depender de la inferencia automática. El siguiente código sirve para utilizar Dask para realizar la transformación de datos para columnas de tipos mixtos.

importar dask.dataframe como dd df = dd.read_parquet( “social_posts.parquet”, motor = “pyarrow”) Mixed_columns = [
“hashtags”,
“mentions”,
“location”,
“reaction_count”,
]

para columna en columnas_mixtas: df[column] = gl[column].map(str, meta=(columna, ‘str’)) df.to_parquet(“social_posts_clean/”, motor=”pyarrow”)

Dask no es tan flexible como la fragmentación de Pandas cuando maneja columnas dinámicas con tipos de datos mixtos porque requiere especificar qué columnas convertir. Además, todavía ejecuta muchas operaciones de Pandas dentro de cada partición, por lo que las cargas de trabajo dominadas por columnas de objetos de Python pueden consumir mucha memoria y no ser rápidas al procesar conjuntos de datos de millones de filas. ¿Existe alguna otra solución eficiente en cuanto a caché de CPU?

Una alternativa más fuerte: los polares

Quizás se pregunte si existe algún método que pueda equilibrar la velocidad y la eficiencia de la memoria. La respuesta es Polars, una biblioteca DataFrame implementada en Rust Engine. En comparación con Python, Rust produce código de máquina nativo altamente optimizado y ofrece una excelente gestión de la memoria. Minimiza las asignaciones de memoria y elimina la sobrecarga de recolección de basura. Sin embargo, Rust también tiene sus inconvenientes. Su velocidad de desarrollo es mucho más lenta que la de Python debido a estrictas comprobaciones del compilador y su curva de aprendizaje es extremadamente pronunciada. Esas son las razones por las que es mucho menos popular que Python desde que se creó en 2010. ¿Eso significa que los ingenieros de datos no pueden usar este método si no están familiarizados con Rust?

Polars es una biblioteca DataFrame ultrarrápida creada con el motor Rust y expuesta a través de una API de Python. Se lanzó en 2020 y está diseñado para manejar conjuntos de datos masivos mucho más rápido que los Python Pandas nativos. Mantiene la potencia del motor Rust pero permite a los usuarios de Python importar desde una biblioteca de Python.

Polars utiliza el formato de datos en columnas en memoria Apache Arrow, que está diseñado para minimizar las copias de memoria y al mismo tiempo maximizar la eficiencia de la caché de la CPU. Ejecuta la operación .cast(pl.String) directamente en el código Rust. Estas características permiten que Polars se ejecute varias veces más rápido que Python y use solo una fracción de la memoria.

Polars crea un plan de consulta diferido y el optimizador de consultas determina el plan de ejecución más eficiente antes de leer los datos. Estas mecánicas reducen el uso innecesario de memoria. Por lo tanto, cuando maneja conjuntos de datos que exceden la memoria disponible, Polars puede procesar los datos en modo de transmisión y evita que todo el conjunto de datos se cargue en la RAM a la vez.

importar polares como pl df = pl.read_parquet(“social_posts.parquet”) Mixed_columns = [
“hashtags”,
“mentions”,
“location”,
“reaction_count”,
]

df = df.con_columnas([
pl.col(col).cast(pl.String) for col in mixed_columns
]) df.write_parquet( “social_posts_clean.parquet”)

Polars supera a las dos soluciones anteriores en administración de memoria porque está diseñado desde cero para usar la memoria de manera más eficiente, mientras que Pandas Chunking y Dask se enfocan en reducir la presión de la memoria durante la ejecución.

Sin embargo, Polars no es una solución milagrosa. Tiene sus propias desventajas. Primero, Polars presenta su propia API DataFrame aunque usa Python. Las operaciones comunes de Pandas como apply(), indexación y sintaxis groupby a menudo necesitan reescribirse. En segundo lugar, muchas bibliotecas de terceros todavía se basan en Pandas, por lo que la integración con Polars aún requiere la conversión entre formatos de DataFrame.

Pensamientos finales

Entonces, ¿Pandas está desactualizado? No necesariamente.

Cada uno de los tres enfoques resuelve un problema diferente. La mejor elección depende más de las limitaciones que de la última tecnología.

Si tiene recursos informáticos y esquemas dinámicos de proceso realmente limitados, la fragmentación de Pandas sigue siendo una excelente solución. Reduce drásticamente el uso máximo de memoria. La contrapartida es el tiempo de ejecución. Pero en muchos entornos de producción, un proceso más lento pero más estable es mucho más valioso que uno más rápido que falla repetidamente.

Si su carga de trabajo ya ha superado la capacidad de una sola máquina y desea utilizar varios núcleos de CPU, Dask es una mejor opción. Automatiza la partición y la ejecución paralela. Sin embargo, se debe prestar atención a la coherencia del esquema y a los tipos de datos, especialmente cuando se trabaja con datos semiestructurados.

Polars se selecciona cuando la carga de trabajo es crítica para el rendimiento y ha aprendido la nueva API DataFrame. Polar suele considerarse la opción más sólida debido a su motor Rust, formato de memoria Apache Arrow y optimizador de consultas. Todas estas características permiten a Polars procesar grandes conjuntos de datos con un consumo de memoria significativamente menor y un rendimiento mucho mayor. De manera similar a Dask, debe abordar los problemas causados ​​por tipos de datos mixtos y garantizar la coherencia del esquema.

Para concluir, la optimización de la memoria no se trata de encontrar la mejor solución. Más bien, se trata de comprender las limitaciones de su proyecto y elegir la herramienta adecuada. En la era de la IA, la capacidad de optimizar los canales de datos bajo limitaciones de memoria se está convirtiendo en una habilidad valiosa para los ingenieros de datos.

Una canalización ETL sólida requiere más que eficiencia de memoria. También depende de la capacidad de prueba, el mantenimiento y la confiabilidad de la implementación.

Este artículo es parte de mi serie práctica de ingeniería de datos. Si está interesado en crear canales ETL listos para producción más allá de la optimización del rendimiento, también puede disfrutar del artículo ¿Su primera tarea como ingeniero de datos en una nueva empresa? Haga que el canal ETL sea comprobable, que cubre la configuración del entorno, las pruebas automatizadas y el desarrollo asistido por IA.

¡Gracias por tu lectura!