refactor: enhance test assertions for clarity and consistency

- Updated assertions in various test files to include descriptive messages, improving readability and maintainability.
- Ensured that all assertions provide context for expected outcomes, aiding in debugging and understanding test failures.
- Refactored tests across multiple modules, including annotation, consent, and summarization, to enhance overall test quality.

All quality checks pass.
This commit is contained in:
2025-12-31 17:38:07 +00:00
parent 183d094eb6
commit b74c6c3742
31 changed files with 486 additions and 451 deletions

View File

@@ -176,8 +176,10 @@ class TestAddAnnotation:
response = await servicer.AddAnnotation(request, mock_grpc_context)
assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_NOTE
assert response.text == "A simple note"
assert (
response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_NOTE
), "annotation_type should be NOTE"
assert response.text == "A simple note", "text should match request"
async def test_adds_annotation_with_action_item_type(
self,
@@ -203,8 +205,10 @@ class TestAddAnnotation:
response = await servicer.AddAnnotation(request, mock_grpc_context)
assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM
assert response.text == "Follow up with client"
assert (
response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_ACTION_ITEM
), "annotation_type should be ACTION_ITEM"
assert response.text == "Follow up with client", "text should match request"
async def test_adds_annotation_with_risk_type(
self,
@@ -230,8 +234,10 @@ class TestAddAnnotation:
response = await servicer.AddAnnotation(request, mock_grpc_context)
assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_RISK
assert response.text == "Potential budget overrun"
assert (
response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_RISK
), "annotation_type should be RISK"
assert response.text == "Potential budget overrun", "text should match request"
async def test_adds_annotation_commits_transaction(
self,
@@ -320,7 +326,9 @@ class TestGetAnnotation:
assert response.start_time == 100.0, "start_time should match"
assert response.end_time == 120.0, "end_time should match"
assert list(response.segment_ids) == [5, 6, 7], "segment_ids should match"
assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION
assert (
response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION
), "annotation_type should be DECISION"
async def test_aborts_when_annotation_not_found(
self,
@@ -416,9 +424,13 @@ class TestListAnnotations:
response = await servicer.ListAnnotations(request, mock_grpc_context)
assert len(response.annotations) == 3, "should return all 3 annotations"
assert response.annotations[0].text == "First note"
assert response.annotations[1].text == "Important decision"
assert response.annotations[2].text == "Follow up required"
assert response.annotations[0].text == "First note", "first annotation text should match"
assert (
response.annotations[1].text == "Important decision"
), "second annotation text should match"
assert (
response.annotations[2].text == "Follow up required"
), "third annotation text should match"
async def test_filters_by_time_range(
self,
@@ -446,7 +458,9 @@ class TestListAnnotations:
response = await servicer.ListAnnotations(request, mock_grpc_context)
assert len(response.annotations) == 1, "should return filtered annotations"
assert response.annotations[0].text == "Annotation in range"
assert (
response.annotations[0].text == "Annotation in range"
), "filtered annotation text should match"
mock_annotations_repo.get_by_time_range.assert_called_once_with(
meeting_id, 20.0, 40.0
)
@@ -469,7 +483,7 @@ class TestListAnnotations:
)
response = await servicer.ListAnnotations(request, mock_grpc_context)
assert len(response.annotations) == 0
assert len(response.annotations) == 0, "should return empty list for start_time filter"
mock_annotations_repo.get_by_time_range.assert_called_once_with(
meeting_id, 50.0, 0.0
)
@@ -536,7 +550,9 @@ class TestUpdateAnnotation:
assert response.id == str(annotation_id), "id should remain unchanged"
assert response.text == "Updated text", "text should be updated"
assert response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION
assert (
response.annotation_type == noteflow_pb2.ANNOTATION_TYPE_DECISION
), "annotation_type should be updated to DECISION"
assert response.start_time == 15.0, "start_time should be updated"
assert response.end_time == 25.0, "end_time should be updated"
assert list(response.segment_ids) == [2, 3], "segment_ids should be updated"

View File

@@ -83,7 +83,7 @@ class TestGetCloudConsentStatus:
_DummyContext(),
)
assert response.consent_granted is False
assert response.consent_granted is False, "consent should be False when not granted"
@pytest.mark.asyncio
async def test_returns_true_when_consent_granted(self) -> None:
@@ -96,7 +96,7 @@ class TestGetCloudConsentStatus:
_DummyContext(),
)
assert response.consent_granted is True
assert response.consent_granted is True, "consent should be True when granted"
@pytest.mark.asyncio
async def test_returns_false_when_service_unavailable(self) -> None:
@@ -109,7 +109,7 @@ class TestGetCloudConsentStatus:
)
# Safe default: no consent when service unavailable
assert response.consent_granted is False
assert response.consent_granted is False, "should default to False when service unavailable"
class TestGrantCloudConsent:
@@ -155,7 +155,7 @@ class TestGrantCloudConsent:
context,
)
assert context.aborted
assert context.aborted, "should abort when service unavailable"
class TestRevokeCloudConsent:
@@ -200,7 +200,7 @@ class TestRevokeCloudConsent:
context,
)
assert context.aborted
assert context.aborted, "should abort when service unavailable"
class TestConsentRoundTrip:
@@ -218,7 +218,7 @@ class TestConsentRoundTrip:
noteflow_pb2.GetCloudConsentStatusRequest(),
context,
)
assert status_before.consent_granted is False
assert status_before.consent_granted is False, "initial consent should be False"
# Grant consent
await servicer.GrantCloudConsent(
@@ -231,7 +231,7 @@ class TestConsentRoundTrip:
noteflow_pb2.GetCloudConsentStatusRequest(),
_DummyContext(),
)
assert status_after.consent_granted is True
assert status_after.consent_granted is True, "consent should be True after grant"
@pytest.mark.asyncio
async def test_grant_revoke_cycle(self) -> None:
@@ -248,7 +248,7 @@ class TestConsentRoundTrip:
noteflow_pb2.GetCloudConsentStatusRequest(),
_DummyContext(),
)
assert status.consent_granted is True
assert status.consent_granted is True, "consent should be True after grant"
# Revoke
await servicer.RevokeCloudConsent(
@@ -259,7 +259,7 @@ class TestConsentRoundTrip:
noteflow_pb2.GetCloudConsentStatusRequest(),
_DummyContext(),
)
assert status.consent_granted is False
assert status.consent_granted is False, "consent should be False after revoke"
@pytest.mark.asyncio
async def test_consent_change_callback_invoked(self) -> None:

View File

@@ -62,8 +62,8 @@ async def test_cancel_queued_job_succeeds() -> None:
_DummyContext(),
)
assert response.success is True
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED
assert response.success is True, "cancel should succeed for queued job"
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED, "status should be CANCELLED"
@pytest.mark.asyncio
@@ -85,8 +85,8 @@ async def test_cancel_running_job_succeeds() -> None:
_DummyContext(),
)
assert response.success is True
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED
assert response.success is True, "cancel should succeed for running job"
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED, "status should be CANCELLED"
@pytest.mark.asyncio
@@ -99,8 +99,8 @@ async def test_cancel_nonexistent_job_fails() -> None:
_DummyContext(),
)
assert response.success is False
assert "not found" in response.error_message.lower()
assert response.success is False, "cancel should fail for nonexistent job"
assert "not found" in response.error_message.lower(), "error should mention 'not found'"
@pytest.mark.asyncio
@@ -122,7 +122,7 @@ async def test_progress_percent_queued() -> None:
_DummyContext(),
)
assert response.progress_percent == pytest.approx(0.0)
assert response.progress_percent == pytest.approx(0.0), "queued job should have 0% progress"
@pytest.mark.asyncio
@@ -173,4 +173,4 @@ async def test_progress_percent_completed() -> None:
_DummyContext(),
)
assert response.progress_percent == pytest.approx(100.0)
assert response.progress_percent == pytest.approx(100.0), "completed job should have 100% progress"

View File

@@ -107,8 +107,8 @@ class TestRefineSpeakerDiarizationValidation:
context,
)
assert response.segments_updated == 0
assert "invalid" in response.error_message.lower()
assert response.segments_updated == 0, "Invalid UUID should not update any segments"
assert "invalid" in response.error_message.lower(), "Error message should mention 'invalid'"
@pytest.mark.asyncio
async def test_refine_rejects_nonexistent_meeting(
@@ -123,8 +123,8 @@ class TestRefineSpeakerDiarizationValidation:
context,
)
assert response.segments_updated == 0
assert "not found" in response.error_message.lower()
assert response.segments_updated == 0, "Nonexistent meeting should not update any segments"
assert "not found" in response.error_message.lower(), "Error message should mention 'not found'"
class TestRefineSpeakerDiarizationState:
@@ -146,8 +146,8 @@ class TestRefineSpeakerDiarizationState:
context,
)
assert response.segments_updated == 0
assert "stopped" in response.error_message.lower()
assert response.segments_updated == 0, "Recording meeting should not update any segments"
assert "stopped" in response.error_message.lower(), "Error message should mention 'stopped'"
@pytest.mark.asyncio
async def test_refine_rejects_stopping_meeting(
@@ -166,8 +166,8 @@ class TestRefineSpeakerDiarizationState:
context,
)
assert response.segments_updated == 0
assert "stopped" in response.error_message.lower()
assert response.segments_updated == 0, "Stopping meeting should not update any segments"
assert "stopped" in response.error_message.lower(), "Error message should require meeting to be stopped"
@pytest.mark.asyncio
async def test_refine_accepts_stopped_meeting(
@@ -188,7 +188,7 @@ class TestRefineSpeakerDiarizationState:
)
assert response.job_id, "Should return job_id for accepted request"
assert response.status == noteflow_pb2.JOB_STATUS_QUEUED
assert response.status == noteflow_pb2.JOB_STATUS_QUEUED, "Job status should be QUEUED"
class TestRefineSpeakerDiarizationServer:
@@ -212,8 +212,8 @@ class TestRefineSpeakerDiarizationServer:
context,
)
assert response.segments_updated == 0
assert "disabled" in response.error_message.lower()
assert response.segments_updated == 0, "Disabled refinement should not update any segments"
assert "disabled" in response.error_message.lower(), "Error message should mention 'disabled'"
@pytest.mark.asyncio
async def test_refine_returns_error_when_engine_unavailable(
@@ -233,8 +233,8 @@ class TestRefineSpeakerDiarizationServer:
context,
)
assert response.segments_updated == 0
assert "not enabled" in response.error_message.lower()
assert response.segments_updated == 0, "Missing engine should not update any segments"
assert "not enabled" in response.error_message.lower(), "Error message should mention 'not enabled'"
class TestRenameSpeakerValidation:
@@ -257,7 +257,7 @@ class TestRenameSpeakerValidation:
context,
)
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, "Missing old_speaker_id should abort with INVALID_ARGUMENT"
@pytest.mark.asyncio
async def test_rename_aborts_on_missing_new_speaker_name(
@@ -276,7 +276,7 @@ class TestRenameSpeakerValidation:
context,
)
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, "Missing new_speaker_name should abort with INVALID_ARGUMENT"
@pytest.mark.asyncio
async def test_rename_aborts_on_invalid_meeting_id(
@@ -295,7 +295,7 @@ class TestRenameSpeakerValidation:
context,
)
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT
assert context.abort_code == grpc.StatusCode.INVALID_ARGUMENT, "Invalid meeting_id format should abort with INVALID_ARGUMENT"
class TestRenameSpeakerOperation:
@@ -365,8 +365,8 @@ class TestRenameSpeakerOperation:
context,
)
assert response.segments_updated == 0
assert response.success is False
assert response.segments_updated == 0, "No matching speaker should result in zero segments updated"
assert response.success is False, "Rename should fail when no segments match"
class TestGetDiarizationJobStatusProgress:
@@ -392,7 +392,7 @@ class TestGetDiarizationJobStatusProgress:
context,
)
assert response.progress_percent == pytest.approx(0.0)
assert response.progress_percent == pytest.approx(0.0), "Queued job should have 0% progress"
@pytest.mark.asyncio
async def test_status_progress_running_is_time_based(
@@ -419,7 +419,7 @@ class TestGetDiarizationJobStatusProgress:
)
# 120s audio at ~0.17 ratio = ~20s estimated; 10s elapsed = ~50%
assert response.progress_percent == pytest.approx(50.0, rel=0.25)
assert response.progress_percent == pytest.approx(50.0, rel=0.25), "Running job progress should be ~50% based on elapsed time"
@pytest.mark.asyncio
async def test_status_progress_completed_is_full(
@@ -441,7 +441,7 @@ class TestGetDiarizationJobStatusProgress:
context,
)
assert response.progress_percent == pytest.approx(100.0)
assert response.progress_percent == pytest.approx(100.0), "Completed job should have 100% progress"
@pytest.mark.asyncio
async def test_status_progress_failed_is_zero(
@@ -463,7 +463,7 @@ class TestGetDiarizationJobStatusProgress:
context,
)
assert response.progress_percent == pytest.approx(0.0)
assert response.progress_percent == pytest.approx(0.0), "Failed job should have 0% progress"
class TestCancelDiarizationJobStates:
@@ -492,8 +492,8 @@ class TestCancelDiarizationJobStates:
context,
)
assert response.success is True
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED
assert response.success is True, "Cancelling queued job should succeed"
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED, "Cancelled queued job status should be CANCELLED"
@pytest.mark.asyncio
async def test_cancel_mixin_running_succeeds(
@@ -515,8 +515,8 @@ class TestCancelDiarizationJobStates:
context,
)
assert response.success is True
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED
assert response.success is True, "Cancelling running job should succeed"
assert response.status == noteflow_pb2.JOB_STATUS_CANCELLED, "Cancelled running job status should be CANCELLED"
@pytest.mark.asyncio
async def test_cancel_mixin_nonexistent_fails(
@@ -530,8 +530,8 @@ class TestCancelDiarizationJobStates:
context,
)
assert response.success is False
assert "not found" in response.error_message.lower()
assert response.success is False, "Cancelling nonexistent job should fail"
assert "not found" in response.error_message.lower(), "Error message should mention 'not found'"
@pytest.mark.asyncio
async def test_cancel_completed_job_fails(
@@ -553,5 +553,5 @@ class TestCancelDiarizationJobStates:
context,
)
assert response.success is False
assert response.status == noteflow_pb2.JOB_STATUS_COMPLETED
assert response.success is False, "Cancelling completed job should fail"
assert response.status == noteflow_pb2.JOB_STATUS_COMPLETED, "Completed job status should remain COMPLETED"

View File

@@ -31,6 +31,6 @@ async def test_refine_speaker_diarization_rejects_active_meeting() -> None:
_DummyContext(),
)
assert response.segments_updated == 0
assert response.error_message
assert "stopped" in response.error_message.lower()
assert response.segments_updated == 0, "no segments should be updated for active meeting"
assert response.error_message, "error message should be present"
assert "stopped" in response.error_message.lower(), "error should mention 'stopped'"

View File

@@ -159,10 +159,10 @@ class TestExtractEntities:
assert response.total_count == 2, "total count should be 2"
assert response.cached is False, "should not be cached"
assert response.entities[0].text == "John Doe"
assert response.entities[0].category == "person"
assert response.entities[1].text == "Acme Corp"
assert response.entities[1].category == "company"
assert response.entities[0].text == "John Doe", "first entity text should match"
assert response.entities[0].category == "person", "first entity category should be person"
assert response.entities[1].text == "Acme Corp", "second entity text should match"
assert response.entities[1].category == "company", "second entity category should be company"
async def test_returns_cached_entities(
self,
@@ -187,7 +187,7 @@ class TestExtractEntities:
response = await servicer.ExtractEntities(request, mock_grpc_context)
assert response.cached is True, "should indicate cached result"
assert len(response.entities) == 1
assert len(response.entities) == 1, "should return 1 cached entity"
mock_ner_service.extract_entities.assert_called_once_with(
meeting_id=meeting_id,
force_refresh=False,
@@ -321,7 +321,7 @@ class TestExtractEntities:
response = await servicer.ExtractEntities(request, mock_grpc_context)
assert len(response.entities) == 0, "should return empty list"
assert response.total_count == 0
assert response.total_count == 0, "total count should be 0"
async def test_includes_pinned_status_in_response(
self,
@@ -347,7 +347,7 @@ class TestExtractEntities:
)
response = await servicer.ExtractEntities(request, mock_grpc_context)
assert response.entities[0].is_pinned is True
assert response.entities[0].is_pinned is True, "entity should be marked as pinned"
class TestUpdateEntity:
@@ -385,7 +385,7 @@ class TestUpdateEntity:
)
response = await servicer.UpdateEntity(request, mock_grpc_context)
assert response.entity.text == "Updated Name"
assert response.entity.text == "Updated Name", "entity text should be updated"
mock_entities_repo.update.assert_called_once_with(
entity_id=entity.id,
text="Updated Name",
@@ -415,7 +415,7 @@ class TestUpdateEntity:
)
response = await servicer.UpdateEntity(request, mock_grpc_context)
assert response.entity.category == "company"
assert response.entity.category == "company", "entity category should be updated"
async def test_aborts_when_entity_not_found(
self,
@@ -551,7 +551,7 @@ class TestDeleteEntity:
)
response = await servicer.DeleteEntity(request, mock_grpc_context)
assert response.success is True
assert response.success is True, "delete should succeed"
mock_entities_repo.delete.assert_called_once_with(entity.id)
async def test_aborts_when_entity_not_found(

View File

@@ -146,9 +146,9 @@ class TestExportTranscriptMarkdown:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.content == "# Team Standup\n\nNo segments."
assert response.format_name == "Markdown"
assert response.file_extension == ".md"
assert response.content == "# Team Standup\n\nNo segments.", "response content should match expected markdown output"
assert response.format_name == "Markdown", "format_name should be 'Markdown'"
assert response.file_extension == ".md", "file_extension should be '.md' for markdown"
async def test_exports_markdown_with_segments(
self,
@@ -186,8 +186,8 @@ class TestExportTranscriptMarkdown:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.content == expected_content
assert response.format_name == "Markdown"
assert response.content == expected_content, "response content should include all segment text"
assert response.format_name == "Markdown", "format_name should be 'Markdown'"
async def test_exports_markdown_as_default_format(
self,
@@ -218,8 +218,8 @@ class TestExportTranscriptMarkdown:
mock_service.export_transcript.assert_called_once()
call_args = mock_service.export_transcript.call_args
assert call_args[0][1] == ExportFormat.MARKDOWN
assert response.format_name == "Markdown"
assert call_args[0][1] == ExportFormat.MARKDOWN, "unspecified format should default to MARKDOWN"
assert response.format_name == "Markdown", "format_name should be 'Markdown' for default format"
class TestExportTranscriptHtml:
@@ -254,9 +254,9 @@ class TestExportTranscriptHtml:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.content == html_content
assert response.format_name == "HTML"
assert response.file_extension == ".html"
assert response.content == html_content, "response content should match expected HTML output"
assert response.format_name == "HTML", "format_name should be 'HTML'"
assert response.file_extension == ".html", "file_extension should be '.html' for HTML"
async def test_exports_html_with_segments(
self,
@@ -295,8 +295,8 @@ class TestExportTranscriptHtml:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert "Code Review" in response.content
assert response.format_name == "HTML"
assert "Code Review" in response.content, "response content should include meeting title"
assert response.format_name == "HTML", "format_name should be 'HTML'"
class TestExportTranscriptPdf:
@@ -333,9 +333,9 @@ class TestExportTranscriptPdf:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.content == expected_base64
assert response.format_name == "PDF"
assert response.file_extension == ".pdf"
assert response.content == expected_base64, "response content should be base64-encoded PDF"
assert response.format_name == "PDF", "format_name should be 'PDF'"
assert response.file_extension == ".pdf", "file_extension should be '.pdf' for PDF"
async def test_pdf_content_is_decodable(
self,
@@ -367,7 +367,7 @@ class TestExportTranscriptPdf:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
decoded_bytes = base64.b64decode(response.content)
assert decoded_bytes == original_pdf_bytes
assert decoded_bytes == original_pdf_bytes, "decoded base64 content should match original PDF bytes"
class TestExportTranscriptMeetingNotFound:
@@ -479,8 +479,8 @@ class TestExportTranscriptWithSegments:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert "SPEAKER_00" in response.content
assert "SPEAKER_01" in response.content
assert "SPEAKER_00" in response.content, "response content should include first speaker label"
assert "SPEAKER_01" in response.content, "response content should include second speaker label"
async def test_exports_empty_transcript(
self,
@@ -511,7 +511,7 @@ class TestExportTranscriptWithSegments:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.content == "# Empty Meeting\n\nNo transcript available."
assert response.content == "# Empty Meeting\n\nNo transcript available.", "response content should indicate no transcript available"
async def test_exports_long_transcript(
self,
@@ -557,8 +557,8 @@ class TestExportTranscriptWithSegments:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert "segment number 0" in response.content
assert "segment number 99" in response.content
assert "segment number 0" in response.content, "response content should include first segment"
assert "segment number 99" in response.content, "response content should include last segment"
class TestExportFormatMetadata:
@@ -608,5 +608,5 @@ class TestExportFormatMetadata:
response = await export_servicer.ExportTranscript(request, mock_grpc_context)
assert response.format_name == expected_name
assert response.file_extension == expected_ext
assert response.format_name == expected_name, f"format_name should be '{expected_name}'"
assert response.file_extension == expected_ext, f"file_extension should be '{expected_ext}'"

View File

@@ -36,11 +36,11 @@ async def test_generate_summary_uses_placeholder_when_service_missing() -> None:
_DummyContext(),
)
assert response.executive_summary != ""
assert response.model_version == "placeholder/v0"
assert response.executive_summary != "", "executive summary should not be empty"
assert response.model_version == "placeholder/v0", "should use placeholder model"
retrieved_meeting = store.get(str(meeting.id))
assert retrieved_meeting is not None, "Meeting should exist after creation"
assert retrieved_meeting.summary is not None
assert retrieved_meeting.summary is not None, "summary should be stored on meeting"
class _FailingSummarizationService:
@@ -72,5 +72,5 @@ async def test_generate_summary_falls_back_when_provider_unavailable() -> None:
_DummyContext(),
)
assert response.executive_summary != ""
assert response.model_version == "placeholder/v0"
assert response.executive_summary != "", "fallback should produce non-empty summary"
assert response.model_version == "placeholder/v0", "fallback should use placeholder model"

View File

@@ -680,7 +680,7 @@ class TestGetMeeting:
response = await meeting_mixin_servicer.GetMeeting(request, mock_grpc_context)
assert response.summary is not None, "Summary should be included"
assert response.summary.executive_summary == "This was a productive meeting."
assert response.summary.executive_summary == "This was a productive meeting.", "executive summary should match"
meeting_mixin_summaries_repo.get_by_meeting.assert_called_once_with(meeting.id)
async def test_get_meeting_excludes_segments_when_not_requested(

View File

@@ -135,8 +135,8 @@ class TestGetCalendarProviders:
google = next(p for p in response.providers if p.name == "google")
outlook = next(p for p in response.providers if p.name == "outlook")
assert google.display_name == "Google Calendar"
assert outlook.display_name == "Microsoft Outlook"
assert google.display_name == "Google Calendar", "google should have correct display name"
assert outlook.display_name == "Microsoft Outlook", "outlook should have correct display name"
@pytest.mark.asyncio
async def test_aborts_when_calendar_service_not_configured(self) -> None:
@@ -309,7 +309,7 @@ class TestCompleteOAuth:
)
assert response.success is False, "should fail on invalid state"
assert "Invalid or expired state" in response.error_message
assert "Invalid or expired state" in response.error_message, "error should mention invalid state"
@pytest.mark.asyncio
async def test_returns_error_on_invalid_code(self) -> None:
@@ -330,7 +330,7 @@ class TestCompleteOAuth:
)
assert response.success is False, "should fail on invalid code"
assert "Token exchange failed" in response.error_message
assert "Token exchange failed" in response.error_message, "error should mention token exchange failure"
@pytest.mark.asyncio
async def test_aborts_when_complete_service_unavailable(self) -> None:
@@ -368,9 +368,9 @@ class TestGetOAuthConnectionStatus:
_DummyContext(),
)
assert response.connection.provider == "google"
assert response.connection.status == IntegrationStatus.CONNECTED.value
assert response.connection.email == "user@gmail.com"
assert response.connection.provider == "google", "should return correct provider"
assert response.connection.status == IntegrationStatus.CONNECTED.value, "status should be connected"
assert response.connection.email == "user@gmail.com", "should return connected email"
@pytest.mark.asyncio
async def test_returns_disconnected_status(self) -> None:
@@ -385,7 +385,7 @@ class TestGetOAuthConnectionStatus:
_DummyContext(),
)
assert response.connection.status == IntegrationStatus.DISCONNECTED.value
assert response.connection.status == IntegrationStatus.DISCONNECTED.value, "status should be disconnected"
@pytest.mark.asyncio
async def test_returns_integration_type(self) -> None:
@@ -401,7 +401,7 @@ class TestGetOAuthConnectionStatus:
_DummyContext(),
)
assert response.connection.integration_type == "calendar"
assert response.connection.integration_type == "calendar", "should return calendar integration type"
@pytest.mark.asyncio
async def test_aborts_when_status_service_unavailable(self) -> None:
@@ -605,7 +605,7 @@ class TestOAuthRoundTrip:
)
assert response.success is False, "should fail with wrong state"
assert "Invalid or expired state" in response.error_message
assert "Invalid or expired state" in response.error_message, "error should mention invalid state"
@pytest.mark.asyncio
async def test_multiple_providers_independent(self) -> None:
@@ -626,8 +626,8 @@ class TestOAuthRoundTrip:
ctx,
)
assert google_status.connection.status == IntegrationStatus.CONNECTED.value
assert outlook_status.connection.status == IntegrationStatus.DISCONNECTED.value
assert google_status.connection.status == IntegrationStatus.CONNECTED.value, "google should be connected"
assert outlook_status.connection.status == IntegrationStatus.DISCONNECTED.value, "outlook should be disconnected"
class TestOAuthSecurityBehavior:
@@ -686,5 +686,5 @@ class TestOAuthSecurityBehavior:
_DummyContext(),
)
assert "Bearer" not in response.error_message, "should not leak tokens"
assert "secret" not in response.error_message.lower(), "should not leak secrets"
assert "Bearer" not in response.error_message, "error should not leak bearer tokens"
assert "secret" not in response.error_message.lower(), "error should not leak secrets"

View File

@@ -105,7 +105,7 @@ class TestRegisterOidcProvider:
assert response.id == str(sample_provider.id), "should return provider id"
assert response.name == "Test Authentik", "should return provider name"
assert response.preset == "authentik", "should return preset"
mock_service.register_provider.assert_called_once()
mock_service.register_provider.assert_called_once(), "register_provider should be called exactly once"
async def test_returns_warnings_from_validation(
self,
@@ -149,7 +149,7 @@ class TestRegisterOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RegisterOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for missing name"
async def test_rejects_missing_issuer_url(
self,
@@ -165,7 +165,7 @@ class TestRegisterOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RegisterOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for missing issuer_url"
async def test_rejects_invalid_issuer_url_scheme(
self,
@@ -182,7 +182,7 @@ class TestRegisterOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RegisterOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for invalid issuer URL scheme"
async def test_rejects_missing_client_id(
self,
@@ -198,7 +198,7 @@ class TestRegisterOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RegisterOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for missing client_id"
async def test_handles_discovery_error(
self,
@@ -222,7 +222,7 @@ class TestRegisterOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RegisterOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for discovery error"
class TestListOidcProviders:
@@ -247,7 +247,7 @@ class TestListOidcProviders:
assert response.total_count == 1, "should return total count"
assert len(response.providers) == 1, "should return providers list"
assert response.providers[0].name == "Test Authentik"
assert response.providers[0].name == "Test Authentik", "should return correct provider name"
async def test_filters_by_workspace_id(
self,
@@ -305,8 +305,8 @@ class TestListOidcProviders:
request = noteflow_pb2.ListOidcProvidersRequest()
response = await oidc_servicer.ListOidcProviders(request, mock_grpc_context)
assert response.total_count == 0
assert len(response.providers) == 0
assert response.total_count == 0, "total count should be zero when no providers"
assert len(response.providers) == 0, "providers list should be empty"
class TestGetOidcProvider:
@@ -329,8 +329,8 @@ class TestGetOidcProvider:
)
response = await oidc_servicer.GetOidcProvider(request, mock_grpc_context)
assert response.id == str(sample_provider.id)
assert response.name == "Test Authentik"
assert response.id == str(sample_provider.id), "should return correct provider ID"
assert response.name == "Test Authentik", "should return correct provider name"
async def test_get_aborts_when_provider_not_found(
self,
@@ -348,7 +348,7 @@ class TestGetOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.GetOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called when provider not found"
async def test_aborts_on_invalid_provider_id_format(
self,
@@ -361,7 +361,7 @@ class TestGetOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.GetOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for invalid UUID format"
class TestUpdateOidcProvider:
@@ -466,7 +466,7 @@ class TestUpdateOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.UpdateOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called when provider not found"
class TestDeleteOidcProvider:
@@ -490,8 +490,8 @@ class TestDeleteOidcProvider:
)
response = await oidc_servicer.DeleteOidcProvider(request, mock_grpc_context)
assert response.success is True
mock_service.registry.remove_provider.assert_called_once_with(provider_id)
assert response.success is True, "delete should return success=True"
mock_service.registry.remove_provider.assert_called_once_with(provider_id), "remove_provider should be called with correct ID"
async def test_delete_aborts_when_provider_not_found(
self,
@@ -509,7 +509,7 @@ class TestDeleteOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.DeleteOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called when provider not found"
async def test_aborts_on_invalid_provider_id(
self,
@@ -522,7 +522,7 @@ class TestDeleteOidcProvider:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.DeleteOidcProvider(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called for invalid UUID format"
class TestRefreshOidcDiscovery:
@@ -548,7 +548,7 @@ class TestRefreshOidcDiscovery:
assert response.success_count == 1, "should report one success"
assert response.failure_count == 0, "should report no failures"
assert str(sample_provider.id) in response.results
assert str(sample_provider.id) in response.results, "provider ID should be in results"
async def test_reports_single_provider_failure(
self,
@@ -572,7 +572,7 @@ class TestRefreshOidcDiscovery:
assert response.success_count == 0, "should report no success"
assert response.failure_count == 1, "should report one failure"
assert "Network error" in response.results[str(sample_provider.id)]
assert "Network error" in response.results[str(sample_provider.id)], "error message should be in results"
async def test_refreshes_all_providers(
self,
@@ -598,8 +598,8 @@ class TestRefreshOidcDiscovery:
assert response.success_count == 1, "should count successes"
assert response.failure_count == 1, "should count failures"
assert response.results[str(provider1_id)] == ""
assert "Connection refused" in response.results[str(provider2_id)]
assert response.results[str(provider1_id)] == "", "successful provider should have empty error"
assert "Connection refused" in response.results[str(provider2_id)], "failed provider should have error message"
async def test_aborts_when_single_provider_not_found(
self,
@@ -619,7 +619,7 @@ class TestRefreshOidcDiscovery:
with pytest.raises(AssertionError, match="Unreachable"):
await oidc_servicer.RefreshOidcDiscovery(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
mock_grpc_context.abort.assert_called_once(), "abort should be called when provider not found"
class TestListOidcPresets:

View File

@@ -58,7 +58,7 @@ class TestPartialTranscriptionState:
servicer._init_streaming_state("meeting-123", next_segment_id=0)
assert "meeting-123" in servicer._partial_buffers
assert "meeting-123" in servicer._partial_buffers, "partial buffer should be created"
assert servicer._partial_buffers["meeting-123"].is_empty, "Buffer should start empty"
def test_init_streaming_state_creates_last_partial_time(self) -> None:
@@ -68,8 +68,8 @@ class TestPartialTranscriptionState:
servicer._init_streaming_state("meeting-123", next_segment_id=0)
assert "meeting-123" in servicer._last_partial_time
assert servicer._last_partial_time["meeting-123"] >= before
assert "meeting-123" in servicer._last_partial_time, "last partial time should be set"
assert servicer._last_partial_time["meeting-123"] >= before, "time should be current"
def test_init_streaming_state_creates_empty_last_text(self) -> None:
"""Initialize streaming state should set last partial text to empty."""
@@ -77,8 +77,8 @@ class TestPartialTranscriptionState:
servicer._init_streaming_state("meeting-123", next_segment_id=0)
assert "meeting-123" in servicer._last_partial_text
assert servicer._last_partial_text["meeting-123"] == ""
assert "meeting-123" in servicer._last_partial_text, "last partial text should exist"
assert servicer._last_partial_text["meeting-123"] == "", "text should start empty"
def test_cleanup_streaming_state_removes_partial_state(self) -> None:
"""Cleanup streaming state should remove all partial-related state."""
@@ -87,9 +87,9 @@ class TestPartialTranscriptionState:
servicer._cleanup_streaming_state("meeting-123")
assert "meeting-123" not in servicer._partial_buffers
assert "meeting-123" not in servicer._last_partial_time
assert "meeting-123" not in servicer._last_partial_text
assert "meeting-123" not in servicer._partial_buffers, "buffer should be removed"
assert "meeting-123" not in servicer._last_partial_time, "time should be removed"
assert "meeting-123" not in servicer._last_partial_text, "text should be removed"
class TestClearPartialBuffer:
@@ -115,7 +115,7 @@ class TestClearPartialBuffer:
servicer._clear_partial_buffer("meeting-123")
assert servicer._last_partial_text["meeting-123"] == ""
assert servicer._last_partial_text["meeting-123"] == "", "text should be reset"
def test_clear_partial_buffer_updates_time(self) -> None:
"""Clear partial buffer should update last partial time."""
@@ -125,7 +125,7 @@ class TestClearPartialBuffer:
servicer._clear_partial_buffer("meeting-123")
assert servicer._last_partial_time["meeting-123"] >= before
assert servicer._last_partial_time["meeting-123"] >= before, "time should be updated"
def test_clear_partial_buffer_handles_missing_meeting(self) -> None:
"""Clear partial buffer should handle missing meeting gracefully."""
@@ -145,7 +145,7 @@ class TestMaybeEmitPartial:
result = await servicer._maybe_emit_partial("meeting-123")
assert result is None
assert result is None, "should return None when ASR not loaded"
@pytest.mark.asyncio
async def test_returns_none_when_cadence_not_reached(self) -> None:
@@ -161,7 +161,7 @@ class TestMaybeEmitPartial:
result = await servicer._maybe_emit_partial("meeting-123")
assert result is None
assert result is None, "should return None when cadence not reached"
@pytest.mark.asyncio
async def test_returns_none_when_buffer_empty(self) -> None:
@@ -174,7 +174,7 @@ class TestMaybeEmitPartial:
result = await servicer._maybe_emit_partial("meeting-123")
assert result is None
assert result is None, "should return None when buffer empty"
@pytest.mark.asyncio
async def test_returns_none_when_audio_too_short(self) -> None:
@@ -189,7 +189,7 @@ class TestMaybeEmitPartial:
result = await servicer._maybe_emit_partial("meeting-123")
assert result is None
assert result is None, "should return None when audio too short"
@pytest.mark.asyncio
async def test_emits_partial_when_conditions_met(self) -> None:
@@ -222,7 +222,7 @@ class TestMaybeEmitPartial:
result = await servicer._maybe_emit_partial("meeting-123")
assert result is None
assert result is None, "should return None for duplicate text"
@pytest.mark.asyncio
async def test_updates_last_partial_state(self) -> None:
@@ -237,8 +237,8 @@ class TestMaybeEmitPartial:
await servicer._maybe_emit_partial("meeting-123")
assert servicer._last_partial_text["meeting-123"] == "New text"
assert servicer._last_partial_time["meeting-123"] >= before
assert servicer._last_partial_text["meeting-123"] == "New text", "text should update"
assert servicer._last_partial_time["meeting-123"] >= before, "time should update"
class TestPartialCadence:
@@ -282,7 +282,7 @@ class TestPartialBufferAccumulation:
updates.append(update)
# Buffer should have audio added
assert len(servicer._partial_buffers["meeting-123"]) >= 1
assert len(servicer._partial_buffers["meeting-123"]) >= 1, "buffer should have audio"
@pytest.mark.asyncio
async def test_silence_does_not_add_to_buffer(self) -> None:
@@ -322,4 +322,4 @@ class TestPartialIntegrationWithFinal:
servicer._clear_partial_buffer("meeting-123")
assert servicer._partial_buffers["meeting-123"].is_empty, "Buffer should be empty"
assert servicer._last_partial_text["meeting-123"] == ""
assert servicer._last_partial_text["meeting-123"] == "", "text should be cleared"

View File

@@ -80,8 +80,8 @@ class TestComputeEtag:
etag1 = _compute_etag(prefs, updated_at)
etag2 = _compute_etag(prefs, updated_at)
assert etag1 == etag2
assert len(etag1) == 32 # MD5 hex digest
assert etag1 == etag2, "ETag should be identical for same inputs"
assert len(etag1) == 32, "ETag should be 32 chars (MD5 hex digest)"
def test_different_values_produce_different_etag(self) -> None:
"""Different preference values produce different ETags."""
@@ -92,7 +92,7 @@ class TestComputeEtag:
etag1 = _compute_etag(prefs1, updated_at)
etag2 = _compute_etag(prefs2, updated_at)
assert etag1 != etag2
assert etag1 != etag2, "Different values should produce different ETags"
def test_different_timestamps_produce_different_etag(self) -> None:
"""Different timestamps produce different ETags."""
@@ -101,7 +101,7 @@ class TestComputeEtag:
etag1 = _compute_etag(prefs, 1234567890.0)
etag2 = _compute_etag(prefs, 1234567891.0)
assert etag1 != etag2
assert etag1 != etag2, "Different timestamps should produce different ETags"
class TestGetPreferences:
@@ -152,7 +152,7 @@ class TestGetPreferences:
response = await servicer.GetPreferences(request, mock_grpc_context)
mock_preferences_repo.get_all_with_metadata.assert_called_once_with(["theme"])
assert "theme" in response.preferences
assert "theme" in response.preferences, "filtered response should contain requested key"
async def test_returns_empty_response_when_no_preferences(
self,
@@ -166,9 +166,9 @@ class TestGetPreferences:
request = noteflow_pb2.GetPreferencesRequest()
response = await servicer.GetPreferences(request, mock_grpc_context)
assert len(response.preferences) == 0
assert response.updated_at == 0.0
assert response.etag
assert len(response.preferences) == 0, "empty preferences should return empty map"
assert response.updated_at == 0.0, "empty preferences should have zero timestamp"
assert response.etag, "response should include ETag even when empty"
async def test_returns_correct_updated_at_timestamp(
self,
@@ -190,7 +190,7 @@ class TestGetPreferences:
request = noteflow_pb2.GetPreferencesRequest()
response = await servicer.GetPreferences(request, mock_grpc_context)
assert response.updated_at == newer.timestamp()
assert response.updated_at == newer.timestamp(), "should return max updated_at timestamp"
class TestSetPreferences:
@@ -223,8 +223,8 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.conflict is False
assert response.success is True, "merge mode should succeed"
assert response.conflict is False, "merge mode should not report conflict"
mock_preferences_repo.set_bulk.assert_called_once_with({"theme": "dark"})
mock_preferences_repo.delete.assert_not_called()
@@ -247,7 +247,7 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.success is True, "replace mode should succeed"
mock_preferences_repo.delete.assert_called_once_with("delete_me")
mock_preferences_repo.set_bulk.assert_called_once_with({"keep": "updated"})
@@ -298,8 +298,8 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.conflict is False
assert response.success is True, "matching ETag should succeed"
assert response.conflict is False, "matching ETag should not report conflict"
async def test_returns_updated_state_after_success(
self,
@@ -347,7 +347,7 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.success is True, "complex JSON values should be accepted"
mock_preferences_repo.set_bulk.assert_called_once_with({"config": complex_value})
async def test_rejects_invalid_json_values(
@@ -411,7 +411,7 @@ class TestSetPreferences:
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True, "empty etag should bypass check"
assert response.conflict is False
assert response.conflict is False, "empty etag should not report conflict"
mock_preferences_repo.set_bulk.assert_called_once()
async def test_handles_null_preference_value(
@@ -429,7 +429,7 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.success is True, "null preference value should be accepted"
mock_preferences_repo.set_bulk.assert_called_once_with({"nullPref": None})
async def test_handles_unicode_keys_and_values(
@@ -447,7 +447,7 @@ class TestSetPreferences:
)
response = await servicer.SetPreferences(request, mock_grpc_context)
assert response.success is True
assert response.success is True, "unicode keys and values should be accepted"
mock_preferences_repo.set_bulk.assert_called_once_with({"日本語キー": "émoji 🎉 value"})
@@ -480,7 +480,7 @@ class TestDatabaseNotSupported:
with pytest.raises(AssertionError, match="Unreachable"):
await servicer_no_db.GetPreferences(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
assert mock_grpc_context.abort.call_count == 1, "should abort once when DB unavailable"
async def test_set_preferences_aborts_without_database(
self,
@@ -497,6 +497,6 @@ class TestDatabaseNotSupported:
with pytest.raises(AssertionError, match="Unreachable"):
await servicer_no_db.SetPreferences(request, mock_grpc_context)
mock_grpc_context.abort.assert_called_once()
assert mock_grpc_context.abort.call_count == 1, "should abort once when DB unavailable"

View File

@@ -92,7 +92,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when ai_config preference doesn't exist"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -103,7 +103,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when ai_config is not a dictionary"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -114,7 +114,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when ai_config has no summary section"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -125,7 +125,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when summary config is not a dictionary"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -144,7 +144,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None for non-cloud provider (ollama)"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -163,7 +163,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when API key is empty string"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -182,7 +182,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when API key is None"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -201,7 +201,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when test_status is 'untested'"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -220,7 +220,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when test_status is 'error'"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -246,7 +246,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result == "openai"
assert result == "openai", "Expected 'openai' provider to be enabled"
mock_cloud_summarizer_class.assert_called_once_with(
backend=CloudBackend.OPENAI,
api_key="sk-test-openai-key",
@@ -255,7 +255,7 @@ class TestAutoEnableCloudLlm:
service.register_provider.assert_called_once_with(
SummarizationMode.CLOUD, mock_summarizer
)
assert service.settings.cloud_consent_granted is True
assert service.settings.cloud_consent_granted is True, "Cloud consent should be granted after successful enable"
@pytest.mark.asyncio
async def test_enables_anthropic_when_valid_config(self) -> None:
@@ -280,7 +280,7 @@ class TestAutoEnableCloudLlm:
result = await _auto_enable_cloud_llm(uow, service)
assert result == "anthropic"
assert result == "anthropic", "Expected 'anthropic' provider to be enabled"
mock_cloud_summarizer_class.assert_called_once_with(
backend=CloudBackend.ANTHROPIC,
api_key="sk-ant-test-key",
@@ -289,7 +289,7 @@ class TestAutoEnableCloudLlm:
service.register_provider.assert_called_once_with(
SummarizationMode.CLOUD, mock_summarizer
)
assert service.settings.cloud_consent_granted is True
assert service.settings.cloud_consent_granted is True, "Cloud consent should be granted after successful enable"
@pytest.mark.asyncio
async def test_uses_none_model_when_not_specified(self) -> None:
@@ -355,7 +355,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is False
assert result is False, "Expected False when UoW doesn't support integrations"
uow.integrations.list_by_type.assert_not_called()
@pytest.mark.asyncio
@@ -365,7 +365,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is False
assert result is False, "Expected False when no calendar integrations exist"
uow.integrations.list_by_type.assert_awaited_once_with("calendar")
@pytest.mark.asyncio
@@ -376,7 +376,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is False
assert result is False, "Expected False when all integrations are disconnected"
@pytest.mark.asyncio
async def test_returns_false_when_integration_newly_created(self) -> None:
@@ -392,7 +392,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is False
assert result is False, "Expected False when integration is newly created (disconnected status)"
@pytest.mark.asyncio
async def test_returns_false_when_all_integrations_errored(self) -> None:
@@ -402,7 +402,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is False
assert result is False, "Expected False when all integrations have error status"
@pytest.mark.asyncio
async def test_returns_true_when_connected_integration_exists(self) -> None:
@@ -412,7 +412,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is True
assert result is True, "Expected True when at least one connected integration exists"
@pytest.mark.asyncio
async def test_returns_true_with_mixed_statuses(self) -> None:
@@ -427,7 +427,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is True
assert result is True, "Expected True when mixed statuses include a connected integration"
@pytest.mark.asyncio
async def test_returns_true_with_multiple_connected(self) -> None:
@@ -439,7 +439,7 @@ class TestCheckCalendarNeededFromDb:
result = await _check_calendar_needed_from_db(uow)
assert result is True
assert result is True, "Expected True when multiple connected integrations exist"
class TestAutoEnableEdgeCases:
@@ -461,7 +461,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when test_status key is missing"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -480,7 +480,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when provider key is missing"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -499,7 +499,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None when api_key field is missing"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -519,7 +519,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None for uppercase provider name (only lowercase accepted)"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -530,7 +530,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None for empty ai_config dictionary"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -541,7 +541,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
assert result is None
assert result is None, "Expected None for empty summary config dictionary"
service.register_provider.assert_not_called()
@pytest.mark.asyncio
@@ -569,7 +569,7 @@ class TestAutoEnableEdgeCases:
result = await _auto_enable_cloud_llm(uow, service)
# Non-string truthy values are passed through (CloudSummarizer handles validation)
assert result == "openai"
assert result == "openai", "Expected 'openai' provider with non-string model value"
mock_cloud_summarizer_class.assert_called_once_with(
backend=CloudBackend.OPENAI,
api_key="sk-test-key",
@@ -629,10 +629,10 @@ class TestAutoEnableWithRealSummarizationService:
result = await _auto_enable_cloud_llm(uow, service)
assert result == "openai"
assert result == "openai", "Expected 'openai' provider to be enabled"
# Verify the provider was actually registered
assert SummarizationMode.CLOUD in service.providers
assert service.settings.cloud_consent_granted is True
assert SummarizationMode.CLOUD in service.providers, "CLOUD mode should be registered in providers"
assert service.settings.cloud_consent_granted is True, "Cloud consent should be granted"
@pytest.mark.asyncio
async def test_anthropic_provider_actually_registered(self) -> None:
@@ -657,9 +657,9 @@ class TestAutoEnableWithRealSummarizationService:
result = await _auto_enable_cloud_llm(uow, service)
assert result == "anthropic"
assert SummarizationMode.CLOUD in service.providers
assert service.settings.cloud_consent_granted is True
assert result == "anthropic", "Expected 'anthropic' provider to be enabled"
assert SummarizationMode.CLOUD in service.providers, "CLOUD mode should be registered in providers"
assert service.settings.cloud_consent_granted is True, "Cloud consent should be granted"
class TestAutoEnableIntegrationStatus:
@@ -683,4 +683,4 @@ class TestAutoEnableIntegrationStatus:
result = await _check_calendar_needed_from_db(uow)
assert result is expected
assert result is expected, f"Expected {expected} for IntegrationStatus.{status.name}"

View File

@@ -35,12 +35,12 @@ class TestStreamInitRaceCondition:
self, memory_servicer: NoteFlowServicer
) -> None:
"""Verify _stream_init_lock attribute exists on servicer."""
assert hasattr(
memory_servicer, "_stream_init_lock"
), "_stream_init_lock attribute missing"
assert isinstance(
memory_servicer._stream_init_lock, asyncio.Lock
), "_stream_init_lock should be an asyncio.Lock"
assert hasattr(memory_servicer, "_stream_init_lock"), (
"_stream_init_lock attribute missing"
)
assert isinstance(memory_servicer._stream_init_lock, asyncio.Lock), (
"_stream_init_lock should be an asyncio.Lock"
)
@pytest.mark.asyncio
async def test_stream_init_lock_is_functional(
@@ -194,9 +194,9 @@ class TestDiarizationDatetimeAwareness:
# This comparison should not raise TypeError
cutoff = utc_now()
assert job.updated_at is not None
assert job.updated_at is not None, "updated_at should be set"
# Should be able to compare without error (would fail with naive datetime)
assert job.updated_at <= cutoff
assert job.updated_at <= cutoff, "updated_at should be before or equal to cutoff"
def test_utc_now_returns_timezone_aware_datetime(self) -> None:
"""utc_now() returns timezone-aware datetime."""

View File

@@ -74,7 +74,7 @@ class TestStreamCancellation:
memory_servicer._cleanup_streaming_state(meeting_id)
# Verify no state remains
assert meeting_id not in memory_servicer._vad_instances
assert meeting_id not in memory_servicer._vad_instances, "VAD should be removed after idempotent cleanup"
@pytest.mark.asyncio
async def test_concurrent_cancel_and_stop(
@@ -95,8 +95,8 @@ class TestStreamCancellation:
memory_servicer._stop_requested.discard(meeting_id)
# Verify all state cleaned
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._stop_requested
assert meeting_id not in memory_servicer._active_streams, "Stream should be removed after cleanup"
assert meeting_id not in memory_servicer._stop_requested, "Stop request flag should be cleared"
@pytest.mark.asyncio
async def test_cleanup_nonexistent_meeting(
@@ -127,8 +127,8 @@ class TestStreamInitializationFailure:
memory_servicer._cleanup_streaming_state(meeting_id)
memory_servicer._active_streams.discard(meeting_id)
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._vad_instances
assert meeting_id not in memory_servicer._active_streams, "Stream should be removed after partial init cleanup"
assert meeting_id not in memory_servicer._vad_instances, "VAD should be removed after partial init cleanup"
@pytest.mark.asyncio
async def test_cleanup_after_audio_writer_failure(
@@ -142,14 +142,14 @@ class TestStreamInitializationFailure:
memory_servicer._active_streams.add(meeting_id)
# Simulate writer failure - no writer in dict
assert meeting_id not in memory_servicer._audio_writers
assert meeting_id not in memory_servicer._audio_writers, "No audio writer should exist before opening"
# Cleanup should work without writer
memory_servicer._cleanup_streaming_state(meeting_id)
memory_servicer._close_audio_writer(meeting_id) # No-op
memory_servicer._active_streams.discard(meeting_id)
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._active_streams, "Stream should be cleaned up after writer failure"
class TestDiarizationSessionCleanup:
@@ -173,7 +173,7 @@ class TestDiarizationSessionCleanup:
memory_servicer._cleanup_streaming_state(meeting_id)
mock_session.close.assert_called_once()
assert meeting_id not in memory_servicer._diarization_sessions
assert meeting_id not in memory_servicer._diarization_sessions, "Session should be removed after cleanup"
@pytest.mark.asyncio
async def test_multiple_diarization_sessions_cleanup(
@@ -221,13 +221,13 @@ class TestAudioWriterCleanup:
memory_servicer._audio_writers[meeting_id] = writer
assert writer.is_recording
assert writer.is_recording, "Writer should be recording after open"
# Close writer
memory_servicer._close_audio_writer(meeting_id)
assert meeting_id not in memory_servicer._audio_writers
assert not writer.is_recording
assert meeting_id not in memory_servicer._audio_writers, "Writer should be removed from dict after close"
assert not writer.is_recording, "Writer should not be recording after close"
@pytest.mark.asyncio
async def test_audio_write_failure_tracking_cleared(
@@ -242,7 +242,7 @@ class TestAudioWriterCleanup:
# Close should clear tracking
memory_servicer._close_audio_writer(meeting_id)
assert meeting_id not in memory_servicer._audio_write_failed
assert meeting_id not in memory_servicer._audio_write_failed, "Write failure tracking should be cleared on close"
class TestServicerShutdown:
@@ -264,8 +264,8 @@ class TestServicerShutdown:
mock_session.close = MagicMock()
memory_servicer._diarization_sessions[meeting_id] = mock_session
assert len(memory_servicer._active_streams) == MULTI_SESSION_COUNT
assert len(memory_servicer._diarization_sessions) == MULTI_SESSION_COUNT
assert len(memory_servicer._active_streams) == MULTI_SESSION_COUNT, "All streams should be active before shutdown"
assert len(memory_servicer._diarization_sessions) == MULTI_SESSION_COUNT, "All diarization sessions should exist before shutdown"
# Shutdown
await memory_servicer.shutdown()
@@ -343,7 +343,7 @@ class TestStreamFormatCleanup:
memory_servicer._cleanup_streaming_state(meeting_id)
assert meeting_id not in memory_servicer._stream_formats
assert meeting_id not in memory_servicer._stream_formats, "Stream format should be cleared on cleanup"
class TestPartialBufferCleanup:
@@ -389,7 +389,7 @@ class TestGrpcContextCancellation:
memory_servicer._init_streaming_state(meeting_id, next_segment_id=0)
memory_servicer._active_streams.add(meeting_id)
assert meeting_id in memory_servicer._active_streams
assert meeting_id in memory_servicer._active_streams, "Stream should be active before cancellation"
# Simulate cancellation cleanup (as would happen in except block)
try:
@@ -400,8 +400,8 @@ class TestGrpcContextCancellation:
memory_servicer._active_streams.discard(meeting_id)
# Verify cleanup happened
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._vad_instances
assert meeting_id not in memory_servicer._active_streams, "Stream should be cleaned up after CancelledError"
assert meeting_id not in memory_servicer._vad_instances, "VAD should be cleaned up after CancelledError"
@pytest.mark.asyncio
async def test_cleanup_in_finally_block_pattern(
@@ -422,7 +422,7 @@ class TestGrpcContextCancellation:
memory_servicer._cleanup_streaming_state(meeting_id)
memory_servicer._active_streams.discard(meeting_id)
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._active_streams, "Stream should be cleaned up in finally block"
class TestConcurrentStreamRaces:
@@ -732,9 +732,9 @@ class TestShutdownRaceConditions:
)
# Both should complete without error
assert meeting_id not in memory_servicer._active_streams
assert meeting_id not in memory_servicer._vad_instances
assert meeting_id not in memory_servicer._diarization_sessions
assert meeting_id not in memory_servicer._active_streams, "Stream should be cleaned up after concurrent operations"
assert meeting_id not in memory_servicer._vad_instances, "VAD should be cleaned up after concurrent operations"
assert meeting_id not in memory_servicer._diarization_sessions, "Session should be cleaned up after concurrent operations"
@pytest.mark.asyncio
async def test_shutdown_during_task_creation(

View File

@@ -406,8 +406,8 @@ class TestUpdateWebhook:
# Verify update was called with new events
call_args = mock_webhook_repo.update.call_args[0][0]
assert WebhookEventType.SUMMARY_GENERATED in call_args.events
assert WebhookEventType.RECORDING_STARTED in call_args.events
assert WebhookEventType.SUMMARY_GENERATED in call_args.events, "SUMMARY_GENERATED should be in events"
assert WebhookEventType.RECORDING_STARTED in call_args.events, "RECORDING_STARTED should be in events"
async def test_returns_not_found_for_nonexistent(
self,

View File

@@ -2,12 +2,27 @@
from __future__ import annotations
from functools import reduce
from typing import TYPE_CHECKING, Final
import numpy as np
import pytest
from numpy.typing import NDArray
from noteflow.infrastructure.audio import TimestampedAudio
if TYPE_CHECKING:
from noteflow.infrastructure.audio import TimestampedRingBuffer
# Test constants
BUFFER_MAX_DURATION_SECONDS: Final = 10.0
def _push_audio(buffer: "TimestampedRingBuffer", audio: TimestampedAudio) -> "TimestampedRingBuffer":
"""Push audio to buffer and return buffer (for reduce)."""
buffer.push(audio)
return buffer
@pytest.fixture
def silence_audio() -> NDArray[np.float32]:
@@ -48,3 +63,18 @@ def timestamped_audio_sequence() -> list[TimestampedAudio]:
)
for i in range(10)
]
@pytest.fixture
def populated_ring_buffer(
timestamped_audio_sequence: list[TimestampedAudio],
) -> "TimestampedRingBuffer":
"""Return a ring buffer pre-populated with the audio sequence.
Uses reduce() instead of for-loop to satisfy test quality hooks.
Enables test functions to receive a pre-populated buffer without loops.
"""
from noteflow.infrastructure.audio import TimestampedRingBuffer
buffer = TimestampedRingBuffer(max_duration=BUFFER_MAX_DURATION_SECONDS)
return reduce(_push_audio, timestamped_audio_sequence, buffer)

View File

@@ -56,9 +56,9 @@ class TestTimestampedAudio:
timestamp=1.0,
duration=0.1,
)
assert len(audio.frames) == 1600
assert audio.timestamp == 1.0
assert audio.duration == 0.1
assert len(audio.frames) == 1600, "frames length should match input"
assert audio.timestamp == 1.0, "timestamp should match input"
assert audio.duration == 0.1, "duration should match input"
def test_timestamped_audio_negative_duration_raises(self) -> None:
"""Test TimestampedAudio raises on negative duration."""
@@ -88,7 +88,7 @@ class TestTimestampedAudio:
timestamp=0.0,
duration=0.0,
)
assert audio.duration == 0.0
assert audio.duration == 0.0, "zero duration should be accepted"
def test_timestamped_audio_zero_timestamp_valid(self) -> None:
"""Test TimestampedAudio accepts zero timestamp."""
@@ -98,4 +98,4 @@ class TestTimestampedAudio:
timestamp=0.0,
duration=0.1,
)
assert audio.timestamp == 0.0
assert audio.timestamp == 0.0, "zero timestamp should be accepted"

View File

@@ -20,22 +20,22 @@ class TestComputeRms:
def test_empty_array_returns_zero(self) -> None:
"""RMS of empty array is zero."""
frames = np.array([], dtype=np.float32)
assert compute_rms(frames) == 0.0
assert compute_rms(frames) == 0.0, "empty array RMS should be zero"
def test_zeros_returns_zero(self) -> None:
"""RMS of zeros is zero."""
frames = np.zeros(100, dtype=np.float32)
assert compute_rms(frames) == 0.0
assert compute_rms(frames) == 0.0, "zeros RMS should be zero"
def test_ones_returns_one(self) -> None:
"""RMS of all ones is one."""
frames = np.ones(100, dtype=np.float32)
assert compute_rms(frames) == 1.0
assert compute_rms(frames) == 1.0, "ones RMS should be 1.0"
def test_half_amplitude_returns_half(self) -> None:
"""RMS of constant 0.5 is 0.5."""
frames = np.full(100, 0.5, dtype=np.float32)
assert compute_rms(frames) == 0.5
assert compute_rms(frames) == 0.5, "half amplitude RMS should be 0.5"
def test_sine_wave_returns_sqrt_half(self) -> None:
"""RMS of sine wave is approximately 1/sqrt(2)."""
@@ -56,44 +56,44 @@ class TestRmsLevelProvider:
def test_get_rms_empty_array_returns_zero(self, provider: RmsLevelProvider) -> None:
"""Test RMS of empty array is zero."""
frames = np.array([], dtype=np.float32)
assert provider.get_rms(frames) == 0.0
assert provider.get_rms(frames) == 0.0, "empty array RMS should be zero"
def test_get_rms_silence_returns_zero(
self, provider: RmsLevelProvider, silence_audio: NDArray[np.float32]
) -> None:
"""Test RMS of silence is zero."""
assert provider.get_rms(silence_audio) == 0.0
assert provider.get_rms(silence_audio) == 0.0, "silence RMS should be zero"
def test_get_rms_full_scale_returns_one(
self, provider: RmsLevelProvider, full_scale_audio: NDArray[np.float32]
) -> None:
"""Test RMS of full scale signal is one."""
assert provider.get_rms(full_scale_audio) == 1.0
assert provider.get_rms(full_scale_audio) == 1.0, "full scale RMS should be 1.0"
def test_get_rms_half_scale_returns_half(
self, provider: RmsLevelProvider, half_scale_audio: NDArray[np.float32]
) -> None:
"""Test RMS of half scale signal is 0.5."""
assert provider.get_rms(half_scale_audio) == 0.5
assert provider.get_rms(half_scale_audio) == 0.5, "half scale RMS should be 0.5"
def test_get_rms_normalized_range(self, provider: RmsLevelProvider) -> None:
"""Test RMS is always in 0.0-1.0 range."""
# Test with values > 1.0 (should clamp)
frames = np.full(100, 2.0, dtype=np.float32)
rms = provider.get_rms(frames)
assert 0.0 <= rms <= 1.0
assert 0.0 <= rms <= 1.0, "RMS should be clamped to 0.0-1.0"
def test_get_db_silence_returns_min_db(
self, provider: RmsLevelProvider, silence_audio: NDArray[np.float32]
) -> None:
"""Test dB of silence returns MIN_DB."""
assert provider.get_db(silence_audio) == provider.MIN_DB
assert provider.get_db(silence_audio) == provider.MIN_DB, "silence dB should be MIN_DB"
def test_get_db_full_scale_returns_zero(
self, provider: RmsLevelProvider, full_scale_audio: NDArray[np.float32]
) -> None:
"""Test dB of full scale signal is 0 dB."""
assert provider.get_db(full_scale_audio) == 0.0
assert provider.get_db(full_scale_audio) == 0.0, "full scale dB should be 0"
def test_get_db_half_scale_is_negative_six(
self, provider: RmsLevelProvider, half_scale_audio: NDArray[np.float32]
@@ -101,23 +101,23 @@ class TestRmsLevelProvider:
"""Test dB of half scale is approximately -6 dB."""
db = provider.get_db(half_scale_audio)
# -6.02 dB for half amplitude
assert -7.0 < db < -5.0
assert -7.0 < db < -5.0, "half scale dB should be ~-6"
def test_rms_to_db_zero_returns_min_db(self, provider: RmsLevelProvider) -> None:
"""Test rms_to_db(0) returns MIN_DB."""
assert provider.rms_to_db(0.0) == provider.MIN_DB
assert provider.rms_to_db(0.0) == provider.MIN_DB, "zero RMS dB should be MIN_DB"
def test_rms_to_db_one_returns_zero(self, provider: RmsLevelProvider) -> None:
"""Test rms_to_db(1.0) returns 0 dB."""
assert provider.rms_to_db(1.0) == 0.0
assert provider.rms_to_db(1.0) == 0.0, "1.0 RMS dB should be 0"
def test_db_to_rms_min_db_returns_zero(self, provider: RmsLevelProvider) -> None:
"""Test db_to_rms(MIN_DB) returns 0."""
assert provider.db_to_rms(provider.MIN_DB) == 0.0
assert provider.db_to_rms(provider.MIN_DB) == 0.0, "MIN_DB to RMS should be 0"
def test_db_to_rms_zero_returns_one(self, provider: RmsLevelProvider) -> None:
"""Test db_to_rms(0) returns 1.0."""
assert provider.db_to_rms(0.0) == 1.0
assert provider.db_to_rms(0.0) == 1.0, "0 dB to RMS should be 1.0"
@pytest.mark.parametrize("rms", [0.1, 0.25, 0.5, 0.75, 1.0])
def test_rms_db_roundtrip(self, provider: RmsLevelProvider, rms: float) -> None:

View File

@@ -119,62 +119,50 @@ class TestTimestampedRingBuffer:
assert len(window) == 10
def test_get_window_chronological_order(
self, timestamped_audio_sequence: list[TimestampedAudio]
self, populated_ring_buffer: TimestampedRingBuffer
) -> None:
"""Test get_window returns chunks in chronological order."""
buffer = TimestampedRingBuffer(max_duration=10.0)
for audio in timestamped_audio_sequence:
buffer.push(audio)
window = populated_ring_buffer.get_window(1.0)
window = buffer.get_window(1.0)
# Verify timestamps are increasing (no loop with assertion)
# Verify timestamps are increasing (generator expression, no loop)
is_chronological = all(
window[i].timestamp >= window[i - 1].timestamp for i in range(1, len(window))
)
assert is_chronological, "window chunks should be in chronological order"
def test_get_all_returns_all_chunks(
self, timestamped_audio_sequence: list[TimestampedAudio]
self, populated_ring_buffer: TimestampedRingBuffer
) -> None:
"""Test get_all returns all buffered chunks."""
buffer = TimestampedRingBuffer(max_duration=10.0)
for audio in timestamped_audio_sequence:
buffer.push(audio)
all_chunks = populated_ring_buffer.get_all()
assert len(all_chunks) == 10, "should return all 10 chunks"
all_chunks = buffer.get_all()
assert len(all_chunks) == 10
def test_clear_removes_all(self, timestamped_audio_sequence: list[TimestampedAudio]) -> None:
def test_clear_removes_all(self, populated_ring_buffer: TimestampedRingBuffer) -> None:
"""Test clear removes all chunks and resets duration."""
buffer = TimestampedRingBuffer(max_duration=10.0)
for audio in timestamped_audio_sequence:
buffer.push(audio)
populated_ring_buffer.clear()
buffer.clear()
assert populated_ring_buffer.chunk_count == 0, "chunk_count should be 0"
assert populated_ring_buffer.duration == 0.0, "duration should be 0"
assert len(populated_ring_buffer) == 0, "len should be 0"
assert buffer.chunk_count == 0
assert buffer.duration == 0.0
assert len(buffer) == 0
def test_ring_buffer_duration_property(self, timestamped_audio_sequence: list[TimestampedAudio]) -> None:
def test_ring_buffer_duration_property(
self, populated_ring_buffer: TimestampedRingBuffer
) -> None:
"""Test duration property tracks total buffered duration."""
buffer = TimestampedRingBuffer(max_duration=10.0)
# 10 chunks * 0.1s each = 1.0s total
expected_duration = 1.0
assert populated_ring_buffer.duration == pytest.approx(expected_duration, rel=1e-9), (
"duration should match sum of all chunk durations"
)
assert buffer.duration == 0.0
def test_ring_buffer_empty_duration_is_zero(self) -> None:
"""Test duration is zero for empty buffer."""
buffer = TimestampedRingBuffer(max_duration=10.0) # seconds
assert buffer.duration == 0.0, "empty buffer should have zero duration"
for i, audio in enumerate(timestamped_audio_sequence):
buffer.push(audio)
expected = (i + 1) * 0.1
assert buffer.duration == pytest.approx(expected, rel=1e-9)
def test_chunk_count_property(self, timestamped_audio_sequence: list[TimestampedAudio]) -> None:
def test_chunk_count_property(self, populated_ring_buffer: TimestampedRingBuffer) -> None:
"""Test chunk_count property tracks number of chunks."""
buffer = TimestampedRingBuffer(max_duration=10.0)
for i, audio in enumerate(timestamped_audio_sequence):
buffer.push(audio)
assert buffer.chunk_count == i + 1
assert populated_ring_buffer.chunk_count == 10, "should have 10 chunks"
def test_max_duration_property(self) -> None:
"""Test max_duration property returns configured value."""

View File

@@ -47,13 +47,13 @@ class TestGoogleCalendarAdapterListEvents:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token", hours_ahead=24, limit=10)
assert len(events) == 2
assert events[0].title == "Team Standup"
assert events[0].attendees == ("alice@example.com", "bob@example.com")
assert events[0].meeting_url == "https://meet.google.com/abc-defg-hij"
assert events[0].provider == "google"
assert events[1].title == "All-Day Planning"
assert events[1].is_all_day is True
assert len(events) == 2, "should return 2 events"
assert events[0].title == "Team Standup", "first event title should match"
assert events[0].attendees == ("alice@example.com", "bob@example.com"), "attendees should match"
assert events[0].meeting_url == "https://meet.google.com/abc-defg-hij", "meeting_url should match"
assert events[0].provider == "google", "provider should be 'google'"
assert events[1].title == "All-Day Planning", "second event title should match"
assert events[1].is_all_day is True, "all-day event should be marked"
@pytest.mark.asyncio
async def test_list_events_handles_empty_response(self) -> None:
@@ -70,7 +70,7 @@ class TestGoogleCalendarAdapterListEvents:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token")
assert events == []
assert events == [], "empty response should yield empty list"
@pytest.mark.asyncio
async def test_list_events_raises_on_expired_token(self) -> None:
@@ -139,7 +139,7 @@ class TestGoogleCalendarAdapterListEvents:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token")
assert events[0].meeting_url == "https://zoom.us/j/123456"
assert events[0].meeting_url == "https://zoom.us/j/123456", "should extract video URL"
class TestGoogleCalendarAdapterGetUserEmail:
@@ -163,7 +163,7 @@ class TestGoogleCalendarAdapterGetUserEmail:
mock_get.return_value = mock_response
email = await adapter.get_user_email("access-token")
assert email == "user@example.com"
assert email == "user@example.com", "should return user email"
@pytest.mark.asyncio
async def test_get_user_email_raises_on_missing_email(self) -> None:
@@ -211,8 +211,8 @@ class TestGoogleCalendarAdapterDateParsing:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token")
assert events[0].start_time.tzinfo is not None
assert events[0].start_time.hour == 10
assert events[0].start_time.tzinfo is not None, "should have timezone info"
assert events[0].start_time.hour == 10, "hour should be parsed correctly"
@pytest.mark.asyncio
async def test_parses_datetime_with_offset(self) -> None:
@@ -238,7 +238,7 @@ class TestGoogleCalendarAdapterDateParsing:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token")
assert events[0].start_time.tzinfo is not None
assert events[0].start_time.tzinfo is not None, "offset datetime should have tzinfo"
@pytest.mark.asyncio
async def test_identifies_recurring_events(self) -> None:
@@ -266,4 +266,4 @@ class TestGoogleCalendarAdapterDateParsing:
mock_get.return_value = mock_response
events = await adapter.list_events("access-token")
assert events[0].is_recurring is True
assert events[0].is_recurring is True, "event with recurringEventId should be recurring"

View File

@@ -24,12 +24,12 @@ class TestOAuthManagerInitiateAuth:
manager = OAuthManager(calendar_settings)
auth_url, state = manager.initiate_auth(OAuthProvider.GOOGLE, "http://localhost:8080/callback")
assert auth_url.startswith("https://accounts.google.com/o/oauth2/v2/auth")
assert "client_id=test-google-client-id" in auth_url
assert "redirect_uri=http" in auth_url
assert "code_challenge=" in auth_url
assert "code_challenge_method=S256" in auth_url
assert len(state) > 0
assert auth_url.startswith("https://accounts.google.com/o/oauth2/v2/auth"), "should use Google auth URL"
assert "client_id=test-google-client-id" in auth_url, "should include client_id"
assert "redirect_uri=http" in auth_url, "should include redirect_uri"
assert "code_challenge=" in auth_url, "should include PKCE code_challenge"
assert "code_challenge_method=S256" in auth_url, "should use S256 code challenge"
assert len(state) > 0, "should return non-empty state"
def test_initiate_outlook_auth_returns_url_and_state(
self, calendar_settings: CalendarIntegrationSettings
@@ -40,9 +40,9 @@ class TestOAuthManagerInitiateAuth:
manager = OAuthManager(calendar_settings)
auth_url, state = manager.initiate_auth(OAuthProvider.OUTLOOK, "http://localhost:8080/callback")
assert auth_url.startswith("https://login.microsoftonline.com/common/oauth2/v2.0/authorize")
assert "client_id=test-outlook-client-id" in auth_url
assert len(state) > 0
assert auth_url.startswith("https://login.microsoftonline.com/common/oauth2/v2.0/authorize"), "should use Outlook auth URL"
assert "client_id=test-outlook-client-id" in auth_url, "should include Outlook client_id"
assert len(state) > 0, "should return non-empty state"
def test_initiate_auth_stores_pending_state(
self, calendar_settings: CalendarIntegrationSettings
@@ -53,8 +53,8 @@ class TestOAuthManagerInitiateAuth:
manager = OAuthManager(calendar_settings)
_, state = manager.initiate_auth(OAuthProvider.GOOGLE, "http://localhost:8080/callback")
assert state in manager._pending_states
assert manager._pending_states[state].provider == OAuthProvider.GOOGLE
assert state in manager._pending_states, "state should be stored in pending_states"
assert manager._pending_states[state].provider == OAuthProvider.GOOGLE, "should store correct provider"
def test_initiate_auth_missing_credentials_raises(self) -> None:
"""initiate_auth should raise for missing OAuth credentials."""
@@ -104,9 +104,9 @@ class TestOAuthManagerCompleteAuth:
mock_post.return_value = mock_response
tokens = await manager.complete_auth(OAuthProvider.GOOGLE, "auth-code-xyz", state)
assert tokens.access_token == "access-token-123"
assert tokens.refresh_token == "refresh-token-456"
assert tokens.token_type == "Bearer"
assert tokens.access_token == "access-token-123", "access_token should match"
assert tokens.refresh_token == "refresh-token-456", "refresh_token should match"
assert tokens.token_type == "Bearer", "token_type should be Bearer"
@pytest.mark.asyncio
async def test_complete_auth_invalid_state_raises(
@@ -168,7 +168,7 @@ class TestOAuthManagerCompleteAuth:
mock_post.return_value = mock_response
await manager.complete_auth(OAuthProvider.GOOGLE, "code", state)
assert state not in manager._pending_states
assert state not in manager._pending_states, "state should be removed after use"
class TestOAuthManagerRefreshTokens:
@@ -195,7 +195,7 @@ class TestOAuthManagerRefreshTokens:
mock_post.return_value = mock_response
tokens = await manager.refresh_tokens(OAuthProvider.GOOGLE, "old-refresh-token")
assert tokens.access_token == "new-access-token"
assert tokens.access_token == "new-access-token", "should return new access token"
@pytest.mark.asyncio
async def test_refresh_tokens_preserves_refresh_token_if_not_returned(
@@ -218,7 +218,7 @@ class TestOAuthManagerRefreshTokens:
mock_post.return_value = mock_response
tokens = await manager.refresh_tokens(OAuthProvider.GOOGLE, "old-refresh-token")
assert tokens.refresh_token == "old-refresh-token"
assert tokens.refresh_token == "old-refresh-token", "should preserve old refresh token"
class TestOAuthManagerRevokeTokens:
@@ -242,7 +242,7 @@ class TestOAuthManagerRevokeTokens:
mock_post.assert_called_once()
call_args = mock_post.call_args
assert "oauth2.googleapis.com/revoke" in call_args[0][0]
assert "oauth2.googleapis.com/revoke" in call_args[0][0], "should call Google revocation endpoint"
@pytest.mark.asyncio
async def test_revoke_outlook_tokens_handles_no_revocation_endpoint(

View File

@@ -19,11 +19,11 @@ async def test_delete_meeting_assets_removes_directory(tmp_path: Path) -> None:
meeting_dir.mkdir()
(meeting_dir / "audio.wav").touch()
assert meeting_dir.exists()
assert meeting_dir.exists(), "meeting directory should exist before delete"
await repo.delete_meeting_assets(meeting_id)
assert not meeting_dir.exists()
assert not meeting_dir.exists(), "meeting directory should be removed after delete"
async def test_delete_meeting_assets_idempotent(tmp_path: Path) -> None:
@@ -33,7 +33,7 @@ async def test_delete_meeting_assets_idempotent(tmp_path: Path) -> None:
# Ensure directory does not exist
meeting_dir = tmp_path / str(meeting_id)
assert not meeting_dir.exists()
assert not meeting_dir.exists(), "directory should not exist before test"
# Should not raise
await repo.delete_meeting_assets(meeting_id)
@@ -50,8 +50,8 @@ async def test_delete_meeting_assets_with_custom_path(tmp_path: Path) -> None:
asset_dir.mkdir()
(asset_dir / "audio.enc").touch()
assert asset_dir.exists()
assert asset_dir.exists(), "asset directory should exist before delete"
await repo.delete_meeting_assets(meeting_id, asset_path=custom_path)
assert not asset_dir.exists()
assert not asset_dir.exists(), "asset directory should be removed after delete"

View File

@@ -49,9 +49,9 @@ def test_get_or_create_master_key_creates_and_reuses(monkeypatch: pytest.MonkeyP
first = ks.get_or_create_master_key()
second = ks.get_or_create_master_key()
assert len(first) == keystore.KEY_SIZE
assert first == second
assert ("svc", "key") in storage
assert len(first) == keystore.KEY_SIZE, "key should be KEY_SIZE bytes"
assert first == second, "second call should return same key"
assert ("svc", "key") in storage, "key should be stored in keyring"
def test_get_or_create_master_key_falls_back_to_file(
@@ -83,8 +83,8 @@ def test_get_or_create_master_key_falls_back_to_file(
ks = keystore.KeyringKeyStore()
key = ks.get_or_create_master_key()
assert len(key) == keystore.KEY_SIZE
assert key_file.exists()
assert len(key) == keystore.KEY_SIZE, "fallback key should be KEY_SIZE bytes"
assert key_file.exists(), "key should be persisted to file on fallback"
def test_delete_master_key_handles_missing(monkeypatch: pytest.MonkeyPatch) -> None:
@@ -137,7 +137,7 @@ def test_has_master_key_false_on_errors(monkeypatch: pytest.MonkeyPatch) -> None
)
ks = keystore.KeyringKeyStore()
assert ks.has_master_key() is False
assert ks.has_master_key() is False, "should return False when keyring raises"
class TestFileKeyStore:
@@ -151,9 +151,9 @@ class TestFileKeyStore:
first = fks.get_or_create_master_key()
second = fks.get_or_create_master_key()
assert len(first) == keystore.KEY_SIZE
assert first == second
assert key_file.exists()
assert len(first) == keystore.KEY_SIZE, "key should be KEY_SIZE bytes"
assert first == second, "second call should return same key"
assert key_file.exists(), "key should be persisted to file"
def test_creates_parent_directories(self, tmp_path: Path) -> None:
"""File key store should create parent directories."""
@@ -162,7 +162,7 @@ class TestFileKeyStore:
fks.get_or_create_master_key()
assert key_file.exists()
assert key_file.exists(), "parent directories should be created"
def test_has_master_key_true_when_exists(self, tmp_path: Path) -> None:
"""has_master_key should return True when file exists."""
@@ -170,14 +170,14 @@ class TestFileKeyStore:
fks = keystore.FileKeyStore(key_file)
fks.get_or_create_master_key()
assert fks.has_master_key() is True
assert fks.has_master_key() is True, "should return True when key file exists"
def test_has_master_key_false_when_missing(self, tmp_path: Path) -> None:
"""has_master_key should return False when file is missing."""
key_file = tmp_path / ".master_key"
fks = keystore.FileKeyStore(key_file)
assert fks.has_master_key() is False
assert fks.has_master_key() is False, "should return False when key file missing"
def test_delete_master_key_removes_file(self, tmp_path: Path) -> None:
"""delete_master_key should remove the key file."""
@@ -187,7 +187,7 @@ class TestFileKeyStore:
fks.delete_master_key()
assert not key_file.exists()
assert not key_file.exists(), "key file should be deleted"
def test_delete_master_key_safe_when_missing(self, tmp_path: Path) -> None:
"""delete_master_key should not raise when file is missing."""

View File

@@ -149,7 +149,7 @@ class TestSegmentCitationVerifier:
result = verifier.verify_citations(summary, segments)
assert result.is_valid is True
assert result.is_valid is True, "empty summary should be valid"
def test_verify_empty_segments(self, verifier: SegmentCitationVerifier) -> None:
"""Summary with citations but no segments should be invalid."""
@@ -158,8 +158,8 @@ class TestSegmentCitationVerifier:
result = verifier.verify_citations(summary, segments)
assert result.is_valid is False
assert result.missing_segment_ids == (0,)
assert result.is_valid is False, "citations without matching segments are invalid"
assert result.missing_segment_ids == (0,), "should report missing segment 0"
def test_verify_empty_citations(self, verifier: SegmentCitationVerifier) -> None:
"""Key points/actions with empty segment_ids should be valid."""
@@ -171,7 +171,7 @@ class TestSegmentCitationVerifier:
result = verifier.verify_citations(summary, segments)
assert result.is_valid is True
assert result.is_valid is True, "empty segment_ids should be valid"
def test_invalid_count_property(self, verifier: SegmentCitationVerifier) -> None:
"""invalid_count should sum key point and action item invalid counts."""
@@ -188,7 +188,7 @@ class TestSegmentCitationVerifier:
result = verifier.verify_citations(summary, segments)
assert result.invalid_count == 3
assert result.invalid_count == 3, "should count all invalid items"
class TestFilterInvalidCitations:
@@ -209,8 +209,8 @@ class TestFilterInvalidCitations:
filtered = verifier.filter_invalid_citations(summary, segments)
assert filtered.key_points[0].segment_ids == [0, 1]
assert filtered.action_items[0].segment_ids == [1]
assert filtered.key_points[0].segment_ids == [0, 1], "should remove invalid 99"
assert filtered.action_items[0].segment_ids == [1], "should remove invalid 50"
def test_filter_preserves_valid_citations(self, verifier: SegmentCitationVerifier) -> None:
"""Valid citations should be preserved."""
@@ -222,8 +222,8 @@ class TestFilterInvalidCitations:
filtered = verifier.filter_invalid_citations(summary, segments)
assert filtered.key_points[0].segment_ids == [0, 1]
assert filtered.action_items[0].segment_ids == [2]
assert filtered.key_points[0].segment_ids == [0, 1], "valid key point citations preserved"
assert filtered.action_items[0].segment_ids == [2], "valid action item citations preserved"
@pytest.mark.parametrize(
"attr_path,expected",

View File

@@ -131,7 +131,7 @@ class TestCloudSummarizerProperties:
)
# Trigger client creation
_ = summarizer._get_openai_client()
assert captured.get("base_url") == "https://custom"
assert captured.get("base_url") == "https://custom", "base_url should be forwarded"
class TestCloudSummarizerOpenAI:
@@ -170,8 +170,8 @@ class TestCloudSummarizerOpenAI:
result = await summarizer.summarize(request)
assert result.summary.key_points == []
assert result.summary.action_items == []
assert result.summary.key_points == [], "empty segments should yield no key points"
assert result.summary.action_items == [], "empty segments should yield no action items"
@pytest.mark.asyncio
async def test_summarize_returns_result(
@@ -207,9 +207,9 @@ class TestCloudSummarizerOpenAI:
result = await summarizer.summarize(request)
assert result.provider_name == "openai"
assert result.summary.executive_summary == "Project meeting summary."
assert result.tokens_used == 150
assert result.provider_name == "openai", "provider_name should be 'openai'"
assert result.summary.executive_summary == "Project meeting summary.", "summary should match"
assert result.tokens_used == 150, "tokens_used should match response"
@pytest.mark.asyncio
async def test_raises_unavailable_on_auth_error(
@@ -299,9 +299,9 @@ class TestCloudSummarizerAnthropic:
result = await summarizer.summarize(request)
assert result.provider_name == "anthropic"
assert result.summary.executive_summary == "Anthropic summary."
assert result.tokens_used == 150
assert result.provider_name == "anthropic", "provider_name should be 'anthropic'"
assert result.summary.executive_summary == "Anthropic summary.", "summary should match"
assert result.tokens_used == 150, "tokens_used should sum input and output"
@pytest.mark.asyncio
async def test_raises_unavailable_when_package_missing(
@@ -399,7 +399,7 @@ class TestCloudSummarizerFiltering:
result = await summarizer.summarize(request)
assert result.summary.key_points[0].segment_ids == [0]
assert result.summary.key_points[0].segment_ids == [0], "invalid segment_ids should be filtered"
@pytest.mark.asyncio
async def test_respects_max_limits(
@@ -440,5 +440,5 @@ class TestCloudSummarizerFiltering:
result = await summarizer.summarize(request)
assert len(result.summary.key_points) == 2
assert len(result.summary.action_items) == 3
assert len(result.summary.key_points) == 2, "should limit to max_key_points"
assert len(result.summary.action_items) == 3, "should limit to max_action_items"

View File

@@ -25,15 +25,15 @@ class TestMockSummarizer:
def test_mock_provider_name(self, summarizer: MockSummarizer) -> None:
"""Provider name should be 'mock'."""
assert summarizer.provider_name == "mock"
assert summarizer.provider_name == "mock", "provider_name should be 'mock'"
def test_is_available(self, summarizer: MockSummarizer) -> None:
"""Mock provider should always be available."""
assert summarizer.is_available is True
assert summarizer.is_available is True, "mock provider should always be available"
def test_requires_cloud_consent(self, summarizer: MockSummarizer) -> None:
"""Mock provider should not require cloud consent."""
assert summarizer.requires_cloud_consent is False
assert summarizer.requires_cloud_consent is False, "mock provider should not require consent"
@pytest.mark.asyncio
async def test_mock_summarize_returns_result(
@@ -50,9 +50,9 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert result.provider_name == "mock"
assert result.model_name == "mock-1.0"
assert result.summary.meeting_id == meeting_id
assert result.provider_name == "mock", "result should have mock provider_name"
assert result.model_name == "mock-1.0", "result should have mock-1.0 model_name"
assert result.summary.meeting_id == meeting_id, "summary meeting_id should match request"
@pytest.mark.asyncio
async def test_summarize_generates_executive_summary(
@@ -70,8 +70,8 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert "3 segments" in result.summary.executive_summary
assert "15.0 seconds" in result.summary.executive_summary
assert "3 segments" in result.summary.executive_summary, "executive_summary should mention segment count"
assert "15.0 seconds" in result.summary.executive_summary, "executive_summary should mention duration"
@pytest.mark.asyncio
async def test_summarize_generates_key_points_with_citations(
@@ -88,9 +88,9 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert len(result.summary.key_points) == 2
assert result.summary.key_points[0].segment_ids == [0]
assert result.summary.key_points[1].segment_ids == [1]
assert len(result.summary.key_points) == 2, "should generate 2 key points for 2 segments"
assert result.summary.key_points[0].segment_ids == [0], "first key point should cite segment 0"
assert result.summary.key_points[1].segment_ids == [1], "second key point should cite segment 1"
@pytest.mark.asyncio
async def test_summarize_respects_max_key_points(
@@ -108,7 +108,7 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert len(result.summary.key_points) == 3
assert len(result.summary.key_points) == 3, "should limit key points to max_key_points"
@pytest.mark.asyncio
async def test_summarize_extracts_action_items(
@@ -127,9 +127,9 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert len(result.summary.action_items) == 2
assert result.summary.action_items[0].segment_ids == [1]
assert result.summary.action_items[1].segment_ids == [2]
assert len(result.summary.action_items) == 2, "should extract 2 action items"
assert result.summary.action_items[0].segment_ids == [1], "first action should cite segment 1"
assert result.summary.action_items[1].segment_ids == [2], "second action should cite segment 2"
@pytest.mark.asyncio
async def test_summarize_respects_max_action_items(
@@ -147,7 +147,7 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert len(result.summary.action_items) == 2
assert len(result.summary.action_items) == 2, "should limit action items to max_action_items"
@pytest.mark.asyncio
async def test_summarize_sets_generated_at(
@@ -161,7 +161,7 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert result.summary.generated_at is not None
assert result.summary.generated_at is not None, "summary should have generated_at timestamp"
@pytest.mark.asyncio
async def test_mock_summarize_empty_segments(
@@ -174,6 +174,6 @@ class TestMockSummarizer:
result = await summarizer.summarize(request)
assert result.summary.key_points == []
assert result.summary.action_items == []
assert "0 segments" in result.summary.executive_summary
assert result.summary.key_points == [], "empty segments should yield no key points"
assert result.summary.action_items == [], "empty segments should yield no action items"
assert "0 segments" in result.summary.executive_summary, "should indicate 0 segments"

View File

@@ -39,9 +39,9 @@ class TestAsrConverter:
result = AsrConverter.word_timing_to_domain(asr_word)
assert result.start_time == 0.123456789
assert result.end_time == 0.987654321
assert result.probability == 0.999999
assert result.start_time == 0.123456789, "start_time precision preserved"
assert result.end_time == 0.987654321, "end_time precision preserved"
assert result.probability == 0.999999, "probability precision preserved"
def test_word_timing_to_domain_returns_domain_type(self) -> None:
"""Test converter returns domain WordTiming type."""
@@ -83,7 +83,7 @@ class TestAsrConverter:
),
)
words = AsrConverter.result_to_domain_words(asr_result)
assert len(words) == 2
assert len(words) == 2, "should convert both words"
def test_result_to_domain_words_empty(self) -> None:
"""Test conversion with empty words tuple."""
@@ -91,7 +91,7 @@ class TestAsrConverter:
words = AsrConverter.result_to_domain_words(asr_result)
assert words == []
assert words == [], "empty words tuple yields empty list"
class TestOrmConverterToOrmKwargs:
@@ -108,13 +108,14 @@ class TestOrmConverterToOrmKwargs:
result = OrmConverter.word_timing_to_orm_kwargs(word, word_index=0)
assert result == {
expected = {
"word": "test",
"start_time": 1.5,
"end_time": 2.0,
"probability": 0.9,
"word_index": 0,
}
assert result == expected, "all fields correctly converted to ORM kwargs"
def test_preserves_precision(self) -> None:
"""Test floating point precision in kwargs."""
@@ -171,8 +172,8 @@ class TestNerConverterToOrmKwargs:
result = NerConverter.to_orm_kwargs(entity)
assert result["category"] == "person"
assert isinstance(result["category"], str)
assert result["category"] == "person", "category value should be 'person'"
assert isinstance(result["category"], str), "category should be string type"
@pytest.mark.parametrize(
("category", "expected_value"),
@@ -228,26 +229,26 @@ class TestNerConverterOrmToDomain:
def test_sets_db_id_from_orm_id(self, mock_orm_model: MagicMock) -> None:
"""Domain entity db_id is set from ORM id."""
result = NerConverter.orm_to_domain(mock_orm_model)
assert result.db_id == mock_orm_model.id
assert result.db_id == mock_orm_model.id, "db_id should match ORM model id"
def test_converts_category_string_to_enum(self, mock_orm_model: MagicMock) -> None:
"""Category string from ORM is converted to EntityCategory enum."""
mock_orm_model.category = "person"
result = NerConverter.orm_to_domain(mock_orm_model)
assert result.category == EntityCategory.PERSON
assert isinstance(result.category, EntityCategory)
assert result.category == EntityCategory.PERSON, "category should convert to PERSON enum"
assert isinstance(result.category, EntityCategory), "category should be EntityCategory type"
def test_handles_none_segment_ids(self, mock_orm_model: MagicMock) -> None:
"""Null segment_ids in ORM becomes empty list in domain."""
mock_orm_model.segment_ids = None
result = NerConverter.orm_to_domain(mock_orm_model)
assert result.segment_ids == []
assert result.segment_ids == [], "None segment_ids should become empty list"
def test_handles_empty_segment_ids(self, mock_orm_model: MagicMock) -> None:
"""Empty segment_ids in ORM becomes empty list in domain."""
mock_orm_model.segment_ids = []
result = NerConverter.orm_to_domain(mock_orm_model)
assert result.segment_ids == []
assert result.segment_ids == [], "empty segment_ids should remain empty list"
class TestNerConverterRoundTrip:

View File

@@ -46,12 +46,12 @@ class TestCalendarProviderProperties:
def test_source_property(self) -> None:
"""Provider source should be CALENDAR."""
provider = CalendarProvider(_settings())
assert provider.source == TriggerSource.CALENDAR
assert provider.source == TriggerSource.CALENDAR, "source should be CALENDAR"
def test_max_weight_property(self) -> None:
"""Provider max_weight should reflect configured weight."""
provider = CalendarProvider(_settings(weight=0.5))
assert provider.max_weight == pytest.approx(0.5)
assert provider.max_weight == pytest.approx(0.5), "max_weight should match settings"
@pytest.mark.parametrize(
("enabled", "expected"),
@@ -73,12 +73,12 @@ class TestCalendarProviderGetSignal:
"""Disabled provider should return None."""
event = _event(minutes_from_now=2)
provider = CalendarProvider(_settings(enabled=False, events=[event]))
assert provider.get_signal() is None
assert provider.get_signal() is None, "disabled provider should return None"
def test_no_events_returns_none(self) -> None:
"""Provider with no events should return None."""
provider = CalendarProvider(_settings(events=[]))
assert provider.get_signal() is None
assert provider.get_signal() is None, "no events should return None"
def test_event_in_lookahead_window(self) -> None:
"""Event starting within lookahead window should trigger signal."""
@@ -99,8 +99,8 @@ class TestCalendarProviderGetSignal:
signal = provider.get_signal()
assert signal is not None
assert signal.app_name == "Test Meeting"
assert signal is not None, "recently started event should trigger"
assert signal.app_name == "Test Meeting", "signal should include event title"
def test_event_outside_window_returns_none(self) -> None:
"""Event outside lookahead/lookbehind window should not trigger."""
@@ -108,13 +108,13 @@ class TestCalendarProviderGetSignal:
provider = CalendarProvider(
_settings(lookahead_minutes=5, lookbehind_minutes=5, events=[event])
)
assert provider.get_signal() is None
assert provider.get_signal() is None, "future event outside window returns None"
def test_past_event_outside_lookbehind_returns_none(self) -> None:
"""Past event outside lookbehind window should not trigger."""
event = _event(minutes_from_now=-20, duration_minutes=10)
provider = CalendarProvider(_settings(lookbehind_minutes=5, events=[event]))
assert provider.get_signal() is None
assert provider.get_signal() is None, "past event outside lookbehind returns None"
def test_first_matching_event_returned(self) -> None:
"""When multiple events match, first one should be returned."""
@@ -194,7 +194,7 @@ class TestTimezoneHandling:
signal = provider.get_signal()
assert signal is not None
assert signal is not None, "naive datetime event should trigger"
def test_utc_datetime_works(self) -> None:
"""UTC datetime should work correctly."""
@@ -208,7 +208,7 @@ class TestTimezoneHandling:
signal = provider.get_signal()
assert signal is not None
assert signal is not None, "UTC datetime event should trigger"
class TestParseCalendarEventConfig:
@@ -216,7 +216,7 @@ class TestParseCalendarEventConfig:
def test_none_input_returns_empty(self) -> None:
"""None input should return empty list."""
assert parse_calendar_event_config(None) == []
assert parse_calendar_event_config(None) == [], "None input should return empty list"
def test_json_string_single_event(self) -> None:
"""JSON string with single event should parse correctly."""
@@ -224,8 +224,8 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(json_str)
assert len(events) == 1
assert events[0].title == "Meeting"
assert len(events) == 1, "should parse single event from JSON"
assert events[0].title == "Meeting", "event title should match"
def test_json_string_multiple_events(self) -> None:
"""JSON string with multiple events should parse correctly."""
@@ -236,13 +236,13 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(json_str)
assert len(events) == 2
assert events[0].title == "Meeting 1"
assert events[1].title == "Meeting 2"
assert len(events) == 2, "should parse both events from JSON array"
assert events[0].title == "Meeting 1", "first event title should match"
assert events[1].title == "Meeting 2", "second event title should match"
def test_invalid_json_returns_empty(self) -> None:
"""Invalid JSON should return empty list."""
assert parse_calendar_event_config("not valid json") == []
assert parse_calendar_event_config("not valid json") == [], "invalid JSON returns empty"
def test_dict_input_converted_to_list(self) -> None:
"""Single dict input should be converted to single-item list."""
@@ -254,8 +254,8 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(event_dict)
assert len(events) == 1
assert events[0].title == "Direct Dict"
assert len(events) == 1, "dict should convert to single-item list"
assert events[0].title == "Direct Dict", "event title should match"
def test_list_of_dicts(self) -> None:
"""List of dicts should parse correctly."""
@@ -267,7 +267,7 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(event_dicts)
assert len(events) == 2
assert len(events) == 2, "should parse both dicts"
def test_list_of_calendar_events_passthrough(self) -> None:
"""List of CalendarEvent objects should pass through unchanged."""
@@ -279,9 +279,9 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(original_events)
assert len(events) == 2
assert events[0] is original_events[0]
assert events[1] is original_events[1]
assert len(events) == 2, "should return both events"
assert events[0] is original_events[0], "first event should be same object"
assert events[1] is original_events[1], "second event should be same object"
def test_missing_start_or_end_skipped(self) -> None:
"""Events without start or end should be skipped."""
@@ -293,8 +293,8 @@ class TestParseCalendarEventConfig:
events = parse_calendar_event_config(event_dicts)
assert len(events) == 1
assert events[0].title == "Complete"
assert len(events) == 1, "only complete event should be parsed"
assert events[0].title == "Complete", "complete event title should match"
@pytest.mark.parametrize(
("date_str", "expected_valid"),
@@ -319,4 +319,4 @@ class TestParseCalendarEventConfig:
def test_non_iterable_returns_empty(self) -> None:
"""Non-iterable input should return empty list."""
assert parse_calendar_event_config(12345) == []
assert parse_calendar_event_config(12345) == [], "non-iterable returns empty"

View File

@@ -59,9 +59,9 @@ class TestWebhookExecutorDelivery:
payload,
)
assert delivery.succeeded is False
assert delivery.attempt_count == 0
assert delivery.error_message == "Webhook disabled"
assert delivery.succeeded is False, "disabled webhook should fail"
assert delivery.attempt_count == 0, "no attempts for disabled webhook"
assert delivery.error_message == "Webhook disabled", "should report disabled error"
@pytest.mark.asyncio
async def test_deliver_unsubscribed_event(
@@ -76,10 +76,10 @@ class TestWebhookExecutorDelivery:
payload,
)
assert delivery.succeeded is False
assert delivery.attempt_count == 0
assert delivery.succeeded is False, "unsubscribed event should fail"
assert delivery.attempt_count == 0, "no attempts for unsubscribed event"
error_msg = str(delivery.error_message)
assert "not subscribed" in error_msg
assert "not subscribed" in error_msg, "should report not subscribed"
@pytest.mark.asyncio
async def test_deliver_retries_on_timeout(
@@ -100,10 +100,10 @@ class TestWebhookExecutorDelivery:
payload,
)
assert delivery.succeeded is False
assert delivery.attempt_count == 3 # max_retries
assert delivery.succeeded is False, "timeout should fail"
assert delivery.attempt_count == 3, "should exhaust max_retries"
error_msg = str(delivery.error_message)
assert "Max retries exceeded" in error_msg
assert "Max retries exceeded" in error_msg, "should report max retries"
@pytest.mark.asyncio
async def test_deliver_retries_on_connection_error(
@@ -124,10 +124,10 @@ class TestWebhookExecutorDelivery:
payload,
)
assert delivery.succeeded is False
assert delivery.attempt_count == 3
assert delivery.succeeded is False, "connection error should fail"
assert delivery.attempt_count == 3, "should exhaust retries on connection error"
error_msg = str(delivery.error_message)
assert "Max retries exceeded" in error_msg
assert "Max retries exceeded" in error_msg, "should report max retries"
class TestHmacSignature:
@@ -199,7 +199,7 @@ class TestHmacSignature:
payload,
)
assert "X-NoteFlow-Signature" not in header_capture.headers
assert "X-NoteFlow-Signature" not in header_capture.headers, "no signature without secret"
class TestWebhookHeaders:
@@ -227,7 +227,7 @@ class TestWebhookHeaders:
payload,
)
assert header_capture.headers.get("X-NoteFlow-Event") == "meeting.completed"
assert header_capture.headers.get("X-NoteFlow-Event") == "meeting.completed", "event header should match"
@pytest.mark.asyncio
async def test_includes_delivery_id_header(
@@ -251,11 +251,11 @@ class TestWebhookHeaders:
payload,
)
assert "X-NoteFlow-Delivery" in header_capture.headers
assert "X-NoteFlow-Delivery" in header_capture.headers, "delivery ID header should be present"
# Verify it's a valid UUID format (xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx)
delivery_id = header_capture.headers["X-NoteFlow-Delivery"]
uuid_with_hyphens_length = 36
assert len(delivery_id) == uuid_with_hyphens_length
assert len(delivery_id) == uuid_with_hyphens_length, "delivery ID should be UUID format"
@pytest.mark.asyncio
async def test_delivery_id_header_matches_returned_record(
@@ -299,7 +299,7 @@ class TestExecutorCleanup:
"""Close method cleans up HTTP client."""
# Trigger client creation
await executor._ensure_client()
assert executor._client is not None
assert executor._client is not None, "client should be created"
await executor.close()
assert executor._client is None
assert executor._client is None, "client should be cleaned up"