AI Sparks

Crawlee for Python: Build a Web Crawler Pipeline That Handles Robots, Link Graphs, and RAG Chunk Deployment

def make_rag_chunks(rows, max_chars=700):
   chunks = []
   for row in rows:
       text = (
           row.get("text_preview")
           or row.get("rendered_text")
           or row.get("description")
           or ""
       )
       text = normalize_text(text)
       if not text:
           continue
       sentences = re.split(r"(?<=[.!?])s+", text)
       current = ""
       for sentence in sentences:
           if len(current) + len(sentence) + 1 <= max_chars:
               current = (current + " " + sentence).strip()
           else:
               if current:
                   chunks.append(
                       {
                           "chunk_id": hashlib.sha1(
                               (row.get("url", "") + current).encode()
                           ).hexdigest()[:12],
                           "url": row.get("url"),
                           "source": row.get("source"),
                           "page_type": row.get("page_type"),
                           "title": row.get("title") or row.get("name"),
                           "text": current,
                       }
                   )
               current = sentence
       if current:
           chunks.append(
               {
                   "chunk_id": hashlib.sha1(
                       (row.get("url", "") + current).encode()
                   ).hexdigest()[:12],
                   "url": row.get("url"),
                   "source": row.get("source"),
                   "page_type": row.get("page_type"),
                   "title": row.get("title") or row.get("name"),
                   "text": current,
               }
           )
   return chunks
def analyze_outputs(base_url, bs4_rows, parsel_rows, playwright_rows):
   all_rows = bs4_rows + parsel_rows + playwright_rows
   products = flatten_products(all_rows)
   crawl_df = pd.DataFrame(all_rows)
   product_df = pd.DataFrame(products)
   if not product_df.empty:
       product_df["price"] = pd.to_numeric(product_df["price"], errors="coerce")
       product_df["stock"] = pd.to_numeric(product_df["stock"], errors="coerce")
       product_df["rating"] = pd.to_numeric(product_df["rating"], errors="coerce")
       product_df["inventory_value"] = product_df["price"] * product_df["stock"]
   graph = build_link_graph(base_url, bs4_rows)
   graph_path = OUTPUT_DIR / "site_link_graph.graphml"
   if graph.number_of_nodes() > 0:
       nx.write_graphml(graph, graph_path)
   chunks = make_rag_chunks(all_rows)
   rag_path = OUTPUT_DIR / "rag_chunks.jsonl"
   with rag_path.open("w", encoding="utf-8") as f:
       for chunk in chunks:
           f.write(json.dumps(chunk, ensure_ascii=False) + "n")
   crawl_json_path = OUTPUT_DIR / "combined_crawl_results.json"
   crawl_json_path.write_text(
       json.dumps(all_rows, ensure_ascii=False, indent=2),
       encoding="utf-8",
   )
   product_csv_path = OUTPUT_DIR / "normalized_product_catalog.csv"
   if not product_df.empty:
       product_df.to_csv(product_csv_path, index=False)
   price_plot_path = OUTPUT_DIR / "product_price_chart.png"
   if not product_df.empty and product_df["price"].notna().any():
       plot_df = product_df.dropna(subset=["price"]).copy()
       plot_df["label"] = plot_df["sku"].fillna("unknown") + "n" + plot_df["source"].fillna("")
       ax = plot_df.plot(
           kind="bar",
           x="label",
           y="price",
           legend=False,
           figsize=(11, 5),
           title="Extracted Product Prices by Source",
       )
       ax.set_xlabel("Product / extraction source")
       ax.set_ylabel("Price")
       plt.xticks(rotation=35, ha="right")
       plt.tight_layout()
       plt.savefig(price_plot_path, dpi=160)
       plt.show()
   graph_stats = {
       "nodes": graph.number_of_nodes(),
       "edges": graph.number_of_edges(),
       "weakly_connected_components": (
           nx.number_weakly_connected_components(graph)
           if graph.number_of_nodes()
           else 0
       ),
   }
   if graph.number_of_nodes() > 0:
       in_degrees = dict(graph.in_degree())
       out_degrees = dict(graph.out_degree())
       graph_stats["top_in_degree"] = sorted(
           in_degrees.items(),
           key=lambda x: x[1],
           reverse=True,
       )[:5]
       graph_stats["top_out_degree"] = sorted(
           out_degrees.items(),
           key=lambda x: x[1],
           reverse=True,
       )[:5]
   summary = {
       "base_url": base_url,
       "rows_total": len(all_rows),
       "beautifulsoup_rows": len(bs4_rows),
       "parsel_rows": len(parsel_rows),
       "playwright_rows": len(playwright_rows),
       "products_total": len(product_df),
       "rag_chunks_total": len(chunks),
       "graph": graph_stats,
       "outputs": {
           "beautifulsoup_json": str(OUTPUT_DIR / "beautifulsoup_crawl.json"),
           "beautifulsoup_csv": str(OUTPUT_DIR / "beautifulsoup_crawl.csv"),
           "parsel_json": str(OUTPUT_DIR / "parsel_products.json"),
           "parsel_csv": str(OUTPUT_DIR / "parsel_products.csv"),
           "playwright_json": str(OUTPUT_DIR / "playwright_dynamic.json"),
           "playwright_csv": str(OUTPUT_DIR / "playwright_dynamic.csv"),
           "combined_json": str(crawl_json_path),
           "product_csv": str(product_csv_path) if product_csv_path.exists() else None,
           "rag_jsonl": str(rag_path),
           "graphml": str(graph_path) if graph_path.exists() else None,
           "price_plot": str(price_plot_path) if price_plot_path.exists() else None,
           "screenshots_dir": str(SCREENSHOT_DIR),
       },
   }
   summary_path = OUTPUT_DIR / "run_summary.md"
   summary_path.write_text(
       "# Crawlee Python Advanced Tutorial Run Summarynn"
       f"- Local demo site: `{base_url}`n"
       f"- Total extracted rows: `{summary['rows_total']}`n"
       f"- BeautifulSoup rows: `{summary['beautifulsoup_rows']}`n"
       f"- Parsel rows: `{summary['parsel_rows']}`n"
       f"- Playwright rows: `{summary['playwright_rows']}`n"
       f"- Normalized products: `{summary['products_total']}`n"
       f"- RAG chunks: `{summary['rag_chunks_total']}`n"
       f"- Link graph nodes: `{graph_stats['nodes']}`n"
       f"- Link graph edges: `{graph_stats['edges']}`nn"
       "## Output filesnn"
       + "n".join(f"- `{k}`: `{v}`" for k, v in summary["outputs"].items())
       + "n",
       encoding="utf-8",
   )
   print("n=== 4) Analysis summary ===")
   print(json.dumps(summary, indent=2, ensure_ascii=False))
   try:
       from IPython.display import display, Markdown, Image as IPImage
       display(Markdown("## Crawlee crawl preview"))
       if not crawl_df.empty:
           preview_cols = [
               col for col in ["source", "page_type", "title", "url"]
               if col in crawl_df.columns
           ]
           display(crawl_df[preview_cols].head(12))
       display(Markdown("## Normalized product catalog"))
       if not product_df.empty:
           display(product_df.head(20))
       if price_plot_path.exists():
           display(Markdown("## Product price chart"))
           display(IPImage(filename=str(price_plot_path)))
       screenshot_path = SCREENSHOT_DIR / "dynamic_catalog_full_page.png"
       if screenshot_path.exists():
           display(Markdown("## Playwright screenshot of JavaScript-rendered page"))
           display(IPImage(filename=str(screenshot_path)))
       display(Markdown(f"## Output directoryn`{OUTPUT_DIR}`"))
   except Exception as exc:
       print("Notebook display skipped:", repr(exc))
   return summary
async def main():
   httpd, base_url = start_local_server(SITE_DIR)
   print(f"nLocal demo website is running at: {base_url}/index.html")
   try:
       bs4_rows = await run_beautifulsoup_crawl(base_url)
       parsel_rows = await run_parsel_precision_crawl(base_url)
       playwright_rows = await run_playwright_dynamic_crawl(base_url)
       summary = analyze_outputs(base_url, bs4_rows, parsel_rows, playwright_rows)
       return summary
   finally:
       httpd.shutdown()
       print("nLocal demo server shut down.")
loop = asyncio.get_event_loop()
summary = loop.run_until_complete(main())
print("nTutorial complete.")
print(f"All outputs are in: {OUTPUT_DIR}")
print("Key files:")
for file_path in sorted(OUTPUT_DIR.rglob("*")):
   if file_path.is_file():
       print(" -", file_path)

Related Articles

Leave a Reply

Your email address will not be published. Required fields are marked *

Back to top button