Files
noteflow/tests/integration/test_e2e_summarization.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

567 lines
24 KiB
Python

"""End-to-end integration tests for summarization.
Tests the complete summarization workflow with database persistence:
- Summary generation with real database
- Force regeneration behavior
- Placeholder fallback when summarization fails
- Summary persistence and retrieval
- Key points and action items
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, MagicMock
import grpc
import pytest
from noteflow.domain.entities import ActionItem, KeyPoint, Meeting, Segment, Summary
from noteflow.domain.summarization import SummarizationResult
from noteflow.domain.value_objects import MeetingId
from noteflow.grpc.config.config import ServicesConfig
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
class MockContext:
"""Mock gRPC context for testing."""
def __init__(self) -> None:
"""Initialize mock context."""
self.aborted = False
self.abort_code: grpc.StatusCode | None = None
self.abort_details: str | None = None
async def abort(self, code: grpc.StatusCode, details: str) -> None:
"""Record abort and raise to simulate gRPC behavior."""
self.aborted = True
self.abort_code = code
self.abort_details = details
raise grpc.RpcError()
async def _create_test_meeting(
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
title: str,
) -> Meeting:
"""Create a test meeting in the database."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title=title)
await uow.meetings.create(meeting)
await uow.commit()
return meeting
def _create_capturing_service() -> tuple[MagicMock, list[str | None]]:
"""Create a mock service that captures style_prompt values."""
captured: list[str | None] = []
async def capturing_summarize(
meeting_id: MeetingId, segments: list[Segment], style_prompt: str | None = None, **_: object
) -> SummarizationResult:
captured.append(style_prompt)
return SummarizationResult(
summary=Summary(meeting_id=meeting_id, executive_summary="Test summary"),
model_name="mock", provider_name="mock",
)
mock_service = MagicMock()
mock_service.summarize = AsyncMock(side_effect=capturing_summarize)
return mock_service, captured
def _create_mocksummarization_service(mock_summary: Summary) -> MagicMock:
"""Create a mock summarization service returning a fixed summary."""
mock_service = MagicMock()
mock_service.summarize = AsyncMock(
return_value=SummarizationResult(
summary=mock_summary, model_name="mock-model", provider_name="mock"
)
)
return mock_service
def _create_summary_with_action_items(meeting_id: MeetingId) -> Summary:
"""Create a summary with action items for testing."""
from datetime import datetime, timedelta
due_date = datetime.now() + timedelta(days=7)
return Summary(
meeting_id=meeting_id,
executive_summary="Summary with actions",
action_items=[
ActionItem(text="Action 1", assignee="Alice", priority=1),
ActionItem(text="Action 2", assignee="Bob", due_date=due_date, priority=2),
],
)
async def _verify_action_items_persisted(
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
meeting_id: MeetingId,
) -> None:
"""Verify action items were persisted correctly."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
saved = await uow.summaries.get_by_meeting(meeting_id)
assert saved is not None, "Summary should be saved"
assert len(saved.action_items) == 2, "Should have 2 action items"
assert saved.action_items[0].assignee == "Alice", "First action item assignee should match"
assert saved.action_items[1].priority == 2, "Second action item priority should match"
@pytest.mark.integration
class TestSummarizationGeneration:
"""Integration tests for summary generation."""
async def test_generate_summary_with_style_options(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test that proto SummarizationOptions are extracted and passed as style_prompt."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Style Options Test")
await uow.meetings.create(meeting)
await uow.segments.add(meeting.id, Segment(0, "Test.", 0.0, 5.0))
await uow.commit()
mock_service, captured = _create_capturing_service()
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
request = noteflow_pb2.GenerateSummaryRequest(
meeting_id=str(meeting.id),
options=noteflow_pb2.SummarizationOptions(tone="professional", format="bullet_points", verbosity="detailed"),
)
await servicer.GenerateSummary(request, MockContext())
style_prompt = captured[0]
assert style_prompt is not None, "style_prompt should be set when options are provided"
assert all(
kw in style_prompt.lower() for kw in ("formal", "bullet", "comprehensive")
), f"style_prompt should contain tone/format/verbosity keywords, got: {style_prompt}"
async def test_generate_summary_without_options_passes_none(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test that missing options results in None style_prompt."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="No Options Test")
await uow.meetings.create(meeting)
await uow.commit()
mock_service, captured = _create_capturing_service()
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
await servicer.GenerateSummary(
noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext()
)
assert captured[0] is None, f"style_prompt should be None when no options provided, got: {captured[0]}"
async def test_generate_summary_withsummarization_service(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary generation using SummarizationService."""
meeting = await self._create_meeting_with_segments(session_factory, meetings_dir)
servicer = self._create_servicer_with_mock_summary(session_factory, meeting.id)
result = await servicer.GenerateSummary(
noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext()
)
assert result.executive_summary == "This meeting discussed important content.", "Executive summary should match"
assert len(result.key_points) == 2, "Should have 2 key points"
assert len(result.action_items) == 1, "Should have 1 action item"
await self._verify_summary_persisted(
session_factory, meetings_dir, meeting.id, "This meeting discussed important content."
)
async def _create_meeting_with_segments(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
) -> Meeting:
"""Create a meeting with test segments."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Summary Test Meeting")
await uow.meetings.create(meeting)
await self._add_test_segments(uow, meeting.id, count=3)
await uow.commit()
return meeting
def _create_servicer_with_mock_summary(
self,
session_factory: async_sessionmaker[AsyncSession],
meeting_id: MeetingId,
) -> NoteFlowServicer:
"""Create servicer with mock summarization service."""
mock_summary = self._create_mock_summary(meeting_id)
mock_service = _create_mocksummarization_service(mock_summary)
return NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=mock_service),
)
async def _verify_summary_persisted(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
meeting_id: MeetingId,
expected_summary: str,
) -> None:
"""Verify summary was persisted with expected content."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
saved = await uow.summaries.get_by_meeting(meeting_id)
assert saved is not None, "Summary should be persisted to database"
assert (
saved.executive_summary == expected_summary
), f"expected '{expected_summary}', got '{saved.executive_summary}'"
async def _create_meeting_with_segment(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
title: str,
segment_text: str,
) -> Meeting:
"""Create a meeting with one segment."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title=title)
await uow.meetings.create(meeting)
segment = Segment(
segment_id=0,
text=segment_text,
start_time=0.0,
end_time=5.0,
)
await uow.segments.add(meeting.id, segment)
await uow.commit()
return meeting
async def _add_test_segments(
self, uow: SqlAlchemyUnitOfWork, meeting_id: MeetingId, count: int
) -> None:
"""Helper to add test segments."""
for i in range(count):
segment = Segment(
segment_id=i,
text=f"This is segment {i} with important content.",
start_time=float(i * 10),
end_time=float((i + 1) * 10),
)
await uow.segments.add(meeting_id, segment)
def _create_mock_summary(self, meeting_id: MeetingId) -> Summary:
"""Helper to create mock summary."""
return Summary(
meeting_id=meeting_id,
executive_summary="This meeting discussed important content.",
key_points=[
KeyPoint(text="Point 1", segment_ids=[0]),
KeyPoint(text="Point 2", segment_ids=[1]),
],
action_items=[ActionItem(text="Action 1", assignee="Alice")],
provider_name="test-model",
model_name="v1",
)
async def test_generate_summary_returns_existing_without_force(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary generation returns existing summary without force flag."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Existing Summary Test")
await uow.meetings.create(meeting)
existing_summary = Summary(
meeting_id=meeting.id,
executive_summary="Existing summary content",
)
await uow.summaries.save(existing_summary)
await uow.commit()
mock_service = MagicMock()
mock_service.summarize = AsyncMock()
servicer = NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=mock_service),
)
request = noteflow_pb2.GenerateSummaryRequest(
meeting_id=str(meeting.id),
force_regenerate=False,
)
result = await servicer.GenerateSummary(request, MockContext())
assert (
result.executive_summary == "Existing summary content"
), f"expected 'Existing summary content', got '{result.executive_summary}'"
mock_service.summarize.assert_not_called()
async def test_generate_summary_regenerates_with_force_flag(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary regeneration when force flag is set."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Force Regenerate Test")
await uow.meetings.create(meeting)
await uow.segments.add(meeting.id, Segment(0, "Content for regeneration", 0.0, 10.0))
await uow.summaries.save(Summary(meeting_id=meeting.id, executive_summary="Old summary"))
await uow.commit()
new_summary = Summary(meeting_id=meeting.id, executive_summary="New regenerated summary")
mock_service = MagicMock()
mock_service.summarize = AsyncMock(
return_value=SummarizationResult(summary=new_summary, model_name="mock", provider_name="mock")
)
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id), force_regenerate=True)
result = await servicer.GenerateSummary(request, MockContext())
assert result.executive_summary == "New regenerated summary", f"got '{result.executive_summary}'"
mock_service.summarize.assert_called_once()
async def test_generate_summary_placeholder_fallback(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test placeholder summary when summarization service not configured."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Placeholder Test")
await uow.meetings.create(meeting)
for i in range(2):
segment = Segment(
segment_id=i,
text=f"Segment {i} text content",
start_time=float(i * 5),
end_time=float((i + 1) * 5),
)
await uow.segments.add(meeting.id, segment)
await uow.commit()
servicer = NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=None),
)
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
result = await servicer.GenerateSummary(request, MockContext())
assert (
"Segment 0" in result.executive_summary or "Segment 1" in result.executive_summary
), f"placeholder summary should contain segment text, got: {result.executive_summary}"
assert (
result.model_version == "placeholder/v0"
), f"expected model_version 'placeholder/v0', got '{result.model_version}'"
def _create_error_service(self) -> MagicMock:
"""Create a mock service that raises ProviderUnavailableError."""
from noteflow.domain.summarization import ProviderUnavailableError
mock_service = MagicMock()
mock_service.summarize = AsyncMock(
side_effect=ProviderUnavailableError("All providers failed")
)
return mock_service
async def test_generate_summary_placeholder_on_service_error(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test placeholder summary when summarization service fails."""
meeting = await self._create_meeting_with_segment(
session_factory, meetings_dir, "Error Fallback Test",
"Content that should appear in placeholder"
)
servicer = NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=self._create_error_service()),
)
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
result = await servicer.GenerateSummary(request, MockContext())
assert (
"Content that should appear" in result.executive_summary
), f"placeholder summary should contain segment text, got: {result.executive_summary}"
assert (
result.model_version == "placeholder/v0"
), f"expected model_version 'placeholder/v0', got '{result.model_version}'"
@pytest.mark.integration
class TestSummarizationPersistence:
"""Integration tests for summary persistence."""
async def test_summary_with_key_points_persisted(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary with key points is fully persisted."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Key Points Test")
await uow.meetings.create(meeting)
for i in range(3):
segment = Segment(
segment_id=i,
text=f"Segment {i}",
start_time=float(i),
end_time=float(i + 1),
)
await uow.segments.add(meeting.id, segment)
await uow.commit()
summary = Summary(
meeting_id=meeting.id, executive_summary="Executive summary",
key_points=[KeyPoint(text="Key point 1", segment_ids=[0]), KeyPoint(text="Key point 2", segment_ids=[1, 2]), KeyPoint(text="Key point 3", segment_ids=[])],
)
mock_service = MagicMock()
mock_service.summarize = AsyncMock(return_value=SummarizationResult(summary=summary, model_name="mock", provider_name="mock"))
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
await servicer.GenerateSummary(noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext())
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
saved = await uow.summaries.get_by_meeting(meeting.id)
assert saved is not None, "Summary should be persisted"
assert len(saved.key_points) == 3, f"expected 3 key points, got {len(saved.key_points)}"
assert saved.key_points[0].text == "Key point 1", f"got '{saved.key_points[0].text}'"
assert saved.key_points[1].segment_ids == [1, 2], f"got {saved.key_points[1].segment_ids}"
async def test_summary_with_action_items_persisted(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary with action items is fully persisted."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Action Items Test")
summary = _create_summary_with_action_items(meeting.id)
mock_service: MagicMock = _create_mocksummarization_service(summary)
servicer = NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=mock_service),
)
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
await servicer.GenerateSummary(request, MockContext())
await _verify_action_items_persisted(session_factory, meetings_dir, meeting.id)
async def _save_old_summary(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
meeting_id: MeetingId,
) -> None:
"""Save an old summary for regeneration testing."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
old_summary = Summary(
meeting_id=meeting_id,
executive_summary="Old summary",
key_points=[KeyPoint(text="Old point")],
action_items=[ActionItem(text="Old action")],
)
await uow.summaries.save(old_summary)
await uow.commit()
async def _verify_regenerated_summary(
self,
session_factory: async_sessionmaker[AsyncSession],
meetings_dir: Path,
meeting_id: MeetingId,
) -> None:
"""Verify regenerated summary replaced the old one."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
saved = await uow.summaries.get_by_meeting(meeting_id)
assert saved is not None, "Summary should be saved"
assert saved.executive_summary == "New summary", "Executive summary should be replaced"
assert len(saved.key_points) == 2, "Should have 2 key points"
assert len(saved.action_items) == 0, "Should have no action items"
async def test_regeneration_replaces_existing_summary(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test regeneration replaces existing summary completely."""
meeting = await _create_test_meeting(session_factory, meetings_dir, "Replace Test")
await self._save_old_summary(session_factory, meetings_dir, meeting.id)
new_summary = Summary(
meeting_id=meeting.id,
executive_summary="New summary",
key_points=[KeyPoint(text="New point 1"), KeyPoint(text="New point 2")],
action_items=[],
)
mock_service: MagicMock = _create_mocksummarization_service(new_summary)
servicer = NoteFlowServicer(
session_factory=session_factory,
services=ServicesConfig(summarization_service=mock_service),
)
request = noteflow_pb2.GenerateSummaryRequest(
meeting_id=str(meeting.id),
force_regenerate=True,
)
await servicer.GenerateSummary(request, MockContext())
await self._verify_regenerated_summary(session_factory, meetings_dir, meeting.id)
@pytest.mark.integration
class TestSummarizationErrors:
"""Integration tests for summarization error handling."""
async def test_generate_summary_nonexistent_meeting(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Test summary generation fails for nonexistent meeting."""
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
from uuid import uuid4
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(uuid4()))
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.GenerateSummary(request, context)
assert (
context.abort_code == grpc.StatusCode.NOT_FOUND
), f"expected NOT_FOUND status, got {context.abort_code}"
async def test_generate_summary_invalid_meeting_id(
self, session_factory: async_sessionmaker[AsyncSession]
) -> None:
"""Test summary generation fails for invalid meeting ID."""
servicer = NoteFlowServicer(session_factory=session_factory)
context = MockContext()
request = noteflow_pb2.GenerateSummaryRequest(meeting_id="not-a-uuid")
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.GenerateSummary(request, context)
assert (
context.abort_code == grpc.StatusCode.INVALID_ARGUMENT
), f"expected INVALID_ARGUMENT status, got {context.abort_code}"
async def test_generate_summary_empty_transcript(
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
) -> None:
"""Test summary generation with no segments produces placeholder."""
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
meeting = Meeting.create(title="Empty Transcript")
await uow.meetings.create(meeting)
await uow.commit()
servicer = NoteFlowServicer(session_factory=session_factory)
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
result = await servicer.GenerateSummary(request, MockContext())
assert (
"No transcript available" in result.executive_summary
), f"empty transcript should produce 'No transcript available', got: {result.executive_summary}"