Desde la configuración hasta la orquestación: la construcción de un flujo de trabajo ETL con AWS ya no es una lucha

Liderar la industria en la nube con una amplia participación del 32% debido a su entrada temprana del mercado, tecnología robusta y ofertas de servicios integrales. Sin embargo, muchos usuarios consideran que AWS es difícil navegar, y este descontento lleva a más empresas y organizaciones a preferir a sus competidores Microsoft Azure y Google Cloud Platform.

A pesar de su curva de aprendizaje más empinada y su interfaz menos intuitiva, AWS sigue siendo el servicio de nube superior debido a su confiabilidad, nube híbrida y opciones de servicio máximo. Más importante aún, la selección de estrategias adecuadas puede reducir significativamente la complejidad de la configuración, agilizar los flujos de trabajo y aumentar el rendimiento.

En este artículo, introduciré una forma eficiente de configurar una tubería ETL completa con orquestación en AWS, basada en mi propia experiencia. También le dará una visión renovada sobre la producción de datos con AWS o le hará sentir menos dificultades al realizar la configuración si esta es su primera vez para usar AWS para ciertas tareas.

Estrategia para diseñar una cartera de datos eficiente

AWS tiene el ecosistema más completo con sus vastos servicios. Para construir un almacén de datos listo para la producción en AWS, al menos requiere los siguientes servicios:

  • IAM: aunque este servicio no está incluido en ninguna parte del flujo de trabajo, es la base para acceder a todos los demás servicios.
  • AWS S3 – Almacenamiento de Data Lake
  • AWS Glue – ETL Processing
  • Amazon Redshift – Data Warehouse
  • CloudWatch: monitoreo y registro

También necesita acceso al flujo de aire si tiene que programar dependencias más complejas y realizar reintentos avanzados en términos de manejo de errores, aunque el desplazamiento rojo puede manejar algunos trabajos básicos de Cron.

Para facilitar su trabajo, recomiendo instalar un IDE (Código de Visual Studio o Pycharm y, por supuesto, puede elegir su propio IDE favorito). Un IDE mejora dramáticamente su eficiencia para el código complejo de Python, las pruebas/depuración locales, la integración de control de versiones y la colaboración del equipo. Y en la próxima sesión, proporcionaré configuraciones paso a paso.

Configuración inicial

Estos son los pasos de las configuraciones iniciales:

  • Iniciar un entorno virtual en su IDE
  • Instalar dependencias: básicamente, necesitamos instalar las bibliotecas que se utilizarán más adelante.
pip install apache-airflow==2.7.0 boto3 pandas pyspark sqlalchemy
  • Instale AWS CLI: este paso le permite escribir scripts para automatizar varias operaciones de AWS y hace que la gestión de los recursos de AWS sea de manera más eficiente.
  • Configuración de AWS: asegúrese de ingresar estas credenciales de usuario de IAM cuando se le solicite:
    • ID de clave de acceso AWS: desde su usuario de IAM.
    • Clave de acceso secreto de AWS: de su usuario de IAM.
    • Región predeterminada: us-east-1 (o su región preferida)
    • Formato de salida predeterminado: json.
  • Integre el flujo de aire: aquí están los pasos:
    • Inicializar el flujo de aire
    • Crear archivos DAG en Airflow
    • Ejecute el servidor web en http: // localhost: 8080 (inicio de sesión: admin/admin)
    • Abra otra pestaña Terminal e inicie el planificador
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init
airflow users create \
  --username admin \
  --password admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email [email protected]
#Initialize Airflow
airflow webserver --port 8080 ##run the webserver
airflow scheduler #start the scheduler

Flujo de trabajo de desarrollo: estudio de caso de datos Covid-19

Estoy usando el conjunto de datos Public Covid-19 de JHU (CC por 4.0 con licencia) para fines de demostración. Puede consultar datos aquí,

El cuadro a continuación muestra el flujo de trabajo desde la ingestión de datos hasta la carga de datos a las tablas de desplazamiento rojo en el entorno de desarrollo.

Flujo de trabajo de desarrollo creado por el autor

Ingestión de datos

En el primer paso de la ingestión de datos a AWS S3, procesé los datos derritiéndolos a formato largo y convirtiendo el formato de fecha. Guardé los datos en el formato Parquet para mejorar la eficiencia de almacenamiento, mejorar el rendimiento de la consulta y reducir los costos de almacenamiento. El código para este paso es el siguiente:

import pandas as pd
from datetime import datetime
import os
import boto3
import sys

def process_covid_data():
    try:
        # Load raw data
        url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
        df = pd.read_csv(url)
        
        # --- Data Processing ---
        # 1. Melt to long format
        df = df.melt(
            id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
            var_name='date_str',
            value_name='confirmed_cases'
        )
        
        # 2. Convert dates (JHU format: MM/DD/YY)
        df['date'] = pd.to_datetime(
            df['date_str'], 
            format='%m/%d/%y',
            errors='coerce'
        ).dropna()
        
        # 3. Save as partitioned Parquet
        output_dir = "covid_processed"
        df.to_parquet(
            output_dir,
            engine='pyarrow',
            compression='snappy',
            partition_cols=['date']
        )
        
        # 4. Upload to S3
        s3 = boto3.client('s3')
        total_files = 0
        
        for root, _, files in os.walk(output_dir):
            for file in files:
                local_path = os.path.join(root, file)
                s3_path = os.path.join(
                    'raw/covid/',
                    os.path.relpath(local_path, output_dir)
                )
                s3.upload_file(
                    Filename=local_path,
                    Bucket='my-dev-bucket',
                    Key=s3_path
                )
            total_files += len(files)
        
        print(f"Successfully processed and uploaded {total_files} Parquet files")
        print(f"Data covers from {df['date'].min()} to {df['date'].max()}")
        return True

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        return False

if __name__ == "__main__":
    process_covid_data()

Después de ejecutar el código Python, debería poder ver los archivos de Parquet en los cubos S3, debajo de la carpeta de ‘Raw/Covid/’.

Captura de pantalla del autor

Desarrollo de la tubería ETL

El pegamento AWS se usa principalmente para el desarrollo de la tubería ETL. Aunque también se puede utilizar para la ingestión de datos, incluso si los datos no se han cargado a S3, su resistencia radica en procesar datos una vez que está en S3 para fines de almacenamiento de datos. Aquí están los scripts de Pyspark para la transformación de datos:

# transform_covid.py
from awsglue.context import GlueContext
from pyspark.sql.functions import *

glueContext = GlueContext(SparkContext.getOrCreate())
df = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["s3://my-dev-bucket/raw/covid/"]},
    format="parquet"
).toDF()

# Add transformations here
df_transformed = df.withColumn("load_date", current_date())

# Write to processed zone
df_transformed.write.parquet(
    "s3://my-dev-bucket/processed/covid/",
    mode="overwrite"
)
Captura de pantalla del autor

El siguiente paso es cargar datos en RedShift. En la consola Redshift, haga clic en “Editor de consultas Q2” en el lado izquierdo y puede editar su código SQL y finalizar la copia del desplazamiento rojo.

# Create a table covid_data in dev schema
CREATE TABLE dev.covid_data (
    "Province/State" VARCHAR(100),  
    "Country/Region" VARCHAR(100),
    "Lat" FLOAT8,
    "Long" FLOAT8,
    date_str VARCHAR(100),
    confirmed_cases FLOAT8  
)
DISTKEY("Country/Region")   
SORTKEY(date_str);
# COPY data to redshift
COPY dev.covid_data (
    "Province/State",
    "Country/Region",
    "Lat",
    "Long",
    date_str,
    confirmed_cases
)
FROM 's3://my-dev-bucket/processed/covid/'
IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
REGION 'your-region'
FORMAT PARQUET;

Luego verá los datos cargados con éxito en el almacén de datos.

Captura de pantalla del autor

Automatización de tuberías

La forma más fácil de automatizar su canal de datos es programar trabajos bajo el editor de consultas de cambio rojo V2 mediante la creación de un procedimiento almacenado (tengo una introducción más detallada sobre el procedimiento almacenado de SQL, puede consultar. Este artículo).

CREATE OR REPLACE PROCEDURE dev.run_covid_etl()
AS $$
BEGIN
  TRUNCATE TABLE dev.covid_data;
  COPY dev.covid_data 
  FROM 's3://simba-dev-bucket/raw/covid'
  IAM_ROLE 'arn:aws:iam::your-account-id:role/RedshiftLoadRole'
  REGION 'your-region'
  FORMAT PARQUET;
END;
$$ LANGUAGE plpgsql;
Captura de pantalla del autor

Alternativamente, puede ejecutar el flujo de aire para trabajos programados.

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 2
}

with DAG(
    'redshift_etl_dev',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    run_etl = RedshiftSQLOperator(
        task_id='run_covid_etl',
        redshift_conn_id='redshift_dev',
        sql='CALL dev.run_covid_etl()',
    )

Flujo de trabajo de producción

Airflow DAG es poderoso para orquestar toda su tubería ETL si hay muchas dependencias y también es una buena práctica en el entorno de producción.

Después de desarrollar y probar su tubería ETL, puede automatizar sus tareas en el entorno de producción utilizando el flujo de aire.

Flujo de trabajo de producción creado por el autor

Aquí está la lista de verificación de los pasos de preparación clave para ayudar a la implementación exitosa en el flujo de aire:

  • Crear cubo S3 my-prod-bucket
  • Crear trabajo de pegamento prod_covid_transformation en la consola de AWS
  • Crear procedimiento almacenado de desplazamiento al rojo prod.load_covid_data()
  • Configurar el flujo de aire
  • Configurar SMTP para correos electrónicos en airflow.cfg

Entonces la implementación de la tubería de datos en el flujo de aire es:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
from airflow.operators.email import EmailOperator

# 1. DAG CONFIGURATION
default_args = {
    'owner': 'data_team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 1, 1)
}

# 2. DATA INGESTION FUNCTION
def load_covid_data():
    import pandas as pd
    import boto3
    
    url = "https://github.com/CSSEGISandData/COVID-19/raw/master/archived_data/archived_time_series/time_series_19-covid-Confirmed_archived_0325.csv"
    df = pd.read_csv(url)

    df = df.melt(
        id_vars=['Province/State', 'Country/Region', 'Lat', 'Long'], 
        var_name='date_str',
        value_name='confirmed_cases'
    )
    df['date'] = pd.to_datetime(df['date_str'], format='%m/%d/%y')
    
    df.to_parquet(
        's3://my-prod-bucket/raw/covid/',
        engine='pyarrow',
        partition_cols=['date']
    )

# 3. DAG DEFINITION
with DAG(
    'covid_etl',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
) as dag:

    # Task 1: Ingest Data
    ingest = PythonOperator(
        task_id='ingest_data',
        python_callable=load_covid_data
    )

    # Task 2: Transform with Glue
    transform = GlueJobOperator(
        task_id='transform_data',
        job_name='prod_covid_transformation',
        script_args={
            '--input_path': 's3://my-prod-bucket/raw/covid/',
            '--output_path': 's3://my-prod-bucket/processed/covid/'
        }
    )

    # Task 3: Load to Redshift
    load = RedshiftSQLOperator(
        task_id='load_data',
        sql="CALL prod.load_covid_data()"
    )

    # Task 4: Notifications
    notify = EmailOperator(
        task_id='send_email',
        to='you-email-address',
        subject='ETL Status: {{ ds }}',
        html_content='ETL job completed: <a href="{{ ti.log_url }}">View Logs</a>'
    )

Mis pensamientos finales

Aunque algunos usuarios, especialmente aquellos que son nuevos en la nube y buscan soluciones simples, tienden a ser intimidadas por la alta barrera de entrada de AWS y se sienten abrumados por las opciones masivas de los servicios, vale la pena el tiempo y los esfuerzos y aquí están las razones:

  • El proceso de configuración y el diseño, la construcción y la prueba de las tuberías de datos le brindan la comprensión profunda de un flujo de trabajo de ingeniería de datos típico. Las habilidades lo beneficiarán incluso si produce sus proyectos con otros servicios en la nube, como Azure, GCP y Alibaba Cloud.
  • El ecosistema maduro que tiene AWS y una amplia gama de servicios que ofrece permite a los usuarios personalizar sus estrategias de arquitectura de datos y disfrutar de más flexibilidad y escalabilidad en sus proyectos.

¡Gracias por leer! ¡Espero que este artículo sea útil para construir su canalización de datos de base en la nube!