Una guía de codificación para crear una canalización de datos de aprendizaje automático escalable de un extremo a otro utilizando Daft para el procesamiento de datos estructurados y de imágenes de alto rendimiento

En este tutorial, exploramos cómo utilizamos Daft como un motor de datos nativo de Python de alto rendimiento para crear un proceso analítico de un extremo a otro. Comenzamos cargando un conjunto de datos MNIST del mundo real y luego lo transformamos progresivamente utilizando UDF, ingeniería de características, agregaciones, uniones y ejecución diferida. Además, demostramos cómo combinar a la perfección el procesamiento de datos estructurados, la computación numérica y el aprendizaje automático. Al final, no solo estamos manipulando datos, sino que estamos construyendo una canalización completa lista para el modelo impulsada por el motor de ejecución escalable de Daft.

!pip -q instalar daft pyarrow pandas numpy scikit-learn importar sistema operativo os.environ[“DO_NOT_TRACK”] = “true” importar numpy como np importar pandas como pd importar tonto desde tonto importar col print(“Versión tonta:”, getattr(daft, “__version__”, “unknown”)) URL = “https://github.com/Eventual-Inc/mnist-json/raw/master/mnist_handwriting_test.json.gz” df = daft.read_json(URL) print(“\nSchema (muestreado):”) print(df.schema()) print(“\nPeek:”) df.show(5)

Instalamos Daft y sus bibliotecas de soporte directamente en Google Colab para garantizar un entorno limpio y reproducible. Configuramos ajustes opcionales y verificamos la versión instalada para confirmar que todo funciona correctamente. Al hacer esto, establecemos una base estable para construir nuestro canal de datos de un extremo a otro.

def to_28x28(pixels): arr = np.array(pixels, dtype=np.float32) if arr.size != 784: return Ninguno return arr.reshape(28, 28) df2 = ( df .with_column( “img_28x28”, col(“image”).apply(to_28x28, return_dtype=daft.DataType.python()) ) .with_column( “pixel_mean”, col(“img_28x28”).apply(lambda x: float(np.mean(x)) si x no es Ninguno más Ninguno, return_dtype=daft.DataType.float32()) ) .with_column( “pixel_std”, col(“img_28x28”).apply(lambda x: float(np.std(x)) si x no es Ninguno más Ninguno, return_dtype=daft.DataType.float32()) ) ) print(“\nDespués de remodelar + características simples:”) df2.select(“label”, “pixel_mean”, “pixel_std”).show(5)

Cargamos un conjunto de datos MNIST JSON del mundo real directamente desde una URL remota utilizando el lector nativo de Daft. Inspeccionamos el esquema y obtenemos una vista previa de los datos para comprender su estructura y tipos de columnas. Nos permite validar el conjunto de datos antes de aplicar transformaciones e ingeniería de características.

@daft.udf(return_dtype=daft.DataType.list(daft.DataType.float32()), lote_size=512) def featurize(images_28x28): out = []
para img en imágenes_28x28.to_pylist(): si img es Ninguno: out.append(None) continuar img = np.asarray(img, dtype=np.float32) row_sums = img.sum(axis=1) / 255.0 col_sums = img.sum(axis=0) / 255.0 total = img.sum() + 1e-6 ys, xs = np.indices(img.shape) cy = float((ys * img).sum() / total) / 28.0 cx = float((xs * img).sum() / total) / 28.0 vec = np.concatenate([row_sums, col_sums, np.array([cy, cx, img.mean()/255.0, img.std()/255.0]dtype=np.float32)]) out.append(vec.astype(np.float32).tolist()) return df3 = df2.with_column(“features”, featurize(col(“img_28x28”))) print(“\nColumna de características creada (lista)[float]):”) df3.select(“etiqueta”, “características”).show(2)

Remodelamos las matrices de píxeles sin procesar en imágenes estructuradas de 28 × 28 utilizando una UDF por filas. Calculamos características estadísticas, como la media y la desviación estándar, para enriquecer el conjunto de datos. Al aplicar estas transformaciones, convertimos datos de imágenes sin procesar en representaciones estructuradas y compatibles con modelos.

label_stats = ( df3.groupby(“label”) .agg( col(“label”).count().alias(“n”), col(“pixel_mean”).mean().alias(“mean_pixel_mean”), col(“pixel_std”).mean().alias(“mean_pixel_std”), ) .sort(“label”) ) print(“\nDistribución de etiquetas + estadísticas resumidas:”) label_stats.show(10) df4 = df3.join(label_stats, on=”label”, how=”left”) print(“\nEstadísticas de etiquetas unidas nuevamente en cada fila:”) df4.select(“label”, “n”, “mean_pixel_mean”, “mean_pixel_std”).show(5)

Implementamos una UDF por lotes para extraer vectores de características más ricos de las imágenes reformadas. Realizamos agregaciones grupales y unimos estadísticas resumidas al conjunto de datos para un enriquecimiento contextual. Esto demuestra cómo combinamos la computación escalable con análisis avanzados dentro de Daft.

pequeño = df4.select(“etiqueta”, “características”).collect().to_pandas() pequeño = pequeño.dropna(subconjunto=[“label”, “features”]).reset_index(drop=True) X = np.vstack(pequeño[“features”].apply(np.array).values).astype(np.float32) y = pequeño[“label”].astype(int).values de sklearn.model_selection importar train_test_split de sklearn.linear_model importar LogisticRegression de sklearn.metrics importar precision_score, Classification_report clf = LogisticRegression(max_iter=1000, n_jobs=None) clf.fit(X_train, y_train) pred = clf.predict(X_test) acc = precision_score(y_test, pred) print(“\nPrecisión de línea base (regresión logística diseñada por funciones):”, round(acc, 4)) print(“\nInforme de clasificación:”) print(classification_report(y_test, pred, digits=4)) out_df = df4.select(“label”, “features”, “pixel_mean”, “pixel_std”, “n”) out_path = “/content/daft_mnist_features.parquet” out_df.write_parquet(out_path) print(“\nEscribí parquet en:”, out_path) df_back = daft.read_parquet(out_path) print(“\nVerificación de lectura posterior:”) df_back.show(3)

Materializamos columnas seleccionadas en pandas y entrenamos un modelo de regresión logística de referencia. Evaluamos el rendimiento para validar la utilidad de nuestras funciones de ingeniería. Además, persistimos el conjunto de datos procesados ​​en formato Parquet, completando nuestro proceso de extremo a extremo desde la ingesta de datos sin procesar hasta el almacenamiento listo para producción.

En este tutorial, creamos un flujo de trabajo de datos de estilo de producción utilizando Daft, pasando de la ingestión de JSON sin formato a la ingeniería de funciones, la agregación, el entrenamiento de modelos y la persistencia de Parquet. Demostramos cómo integrar lógica UDF avanzada, realizar operaciones de unión y agrupación eficientes y materializar resultados para el aprendizaje automático posterior, todo dentro de un marco limpio y escalable. A través de este proceso, vimos cómo Daft nos permite manejar transformaciones complejas sin dejar de ser pitónicos y eficientes. Terminamos con un proceso reutilizable de extremo a extremo que muestra cómo podemos combinar la ingeniería de datos moderna y los flujos de trabajo de aprendizaje automático en un entorno unificado.

Consulte los códigos completos aquí. Además, no dude en seguirnos en Twitter y no olvide unirse a nuestro SubReddit de más de 120.000 ML y suscribirse a nuestro boletín. ¡Esperar! estas en telegrama? Ahora también puedes unirte a nosotros en Telegram.

Michal Sutter es un profesional de la ciencia de datos con una Maestría en Ciencias de Datos de la Universidad de Padua. Con una base sólida en análisis estadístico, aprendizaje automático e ingeniería de datos, Michal se destaca en transformar conjuntos de datos complejos en conocimientos prácticos.