Comparación del rendimiento de formatos de archivos de Big Data: una guía práctica |  de Sarthak Sarbahi |  enero de 2024

Configuración del entorno

En esta guía, usaremos JupyterLab con Docker y MinIO. Piense en Docker como una herramienta útil que simplifica la ejecución de aplicaciones y en MinIO como una solución de almacenamiento flexible perfecta para manejar muchos tipos diferentes de datos. Así es como configuraremos las cosas:

No voy a profundizar en cada paso porque ya hay una gran tutorial para eso. Sugiero echarle un vistazo primero y luego volver para continuar con este.

Una vez que todo esté listo, comenzaremos preparando nuestros datos de muestra. Abra un nuevo cuaderno Jupyter para comenzar.

Primero, necesitamos instalar el s3fs Paquete Python, esencial para trabajar con MinIO en Python.

!pip install s3fs

Después de eso, importaremos las dependencias y módulos necesarios.

import os
import s3fs
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
import pyspark.sql.functions as F
from pyspark.sql import Row
import pyspark.sql.types as T
import datetime
import time

También configuraremos algunas variables de entorno que serán útiles al interactuar con MinIO.

# Define environment variables
os.environ["MINIO_KEY"] = "minio"
os.environ["MINIO_SECRET"] = "minio123"
os.environ["MINIO_ENDPOINT"] = "http://minio1:9000"

Luego, configuraremos nuestra sesión de Spark con la configuración necesaria.

# Create Spark session
spark = SparkSession.builder \
.appName("big_data_file_formats") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.spark:spark-avro_2.12:3.5.0,io.delta:delta-spark_2.12:3.0.0") \
.config("spark.hadoop.fs.s3a.endpoint", os.environ["MINIO_ENDPOINT"]) \
.config("spark.hadoop.fs.s3a.access.key", os.environ["MINIO_KEY"]) \
.config("spark.hadoop.fs.s3a.secret.key", os.environ["MINIO_SECRET"]) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.enableHiveSupport() \
.getOrCreate()

Simplifiquemos esto para entenderlo mejor.

  • spark.jars.packages: Descarga los archivos JAR necesarios del repositorio maven. Un repositorio Maven es un lugar central que se utiliza para almacenar artefactos de compilación como archivos JAR, bibliotecas y otras dependencias que se utilizan en proyectos basados ​​en Maven.
  • spark.hadoop.fs.s3a.endpoint: Esta es la URL del punto final de MinIO.
  • spark.hadoop.fs.s3a.access.key y spark.hadoop.fs.s3a.secret.key: Esta es la clave de acceso y la clave secreta para MinIO. Tenga en cuenta que es el mismo que el nombre de usuario y la contraseña utilizados para acceder a la interfaz web de MinIO.
  • spark.hadoop.fs.s3a.path.style.access: Se establece en verdadero para habilitar el acceso de estilo de ruta para el depósito MinIO.
  • spark.hadoop.fs.s3a.impl: Esta es la clase de implementación para el sistema de archivos S3A.
  • spark.sql.extensions: Registra las configuraciones y los comandos SQL de Delta Lake dentro del analizador Spark SQL.
  • spark.sql.catalog.spark_catalog: establece el catálogo de Spark en el catálogo de Delta Lake, lo que permite que Delta Lake maneje la administración de tablas y las operaciones de metadatos.

Elegir la versión JAR correcta es crucial para evitar errores. Usando la misma imagen de Docker, la versión JAR mencionada aquí debería funcionar bien. Si tiene problemas de configuración, no dude en dejar un comentario. Haré todo lo posible para ayudarte 🙂

Nuestro siguiente paso es crear un gran marco de datos Spark. Tendrá 10 millones de filas, divididas en diez columnas: la mitad son texto y la otra mitad son números.

# Generate sample data
num_rows = 10000000
df = spark.range(0, num_rows)

# Add columns
for i in range(1, 10): # Since we already have one column
if i % 2 == 0:
# Integer column
df = df.withColumn(f"int_col_{i}", (F.randn() * 100).cast(T.IntegerType()))
else:
# String column
df = df.withColumn(f"str_col_{i}", (F.rand() * num_rows).cast(T.IntegerType()).cast("string"))

df.count()

Echemos un vistazo a las primeras entradas para ver cómo se ven.

# Show rows from sample data
df.show(10,truncate = False)

+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|id |str_col_1|int_col_2|str_col_3|int_col_4|str_col_5|int_col_6|str_col_7|int_col_8|str_col_9|
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
|0 |7764018 |128 |1632029 |-15 |5858297 |114 |1025493 |-88 |7376083 |
|1 |2618524 |118 |912383 |235 |6684042 |-115 |9882176 |170 |3220749 |
|2 |6351000 |75 |3515510 |26 |2605886 |89 |3217428 |87 |4045983 |
|3 |4346827 |-70 |2627979 |-23 |9543505 |69 |2421674 |-141 |7049734 |
|4 |9458796 |-106 |6374672 |-142 |5550170 |25 |4842269 |-97 |5265771 |
|5 |9203992 |23 |4818602 |42 |530044 |28 |5560538 |-75 |2307858 |
|6 |8900698 |-130 |2735238 |-135 |1308929 |22 |3279458 |-22 |3412851 |
|7 |6876605 |-35 |6690534 |-41 |273737 |-178 |8789689 |88 |4200849 |
|8 |3274838 |-42 |1270841 |-62 |4592242 |133 |4665549 |-125 |3993964 |
|9 |4904488 |206 |2176042 |58 |1388630 |-63 |9364695 |78 |2657371 |
+---+---------+---------+---------+---------+---------+---------+---------+---------+---------+
only showing top 10 rows

Para comprender la estructura de nuestro marco de datos, usaremos df.printSchema() para ver los tipos de datos que contiene. Después de esto, crearemos cuatro archivos CSV. Estos se utilizarán para Parquet, Avro, ORC y Delta Lake. Estamos haciendo esto para evitar cualquier sesgo en las pruebas de rendimiento: usar el mismo CSV permite que Spark almacene en caché y optimice las cosas en segundo plano.

# Write 4 CSVs for comparing performance for every file type
df.write.csv("s3a://mybucket/ten_million_parquet.csv")
df.write.csv("s3a://mybucket/ten_million_avro.csv")
df.write.csv("s3a://mybucket/ten_million_orc.csv")
df.write.csv("s3a://mybucket/ten_million_delta.csv")

Ahora, crearemos cuatro marcos de datos separados a partir de estos CSV, cada uno para un formato de archivo diferente.

# Read all four CSVs to create dataframes
schema = T.StructType([
T.StructField("id", T.LongType(), nullable=False),
T.StructField("str_col_1", T.StringType(), nullable=True),
T.StructField("int_col_2", T.IntegerType(), nullable=True),
T.StructField("str_col_3", T.StringType(), nullable=True),
T.StructField("int_col_4", T.IntegerType(), nullable=True),
T.StructField("str_col_5", T.StringType(), nullable=True),
T.StructField("int_col_6", T.IntegerType(), nullable=True),
T.StructField("str_col_7", T.StringType(), nullable=True),
T.StructField("int_col_8", T.IntegerType(), nullable=True),
T.StructField("str_col_9", T.StringType(), nullable=True)
])

df_csv_parquet = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_parquet.csv")
df_csv_avro = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_avro.csv")
df_csv_orc = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_orc.csv")
df_csv_delta = spark.read.format("csv").option("header",True).schema(schema).load("s3a://mybucket/ten_million_delta.csv")

¡Y eso es! Estamos todos listos para explorar estos formatos de archivos de big data.

Trabajar con parquet

Parquet es un formato de archivo orientado a columnas que combina muy bien con Apache Spark, lo que lo convierte en la mejor opción para manejar big data. Brilla en escenarios analíticos, particularmente cuando se examinan datos columna por columna.

Una de sus características interesantes es la capacidad de almacenar datos en un formato comprimido, con compresión rápida siendo la opción preferida. Esto no sólo ahorra espacio sino que también mejora el rendimiento.

Otro aspecto interesante de Parquet es su enfoque flexible para los esquemas de datos. Puede comenzar con una estructura básica y luego expandirla suavemente agregando más columnas a medida que crezcan sus necesidades. Esta adaptabilidad lo hace muy fácil de usar para proyectos de datos en evolución.

Ahora que dominamos el parquet, pongámoslo a prueba. Vamos a escribir 10 millones de registros en un archivo Parquet y vigilaremos cuánto tiempo lleva. En lugar de utilizar el %timeit La función Python, que se ejecuta varias veces y puede consumir muchos recursos para tareas de big data, solo la mediremos una vez.

# Write data as Parquet
start_time = time.time()
df_csv_parquet.write.parquet("s3a://mybucket/ten_million_parquet2.parquet")
end_time = time.time()
print(f"Time taken to write as Parquet: {end_time - start_time} seconds")

Para mí, esta tarea tomó 15,14 segundos, pero recuerda, este tiempo puede cambiar dependiendo de tu computadora. Por ejemplo, en una PC menos potente, tomó más tiempo. Así que no te preocupes si tu momento es diferente. Lo importante aquí es comparar el rendimiento entre diferentes formatos de archivo.

A continuación, ejecutaremos una consulta de agregación en nuestros datos de Parquet.

# Perfom aggregation query using Parquet data
start_time = time.time()
df_parquet = spark.read.parquet("s3a://mybucket/ten_million_parquet2.parquet")
df_parquet \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+

Esta consulta terminó en 12,33 segundos. Muy bien, ahora cambiemos de tema y exploremos el formato de archivo ORC.

Trabajando con ORC

El formato de archivo ORC, otro competidor orientado a columnas, puede que no sea tan conocido como Parquet, pero tiene sus propias ventajas. Una característica destacada es su capacidad para comprimir datos incluso de manera más efectiva que Parquet, mientras utiliza el mismo algoritmo de compresión ágil.

Es un éxito en el mundo de Hive, gracias a su soporte para operaciones ACID en tablas de Hive. ORC también está hecho a medida para manejar lecturas de streaming grandes de manera eficiente.

Además, es tan flexible como Parquet cuando se trata de esquemas: puede comenzar con una estructura básica y luego agregar más columnas a medida que crece su proyecto. Esto convierte a ORC en una opción sólida para las necesidades cambiantes de big data.

Profundicemos en las pruebas del rendimiento de escritura de ORC.

# Write data as ORC
start_time = time.time()
df_csv_orc.write.orc("s3a://mybucket/ten_million_orc2.orc")
end_time = time.time()
print(f"Time taken to write as ORC: {end_time - start_time} seconds")

Me tomo 12,94 segundos para completar la tarea. Otro punto de interés es el tamaño de los datos escritos en el depósito MinIO. En el ten_million_orc2.orc carpeta, encontrará varios archivos de partición, cada uno de un tamaño constante. Cada archivo ORC de partición tiene aproximadamente 22,3 MBy hay 16 archivos en total.

Archivos de partición ORC (Imagen del autor)

Comparando esto con Parquet, cada archivo de partición de Parquet tiene aproximadamente 26,8 MB, totalizando también 16 archivos. Esto muestra que ORC ofrece una mejor compresión que Parquet.

A continuación, probaremos cómo ORC maneja una consulta de agregación. Estamos utilizando la misma consulta para todos los formatos de archivo para mantener nuestra evaluación comparativa justa.

# Perform aggregation using ORC data
df_orc = spark.read.orc("s3a://mybucket/ten_million_orc2.orc")
start_time = time.time()
df_orc \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+

La consulta ORC finalizó en 13,44 segundos, un pelín más que la época de Parquet. Con ORC marcado de nuestra lista, pasemos a experimentar con Avro.

Trabajando con Avro

Avro es un formato de archivo basado en filas con sus propias fortalezas únicas. Si bien no comprime datos tan eficientemente como Parquet u ORC, lo compensa con una velocidad de escritura más rápida.

Lo que realmente distingue a Avro son sus excelentes capacidades de evolución de esquemas. Maneja cambios como campos agregados, eliminados o modificados con facilidad, lo que lo convierte en una opción ideal para escenarios donde las estructuras de datos evolucionan con el tiempo.

Avro es particularmente adecuado para cargas de trabajo que implican una gran cantidad de escritura de datos.

Ahora, veamos cómo le va a Avro al escribir datos.

# Write data as Avro
start_time = time.time()
df_csv_avro.write.format("avro").save("s3a://mybucket/ten_million_avro2.avro")
end_time = time.time()
print(f"Time taken to write as Avro: {end_time - start_time} seconds")

Me tomo 12,81 segundos, que en realidad es más rápido que Parquet y ORC. A continuación, veremos el rendimiento de Avro con una consulta de agregación.

# Perform aggregation using Avro data
df_avro = spark.read.format("avro").load("s3a://mybucket/ten_million_avro2.avro")
start_time = time.time()
df_avro \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |6429997 |1 |
+---------+---------+-----+

Esta consulta tomó aproximadamente 15,42 segundos. Entonces, cuando se trata de consultas, Parquet y ORC están por delante en términos de velocidad. Muy bien, es hora de explorar nuestro formato de archivo final y más nuevo: Delta Lake.

Trabajando con el lago Delta

Delta Lake es una nueva estrella en el universo de formatos de archivos de big data, estrechamente relacionado con Parquet en términos de tamaño de almacenamiento: es como Parquet pero con algunas características adicionales.

Al escribir datos, Delta Lake tarda un poco más que Parquet, principalmente debido a su _delta_log carpeta, que es clave para sus capacidades avanzadas. Estas capacidades incluyen el cumplimiento de ACID para transacciones confiables, viajes en el tiempo para acceder a datos históricos y compactación de archivos pequeños para mantener todo ordenado.

Si bien es un recién llegado a la escena de big data, Delta Lake se ha convertido rápidamente en un favorito en las plataformas en la nube que ejecutan Spark, superando su uso en sistemas locales.

Pasemos a probar el rendimiento de Delta Lake, comenzando con una prueba de escritura de datos.

# Write data as Delta
start_time = time.time()
df_csv_delta.write.format("delta").save("s3a://mybucket/ten_million_delta2.delta")
end_time = time.time()
print(f"Time taken to write as Delta Lake: {end_time - start_time} seconds")

La operación de escritura tomó 17,78 segundos, que es un poco más largo que los otros formatos de archivo que hemos analizado. Algo interesante a destacar es que en el ten_million_delta2.delta carpeta, cada archivo de partición es en realidad un archivo Parquet, similar en tamaño a lo que observamos con Parquet. Además, está el _delta_log carpeta.

Escribir datos como Delta Lake (Imagen del autor)

El _delta_log La carpeta en el formato de archivo Delta Lake juega un papel fundamental en la forma en que Delta Lake administra y mantiene la integridad y el control de versiones de los datos. Es un componente clave que distingue a Delta Lake de otros formatos de archivos de big data. Aquí hay un desglose simple de su función:

  1. Registro de transacciones: El _delta_log La carpeta contiene un registro de transacciones que registra cada cambio realizado en los datos de la tabla Delta. Este registro es una serie de archivos JSON que detallan las adiciones, eliminaciones y modificaciones de los datos. Actúa como un diario completo de todas las transacciones de datos.
  2. Cumplimiento del ÁCIDO: Este registro habilita el cumplimiento de ACID (Atomicidad, Consistencia, Aislamiento, Durabilidad). Cada transacción en Delta Lake, como escribir datos nuevos o modificar datos existentes, es atómica y consistente, lo que garantiza la integridad y confiabilidad de los datos.
  3. Viaje en el tiempo y auditoría: El registro de transacciones permite “viajar en el tiempo”, lo que significa que puede ver y restaurar fácilmente versiones anteriores de los datos. Esto es extremadamente útil para la recuperación de datos, la auditoría y la comprensión de cómo han evolucionado los datos con el tiempo.
  4. Aplicación y evolución del esquema: El _delta_log También realiza un seguimiento del esquema (estructura) de los datos. Aplica el esquema durante las escrituras de datos y permite una evolución segura del esquema a lo largo del tiempo sin dañar los datos.
  5. Operaciones de concurrencia y fusión: Gestiona lecturas y escrituras simultáneas, asegurando que varios usuarios puedan acceder y modificar los datos al mismo tiempo sin conflictos. Esto lo hace ideal para operaciones complejas como fusionar, actualizar y eliminar.

En resumen, el _delta_log La carpeta es el cerebro detrás de las funciones avanzadas de administración de datos de Delta Lake, y ofrece un registro de transacciones sólido, control de versiones y mejoras de confiabilidad que normalmente no están disponibles en formatos de archivo más simples como Parquet u ORC.

Ahora es el momento de ver cómo le va a Delta Lake con una consulta de agregación.

# Perform aggregation using Delta data
df_delta = spark.read.format("delta").load("s3a://mybucket/ten_million_delta2.delta")
start_time = time.time()
df_delta \
.select("str_col_5","str_col_7","int_col_2") \
.groupBy("str_col_5","str_col_7") \
.count() \
.orderBy("count") \
.limit(1) \
.show(truncate = False)
end_time = time.time()
print(f"Time taken for query: {end_time - start_time} seconds")

+---------+---------+-----+
|str_col_5|str_col_7|count|
+---------+---------+-----+
|1 |2906292 |1 |
+---------+---------+-----+

Esta consulta terminó en aproximadamente 15,51 segundos. Si bien esto es un poco más lento en comparación con Parquet y ORC, está bastante cerca. Sugiere que el desempeño de Delta Lake en escenarios del mundo real es bastante similar al de Parquet.

¡Impresionante! Hemos concluido todos nuestros experimentos. Recapitulemos nuestros hallazgos en la siguiente sección.

¿Cuándo utilizar qué formato de archivo?

Hemos concluido nuestras pruebas, así que reunamos todos nuestros hallazgos. En escritura de datos, Avro ocupa el primer lugar. Eso es realmente lo que mejor se le da en escenarios prácticos.

Cuando se trata de leer y ejecutar consultas de agregación, Parquet lidera el grupo. Sin embargo, esto no significa que ORC y Delta Lake se queden cortos. Como formatos de archivo en columnas, funcionan admirablemente en la mayoría de situaciones.

Comparación de rendimiento (Imagen del autor)

Aquí hay un resumen rápido:

  • Elija ORC para obtener la mejor compresión, especialmente si utiliza Hive y Pig para tareas analíticas.
  • ¿Trabajando con Spark? Parquet y Delta Lake son sus opciones preferidas.
  • Para escenarios con mucha escritura de datos, como áreas de zonas de aterrizaje, Avro es la mejor opción.

¡Y eso es un resumen de este tutorial!