
To break the described workflow into Prefect concepts: flows manage the orchestration of scraping and ingestion, while tasks encapsulate each atomic operation (fetching the sitemap, checking the database, scraping, inserting records). Prefect blocks securely handle configuration and secrets for external services and database storage. Asynchronous monitoring can be achieved with subflows or concurrent tasks combined with state handlers or notification integrations.


Workflow Breakdown into Prefect Concepts

Flows

  • Main Flow: Orchestrates the entire end-to-end job, handling each site sequentially or in parallel.
  • Scrape Subflow (optional): Manages the crawling/scraping of a single site, which can be spawned concurrently.

Tasks

Each workflow step becomes an individual Prefect task (a sketch of the first two tasks follows the list):

  1. Crawl Sitemap Task:
    • Calls Firecrawl's /map API to retrieve all page URLs for a given site.
  2. Check Existing Data Task:
    • Checks your destination storage (database, vectorstore, etc.) for each URL's existence and retrieves the last scraped date.
  3. Metadata Extraction Task:
    • For eligible sites (not scraped, or not updated within the last 30 days), iterates through the sitemap, extracting structured metadata and bookkeeping attributes.
  4. Queue for Scraping Task:
    • Sends pages or batches to Firecrawl's scrape or batch-scrape API for processing.
  5. Insert to Storage Task:
    • Inserts the returned scraped content and metadata into the designated storage backend.
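
As referenced above, a minimal sketch of the first two tasks; the /map request shape follows Firecrawl's v1 HTTP API, and the pages table and its schema are illustrative assumptions to adapt to your storage:

import requests
from prefect import task

@task(retries=2, retry_delay_seconds=30)
def crawl_sitemap(site_url: str, api_key: str) -> list[str]:
    # Ask Firecrawl's /map endpoint for every URL it can discover.
    resp = requests.post(
        "https://api.firecrawl.dev/v1/map",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"url": site_url},
        timeout=60,
    )
    resp.raise_for_status()
    return resp.json().get("links", [])

@task
def check_url_in_db(url: str, conn) -> bool:
    # True if the URL was scraped within the last 30 days (SQLite syntax;
    # adapt the query and schema to your storage backend).
    row = conn.execute(
        "SELECT 1 FROM pages "
        "WHERE url = ? AND last_scraped > datetime('now', '-30 days')",
        (url,),
    ).fetchone()
    return row is not None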

Blocks

  • API Credentials Block:
    • Securely stores Firecrawl API keys and connection params.
  • Database Connection Block:
    • Stores credentials/URI/config for your storage destination.
  • Notification/State Blocks (optional):
    • Slack/email notifications or custom state reporting.
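
At runtime these blocks are loaded by name; a minimal sketch, assuming block documents named firecrawl-api-key, rag-db-uri, and scrape-alerts were saved beforehand via the Prefect UI or .save():

from prefect.blocks.system import Secret
from prefect.blocks.notifications import SlackWebhook

# Created once, e.g. Secret(value="fc-...").save("firecrawl-api-key"),
# then loaded anywhere a flow or task needs it.
firecrawl_key = Secret.load("firecrawl-api-key").get()
db_uri = Secret.load("rag-db-uri").get()

# Notification blocks can push status updates from anywhere in a flow.
SlackWebhook.load("scrape-alerts").notify("Ingestion run finished")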

Asynchronous Monitoring

  • Run page scraping and ingestion as concurrent Prefect tasks (using task_runner=ConcurrentTaskRunner() or subflows).
  • Register custom state-change hooks on tasks or use Prefect's built-in notification integrations to asynchronously update job/page status (see the sketch after this list).
  • Flow runs and task runs expose state logs and real-time UI feedback for user-facing transparency.
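
Prefect 2.x exposes these handlers as state-change hooks (on_failure, on_completion, etc.); a minimal sketch, reusing the hypothetical scrape-alerts block:

from prefect import task
from prefect.blocks.notifications import SlackWebhook

def alert_on_failure(tsk, task_run, state):
    # Hook signature is (task, task_run, state); fires once the task
    # enters a Failed state.
    SlackWebhook.load("scrape-alerts").notify(
        f"{task_run.name} failed: {state.message}"
    )

@task(on_failure=[alert_on_failure])
def queue_scrape(payload: dict) -> dict: ...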

Example High-Level Prefect Blueprint

from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner

@task
def crawl_sitemap(site_url: str) -> list[str]: ...
@task
def check_url_in_db(url: str) -> bool: ...
@task
def extract_metadata(url: str) -> dict: ...
@task
def queue_scrape(payload: dict) -> dict: ...
@task
def insert_scraped_data(data: dict) -> None: ...

@flow(task_runner=ConcurrentTaskRunner())
def process_site(site_url: str):
    logger = get_run_logger()
    sitemap = crawl_sitemap(site_url)
    for url in sitemap:
        if not check_url_in_db(url):
            meta = extract_metadata(url)
            scraped = queue_scrape(meta)
            insert_scraped_data(scraped)
        # monitoring could happen here per-page
    logger.info("Finished processing %s", site_url)

@flow
def main_flow(sites: list[str]):
    for site in sites:
        # Flows have no .submit(); called directly, subflows run
        # sequentially. See the async fan-out sketch below for parallelism.
        process_site(site)
    # could monitor process_site subflow statuses for UI

Key Points

  • Each atomic action becomes a task.
  • Use concurrent task runners or async subflows for parallelism and monitoring.
  • Use blocks for credentials and external config.
  • Store scrape statuses in the destination storage or use Prefect notifications for detailed monitoring.

Reference

These patterns are standard in modern Prefect 2.x orchestration for web scraping and ETL.


| Step | Prefect Concept | Comments |
| --- | --- | --- |
| Map site with Firecrawl | Task | API call; parallelize by site |
| Check/update status in storage | Task | DB lookup; can batch or async |
| Metadata extraction | Task | Loop per-page or batch |
| Trigger Firecrawl scrape | Task | Async tasks/subflows for parallel scraping |
| Insert pages/metadata to storage | Task | Bulk insert recommended |
| Monitor ingestion & scrape status | State, notification | Prefect UI + optional state handlers/alerts |
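
For the bulk-insert recommendation in the table, a minimal sketch using psycopg2's execute_values; the pages table, its columns, and the unique constraint on url are illustrative assumptions:

import psycopg2
from psycopg2.extras import execute_values

def bulk_insert_pages(conn_uri: str, rows: list[tuple]) -> None:
    # rows are (url, content, metadata) tuples; one round trip inserts
    # the whole batch instead of a statement per page.
    with psycopg2.connect(conn_uri) as conn, conn.cursor() as cur:
        execute_values(
            cur,
            "INSERT INTO pages (url, content, metadata) VALUES %s "
            "ON CONFLICT (url) DO UPDATE SET content = EXCLUDED.content",
            rows,
        )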

Implementation Status (September 17, 2025)

| Workflow Stage | Prefect Artifact | Location |
| --- | --- | --- |
| Map site with Firecrawl | map_firecrawl_site_task | ingest_pipeline/flows/ingestion.py |
| Filter existing R2R entries | filter_existing_documents_task | ingest_pipeline/flows/ingestion.py |
| Queue Firecrawl scrape batches | scrape_firecrawl_batch_task | ingest_pipeline/flows/ingestion.py |
| Annotate metadata | annotate_firecrawl_metadata_task | ingest_pipeline/flows/ingestion.py |
| Upsert annotated documents | upsert_r2r_documents_task | ingest_pipeline/flows/ingestion.py |
| Specialized orchestration | firecrawl_to_r2r_flow subflow | ingest_pipeline/flows/ingestion.py |

Supporting implementations:

  • Deterministic Firecrawl document IDs via FirecrawlIngestor.compute_document_id (see the sketch below)
  • Metadata enrichment with MetadataTagger while preserving required fields
  • R2R upsert logic in ingest_pipeline/storage/r2r/storage.py with description-aware metadata payloads
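
A deterministic document ID is typically a uuid5 over a stable key such as the source URL; a hypothetical sketch of the idea (the actual logic lives in FirecrawlIngestor.compute_document_id and may differ):

import uuid

# Hypothetical namespace; the real one used by
# FirecrawlIngestor.compute_document_id may differ.
FIRECRAWL_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "firecrawl-ingest")

def compute_document_id(source_url: str) -> str:
    # uuid5 is deterministic: the same URL always maps to the same ID,
    # making repeated scrapes idempotent upserts rather than duplicates.
    return str(uuid.uuid5(FIRECRAWL_NAMESPACE, source_url))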

This general layout is robust for scalable, observable, and manageable workflows; the blueprint above can be tailored to your stack and to how you want to batch and parallelize jobs.