Files
noteflow/tests/integration/test_e2e_ner.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

680 lines
27 KiB
Python

"""End-to-end integration tests for NER extraction.
Tests the complete NER workflow with database persistence:
- Entity extraction from meeting segments
- Persistence and retrieval
- Caching behavior
- Force refresh functionality
- Pin entity operations
"""
from __future__ import annotations
from collections.abc import Generator, Sequence
from pathlib import Path
from typing import TYPE_CHECKING, Final
from unittest.mock import MagicMock, patch
from uuid import UUID, uuid4
import grpc
import pytest
from noteflow.application.services.ner import NerService
from noteflow.domain.entities import Meeting, Segment
from noteflow.domain.entities.named_entity import EntityCategory, NamedEntity
from noteflow.domain.value_objects import MeetingId
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
# Test constants
# Default number of segments created by _create_meeting_with_segments when
# the caller does not pass segment_count.
DEFAULT_SEGMENT_COUNT: Final = 3
@pytest.fixture(autouse=True)
def mock_feature_flags() -> Generator[MagicMock, None, None]:
    """Patch the NER service's feature-flag lookup so NER is reported enabled.

    Declared autouse, so it is active for every test in this module without
    being requested explicitly.

    Yields:
        The MagicMock handed out in place of the real feature flags.
    """
    flags = MagicMock(ner_enabled=True)
    flag_patch = patch(
        "noteflow.application.services.ner.service.get_feature_flags",
        return_value=flags,
    )
    # The patch stays active for the duration of the test body.
    with flag_patch:
        yield flags
class MockContext:
    """Stand-in for a gRPC context that records abort calls."""

    def __init__(self) -> None:
        """Start in the not-aborted state with no code or details recorded."""
        self.abort_details: str | None = None
        self.abort_code: grpc.StatusCode | None = None
        self.aborted = False

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Store the abort arguments, then raise to mimic gRPC's abort.

        Args:
            code: Status code supplied by the caller.
            details: Detail message supplied by the caller.

        Raises:
            grpc.RpcError: Always, once the arguments have been recorded.
        """
        self.aborted, self.abort_code, self.abort_details = True, code, details
        raise grpc.RpcError()
class MockNerEngine:
    """Fake NER engine that returns a fixed entity list and counts its calls."""

    def __init__(self, entities: list[NamedEntity] | None = None, *, ready: bool = False) -> None:
        """Configure the canned result and the starting ready state.

        Args:
            entities: Entities returned by every extraction call. None or an
                empty list results in an empty list being returned.
            ready: Initial value reported by is_ready().
        """
        self.extract_call_count = 0
        self.extract_from_segments_call_count = 0
        self._ready = ready
        self._entities = entities if entities else []

    def set_ready(self, ready: bool = True) -> None:
        """Override the ready state.

        Args:
            ready: New value reported by is_ready().
        """
        self._ready = ready

    def extract(self, text: str) -> list[NamedEntity]:
        """Return the canned entities; the text argument is ignored.

        Marks the engine ready and increments extract_call_count.
        """
        self.extract_call_count += 1
        self._ready = True
        return self._entities

    def extract_from_segments(
        self, segments: list[tuple[int, str]]
    ) -> list[NamedEntity]:
        """Return the canned entities; the segments argument is ignored.

        Marks the engine ready and increments
        extract_from_segments_call_count.
        """
        self.extract_from_segments_call_count += 1
        self._ready = True
        return self._entities

    def is_ready(self) -> bool:
        """Report the current ready state."""
        return self._ready
def _create_test_entity(
    text: str,
    category: EntityCategory = EntityCategory.PERSON,
    segment_ids: list[int] | None = None,
    confidence: float = 0.9,
) -> NamedEntity:
    """Build a NamedEntity through NamedEntity.create for use in tests.

    Args:
        text: Entity text.
        category: Entity category; PERSON unless overridden.
        segment_ids: Segment IDs for the entity; ``[0]`` is substituted when
            this is None or empty.
        confidence: Confidence value passed through unchanged.

    Returns:
        The entity produced by NamedEntity.create.
    """
    resolved_segment_ids = segment_ids or [0]
    return NamedEntity.create(
        text=text,
        category=category,
        segment_ids=resolved_segment_ids,
        confidence=confidence,
    )
def _make_segments(
    segment_count: int,
    text_template: str,
) -> list[Segment]:
    """Build ``segment_count`` consecutive ten-unit segments for a test meeting.

    Args:
        segment_count: Number of segments to build.
        text_template: Format string for the segment text; it is formatted
            with ``i`` set to the segment index.

    Returns:
        Segments with IDs 0..segment_count-1, where segment ``i`` starts at
        ``i * 10`` and ends at ``(i + 1) * 10``.
    """
    built: list[Segment] = []
    for index in range(segment_count):
        start = index * 10
        built.append(
            Segment(
                segment_id=index,
                text=text_template.format(i=index),
                start_time=float(start),
                end_time=float(start + 10),
            )
        )
    return built
async def _add_segments_to_meeting(
    uow: SqlAlchemyUnitOfWork,
    meeting_id: MeetingId,
    segments: list[Segment],
) -> None:
    """Add each segment to the meeting through the unit of work, in order.

    Does not commit; the caller owns the transaction.

    Args:
        uow: Open unit of work whose segment repository receives the adds.
        meeting_id: Meeting the segments belong to.
        segments: Segments to add, one awaited call per segment.
    """
    segment_repo = uow.segments
    for item in segments:
        await segment_repo.add(meeting_id, item)
async def _create_meeting_with_segments(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
    segment_count: int = DEFAULT_SEGMENT_COUNT,
    segment_text_template: str = "Segment {i} mentioning John Smith.",
) -> MeetingId:
    """Create and commit a meeting with generated segments.

    Args:
        session_factory: Database session factory.
        meetings_dir: Directory for meeting files.
        title: Meeting title.
        segment_count: How many segments to generate.
        segment_text_template: Segment text template (``{i}`` is replaced
            with the segment index).

    Returns:
        ID of the meeting that was created.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        new_meeting = Meeting.create(title=title)
        await uow.meetings.create(new_meeting)
        # Segments are generated up front, then added one at a time.
        generated = _make_segments(segment_count, segment_text_template)
        for segment in generated:
            await uow.segments.add(new_meeting.id, segment)
        await uow.commit()
        return new_meeting.id
async def _create_meeting_with_segment_text(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
    segment_text: str,
) -> MeetingId:
    """Create and commit a meeting holding a single segment with given text.

    The segment is built as ``Segment(0, segment_text, 0.0, 5.0)``.

    Args:
        session_factory: Database session factory.
        meetings_dir: Directory for meeting files.
        title: Meeting title.
        segment_text: Text of the only segment.

    Returns:
        ID of the meeting that was created.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
        new_meeting = Meeting.create(title=title)
        await uow.meetings.create(new_meeting)
        only_segment = Segment(0, segment_text, 0.0, 5.0)
        await uow.segments.add(new_meeting.id, only_segment)
        await uow.commit()
        return new_meeting.id
async def _setup_ner_service_with_entities(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    meeting_id: MeetingId,
    entities: list[NamedEntity],
) -> NerService:
    """Build a NerService over a ready mock engine and run one extraction.

    Args:
        session_factory: Database session factory.
        meetings_dir: Directory for meeting files.
        meeting_id: Meeting to extract entities for.
        entities: Entities the mock engine returns.

    Returns:
        The service, after extract_entities has been awaited once.
    """
    # ready=True at construction is the same state set_ready() would produce.
    engine = MockNerEngine(entities=entities, ready=True)
    ner_service = NerService(
        engine, lambda: SqlAlchemyUnitOfWork(session_factory, meetings_dir)
    )
    await ner_service.extract_entities(meeting_id)
    return ner_service
def _find_entity_id_by_text(entities: list[NamedEntity], text: str) -> UUID:
"""Find entity ID by text."""
return next(e.id for e in entities if e.text == text)
async def _delete_entity_and_verify_remaining(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    meeting_id: MeetingId,
    entity_id_to_delete: UUID,
    expected_remaining_text: str,
) -> None:
    """Delete one entity, then assert exactly one expected entity is left.

    Args:
        session_factory: Database session factory.
        meetings_dir: Directory for meeting files.
        meeting_id: Meeting whose entities are checked afterwards.
        entity_id_to_delete: ID of the entity to delete.
        expected_remaining_text: Text the single remaining entity must have.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as delete_uow:
        await delete_uow.entities.delete(entity_id_to_delete)
        await delete_uow.commit()

    # The check goes through a second, separately opened unit of work.
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as read_uow:
        remaining = await read_uow.entities.get_by_meeting(meeting_id)
        assert len(remaining) == 1, (
            f"Expected 1 entity remaining after delete, got {len(remaining)}"
        )
        assert remaining[0].text == expected_remaining_text, (
            f"Remaining entity text should be '{expected_remaining_text}', got '{remaining[0].text}'"
        )
async def _extract_and_verify_entities(
    service: NerService,
    meeting_id: MeetingId,
    expected_count: int,
) -> Sequence[NamedEntity]:
    """Fetch the meeting's entities from the service and assert their count.

    Despite the name, this only calls ``service.get_entities``; it does not
    trigger an extraction itself.

    Args:
        service: NER service to query.
        meeting_id: Meeting whose entities are fetched.
        expected_count: Number of entities that must be returned.

    Returns:
        The entities returned by the service.
    """
    entities = await service.get_entities(meeting_id)
    assert len(entities) == expected_count, (
        f"Expected {expected_count} entities after extraction, got {len(entities)}"
    )
    return entities
def _create_ner_service(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    entities: list[NamedEntity],
) -> tuple[NerService, MockNerEngine]:
    """Build a NerService backed by a ready MockNerEngine.

    Unlike _setup_ner_service_with_entities, no extraction is run here.

    Args:
        session_factory: Database session factory.
        meetings_dir: Directory for meeting files.
        entities: Entities the mock engine returns.

    Returns:
        ``(service, engine)`` so tests can inspect the engine's call counts.
    """
    # ready=True at construction is the same state set_ready() would produce.
    engine = MockNerEngine(entities=entities, ready=True)
    ner_service = NerService(
        engine, lambda: SqlAlchemyUnitOfWork(session_factory, meetings_dir)
    )
    return ner_service, engine
@pytest.mark.integration
class TestNerExtractionFlow:
    """End-to-end checks of NerService.extract_entities against the database."""

    async def test_extract_entities_persists_to_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """A first extraction stores its entities so the repository returns them."""
        meeting_id = await _create_meeting_with_segments(
            session_factory, meetings_dir, title="NER Test Meeting"
        )
        john_smith = _create_test_entity(
            "John Smith", category=EntityCategory.PERSON, segment_ids=[0, 1, 2]
        )
        service, _engine = _create_ner_service(session_factory, meetings_dir, [john_smith])

        extraction = await service.extract_entities(meeting_id)

        assert extraction.total_count == 1, "Should extract exactly one entity"
        assert not extraction.cached, "First extraction should not be cached"

        # Read back through the repository rather than through the service.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            stored = await uow.entities.get_by_meeting(meeting_id)
            assert len(stored) == 1, "Should persist exactly one entity"
            first = stored[0]
            assert first.text == "John Smith", "Entity text should match"
            assert first.category == EntityCategory.PERSON, "Category should be PERSON"

    async def test_extract_entities_returns_cached_on_second_call(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """A repeated extraction is served from cache without calling the engine."""
        meeting_id = await _create_meeting_with_segments(
            session_factory,
            meetings_dir,
            "Cache Test",
            segment_count=1,
            segment_text_template="John mentioned Acme Corp.",
        )
        service, mock_engine = _create_ner_service(
            session_factory,
            meetings_dir,
            [
                _create_test_entity("John", EntityCategory.PERSON, [0]),
                _create_test_entity("Acme Corp", EntityCategory.COMPANY, [0]),
            ],
        )

        first_result = await service.extract_entities(meeting_id)
        assert first_result.total_count == 2, "First extraction should find 2 entities"
        assert not first_result.cached, "First extraction should not be cached"

        # Snapshot the engine call counter so the second call can be compared.
        calls_after_first = mock_engine.extract_from_segments_call_count
        second_result = await service.extract_entities(meeting_id)

        assert second_result.total_count == 2, "Second extraction should return same count"
        assert second_result.cached, "Second extraction should come from cache"
        assert mock_engine.extract_from_segments_call_count == calls_after_first, (
            "Engine should not be called again when using cache"
        )

    async def test_extract_entities_force_refresh_re_extracts(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """force_refresh=True bypasses the cache and calls the engine once more."""
        meeting_id = await _create_meeting_with_segments(
            session_factory,
            meetings_dir,
            "Force Refresh Test",
            segment_count=1,
            segment_text_template="Testing force refresh.",
        )
        service, mock_engine = _create_ner_service(
            session_factory,
            meetings_dir,
            [_create_test_entity("Test", EntityCategory.OTHER, [0])],
        )

        # Prime the stored entities with a normal extraction first.
        await service.extract_entities(meeting_id)
        initial_count = mock_engine.extract_from_segments_call_count

        result = await service.extract_entities(meeting_id, force_refresh=True)

        assert not result.cached, "Force refresh should not return cached result"
        assert mock_engine.extract_from_segments_call_count == initial_count + 1, (
            f"Engine should be called again on force refresh, expected {initial_count + 1}, "
            f"got {mock_engine.extract_from_segments_call_count}"
        )
@pytest.mark.integration
class TestNerPersistence:
    """Checks that extracted entities are stored and can be cleared."""

    async def test_entities_persist_across_service_instances(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Entities extracted by one service are readable from a new service."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Persistence Test", "Hello world."
        )

        # The first service instance performs the extraction.
        extracting_service, _engine = _create_ner_service(
            session_factory,
            meetings_dir,
            [_create_test_entity("World", EntityCategory.OTHER, [0])],
        )
        await extracting_service.extract_entities(meeting_id)

        # A second instance with a new, entity-less engine stands in for a
        # server restart; anything it returns must come from storage.
        def uow_factory() -> SqlAlchemyUnitOfWork:
            return SqlAlchemyUnitOfWork(session_factory, meetings_dir)

        restarted_service = NerService(MockNerEngine(), uow_factory)
        result = await restarted_service.get_entities(meeting_id)

        assert len(result) == 1, f"Expected 1 entity from cache, got {len(result)}"
        assert result[0].text == "World", f"Expected entity text 'World', got '{result[0].text}'"

    async def test_clear_entities_removes_all_for_meeting(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """clear_entities reports the number removed and leaves none behind."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Clear Test", "Test content."
        )
        service = await _setup_ner_service_with_entities(
            session_factory,
            meetings_dir,
            meeting_id,
            [
                _create_test_entity("Entity1", EntityCategory.PERSON, [0]),
                _create_test_entity("Entity2", EntityCategory.COMPANY, [0]),
            ],
        )

        deleted_count = await service.clear_entities(meeting_id)
        assert deleted_count == 2, f"Expected 2 entities deleted, got {deleted_count}"

        # A follow-up read must come back empty.
        entities = await service.get_entities(meeting_id)
        assert len(entities) == 0, f"Expected no entities after clear, got {len(entities)}"
@pytest.mark.integration
class TestNerPinning:
    """Checks of NerService.pin_entity against stored entities."""

    async def test_pin_entity_persists_pinned_state(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Pinning and then unpinning is reflected by later get_entities calls."""
        meeting_id = await _create_meeting_with_segments(
            session_factory,
            meetings_dir,
            "Pin Test",
            segment_count=1,
            segment_text_template="John Doe test.",
        )
        service, _engine = _create_ner_service(
            session_factory,
            meetings_dir,
            [_create_test_entity("John Doe", EntityCategory.PERSON, [0])],
        )
        await service.extract_entities(meeting_id)
        entity_id = (await service.get_entities(meeting_id))[0].id

        pin_result = await service.pin_entity(entity_id, is_pinned=True)
        assert pin_result is True, "Pin operation should return True for existing entity"

        after_pin = await service.get_entities(meeting_id)
        assert after_pin[0].is_pinned is True, "Entity should be pinned after pin operation"

        # Flip it back and confirm the change is visible as well.
        await service.pin_entity(entity_id, is_pinned=False)
        after_unpin = await service.get_entities(meeting_id)
        assert after_unpin[0].is_pinned is False, "Entity should be unpinned after unpin operation"

    async def test_pin_entity_nonexistent_returns_false(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """pin_entity returns False when the entity ID is unknown."""
        service = NerService(
            MockNerEngine(), lambda: SqlAlchemyUnitOfWork(session_factory, meetings_dir)
        )
        unknown_id = uuid4()

        result = await service.pin_entity(unknown_id, is_pinned=True)

        assert result is False, "Pin operation should return False for nonexistent entity"
@pytest.mark.integration
class TestEntityMutations:
    """Checks of the entity repository's update and delete operations."""

    async def test_update_entity_text(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Updating text returns the changed entity and the change is stored."""
        meeting_id = await _create_meeting_with_segments(
            session_factory,
            meetings_dir,
            "Update Test",
            segment_count=1,
            segment_text_template="John Smith test.",
        )
        service = await _setup_ner_service_with_entities(
            session_factory,
            meetings_dir,
            meeting_id,
            [_create_test_entity("John Smith", EntityCategory.PERSON, [0])],
        )
        entity_id = (await service.get_entities(meeting_id))[0].id

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.update(entity_id, text="Jonathan Smith")
            await uow.commit()
            assert updated is not None, "Update should return the entity"
            assert updated.text == "Jonathan Smith", "Text should be updated"

        # Re-read through a separate unit of work to confirm it was stored.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            persisted = await uow.entities.get(entity_id)
            assert persisted is not None, "Updated entity should be persisted in database"
            assert persisted.text == "Jonathan Smith", (
                f"Persisted text should be 'Jonathan Smith', got '{persisted.text}'"
            )

    async def test_update_entity_category(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Updating with a category string yields the matching EntityCategory."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Category Update Test", "Acme test."
        )
        service = await _setup_ner_service_with_entities(
            session_factory,
            meetings_dir,
            meeting_id,
            [_create_test_entity("Acme", EntityCategory.OTHER, [0])],
        )
        entity_id = (await service.get_entities(meeting_id))[0].id

        # The category is passed as the string "company".
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.update(entity_id, category="company")
            await uow.commit()
            assert updated is not None, "Update should return the updated entity"
            assert updated.category == EntityCategory.COMPANY, "Category should be updated"

    async def test_update_entity_text_and_category(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """A single update call can change text and category together."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Dual Update Test", "Test entity."
        )
        service = await _setup_ner_service_with_entities(
            session_factory,
            meetings_dir,
            meeting_id,
            [_create_test_entity("TestEntity", EntityCategory.OTHER, [0])],
        )
        entity_id = (await service.get_entities(meeting_id))[0].id

        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            updated = await uow.entities.update(
                entity_id, text="New Product", category="product"
            )
            await uow.commit()
            assert updated is not None, "Update should return the updated entity"
            assert updated.text == "New Product", (
                f"Text should be 'New Product', got '{updated.text}'"
            )
            assert updated.category == EntityCategory.PRODUCT, (
                f"Category should be PRODUCT, got {updated.category}"
            )

    async def test_update_nonexistent_entity_returns_none(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Updating an unknown entity ID returns None."""
        unknown_id = uuid4()
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.entities.update(unknown_id, text="Doesn't exist")
            assert result is None, "Update on nonexistent entity should return None"

    async def test_delete_entity_removes_from_database(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Deleting an entity returns True and a later get returns None."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Delete Test", "Delete me."
        )
        doomed = _create_test_entity("ToDelete", EntityCategory.OTHER, [0])
        service = await _setup_ner_service_with_entities(
            session_factory, meetings_dir, meeting_id, [doomed]
        )
        entity_id = (await service.get_entities(meeting_id))[0].id

        # Delete directly through the repository, not the service.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            deleted = await uow.entities.delete(entity_id)
            await uow.commit()
            assert deleted is True, "Delete should return True for existing entity"

        # A lookup through another unit of work must now find nothing.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            persisted = await uow.entities.get(entity_id)
            assert persisted is None, "Entity should be deleted"

    async def test_delete_nonexistent_entity_returns_false(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Deleting an unknown entity ID returns False."""
        unknown_id = uuid4()
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            result = await uow.entities.delete(unknown_id)
            assert result is False, "Delete on nonexistent entity should return False"

    async def test_delete_does_not_affect_other_entities(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Deleting one of two entities leaves the other one in place."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Multi-Delete Test", "John and Jane."
        )
        people = [
            _create_test_entity("John", EntityCategory.PERSON, [0]),
            _create_test_entity("Jane", EntityCategory.PERSON, [0]),
        ]
        service = await _setup_ner_service_with_entities(
            session_factory, meetings_dir, meeting_id, people
        )

        stored = await _extract_and_verify_entities(service, meeting_id, 2)
        john_id = _find_entity_id_by_text(stored, "John")

        await _delete_entity_and_verify_remaining(
            session_factory, meetings_dir, meeting_id, john_id, "Jane"
        )
@pytest.mark.integration
class TestNerEdgeCases:
    """Checks of NerService behaviour on empty, missing and cleared meetings."""

    async def test_extract_from_meeting_with_no_segments(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Extracting from a meeting without segments gives an empty, uncached result."""
        # segment_count=0 creates and commits the meeting without adding segments.
        meeting_id = await _create_meeting_with_segments(
            session_factory, meetings_dir, "Empty Meeting", segment_count=0
        )
        service = NerService(
            MockNerEngine(), lambda: SqlAlchemyUnitOfWork(session_factory, meetings_dir)
        )

        result = await service.extract_entities(meeting_id)

        assert result.total_count == 0, (
            f"Expected 0 entities for empty meeting, got {result.total_count}"
        )
        assert result.entities == [], f"Expected empty entities list, got {result.entities}"
        assert not result.cached, "First extraction should not be cached"

    async def test_extract_from_nonexistent_meeting_raises(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """Extracting for an unknown meeting ID raises ValueError naming that ID."""
        service = NerService(
            MockNerEngine(), lambda: SqlAlchemyUnitOfWork(session_factory, meetings_dir)
        )
        nonexistent_id = MeetingId(uuid4())

        with pytest.raises(ValueError, match=str(nonexistent_id)):
            await service.extract_entities(nonexistent_id)

    async def test_has_entities_reflects_extraction_state(
        self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
    ) -> None:
        """has_entities is True after extraction and False after clearing."""
        meeting_id = await _create_meeting_with_segment_text(
            session_factory, meetings_dir, "Has Entities Test", "Test."
        )
        # The setup helper already runs one extraction before returning.
        service = await _setup_ner_service_with_entities(
            session_factory,
            meetings_dir,
            meeting_id,
            [_create_test_entity("Test", EntityCategory.OTHER, [0])],
        )

        present_after_extract = await service.has_entities(meeting_id)
        assert present_after_extract is True, (
            "has_entities should return True after extraction"
        )

        await service.clear_entities(meeting_id)

        present_after_clear = await service.has_entities(meeting_id)
        assert present_after_clear is False, (
            "has_entities should return False after clearing"
        )