Files
noteflow/tests/integration/test_memory_fallback.py
2026-01-06 10:21:43 +00:00

1079 lines
42 KiB
Python

"""Integration tests for memory fallback path.
Tests the in-memory storage and repository fallback behavior when
no database is configured:
- MeetingStore thread safety and CRUD operations
- MemoryUnitOfWork and memory repositories
- Unsupported repository operations (annotations, diarization, preferences)
- Feature flags and capability detection
- gRPC servicer behavior with memory backend
"""
from __future__ import annotations
import threading
from datetime import datetime
from uuid import uuid4
import grpc
import pytest
from noteflow.domain.entities import Meeting, Segment, Summary
from noteflow.domain.value_objects import AnnotationId, MeetingId, MeetingState
from noteflow.grpc.meeting_store import MeetingStore
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.memory import MemoryUnitOfWork
from noteflow.infrastructure.persistence.memory.repositories import (
MemoryMeetingRepository,
MemorySegmentRepository,
MemorySummaryRepository,
UnsupportedAnnotationRepository,
UnsupportedDiarizationJobRepository,
UnsupportedPreferencesRepository,
)
class MockContext:
"""Mock gRPC context for testing."""
def __init__(self) -> None:
"""Initialize mock context."""
self.aborted = False
self.abort_code: grpc.StatusCode | None = None
self.abort_details: str | None = None
async def abort(self, code: grpc.StatusCode, details: str) -> None:
"""Record abort and raise to simulate gRPC behavior."""
self.aborted = True
self.abort_code = code
self.abort_details = details
raise grpc.RpcError()
@pytest.mark.integration
class TestMeetingStoreBasicOperations:
"""Integration tests for MeetingStore basic CRUD operations."""
def test_create_meeting_assigns_unique_id(self) -> None:
"""Test meeting creation assigns unique IDs."""
store = MeetingStore()
meeting1 = store.create(title="Meeting 1")
meeting2 = store.create(title="Meeting 2")
assert meeting1.id != meeting2.id, f"meeting IDs should be unique, got {meeting1.id} twice"
assert meeting1.title == "Meeting 1", f"expected 'Meeting 1', got {meeting1.title!r}"
assert meeting2.title == "Meeting 2", f"expected 'Meeting 2', got {meeting2.title!r}"
def test_insert_and_get_meeting(self) -> None:
"""Test inserting and retrieving a meeting."""
store = MeetingStore()
meeting = Meeting.create(title="Inserted Meeting")
stored = store.insert(meeting)
assert stored.id == meeting.id, "stored ID should match original"
retrieved = store.get(str(meeting.id))
assert retrieved is not None, "retrieved meeting should exist"
assert retrieved.id == meeting.id, "retrieved ID should match"
assert retrieved.title == "Inserted Meeting", "retrieved title should match"
def test_get_nonexistent_meeting_returns_none(self) -> None:
"""Test getting nonexistent meeting returns None."""
store = MeetingStore()
result = store.get(str(uuid4()))
assert result is None, f"get should return None for nonexistent meeting, got {result}"
def test_update_meeting_in_store(self) -> None:
"""Test updating a meeting in MeetingStore."""
store = MeetingStore()
meeting = store.create(title="Original Title")
meeting.title = "Updated Title"
store.update(meeting)
retrieved = store.get(str(meeting.id))
assert retrieved is not None, "updated meeting should exist in store"
assert retrieved.title == "Updated Title", f"expected 'Updated Title', got {retrieved.title!r}"
def test_delete_meeting_from_store(self) -> None:
"""Test deleting a meeting from MeetingStore."""
store = MeetingStore()
meeting = store.create(title="To Delete")
result = store.delete(str(meeting.id))
assert result is True, "delete should return True for existing meeting"
assert store.get(str(meeting.id)) is None, "deleted meeting should not be retrievable"
def test_delete_nonexistent_returns_false(self) -> None:
"""Test deleting nonexistent meeting returns False."""
store = MeetingStore()
result = store.delete(str(uuid4()))
assert result is False, "delete should return False for nonexistent meeting"
@pytest.mark.integration
class TestMeetingStoreListingAndFiltering:
"""Integration tests for MeetingStore listing and filtering."""
def test_list_all_with_pagination(self) -> None:
"""Test listing meetings with pagination."""
store = MeetingStore()
for i in range(10):
store.create(title=f"Meeting {i}")
meetings, total = store.list_all(limit=3, offset=0)
assert len(meetings) == 3, f"expected page size of meetings, got {len(meetings)}"
assert total == 10, f"expected all meetings in total, got {total}"
def test_list_all_with_offset(self) -> None:
"""Test listing meetings with offset."""
store = MeetingStore()
for i in range(5):
store.create(title=f"Meeting {i}")
meetings, total = store.list_all(limit=10, offset=2)
assert len(meetings) == 3, f"expected remaining meetings after offset, got {len(meetings)}"
assert total == 5, f"expected all meetings in total, got {total}"
def test_list_all_filter_by_state(self) -> None:
"""Test listing meetings filtered by state."""
store = MeetingStore()
store.create(title="Created Meeting") # Created state
recording = store.create(title="Recording Meeting")
recording.start_recording()
store.update(recording)
meetings, total = store.list_all(states=[MeetingState.RECORDING])
assert len(meetings) == 1, f"expected 1 recording meeting, got {len(meetings)}"
assert total == 1, f"expected total=1 for recording state, got {total}"
assert meetings[0].id == recording.id, f"expected meeting id {recording.id}, got {meetings[0].id}"
def test_list_all_filter_by_multiple_states(self) -> None:
"""Test filtering by multiple states."""
store = MeetingStore()
store.create(title="Created") # Created state
recording = store.create(title="Recording")
recording.start_recording()
store.update(recording)
stopping = store.create(title="Stopping")
stopping.start_recording()
stopping.begin_stopping()
store.update(stopping)
meetings, total = store.list_all(
states=[MeetingState.RECORDING, MeetingState.STOPPING]
)
assert len(meetings) == 2, f"expected 2 meetings (recording+stopping), got {len(meetings)}"
assert total == 2, f"expected total=2 for multi-state filter, got {total}"
def test_list_all_sort_order(self) -> None:
"""Test listing with sort order."""
store = MeetingStore()
store.create(title="First")
store.create(title="Second")
meetings_desc, _ = store.list_all(sort_desc=True)
meetings_asc, _ = store.list_all(sort_desc=False)
assert meetings_desc[0].created_at >= meetings_desc[-1].created_at, (
"descending sort should have newest first"
)
assert meetings_asc[0].created_at <= meetings_asc[-1].created_at, (
"ascending sort should have oldest first"
)
def test_count_by_state_in_store(self) -> None:
"""Test counting meetings by state in MeetingStore."""
store = MeetingStore()
store.create(title="Created 1")
store.create(title="Created 2")
recording = store.create(title="Recording")
recording.start_recording()
store.update(recording)
created_count = store.count_by_state(MeetingState.CREATED)
recording_count = store.count_by_state(MeetingState.RECORDING)
assert created_count == 2, f"expected 2 created meetings, got {created_count}"
assert recording_count == 1, f"expected 1 recording meeting, got {recording_count}"
@pytest.mark.integration
class TestMeetingStoreSegments:
"""Integration tests for MeetingStore segment operations."""
def test_add_and_get_segments_in_store(self) -> None:
"""Test adding and retrieving segments in MeetingStore."""
store = MeetingStore()
meeting = store.create(title="Segment Test")
segment1 = Segment(
segment_id=0,
text="First segment",
start_time=0.0,
end_time=5.0,
)
segment2 = Segment(
segment_id=1,
text="Second segment",
start_time=5.0,
end_time=10.0,
)
store.add_segment(str(meeting.id), segment1)
store.add_segment(str(meeting.id), segment2)
segments = store.fetch_segments(str(meeting.id))
assert len(segments) == 2, f"expected 2 segments, got {len(segments)}"
assert segments[0].text == "First segment", f"first segment mismatch: {segments[0].text!r}"
assert segments[1].text == "Second segment", f"second segment mismatch: {segments[1].text!r}"
def test_add_segment_to_nonexistent_meeting(self) -> None:
"""Test adding segment to nonexistent meeting returns None."""
store = MeetingStore()
segment = Segment(segment_id=0, text="Test", start_time=0.0, end_time=1.0)
result = store.add_segment(str(uuid4()), segment)
assert result is None, f"add_segment should return None for nonexistent meeting, got {result}"
def test_get_segments_from_nonexistent_in_store(self) -> None:
"""Test getting segments from nonexistent meeting returns empty list in store."""
store = MeetingStore()
segments = store.fetch_segments(str(uuid4()))
assert segments == [], f"expected empty list for nonexistent meeting, got {segments}"
def test_getnext_segment_id_in_store(self) -> None:
"""Test getting next segment ID in MeetingStore."""
store = MeetingStore()
meeting = store.create(title="Segment ID Test")
next_id = store.compute_next_segment_id(str(meeting.id))
assert next_id == 0, f"expected next_id=0 for empty meeting, got {next_id}"
segment = Segment(segment_id=0, text="First", start_time=0.0, end_time=1.0)
store.add_segment(str(meeting.id), segment)
next_id = store.compute_next_segment_id(str(meeting.id))
assert next_id == 1, f"expected next_id=1 after adding segment, got {next_id}"
def test_getnext_segment_id_nonexistent_meeting(self) -> None:
"""Test next segment ID for nonexistent meeting is 0."""
store = MeetingStore()
next_id = store.compute_next_segment_id(str(uuid4()))
assert next_id == 0, f"expected next_id=0 for nonexistent meeting, got {next_id}"
@pytest.mark.integration
class TestMeetingStoreSummary:
"""Integration tests for MeetingStore summary operations."""
def test_set_and_get_summary(self) -> None:
"""Test setting and getting meeting summary."""
store = MeetingStore()
meeting = store.create(title="Summary Test")
summary = Summary(
meeting_id=meeting.id,
executive_summary="This is the executive summary.",
)
store.set_summary(str(meeting.id), summary)
retrieved = store.get_meeting_summary(str(meeting.id))
assert retrieved is not None, "summary should be retrievable after set"
assert retrieved.executive_summary == "This is the executive summary.", (
f"summary mismatch: {retrieved.executive_summary!r}"
)
def test_set_summary_nonexistent_meeting(self) -> None:
"""Test setting summary on nonexistent meeting returns None."""
store = MeetingStore()
summary = Summary(
meeting_id=MeetingId(uuid4()),
executive_summary="Test",
)
result = store.set_summary(str(uuid4()), summary)
assert result is None, f"set_summary should return None for nonexistent meeting, got {result}"
def test_get_summary_nonexistent_meeting(self) -> None:
"""Test getting summary from nonexistent meeting returns None."""
store = MeetingStore()
result = store.get_meeting_summary(str(uuid4()))
assert result is None, f"get_summary should return None for nonexistent meeting, got {result}"
def test_clear_summary(self) -> None:
"""Test clearing meeting summary."""
store = MeetingStore()
meeting = store.create(title="Clear Summary Test")
summary = Summary(meeting_id=meeting.id, executive_summary="To clear")
store.set_summary(str(meeting.id), summary)
result = store.clear_summary(str(meeting.id))
assert result is True, "clear_summary should return True when summary existed"
assert store.get_meeting_summary(str(meeting.id)) is None, "summary should be None after clearing"
def test_clear_summary_when_none_set(self) -> None:
"""Test clearing summary when none is set returns False."""
store = MeetingStore()
meeting = store.create(title="No Summary")
result = store.clear_summary(str(meeting.id))
assert result is False, "clear_summary should return False when no summary exists"
def test_clear_summary_nonexistent_meeting(self) -> None:
"""Test clearing summary on nonexistent meeting returns False."""
store = MeetingStore()
result = store.clear_summary(str(uuid4()))
assert result is False, "clear_summary should return False for nonexistent meeting"
@pytest.mark.integration
class TestMeetingStoreAtomicUpdates:
"""Integration tests for MeetingStore atomic update operations."""
def test_update_state(self) -> None:
"""Test atomic state update."""
store = MeetingStore()
meeting = store.create(title="State Test")
result = store.update_state(str(meeting.id), MeetingState.RECORDING)
assert result is True, "update_state should return True for existing meeting"
retrieved = store.get(str(meeting.id))
assert retrieved is not None, "meeting should exist after state update"
assert retrieved.state == MeetingState.RECORDING, f"expected RECORDING state, got {retrieved.state}"
def test_update_state_nonexistent_meeting(self) -> None:
"""Test state update on nonexistent meeting returns False."""
store = MeetingStore()
result = store.update_state(str(uuid4()), MeetingState.RECORDING)
assert result is False, "update_state should return False for nonexistent meeting"
def test_update_title(self) -> None:
"""Test atomic title update."""
store = MeetingStore()
meeting = store.create(title="Original")
result = store.update_title(str(meeting.id), "Updated")
assert result is True, "update_title should return True for existing meeting"
retrieved = store.get(str(meeting.id))
assert retrieved is not None, "meeting should exist after title update"
assert retrieved.title == "Updated", f"expected 'Updated', got {retrieved.title!r}"
def test_update_end_time(self) -> None:
"""Test atomic end time update."""
store = MeetingStore()
meeting = store.create(title="End Time Test")
end_time = datetime.now()
result = store.update_end_time(str(meeting.id), end_time)
assert result is True, "update_end_time should return True for existing meeting"
retrieved = store.get(str(meeting.id))
assert retrieved is not None, "meeting should exist after end_time update"
assert retrieved.ended_at == end_time, f"expected {end_time}, got {retrieved.ended_at}"
def test_active_count_property(self) -> None:
"""Test active_count property."""
store = MeetingStore()
store.create(title="Created")
recording = store.create(title="Recording")
recording.start_recording()
store.update(recording)
stopping = store.create(title="Stopping")
stopping.start_recording()
stopping.begin_stopping()
store.update(stopping)
assert store.active_count == 2, f"expected 2 active meetings, got {store.active_count}"
@pytest.mark.integration
class TestMeetingStoreThreadSafety:
"""Integration tests for MeetingStore thread safety."""
def test_concurrent_creates(self) -> None:
"""Test concurrent meeting creation is thread-safe."""
store = MeetingStore()
created_ids: list[str] = []
lock = threading.Lock()
def create_meeting(index: int) -> None:
meeting = store.create(title=f"Meeting {index}")
with lock:
created_ids.append(str(meeting.id))
threads = [threading.Thread(target=create_meeting, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(created_ids) == 10, f"expected 10 meetings created, got {len(created_ids)}"
assert len(set(created_ids)) == 10, "all meeting IDs should be unique"
def _create_concurrent_readers(
self, store: MeetingStore, meeting_id: str, errors: list[Exception]
) -> list[threading.Thread]:
"""Create reader threads for concurrent test."""
def reader() -> None:
try:
for _ in range(100):
store.get(meeting_id)
store.fetch_segments(meeting_id)
except (KeyError, RuntimeError, ValueError) as e:
errors.append(e)
return [threading.Thread(target=reader) for _ in range(5)]
def _create_concurrent_writers(
self, store: MeetingStore, meeting_id: str, errors: list[Exception]
) -> list[threading.Thread]:
"""Create writer threads for concurrent test."""
def writer() -> None:
try:
for i in range(100):
segment = Segment(
segment_id=i,
text=f"Segment {i}",
start_time=float(i),
end_time=float(i + 1),
)
store.add_segment(meeting_id, segment)
except (KeyError, RuntimeError, ValueError) as e:
errors.append(e)
return [threading.Thread(target=writer) for _ in range(2)]
def test_concurrent_reads_and_writes(self) -> None:
"""Test concurrent reads and writes are thread-safe."""
store = MeetingStore()
meeting = store.create(title="Concurrent Test")
errors: list[Exception] = []
readers = self._create_concurrent_readers(store, str(meeting.id), errors)
writers = self._create_concurrent_writers(store, str(meeting.id), errors)
all_threads = readers + writers
for t in all_threads:
t.start()
for t in all_threads:
t.join()
assert not errors, f"Thread errors occurred: {errors}"
@pytest.mark.integration
class TestMemoryUnitOfWork:
"""Integration tests for MemoryUnitOfWork."""
async def test_context_manager_returns_self(self) -> None:
"""Test async context manager returns self."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
assert uow is not None, "context manager should return non-None UoW"
assert isinstance(uow, MemoryUnitOfWork), f"expected MemoryUnitOfWork, got {type(uow)}"
async def test_commit_is_noop(self) -> None:
"""Test commit is a no-op (changes already applied)."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting = Meeting.create(title="Test")
await uow.meetings.create(meeting)
await uow.commit()
retrieved = await uow.meetings.get(meeting.id)
assert retrieved is not None, "meeting should exist after commit"
async def test_rollback_does_not_undo_changes(self) -> None:
"""Test rollback does not undo changes (memory doesn't support rollback)."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting = Meeting.create(title="Rollback Test")
await uow.meetings.create(meeting)
await uow.rollback()
# Meeting still exists after rollback (memory doesn't support rollback)
assert store.get(str(meeting.id)) is not None, "meeting should persist despite rollback in memory mode"
async def test_feature_flag_annotations(self) -> None:
"""Test annotations feature flag is False."""
store = MeetingStore()
uow = MemoryUnitOfWork(store)
assert uow.supports_annotations is False, "memory UoW should not support annotations"
async def test_feature_flagdiarization_jobs(self) -> None:
"""Test diarization jobs feature flag is False."""
store = MeetingStore()
uow = MemoryUnitOfWork(store)
assert uow.supports_diarization_jobs is False, "memory UoW should not support diarization jobs"
async def test_feature_flag_preferences(self) -> None:
"""Test preferences feature flag is False."""
store = MeetingStore()
uow = MemoryUnitOfWork(store)
assert uow.supports_preferences is False, "memory UoW should not support preferences"
@pytest.mark.integration
class TestMemoryMeetingRepository:
"""Integration tests for MemoryMeetingRepository."""
async def test_create_and_get(self) -> None:
"""Test creating and getting a meeting."""
store = MeetingStore()
repo = MemoryMeetingRepository(store)
meeting = Meeting.create(title="Repo Test")
await repo.create(meeting)
retrieved = await repo.get(meeting.id)
assert retrieved is not None, "meeting should be retrievable after create"
assert retrieved.id == meeting.id, f"expected id {meeting.id}, got {retrieved.id}"
async def test_update(self) -> None:
"""Test updating a meeting."""
store = MeetingStore()
repo = MemoryMeetingRepository(store)
meeting = Meeting.create(title="Original")
await repo.create(meeting)
meeting.title = "Updated"
await repo.update(meeting)
retrieved = await repo.get(meeting.id)
assert retrieved is not None, "meeting should exist after update"
assert retrieved.title == "Updated", f"expected 'Updated', got {retrieved.title!r}"
async def test_delete(self) -> None:
"""Test deleting a meeting."""
store = MeetingStore()
repo = MemoryMeetingRepository(store)
meeting = Meeting.create(title="To Delete")
await repo.create(meeting)
result = await repo.delete(meeting.id)
assert result is True, "delete should return True for existing meeting"
assert await repo.get(meeting.id) is None, "meeting should not exist after delete"
async def test_list_all(self) -> None:
"""Test listing all meetings."""
store = MeetingStore()
repo = MemoryMeetingRepository(store)
for i in range(5):
meeting = Meeting.create(title=f"Meeting {i}")
await repo.create(meeting)
meetings, total = await repo.list_all()
assert len(meetings) == 5, f"expected 5 meetings, got {len(meetings)}"
assert total == 5, f"expected total=5, got {total}"
async def test_count_by_state_via_repo(self) -> None:
"""Test counting meetings by state via MemoryMeetingRepository."""
store = MeetingStore()
repo = MemoryMeetingRepository(store)
for i in range(3):
meeting = Meeting.create(title=f"Created {i}")
await repo.create(meeting)
recording = Meeting.create(title="Recording")
recording.start_recording()
await repo.create(recording)
created_count = await repo.count_by_state(MeetingState.CREATED)
recording_count = await repo.count_by_state(MeetingState.RECORDING)
assert created_count == 3, f"expected 3 created meetings, got {created_count}"
assert recording_count == 1, f"expected 1 recording meeting, got {recording_count}"
@pytest.mark.integration
class TestMemorySegmentRepository:
"""Integration tests for MemorySegmentRepository."""
async def test_add_and_get_segments_via_repo(self) -> None:
"""Test adding and getting segments via MemorySegmentRepository."""
store = MeetingStore()
meeting_repo = MemoryMeetingRepository(store)
segment_repo = MemorySegmentRepository(store)
meeting = Meeting.create(title="Segment Repo Test")
await meeting_repo.create(meeting)
segment = Segment(
segment_id=0,
text="Test segment",
start_time=0.0,
end_time=5.0,
)
await segment_repo.add(meeting.id, segment)
segments = await segment_repo.get_by_meeting(meeting.id)
assert len(segments) == 1, f"expected 1 segment, got {len(segments)}"
assert segments[0].text == "Test segment", f"segment text mismatch: {segments[0].text!r}"
async def test_add_batch(self) -> None:
"""Test adding segments in batch."""
store = MeetingStore()
meeting_repo = MemoryMeetingRepository(store)
segment_repo = MemorySegmentRepository(store)
meeting = Meeting.create(title="Batch Test")
await meeting_repo.create(meeting)
segments = [
Segment(segment_id=i, text=f"Segment {i}", start_time=float(i), end_time=float(i + 1))
for i in range(5)
]
await segment_repo.add_batch(meeting.id, segments)
retrieved = await segment_repo.get_by_meeting(meeting.id)
assert len(retrieved) == 5, f"expected 5 segments after batch add, got {len(retrieved)}"
async def test_semantic_search_returns_empty(self) -> None:
"""Test semantic search returns empty (not supported)."""
store = MeetingStore()
segment_repo = MemorySegmentRepository(store)
results = await segment_repo.search_semantic([0.1, 0.2, 0.3])
assert results == [], f"semantic search should return empty in memory mode, got {results}"
async def test_getnext_segment_id_via_repo(self) -> None:
"""Test getting next segment ID via MemorySegmentRepository."""
store = MeetingStore()
meeting_repo = MemoryMeetingRepository(store)
segment_repo = MemorySegmentRepository(store)
meeting = Meeting.create(title="Next ID Test")
await meeting_repo.create(meeting)
next_id = await segment_repo.compute_next_segment_id(meeting.id)
assert next_id == 0, f"expected next_id=0 for empty meeting, got {next_id}"
segment = Segment(segment_id=0, text="First", start_time=0.0, end_time=1.0)
await segment_repo.add(meeting.id, segment)
next_id = await segment_repo.compute_next_segment_id(meeting.id)
assert next_id == 1, f"expected next_id=1 after adding segment, got {next_id}"
@pytest.mark.integration
class TestMemorySummaryRepository:
"""Integration tests for MemorySummaryRepository."""
async def test_save_and_get(self) -> None:
"""Test saving and getting summary."""
store = MeetingStore()
meeting_repo = MemoryMeetingRepository(store)
summary_repo = MemorySummaryRepository(store)
meeting = Meeting.create(title="Summary Repo Test")
await meeting_repo.create(meeting)
summary = Summary(
meeting_id=meeting.id,
executive_summary="Executive summary content",
)
await summary_repo.save(summary)
retrieved = await summary_repo.get_by_meeting(meeting.id)
assert retrieved is not None, "summary should be retrievable after save"
assert retrieved.executive_summary == "Executive summary content", (
f"summary mismatch: {retrieved.executive_summary!r}"
)
async def test_delete_by_meeting(self) -> None:
"""Test deleting summary by meeting ID."""
store = MeetingStore()
meeting_repo = MemoryMeetingRepository(store)
summary_repo = MemorySummaryRepository(store)
meeting = Meeting.create(title="Delete Summary Test")
await meeting_repo.create(meeting)
summary = Summary(meeting_id=meeting.id, executive_summary="To delete")
await summary_repo.save(summary)
result = await summary_repo.delete_by_meeting(meeting.id)
assert result is True, "delete_by_meeting should return True when summary existed"
assert await summary_repo.get_by_meeting(meeting.id) is None, "summary should not exist after delete"
@pytest.mark.integration
class TestUnsupportedRepositories:
"""Integration tests for unsupported repository operations."""
async def test_annotation_repository_raises_on_add(self) -> None:
"""Test annotation repository raises NotImplementedError on add."""
repo = UnsupportedAnnotationRepository()
with pytest.raises(NotImplementedError, match="require database"):
from noteflow.domain.entities import Annotation
from noteflow.domain.value_objects import AnnotationType
annotation = Annotation(
id=AnnotationId(uuid4()),
meeting_id=MeetingId(uuid4()),
annotation_type=AnnotationType.NOTE,
text="Test",
start_time=0.0,
end_time=1.0,
)
await repo.add(annotation)
async def test_annotation_repository_raises_on_get(self) -> None:
"""Test annotation repository raises NotImplementedError on get."""
repo = UnsupportedAnnotationRepository()
with pytest.raises(NotImplementedError, match="require database"):
from noteflow.domain.value_objects import AnnotationId
await repo.get(AnnotationId(uuid4()))
async def test_annotation_repository_raises_on_list(self) -> None:
"""Test annotation repository raises NotImplementedError on list."""
repo = UnsupportedAnnotationRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.get_by_meeting(MeetingId(uuid4()))
async def test_annotation_repository_raises_on_update(self) -> None:
"""Test annotation repository raises NotImplementedError on update."""
repo = UnsupportedAnnotationRepository()
with pytest.raises(NotImplementedError, match="require database"):
from noteflow.domain.entities import Annotation
from noteflow.domain.value_objects import AnnotationType
annotation = Annotation(
id=AnnotationId(uuid4()),
meeting_id=MeetingId(uuid4()),
annotation_type=AnnotationType.NOTE,
text="Test",
start_time=0.0,
end_time=1.0,
)
await repo.update(annotation)
async def test_annotation_repository_raises_on_delete(self) -> None:
"""Test annotation repository raises NotImplementedError on delete."""
repo = UnsupportedAnnotationRepository()
with pytest.raises(NotImplementedError, match="require database"):
from noteflow.domain.value_objects import AnnotationId
await repo.delete(AnnotationId(uuid4()))
async def test_diarization_job_repository_raises_on_create(self) -> None:
"""Test diarization job repository raises NotImplementedError on create."""
from noteflow.infrastructure.persistence.repositories.diarization_job._constants import (
JOB_STATUS_QUEUED,
)
repo = UnsupportedDiarizationJobRepository()
with pytest.raises(NotImplementedError, match="require database"):
from noteflow.infrastructure.persistence.repositories import DiarizationJob
job = DiarizationJob(
job_id=str(uuid4()),
meeting_id=str(uuid4()),
status=JOB_STATUS_QUEUED,
)
await repo.create(job)
async def test_diarization_job_repository_raises_on_get(self) -> None:
"""Test diarization job repository raises NotImplementedError on get."""
repo = UnsupportedDiarizationJobRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.get(str(uuid4()))
async def test_diarization_job_repository_raises_on_update_status(self) -> None:
"""Test diarization job repository raises NotImplementedError on update."""
repo = UnsupportedDiarizationJobRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.update_status(str(uuid4()), 2)
async def test_diarization_job_repository_raises_on_streaming_turns(self) -> None:
"""Test diarization job repository raises on streaming turn operations."""
repo = UnsupportedDiarizationJobRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.add_streaming_turns(str(uuid4()), [])
with pytest.raises(NotImplementedError, match="require database"):
await repo.get_streaming_turns(str(uuid4()))
with pytest.raises(NotImplementedError, match="require database"):
await repo.clear_streaming_turns(str(uuid4()))
async def test_preferences_repository_raises_on_get(self) -> None:
"""Test preferences repository raises NotImplementedError on get."""
repo = UnsupportedPreferencesRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.get("test_key")
async def test_preferences_repository_raises_on_set(self) -> None:
"""Test preferences repository raises NotImplementedError on set."""
repo = UnsupportedPreferencesRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.set("test_key", "test_value")
async def test_preferences_repository_raises_on_delete(self) -> None:
"""Test preferences repository raises NotImplementedError on delete."""
repo = UnsupportedPreferencesRepository()
with pytest.raises(NotImplementedError, match="require database"):
await repo.delete("test_key")
@pytest.mark.integration
class TestGrpcServicerMemoryFallback:
"""Integration tests for gRPC servicer with memory fallback."""
async def test_servicer_usesmemory_store_when_nosession_factory(self) -> None:
"""Test servicer falls back to memory store without session factory."""
servicer = NoteFlowServicer(session_factory=None)
assert servicer.memory_store is not None, "servicer should have memory store when no session_factory"
async def test_create_meeting_in_memory_mode(self) -> None:
"""Test creating meeting works in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
request = noteflow_pb2.CreateMeetingRequest(title="Memory Meeting")
result = await servicer.CreateMeeting(request, MockContext())
assert result.id, "created meeting should have an ID"
assert result.title == "Memory Meeting", f"expected 'Memory Meeting', got {result.title!r}"
async def test_get_meeting_in_memory_mode(self) -> None:
"""Test getting meeting works in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
create_request = noteflow_pb2.CreateMeetingRequest(title="Get Memory Test")
created = await servicer.CreateMeeting(create_request, MockContext())
get_request = noteflow_pb2.GetMeetingRequest(meeting_id=created.id)
result = await servicer.GetMeeting(get_request, MockContext())
assert result.id == created.id, f"expected id {created.id}, got {result.id}"
assert result.title == "Get Memory Test", f"expected 'Get Memory Test', got {result.title!r}"
async def test_list_meetings_in_memory_mode(self) -> None:
"""Test listing meetings works in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
for i in range(3):
request = noteflow_pb2.CreateMeetingRequest(title=f"Meeting {i}")
await servicer.CreateMeeting(request, MockContext())
list_request = noteflow_pb2.ListMeetingsRequest()
result = await servicer.ListMeetings(list_request, MockContext())
assert len(result.meetings) == 3, f"expected 3 meetings, got {len(result.meetings)}"
async def test_delete_meeting_in_memory_mode(self) -> None:
"""Test deleting meeting works in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
create_request = noteflow_pb2.CreateMeetingRequest(title="To Delete")
created = await servicer.CreateMeeting(create_request, MockContext())
delete_request = noteflow_pb2.DeleteMeetingRequest(meeting_id=created.id)
result = await servicer.DeleteMeeting(delete_request, MockContext())
assert result.success is True, "delete should succeed"
get_request = noteflow_pb2.GetMeetingRequest(meeting_id=created.id)
context = MockContext()
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.GetMeeting(get_request, context)
assert context.abort_code == grpc.StatusCode.NOT_FOUND, (
f"expected NOT_FOUND, got {context.abort_code}"
)
async def test_get_server_info_in_memory_mode(self) -> None:
"""Test GetServerInfo works in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
request = noteflow_pb2.ServerInfoRequest()
result = await servicer.GetServerInfo(request, MockContext())
assert result.active_meetings >= 0, f"active_meetings should be non-negative, got {result.active_meetings}"
async def test_annotation_operations_fail_in_memory_mode(self) -> None:
"""Test annotation operations fail gracefully in memory mode."""
servicer = NoteFlowServicer(session_factory=None)
create_request = noteflow_pb2.CreateMeetingRequest(title="Annotation Test")
created = await servicer.CreateMeeting(create_request, MockContext())
add_request = noteflow_pb2.AddAnnotationRequest(
meeting_id=created.id,
annotation_type=noteflow_pb2.ANNOTATION_TYPE_NOTE,
text="Test annotation",
start_time=0.0,
end_time=1.0,
)
context = MockContext()
with pytest.raises(grpc.RpcError, match=r".*"):
await servicer.AddAnnotation(add_request, context)
assert context.abort_code == grpc.StatusCode.UNIMPLEMENTED, (
f"expected UNIMPLEMENTED for annotations in memory mode, got {context.abort_code}"
)
@pytest.mark.integration
class TestMemoryModeConstraints:
"""Integration tests for memory mode constraints and limitations."""
async def test_memory_mode_segments_persist_on_meeting(self) -> None:
"""Test segments are stored on meeting entity in memory mode."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting = Meeting.create(title="Segment Persistence")
await uow.meetings.create(meeting)
segment = Segment(
segment_id=0,
text="Test segment",
start_time=0.0,
end_time=1.0,
)
await uow.segments.add(meeting.id, segment)
# Verify through store directly
stored_meeting = store.get(str(meeting.id))
assert stored_meeting is not None, "meeting should exist in store"
assert len(stored_meeting.segments) == 1, f"expected 1 segment, got {len(stored_meeting.segments)}"
async def test_memory_mode_summary_persists_on_meeting(self) -> None:
"""Test summary is stored on meeting entity in memory mode."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting = Meeting.create(title="Summary Persistence")
await uow.meetings.create(meeting)
summary = Summary(
meeting_id=meeting.id,
executive_summary="Test summary",
)
await uow.summaries.save(summary)
# Verify through store directly
stored_meeting = store.get(str(meeting.id))
assert stored_meeting is not None, "meeting should exist in store"
assert stored_meeting.summary is not None, "summary should be attached to meeting"
assert stored_meeting.summary.executive_summary == "Test summary", (
f"summary mismatch: {stored_meeting.summary.executive_summary!r}"
)
async def test_memory_mode_no_semantic_search(self) -> None:
"""Test semantic search is not available in memory mode."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting = Meeting.create(title="Semantic Test")
await uow.meetings.create(meeting)
segment = Segment(
segment_id=0,
text="Searchable content",
start_time=0.0,
end_time=1.0,
)
await uow.segments.add(meeting.id, segment)
results = await uow.segments.search_semantic([0.1] * 384)
assert results == [], f"semantic search should return empty in memory mode, got {results}"
async def test_memory_mode_meetings_isolated(self) -> None:
"""Test meetings are isolated in memory mode."""
store = MeetingStore()
async with MemoryUnitOfWork(store) as uow:
meeting1 = Meeting.create(title="Meeting 1")
meeting2 = Meeting.create(title="Meeting 2")
await uow.meetings.create(meeting1)
await uow.meetings.create(meeting2)
segment1 = Segment(segment_id=0, text="M1 Segment", start_time=0.0, end_time=1.0)
segment2 = Segment(segment_id=0, text="M2 Segment", start_time=0.0, end_time=1.0)
await uow.segments.add(meeting1.id, segment1)
await uow.segments.add(meeting2.id, segment2)
m1_segments = await uow.segments.get_by_meeting(meeting1.id)
m2_segments = await uow.segments.get_by_meeting(meeting2.id)
assert len(m1_segments) == 1, "Meeting 1 should have 1 segment"
assert len(m2_segments) == 1, "Meeting 2 should have 1 segment"
assert m1_segments[0].text == "M1 Segment", "Meeting 1 segment text should match"
assert m2_segments[0].text == "M2 Segment", "Meeting 2 segment text should match"