def make_rag_chunks(filas, max_chars=700): trozos = []
para fila en filas: texto = ( fila.get(“text_preview”) o fila.get(“rendered_text”) o fila.get(“descripción”) o “” ) texto = normalize_text(texto) si no texto: continuar oraciones = re.split(r”(?<=[.!?])\s+", text) current = "" para oración en oraciones: if len(current) + len(sentence) + 1 <= max_chars: current = (current + " " + oración).strip() else: if current: chunks.append( { "chunk_id": hashlib.sha1( (row.get("url", "") + current).encode() ).hexdigest()[:12]"url": fila.get("url"), "fuente": fila.get("fuente"), "tipo_página": fila.get("tipo_página"), "título": fila.get("título") o fila.get("nombre"), "texto": actual, } ) actual = oración si es actual: trozos.append( { "chunk_id": hashlib.sha1( (row.get("url", "") + actual).encode() ).hexdigest()[:12]"url": fila.get("url"), "fuente": fila.get("fuente"), "page_type": fila.get("página_tipo"), "title": fila.get("título") o fila.get("nombre"), "texto": actual, } ) devuelve fragmentos def analyse_outputs(base_url, bs4_rows, parsel_rows, playwright_rows): all_rows = bs4_rows + parsel_rows + dramaturgo_rows productos = flatten_products(all_rows) crawl_df = pd.DataFrame(all_rows) product_df = pd.DataFrame(products) si no product_df.empty: product_df["price"] = pd.to_numeric(producto_df["price"]errores="coercer") product_df["stock"] = pd.to_numeric(producto_df["stock"]errores="coercer") product_df["rating"] = pd.to_numeric(producto_df["rating"]errores="coercer") product_df["inventory_value"] = producto_df["price"] * producto_df["stock"] gráfico = build_link_graph(base_url, bs4_rows) graph_path = OUTPUT_DIR / "site_link_graph.graphml" si graph.number_of_nodes() > 0: nx.write_graphml(graph, graph_path) fragmentos = make_rag_chunks(all_rows) rag_path = OUTPUT_DIR / “rag_chunks.jsonl” con rag_path.open(“w”, encoding=”utf-8″) as f: para fragmentos en fragmentos: f.write(json.dumps(chunk, asegurar_ascii=False) + “\n”) crawl_json_path = OUTPUT_DIR / “combined_crawl_results.json” crawl_json_path.write_text( json.dumps(all_rows, asegurar_ascii=False, sangría=2), codificación=”utf-8″, ) product_csv_path = OUTPUT_DIR / “normalized_product_catalog.csv” si no es product_df.empty: product_df.to_csv(product_csv_path, index=False) price_plot_path = OUTPUT_DIR / “product_price_chart.png” si no es product_df.empty y producto_df[“price”].notna().any(): plot_df = product_df.dropna(subconjunto=[“price”]).copiar() trazado_df[“label”] = trama_df[“sku”].fillna(“desconocido”) + “\n” + plot_df[“source”].fillna(“”) ax = plot_df.plot( kind=”bar”, x=”label”, y=”price”, legend=False, figsize=(11, 5), title=”Precios de productos extraídos por fuente”, ) ax.set_xlabel(“Producto/fuente de extracción”) ax.set_ylabel(“Precio”) plt.xticks(rotation=35, ha=”right”) plt.tight_layout() plt.savefig(price_plot_path, dpi=160) plt.show() graph_stats = { “nodos”: graph.number_of_nodes(), “edges”: graph.number_of_edges(), “weakly_connected_components”: ( nx.number_weakly_connected_components(graph) if graph.number_of_nodes() else 0 ), } if gráfico.número_de_nodos() > 0: grados_en = dict(graf.grados_en()) grados_fuera = dict(grados_fuera()) estadísticas_grafo[“top_in_degree”] = ordenado (en_grados.items(), clave = lambda x: x[1]reverso = Verdadero, )[:5]
gráfica_estadísticas[“top_out_degree”] = ordenado (out_grados.items(), clave = lambda x: x[1]reverso = Verdadero, )[:5]
resumen = { “base_url”: base_url, “rows_total”: len(all_rows), “beautifulsoup_rows”: len(bs4_rows), “parsel_rows”: len(parsel_rows), “playwright_rows”: len(playwright_rows), “products_total”: len(product_df), “rag_chunks_total”: len(chunks), “graph”: graph_stats, “outputs”: { “beautifulsoup_json”: str(OUTPUT_DIR / “beautifulsoup_crawl.json”), “beautifulsoup_csv”: str(OUTPUT_DIR / “beautifulsoup_crawl.csv”), “parsel_json”: str(OUTPUT_DIR / “parsel_products.json”), “parsel_csv”: str(OUTPUT_DIR / “parsel_products.csv”), “playwright_json”: str(OUTPUT_DIR / “playwright_dynamic.json”), “playwright_csv”: str(OUTPUT_DIR / “playwright_dynamic.csv”), “combined_json”: str(crawl_json_path), “product_csv”: str(product_csv_path) if product_csv_path.exists() else Ninguno, “rag_jsonl”: str(rag_path), “graphml”: str(graph_path) if graph_path.exists() else Ninguno, “price_plot”: str(price_plot_path) if price_plot_path.exists() else Ninguno, “screenshots_dir”: str(SCREENSHOT_DIR), }, } resumen_path = OUTPUT_DIR / “run_summary.md” resumen_path.write_text( “# Resumen de ejecución del tutorial avanzado de Crawlee Python\n\n” f”- Sitio de demostración local: `{base_url}`\n” f”- Total de filas extraídas: `{summary[‘rows_total’]}`\n” f”- Filas de BeautifulSoup: `{resumen[‘beautifulsoup_rows’]}`\n” f”- Filas de Parsel: `{resumen[‘parsel_rows’]}`\n” f”- Filas del dramaturgo: `{resumen[‘playwright_rows’]}`\n” f”- Productos normalizados: `{resumen[‘products_total’]}`\n” f”- fragmentos de RAG: `{resumen[‘rag_chunks_total’]}`\n” f”- Vincular nodos del gráfico: `{graph_stats[‘nodes’]}`\n” f”- Vincular bordes del gráfico: `{graph_stats[‘edges’]}`\n\n” “## Archivos de salida\n\n” + “\n”.join(f”- `{k}`: `{v}`” para k, v en resumen[“outputs”].items()) + “\n”, encoding=”utf-8″, ) print(“\n=== 4) Resumen de análisis ===”) print(json.dumps(summary, indent=2, sure_ascii=False)) intente: desde IPython.display importar visualización, Markdown, imagen como IPImage display(Markdown(“## Crawlee Crawl Preview”)) si no es Crawl_df.empty: vista previa_cols = [
col for col in [“source”, “page_type”, “title”, “url”]
si col en crawl_df.columns]mostrar(crawl_df[preview_cols].head(12)) display(Markdown(“## Catálogo de productos normalizado”)) si no product_df.empty: display(product_df.head(20)) if price_plot_path.exists(): display(Markdown(“## Gráfico de precios del producto”)) display(IPImage(filename=str(price_plot_path))) capture_path = SCREENSHOT_DIR / “dynamic_catalog_full_page.png” if capture_path.exists(): display(Markdown(“## Captura de pantalla de Playwright de la página renderizada con JavaScript”)) display(IPImage(filename=str(screenshot_path))) display(Markdown(f”## Directorio de salida\n`{OUTPUT_DIR}`”)) excepto excepción como exc: print(“Pantalla del portátil omitida:”, repr(exc)) return resumen async def main(): httpd, base_url = start_local_server(SITE_DIR) print(f”\nEl sitio web de demostración local se está ejecutando en: {base_url}/index.html”) try: bs4_rows = esperar run_beautifulsoup_crawl(base_url) parsel_rows = esperar run_parsel_precision_crawl(base_url) playwright_rows = esperar run_playwright_dynamic_crawl(base_url) resumen = analice_outputs(base_url, bs4_rows, parsel_rows, playwright_rows) resumen de retorno finalmente: httpd.shutdown() print(“\nServidor de demostración local cerrado.”) loop = asyncio.get_event_loop() resumen = loop.run_until_complete(main()) print(“\nTutorial completo.”) print(f”Todo las salidas están en: {OUTPUT_DIR}”) print(“Archivos clave:”) para file_path en ordenado(OUTPUT_DIR.rglob(“*”)): if file_path.is_file(): print(” -“, file_path)
para fila en filas: texto = ( fila.get(“text_preview”) o fila.get(“rendered_text”) o fila.get(“descripción”) o “” ) texto = normalize_text(texto) si no texto: continuar oraciones = re.split(r”(?<=[.!?])\s+", text) current = "" para oración en oraciones: if len(current) + len(sentence) + 1 <= max_chars: current = (current + " " + oración).strip() else: if current: chunks.append( { "chunk_id": hashlib.sha1( (row.get("url", "") + current).encode() ).hexdigest()[:12]"url": fila.get("url"), "fuente": fila.get("fuente"), "tipo_página": fila.get("tipo_página"), "título": fila.get("título") o fila.get("nombre"), "texto": actual, } ) actual = oración si es actual: trozos.append( { "chunk_id": hashlib.sha1( (row.get("url", "") + actual).encode() ).hexdigest()[:12]"url": fila.get("url"), "fuente": fila.get("fuente"), "page_type": fila.get("página_tipo"), "title": fila.get("título") o fila.get("nombre"), "texto": actual, } ) devuelve fragmentos def analyse_outputs(base_url, bs4_rows, parsel_rows, playwright_rows): all_rows = bs4_rows + parsel_rows + dramaturgo_rows productos = flatten_products(all_rows) crawl_df = pd.DataFrame(all_rows) product_df = pd.DataFrame(products) si no product_df.empty: product_df["price"] = pd.to_numeric(producto_df["price"]errores="coercer") product_df["stock"] = pd.to_numeric(producto_df["stock"]errores="coercer") product_df["rating"] = pd.to_numeric(producto_df["rating"]errores="coercer") product_df["inventory_value"] = producto_df["price"] * producto_df["stock"] gráfico = build_link_graph(base_url, bs4_rows) graph_path = OUTPUT_DIR / "site_link_graph.graphml" si graph.number_of_nodes() > 0: nx.write_graphml(graph, graph_path) fragmentos = make_rag_chunks(all_rows) rag_path = OUTPUT_DIR / “rag_chunks.jsonl” con rag_path.open(“w”, encoding=”utf-8″) as f: para fragmentos en fragmentos: f.write(json.dumps(chunk, asegurar_ascii=False) + “\n”) crawl_json_path = OUTPUT_DIR / “combined_crawl_results.json” crawl_json_path.write_text( json.dumps(all_rows, asegurar_ascii=False, sangría=2), codificación=”utf-8″, ) product_csv_path = OUTPUT_DIR / “normalized_product_catalog.csv” si no es product_df.empty: product_df.to_csv(product_csv_path, index=False) price_plot_path = OUTPUT_DIR / “product_price_chart.png” si no es product_df.empty y producto_df[“price”].notna().any(): plot_df = product_df.dropna(subconjunto=[“price”]).copiar() trazado_df[“label”] = trama_df[“sku”].fillna(“desconocido”) + “\n” + plot_df[“source”].fillna(“”) ax = plot_df.plot( kind=”bar”, x=”label”, y=”price”, legend=False, figsize=(11, 5), title=”Precios de productos extraídos por fuente”, ) ax.set_xlabel(“Producto/fuente de extracción”) ax.set_ylabel(“Precio”) plt.xticks(rotation=35, ha=”right”) plt.tight_layout() plt.savefig(price_plot_path, dpi=160) plt.show() graph_stats = { “nodos”: graph.number_of_nodes(), “edges”: graph.number_of_edges(), “weakly_connected_components”: ( nx.number_weakly_connected_components(graph) if graph.number_of_nodes() else 0 ), } if gráfico.número_de_nodos() > 0: grados_en = dict(graf.grados_en()) grados_fuera = dict(grados_fuera()) estadísticas_grafo[“top_in_degree”] = ordenado (en_grados.items(), clave = lambda x: x[1]reverso = Verdadero, )[:5]
gráfica_estadísticas[“top_out_degree”] = ordenado (out_grados.items(), clave = lambda x: x[1]reverso = Verdadero, )[:5]
resumen = { “base_url”: base_url, “rows_total”: len(all_rows), “beautifulsoup_rows”: len(bs4_rows), “parsel_rows”: len(parsel_rows), “playwright_rows”: len(playwright_rows), “products_total”: len(product_df), “rag_chunks_total”: len(chunks), “graph”: graph_stats, “outputs”: { “beautifulsoup_json”: str(OUTPUT_DIR / “beautifulsoup_crawl.json”), “beautifulsoup_csv”: str(OUTPUT_DIR / “beautifulsoup_crawl.csv”), “parsel_json”: str(OUTPUT_DIR / “parsel_products.json”), “parsel_csv”: str(OUTPUT_DIR / “parsel_products.csv”), “playwright_json”: str(OUTPUT_DIR / “playwright_dynamic.json”), “playwright_csv”: str(OUTPUT_DIR / “playwright_dynamic.csv”), “combined_json”: str(crawl_json_path), “product_csv”: str(product_csv_path) if product_csv_path.exists() else Ninguno, “rag_jsonl”: str(rag_path), “graphml”: str(graph_path) if graph_path.exists() else Ninguno, “price_plot”: str(price_plot_path) if price_plot_path.exists() else Ninguno, “screenshots_dir”: str(SCREENSHOT_DIR), }, } resumen_path = OUTPUT_DIR / “run_summary.md” resumen_path.write_text( “# Resumen de ejecución del tutorial avanzado de Crawlee Python\n\n” f”- Sitio de demostración local: `{base_url}`\n” f”- Total de filas extraídas: `{summary[‘rows_total’]}`\n” f”- Filas de BeautifulSoup: `{resumen[‘beautifulsoup_rows’]}`\n” f”- Filas de Parsel: `{resumen[‘parsel_rows’]}`\n” f”- Filas del dramaturgo: `{resumen[‘playwright_rows’]}`\n” f”- Productos normalizados: `{resumen[‘products_total’]}`\n” f”- fragmentos de RAG: `{resumen[‘rag_chunks_total’]}`\n” f”- Vincular nodos del gráfico: `{graph_stats[‘nodes’]}`\n” f”- Vincular bordes del gráfico: `{graph_stats[‘edges’]}`\n\n” “## Archivos de salida\n\n” + “\n”.join(f”- `{k}`: `{v}`” para k, v en resumen[“outputs”].items()) + “\n”, encoding=”utf-8″, ) print(“\n=== 4) Resumen de análisis ===”) print(json.dumps(summary, indent=2, sure_ascii=False)) intente: desde IPython.display importar visualización, Markdown, imagen como IPImage display(Markdown(“## Crawlee Crawl Preview”)) si no es Crawl_df.empty: vista previa_cols = [
col for col in [“source”, “page_type”, “title”, “url”]
si col en crawl_df.columns]mostrar(crawl_df[preview_cols].head(12)) display(Markdown(“## Catálogo de productos normalizado”)) si no product_df.empty: display(product_df.head(20)) if price_plot_path.exists(): display(Markdown(“## Gráfico de precios del producto”)) display(IPImage(filename=str(price_plot_path))) capture_path = SCREENSHOT_DIR / “dynamic_catalog_full_page.png” if capture_path.exists(): display(Markdown(“## Captura de pantalla de Playwright de la página renderizada con JavaScript”)) display(IPImage(filename=str(screenshot_path))) display(Markdown(f”## Directorio de salida\n`{OUTPUT_DIR}`”)) excepto excepción como exc: print(“Pantalla del portátil omitida:”, repr(exc)) return resumen async def main(): httpd, base_url = start_local_server(SITE_DIR) print(f”\nEl sitio web de demostración local se está ejecutando en: {base_url}/index.html”) try: bs4_rows = esperar run_beautifulsoup_crawl(base_url) parsel_rows = esperar run_parsel_precision_crawl(base_url) playwright_rows = esperar run_playwright_dynamic_crawl(base_url) resumen = analice_outputs(base_url, bs4_rows, parsel_rows, playwright_rows) resumen de retorno finalmente: httpd.shutdown() print(“\nServidor de demostración local cerrado.”) loop = asyncio.get_event_loop() resumen = loop.run_until_complete(main()) print(“\nTutorial completo.”) print(f”Todo las salidas están en: {OUTPUT_DIR}”) print(“Archivos clave:”) para file_path en ordenado(OUTPUT_DIR.rglob(“*”)): if file_path.is_file(): print(” -“, file_path)