Tutorial de canalización de agentes múltiples de Google ADK: carga de datos, pruebas estadísticas, visualización y generación de informes en Python
def describe_dataset(dataset_name: str, tool_context: ToolContext) -> dict: print(f”📊 Describiendo el conjunto de datos: {dataset_name}”) df = DATA_STORE.get_dataset(dataset_name) si df es Ninguno: return {“status”: “error”, “message”: f”Dataset ‘{dataset_name}’ no encontrado”} numeric_cols = df.select_dtypes(incluir=[np.number]).columns.tolist() categorical_cols = df.select_dtypes(incluir=[‘object’, ‘category’]).columns.tolist() resultado = { “status”: “éxito”, “dataset”: dataset_name, “overview”: { “total_rows”: int(len(df)), “total_columns”: int(len(df.columns)), “numeric_columns”: numeric_cols, “categorical_columns”: categorical_cols, “memory_mb”: round(float(df.memory_usage(deep=True).sum() / 1024/1024), 2), “duplicate_rows”: int(df.duplicated().sum()), “missing_total”: int(df.isnull().sum().sum()) } } if numeric_cols: stats_dict = {} for col in numeric_cols: col_data = df[col].dropna() si len(col_data) > 0: stats_dict[col] = { “count”: int(len(col_data)), “mean”: round(float(col_data.mean()), 3), “std”: round(float(col_data.std()), 3), “min”: round(float(col_data.min()), 3), “25%”: round(float(col_data.quantile(0.25)), 3), “50%”: round(float(col_data.median()), 3), “75%”: round(float(col_data.quantile(0.75)), 3), “max”: round(float(col_data.max()), 3), “skewness”: round(float(col_data.skew()), 3), “missing”: int(df[col].isnull().sum()) } resultado[“numeric_summary”] = stats_dict si categorical_cols: cat_dict = {} para col en categorical_cols[:10]:vc = df[col].value_counts() cat_dict[col] = { “valores_únicos”: int(df[col].nunique()), “top_values”: {str(k): int(v) para k, v en vc.head(5).items()}, “faltante”: int(df[col].isnull().sum()) } resultado[“categorical_summary”] = cat_dict DATA_STORE.log_analysis(“describe”, dataset_name, “Estadísticas generadas”) return make_serializable(resultado) def correlación_analysis(dataset_name: str, método: str = “pearson”, tool_context: ToolContext = Ninguno) -> dict: print(f”📊 Análisis de correlación: {dataset_name} ({method})”) df = DATA_STORE.get_dataset(dataset_name) si df es Ninguno: devuelve {“status”: “error”, “message”: f”Dataset ‘{dataset_name}’ no encontrado”} numeric_df = df.select_dtypes(include=[np.number]) si numeric_df.shape[1] < 2: return {"status": "error", "message": "Necesita al menos 2 columnas numéricas"} corr_matrix = numeric_df.corr(method=method) strong_corrs = [] para i en rango(len(corr_matrix.columns)): para j en rango(i + 1, len(corr_matrix.columns)): col1, col2 = corr_matrix.columns[i]corr_matrix.columnas[j] val = corr_matrix.iloc[i, j] if abs(val) > 0.5: strong_corrs.append({ “var1”: col1, “var2”: col2, “correlation”: round(float(val), 3), “strength”: “strong” if abs(val) > 0.7 else “moderado” }) strong_corrs.sort(key=lambda x: abs(x[“correlation”]), inversa=Verdadero) corr_dict = {} para col en corr_matrix.columns: corr_dict[col] = {k: round(float(v), 3) para k, v en corr_matrix[col].items()} DATA_STORE.log_analysis(“correlación”, nombre_conjunto de datos, f”{método} correlación”) return make_serializable({ “status”: “éxito”, “método”: método, “correlation_matrix”: corr_dict, “strong_correlations”: strong_corrs[:10]”insight”: f”Se encontraron {len(strong_corrs)} pares con |correlación| > 0.5″ }) def hipótesis_test(dataset_name: str, test_type: str, column1: str, column2: str = Ninguno, group_column: str = Ninguno, tool_context: ToolContext = Ninguno) -> dict: print(f”📊 Prueba de hipótesis: {test_type} en {dataset_name}”) df = DATA_STORE.get_dataset(dataset_name) si df es Ninguno: devuelve {“status”: “error”, “message”: f”Dataset ‘{dataset_name}’ no encontrado”} si la columna 1 no está en df.columns: devuelve {“status”: “error”, “message”: f”Columna ‘{column1}’ no encontrada”} intenta: si test_type == “normalidad”: datos = df[column1].dropna() si len(data) > 5000: data = data.sample(5000) stat, p = stats.shapiro(data) return make_serializable({ “status”: “éxito”, “test”: “Prueba de normalidad de Shapiro-Wilk”, “columna”: columna1, “estadística”: round(float(stat), 4), “p_value”: round(float(p), 6), “is_normal”: bool(p > 0.05), “interpretación”: “Los datos aparecen distribuidos normalmente” si p > 0.05 else “Los datos NO están distribuidos normalmente” }) elif test_type == “ttest”: si group_column es Ninguno: return {“status”: “error”, “message”: “group_column requerida para t-test”} groups = df[group_column].dropna().unique() if len(groups) != 2: return {“status”: “error”, “message”: f”La prueba T necesita exactamente 2 grupos, encontrados {len(groups)}: {list(groups)}”} g1 = df[df[group_column] == grupos[0]][column1].dropna() g2 = gl[df[group_column] == grupos[1]][column1].dropna() stat, p = stats.ttest_ind(g1, g2) return make_serializable({ “status”: “success”, “test”: “Test de muestras independientes”, “comparing”: column1, “group1”: {“name”: str(groups[0]), “mean”: round(float(g1.mean()), 3), “n”: int(len(g1))}, “group2”: {“name”: str(groups[1]), “mean”: round(float(g2.mean()), 3), “n”: int(len(g2))}, “t_statistic”: round(float(stat), 4), “p_value”: round(float(p), 6), “significant”: bool(p < 0,05), "interpretation": "Diferencia significativa" si p < 0,05 else "Sin diferencia significativa" }) elif test_type == "anova": si group_column es Ninguno: devuelve {"status": "error", "message": "group_column requerida para ANOVA"} groups_data = [grp[column1].dropna().valores para _, grp en df.groupby(columna_grupo)]nombres_grupo = lista(df[group_column].unique()) estadística, p = estadísticas.f_oneway(*groups_data) group_stats = [] para el nombre en group_names: grp_data = df[df[group_column] == nombre][column1].dropna() group_stats.append({ "group": str(name), "mean": round(float(grp_data.mean()), 3), "std": round(float(grp_data.std()), 3), "n": int(len(grp_data)) }) return make_serializable({ "status": "success", "test": "One-Way ANOVA", "comparing": column1, "across": group_column, "n_groups": int(len(group_names)), "group_statistics": group_stats, "f_statistic": round(float(stat), 4), "p_value": round(float(p), 6), "significant": bool(p < 0,05), "interpretation": "Diferencias significativas entre grupos" si p < 0,05 else "Sin diferencias significativas" }) elif test_type == "chi2": si la columna2 es Ninguna: devuelve {"status": "error", "message": "columna2 requerida para la prueba de chi-cuadrado"} contingencia = pd.crosstab(df[column1]df[column2]) chi2, p, dof, _ = stats.chi2_contingency(contingencia) return make_serializable({ "status": "éxito", "test": "Prueba de independencia de chi-cuadrado", "variables": [column1, column2]"chi2_statistic": round(float(chi2), 4), "p_value": round(float(p), 6), "grados_de_libertad": int(dof), "significant": bool(p < 0.05), "interpretation": "Las variables son dependientes" si p < 0.05 else "Las variables son independientes" }) else: return {"status": "error", "message": f"Unknown prueba: {test_type}. Uso: normalidad, ttest, anova, chi2"} excepto Excepción como e: return {"status": "error", "message": f"Prueba fallida: {str(e)}"} def outlier_detection(dataset_name: str, column: str, método: str = "iqr", tool_context: ToolContext = Ninguno) -> dict: print(f”📊 Detección de valores atípicos: {columna} en {dataset_name}”) df = DATA_STORE.get_dataset(dataset_name) si df es Ninguno: devuelve {“status”: “error”, “message”: f”Conjunto de datos ‘{dataset_name}’ no encontrado”} si la columna no está en df.columns: devuelve {“status”: “error”, “message”: f”Columna ‘{column}’ no encontrada”} data = df[column].dropna() if método == “iqr”: Q1 = float(data.quantile(0.25)) Q3 = float(data.quantile(0.75)) IQR = Q3 – Q1 inferior = Q1 – 1.5 * IQR superior = Q3 + 1.5 * valores atípicos de IQR = datos[(data < lower) | (data > upper)]

return make_serializable({ “status”: “éxito”, “método”: “IQR (rango intercuartil)”, “columna”: columna, “límites”: {“lower”: round(inferior, 3), “upper”: round(superior, 3)}, “iqr”: round(IQR, 3), “total_values”: int(len(data)), “outlier_count”: int(len(valores atípicos)), “outlier_pct”: round(float(len(valores atípicos) / len(datos) * 100), 2), “outlier_examples”: [round(float(x), 2) for x in outliers.head(10).tolist()]
}) método elif == “zscore”: z = np.abs(stats.zscore(data)) valores atípicos = datos[z > 3]

return make_serializable({ “status”: “éxito”, “método”: “Z-Score (umbral: 3)”, “columna”: columna, “total_values”: int(len(datos)), “outlier_count”: int(len(outliers)), “outlier_pct”: round(float(len(outliers) / len(data) * 100), 2), “outlier_examples”: [round(float(x), 2) for x in outliers.head(10).tolist()]
}) return {“status”: “error”, “message”: f”Método desconocido: {método}. Uso: iqr, zscore”} print(“✅ ¡Herramientas de análisis estadístico definidas!”)