- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization - Added four new blocking hooks to prevent common error handling anti-patterns: - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption - `block-default
567 lines
24 KiB
Python
567 lines
24 KiB
Python
"""End-to-end integration tests for summarization.
|
|
|
|
Tests the complete summarization workflow with database persistence:
|
|
- Summary generation with real database
|
|
- Force regeneration behavior
|
|
- Placeholder fallback when summarization fails
|
|
- Summary persistence and retrieval
|
|
- Key points and action items
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import grpc
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import ActionItem, KeyPoint, Meeting, Segment, Summary
|
|
from noteflow.domain.summarization import SummarizationResult
|
|
from noteflow.domain.value_objects import MeetingId
|
|
from noteflow.grpc.config.config import ServicesConfig
|
|
from noteflow.grpc.proto import noteflow_pb2
|
|
from noteflow.grpc.service import NoteFlowServicer
|
|
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
|
|
|
|
if TYPE_CHECKING:
|
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
|
|
|
|
|
|
class MockContext:
|
|
"""Mock gRPC context for testing."""
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize mock context."""
|
|
self.aborted = False
|
|
self.abort_code: grpc.StatusCode | None = None
|
|
self.abort_details: str | None = None
|
|
|
|
async def abort(self, code: grpc.StatusCode, details: str) -> None:
|
|
"""Record abort and raise to simulate gRPC behavior."""
|
|
self.aborted = True
|
|
self.abort_code = code
|
|
self.abort_details = details
|
|
raise grpc.RpcError()
|
|
|
|
|
|
async def _create_test_meeting(
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
title: str,
|
|
) -> Meeting:
|
|
"""Create a test meeting in the database."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title=title)
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
return meeting
|
|
|
|
|
|
def _create_capturing_service() -> tuple[MagicMock, list[str | None]]:
|
|
"""Create a mock service that captures style_prompt values."""
|
|
captured: list[str | None] = []
|
|
|
|
async def capturing_summarize(
|
|
meeting_id: MeetingId, segments: list[Segment], style_prompt: str | None = None, **_: object
|
|
) -> SummarizationResult:
|
|
captured.append(style_prompt)
|
|
return SummarizationResult(
|
|
summary=Summary(meeting_id=meeting_id, executive_summary="Test summary"),
|
|
model_name="mock", provider_name="mock",
|
|
)
|
|
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock(side_effect=capturing_summarize)
|
|
return mock_service, captured
|
|
|
|
|
|
def _create_mocksummarization_service(mock_summary: Summary) -> MagicMock:
|
|
"""Create a mock summarization service returning a fixed summary."""
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock(
|
|
return_value=SummarizationResult(
|
|
summary=mock_summary, model_name="mock-model", provider_name="mock"
|
|
)
|
|
)
|
|
return mock_service
|
|
|
|
|
|
def _create_summary_with_action_items(meeting_id: MeetingId) -> Summary:
|
|
"""Create a summary with action items for testing."""
|
|
from datetime import datetime, timedelta
|
|
|
|
due_date = datetime.now() + timedelta(days=7)
|
|
return Summary(
|
|
meeting_id=meeting_id,
|
|
executive_summary="Summary with actions",
|
|
action_items=[
|
|
ActionItem(text="Action 1", assignee="Alice", priority=1),
|
|
ActionItem(text="Action 2", assignee="Bob", due_date=due_date, priority=2),
|
|
],
|
|
)
|
|
|
|
|
|
async def _verify_action_items_persisted(
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
meeting_id: MeetingId,
|
|
) -> None:
|
|
"""Verify action items were persisted correctly."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
saved = await uow.summaries.get_by_meeting(meeting_id)
|
|
assert saved is not None, "Summary should be saved"
|
|
assert len(saved.action_items) == 2, "Should have 2 action items"
|
|
assert saved.action_items[0].assignee == "Alice", "First action item assignee should match"
|
|
assert saved.action_items[1].priority == 2, "Second action item priority should match"
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSummarizationGeneration:
|
|
"""Integration tests for summary generation."""
|
|
|
|
async def test_generate_summary_with_style_options(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test that proto SummarizationOptions are extracted and passed as style_prompt."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Style Options Test")
|
|
await uow.meetings.create(meeting)
|
|
await uow.segments.add(meeting.id, Segment(0, "Test.", 0.0, 5.0))
|
|
await uow.commit()
|
|
|
|
mock_service, captured = _create_capturing_service()
|
|
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(
|
|
meeting_id=str(meeting.id),
|
|
options=noteflow_pb2.SummarizationOptions(tone="professional", format="bullet_points", verbosity="detailed"),
|
|
)
|
|
await servicer.GenerateSummary(request, MockContext())
|
|
|
|
style_prompt = captured[0]
|
|
assert style_prompt is not None, "style_prompt should be set when options are provided"
|
|
assert all(
|
|
kw in style_prompt.lower() for kw in ("formal", "bullet", "comprehensive")
|
|
), f"style_prompt should contain tone/format/verbosity keywords, got: {style_prompt}"
|
|
|
|
async def test_generate_summary_without_options_passes_none(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test that missing options results in None style_prompt."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="No Options Test")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
mock_service, captured = _create_capturing_service()
|
|
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
|
|
|
|
await servicer.GenerateSummary(
|
|
noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext()
|
|
)
|
|
|
|
assert captured[0] is None, f"style_prompt should be None when no options provided, got: {captured[0]}"
|
|
|
|
async def test_generate_summary_withsummarization_service(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary generation using SummarizationService."""
|
|
meeting = await self._create_meeting_with_segments(session_factory, meetings_dir)
|
|
servicer = self._create_servicer_with_mock_summary(session_factory, meeting.id)
|
|
|
|
result = await servicer.GenerateSummary(
|
|
noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext()
|
|
)
|
|
assert result.executive_summary == "This meeting discussed important content.", "Executive summary should match"
|
|
assert len(result.key_points) == 2, "Should have 2 key points"
|
|
assert len(result.action_items) == 1, "Should have 1 action item"
|
|
await self._verify_summary_persisted(
|
|
session_factory, meetings_dir, meeting.id, "This meeting discussed important content."
|
|
)
|
|
|
|
async def _create_meeting_with_segments(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
) -> Meeting:
|
|
"""Create a meeting with test segments."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Summary Test Meeting")
|
|
await uow.meetings.create(meeting)
|
|
await self._add_test_segments(uow, meeting.id, count=3)
|
|
await uow.commit()
|
|
return meeting
|
|
|
|
def _create_servicer_with_mock_summary(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meeting_id: MeetingId,
|
|
) -> NoteFlowServicer:
|
|
"""Create servicer with mock summarization service."""
|
|
mock_summary = self._create_mock_summary(meeting_id)
|
|
mock_service = _create_mocksummarization_service(mock_summary)
|
|
return NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=mock_service),
|
|
)
|
|
|
|
async def _verify_summary_persisted(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
meeting_id: MeetingId,
|
|
expected_summary: str,
|
|
) -> None:
|
|
"""Verify summary was persisted with expected content."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
saved = await uow.summaries.get_by_meeting(meeting_id)
|
|
assert saved is not None, "Summary should be persisted to database"
|
|
assert (
|
|
saved.executive_summary == expected_summary
|
|
), f"expected '{expected_summary}', got '{saved.executive_summary}'"
|
|
|
|
async def _create_meeting_with_segment(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
title: str,
|
|
segment_text: str,
|
|
) -> Meeting:
|
|
"""Create a meeting with one segment."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title=title)
|
|
await uow.meetings.create(meeting)
|
|
segment = Segment(
|
|
segment_id=0,
|
|
text=segment_text,
|
|
start_time=0.0,
|
|
end_time=5.0,
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
return meeting
|
|
|
|
async def _add_test_segments(
|
|
self, uow: SqlAlchemyUnitOfWork, meeting_id: MeetingId, count: int
|
|
) -> None:
|
|
"""Helper to add test segments."""
|
|
for i in range(count):
|
|
segment = Segment(
|
|
segment_id=i,
|
|
text=f"This is segment {i} with important content.",
|
|
start_time=float(i * 10),
|
|
end_time=float((i + 1) * 10),
|
|
)
|
|
await uow.segments.add(meeting_id, segment)
|
|
|
|
def _create_mock_summary(self, meeting_id: MeetingId) -> Summary:
|
|
"""Helper to create mock summary."""
|
|
return Summary(
|
|
meeting_id=meeting_id,
|
|
executive_summary="This meeting discussed important content.",
|
|
key_points=[
|
|
KeyPoint(text="Point 1", segment_ids=[0]),
|
|
KeyPoint(text="Point 2", segment_ids=[1]),
|
|
],
|
|
action_items=[ActionItem(text="Action 1", assignee="Alice")],
|
|
provider_name="test-model",
|
|
model_name="v1",
|
|
)
|
|
|
|
async def test_generate_summary_returns_existing_without_force(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary generation returns existing summary without force flag."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Existing Summary Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
existing_summary = Summary(
|
|
meeting_id=meeting.id,
|
|
executive_summary="Existing summary content",
|
|
)
|
|
await uow.summaries.save(existing_summary)
|
|
await uow.commit()
|
|
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock()
|
|
|
|
servicer = NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=mock_service),
|
|
)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(
|
|
meeting_id=str(meeting.id),
|
|
force_regenerate=False,
|
|
)
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert (
|
|
result.executive_summary == "Existing summary content"
|
|
), f"expected 'Existing summary content', got '{result.executive_summary}'"
|
|
mock_service.summarize.assert_not_called()
|
|
|
|
async def test_generate_summary_regenerates_with_force_flag(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary regeneration when force flag is set."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Force Regenerate Test")
|
|
await uow.meetings.create(meeting)
|
|
await uow.segments.add(meeting.id, Segment(0, "Content for regeneration", 0.0, 10.0))
|
|
await uow.summaries.save(Summary(meeting_id=meeting.id, executive_summary="Old summary"))
|
|
await uow.commit()
|
|
|
|
new_summary = Summary(meeting_id=meeting.id, executive_summary="New regenerated summary")
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock(
|
|
return_value=SummarizationResult(summary=new_summary, model_name="mock", provider_name="mock")
|
|
)
|
|
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id), force_regenerate=True)
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert result.executive_summary == "New regenerated summary", f"got '{result.executive_summary}'"
|
|
mock_service.summarize.assert_called_once()
|
|
|
|
async def test_generate_summary_placeholder_fallback(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test placeholder summary when summarization service not configured."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Placeholder Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
for i in range(2):
|
|
segment = Segment(
|
|
segment_id=i,
|
|
text=f"Segment {i} text content",
|
|
start_time=float(i * 5),
|
|
end_time=float((i + 1) * 5),
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=None),
|
|
)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert (
|
|
"Segment 0" in result.executive_summary or "Segment 1" in result.executive_summary
|
|
), f"placeholder summary should contain segment text, got: {result.executive_summary}"
|
|
assert (
|
|
result.model_version == "placeholder/v0"
|
|
), f"expected model_version 'placeholder/v0', got '{result.model_version}'"
|
|
|
|
def _create_error_service(self) -> MagicMock:
|
|
"""Create a mock service that raises ProviderUnavailableError."""
|
|
from noteflow.domain.summarization import ProviderUnavailableError
|
|
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock(
|
|
side_effect=ProviderUnavailableError("All providers failed")
|
|
)
|
|
return mock_service
|
|
|
|
async def test_generate_summary_placeholder_on_service_error(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test placeholder summary when summarization service fails."""
|
|
meeting = await self._create_meeting_with_segment(
|
|
session_factory, meetings_dir, "Error Fallback Test",
|
|
"Content that should appear in placeholder"
|
|
)
|
|
|
|
servicer = NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=self._create_error_service()),
|
|
)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert (
|
|
"Content that should appear" in result.executive_summary
|
|
), f"placeholder summary should contain segment text, got: {result.executive_summary}"
|
|
assert (
|
|
result.model_version == "placeholder/v0"
|
|
), f"expected model_version 'placeholder/v0', got '{result.model_version}'"
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSummarizationPersistence:
|
|
"""Integration tests for summary persistence."""
|
|
|
|
async def test_summary_with_key_points_persisted(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary with key points is fully persisted."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Key Points Test")
|
|
await uow.meetings.create(meeting)
|
|
|
|
for i in range(3):
|
|
segment = Segment(
|
|
segment_id=i,
|
|
text=f"Segment {i}",
|
|
start_time=float(i),
|
|
end_time=float(i + 1),
|
|
)
|
|
await uow.segments.add(meeting.id, segment)
|
|
await uow.commit()
|
|
|
|
summary = Summary(
|
|
meeting_id=meeting.id, executive_summary="Executive summary",
|
|
key_points=[KeyPoint(text="Key point 1", segment_ids=[0]), KeyPoint(text="Key point 2", segment_ids=[1, 2]), KeyPoint(text="Key point 3", segment_ids=[])],
|
|
)
|
|
mock_service = MagicMock()
|
|
mock_service.summarize = AsyncMock(return_value=SummarizationResult(summary=summary, model_name="mock", provider_name="mock"))
|
|
servicer = NoteFlowServicer(session_factory=session_factory, services=ServicesConfig(summarization_service=mock_service))
|
|
await servicer.GenerateSummary(noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id)), MockContext())
|
|
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
saved = await uow.summaries.get_by_meeting(meeting.id)
|
|
assert saved is not None, "Summary should be persisted"
|
|
assert len(saved.key_points) == 3, f"expected 3 key points, got {len(saved.key_points)}"
|
|
assert saved.key_points[0].text == "Key point 1", f"got '{saved.key_points[0].text}'"
|
|
assert saved.key_points[1].segment_ids == [1, 2], f"got {saved.key_points[1].segment_ids}"
|
|
|
|
async def test_summary_with_action_items_persisted(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary with action items is fully persisted."""
|
|
meeting = await _create_test_meeting(session_factory, meetings_dir, "Action Items Test")
|
|
summary = _create_summary_with_action_items(meeting.id)
|
|
|
|
mock_service: MagicMock = _create_mocksummarization_service(summary)
|
|
servicer = NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=mock_service),
|
|
)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
|
|
await servicer.GenerateSummary(request, MockContext())
|
|
await _verify_action_items_persisted(session_factory, meetings_dir, meeting.id)
|
|
|
|
async def _save_old_summary(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
meeting_id: MeetingId,
|
|
) -> None:
|
|
"""Save an old summary for regeneration testing."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
old_summary = Summary(
|
|
meeting_id=meeting_id,
|
|
executive_summary="Old summary",
|
|
key_points=[KeyPoint(text="Old point")],
|
|
action_items=[ActionItem(text="Old action")],
|
|
)
|
|
await uow.summaries.save(old_summary)
|
|
await uow.commit()
|
|
|
|
async def _verify_regenerated_summary(
|
|
self,
|
|
session_factory: async_sessionmaker[AsyncSession],
|
|
meetings_dir: Path,
|
|
meeting_id: MeetingId,
|
|
) -> None:
|
|
"""Verify regenerated summary replaced the old one."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
saved = await uow.summaries.get_by_meeting(meeting_id)
|
|
assert saved is not None, "Summary should be saved"
|
|
assert saved.executive_summary == "New summary", "Executive summary should be replaced"
|
|
assert len(saved.key_points) == 2, "Should have 2 key points"
|
|
assert len(saved.action_items) == 0, "Should have no action items"
|
|
|
|
async def test_regeneration_replaces_existing_summary(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test regeneration replaces existing summary completely."""
|
|
meeting = await _create_test_meeting(session_factory, meetings_dir, "Replace Test")
|
|
await self._save_old_summary(session_factory, meetings_dir, meeting.id)
|
|
|
|
new_summary = Summary(
|
|
meeting_id=meeting.id,
|
|
executive_summary="New summary",
|
|
key_points=[KeyPoint(text="New point 1"), KeyPoint(text="New point 2")],
|
|
action_items=[],
|
|
)
|
|
mock_service: MagicMock = _create_mocksummarization_service(new_summary)
|
|
servicer = NoteFlowServicer(
|
|
session_factory=session_factory,
|
|
services=ServicesConfig(summarization_service=mock_service),
|
|
)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(
|
|
meeting_id=str(meeting.id),
|
|
force_regenerate=True,
|
|
)
|
|
await servicer.GenerateSummary(request, MockContext())
|
|
await self._verify_regenerated_summary(session_factory, meetings_dir, meeting.id)
|
|
|
|
|
|
@pytest.mark.integration
|
|
class TestSummarizationErrors:
|
|
"""Integration tests for summarization error handling."""
|
|
|
|
async def test_generate_summary_nonexistent_meeting(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test summary generation fails for nonexistent meeting."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
from uuid import uuid4
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(uuid4()))
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GenerateSummary(request, context)
|
|
|
|
assert (
|
|
context.abort_code == grpc.StatusCode.NOT_FOUND
|
|
), f"expected NOT_FOUND status, got {context.abort_code}"
|
|
|
|
async def test_generate_summary_invalid_meeting_id(
|
|
self, session_factory: async_sessionmaker[AsyncSession]
|
|
) -> None:
|
|
"""Test summary generation fails for invalid meeting ID."""
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
context = MockContext()
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id="not-a-uuid")
|
|
|
|
with pytest.raises(grpc.RpcError, match=r".*"):
|
|
await servicer.GenerateSummary(request, context)
|
|
|
|
assert (
|
|
context.abort_code == grpc.StatusCode.INVALID_ARGUMENT
|
|
), f"expected INVALID_ARGUMENT status, got {context.abort_code}"
|
|
|
|
async def test_generate_summary_empty_transcript(
|
|
self, session_factory: async_sessionmaker[AsyncSession], meetings_dir: Path
|
|
) -> None:
|
|
"""Test summary generation with no segments produces placeholder."""
|
|
async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
|
|
meeting = Meeting.create(title="Empty Transcript")
|
|
await uow.meetings.create(meeting)
|
|
await uow.commit()
|
|
|
|
servicer = NoteFlowServicer(session_factory=session_factory)
|
|
|
|
request = noteflow_pb2.GenerateSummaryRequest(meeting_id=str(meeting.id))
|
|
result = await servicer.GenerateSummary(request, MockContext())
|
|
|
|
assert (
|
|
"No transcript available" in result.executive_summary
|
|
), f"empty transcript should produce 'No transcript available', got: {result.executive_summary}"
|