rag-manager/ingest_pipeline/ingestors/firecrawl.py

"""Firecrawl ingestor for web and documentation sites."""
import asyncio
from collections.abc import AsyncGenerator
from datetime import UTC, datetime
from typing import Any
from uuid import uuid4
from firecrawl import AsyncFirecrawl
from typing_extensions import override
from ..config import get_settings
from ..core.models import (
Document,
DocumentMetadata,
FirecrawlConfig,
IngestionJob,
IngestionSource,
)
from .base import BaseIngestor


class FirecrawlIngestor(BaseIngestor):
    """Ingestor for web and documentation sites using Firecrawl."""

    config: FirecrawlConfig
    client: Any  # AsyncFirecrawl client instance

    def __init__(self, config: FirecrawlConfig | None = None):
        """
        Initialize Firecrawl ingestor.

        Args:
            config: Firecrawl configuration (for operational params only)
        """
        self.config = config or FirecrawlConfig()
        settings = get_settings()
        # All connection details come from settings/.env.
        # For self-hosted instances, use a dummy API key if none is provided;
        # the SDK requires an API key even for self-hosted instances.
        api_key = settings.firecrawl_api_key or "no-key-required"
        # AsyncFirecrawl automatically uses v2 endpoints.
        self.client = AsyncFirecrawl(api_key=api_key, api_url=str(settings.firecrawl_endpoint))

    @override
    async def ingest(self, job: IngestionJob) -> AsyncGenerator[Document, None]:
        """
        Ingest documents from a web source.

        Args:
            job: The ingestion job configuration

        Yields:
            Documents from the web source
        """
        url = str(job.source_url)

        # First, map the site to understand its structure.
        site_map = await self._map_site(url)

        # If the map comes back empty, just use the main URL.
        if not site_map:
            site_map = [url]

        # Process pages in batches.
        batch_size = 10
        for i in range(0, len(site_map), batch_size):
            batch_urls = site_map[i : i + batch_size]
            documents = await self._scrape_batch(batch_urls)
            for doc_data in documents:
                yield self._create_document(doc_data, job)

    @override
    async def validate_source(self, source_url: str) -> bool:
        """
        Validate that the web source is accessible.

        Args:
            source_url: URL to validate

        Returns:
            True if the source is accessible
        """
        try:
            # Use SDK v2 endpoints for both self-hosted and cloud.
            result = await self.client.scrape(source_url, formats=["markdown"])
            return result is not None and hasattr(result, "markdown")
        except Exception:
            return False

    @override
    async def estimate_size(self, source_url: str) -> int:
        """
        Estimate the number of pages in the website.

        Args:
            source_url: URL of the website

        Returns:
            Estimated number of pages
        """
        try:
            site_map = await self._map_site(source_url)
            return len(site_map) if site_map else 0
        except Exception:
            return 0

    async def _map_site(self, url: str) -> list[str]:
        """
        Map a website to get all of its URLs.

        Args:
            url: Base URL to map

        Returns:
            List of URLs found
        """
        try:
            # Use the SDK v2 map endpoint.
            result = await self.client.map(url=url, limit=self.config.limit)
            if result and hasattr(result, "links"):
                # Extract URLs from the result.
                return [
                    link if isinstance(link, str) else getattr(link, "url", str(link))
                    for link in result.links
                ]
            return []
        except Exception as e:
            # If map fails (it may not be available in all versions), fall back to the single URL.
            logger.warning("Map endpoint not available or failed: %s. Using single URL.", e)
            return [url]

    async def _scrape_batch(self, urls: list[str]) -> list[dict[str, str]]:
        """
        Scrape a batch of URLs concurrently.

        Args:
            urls: List of URLs to scrape

        Returns:
            List of scraped documents
        """
        tasks = [self._scrape_single(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        documents = []
        for result in results:
            if isinstance(result, Exception):
                continue
            if result and isinstance(result, dict) and "markdown" in result:
                documents.append(result)
        return documents

    async def _scrape_single(self, url: str) -> dict[str, str]:
        """
        Scrape a single URL.

        Args:
            url: URL to scrape

        Returns:
            Scraped document data, or an empty dict on failure
        """
        try:
            # Use the SDK v2 scrape endpoint.
            result = await self.client.scrape(url, formats=self.config.formats)
            # Extract data from the result.
            if result:
                # The SDK returns a ScrapeResult object with markdown and metadata.
                metadata = getattr(result, "metadata", {})
                return {
                    "markdown": getattr(result, "markdown", ""),
                    "sourceURL": url,
                    "title": metadata.get("title", "")
                    if isinstance(metadata, dict)
                    else getattr(metadata, "title", ""),
                    "description": metadata.get("description", "")
                    if isinstance(metadata, dict)
                    else getattr(metadata, "description", ""),
                }
            return {}
        except Exception as e:
            logger.debug("Failed to scrape %s: %s", url, e)
            return {}

    def _create_document(self, doc_data: dict[str, str], job: IngestionJob) -> Document:
        """
        Create a Document from scraped data.

        Args:
            doc_data: Scraped document data
            job: The ingestion job

        Returns:
            Document instance
        """
        content = doc_data.get("markdown", "")
        metadata: DocumentMetadata = {
            "source_url": doc_data.get("sourceURL", str(job.source_url)),
            "title": doc_data.get("title"),
            "description": doc_data.get("description"),
            "timestamp": datetime.now(UTC),
            "content_type": "text/markdown",
            "word_count": len(content.split()),
            "char_count": len(content),
        }
        return Document(
            id=uuid4(),
            content=content,
            metadata=metadata,
            source=IngestionSource.WEB,
            collection=job.storage_backend.value,
        )
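

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: it exercises the
    # URL-only entry points (validate_source / estimate_size). The target URL
    # below is a hypothetical placeholder; driving ingest() additionally needs
    # an IngestionJob built from ..core.models, which is out of scope here.
    async def _demo() -> None:
        ingestor = FirecrawlIngestor()
        url = "https://docs.example.com"
        if await ingestor.validate_source(url):
            pages = await ingestor.estimate_size(url)
            print(f"{url} reachable; mapped ~{pages} page(s)")
        else:
            print(f"{url} could not be scraped")

    asyncio.run(_demo())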