TensorFlow Transform: cómo garantizar una preparación de datos sin problemas en la producción | por Akila Somasundaram | Jul, 2024

Aprovechamiento de TensorFlow Transform para escalar canales de datos para entornos de producción

Foto por Suzanne D. Williams en Dejar de salpicar

El preprocesamiento de datos es uno de los pasos principales en cualquier proceso de aprendizaje automático. Tensorflow Transform nos ayuda a lograrlo en un entorno distribuido sobre un conjunto de datos enorme.

Antes de profundizar en la transformación de datos, la validación de datos es el primer paso del proceso de producción, que se ha tratado en mi artículo. Validación de datos en un proceso de producción: el método TFXEche un vistazo a este artículo para comprenderlo mejor.

He utilizado Colab para esta demostración, ya que es mucho más fácil (y rápido) configurar el entorno. Si estás en la fase de exploración, también te recomendaría Colab, ya que te ayudará a concentrarte en las cosas más importantes.

Las operaciones de ML Pipeline comienzan con la ingesta y validación de datos, seguidas de la transformación. Los datos transformados se entrenan y se implementan. He abordado la parte de validación en mi artículo anterior. artículoAhora cubriremos la sección de transformación. Para comprender mejor las canalizaciones en Tensorflow, consulte el siguiente artículo.

Como se estableció anteriormente, utilizaremos Colab. Por lo tanto, solo necesitamos instalar la biblioteca tfx y listo.

! pip install tfx

Después de la instalación, reinicie la sesión para continuar.

Luego vienen las importaciones.

# Importing Libraries

import tensorflow as tf

from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.v1.components import ImportSchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform

from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from google.protobuf.json_format import MessageToDict

import os

Usaremos el conjunto de datos de la nave espacial Titanic de Kaggle, como en el artículo de validación de datos. Este conjunto de datos es de uso gratuito para fines comerciales y no comerciales. Puede acceder a él desde aquíEn la siguiente figura se muestra una descripción del conjunto de datos.

Para comenzar con la parte de transformación de datos, se recomienda crear carpetas donde se colocarán los componentes del pipeline (de lo contrario, se colocarán en el directorio predeterminado). He creado dos carpetas, una para los componentes del pipeline y la otra para nuestros datos de entrenamiento.

# Path to pipeline folder
# All the generated components will be stored here

_pipeline_root = '/content/tfx/pipeline/'

# Path to training data
# It can even contain multiple training data files
_data_root = '/content/tfx/data/'

A continuación, creamos el InteractiveContext y pasamos la ruta al directorio del pipeline. Este proceso también crea una base de datos sqlite para almacenar los metadatos del proceso del pipeline.

InteractiveContext está pensado para explorar cada etapa del proceso. En cada punto, podemos tener una vista de los artefactos que se crean. Cuando estamos en un entorno de producción, lo ideal es utilizar un marco de creación de pipelines como Apache Beam, donde todo este proceso se ejecutará automáticamente, sin intervención.

# Initializing the InteractiveContext 
# This will create an sqlite db for storing the metadata

context = InteractiveContext(pipeline_root=_pipeline_root)

A continuación, comenzamos con la ingesta de datos. Si los datos están almacenados como un archivo csv, podemos usar CsvExampleGen y pasar la ruta al directorio donde se almacenan los archivos de datos.

Asegúrate de que la carpeta contenga únicamente los datos de entrenamiento y nada más. Si tus datos de entrenamiento están divididos en varios archivos, asegúrate de que tengan el mismo encabezado.

# Input CSV files 
example_gen = CsvExampleGen(input_base=_data_root)

TFX actualmente admite csv, tf.Record, BigQuery y algunos ejecutores personalizados. Más información en el siguiente enlace.

Para ejecutar el componente ExampleGen, utilice context.run.

# Execute the component

context.run(example_gen)

Después de ejecutar el componente, este será nuestro resultado. Proporciona el ID de ejecución, los detalles del componente y dónde se guardan los resultados del componente.

Al expandirlo, deberíamos poder ver estos detalles.

La estructura del directorio se parece a la imagen que aparece a continuación. TFX ha creado todos estos artefactos para nosotros. También se versionan automáticamente y los detalles se almacenan en metadata.sqlite. El archivo sqlite ayuda a mantener la procedencia o el linaje de los datos.

Para explorar estos artefactos programáticamente, utilice el siguiente código.

# View the generated artifacts
artifact = example_gen.outputs['examples'].get()[0]

# Display split names and uri
print(f'split names: {artifact.split_names}')
print(f'artifact uri: {artifact.uri}')

La salida sería el nombre de los archivos y la URI.

Copiemos la URI del tren y observemos los detalles dentro del archivo. El archivo se almacena como un archivo zip y se guarda en formato TFRecordDataset.

# Get the URI of the output artifact representing the training examples
train_uri = os.path.join(artifact.uri, 'Split-train')

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

El siguiente código se obtiene de Tensorflow, es el código estándar que se puede usar para seleccionar registros de TFRecordDataset y nos devuelve los resultados para que los examinemos.

# Helper function to get individual examples
def get_records(dataset, num_records):
'''Extracts records from the given dataset.
Args:
dataset (TFRecordDataset): dataset saved by ExampleGen
num_records (int): number of records to preview
'''

# initialize an empty list
records = []

# Use the `take()` method to specify how many records to get
for tfrecord in dataset.take(num_records):

# Get the numpy property of the tensor
serialized_example = tfrecord.numpy()

# Initialize a `tf.train.Example()` to read the serialized data
example = tf.train.Example()

# Read the example data (output is a protocol buffer message)
example.ParseFromString(serialized_example)

# convert the protocol bufffer message to a Python dictionary
example_dict = (MessageToDict(example))

# append to the records list
records.append(example_dict)

return records

# Get 3 records from the dataset
sample_records = get_records(dataset, 3)

# Print the output
pp.pprint(sample_records)

Solicitamos 3 registros y el resultado es el siguiente: cada registro y sus metadatos se almacenan en formato de diccionario.

A continuación, pasamos al siguiente proceso, que consiste en generar las estadísticas de los datos mediante StatisticsGen. Pasamos los resultados del objeto example_gen como argumento.

Ejecutamos el componente utilizando statistics.run, con statistics_gen como argumento.

# Generate dataset statistics with StatisticsGen using the example_gen object

statistics_gen = StatisticsGen(
examples=example_gen.outputs['examples'])

# Execute the component
context.run(statistics_gen)

Podemos utilizar context.show para ver los resultados.

# Show the output statistics

context.show(statistics_gen.outputs['statistics'])

Se puede observar que es muy similar a la generación de estadísticas que analizamos en el artículo de TFDV. La razón es que TFX utiliza TFDV en segundo plano para realizar estas operaciones. Familiarizarse con TFDV ayudará a comprender mejor estos procesos.

El siguiente paso es crear el esquema. Esto se hace mediante SchemaGen pasando el objeto statistics_gen. Ejecute el componente y visualícelo mediante context.show.

# Generate schema using SchemaGen with the statistics_gen object

schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],
)

# Run the component
context.run(schema_gen)

# Visualize the schema

context.show(schema_gen.outputs['schema'])

La salida muestra detalles sobre el esquema subyacente de los datos. Nuevamente, lo mismo que en TFDV.

Si necesita realizar modificaciones al esquema presentado aquí, hágalo utilizando tfdv y cree un archivo de esquema. Puede pasarlo utilizando ImportSchemaGen y pedirle a tfx que utilice el nuevo archivo.

# Adding a schema file manually 
schema_gen = ImportSchemaGen(schema_file="path_to_schema_file/schema.pbtxt")

A continuación, validamos los ejemplos utilizando ExampleValidator. Pasamos statistics_gen y schema_gen como argumentos.

# Validate the examples using the ExampleValidator
# Pass statistics_gen and schema_gen objects

example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])

# Run the component.
context.run(example_validator)

Este debería ser el resultado ideal para demostrar que todo está bien.

En este punto, nuestra estructura de directorios se parece a la imagen de abajo. Podemos ver que para cada paso del proceso se crean los artefactos correspondientes.

Pasemos a la parte de transformación propiamente dicha. Ahora crearemos el archivo constants.py para agregar todas las constantes necesarias para el proceso.

# Creating the file containing all constants that are to be used for this project

_constants_module_file = 'constants.py'

Crearemos todas las constantes y las escribiremos en el archivo constants.py. Consulta el comando “%%writefile {_constants_module_file}”, este comando no permite que se ejecute el código, sino que escribe todo el código de la celda indicada en el archivo especificado.

%%writefile {_constants_module_file}

# Features with string data types that will be converted to indices
CATEGORICAL_FEATURE_KEYS = [ 'CryoSleep','Destination','HomePlanet','VIP']

# Numerical features that are marked as continuous
NUMERIC_FEATURE_KEYS = ['Age','FoodCourt','RoomService', 'ShoppingMall','Spa','VRDeck']

# Feature that can be grouped into buckets
BUCKET_FEATURE_KEYS = ['Age']

# Number of buckets used by tf.transform for encoding each bucket feature.
FEATURE_BUCKET_COUNT = {'Age': 4}

# Feature that the model will predict
LABEL_KEY = 'Transported'

# Utility function for renaming the feature
def transformed_name(key):
return key + '_xf'

Creemos el archivo transform.py, que contendrá el código real para transformar los datos.

# Creating a file that contains all preprocessing code for the project

_transform_module_file = 'transform.py'

Aquí, utilizaremos la biblioteca tensorflow_transform. El código para el proceso de transformación se escribirá bajo la función preprocessing_fn. Es obligatorio que utilicemos el mismo nombre, ya que tfx lo busca internamente durante el proceso de transformación.

%%writefile {_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft

import constants

# Unpack the contents of the constants module
_NUMERIC_FEATURE_KEYS = constants.NUMERIC_FEATURE_KEYS
_CATEGORICAL_FEATURE_KEYS = constants.CATEGORICAL_FEATURE_KEYS
_BUCKET_FEATURE_KEYS = constants.BUCKET_FEATURE_KEYS
_FEATURE_BUCKET_COUNT = constants.FEATURE_BUCKET_COUNT
_LABEL_KEY = constants.LABEL_KEY
_transformed_name = constants.transformed_name

# Define the transformations
def preprocessing_fn(inputs):

outputs = {}

# Scale these features to the range [0,1]
for key in _NUMERIC_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.scale_to_0_1(
inputs[key])

# Bucketize these features
for key in _BUCKET_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.bucketize(
inputs[key], _FEATURE_BUCKET_COUNT[key])

# Convert strings to indices in a vocabulary
for key in _CATEGORICAL_FEATURE_KEYS:
outputs[_transformed_name(key)] = tft.compute_and_apply_vocabulary(inputs[key])

# Convert the label strings to an index
outputs[_transformed_name(_LABEL_KEY)] = tft.compute_and_apply_vocabulary(inputs[_LABEL_KEY])

return outputs

Hemos utilizado algunas funciones de escalado y codificación estándar para esta demostración. La biblioteca de transformación, en realidad, alberga una gran cantidad de funciones. Explórelas aquí.

Ahora es el momento de ver el proceso de transformación en acción. Creamos un objeto Transform y pasamos los objetos example_gen y schema_gen, junto con la ruta al transform.py que creamos.

# Ignore TF warning messages
tf.get_logger().setLevel('ERROR')

# Instantiate the Transform component with example_gen and schema_gen objects
# Pass the path for transform file

transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_transform_module_file))

# Run the component
context.run(transform)

¡Ejecútalo y la parte de transformación estará completa!

Eche un vistazo a los datos transformados que se muestran en la siguiente imagen.

Ésta es tu pregunta ahora, ¿verdad?

Este proceso no está pensado para personas que desean preprocesar sus datos y comenzar con el entrenamiento de modelos. Está pensado para aplicarse en grandes cantidades de datos (datos que requieren un procesamiento distribuido) y en una cadena de producción automatizada que no puede permitirse el lujo de sufrir interrupciones.

Después de aplicar la transformación, la estructura de su carpeta se verá así

Contiene detalles previos y posteriores a la transformación. Además, también se crea un gráfico de transformación.

Recuerde que escalamos nuestras características numéricas usando tft.scale_to_0_1. Las funciones como esta requieren detalles computacionales que requieren el análisis de todos los datos (como los valores promedio, mínimo y máximo de una característica). Analizar datos distribuidos en varias máquinas para obtener estos detalles requiere un uso intensivo del rendimiento (especialmente si se hace varias veces). Dichos detalles se calculan una vez y se mantienen en el transform_graph. Cada vez que una función los necesita, se obtienen directamente del transform_graph. También ayuda a aplicar las transformaciones creadas durante la fase de entrenamiento directamente a los datos de entrega, lo que garantiza la coherencia en la fase de preprocesamiento.

Otra ventaja importante de utilizar las bibliotecas de transformación de Tensorflow es que cada fase se registra como artefacto, por lo que se mantiene el linaje de los datos. El control de versiones de los datos también se realiza automáticamente cuando los datos cambian. Por lo tanto, facilita la experimentación, la implementación y la reversión en un entorno de producción.

Eso es todo. Si tienes alguna pregunta, escríbela en la sección de comentarios.

Puedes descargar el cuaderno y los archivos de datos utilizados en este artículo desde mi repositorio de GitHub usando esto enlace