2025-09-21 10:25:54 +00:00
parent e017e0533d
commit 52c972bcdf
34 changed files with 5587 additions and 414 deletions

View File

@@ -17,7 +17,7 @@ from ..core.models import (
StorageBackend,
StorageConfig,
)
from ..flows.ingestion import create_ingestion_flow
from ..flows.ingestion_flows import create_ingestion_flow
from ..flows.scheduler import create_scheduled_deployment, serve_deployments
app = typer.Typer(

View File

@@ -363,7 +363,9 @@ class CollectionOverviewScreen(Screen[None]):
table.clear(columns=True)
# Add enhanced columns with more metadata
table.add_columns("Collection", "Backend", "Documents", "Size", "Type", "Status", "Updated")
table.add_columns(
"Collection", "Backend", "Documents", "Size", "Type", "Status", "Updated"
)
# Add rows with enhanced formatting
for collection in self.collections:
@@ -395,7 +397,9 @@ class CollectionOverviewScreen(Screen[None]):
collection["last_updated"],
)
except Exception as e:
LOGGER.warning(f"Failed to add collection row for {collection.get('name', 'unknown')}: {e}")
LOGGER.warning(
f"Failed to add collection row for {collection.get('name', 'unknown')}: {e}"
)
continue
if self.collections:

View File

@@ -150,12 +150,13 @@ class ConfirmDeleteScreen(Screen[None]):
except Exception as e:
self.notify(f"Failed to delete collection: {e}", severity="error", markup=False)
def _refresh_parent_collections(self) -> None:
def _refresh_parent_collections(self, *args) -> None:
"""Helper method to refresh parent collections."""
try:
self.parent_screen.refresh_collections()
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.exception(f"Failed to refresh parent collections after deletion: {e}")
# Don't re-raise to prevent TUI crash

View File

@@ -13,7 +13,7 @@ from textual.widgets import Button, Checkbox, Input, Label, LoadingIndicator, Ru
from typing_extensions import override
from ....core.models import IngestionResult, IngestionSource, StorageBackend
from ....flows.ingestion import create_ingestion_flow
from ....flows.ingestion_flows import create_ingestion_flow
from ..models import CollectionInfo
from ..utils.storage_manager import StorageManager
from ..widgets import EnhancedProgressBar

View File

@@ -2,7 +2,7 @@
from datetime import UTC, datetime
from enum import Enum
from typing import Annotated, ClassVar, TypedDict
from typing import Annotated, ClassVar, Literal, TypedDict
from uuid import UUID, uuid4
from prefect.blocks.core import Block
@@ -105,8 +105,8 @@ class FirecrawlConfig(Block):
_description: ClassVar[str | None] = "Configures Firecrawl web scraping and crawling parameters"
formats: list[str] = Field(default_factory=lambda: ["markdown", "html"])
max_depth: Annotated[int, Field(ge=1, le=20)] = Field(default_factory=_default_max_crawl_depth)
limit: Annotated[int, Field(ge=1, le=1000)] = Field(default_factory=_default_max_crawl_pages)
max_depth: Annotated[int, Field(ge=1, le=100)] = Field(default_factory=_default_max_crawl_depth)
limit: Annotated[int, Field(ge=1, le=5000)] = Field(default_factory=_default_max_crawl_pages)
only_main_content: bool = Field(default=True)
include_subdomains: bool = Field(default=False)
@@ -129,6 +129,30 @@ class RepomixConfig(Block):
max_file_size: int = Field(default_factory=_default_max_file_size) # 1MB
respect_gitignore: bool = Field(default=True)
# Smart extraction configuration
extraction_mode: Literal["full", "smart", "structure", "docs"] = Field(
default="smart",
description="Extraction strategy: full=all files, smart=prioritized, structure=compress only, docs=documentation only",
)
use_compression: bool = Field(
default=False, description="Use repomix --compress to extract code structure only"
)
prioritize_docs: bool = Field(
default=True, description="Prioritize README and documentation files in extraction"
)
max_output_size: Annotated[int, Field(ge=1_000_000, le=50_000_000)] = Field(
default=5_000_000, description="Maximum output size in bytes (1MB-50MB)"
)
include_git_history: bool = Field(
default=False, description="Include git diffs and commit history"
)
top_files_count: Annotated[int, Field(ge=1, le=50)] = Field(
default=10, description="Number of top files to show in summary"
)
output_format: Literal["xml", "markdown", "json"] = Field(
default="markdown", description="Repomix output format"
)
class R2RConfig(Block):
"""Configuration for R2R ingestion."""
@@ -198,6 +222,7 @@ class DocumentMetadata(DocumentMetadataRequired, total=False):
# Custom business metadata
importance_score: float | None
extraction_mode: str | None
review_status: str | None
assigned_team: str | None
@@ -226,6 +251,7 @@ class IngestionJob(BaseModel):
completed_at: datetime | None = Field(default=None)
error_message: str | None = Field(default=None)
document_count: int = Field(default=0)
documents_skipped: int = Field(default=0)
storage_backend: StorageBackend
@@ -236,5 +262,6 @@ class IngestionResult(BaseModel):
status: IngestionStatus
documents_processed: int
documents_failed: int
documents_skipped: int = Field(default=0)
duration_seconds: float
error_messages: list[str] = Field(default_factory=list)
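As a minimal sketch of how the new RepomixConfig extraction fields above might be combined for a documentation-only pass (values are illustrative rather than defaults, and the relative import mirrors this package's layout):

from ..core.models import RepomixConfig

docs_config = RepomixConfig(
    extraction_mode="docs",       # documentation files only
    prioritize_docs=True,
    max_output_size=10_000_000,   # 10 MB, within the 1 MB-50 MB bound declared above
    top_files_count=20,
    output_format="markdown",
)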

View File

@@ -1,6 +1,6 @@
"""Prefect flows for orchestration."""
from .ingestion import create_ingestion_flow
from .ingestion_flows import create_ingestion_flow
from .scheduler import create_scheduled_deployment
__all__ = [

View File

@@ -1,7 +1,8 @@
"""Prefect flow for ingestion pipeline."""
"""Main ingestion flows with improved duplicate handling and logging."""
from __future__ import annotations
import asyncio
from collections.abc import Callable
from datetime import UTC, datetime
from typing import TYPE_CHECKING, Literal, TypeAlias, assert_never, cast
@@ -29,221 +30,51 @@ from ..storage import OpenWebUIStorage, WeaviateStorage
from ..storage import R2RStorage as RuntimeR2RStorage
from ..storage.base import BaseStorage
from ..utils.metadata_tagger import MetadataTagger
from .tasks import (
annotate_firecrawl_metadata_task,
filter_existing_documents_task,
initialize_storage_task,
map_firecrawl_site_task,
scrape_firecrawl_batch_task,
update_job_status_task,
upsert_r2r_documents_task,
validate_source_task,
)
SourceTypeLiteral = Literal["web", "repository", "documentation"]
StorageBackendLiteral = Literal["weaviate", "open_webui", "r2r"]
SourceTypeLike: TypeAlias = IngestionSource | SourceTypeLiteral
StorageBackendLike: TypeAlias = StorageBackend | StorageBackendLiteral
def _safe_cache_key(prefix: str, params: dict[str, object], key: str) -> str:
"""Create a type-safe cache key from task parameters."""
value = params.get(key, "")
return f"{prefix}_{hash(str(value))}"
if TYPE_CHECKING:
from ..storage.r2r.storage import R2RStorage as R2RStorageType
else:
R2RStorageType = BaseStorage
@task(name="validate_source", retries=2, retry_delay_seconds=10, tags=["validation"])
async def validate_source_task(source_url: str, source_type: IngestionSource) -> bool:
"""
Validate that a source is accessible.
Args:
source_url: URL or path to source
source_type: Type of source
Returns:
True if valid
"""
if source_type == IngestionSource.WEB:
ingestor = FirecrawlIngestor()
elif source_type == IngestionSource.REPOSITORY:
ingestor = RepomixIngestor()
else:
raise ValueError(f"Unsupported source type: {source_type}")
result = await ingestor.validate_source(source_url)
return bool(result)
@task(name="initialize_storage", retries=3, retry_delay_seconds=5, tags=["storage"])
async def initialize_storage_task(config: StorageConfig | str) -> BaseStorage:
"""
Initialize storage backend.
Args:
config: Storage configuration block or block name
Returns:
Initialized storage adapter
"""
# Load block if string provided
if isinstance(config, str):
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"storage-config/{config}")
config = cast(StorageConfig, loaded_block)
if config.backend == StorageBackend.WEAVIATE:
storage = WeaviateStorage(config)
elif config.backend == StorageBackend.OPEN_WEBUI:
storage = OpenWebUIStorage(config)
elif config.backend == StorageBackend.R2R:
if RuntimeR2RStorage is None:
raise ValueError("R2R storage not available. Check dependencies.")
storage = RuntimeR2RStorage(config)
else:
raise ValueError(f"Unsupported backend: {config.backend}")
await storage.initialize()
return storage
def _cache_key_for_ingest_documents(ctx, params):
"""Custom cache key function that excludes non-serializable progress_callback."""
# Create cache key from serializable parameters only
job = params.get("job")
cache_params = {
"job_id": getattr(job, "id", None) if job else None,
"job_source_url": str(getattr(job, "source_url", "")) if job else "",
"job_source_type": getattr(job, "source_type", None) if job else None,
"collection_name": params.get("collection_name"),
"batch_size": params.get("batch_size"),
"storage_block_name": params.get("storage_block_name"),
"ingestor_config_block_name": params.get("ingestor_config_block_name"),
}
return f"ingest_docs_{hash(str(cache_params))}"
@task(
name="map_firecrawl_site",
name="ingest_documents",
retries=2,
retry_delay_seconds=15,
tags=["firecrawl", "map"],
cache_key_fn=lambda ctx, p: _safe_cache_key("firecrawl_map", p, "source_url"),
retry_delay_seconds=30,
tags=["ingestion"],
cache_key_fn=_cache_key_for_ingest_documents,
)
async def map_firecrawl_site_task(source_url: str, config: FirecrawlConfig | str) -> list[str]:
"""Map a site using Firecrawl and return discovered URLs."""
# Load block if string provided
if isinstance(config, str):
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"firecrawl-config/{config}")
config = cast(FirecrawlConfig, loaded_block)
ingestor = FirecrawlIngestor(config)
mapped = await ingestor.map_site(source_url)
return mapped or [source_url]
@task(
name="filter_existing_documents",
retries=1,
retry_delay_seconds=5,
tags=["dedup"],
cache_key_fn=lambda ctx, p: _safe_cache_key("filter_docs", p, "urls"),
) # Cache based on URL list
async def filter_existing_documents_task(
urls: list[str],
storage_client: BaseStorage,
stale_after_days: int = 30,
*,
collection_name: str | None = None,
) -> list[str]:
"""Filter URLs to only those that need scraping (missing or stale in storage)."""
import asyncio
logger = get_run_logger()
# Use semaphore to limit concurrent existence checks
semaphore = asyncio.Semaphore(20)
async def check_url_exists(url: str) -> tuple[str, bool]:
async with semaphore:
try:
document_id = str(FirecrawlIngestor.compute_document_id(url))
exists = await storage_client.check_exists(
document_id, collection_name=collection_name, stale_after_days=stale_after_days
)
return url, exists
except Exception as e:
logger.warning("Error checking existence for URL %s: %s", url, e)
# Assume doesn't exist on error to ensure we scrape it
return url, False
# Check all URLs in parallel - use return_exceptions=True for partial failure handling
results = await asyncio.gather(*[check_url_exists(url) for url in urls], return_exceptions=True)
# Collect URLs that need scraping, handling any exceptions
eligible = []
for result in results:
if isinstance(result, Exception):
logger.error("Unexpected error in parallel existence check: %s", result)
continue
# Type narrowing: result is now known to be tuple[str, bool]
if isinstance(result, tuple) and len(result) == 2:
url, exists = result
if not exists:
eligible.append(url)
skipped = len(urls) - len(eligible)
if skipped > 0:
logger.info("Skipping %s up-to-date documents in %s", skipped, storage_client.display_name)
return eligible
@task(
name="scrape_firecrawl_batch", retries=2, retry_delay_seconds=20, tags=["firecrawl", "scrape"]
)
async def scrape_firecrawl_batch_task(
batch_urls: list[str], config: FirecrawlConfig
) -> list[FirecrawlPage]:
"""Scrape a batch of URLs via Firecrawl."""
ingestor = FirecrawlIngestor(config)
result: list[FirecrawlPage] = await ingestor.scrape_pages(batch_urls)
return result
@task(name="annotate_firecrawl_metadata", retries=1, retry_delay_seconds=10, tags=["metadata"])
async def annotate_firecrawl_metadata_task(
pages: list[FirecrawlPage], job: IngestionJob
) -> list[Document]:
"""Annotate scraped pages with standardized metadata."""
if not pages:
return []
ingestor = FirecrawlIngestor()
documents = [ingestor.create_document(page, job) for page in pages]
try:
from ..config import get_settings
settings = get_settings()
async with MetadataTagger(llm_endpoint=str(settings.llm_endpoint)) as tagger:
tagged_documents: list[Document] = await tagger.tag_batch(documents)
return tagged_documents
except IngestionError as exc: # pragma: no cover - logging side effect
logger = get_run_logger()
logger.warning("Metadata tagging failed: %s", exc)
return documents
except Exception as exc: # pragma: no cover - defensive
logger = get_run_logger()
logger.warning("Metadata tagging unavailable, using base metadata: %s", exc)
return documents
@task(name="upsert_r2r_documents", retries=2, retry_delay_seconds=20, tags=["storage", "r2r"])
async def upsert_r2r_documents_task(
storage_client: R2RStorageType,
documents: list[Document],
collection_name: str | None,
) -> tuple[int, int]:
"""Upsert documents into R2R storage."""
if not documents:
return 0, 0
stored_ids: list[str] = await storage_client.store_batch(
documents, collection_name=collection_name
)
processed = len(stored_ids)
failed = len(documents) - processed
if failed:
logger = get_run_logger()
logger.warning("Failed to upsert %s documents to R2R", failed)
return processed, failed
@task(name="ingest_documents", retries=2, retry_delay_seconds=30, tags=["ingestion"])
async def ingest_documents_task(
job: IngestionJob,
collection_name: str | None = None,
@@ -252,7 +83,7 @@ async def ingest_documents_task(
storage_block_name: str | None = None,
ingestor_config_block_name: str | None = None,
progress_callback: Callable[[int, str], None] | None = None,
) -> tuple[int, int]:
) -> tuple[int, int, int]:
"""
Ingest documents from source with optional pre-initialized storage client.
@@ -266,8 +97,11 @@ async def ingest_documents_task(
progress_callback: Optional callback for progress updates
Returns:
Tuple of (processed_count, failed_count)
Tuple of (processed_count, failed_count, skipped_count)
"""
logger = get_run_logger()
logger.info("Starting document ingestion from %s source", job.source_type.value)
if progress_callback:
progress_callback(35, "Creating ingestor and storage clients...")
@@ -288,6 +122,10 @@ async def ingest_documents_task(
ingestor = await _create_ingestor(job, ingestor_config_block_name)
storage = storage_client or await _create_storage(job, collection_name, storage_block_name)
logger.info(
"Using %s storage backend with batch size %d", storage.__class__.__name__, batch_size
)
if progress_callback:
progress_callback(40, "Starting document processing...")
@@ -298,14 +136,40 @@ async def ingest_documents_task(
async def _create_ingestor(job: IngestionJob, config_block_name: str | None = None) -> BaseIngestor:
"""Create appropriate ingestor based on job source type."""
logger = get_run_logger()
if job.source_type == IngestionSource.WEB:
if config_block_name:
logger.info("Loading Firecrawl configuration block: %s", config_block_name)
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"firecrawl-config/{config_block_name}")
config = cast(FirecrawlConfig, loaded_block)
else:
# Fallback to default configuration
config = FirecrawlConfig()
logger.info(
"Created Firecrawl ingestor with max depth %d, limit %d", config.max_depth, config.limit
)
return FirecrawlIngestor(config)
elif job.source_type == IngestionSource.DOCUMENTATION:
if config_block_name:
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"firecrawl-config/{config_block_name}")
config = cast(FirecrawlConfig, loaded_block)
else:
# Use documentation-optimized configuration with deep crawling
config = FirecrawlConfig(
max_depth=50,
limit=2000,
include_subdomains=True,
only_main_content=True,
formats=["markdown", "html"],
)
logger.info(
"Created documentation ingestor with max depth %d, limit %d",
config.max_depth,
config.limit,
)
return FirecrawlIngestor(config)
elif job.source_type == IngestionSource.REPOSITORY:
if config_block_name:
@@ -313,8 +177,41 @@ async def _create_ingestor(job: IngestionJob, config_block_name: str | None = No
loaded_block = await Block.aload(f"repomix-config/{config_block_name}")
config = cast(RepomixConfig, loaded_block)
else:
# Fallback to default configuration
config = RepomixConfig()
# Create configuration with smart defaults from Prefect Variables
extraction_mode_var = await Variable.aget(
"default_repo_extraction_mode", default="smart"
)
max_output_size_var = await Variable.aget(
"default_repo_max_output_size", default="5000000"
)
# Handle variable types safely
extraction_mode = str(extraction_mode_var) if extraction_mode_var else "smart"
try:
max_output_size = int(float(str(max_output_size_var)))
except (ValueError, TypeError):
max_output_size = 5_000_000
# Validate extraction mode
valid_modes = ["full", "smart", "structure", "docs"]
if extraction_mode not in valid_modes:
extraction_mode = "smart"
config = RepomixConfig(
extraction_mode=cast(
Literal["full", "smart", "structure", "docs"], extraction_mode
),
max_output_size=max_output_size,
prioritize_docs=True,
use_compression=extraction_mode in ["structure", "smart"],
)
logger.info(
"Created Repomix ingestor with extraction mode '%s', max file size %d, max output size %d",
config.extraction_mode,
config.max_file_size,
config.max_output_size,
)
return RepomixIngestor(config)
else:
raise ValueError(f"Unsupported source: {job.source_type}")
@@ -324,12 +221,15 @@ async def _create_storage(
job: IngestionJob, collection_name: str | None, storage_block_name: str | None = None
) -> BaseStorage:
"""Create and initialize storage client."""
logger = get_run_logger()
if collection_name is None:
# Use variable for default collection prefix
prefix = await Variable.aget("default_collection_prefix", default="docs")
collection_name = f"{prefix}_{job.source_type.value}"
if storage_block_name:
logger.info("Loading storage configuration block: %s", storage_block_name)
# Load storage config from block
loaded_block = await Block.aload(f"storage-config/{storage_block_name}")
storage_config = cast(StorageConfig, loaded_block)
@@ -344,6 +244,7 @@ async def _create_storage(
storage = _instantiate_storage(job.storage_backend, storage_config)
await storage.initialize()
logger.info("Storage client initialized for collection: %s", collection_name)
return storage
@@ -415,10 +316,12 @@ async def _process_documents(
batch_size: int,
collection_name: str | None,
progress_callback: Callable[[int, str], None] | None = None,
) -> tuple[int, int]:
) -> tuple[int, int, int]:
"""Process documents in batches."""
logger = get_run_logger()
processed = 0
failed = 0
skipped = 0
batch: list[Document] = []
total_documents = 0
batch_count = 0
@@ -429,14 +332,17 @@ async def _process_documents(
# Use smart ingestion with deduplication if storage supports it
if hasattr(storage, "check_exists"):
try:
logger.info("Using smart ingestion with deduplication")
# Try to use the smart ingestion method
document_generator = ingestor.ingest_with_dedup(
job, storage, collection_name=collection_name
)
except Exception:
except Exception as e:
logger.warning("Smart ingestion failed, falling back to regular ingestion: %s", e)
# Fall back to regular ingestion if smart method fails
document_generator = ingestor.ingest(job)
else:
logger.info("Using regular ingestion (storage doesn't support existence checks)")
document_generator = ingestor.ingest(job)
async for document in document_generator:
@@ -451,7 +357,7 @@ async def _process_documents(
f"Processing batch {batch_count} ({total_documents} documents so far)...",
)
batch_processed, batch_failed = await _store_batch(storage, batch, collection_name)
batch_processed, batch_failed, _ = await _store_batch(storage, batch, collection_name)
processed += batch_processed
failed += batch_failed
batch = []
@@ -462,22 +368,31 @@ async def _process_documents(
if progress_callback:
progress_callback(80, f"Processing final batch ({total_documents} total documents)...")
batch_processed, batch_failed = await _store_batch(storage, batch, collection_name)
batch_processed, batch_failed, _ = await _store_batch(storage, batch, collection_name)
processed += batch_processed
failed += batch_failed
if progress_callback:
progress_callback(85, f"Completed processing {total_documents} documents")
return processed, failed
logger.info(
"Document processing complete: %d processed, %d failed, %d skipped",
processed,
failed,
skipped,
)
return processed, failed, skipped
async def _store_batch(
storage: BaseStorage,
batch: list[Document],
collection_name: str | None,
) -> tuple[int, int]:
"""Store a batch of documents and return processed/failed counts."""
) -> tuple[int, int, int]:
"""Store a batch of documents and return processed/failed counts and skipped count."""
logger = get_run_logger()
try:
# Apply metadata tagging for backends that benefit from it
processed_batch = batch
@@ -489,10 +404,11 @@ async def _store_batch(
from ..config import get_settings
settings = get_settings()
logger.info("Enhancing batch with LLM metadata tagging")
async with MetadataTagger(llm_endpoint=str(settings.llm_endpoint)) as tagger:
processed_batch = await tagger.tag_batch(batch)
except Exception as exc:
print(f"Metadata tagging failed, using original documents: {exc}")
logger.warning("Metadata tagging failed, using original documents: %s", exc)
processed_batch = batch
stored_ids = await storage.store_batch(processed_batch, collection_name=collection_name)
@@ -502,13 +418,17 @@ async def _store_batch(
batch_type = (
"final" if len(processed_batch) < 50 else ""
) # Assume standard batch size is 50
print(f"Successfully stored {processed_count} documents in {batch_type} batch".strip())
logger.info(
"Successfully stored %d documents in %s batch",
processed_count,
batch_type if batch_type else "regular",
)
return processed_count, failed_count
return processed_count, failed_count, 0 # No skipped in storage batch
except Exception as e:
batch_type = "Final" if len(batch) < 50 else "Batch"
print(f"{batch_type} storage failed: {e}")
return 0, len(batch)
logger.error("%s storage failed: %s", batch_type, e)
return 0, len(batch), 0 # 0 processed, all failed, 0 skipped
@flow(
@@ -521,9 +441,11 @@ async def firecrawl_to_r2r_flow(
job: IngestionJob,
collection_name: str | None = None,
progress_callback: Callable[[int, str], None] | None = None,
) -> tuple[int, int]:
) -> tuple[int, int, int]:
"""Specialized flow for Firecrawl ingestion into R2R."""
logger = get_run_logger()
logger.info("Starting Firecrawl to R2R ingestion for %s", job.source_url)
from ..config import get_settings
if progress_callback:
@@ -556,38 +478,40 @@ async def firecrawl_to_r2r_flow(
logger.info("Base URL %s exists and is fresh, skipping expensive mapping", base_url)
if progress_callback:
progress_callback(100, "Content is up to date, no processing needed")
return 0, 0
return 0, 0, 1 # 0 processed, 0 failed, 1 skipped
if progress_callback:
progress_callback(50, "Discovering pages with Firecrawl...")
discovered_urls = await map_firecrawl_site_task(base_url, firecrawl_config)
unique_urls = _deduplicate_urls(discovered_urls)
logger.info("Discovered %s unique URLs from Firecrawl map", len(unique_urls))
logger.info("Discovered %d unique URLs from Firecrawl map", len(unique_urls))
if progress_callback:
progress_callback(60, f"Found {len(unique_urls)} pages, filtering existing content...")
eligible_urls = await filter_existing_documents_task(
eligible_urls, skipped_count = await filter_existing_documents_task(
unique_urls, r2r_storage, collection_name=resolved_collection
)
if not eligible_urls:
logger.info("All Firecrawl pages are up to date for %s", job.source_url)
logger.info(
"All %d Firecrawl pages are up to date for %s", len(unique_urls), job.source_url
)
if progress_callback:
progress_callback(100, "All pages are up to date, no processing needed")
return 0, 0
progress_callback(
100, f"All {len(unique_urls)} pages are up to date, no processing needed"
)
return 0, 0, skipped_count
if progress_callback:
progress_callback(70, f"Scraping {len(eligible_urls)} new/updated pages...")
batch_size = min(settings.default_batch_size, firecrawl_config.limit)
url_batches = _chunk_urls(eligible_urls, batch_size)
logger.info("Scraping %s batches of Firecrawl pages", len(url_batches))
logger.info("Scraping %d URLs in %d batches", len(eligible_urls), len(url_batches))
# Use asyncio.gather for concurrent scraping
import asyncio
scrape_tasks = [scrape_firecrawl_batch_task(batch, firecrawl_config) for batch in url_batches]
batch_results = await asyncio.gather(*scrape_tasks)
@@ -595,6 +519,8 @@ async def firecrawl_to_r2r_flow(
for batch_pages in batch_results:
scraped_pages.extend(batch_pages)
logger.info("Successfully scraped %d pages", len(scraped_pages))
if progress_callback:
progress_callback(80, f"Processing {len(scraped_pages)} scraped pages...")
@@ -602,50 +528,23 @@ async def firecrawl_to_r2r_flow(
if not documents:
logger.warning("No documents produced after scraping for %s", job.source_url)
return 0, len(eligible_urls)
return 0, len(eligible_urls), skipped_count
if progress_callback:
progress_callback(90, f"Storing {len(documents)} documents in R2R...")
processed, failed = await upsert_r2r_documents_task(r2r_storage, documents, resolved_collection)
processed, failed, _ = await upsert_r2r_documents_task(
r2r_storage, documents, resolved_collection
)
logger.info("Upserted %s documents into R2R (%s failed)", processed, failed)
logger.info(
"Firecrawl to R2R flow complete: %d processed, %d failed, %d skipped",
processed,
failed,
skipped_count,
)
return processed, failed
@task(name="update_job_status", tags=["tracking"])
async def update_job_status_task(
job: IngestionJob,
status: IngestionStatus,
processed: int = 0,
_failed: int = 0,
error: str | None = None,
) -> IngestionJob:
"""
Update job status.
Args:
job: Ingestion job
status: New status
processed: Documents processed
_failed: Documents failed (currently unused)
error: Error message if any
Returns:
Updated job
"""
job.status = status
job.updated_at = datetime.now(UTC)
job.document_count = processed
if status == IngestionStatus.COMPLETED:
job.completed_at = datetime.now(UTC)
if error:
job.error_message = error
return job
return processed, failed, skipped_count
@flow(
@@ -677,7 +576,13 @@ async def create_ingestion_flow(
Returns:
Ingestion result
"""
print(f"Starting ingestion from {source_url}")
logger = get_run_logger()
logger.info(
"Starting ingestion pipeline: source=%s, type=%s, backend=%s",
source_url,
source_type,
storage_backend,
)
source_enum = IngestionSource(source_type)
backend_enum = StorageBackend(storage_backend)
@@ -694,18 +599,21 @@ async def create_ingestion_flow(
error_messages: list[str] = []
processed = 0
failed = 0
skipped = 0
try:
# Validate source if requested
if validate_first:
if progress_callback:
progress_callback(10, "Validating source...")
print("Validating source...")
logger.info("Validating source accessibility: %s", source_url)
is_valid = await validate_source_task(source_url, job.source_type)
if not is_valid:
raise IngestionError(f"Source validation failed: {source_url}")
logger.info("Source validation successful")
# Update status to in progress
if progress_callback:
progress_callback(20, "Initializing storage...")
@@ -714,52 +622,78 @@ async def create_ingestion_flow(
# Run ingestion
if progress_callback:
progress_callback(30, "Starting document ingestion...")
print("Ingesting documents...")
if job.source_type == IngestionSource.WEB and job.storage_backend == StorageBackend.R2R:
processed, failed = await firecrawl_to_r2r_flow(
logger.info("Starting document ingestion process")
if (
job.source_type in (IngestionSource.WEB, IngestionSource.DOCUMENTATION)
and job.storage_backend == StorageBackend.R2R
):
processed, failed, skipped = await firecrawl_to_r2r_flow(
job, collection_name, progress_callback=progress_callback
)
else:
processed, failed = await ingest_documents_task(
processed, failed, skipped = await ingest_documents_task(
job, collection_name, progress_callback=progress_callback
)
if progress_callback:
progress_callback(90, "Finalizing ingestion...")
# Update final status
# Update final status with improved logic
if failed > 0:
error_messages.append(f"{failed} documents failed to process")
# Set status based on results
if processed == 0 and failed > 0:
# Improved status determination logic
if processed == 0 and skipped > 0 and failed == 0:
final_status = IngestionStatus.COMPLETED # All documents were up-to-date
logger.info("Ingestion completed: all %d documents were already up-to-date", skipped)
elif processed == 0 and failed > 0:
final_status = IngestionStatus.FAILED
logger.error("Ingestion failed: %d failed, %d skipped", failed, skipped)
elif failed > 0:
final_status = IngestionStatus.PARTIAL
logger.warning(
"Ingestion partial: %d processed, %d failed, %d skipped", processed, failed, skipped
)
else:
final_status = IngestionStatus.COMPLETED
logger.info("Ingestion completed: %d processed, %d skipped", processed, skipped)
job = await update_job_status_task(job, final_status, processed=processed, _failed=failed)
print(f"Ingestion completed: {processed} processed, {failed} failed")
job = await update_job_status_task(
job, final_status, processed=processed, _failed=failed, skipped=skipped
)
except Exception as e:
print(f"Ingestion failed: {e}")
logger.error("Ingestion failed with exception: %s", e)
error_messages.append(str(e))
# Don't reset counts - keep whatever was processed before the error
job = await update_job_status_task(
job, IngestionStatus.FAILED, processed=processed, _failed=failed, error=str(e)
job,
IngestionStatus.FAILED,
processed=processed,
_failed=failed,
skipped=skipped,
error=str(e),
)
# Calculate duration
duration = (datetime.now(UTC) - start_time).total_seconds()
logger.info(
"Ingestion pipeline completed in %.2f seconds: %d processed, %d failed, %d skipped",
duration,
processed,
failed,
skipped,
)
return IngestionResult(
job_id=job.id,
status=job.status,
documents_processed=processed,
documents_failed=failed,
documents_skipped=skipped,
duration_seconds=duration,
error_messages=error_messages,
)
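A hedged usage sketch of the flow entry point; the keyword arguments are inferred from the flow body above, since the full signature is not shown in this diff.

import asyncio

async def run_example() -> None:
    result = await create_ingestion_flow(
        source_url="https://docs.example.com",
        source_type="documentation",
        storage_backend="r2r",
        collection_name="docs_documentation",
        validate_first=True,
    )
    print(result.documents_processed, result.documents_failed, result.documents_skipped)

asyncio.run(run_example())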

View File

@@ -9,7 +9,7 @@ from prefect.schedules import Cron, Interval
from prefect.variables import Variable
from ..core.models import IngestionSource, StorageBackend
from .ingestion import SourceTypeLike, StorageBackendLike, create_ingestion_flow
from .ingestion_flows import SourceTypeLike, StorageBackendLike, create_ingestion_flow
class FlowWithDeployment(Protocol):

View File

@@ -0,0 +1,24 @@
"""Task modules for ingestion flows."""
from .firecrawl_tasks import (
annotate_firecrawl_metadata_task,
map_firecrawl_site_task,
scrape_firecrawl_batch_task,
)
from .storage_tasks import (
filter_existing_documents_task,
initialize_storage_task,
upsert_r2r_documents_task,
)
from .validation_tasks import update_job_status_task, validate_source_task
__all__ = [
"annotate_firecrawl_metadata_task",
"filter_existing_documents_task",
"initialize_storage_task",
"map_firecrawl_site_task",
"scrape_firecrawl_batch_task",
"update_job_status_task",
"upsert_r2r_documents_task",
"validate_source_task",
]

View File

@@ -0,0 +1,100 @@
"""Firecrawl-specific tasks for ingestion flows."""
from typing import cast
from prefect import get_run_logger, task
from prefect.blocks.core import Block
from ...core.exceptions import IngestionError
from ...core.models import Document, FirecrawlConfig, IngestionJob
from ...ingestors import FirecrawlIngestor, FirecrawlPage
from ...utils.metadata_tagger import MetadataTagger
def _safe_cache_key(prefix: str, params: dict[str, object], key: str) -> str:
"""Create a type-safe cache key from task parameters."""
value = params.get(key, "")
return f"{prefix}_{hash(str(value))}"
@task(
name="map_firecrawl_site",
retries=2,
retry_delay_seconds=15,
tags=["firecrawl", "map"],
cache_key_fn=lambda ctx, p: _safe_cache_key("firecrawl_map", p, "source_url"),
)
async def map_firecrawl_site_task(source_url: str, config: FirecrawlConfig | str) -> list[str]:
"""Map a site using Firecrawl and return discovered URLs."""
logger = get_run_logger()
logger.info("Mapping site: %s", source_url)
# Load block if string provided
if isinstance(config, str):
logger.info("Loading Firecrawl configuration block: %s", config)
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"firecrawl-config/{config}")
config = cast(FirecrawlConfig, loaded_block)
ingestor = FirecrawlIngestor(config)
mapped = await ingestor.map_site(source_url)
urls = mapped or [source_url]
logger.info("Site map complete: found %d URLs", len(urls))
return urls
@task(
name="scrape_firecrawl_batch", retries=2, retry_delay_seconds=20, tags=["firecrawl", "scrape"]
)
async def scrape_firecrawl_batch_task(
batch_urls: list[str], config: FirecrawlConfig
) -> list[FirecrawlPage]:
"""Scrape a batch of URLs via Firecrawl."""
logger = get_run_logger()
logger.info("Scraping batch of %d URLs", len(batch_urls))
ingestor = FirecrawlIngestor(config)
result: list[FirecrawlPage] = await ingestor.scrape_pages(batch_urls)
logger.info(
"Batch scrape complete: %d successful, %d failed",
len(result),
len(batch_urls) - len(result),
)
return result
@task(name="annotate_firecrawl_metadata", retries=1, retry_delay_seconds=10, tags=["metadata"])
async def annotate_firecrawl_metadata_task(
pages: list[FirecrawlPage], job: IngestionJob
) -> list[Document]:
"""Annotate scraped pages with standardized metadata."""
logger = get_run_logger()
if not pages:
logger.info("No pages to annotate")
return []
logger.info("Annotating %d pages with metadata", len(pages))
ingestor = FirecrawlIngestor()
documents = [ingestor.create_document(page, job) for page in pages]
try:
from ...config import get_settings
settings = get_settings()
logger.info("Enhancing metadata with LLM tagging")
async with MetadataTagger(llm_endpoint=str(settings.llm_endpoint)) as tagger:
tagged_documents: list[Document] = await tagger.tag_batch(documents)
logger.info("Successfully enhanced metadata for %d documents", len(tagged_documents))
return tagged_documents
except IngestionError as exc: # pragma: no cover - logging side effect
logger.warning("Metadata tagging failed: %s", exc)
return documents
except Exception as exc: # pragma: no cover - defensive
logger.warning("Metadata tagging unavailable, using base metadata: %s", exc)
return documents

View File

@@ -0,0 +1,195 @@
"""Storage-related tasks for ingestion flows."""
from typing import TYPE_CHECKING, cast
from prefect import get_run_logger, task
from prefect.blocks.core import Block
from ...core.models import Document, StorageBackend, StorageConfig
from ...ingestors import FirecrawlIngestor
from ...storage import OpenWebUIStorage, WeaviateStorage
from ...storage import R2RStorage as RuntimeR2RStorage
from ...storage.base import BaseStorage
if TYPE_CHECKING:
from ...storage.r2r.storage import R2RStorage as R2RStorageType
else:
R2RStorageType = BaseStorage
def _safe_cache_key(prefix: str, params: dict[str, object], key: str) -> str:
"""Create a type-safe cache key from task parameters."""
value = params.get(key, "")
return f"{prefix}_{hash(str(value))}"
@task(name="initialize_storage", retries=3, retry_delay_seconds=5, tags=["storage"])
async def initialize_storage_task(config: StorageConfig | str) -> BaseStorage:
"""
Initialize storage backend.
Args:
config: Storage configuration block or block name
Returns:
Initialized storage adapter
"""
logger = get_run_logger()
# Load block if string provided
if isinstance(config, str):
logger.info("Loading storage configuration block: %s", config)
# Use Block.aload with type slug for better type inference
loaded_block = await Block.aload(f"storage-config/{config}")
config = cast(StorageConfig, loaded_block)
logger.info("Initializing %s storage backend at %s", config.backend.value, config.endpoint)
if config.backend == StorageBackend.WEAVIATE:
storage = WeaviateStorage(config)
elif config.backend == StorageBackend.OPEN_WEBUI:
storage = OpenWebUIStorage(config)
elif config.backend == StorageBackend.R2R:
if RuntimeR2RStorage is None:
raise ValueError("R2R storage not available. Check dependencies.")
storage = RuntimeR2RStorage(config)
else:
raise ValueError(f"Unsupported backend: {config.backend}")
await storage.initialize()
logger.info("Storage backend initialized successfully")
return storage
def _cache_key_for_filter_documents(ctx, params):
"""Custom cache key function that excludes non-serializable storage_client."""
# Create cache key from serializable parameters only
cache_params = {
"urls": params.get("urls"),
"stale_after_days": params.get("stale_after_days"),
"collection_name": params.get("collection_name"),
}
return f"filter_docs_{hash(str(cache_params))}"
@task(
name="filter_existing_documents",
retries=1,
retry_delay_seconds=5,
tags=["dedup"],
cache_key_fn=_cache_key_for_filter_documents,
)
async def filter_existing_documents_task(
urls: list[str],
storage_client: BaseStorage,
stale_after_days: int = 30,
*,
collection_name: str | None = None,
) -> tuple[list[str], int]:
"""
Filter URLs to only those that need scraping (missing or stale in storage).
Returns:
Tuple of (eligible_urls, skipped_count)
"""
import asyncio
logger = get_run_logger()
logger.info(
"Checking %d URLs for existing documents in %s", len(urls), storage_client.display_name
)
# Use semaphore to limit concurrent existence checks
semaphore = asyncio.Semaphore(20)
async def check_url_exists(url: str) -> tuple[str, bool]:
async with semaphore:
try:
document_id = str(FirecrawlIngestor.compute_document_id(url))
exists = await storage_client.check_exists(
document_id, collection_name=collection_name, stale_after_days=stale_after_days
)
return url, exists
except Exception as e:
logger.warning("Error checking existence for URL %s: %s", url, e)
# Assume doesn't exist on error to ensure we scrape it
return url, False
# Check all URLs in parallel - use return_exceptions=True for partial failure handling
results = await asyncio.gather(*[check_url_exists(url) for url in urls], return_exceptions=True)
# Collect URLs that need scraping, handling any exceptions
eligible = []
for result in results:
if isinstance(result, Exception):
logger.error("Unexpected error in parallel existence check: %s", result)
continue
# Type narrowing: result is now known to be tuple[str, bool]
if isinstance(result, tuple) and len(result) == 2:
url, exists = result
if not exists:
eligible.append(url)
skipped = len(urls) - len(eligible)
if skipped > 0:
logger.info(
"Found %d existing up-to-date documents, %d need processing", skipped, len(eligible)
)
else:
logger.info("All %d URLs need processing (no existing documents found)", len(urls))
return eligible, skipped
def _cache_key_for_upsert_r2r(ctx, params):
"""Custom cache key function that excludes non-serializable storage_client."""
# Create cache key from serializable parameters only
documents = params.get("documents", [])
cache_params = {
"documents_count": len(documents),
"collection_name": params.get("collection_name"),
"document_ids": [getattr(doc, "id", None) for doc in documents if hasattr(doc, "id")],
}
return f"upsert_r2r_{hash(str(cache_params))}"
@task(
name="upsert_r2r_documents",
retries=2,
retry_delay_seconds=20,
tags=["storage", "r2r"],
cache_key_fn=_cache_key_for_upsert_r2r,
)
async def upsert_r2r_documents_task(
storage_client: R2RStorageType,
documents: list[Document],
collection_name: str | None,
) -> tuple[int, int, int]:
"""
Upsert documents into R2R storage.
Returns:
Tuple of (processed_count, failed_count, skipped_count)
"""
logger = get_run_logger()
if not documents:
logger.info("No documents to upsert")
return 0, 0, 0
logger.info(
"Upserting %d documents to R2R collection: %s", len(documents), collection_name or "default"
)
stored_ids: list[str] = await storage_client.store_batch(
documents, collection_name=collection_name
)
processed = len(stored_ids)
failed = len(documents) - processed
if failed:
logger.warning("Failed to upsert %d documents to R2R", failed)
else:
logger.info("Successfully upserted all %d documents to R2R", processed)
return processed, failed, 0 # No skipped in R2R upsert

View File

@@ -0,0 +1,70 @@
"""Validation and job management tasks."""
from datetime import UTC, datetime
from prefect import task
from ...core.models import IngestionJob, IngestionSource, IngestionStatus
from ...ingestors import FirecrawlIngestor, RepomixIngestor
@task(name="validate_source", retries=2, retry_delay_seconds=10, tags=["validation"])
async def validate_source_task(source_url: str, source_type: IngestionSource) -> bool:
"""
Validate that a source is accessible.
Args:
source_url: URL or path to source
source_type: Type of source
Returns:
True if valid
"""
if source_type == IngestionSource.WEB:
ingestor = FirecrawlIngestor()
elif source_type == IngestionSource.DOCUMENTATION:
ingestor = FirecrawlIngestor()
elif source_type == IngestionSource.REPOSITORY:
ingestor = RepomixIngestor()
else:
raise ValueError(f"Unsupported source type: {source_type}")
result = await ingestor.validate_source(source_url)
return bool(result)
@task(name="update_job_status", tags=["tracking"])
async def update_job_status_task(
job: IngestionJob,
status: IngestionStatus,
processed: int = 0,
_failed: int = 0,
skipped: int = 0,
error: str | None = None,
) -> IngestionJob:
"""
Update job status.
Args:
job: Ingestion job
status: New status
processed: Documents processed
_failed: Documents failed (currently unused)
skipped: Documents skipped
error: Error message if any
Returns:
Updated job
"""
job.status = status
job.updated_at = datetime.now(UTC)
job.document_count = processed
job.documents_skipped = skipped
if status == IngestionStatus.COMPLETED:
job.completed_at = datetime.now(UTC)
if error:
job.error_message = error
return job
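A small sketch of the new skipped accounting; calling the task's underlying .fn bypasses the Prefect runner so the coroutine can be exercised directly (the job instance is assumed to already exist):

async def _example(job: IngestionJob) -> None:
    updated = await update_job_status_task.fn(
        job, IngestionStatus.COMPLETED, processed=12, skipped=3
    )
    assert updated.document_count == 12 and updated.documents_skipped == 3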

View File

@@ -20,7 +20,6 @@ from ..core.models import (
DocumentMetadata,
FirecrawlConfig,
IngestionJob,
IngestionSource,
)
from .base import BaseIngestor
@@ -252,6 +251,17 @@ class FirecrawlIngestor(BaseIngestor):
# First, map the site to understand its structure
site_map = await self.map_site(url) or [url]
# Use Prefect logger if available, fall back to standard logging
try:
from prefect import get_run_logger
logger = get_run_logger()
except ImportError:
import logging
logger = logging.getLogger(__name__)
logger.info(f"Mapped {len(site_map)} URLs from {url}")
# Filter out URLs that already exist in storage and are fresh
eligible_urls: list[str] = []
for check_url in site_map:
@@ -262,7 +272,11 @@ class FirecrawlIngestor(BaseIngestor):
if not exists:
eligible_urls.append(check_url)
skipped_count = len(site_map) - len(eligible_urls)
logger.info(f"Found {len(eligible_urls)} new URLs, {skipped_count} already exist")
if not eligible_urls:
logger.info("No new documents to scrape - all are up to date")
return # No new documents to scrape
# Process eligible pages in batches
@@ -638,7 +652,7 @@ class FirecrawlIngestor(BaseIngestor):
id=self.compute_document_id(source_url),
content=content,
metadata=metadata,
source=IngestionSource.WEB,
source=job.source_type,
collection=job.storage_backend.value,
)

View File

@@ -1,6 +1,7 @@
"""Repomix ingestor for Git repositories."""
import asyncio
import logging
import re
import subprocess
import tempfile
@@ -78,9 +79,7 @@ class RepomixIngestor(BaseIngestor):
["git", "ls-remote", "--heads", source_url], timeout=10
)
return result.returncode == 0
except Exception as e:
import logging
except (subprocess.SubprocessError, TimeoutError, OSError) as e:
logging.warning(f"Failed to validate repository {source_url}: {e}")
return False
@@ -108,9 +107,7 @@ class RepomixIngestor(BaseIngestor):
file_count += len(files)
return file_count
except Exception as e:
import logging
except (subprocess.SubprocessError, TimeoutError, OSError, IngestionError) as e:
logging.warning(f"Failed to estimate size for repository {source_url}: {e}")
return 0
@@ -143,6 +140,102 @@ class RepomixIngestor(BaseIngestor):
return repo_path
def _build_repomix_command(self, repo_path: Path, output_file: Path) -> list[str]:
"""
Build repomix command based on extraction mode and configuration.
Args:
repo_path: Path to the repository
output_file: Path for output file
Returns:
Complete repomix command as list of strings
"""
cmd = ["repomix", "--output", str(output_file)]
# Set output format
cmd.extend(["--style", self.config.output_format])
# Configure based on extraction mode
if self.config.extraction_mode == "structure":
cmd.append("--compress")
cmd.append("--no-file-summary")
elif self.config.extraction_mode == "docs":
cmd.append("--no-files")
cmd.append("--no-directory-structure")
elif self.config.extraction_mode == "smart":
if self.config.use_compression:
cmd.append("--compress")
cmd.extend(["--top-files-len", str(self.config.top_files_count)])
cmd.append("--output-show-line-numbers")
# Add include patterns based on extraction mode
include_patterns = self._get_smart_include_patterns()
for pattern in include_patterns:
if pattern.strip(): # Avoid empty patterns
cmd.extend(["--include", pattern])
# Add ignore patterns (repomix uses --ignore, not --exclude)
if self.config.exclude_patterns:
for pattern in self.config.exclude_patterns:
if pattern.strip(): # Avoid empty patterns
cmd.extend(["--ignore", pattern])
# Git-related options
if self.config.include_git_history:
cmd.append("--include-diffs")
cmd.append("--include-logs")
# Other options - gitignore is respected by default
if not self.config.respect_gitignore:
cmd.append("--no-gitignore")
cmd.append("--truncate-base64")
return cmd
def _get_smart_include_patterns(self) -> list[str]:
"""Get include patterns based on extraction mode and prioritization."""
if self.config.extraction_mode == "docs":
return [
"README*",
"*.md",
"docs/**",
"documentation/**",
"*.rst",
"*.txt",
"CHANGELOG*",
"LICENSE*",
"CONTRIBUTING*",
]
elif self.config.extraction_mode == "structure":
return self.config.include_patterns
elif self.config.extraction_mode == "smart" and self.config.prioritize_docs:
# Prioritize docs but include code patterns
return [
"README*",
"*.md",
"docs/**",
"*.py",
"*.js",
"*.ts",
"*.jsx",
"*.tsx",
"*.java",
"*.go",
"*.rs",
"*.yaml",
"*.yml",
"*.json",
"package.json",
"pyproject.toml",
"Cargo.toml",
"go.mod",
"requirements*.txt",
]
else:
return self.config.include_patterns
async def _run_repomix(self, repo_path: Path) -> Path:
"""
Run repomix on a repository.
@@ -153,23 +246,13 @@ class RepomixIngestor(BaseIngestor):
Returns:
Path to repomix output file
"""
output_file = repo_path / "repomix-output.md"
# Set output file extension based on format
extension_map = {"xml": "xml", "markdown": "md", "json": "json"}
ext = extension_map.get(self.config.output_format, "md")
output_file = repo_path / f"repomix-output.{ext}"
# Build repomix command
cmd = ["npx", "repomix", "--output", str(output_file)]
# Add include patterns
if self.config.include_patterns:
for pattern in self.config.include_patterns:
cmd.extend(["--include", pattern])
# Add exclude patterns
if self.config.exclude_patterns:
for pattern in self.config.exclude_patterns:
cmd.extend(["--exclude", pattern])
if self.config.respect_gitignore:
cmd.append("--respect-gitignore")
# Build command using the new smart command builder
cmd = self._build_repomix_command(repo_path, output_file)
result = await self._run_command(cmd, cwd=str(repo_path), timeout=120)
@@ -181,14 +264,14 @@ class RepomixIngestor(BaseIngestor):
async def _parse_repomix_output(self, output_file: Path, job: IngestionJob) -> list[Document]:
"""
Parse repomix output into documents.
Parse repomix output into documents with intelligent content extraction.
Args:
output_file: Path to repomix output
job: The ingestion job
Returns:
List of documents
List of documents, prioritized by importance
"""
documents: list[Document] = []
@@ -199,14 +282,51 @@ class RepomixIngestor(BaseIngestor):
repo_info = self._extract_repository_info(str(job.source_url))
content = output_file.read_text()
# Check if output exceeds size limit
if len(content) > self.config.max_output_size:
logging.warning(
f"Repomix output size ({len(content)} bytes) exceeds limit ({self.config.max_output_size} bytes)"
)
# Split by file markers (repomix uses specific delimiters)
file_sections = self._split_by_files(content)
# Calculate importance scores for all files
file_importance_scores: list[tuple[str, str, float]] = []
for file_path, file_content in file_sections.items():
if len(file_content) > self.config.max_file_size:
# Split large files into chunks
chunks = self._chunk_content(file_content)
if file_path != "repository": # Skip the general repository content
importance = self._calculate_file_importance(file_path, file_content)
file_importance_scores.append((file_path, file_content, importance))
# Sort by importance (highest first)
file_importance_scores.sort(key=lambda x: x[2], reverse=True)
# Process files in order of importance, stopping if size limit reached
current_output_size = 0
processed_files = 0
for file_path, file_content, importance in file_importance_scores:
# Early termination if we've reached size limit
if current_output_size >= self.config.max_output_size:
logging.info(f"Stopping processing at file {processed_files} due to size limit")
break
# Extract meaningful content based on extraction mode
meaningful_content = self._extract_meaningful_content(
file_path, file_content, self.config.extraction_mode
)
if not meaningful_content.strip():
continue
# Handle large files with chunking
if len(meaningful_content) > self.config.max_file_size:
chunks = self._chunk_content(meaningful_content)
for i, chunk in enumerate(chunks):
if current_output_size + len(chunk) > self.config.max_output_size:
break
doc = self._create_document(
file_path,
chunk,
@@ -214,19 +334,162 @@ class RepomixIngestor(BaseIngestor):
chunk_index=i,
git_metadata=git_metadata,
repo_info=repo_info,
importance_score=importance,
)
documents.append(doc)
current_output_size += len(chunk)
else:
doc = self._create_document(
file_path, file_content, job, git_metadata=git_metadata, repo_info=repo_info
)
documents.append(doc)
if current_output_size + len(meaningful_content) <= self.config.max_output_size:
doc = self._create_document(
file_path,
meaningful_content,
job,
git_metadata=git_metadata,
repo_info=repo_info,
importance_score=importance,
)
documents.append(doc)
current_output_size += len(meaningful_content)
processed_files += 1
# Add repository summary if we have space and it exists
if (
"repository" in file_sections
and current_output_size < self.config.max_output_size * 0.8
):
repo_content = file_sections["repository"]
if repo_content.strip():
# Limit repo content to stay within size bounds
truncated_content = repo_content[: self.config.max_file_size]
if current_output_size + len(truncated_content) <= self.config.max_output_size:
doc = self._create_document(
"repository_summary",
truncated_content,
job,
git_metadata=git_metadata,
repo_info=repo_info,
importance_score=0.9,
)
documents.append(doc)
current_output_size += len(truncated_content)
logging.info(
f"Processed {processed_files} files, created {len(documents)} documents, "
f"total size: {current_output_size} bytes"
)
except Exception as e:
raise IngestionError(f"Failed to parse repomix output: {e}") from e
return documents
def _extract_meaningful_content(
self, file_path: str, content: str, extraction_mode: str
) -> str:
"""
Extract meaningful content from a file based on extraction mode.
Args:
file_path: Path to the file
content: File content
extraction_mode: Extraction strategy
Returns:
Processed content with meaningful parts extracted
"""
if extraction_mode == "full":
return content
# For documentation files, preserve full content
if any(ext in file_path.lower() for ext in [".md", ".rst", ".txt", "readme", "changelog"]):
return content
# For structure mode, content should already be compressed by repomix
if extraction_mode == "structure":
return content
# For smart mode, extract key patterns based on file type
if extraction_mode == "smart":
return self._extract_code_surface(file_path, content)
return content
def _extract_code_surface(self, file_path: str, content: str) -> str:
"""
Extract surface-level code patterns (APIs, interfaces, classes) from code files.
Args:
file_path: Path to the file
content: File content
Returns:
Extracted surface content
"""
lines = content.split("\n")
extracted_lines: list[str] = []
language = self._detect_programming_language(file_path, content)
# Always include imports and module-level declarations
for line in lines[:50]: # Check first 50 lines for imports
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
# Python imports and class/function definitions
if language == "python":
if any(
stripped.startswith(keyword)
for keyword in ["import ", "from ", "class ", "def ", "async def ", "@"]
):
extracted_lines.append(line)
elif stripped.startswith('"""') or stripped.startswith("'''"):
# Include docstrings
extracted_lines.append(line)
# JavaScript/TypeScript exports, imports, interfaces
elif language in ["javascript", "typescript"]:
if any(
keyword in stripped
for keyword in [
"import ",
"export ",
"interface ",
"type ",
"class ",
"function ",
"const ",
"let ",
"var ",
]
) and not stripped.startswith("//"):
extracted_lines.append(line)
# Java public declarations
elif language == "java":
if any(
keyword in stripped
for keyword in [
"package ",
"import ",
"public class",
"public interface",
"public ",
"@",
]
) and not stripped.startswith("//"):
extracted_lines.append(line)
# Include meaningful content but limit size
result = "\n".join(extracted_lines)
# If we extracted very little, include more of the original
if len(result) < len(content) * 0.1:
return content[:2000] # First 2KB as fallback
return result
def _split_by_files(self, content: str) -> dict[str, str]:
"""
Split repomix output by files.
@@ -243,10 +506,15 @@ class RepomixIngestor(BaseIngestor):
for line in content.split("\n"):
# Look for file markers (adjust based on actual repomix format)
if line.startswith("## File:") or line.startswith("### "):
if line.startswith("## File:"):
if current_file:
files[current_file] = "\n".join(current_content)
current_file = line.replace("## File:", "").replace("### ", "").strip()
current_file = line.replace("## File:", "").strip()
current_content = []
elif line.startswith("### "):
if current_file:
files[current_file] = "\n".join(current_content)
current_file = line.replace("### ", "").strip()
current_content = []
else:
current_content.append(line)
@@ -355,6 +623,91 @@ class RepomixIngestor(BaseIngestor):
return None
@staticmethod
def _calculate_file_importance(file_path: str, content: str) -> float:
"""
Calculate importance score for a file based on path and content.
Returns score from 0.0 to 1.0 where 1.0 is most important.
"""
path = Path(file_path)
filename = path.name.lower()
path_parts = [part.lower() for part in path.parts]
# Documentation files get highest priority
if any(
keyword in filename for keyword in ["readme", "changelog", "license", "contributing"]
):
return 1.0
if path.suffix.lower() in [".md", ".rst", ".txt"] and any(
keyword in path_parts for keyword in ["docs", "documentation", "guide"]
):
return 0.95
# Configuration and project files
config_files = {
"package.json": 0.9,
"pyproject.toml": 0.9,
"cargo.toml": 0.9,
"go.mod": 0.9,
"pom.xml": 0.9,
"dockerfile": 0.85,
"docker-compose.yml": 0.85,
"requirements.txt": 0.8,
"makefile": 0.8,
}
if filename in config_files:
return config_files[filename]
# API and interface files
if any(keyword in filename for keyword in ["api", "interface", "types", "schema"]):
return 0.85
# Main entry points
if filename in ["main.py", "index.js", "app.py", "server.py", "main.go", "lib.rs"]:
return 0.8
# Source code by language (prioritize by surface area)
extension_scores = {
".py": 0.7,
".js": 0.7,
".ts": 0.75, # TypeScript often has better interfaces
".jsx": 0.65,
".tsx": 0.7,
".java": 0.65,
".go": 0.7,
".rs": 0.7,
".cpp": 0.6,
".c": 0.6,
".h": 0.75, # Header files are interfaces
".hpp": 0.75,
}
if path.suffix.lower() in extension_scores:
base_score = extension_scores[path.suffix.lower()]
# Reduce score for test files
if any(keyword in path_parts for keyword in ["test", "tests", "spec", "__tests__"]):
return base_score * 0.3
# Reduce score for generated or build files
if any(
keyword in path_parts
for keyword in ["dist", "build", "generated", ".next", "target"]
):
return base_score * 0.1
return base_score
# YAML/JSON configuration files
if path.suffix.lower() in [".yaml", ".yml", ".json"]:
return 0.6
# Other files get low priority
return 0.3
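# Illustrative scores from the rules above (file paths are hypothetical):
#   _calculate_file_importance("README.md", "")         -> 1.0   (documentation)
#   _calculate_file_importance("pyproject.toml", "")    -> 0.9   (project config file)
#   _calculate_file_importance("src/api/types.ts", "")  -> 0.85  (API/interface name)
#   _calculate_file_importance("tests/test_app.py", "") -> 0.21  (0.7 base * 0.3 test penalty)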
@staticmethod
def _analyze_code_structure(content: str, language: str | None) -> dict[str, object]:
"""Analyze code structure and extract metadata."""
@@ -441,7 +794,7 @@ class RepomixIngestor(BaseIngestor):
"branch_name": branch_name,
"commit_hash": commit_hash[:8] if commit_hash else None, # Short hash
}
except Exception:
except (subprocess.SubprocessError, TimeoutError, OSError):
return {"branch_name": None, "commit_hash": None}
def _create_document(
@@ -452,6 +805,7 @@ class RepomixIngestor(BaseIngestor):
chunk_index: int = 0,
git_metadata: dict[str, str | None] | None = None,
repo_info: dict[str, str] | None = None,
importance_score: float | None = None,
) -> Document:
"""
Create a Document from repository content with enriched metadata.
@@ -463,6 +817,7 @@ class RepomixIngestor(BaseIngestor):
chunk_index: Index if content is chunked
git_metadata: Git repository metadata
repo_info: Repository information
importance_score: File importance score (0.0-1.0)
Returns:
Document instance with rich metadata
@@ -515,6 +870,13 @@ class RepomixIngestor(BaseIngestor):
metadata["branch_name"] = git_metadata.get("branch_name")
metadata["commit_hash"] = git_metadata.get("commit_hash")
# Add file importance score
if importance_score is not None:
metadata["importance_score"] = importance_score
# Add extraction mode info
metadata["extraction_mode"] = self.config.extraction_mode
# Add code-specific metadata for programming files
if programming_language and structure_info:
# Calculate code quality score
@@ -577,7 +939,5 @@ class RepomixIngestor(BaseIngestor):
try:
await asyncio.wait_for(proc.wait(), timeout=2.0)
except TimeoutError:
import logging
logging.warning(f"Process {proc.pid} did not terminate cleanly")
raise IngestionError(f"Command timed out: {' '.join(cmd)}") from e

View File

@@ -114,7 +114,7 @@ class MetadataTagger:
self, document: Document, custom_instructions: str | None = None
) -> Document:
"""
Analyze document and generate metadata tags.
Analyze document and generate metadata tags with retry logic.
Args:
document: Document to tag
@@ -126,16 +126,77 @@ class MetadataTagger:
if not document.content:
return document
return await self._tag_with_retry(document, custom_instructions, max_retries=2)
async def _tag_with_retry(
self, document: Document, custom_instructions: str | None = None, max_retries: int = 2
) -> Document:
"""
Tag document with retry logic for handling LLM failures.
Args:
document: Document to tag
custom_instructions: Optional custom instructions for tagging
max_retries: Maximum number of retry attempts
Returns:
Document with enriched metadata
Raises:
IngestionError: If all retry attempts fail
"""
last_error: Exception | None = None
for attempt in range(max_retries + 1):
try:
# Generate metadata using LLM
metadata = await self._generate_metadata(
document.content,
document.metadata.get("title") if document.metadata else None,
custom_instructions,
)
# If we got here, LLM call succeeded, process the metadata
return await self._apply_metadata_to_document(document, metadata)
except IngestionError as e:
last_error = e
if attempt < max_retries:
# Log retry attempt and wait briefly before retrying
import logging
logger = logging.getLogger(__name__)
logger.warning(
f"Metadata tagging attempt {attempt + 1} failed: {e}. Retrying..."
)
import asyncio
await asyncio.sleep(0.5 * (attempt + 1))  # linear backoff between attempts
else:
# All retries exhausted
raise e
# This should never be reached, but satisfy type checker
if last_error:
raise last_error
else:
raise IngestionError("Unknown error in metadata tagging retry logic")
async def _apply_metadata_to_document(
self, document: Document, metadata: DocumentMetadata
) -> Document:
"""
Apply generated metadata to document.
Args:
document: Document to update
metadata: Generated metadata to apply
Returns:
Document with updated metadata
"""
try:
# Generate metadata using LLM
metadata = await self._generate_metadata(
document.content,
document.metadata.get("title") if document.metadata else None,
custom_instructions,
)
# Merge with existing metadata - preserve ALL existing fields and add LLM-generated ones
from ..core.models import DocumentMetadata as CoreDocumentMetadata
# Start with a copy of existing metadata to preserve all fields
@@ -185,11 +246,10 @@ class MetadataTagger:
new_metadata["language"] = str(updated_metadata["language"])
document.metadata = new_metadata
return document
except Exception as e:
raise IngestionError(f"Failed to tag document: {e}") from e
raise IngestionError(f"Failed to apply metadata to document: {e}") from e
async def tag_batch(
self,
@@ -234,16 +294,29 @@ class MetadataTagger:
# Prepare the prompt
system_prompt = """You are a document metadata tagger. Analyze the given content and generate relevant metadata.
Return a JSON object with the following structure:
CRITICAL: Return ONLY a valid JSON object with no additional text, comments, or markdown formatting.
Required JSON structure (no comments allowed in actual response):
{
"tags": ["tag1", "tag2", ...], # 3-7 relevant topic tags
"category": "string", # Main category
"summary": "string", # 1-2 sentence summary
"key_topics": ["topic1", "topic2", ...], # Main topics discussed
"document_type": "string", # Type of document (e.g., "technical", "tutorial", "reference")
"language": "string", # Primary language (e.g., "en", "es")
"technical_level": "string" # One of: "beginner", "intermediate", "advanced"
}"""
"tags": ["tag1", "tag2"],
"category": "string",
"summary": "string",
"key_topics": ["topic1", "topic2"],
"document_type": "string",
"language": "string",
"technical_level": "string"
}
Rules:
- tags: 3-7 relevant topic tags
- category: Main category (one word/phrase)
- summary: 1-2 sentence summary (under 200 chars)
- key_topics: Main topics discussed (2-5 items)
- document_type: One of "technical", "tutorial", "reference", "guide", "documentation"
- language: Primary language code (e.g., "en", "es", "fr")
- technical_level: One of "beginner", "intermediate", "advanced"
Do not include any text before or after the JSON object."""
if custom_instructions:
system_prompt += f"\n\nAdditional instructions: {custom_instructions}"
@@ -296,7 +369,12 @@ Return a JSON object with the following structure:
try:
raw_metadata = cast(dict[str, object], json.loads(content_str))
except json.JSONDecodeError as e:
raise IngestionError(f"Failed to parse LLM response: {e}") from e
# Attempt to repair common JSON issues
try:
repaired_content = self._attempt_json_repair(content_str)
raw_metadata = cast(dict[str, object], json.loads(repaired_content))
except json.JSONDecodeError:
raise IngestionError(f"Failed to parse LLM response: {e}") from e
# Ensure it's a dict before processing
if not isinstance(raw_metadata, dict):
@@ -305,6 +383,56 @@ Return a JSON object with the following structure:
# Validate and sanitize metadata
return self._sanitize_metadata(raw_metadata)
def _attempt_json_repair(self, content: str) -> str:
"""
Attempt to repair common JSON issues in LLM responses.
Args:
content: The potentially malformed JSON string
Returns:
Repaired JSON string
"""
# Remove common prefixes/suffixes that LLMs sometimes add
content = content.strip()
# Remove markdown code block markers
if content.startswith("```json"):
content = content[7:]
if content.startswith("```"):
content = content[3:]
if content.endswith("```"):
content = content[:-3]
content = content.strip()
# Try to find JSON object boundaries
start_idx = content.find("{")
end_idx = content.rfind("}")
if start_idx != -1 and end_idx != -1 and end_idx > start_idx:
content = content[start_idx : end_idx + 1]
# Fix common quote issues
quote_count = content.count('"')
if quote_count % 2 != 0: # Odd number of quotes
# Try to close the last unterminated string
content = content.rstrip() + '"'
# Fix missing closing brace
open_braces = content.count("{")
close_braces = content.count("}")
if open_braces > close_braces:
content += "}" * (open_braces - close_braces)
# Fix missing closing bracket for arrays
open_brackets = content.count("[")
close_brackets = content.count("]")
if open_brackets > close_brackets:
content += "]" * (open_brackets - close_brackets)
return content
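As a quick illustration of what the repair step is meant to salvage, a standalone sketch covering only the fence-stripping and brace-boundary fixes (the method above additionally balances unterminated quotes, braces, and brackets):

import json

def sketch_repair(raw: str) -> dict[str, object]:
    """Strip markdown fences and trim to the outermost JSON object before parsing."""
    raw = raw.strip().removeprefix("```json").removeprefix("```").removesuffix("```").strip()
    start, end = raw.find("{"), raw.rfind("}")
    if start != -1 and end > start:
        raw = raw[start : end + 1]
    return json.loads(raw)

llm_reply = '```json\n{"tags": ["python", "ingestion"], "category": "technical"}\n```'
print(sketch_repair(llm_reply)["category"])  # technical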
def _sanitize_metadata(self, metadata: dict[str, object]) -> DocumentMetadata:
"""
Sanitize and validate metadata.

File diff suppressed because it is too large

View File

@@ -42,6 +42,7 @@ dev-dependencies = [
"sourcery>=1.37.0",
"pylance>=0.36.0",
"marimo>=0.16.0",
"basedpyright>=1.31.4",
]
[tool.ruff]

View File

@@ -57,7 +57,9 @@ async def test_filter_existing_documents_task_filters_known_urls(
class StubStorage(BaseStorage):
def __init__(self) -> None:
super().__init__(StorageConfig(backend=StorageBackend.WEAVIATE, endpoint="http://test.local"))
super().__init__(
StorageConfig(backend=StorageBackend.WEAVIATE, endpoint="http://test.local")
)
@property
def display_name(self) -> str:

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
import pytest
from ingest_pipeline.core.models import IngestionJob, IngestionSource, StorageBackend
from ingest_pipeline.core.models import IngestionJob, IngestionSource, RepomixConfig, StorageBackend
from ingest_pipeline.ingestors.repomix import RepomixIngestor
@@ -86,3 +86,275 @@ def test_create_document_enriches_metadata() -> None:
assert title is not None
assert title.endswith("(chunk 1)")
assert document.collection == job.storage_backend.value
# New tests for smart extraction functionality
@pytest.mark.parametrize(
("file_path", "content", "expected_score"),
(
("README.md", "# Project", 1.0),
("package.json", '{"name": "test"}', 0.9),
("src/api.py", "def get_user():\n pass", 0.85),
("main.py", "if __name__ == '__main__':", 0.8),
("src/utils.py", "def helper():\n pass", 0.7),
("tests/test_app.py", "def test_feature():\n pass", 0.21), # 0.7 * 0.3
("dist/bundle.js", "compiled code", 0.07), # 0.7 * 0.1
),
)
def test_calculate_file_importance_scores_correctly(
file_path: str, content: str, expected_score: float
) -> None:
score = RepomixIngestor._calculate_file_importance(file_path, content)
assert abs(score - expected_score) < 0.01
@pytest.mark.parametrize(
("extraction_mode", "expected_patterns"),
(
("docs", ["README*", "*.md", "docs/**", "*.rst"]),
("smart", ["README*", "*.md", "docs/**", "*.py", "*.js", "*.ts"]),
("structure", ["*.py", "*.js", "*.ts", "*.md", "*.yaml", "*.json"]),
),
)
def test_get_smart_include_patterns_by_mode(
extraction_mode: str, expected_patterns: list[str]
) -> None:
config = RepomixConfig(extraction_mode=extraction_mode) # type: ignore[arg-type]
ingestor = RepomixIngestor(config)
patterns = ingestor._get_smart_include_patterns()
# Check that expected patterns are present
for pattern in expected_patterns:
assert pattern in patterns
def test_build_repomix_command_with_smart_mode() -> None:
from pathlib import Path
config = RepomixConfig(
extraction_mode="smart",
use_compression=True,
output_format="markdown",
top_files_count=15,
include_git_history=True,
)
ingestor = RepomixIngestor(config)
repo_path = Path("/tmp/repo")
output_file = Path("/tmp/repo/output.md")
cmd = ingestor._build_repomix_command(repo_path, output_file)
assert "npx" in cmd
assert "repomix" in cmd
assert "--style" in cmd
assert "markdown" in cmd
assert "--compress" in cmd
assert "--top-files-len" in cmd
assert "15" in cmd
assert "--include-diffs" in cmd
assert "--include-logs" in cmd
assert "--truncate-base64" in cmd
def test_build_repomix_command_with_docs_mode() -> None:
from pathlib import Path
config = RepomixConfig(
extraction_mode="docs",
output_format="json",
)
ingestor = RepomixIngestor(config)
repo_path = Path("/tmp/repo")
output_file = Path("/tmp/repo/output.json")
cmd = ingestor._build_repomix_command(repo_path, output_file)
assert "--style" in cmd
assert "json" in cmd
assert "--no-files" in cmd
assert "--no-directory-structure" in cmd
def test_extract_meaningful_content_preserves_docs() -> None:
config = RepomixConfig(extraction_mode="smart")
ingestor = RepomixIngestor(config)
content = "# Title\n\nThis is documentation content."
result = ingestor._extract_meaningful_content("README.md", content, "smart")
assert result == content
def test_extract_code_surface_extracts_key_patterns() -> None:
config = RepomixConfig()
ingestor = RepomixIngestor(config)
content = '''
import os
from typing import Dict
class UserService:
"""Service for user operations."""
def __init__(self):
self.db = None
def get_user(self, user_id: int) -> Dict[str, str]:
"""Get user by ID."""
# Implementation details
result = self.db.query("SELECT * FROM users WHERE id = ?", user_id)
return result
def _private_method(self):
"""Private helper method."""
pass
'''
result = ingestor._extract_code_surface("src/user_service.py", content)
# Should include imports and class/function definitions
assert "import os" in result
assert "from typing import Dict" in result
assert "class UserService:" in result
assert "def get_user(" in result
# Should not include implementation details
assert "Implementation details" not in result
def test_create_document_includes_importance_score() -> None:
config = RepomixConfig()
ingestor = RepomixIngestor(config)
job = IngestionJob(
source_url="https://example.com/repo.git",
source_type=IngestionSource.REPOSITORY,
storage_backend=StorageBackend.WEAVIATE,
)
document = ingestor._create_document(
"README.md",
"# Project Title",
job,
importance_score=1.0,
)
assert document.metadata["importance_score"] == 1.0
assert document.metadata["extraction_mode"] == "smart"
def test_create_document_without_importance_score() -> None:
config = RepomixConfig()
ingestor = RepomixIngestor(config)
job = IngestionJob(
source_url="https://example.com/repo.git",
source_type=IngestionSource.REPOSITORY,
storage_backend=StorageBackend.WEAVIATE,
)
document = ingestor._create_document(
"src/app.py",
"def main(): pass",
job,
)
assert "importance_score" not in document.metadata
assert document.metadata["extraction_mode"] == "smart"
def test_split_by_files_handles_mixed_markers() -> None:
"""Test that file splitting correctly handles both types of markers."""
ingestor = RepomixIngestor()
content = """
## File: README.md
# Project Title
### src/app.py
def main():
pass
## File: setup.py
from setuptools import setup
"""
result = ingestor._split_by_files(content)
assert "README.md" in result
assert "src/app.py" in result
assert "setup.py" in result
assert len(result) == 3
def test_chunk_content_handles_oversized_lines() -> None:
"""Test chunking behavior with lines larger than chunk size."""
ingestor = RepomixIngestor()
# Create content with a line larger than chunk size
oversized_line = "x" * 1000 # 1000 chars
content = f"small line\n{oversized_line}\nanother small line"
chunks = ingestor._chunk_content(content, chunk_size=500)
# Should still include the oversized line in a chunk
assert len(chunks) >= 1
assert any(oversized_line in chunk for chunk in chunks)
def test_build_repomix_command_avoids_empty_patterns() -> None:
"""Test that empty patterns are filtered out."""
config = RepomixConfig(
include_patterns=["*.py", "", "*.md"],
exclude_patterns=["node_modules", "", "*.tmp"],
)
ingestor = RepomixIngestor(config)
from pathlib import Path
cmd = ingestor._build_repomix_command(Path("/tmp"), Path("/tmp/out.md"))
# Should not include empty patterns
cmd_str = " ".join(cmd)
assert "--include *.py" in cmd_str
assert "--include *.md" in cmd_str
assert "--exclude node_modules" in cmd_str
assert "--exclude *.tmp" in cmd_str
# Should not have double spaces indicating empty patterns were filtered
assert "--include " not in cmd_str
assert "--exclude " not in cmd_str
def test_extract_code_surface_handles_empty_extraction() -> None:
"""Test behavior when code surface extraction yields minimal content."""
config = RepomixConfig()
ingestor = RepomixIngestor(config)
# Content with no meaningful patterns
content = "# Just comments\n# More comments\n\n\n"
result = ingestor._extract_code_surface("test.py", content)
# Should fall back to first 2KB when extraction is minimal
assert len(result) <= 2000
assert result == content[:2000]
def test_calculate_file_importance_edge_cases() -> None:
"""Test file importance calculation edge cases."""
# Test with empty content
score = RepomixIngestor._calculate_file_importance("", "")
assert score == 0.3 # Default low score
# Test with unusual file path
score = RepomixIngestor._calculate_file_importance("./weird/path//file.py", "")
assert score == 0.7 # Python file score
# Test file with multiple matching criteria (API wins over test)
score = RepomixIngestor._calculate_file_importance("tests/api_test.py", "def test_api():")
assert abs(score - 0.85) < 0.01  # the API filename check takes precedence over the test-file penalty
# Test regular test file
score = RepomixIngestor._calculate_file_importance("tests/test_utils.py", "def test_helper():")
assert abs(score - 0.21) < 0.01 # 0.7 * 0.3 for test files

View File

@@ -45,7 +45,9 @@ def r2r_client_stub(
status_code=self.status_code,
request=mock_request,
)
raise httpx.HTTPStatusError("HTTP error", request=mock_request, response=mock_response)
raise httpx.HTTPStatusError(
"HTTP error", request=mock_request, response=mock_response
)
class MockAsyncClient:
def __init__(self, service: Any) -> None:

View File

@@ -133,14 +133,17 @@ async def test_multi_storage_adapter_reports_replication_failure(document_factor
def test_storage_manager_build_multi_storage_adapter_deduplicates(document_factory) -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
weaviate_config = StorageConfig(
@@ -167,14 +170,17 @@ def test_storage_manager_build_multi_storage_adapter_deduplicates(document_facto
def test_storage_manager_build_multi_storage_adapter_missing_backend() -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
with pytest.raises(ValueError):
@@ -183,14 +189,17 @@ def test_storage_manager_build_multi_storage_adapter_missing_backend() -> None:
@pytest.mark.asyncio
async def test_storage_manager_search_across_backends_groups_results(document_factory) -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
document_weaviate = document_factory(
@@ -228,7 +237,9 @@ async def test_storage_manager_search_across_backends_groups_results(document_fa
@pytest.mark.asyncio
async def test_multi_storage_adapter_store_batch_replicates_to_all_backends(document_factory) -> None:
async def test_multi_storage_adapter_store_batch_replicates_to_all_backends(
document_factory,
) -> None:
primary_config = StorageConfig(
backend=StorageBackend.WEAVIATE,
endpoint="http://weaviate.local",
@@ -280,14 +291,17 @@ async def test_multi_storage_adapter_delete_reports_secondary_failures() -> None
@pytest.mark.asyncio
async def test_storage_manager_initialize_all_backends_registers_capabilities(monkeypatch) -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key="key",
openwebui_endpoint="http://chat.local",
openwebui_api_key="token",
r2r_endpoint="http://r2r.local",
r2r_api_key="secret",
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key="key",
openwebui_endpoint="http://chat.local",
openwebui_api_key="token",
r2r_endpoint="http://r2r.local",
r2r_api_key="secret",
),
)
manager = StorageManager(settings)
monkeypatch.setattr(
@@ -324,14 +338,17 @@ async def test_storage_manager_initialize_all_backends_registers_capabilities(mo
@pytest.mark.asyncio
async def test_storage_manager_initialize_all_backends_handles_missing_config() -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint=None,
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint=None,
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
results = await manager.initialize_all_backends()
@@ -345,14 +362,17 @@ async def test_storage_manager_initialize_all_backends_handles_missing_config()
@pytest.mark.asyncio
async def test_storage_manager_get_all_collections_merges_counts_and_backends() -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
weaviate_storage = CollectionStubStorage(
@@ -390,14 +410,17 @@ async def test_storage_manager_get_all_collections_merges_counts_and_backends()
@pytest.mark.asyncio
async def test_storage_manager_get_backend_status_reports_failures() -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
healthy_storage = CollectionStubStorage(
@@ -436,14 +459,17 @@ async def test_storage_manager_get_backend_status_reports_failures() -> None:
@pytest.mark.asyncio
async def test_storage_manager_close_all_clears_state() -> None:
settings = cast(Settings, SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
))
settings = cast(
Settings,
SimpleNamespace(
weaviate_endpoint="http://weaviate.local",
weaviate_api_key=None,
openwebui_endpoint="http://chat.local",
openwebui_api_key=None,
r2r_endpoint=None,
r2r_api_key=None,
),
)
manager = StorageManager(settings)
closable_storage = ClosableStubStorage(

uv.lock generated
View File

@@ -236,6 +236,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/25/2f/efa9d26dbb612b774990741fd8f13c7cf4cfd085b870e4a5af5c82eaf5f1/authlib-1.6.3-py2.py3-none-any.whl", hash = "sha256:7ea0f082edd95a03b7b72edac65ec7f8f68d703017d7e37573aee4fc603f2a48", size = 240105, upload-time = "2025-08-26T12:13:23.889Z" },
]
[[package]]
name = "basedpyright"
version = "1.31.4"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nodejs-wheel-binaries" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0b/53/570b03ec0445a9b2cc69788482c1d12902a9b88a9b159e449c4c537c4e3a/basedpyright-1.31.4.tar.gz", hash = "sha256:2450deb16530f7c88c1a7da04530a079f9b0b18ae1c71cb6f812825b3b82d0b1", size = 22494467, upload-time = "2025-09-03T13:05:55.817Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e5/40/d1047a5addcade9291685d06ef42a63c1347517018bafd82747af9da0294/basedpyright-1.31.4-py3-none-any.whl", hash = "sha256:055e4a38024bd653be12d6216c1cfdbee49a1096d342b4d5f5b4560f7714b6fc", size = 11731440, upload-time = "2025-09-03T13:05:52.308Z" },
]
[[package]]
name = "cachetools"
version = "6.2.0"
@@ -986,6 +998,7 @@ dependencies = [
[package.dev-dependencies]
dev = [
{ name = "basedpyright" },
{ name = "marimo" },
{ name = "mypy" },
{ name = "pylance" },
@@ -1017,6 +1030,7 @@ requires-dist = [
[package.metadata.requires-dev]
dev = [
{ name = "basedpyright", specifier = ">=1.31.4" },
{ name = "marimo", specifier = ">=0.16.0" },
{ name = "mypy", specifier = ">=1.7.0" },
{ name = "pylance", specifier = ">=0.36.0" },
@@ -1591,6 +1605,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a0/c4/c2971a3ba4c6103a3d10c4b0f24f461ddc027f0f09763220cf35ca1401b3/nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c", size = 5195, upload-time = "2024-01-21T14:25:17.223Z" },
]
[[package]]
name = "nodejs-wheel-binaries"
version = "22.19.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/bd/ca/6033f80b7aebc23cb31ed8b09608b6308c5273c3522aedd043e8a0644d83/nodejs_wheel_binaries-22.19.0.tar.gz", hash = "sha256:e69b97ef443d36a72602f7ed356c6a36323873230f894799f4270a853932fdb3", size = 8060, upload-time = "2025-09-12T10:33:46.935Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/93/a2/0d055fd1d8c9a7a971c4db10cf42f3bba57c964beb6cf383ca053f2cdd20/nodejs_wheel_binaries-22.19.0-py2.py3-none-macosx_11_0_arm64.whl", hash = "sha256:43eca1526455a1fb4cb777095198f7ebe5111a4444749c87f5c2b84645aaa72a", size = 50902454, upload-time = "2025-09-12T10:33:18.3Z" },
{ url = "https://files.pythonhosted.org/packages/b5/f5/446f7b3c5be1d2f5145ffa3c9aac3496e06cdf0f436adeb21a1f95dd79a7/nodejs_wheel_binaries-22.19.0-py2.py3-none-macosx_11_0_x86_64.whl", hash = "sha256:feb06709e1320790d34babdf71d841ec7f28e4c73217d733e7f5023060a86bfc", size = 51837860, upload-time = "2025-09-12T10:33:21.599Z" },
{ url = "https://files.pythonhosted.org/packages/1e/4e/d0a036f04fd0f5dc3ae505430657044b8d9853c33be6b2d122bb171aaca3/nodejs_wheel_binaries-22.19.0-py2.py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:db9f5777292491430457c99228d3a267decf12a09d31246f0692391e3513285e", size = 57841528, upload-time = "2025-09-12T10:33:25.433Z" },
{ url = "https://files.pythonhosted.org/packages/e2/11/4811d27819f229cc129925c170db20c12d4f01ad366a0066f06d6eb833cf/nodejs_wheel_binaries-22.19.0-py2.py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1392896f1a05a88a8a89b26e182d90fdf3020b4598a047807b91b65731e24c00", size = 58368815, upload-time = "2025-09-12T10:33:29.083Z" },
{ url = "https://files.pythonhosted.org/packages/6e/94/df41416856b980e38a7ff280cfb59f142a77955ccdbec7cc4260d8ab2e78/nodejs_wheel_binaries-22.19.0-py2.py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:9164c876644f949cad665e3ada00f75023e18f381e78a1d7b60ccbbfb4086e73", size = 59690937, upload-time = "2025-09-12T10:33:32.771Z" },
{ url = "https://files.pythonhosted.org/packages/d1/39/8d0d5f84b7616bdc4eca725f5d64a1cfcac3d90cf3f30cae17d12f8e987f/nodejs_wheel_binaries-22.19.0-py2.py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:6b4b75166134010bc9cfebd30dc57047796a27049fef3fc22316216d76bc0af7", size = 60751996, upload-time = "2025-09-12T10:33:36.962Z" },
{ url = "https://files.pythonhosted.org/packages/41/93/2d66b5b60055dd1de6e37e35bef563c15e4cafa5cfe3a6990e0ab358e515/nodejs_wheel_binaries-22.19.0-py2.py3-none-win_amd64.whl", hash = "sha256:3f271f5abfc71b052a6b074225eca8c1223a0f7216863439b86feaca814f6e5a", size = 40026140, upload-time = "2025-09-12T10:33:40.33Z" },
{ url = "https://files.pythonhosted.org/packages/a3/46/c9cf7ff7e3c71f07ca8331c939afd09b6e59fc85a2944ea9411e8b29ce50/nodejs_wheel_binaries-22.19.0-py2.py3-none-win_arm64.whl", hash = "sha256:666a355fe0c9bde44a9221cd543599b029045643c8196b8eedb44f28dc192e06", size = 38804500, upload-time = "2025-09-12T10:33:43.302Z" },
]
[[package]]
name = "numpy"
version = "2.3.3"