Flujo de trabajo de implementación de PyGraphistry para canalizaciones de inteligencia de gráficos interactivos en análisis de seguridad e investigación de riesgos
base_g = ( grafistry .bind(source=”src”, destino=”dst”, nodo=”id”) .edges(edges_df) .nodes(nodes_df) .bind( edge=”edge_id”, edge_title=”edge_title”, edge_label=”edge_label”, edge_weight=”event_count”, edge_size=”edge_size”, point_title=”point_title”, point_label=”etiqueta”, point_color=”node_color”, point_size=”node_size”, point_x=”x”, point_y=”y” ) .settings(url_params={“play”: 0, “info”: “true”}) ) print(“\nConstruí un trazador PyGraphistry llamado base_g.”) print(“Une bordes src/dst, atributos de nodo, títulos, etiquetas, tamaños, colores y datos externos diseño x/y.”) intente: dot_text = base_g.plot_static(engine=”graphviz-dot”, reuse_layout=True) dot_path = OUT_DIR / “graph_static.dot” con open(dot_path, “w”) as f: f.write(dot_text if isinstance(dot_text, str) else str(dot_text)) print(“Representación DOT guardada:”, dot_path) excepto excepción como e: print(“Exportación DOT estática omitida:”, repr(e)) def show_pyvis(nodos, bordes, ruta_salida, altura=”780px”): nodes_small = nodes.copy() bordes_pequeño = bordes.copia() max_nodes = 320 if len(nodes_small) > max_nodes: keep = set( nodes_small .sort_values([“is_anomaly”, “anomaly_score”, “max_risk”, “pagerank”]ascendente =[False, False, False, False]) .head(max_nodos)[“id”]
) nodos_pequeño = nodos_pequeño[nodes_small[“id”].isin(mantener)]bordes_pequeño = bordes_pequeño[edges_small[“src”].isin(mantener) y bordes_pequeños[“dst”].isin(keep)]net = Network( height=height, width=”100%”, dirigido=True, notebook=True, cdn_resources=”in_line” ) net.barnes_hut(gravity=-25000, central_gravity=0.2, spring_length=160, spring_strength=0.04, humedecimiento=0.92) para la fila en nodes_small.itertuples(index=False): título = str(row.point_title).replace(“
“, “\n”).replace(“”, “”).replace(“”, “”) net.add_node( fila.id, etiqueta=str(row.label), título=título, grupo=str(fila.entity_type), valor=float(row.node_size) ) para la fila en bordes_small.itertuples(index=False): título = str(row.edge_title).replace(“
“, “\n”).replace(“”, “”).replace(“”, “”) net.add_edge( fila.src, fila.dst, título=título, etiqueta=str(fila.relación) if fila.max_risk >= 0.90 else “”, valor=float(max(1.0, fila.edge_size)) ) net.write_html(str(output_path), notebook=False) display(HTML(filename=str(output_path))) print(“HTML interactivo local guardado:”, output_path) local_full_html = OUT_DIR / “local_full_graph.html” show_pyvis(nodes_df, edge_df, local_full_html) seed_node = ( nodes_df .sort_values([“is_anomaly”, “anomaly_score”, “max_risk”, “pagerank”]ascendente =[False, False, False, False]) .iloc[0][“id”]

) ego = nx.ego_graph(G.to_undirected(), seed_node, radio=2) ego_nodes = set(ego.nodes()) ego_edges_df = bordes_df[edges_df[“src”].isin(ego_nodos) y bordes_df[“dst”].isin(ego_nodes)].copy() ego_nodes_df = nodos_df[nodes_df[“id”].isin(ego_nodes)].copy() print(“\nNodo semilla de investigación enfocada:”, nodo_semilla) print(f”Nodos del subgrafo del ego: {len(ego_nodes_df):,}”) print(f”Bordes del subgrafo del ego: {len(ego_edges_df):,}”) display( ego_nodes_df .sort_values([“is_anomaly”, “anomaly_score”, “max_risk”]ascendente =[False, False, False])
[[“id”, “entity_type”, “risk_band”, “is_anomaly”, “anomaly_score”, “max_risk”, “degree_w”, “pagerank”, “community”]].head(30) ) ego_g = ( grafistry .bind(source=”src”, destino=”dst”, node=”id”) .edges(ego_edges_df) .nodes(ego_nodes_df) .bind( edge=”edge_id”, edge_title=”edge_title”, edge_label=”edge_label”, edge_weight=”event_count”, edge_size=”edge_size”, point_title=”point_title”, point_label=”label”, point_color=”node_color”, point_size=”node_size”, point_x=”x”, point_y=”y” ) .settings(url_params={“play”: 0, “info”: “true”}) ) local_ego_html = OUT_DIR / “local_ego_investigation_graph.html” show_pyvis(ego_nodes_df, ego_edges_df, local_ego_html) arriesgado_edges_df = bordes_df[
(edges_df[“max_risk”] >= 0,85) | (bordes_df[“failed_count”] >= bordes_df[“failed_count”].cuantil(0,95)) | (bordes_df[“impossible_travel_count”] > 0) ].copy() Risky_node_ids = set(risky_edges_df[“src”]).union(set(arriesgado_edges_df[“dst”])) arriesgado_nodes_df = nodos_df[nodes_df[“id”].isin(risky_node_ids)].copy() Risky_g = ( grafismo .bind(source=”src”, destino=”dst”, node=”id”) .edges(risky_edges_df) .nodes(risky_nodes_df) .bind( edge=”edge_id”, edge_title=”edge_title”, edge_label=”edge_label”, edge_weight=”event_count”, edge_size=”edge_size”, point_title=”point_title”, point_label=”label”, point_color=”node_color”, point_size=”node_size”, point_x=”x”, point_y=”y” ) .settings(url_params={“play”: 0, “info”: “true”}) ) print(“\nGráfico filtrado de alto riesgo:”) print(f”Nodos de riesgo: {len(risky_nodes_df):,}”) print(f”Bordes riesgosos: {len(risky_edges_df):,}”) local_risky_html = OUT_DIR / “local_high_risk_graph.html” show_pyvis(risky_nodes_df, Risky_edges_df, local_risky_html)