diff --git a/client b/client
index 390a5a2..c496d36 160000
--- a/client
+++ b/client
@@ -1 +1 @@
-Subproject commit 390a5a217bdd5dc67c35a2d72b76915292eac519
+Subproject commit c496d36004371b8e4a2b778f45e85ca4fe5b739f
diff --git a/src/noteflow/application/services/summarization/_template_helpers.py b/src/noteflow/application/services/summarization/_template_helpers.py
new file mode 100644
index 0000000..7fed254
--- /dev/null
+++ b/src/noteflow/application/services/summarization/_template_helpers.py
@@ -0,0 +1,240 @@
+"""Helpers for summarization template service."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from typing import TYPE_CHECKING
+from uuid import UUID, uuid4
+
+from noteflow.domain.entities import SummarizationTemplate, SummarizationTemplateVersion
+from noteflow.domain.identity.context import OperationContext
+from noteflow.domain.utils.time import utc_now
+
+if TYPE_CHECKING:
+    from noteflow.domain.ports.unit_of_work import UnitOfWork
+
+
+@dataclass(frozen=True)
+class TemplateUpdateResult:
+    template: SummarizationTemplate
+    version: SummarizationTemplateVersion | None = None
+
+
+@dataclass(frozen=True)
+class TemplateCreatePayload:
+    workspace_id: UUID
+    name: str
+    content: str
+    description: str | None = None
+    change_note: str | None = None
+
+
+@dataclass(frozen=True)
+class TemplateUpdatePayload:
+    template_id: UUID
+    name: str | None = None
+    description: str | None = None
+    content: str | None = None
+    change_note: str | None = None
+
+
+@dataclass(frozen=True)
+class TemplateUpdateMetadata:
+    name: str
+    description: str | None
+    current_version_id: UUID | None
+    updated_at: datetime
+    updated_by: UUID
+
+
+@dataclass(frozen=True)
+class TemplateUpdateState:
+    template_id: UUID
+    name: str
+    description: str | None
+    current_version_id: UUID | None
+
+
+@dataclass(frozen=True)
+class TemplateUpdateInputs:
+    name: str | None
+    description: str | None
+    current_version_id: UUID | None
+    updated_at: datetime
+    updated_by: UUID
+
+
+@dataclass(frozen=True)
+class TemplateVersionInputs:
+    template_id: UUID
+    content: str
+    change_note: str | None
+    created_at: datetime
+    created_by: UUID
+
+
+def extract_update_payload(
+    payload: TemplateUpdatePayload,
+) -> tuple[UUID, str | None, str | None, str | None, str | None]:
+    return (
+        payload.template_id,
+        payload.name,
+        payload.description,
+        payload.content,
+        payload.change_note,
+    )
+
+
+def normalize_optional(value: str | None) -> str | None:
+    if value is None:
+        return None
+    trimmed = value.strip()
+    return trimmed if trimmed else None
+
+
+def require_trimmed(value: str, field_label: str) -> str:
+    trimmed = value.strip()
+    if not trimmed:
+        raise ValueError(f"{field_label} is required")
+    return trimmed
+
+
+def validate_template_scope(context: OperationContext, template: SummarizationTemplate) -> None:
+    if template.is_system:
+        raise PermissionError("System templates are read-only")
+    workspace_id = template.workspace_id
+    if workspace_id is not None and context.workspace_id != workspace_id:
+        raise PermissionError("Workspace scope mismatch")
+
+
+def build_new_template_entities(
+    payload: TemplateCreatePayload,
+    user_id: UUID,
+) -> tuple[SummarizationTemplate, SummarizationTemplateVersion]:
+    now = utc_now()
+    template_id = uuid4()
+    version_id = uuid4()
+    name = require_trimmed(payload.name, "Template name")
+    content = require_trimmed(payload.content, "Template content")
+    description = normalize_optional(payload.description)
+    change_note = normalize_optional(payload.change_note)
+
+    template = SummarizationTemplate(
+        id=template_id,
+        workspace_id=payload.workspace_id,
+        name=name,
+        description=description,
+        is_system=False,
+        is_archived=False,
+        current_version_id=version_id,
+        created_at=now,
+        updated_at=now,
+        created_by=user_id,
+        updated_by=user_id,
+    )
+    version = SummarizationTemplateVersion(
+        id=version_id,
+        template_id=template_id,
+        version_number=1,
+        content=content,
+        change_note=change_note,
+        created_at=now,
+        created_by=user_id,
+    )
+    return template, version
+
+
+def extract_template_state(template: SummarizationTemplate) -> TemplateUpdateState:
+    template_id = template.id
+    current_version_id = template.current_version_id
+    return TemplateUpdateState(
+        template_id=template_id,
+        name=template.name,
+        description=template.description,
+        current_version_id=current_version_id,
+    )
+
+
+def build_restored_template(
+    template: SummarizationTemplate,
+    *,
+    version_id: UUID,
+    user_id: UUID,
+) -> SummarizationTemplate:
+    now = utc_now()
+    return SummarizationTemplate(
+        id=template.id,
+        workspace_id=template.workspace_id,
+        name=template.name,
+        description=template.description,
+        is_system=template.is_system,
+        is_archived=template.is_archived,
+        current_version_id=version_id,
+        created_at=template.created_at,
+        updated_at=now,
+        created_by=template.created_by,
+        updated_by=user_id,
+    )
+
+
+def build_updated_template(
+    template: SummarizationTemplate,
+    metadata: TemplateUpdateMetadata,
+) -> SummarizationTemplate:
+    workspace_id = template.workspace_id
+    current_version_id = metadata.current_version_id
+    updated_at = metadata.updated_at
+    updated_by = metadata.updated_by
+    return SummarizationTemplate(
+        id=template.id,
+        workspace_id=workspace_id,
+        name=metadata.name,
+        description=metadata.description,
+        is_system=template.is_system,
+        is_archived=template.is_archived,
+        current_version_id=current_version_id,
+        created_at=template.created_at,
+        updated_at=updated_at,
+        created_by=template.created_by,
+        updated_by=updated_by,
+    )
+
+
+def build_update_metadata(
+    state: TemplateUpdateState,
+    inputs: TemplateUpdateInputs,
+) -> TemplateUpdateMetadata:
+    updated_name = state.name if inputs.name is None else require_trimmed(
+        inputs.name,
+        "Template name",
+    )
+    updated_description = (
+        state.description if inputs.description is None else normalize_optional(inputs.description)
+    )
+    return TemplateUpdateMetadata(
+        name=updated_name,
+        description=updated_description,
+        current_version_id=inputs.current_version_id,
+        updated_at=inputs.updated_at,
+        updated_by=inputs.updated_by,
+    )
+
+
+async def create_new_version(
+    uow: UnitOfWork,
+    inputs: TemplateVersionInputs,
+) -> SummarizationTemplateVersion:
+    versions = await uow.summarization_templates.list_versions(inputs.template_id)
+    next_version_number = 1 + max((v.version_number for v in versions), default=0)
+    version = SummarizationTemplateVersion(
+        id=uuid4(),
+        template_id=inputs.template_id,
+        version_number=next_version_number,
+        content=inputs.content,
+        change_note=inputs.change_note,
+        created_at=inputs.created_at,
+        created_by=inputs.created_by,
+    )
+    await uow.summarization_templates.add_version(version)
+    return version
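The new helper module centralizes trimming, blank-to-None normalization, and version numbering. A minimal sketch of the intended call path, assuming nothing beyond the names imported below (the UUIDs are placeholders; real callers take IDs from the OperationContext):

from uuid import uuid4

from noteflow.application.services.summarization._template_helpers import (
    TemplateCreatePayload,
    build_new_template_entities,
)

payload = TemplateCreatePayload(
    workspace_id=uuid4(),
    name="  Weekly sync  ",          # require_trimmed strips this to "Weekly sync"
    content="Summarize key decisions.",
    description="   ",               # normalize_optional collapses blanks to None
)
template, version = build_new_template_entities(payload, uuid4())
assert version.version_number == 1
assert template.current_version_id == version.id
assert template.description is None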
diff --git a/src/noteflow/application/services/summarization/template_service.py b/src/noteflow/application/services/summarization/template_service.py
index 5a5fccd..a22b6ca 100644
--- a/src/noteflow/application/services/summarization/template_service.py
+++ b/src/noteflow/application/services/summarization/template_service.py
@@ -3,25 +3,39 @@
 from __future__ import annotations
 
 from collections.abc import Sequence
-from dataclasses import dataclass
-from uuid import UUID, uuid4
+from typing import TYPE_CHECKING
+from uuid import UUID
 
 from noteflow.domain.entities import SummarizationTemplate, SummarizationTemplateVersion
 from noteflow.domain.identity.context import OperationContext
 from noteflow.domain.utils.time import utc_now
 from noteflow.infrastructure.logging import get_logger
 
+from ._template_helpers import (
+    TemplateCreatePayload,
+    TemplateUpdateInputs,
+    TemplateUpdatePayload,
+    TemplateUpdateResult,
+    TemplateVersionInputs,
+    build_new_template_entities,
+    build_restored_template,
+    build_update_metadata,
+    build_updated_template,
+    create_new_version,
+    extract_template_state,
+    extract_update_payload,
+    normalize_optional,
+    require_trimmed,
+    validate_template_scope,
+)
 from .summarization_service import SummarizationServiceSettings
 
+if TYPE_CHECKING:
+    from noteflow.domain.ports.unit_of_work import UnitOfWork
+
 logger = get_logger(__name__)
 
 
-@dataclass(frozen=True)
-class TemplateUpdateResult:
-    template: SummarizationTemplate
-    version: SummarizationTemplateVersion | None = None
-
-
 class SummarizationTemplateService:
     """Manage summarization templates and versions."""
 
@@ -89,49 +103,12 @@ class SummarizationTemplateService:
         self,
         uow: UnitOfWork,
         context: OperationContext,
-        *,
-        workspace_id: UUID,
-        name: str,
-        content: str,
-        description: str | None = None,
-        change_note: str | None = None,
+        payload: TemplateCreatePayload,
     ) -> TemplateUpdateResult:
         """Create a template with its initial version."""
         self._require_admin(context)
-        self._ensure_workspace_scope(context, workspace_id)
-
-        if not name.strip():
-            raise ValueError("Template name is required")
-        if not content.strip():
-            raise ValueError("Template content is required")
-
-        template_id = uuid4()
-        version_id = uuid4()
-        now = utc_now()
-
-        template = SummarizationTemplate(
-            id=template_id,
-            workspace_id=workspace_id,
-            name=name.strip(),
-            description=description.strip() if description else None,
-            is_system=False,
-            is_archived=False,
-            current_version_id=version_id,
-            created_at=now,
-            updated_at=now,
-            created_by=context.user_id,
-            updated_by=context.user_id,
-        )
-
-        version = SummarizationTemplateVersion(
-            id=version_id,
-            template_id=template_id,
-            version_number=1,
-            content=content,
-            change_note=change_note.strip() if change_note else None,
-            created_at=now,
-            created_by=context.user_id,
-        )
+        self._ensure_workspace_scope(context, payload.workspace_id)
+        template, version = build_new_template_entities(payload, context.user_id)
 
         saved = await uow.summarization_templates.create_with_version(template, version)
         await uow.commit()
@@ -142,63 +119,45 @@ class SummarizationTemplateService:
         self,
         uow: UnitOfWork,
         context: OperationContext,
-        *,
-        template_id: UUID,
-        name: str | None = None,
-        description: str | None = None,
-        content: str | None = None,
-        change_note: str | None = None,
+        payload: TemplateUpdatePayload,
     ) -> TemplateUpdateResult | None:
         """Update template metadata and optionally create a new version."""
         self._require_admin(context)
-
+        (
+            template_id,
+            payload_name,
+            payload_description,
+            payload_content,
+            payload_change_note,
+        ) = extract_update_payload(payload)
         template = await uow.summarization_templates.get(template_id)
         if template is None:
             return None
-        if template.is_system:
-            raise PermissionError("System templates are read-only")
-        if template.workspace_id is not None:
-            self._ensure_workspace_scope(context, template.workspace_id)
-
-        updated_name = template.name if name is None else name.strip()
-        updated_description = template.description if description is None else description.strip()
-
+        validate_template_scope(context, template)
+        state = extract_template_state(template)
         new_version: SummarizationTemplateVersion | None = None
-        current_version_id = template.current_version_id
+        current_version_id = state.current_version_id
         now = utc_now()
-
-        if content is not None:
-            if not content.strip():
-                raise ValueError("Template content is required")
-            versions = await uow.summarization_templates.list_versions(template_id)
-            next_version_number = 1 + max((v.version_number for v in versions), default=0)
-            version_id = uuid4()
-            new_version = SummarizationTemplateVersion(
-                id=version_id,
-                template_id=template_id,
-                version_number=next_version_number,
+        if payload_content is not None:
+            content = require_trimmed(payload_content, "Template content")
+            version_inputs = TemplateVersionInputs(
+                template_id=state.template_id,
                 content=content,
-                change_note=change_note.strip() if change_note else None,
+                change_note=normalize_optional(payload_change_note),
                 created_at=now,
                 created_by=context.user_id,
             )
-            await uow.summarization_templates.add_version(new_version)
+            new_version = await create_new_version(uow, version_inputs)
             current_version_id = new_version.id
-
-        updated = SummarizationTemplate(
-            id=template.id,
-            workspace_id=template.workspace_id,
-            name=updated_name,
-            description=updated_description,
-            is_system=template.is_system,
-            is_archived=template.is_archived,
+        metadata_inputs = TemplateUpdateInputs(
+            name=payload_name,
+            description=payload_description,
             current_version_id=current_version_id,
-            created_at=template.created_at,
             updated_at=now,
-            created_by=template.created_by,
             updated_by=context.user_id,
         )
-
+        metadata = build_update_metadata(state, metadata_inputs)
+        updated = build_updated_template(template, metadata)
         saved = await uow.summarization_templates.update(updated)
         await uow.commit()
         logger.info("Updated summarization template %s", template_id)
@@ -218,28 +177,13 @@ class SummarizationTemplateService:
         template = await uow.summarization_templates.get(template_id)
         if template is None:
             return None
-        if template.is_system:
-            raise PermissionError("System templates are read-only")
-        if template.workspace_id is not None:
-            self._ensure_workspace_scope(context, template.workspace_id)
+        validate_template_scope(context, template)
 
         version = await uow.summarization_templates.get_version(version_id)
         if version is None or version.template_id != template_id:
             raise ValueError("Template version not found")
 
-        updated = SummarizationTemplate(
-            id=template.id,
-            workspace_id=template.workspace_id,
-            name=template.name,
-            description=template.description,
-            is_system=template.is_system,
-            is_archived=template.is_archived,
-            current_version_id=version_id,
-            created_at=template.created_at,
-            updated_at=utc_now(),
-            created_by=template.created_by,
-            updated_by=context.user_id,
-        )
+        updated = build_restored_template(template, version_id=version_id, user_id=context.user_id)
 
         saved = await uow.summarization_templates.update(updated)
         await uow.commit()
@@ -265,9 +209,3 @@ class SummarizationTemplateService:
         archived = await uow.summarization_templates.archive(template_id, context.user_id)
         await uow.commit()
         return archived
-
-
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
-    from noteflow.domain.ports.unit_of_work import UnitOfWork
diff --git a/src/noteflow/grpc/_mixins/__init__.py b/src/noteflow/grpc/_mixins/__init__.py
index 9c3bc1a..2304b44 100644
--- a/src/noteflow/grpc/_mixins/__init__.py
+++ b/src/noteflow/grpc/_mixins/__init__.py
@@ -14,7 +14,11 @@ from .oidc import OidcMixin
 from .preferences import PreferencesMixin
 from .project import ProjectMembershipMixin, ProjectMixin
 from .streaming import StreamingMixin
-from .summarization import SummarizationMixin
+from .summarization import (
+    SummarizationConsentMixin,
+    SummarizationGenerationMixin,
+    SummarizationTemplatesMixin,
+)
 from .sync import SyncMixin
 from .webhooks import WebhooksMixin
 
@@ -35,7 +39,9 @@ __all__ = [
     "ProjectMembershipMixin",
     "ProjectMixin",
     "StreamingMixin",
-    "SummarizationMixin",
+    "SummarizationConsentMixin",
+    "SummarizationGenerationMixin",
+    "SummarizationTemplatesMixin",
     "SyncMixin",
     "WebhooksMixin",
 ]
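With the composite SummarizationMixin removed, callers compose the three mixins directly, as grpc/service.py does later in this patch. A sketch of the new composition, assuming only the names re-exported above (the real servicer mixes in many more classes):

from noteflow.grpc._mixins import (
    SummarizationConsentMixin,
    SummarizationGenerationMixin,
    SummarizationTemplatesMixin,
)

class SummarizationOnlyServicer(
    SummarizationGenerationMixin,
    SummarizationConsentMixin,
    SummarizationTemplatesMixin,
):
    """Hypothetical servicer exposing only the summarization RPC surface."""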
diff --git a/src/noteflow/grpc/_mixins/identity.py b/src/noteflow/grpc/_mixins/identity.py
index fbcdd96..b3ee319 100644
--- a/src/noteflow/grpc/_mixins/identity.py
+++ b/src/noteflow/grpc/_mixins/identity.py
@@ -28,7 +28,11 @@ if TYPE_CHECKING:
     from noteflow.application.services.identity import IdentityService
     from noteflow.domain.identity.context import OperationContext
-    from noteflow.domain.identity.entities import Workspace, WorkspaceMembership
+    from noteflow.domain.identity.entities import (
+        Workspace,
+        WorkspaceMembership,
+        WorkspaceSettings,
+    )
 
 logger = get_logger(__name__)
 
@@ -48,6 +52,34 @@ async def _resolve_auth_status(uow: UnitOfWork) -> tuple[bool, str]:
     return False, ""
 
 
+def _settings_proto_or_empty(
+    settings: WorkspaceSettings | None,
+) -> noteflow_pb2.WorkspaceSettingsProto:
+    settings_proto = workspace_settings_to_proto(settings)
+    return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
+
+
+def _merge_workspace_settings(
+    current: WorkspaceSettings,
+    updates: WorkspaceSettings,
+) -> WorkspaceSettings:
+    return replace(
+        current,
+        export_rules=updates.export_rules
+        if updates.export_rules is not None
+        else current.export_rules,
+        trigger_rules=updates.trigger_rules
+        if updates.trigger_rules is not None
+        else current.trigger_rules,
+        rag_enabled=updates.rag_enabled
+        if updates.rag_enabled is not None
+        else current.rag_enabled,
+        default_summarization_template=updates.default_summarization_template
+        if updates.default_summarization_template is not None
+        else current.default_summarization_template,
+    )
+
+
 class IdentityMixin:
     """Mixin providing identity management functionality.
 
@@ -246,30 +278,14 @@ class IdentityMixin:
         updates = proto_to_workspace_settings(request.settings)
         if updates is None:
-            settings_proto = workspace_settings_to_proto(workspace.settings)
-            return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
+            return _settings_proto_or_empty(workspace.settings)
 
         current = workspace.settings
-        updated_settings = replace(
-            current,
-            export_rules=updates.export_rules
-            if updates.export_rules is not None
-            else current.export_rules,
-            trigger_rules=updates.trigger_rules
-            if updates.trigger_rules is not None
-            else current.trigger_rules,
-            rag_enabled=updates.rag_enabled
-            if updates.rag_enabled is not None
-            else current.rag_enabled,
-            default_summarization_template=updates.default_summarization_template
-            if updates.default_summarization_template is not None
-            else current.default_summarization_template,
-        )
+        updated_settings = _merge_workspace_settings(current, updates)
 
         updated_workspace = replace(workspace, settings=updated_settings)
         saved = await uow.workspaces.update(updated_workspace)
         await uow.commit()
-        settings_proto = workspace_settings_to_proto(saved.settings)
-        return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
+        return _settings_proto_or_empty(saved.settings)
 
     async def _verify_workspace_access(
         self,
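_merge_workspace_settings implements a field-wise "None means keep, anything else wins" merge over a frozen dataclass via dataclasses.replace. A small self-contained illustration with a stand-in dataclass (WorkspaceSettings' real fields live in the domain entities; this stand-in only mirrors the merge rule):

from dataclasses import dataclass, replace

@dataclass(frozen=True)
class _Settings:  # stand-in for WorkspaceSettings
    rag_enabled: bool | None = None
    default_summarization_template: str | None = None

current = _Settings(rag_enabled=True, default_summarization_template="brief")
updates = _Settings(rag_enabled=None, default_summarization_template="detailed")

merged = replace(
    current,
    rag_enabled=updates.rag_enabled if updates.rag_enabled is not None else current.rag_enabled,
    default_summarization_template=updates.default_summarization_template
    if updates.default_summarization_template is not None
    else current.default_summarization_template,
)
assert merged == _Settings(rag_enabled=True, default_summarization_template="detailed")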
""" + default_scopes = list(preset_config.default_scopes) + documentation_url = preset_config.documentation_url or "" + notes = preset_config.notes or "" return noteflow_pb2.OidcPresetProto( preset=preset_config.preset.value, display_name=preset_config.display_name, description=preset_config.description, - default_scopes=list(preset_config.default_scopes), - documentation_url=preset_config.documentation_url or "", - notes=preset_config.notes or "", + default_scopes=default_scopes, + documentation_url=documentation_url, + notes=notes, ) diff --git a/src/noteflow/grpc/_mixins/streaming/_processing/_vad_processing.py b/src/noteflow/grpc/_mixins/streaming/_processing/_vad_processing.py index 8a10919..951dddf 100644 --- a/src/noteflow/grpc/_mixins/streaming/_processing/_vad_processing.py +++ b/src/noteflow/grpc/_mixins/streaming/_processing/_vad_processing.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections.abc import AsyncIterator +from dataclasses import dataclass from typing import TYPE_CHECKING import numpy as np @@ -15,6 +16,62 @@ from .._partials import clear_partial_buffer, maybe_emit_partial if TYPE_CHECKING: from ....proto import noteflow_pb2 from ...protocols import ServicerHost + from ....stream_state import MeetingStreamState + + +@dataclass(frozen=True) +class VadProcessingContext: + host: ServicerHost + meeting_id: str + state: "MeetingStreamState" + + +def _emit_vad_state_updates( + ctx: VadProcessingContext, + is_speech: bool, +) -> list[noteflow_pb2.TranscriptUpdate]: + from ....proto import noteflow_pb2 + + updates: list[noteflow_pb2.TranscriptUpdate] = [] + if is_speech and not ctx.state.was_speaking: + updates.append( + create_vad_update(ctx.meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_START) + ) + ctx.state.was_speaking = True + return updates + if not is_speech and ctx.state.was_speaking: + updates.append( + create_vad_update(ctx.meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_END) + ) + ctx.state.was_speaking = False + return updates + + +async def _maybe_emit_partial_update( + ctx: VadProcessingContext, + audio: NDArray[np.float32], + is_speech: bool, +) -> noteflow_pb2.TranscriptUpdate | None: + if not is_speech: + return None + ctx.state.partial_buffer.append(audio) + return await maybe_emit_partial(ctx.host, ctx.meeting_id) + + +async def _iter_segment_updates( + ctx: VadProcessingContext, + audio: NDArray[np.float32], + is_speech: bool, +) -> AsyncIterator[noteflow_pb2.TranscriptUpdate]: + for audio_segment in ctx.state.segmenter.process_audio(audio, is_speech): + clear_partial_buffer(ctx.host, ctx.meeting_id) + async for update in process_audio_segment( + ctx.host, + ctx.meeting_id, + audio_segment.audio, + audio_segment.start_time, + ): + yield update async def process_audio_with_vad( @@ -26,8 +83,6 @@ async def process_audio_with_vad( Uses consolidated MeetingStreamState for O(1) lookup instead of 13+ dict accesses. 
""" - from ....proto import noteflow_pb2 - # Single dict lookup replaces 6+ separate lookups per audio chunk state = host.get_stream_state(meeting_id) if state is None: @@ -35,40 +90,23 @@ async def process_audio_with_vad( # Get VAD decision using consolidated state is_speech = state.vad.process_chunk(audio) + ctx = VadProcessingContext(host=host, meeting_id=meeting_id, state=state) # Streaming diarization (optional) await host.process_streaming_diarization(meeting_id, audio) # Emit VAD state change events using consolidated state - if is_speech and not state.was_speaking: - # Speech started - yield create_vad_update(meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_START) - state.was_speaking = True - elif not is_speech and state.was_speaking: - # Speech ended - yield create_vad_update(meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_END) - state.was_speaking = False + for update in _emit_vad_state_updates(ctx, is_speech): + yield update # Buffer audio for partial transcription (pre-allocated buffer handles copy) - if is_speech: - state.partial_buffer.append(audio) - - # Check if we should emit a partial - partial_update = await maybe_emit_partial(host, meeting_id) - if partial_update is not None: - yield partial_update + partial_update = await _maybe_emit_partial_update(ctx, audio, is_speech) + if partial_update is not None: + yield partial_update # Process through segmenter using consolidated state - for audio_segment in state.segmenter.process_audio(audio, is_speech): - # Clear partial buffer when we get a final segment - clear_partial_buffer(host, meeting_id) - async for update in process_audio_segment( - host, - meeting_id, - audio_segment.audio, - audio_segment.start_time, - ): - yield update + async for update in _iter_segment_updates(ctx, audio, is_speech): + yield update async def flush_segmenter( diff --git a/src/noteflow/grpc/_mixins/summarization/__init__.py b/src/noteflow/grpc/_mixins/summarization/__init__.py index 159821b..d8cfd2a 100644 --- a/src/noteflow/grpc/_mixins/summarization/__init__.py +++ b/src/noteflow/grpc/_mixins/summarization/__init__.py @@ -1,4 +1,4 @@ -"""Summarization mixin composition.""" +"""Summarization mixins.""" from __future__ import annotations @@ -7,16 +7,7 @@ from ._generation_mixin import SummarizationGenerationMixin from ._templates_mixin import SummarizationTemplatesMixin -class SummarizationMixin( - SummarizationGenerationMixin, - SummarizationConsentMixin, - SummarizationTemplatesMixin, -): - """Composite mixin providing summarization functionality.""" - - __all__ = [ - "SummarizationMixin", "SummarizationConsentMixin", "SummarizationGenerationMixin", "SummarizationTemplatesMixin", diff --git a/src/noteflow/grpc/_mixins/summarization/_context_builders.py b/src/noteflow/grpc/_mixins/summarization/_context_builders.py index a9c7a48..7895283 100644 --- a/src/noteflow/grpc/_mixins/summarization/_context_builders.py +++ b/src/noteflow/grpc/_mixins/summarization/_context_builders.py @@ -24,9 +24,11 @@ def build_summary_context( settings: SummarizationServiceSettings, ) -> SummaryTemplateContext: """Build summary template context from settings.""" + max_key_points = max(settings.max_key_points, 0) + max_action_items = max(settings.max_action_items, 0) return SummaryTemplateContext( - max_key_points=settings.max_key_points, - max_action_items=settings.max_action_items, + max_key_points=max_key_points, + max_action_items=max_action_items, ) diff --git a/src/noteflow/grpc/_mixins/summarization/_template_crud.py 
diff --git a/src/noteflow/grpc/_mixins/summarization/_template_crud.py b/src/noteflow/grpc/_mixins/summarization/_template_crud.py
index 5c22f14..51731c4 100644
--- a/src/noteflow/grpc/_mixins/summarization/_template_crud.py
+++ b/src/noteflow/grpc/_mixins/summarization/_template_crud.py
@@ -6,6 +6,11 @@ from dataclasses import dataclass
 from typing import TYPE_CHECKING
 from uuid import UUID
 
+from noteflow.application.services.summarization.template_service import (
+    SummarizationTemplateService,
+    TemplateCreatePayload,
+    TemplateUpdatePayload,
+)
 from noteflow.domain.ports.unit_of_work import UnitOfWork
 
 from ...proto import noteflow_pb2
@@ -22,9 +27,6 @@ from ..errors import (
 )
 
 if TYPE_CHECKING:
-    from noteflow.application.services.summarization.template_service import (
-        SummarizationTemplateService,
-    )
     from noteflow.domain.identity import OperationContext
 
 
@@ -136,15 +138,14 @@ async def create_summarization_template(
     change_note = request.change_note if request.HasField("change_note") else None
 
     try:
-        result = await ctx.template_service.create_template(
-            repo,
-            op_context,
+        payload = TemplateCreatePayload(
             workspace_id=workspace_id,
             name=request.name,
             content=request.content,
             description=description,
             change_note=change_note,
         )
+        result = await ctx.template_service.create_template(repo, op_context, payload)
     except PermissionError as exc:
         await abort_permission_denied(context, str(exc))
         raise RuntimeError("Unreachable")
@@ -178,15 +179,14 @@ async def update_summarization_template(
     change_note = request.change_note if request.HasField("change_note") else None
 
     try:
-        result = await ctx.template_service.update_template(
-            repo,
-            op_context,
+        payload = TemplateUpdatePayload(
             template_id=template_id,
             name=name,
            description=description,
             content=content,
             change_note=change_note,
         )
+        result = await ctx.template_service.update_template(repo, op_context, payload)
     except PermissionError as exc:
         await abort_permission_denied(context, str(exc))
         raise RuntimeError("Unreachable")
diff --git a/src/noteflow/grpc/interceptors/logging/_handler_factory.py b/src/noteflow/grpc/interceptors/logging/_handler_factory.py
index e9807ab..975fbf4 100644
--- a/src/noteflow/grpc/interceptors/logging/_handler_factory.py
+++ b/src/noteflow/grpc/interceptors/logging/_handler_factory.py
@@ -3,8 +3,9 @@
 from __future__ import annotations
 
 from collections.abc import AsyncIterator, Awaitable, Callable
-from typing import TypeVar, cast
+from typing import Protocol, TypeVar, cast
 
+import importlib
 import grpc
 from grpc import aio
 
@@ -18,84 +19,163 @@ from ._wrappers import (
 
 _TRequest = TypeVar("_TRequest")
 _TResponse = TypeVar("_TResponse")
 
+RpcMethodHandler = grpc.RpcMethodHandler
+
+
+class _TypedRpcMethodHandler(Protocol[_TRequest, _TResponse]):
+    unary_unary: Callable[
+        [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
+        Awaitable[_TResponse],
+    ] | None
+    unary_stream: Callable[
+        [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
+        AsyncIterator[_TResponse],
+    ] | None
+    stream_unary: Callable[
+        [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
+        Awaitable[_TResponse],
+    ] | None
+    stream_stream: Callable[
+        [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
+        AsyncIterator[_TResponse],
+    ] | None
+    request_deserializer: Callable[[bytes], _TRequest] | None
+    response_serializer: Callable[[_TResponse], bytes] | None
+
+
+class _GrpcFactories(Protocol):
+    def unary_unary_rpc_method_handler(
+        self,
+        behavior: Callable[
+            [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
+            Awaitable[_TResponse],
+        ],
+        *,
+        request_deserializer: Callable[[bytes], _TRequest] | None = None,
+        response_serializer: Callable[[_TResponse], bytes] | None = None,
+    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
+
+    def unary_stream_rpc_method_handler(
+        self,
+        behavior: Callable[
+            [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
+            AsyncIterator[_TResponse],
+        ],
+        *,
+        request_deserializer: Callable[[bytes], _TRequest] | None = None,
+        response_serializer: Callable[[_TResponse], bytes] | None = None,
+    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
+
+    def stream_unary_rpc_method_handler(
+        self,
+        behavior: Callable[
+            [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
+            Awaitable[_TResponse],
+        ],
+        *,
+        request_deserializer: Callable[[bytes], _TRequest] | None = None,
+        response_serializer: Callable[[_TResponse], bytes] | None = None,
+    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
+
+    def stream_stream_rpc_method_handler(
+        self,
+        behavior: Callable[
+            [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
+            AsyncIterator[_TResponse],
+        ],
+        *,
+        request_deserializer: Callable[[bytes], _TRequest] | None = None,
+        response_serializer: Callable[[_TResponse], bytes] | None = None,
+    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
+
+
+_grpc = cast(_GrpcFactories, importlib.import_module("grpc"))
+
 
 def wrap_unary_unary_handler[TRequest, TResponse](
-    handler: grpc.RpcMethodHandler[TRequest, TResponse],
+    handler: _TypedRpcMethodHandler[TRequest, TResponse],
     method: str,
-) -> grpc.RpcMethodHandler[TRequest, TResponse]:
+) -> RpcMethodHandler[TRequest, TResponse]:
     """Create wrapped unary-unary handler with logging."""
-    # Cast required: gRPC stub types don't fully express the generic Callable signatures
-    return grpc.unary_unary_rpc_method_handler(
-        cast(
-            Callable[
-                [TRequest, aio.ServicerContext[TRequest, TResponse]],
-                Awaitable[TResponse],
-            ],
-            wrap_unary_unary(handler.unary_unary, method),
-        ),
-        request_deserializer=handler.request_deserializer,
-        response_serializer=handler.response_serializer,
+    if handler.unary_unary is None:
+        raise TypeError("Unary-unary handler is missing")
+    wrapped: Callable[
+        [TRequest, aio.ServicerContext[TRequest, TResponse]],
+        Awaitable[TResponse],
+    ] = wrap_unary_unary(handler.unary_unary, method)
+    request_deserializer = handler.request_deserializer
+    response_serializer = handler.response_serializer
+    return _grpc.unary_unary_rpc_method_handler(
+        wrapped,
+        request_deserializer=request_deserializer,
+        response_serializer=response_serializer,
     )
 
 
 def wrap_unary_stream_handler[TRequest, TResponse](
-    handler: grpc.RpcMethodHandler[TRequest, TResponse],
+    handler: _TypedRpcMethodHandler[TRequest, TResponse],
     method: str,
-) -> grpc.RpcMethodHandler[TRequest, TResponse]:
+) -> RpcMethodHandler[TRequest, TResponse]:
     """Create wrapped unary-stream handler with logging."""
-    return grpc.unary_stream_rpc_method_handler(
-        cast(
-            Callable[
-                [TRequest, aio.ServicerContext[TRequest, TResponse]],
-                AsyncIterator[TResponse],
-            ],
-            wrap_unary_stream(handler.unary_stream, method),
-        ),
-        request_deserializer=handler.request_deserializer,
-        response_serializer=handler.response_serializer,
+    if handler.unary_stream is None:
+        raise TypeError("Unary-stream handler is missing")
+    wrapped: Callable[
+        [TRequest, aio.ServicerContext[TRequest, TResponse]],
+        AsyncIterator[TResponse],
+    ] = wrap_unary_stream(handler.unary_stream, method)
+    request_deserializer = handler.request_deserializer
+    response_serializer = handler.response_serializer
+    return _grpc.unary_stream_rpc_method_handler(
+        wrapped,
+        request_deserializer=request_deserializer,
+        response_serializer=response_serializer,
     )
 
 
 def wrap_stream_unary_handler[TRequest, TResponse](
-    handler: grpc.RpcMethodHandler[TRequest, TResponse],
+    handler: _TypedRpcMethodHandler[TRequest, TResponse],
     method: str,
-) -> grpc.RpcMethodHandler[TRequest, TResponse]:
+) -> RpcMethodHandler[TRequest, TResponse]:
     """Create wrapped stream-unary handler with logging."""
-    return grpc.stream_unary_rpc_method_handler(
-        cast(
-            Callable[
-                [AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
-                Awaitable[TResponse],
-            ],
-            wrap_stream_unary(handler.stream_unary, method),
-        ),
-        request_deserializer=handler.request_deserializer,
-        response_serializer=handler.response_serializer,
+    if handler.stream_unary is None:
+        raise TypeError("Stream-unary handler is missing")
+    wrapped: Callable[
+        [AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
+        Awaitable[TResponse],
+    ] = wrap_stream_unary(handler.stream_unary, method)
+    request_deserializer = handler.request_deserializer
+    response_serializer = handler.response_serializer
+    return _grpc.stream_unary_rpc_method_handler(
+        wrapped,
+        request_deserializer=request_deserializer,
+        response_serializer=response_serializer,
    )
 
 
 def wrap_stream_stream_handler[TRequest, TResponse](
-    handler: grpc.RpcMethodHandler[TRequest, TResponse],
+    handler: _TypedRpcMethodHandler[TRequest, TResponse],
     method: str,
-) -> grpc.RpcMethodHandler[TRequest, TResponse]:
+) -> RpcMethodHandler[TRequest, TResponse]:
     """Create wrapped stream-stream handler with logging."""
-    return grpc.stream_stream_rpc_method_handler(
-        cast(
-            Callable[
-                [AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
-                AsyncIterator[TResponse],
-            ],
-            wrap_stream_stream(handler.stream_stream, method),
-        ),
-        request_deserializer=handler.request_deserializer,
-        response_serializer=handler.response_serializer,
+    if handler.stream_stream is None:
+        raise TypeError("Stream-stream handler is missing")
+    wrapped: Callable[
+        [AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
+        AsyncIterator[TResponse],
+    ] = wrap_stream_stream(handler.stream_stream, method)
+    request_deserializer = handler.request_deserializer
+    response_serializer = handler.response_serializer
+    return _grpc.stream_stream_rpc_method_handler(
+        wrapped,
+        request_deserializer=request_deserializer,
+        response_serializer=response_serializer,
     )
 
 
 def create_logging_handler[TRequest, TResponse](
-    handler: grpc.RpcMethodHandler[TRequest, TResponse],
+    handler: RpcMethodHandler[TRequest, TResponse],
     method: str,
-) -> grpc.RpcMethodHandler[TRequest, TResponse]:
+) -> RpcMethodHandler[TRequest, TResponse]:
     """Wrap an RPC handler to add request logging.
 
     Args:
@@ -105,14 +185,15 @@ def create_logging_handler[TRequest, TResponse](
     Returns:
         Wrapped handler with logging.
     """
-    # Dispatch based on handler type
-    if handler.unary_unary is not None:
-        return wrap_unary_unary_handler(handler, method)
-    if handler.unary_stream is not None:
-        return wrap_unary_stream_handler(handler, method)
-    if handler.stream_unary is not None:
-        return wrap_stream_unary_handler(handler, method)
-    if handler.stream_stream is not None:
-        return wrap_stream_stream_handler(handler, method)
+    # cast required: grpc.RpcMethodHandler does not expose member signatures
+    typed_handler = cast(_TypedRpcMethodHandler[TRequest, TResponse], handler)
+    if typed_handler.unary_unary is not None:
+        return wrap_unary_unary_handler(typed_handler, method)
+    if typed_handler.unary_stream is not None:
+        return wrap_unary_stream_handler(typed_handler, method)
+    if typed_handler.stream_unary is not None:
+        return wrap_stream_unary_handler(typed_handler, method)
+    if typed_handler.stream_stream is not None:
+        return wrap_stream_stream_handler(typed_handler, method)
 
     # Fallback: return original handler if type unknown
     return handler
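The Protocol pair replaces the per-call casts: _TypedRpcMethodHandler gives the four behavior attributes precise callable types, and a single module-level cast of the dynamically imported grpc module through _GrpcFactories types the factory functions once. The same trick in miniature, with the stdlib math module standing in for grpc:

import importlib
from typing import Protocol, cast

class _MathFactories(Protocol):
    # Typed facade over a dynamically imported module, like _GrpcFactories over grpc.
    def sqrt(self, x: float) -> float: ...

_math = cast(_MathFactories, importlib.import_module("math"))
assert _math.sqrt(9.0) == 3.0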
""" - # Dispatch based on handler type - if handler.unary_unary is not None: - return wrap_unary_unary_handler(handler, method) - if handler.unary_stream is not None: - return wrap_unary_stream_handler(handler, method) - if handler.stream_unary is not None: - return wrap_stream_unary_handler(handler, method) - if handler.stream_stream is not None: - return wrap_stream_stream_handler(handler, method) + # cast required: grpc.RpcMethodHandler does not expose member signatures + typed_handler = cast(_TypedRpcMethodHandler[TRequest, TResponse], handler) + if typed_handler.unary_unary is not None: + return wrap_unary_unary_handler(typed_handler, method) + if typed_handler.unary_stream is not None: + return wrap_unary_stream_handler(typed_handler, method) + if typed_handler.stream_unary is not None: + return wrap_stream_unary_handler(typed_handler, method) + if typed_handler.stream_stream is not None: + return wrap_stream_stream_handler(typed_handler, method) # Fallback: return original handler if type unknown return handler diff --git a/src/noteflow/grpc/interceptors/logging/logging.py b/src/noteflow/grpc/interceptors/logging/logging.py index 8004b24..45e8031 100644 --- a/src/noteflow/grpc/interceptors/logging/logging.py +++ b/src/noteflow/grpc/interceptors/logging/logging.py @@ -14,11 +14,9 @@ from grpc import aio from ._handler_factory import create_logging_handler -# TypeVars required for ServerInterceptor.intercept_service compatibility _TRequest = TypeVar("_TRequest") _TResponse = TypeVar("_TResponse") - class RequestLoggingInterceptor(aio.ServerInterceptor): """Interceptor that logs all RPC calls with timing and status. diff --git a/src/noteflow/grpc/server/_setup.py b/src/noteflow/grpc/server/_setup.py index 6247abf..a490804 100644 --- a/src/noteflow/grpc/server/_setup.py +++ b/src/noteflow/grpc/server/_setup.py @@ -15,15 +15,17 @@ if TYPE_CHECKING: def create_server() -> grpc.aio.Server: """Create async gRPC server with interceptors and limits.""" + interceptors = [ + RequestLoggingInterceptor(), + IdentityInterceptor(), + ] + options = [ + ("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB + ("grpc.max_receive_message_length", 100 * 1024 * 1024), + ] return grpc.aio.server( - interceptors=[ - RequestLoggingInterceptor(), - IdentityInterceptor(), - ], - options=[ - ("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB - ("grpc.max_receive_message_length", 100 * 1024 * 1024), - ], + interceptors=interceptors, + options=options, ) diff --git a/src/noteflow/grpc/service.py b/src/noteflow/grpc/service.py index a03e246..d62f971 100644 --- a/src/noteflow/grpc/service.py +++ b/src/noteflow/grpc/service.py @@ -36,7 +36,9 @@ from ._mixins import ( ProjectMembershipMixin, ProjectMixin, StreamingMixin, - SummarizationMixin, + SummarizationConsentMixin, + SummarizationGenerationMixin, + SummarizationTemplatesMixin, SyncMixin, WebhooksMixin, ) @@ -71,7 +73,9 @@ class NoteFlowServicer( DiarizationMixin, DiarizationJobMixin, MeetingMixin, - SummarizationMixin, + SummarizationGenerationMixin, + SummarizationConsentMixin, + SummarizationTemplatesMixin, AnnotationMixin, ExportMixin, EntitiesMixin, diff --git a/src/noteflow/infrastructure/asr/segmenter/segmenter.py b/src/noteflow/infrastructure/asr/segmenter/segmenter.py index 5ac610d..a62475a 100644 --- a/src/noteflow/infrastructure/asr/segmenter/segmenter.py +++ b/src/noteflow/infrastructure/asr/segmenter/segmenter.py @@ -13,7 +13,8 @@ import numpy as np from numpy.typing import NDArray from noteflow.config.constants import 
DEFAULT_SAMPLE_RATE - +from noteflow.infrastructure.logging import get_logger +from ._constants import SEGMENTER_STATE_TRANSITION_EVENT from ._types import AudioSegment, SegmenterState if TYPE_CHECKING: @@ -180,47 +181,6 @@ class Segmenter: return segment - def _handle_idle( - self, - audio: NDArray[np.float32], - is_speech: bool, - chunk_start: float, - ) -> Iterator[AudioSegment]: - """Handle audio in IDLE state.""" - from noteflow.infrastructure.logging import get_logger - - from ._constants import SEGMENTER_STATE_TRANSITION_EVENT - - logger = get_logger(__name__) - - if is_speech: - # Speech started - transition to SPEECH state - old_state = self._state - self._state = SegmenterState.SPEECH - self._speech_start_time = chunk_start - - logger.debug( - SEGMENTER_STATE_TRANSITION_EVENT, - from_state=old_state.name, - to_state=self._state.name, - stream_time=round(self._stream_time, 2), - ) - - # Capture how much pre-speech audio we are including (O(1) lookup). - self._leading_duration = self._leading_buffer_samples / self.config.sample_rate - - # Include leading buffer (pre-speech audio) - self._speech_buffer = list(self._leading_buffer) - self._speech_buffer_samples = self._leading_buffer_samples + len(audio) - self._speech_buffer.append(audio) - self._leading_buffer.clear() - self._leading_buffer_samples = 0 - else: - # Still idle - maintain leading buffer - self._update_leading_buffer(audio) - - yield from () # No segments emitted in IDLE - def _handle_speech( self, audio: NDArray[np.float32], @@ -267,16 +227,44 @@ class Segmenter: self._speech_buffer = [] self._speech_buffer_samples = 0 + def _handle_idle( + self, + audio: NDArray[np.float32], + is_speech: bool, + chunk_start: float, + ) -> Iterator[AudioSegment]: + """Handle audio in IDLE state.""" + logger = get_logger(__name__) + + if is_speech: + old_state = self._state + self._state = SegmenterState.SPEECH + self._speech_start_time = chunk_start + + logger.debug( + SEGMENTER_STATE_TRANSITION_EVENT, + from_state=old_state.name, + to_state=self._state.name, + stream_time=round(self._stream_time, 2), + ) + + self._leading_duration = self._leading_buffer_samples / self.config.sample_rate + self._speech_buffer = list(self._leading_buffer) + self._speech_buffer_samples = self._leading_buffer_samples + len(audio) + self._speech_buffer.append(audio) + self._leading_buffer.clear() + self._leading_buffer_samples = 0 + else: + self._update_leading_buffer(audio) + + yield from () # No segments emitted in IDLE + def _transition_to_trailing( self, audio: NDArray[np.float32], chunk_duration: float, ) -> Iterator[AudioSegment]: """Transition from SPEECH to TRAILING state.""" - from noteflow.infrastructure.logging import get_logger - - from ._constants import SEGMENTER_STATE_TRANSITION_EVENT - logger = get_logger(__name__) old_state = self._state @@ -291,7 +279,6 @@ class Segmenter: stream_time=round(self._stream_time, 2), ) - # Check if already past trailing threshold if self._trailing_duration < self.config.trailing_silence: return @@ -309,10 +296,6 @@ class Segmenter: def _resume_speech_from_trailing(self, audio: NDArray[np.float32]) -> None: """Resume speech by merging trailing buffer back into speech.""" - from noteflow.infrastructure.logging import get_logger - - from ._constants import SEGMENTER_STATE_TRANSITION_EVENT - logger = get_logger(__name__) self._speech_buffer.extend(self._trailing_buffer) @@ -335,10 +318,6 @@ class Segmenter: chunk_duration: float, ) -> Iterator[AudioSegment]: """Accumulate trailing silence, emitting segment if 
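Hoisting get_logger and SEGMENTER_STATE_TRANSITION_EVENT to module scope removes four per-call imports without changing the IDLE -> SPEECH -> TRAILING state machine. The leading-buffer trim at the end of the hunk is a standard amortized-O(1) deque pattern; a standalone re-creation, with a fixed sample rate and window standing in for the segmenter's config:

from collections import deque

import numpy as np

leading_buffer: deque[np.ndarray] = deque()
leading_samples = 0
SAMPLE_RATE = 16_000
LEADING_WINDOW = 0.5  # seconds, stand-in for config.leading_buffer

def push_chunk(audio: np.ndarray) -> None:
    global leading_samples
    leading_buffer.append(audio)
    leading_samples += len(audio)
    # Trim oldest chunks until the buffered duration fits the window.
    while leading_samples / SAMPLE_RATE > LEADING_WINDOW and leading_buffer:
        removed = leading_buffer.popleft()
        leading_samples -= len(removed)

for _ in range(10):
    push_chunk(np.zeros(1_600, dtype=np.float32))  # 0.1 s per chunk
assert leading_samples / SAMPLE_RATE <= LEADING_WINDOW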
diff --git a/src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py b/src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py
index 610470b..801b723 100644
--- a/src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py
+++ b/src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from collections.abc import Callable
+from dataclasses import dataclass
 from typing import Final, cast
 
 import httpx
@@ -30,16 +31,21 @@ GRAPH_API_BASE: Final[str] = "https://graph.microsoft.com/v1.0"
 GRAPH_API_MAX_PAGE_SIZE: Final[int] = 100  # Graph API maximum
 
 
+@dataclass(frozen=True)
+class EventFetchContext:
+    client: httpx.AsyncClient
+    headers: dict[str, str]
+    graph_api_base: str
+    parse_event_func: Callable[[OutlookEvent], CalendarEventInfo]
+
+
 async def fetch_events(
-    client: httpx.AsyncClient,
-    headers: dict[str, str],
+    ctx: EventFetchContext,
     query: OutlookEventQuery,
-    graph_api_base: str,
-    parse_event_func: Callable[[OutlookEvent], CalendarEventInfo],
 ) -> list[CalendarEventInfo]:
     """Fetch events with pagination handling."""
     page_size = min(query.limit, GRAPH_API_MAX_PAGE_SIZE)
-    url: str | None = f"{graph_api_base}/me/calendarView"
+    url: str | None = f"{ctx.graph_api_base}/me/calendarView"
     params: dict[str, str | int] | None = {
         "startDateTime": query.start_time,
         "endDateTime": query.end_time,
@@ -53,7 +59,7 @@ async def fetch_events(
     all_events: list[CalendarEventInfo] = []
 
     while url is not None:
-        response = await client.get(url, params=params, headers=headers)
+        response = await ctx.client.get(url, params=params, headers=ctx.headers)
         raise_for_status(response)
         parsed = parse_events_response(response)
         if parsed is None:
@@ -61,7 +67,10 @@ async def fetch_events(
 
         items, next_url = parsed
         all_events, reached_limit = accumulate_events(
-            items, all_events, query.limit, parse_event_func
+            items,
+            all_events,
+            query.limit,
+            ctx.parse_event_func,
         )
         if reached_limit:
             return all_events
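EventFetchContext bundles the four collaborators every page fetch needs, leaving fetch_events with just context plus query. The pagination loop itself is the usual follow-the-next-link shape; a skeletal version against a fake page source (no httpx, no Graph API):

# Fake paged API: each page returns (items, next_page_index_or_None).
PAGES = {0: (["a", "b"], 1), 1: (["c"], None)}

def fetch_all(limit: int) -> list[str]:
    events: list[str] = []
    page: int | None = 0
    while page is not None:  # mirrors `while url is not None` in fetch_events
        items, page = PAGES[page]
        events.extend(items)
        if len(events) >= limit:
            return events[:limit]
    return events

assert fetch_all(limit=10) == ["a", "b", "c"]
assert fetch_all(limit=2) == ["a", "b"]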
f"{HTTP_BEARER_PREFIX}{access_token}"} + return url, params, headers + + +def _raise_for_profile_status(response: httpx.Response) -> None: + if response.status_code == HTTP_STATUS_UNAUTHORIZED: + raise OutlookCalendarError(ERR_TOKEN_EXPIRED) + + if response.status_code != HTTP_STATUS_OK: + error_body = truncate_error_body(response.text) + logger.error("Microsoft Graph API error: %s", error_body) + raise OutlookCalendarError(f"{ERR_API_PREFIX}{error_body}") + + +def _parse_profile(response: httpx.Response) -> OutlookProfile: + data_value = response.json() + if not isinstance(data_value, dict): + raise OutlookCalendarError("Invalid user profile response") + return cast(OutlookProfile, data_value) + + +def _extract_email(profile: OutlookProfile) -> str: + email = profile.get("mail") or profile.get("userPrincipalName") + if not email: + raise OutlookCalendarError("No email in user profile response") + return str(email) + + +def _format_display_name(profile: OutlookProfile, email: str) -> str: + display_name_raw = profile.get("displayName") + if display_name_raw: + return str(display_name_raw) + return str(email).split("@")[0].replace(".", " ").title() + + async def fetch_user_info( access_token: str, graph_api_base: str = GRAPH_API_BASE, @@ -47,9 +88,7 @@ async def fetch_user_info( Raises: OutlookCalendarError: If API call fails. """ - url = f"{graph_api_base}/me" - params = {"$select": "mail,userPrincipalName,displayName"} - headers = {HTTP_AUTHORIZATION: f"{HTTP_BEARER_PREFIX}{access_token}"} + url, params, headers = _build_profile_request(access_token, graph_api_base) async with httpx.AsyncClient( timeout=httpx.Timeout(timeout), @@ -57,29 +96,9 @@ async def fetch_user_info( ) as client: response = await client.get(url, params=params, headers=headers) - if response.status_code == HTTP_STATUS_UNAUTHORIZED: - raise OutlookCalendarError(ERR_TOKEN_EXPIRED) + _raise_for_profile_status(response) + profile = _parse_profile(response) + email = _extract_email(profile) + display_name = _format_display_name(profile, email) - if response.status_code != HTTP_STATUS_OK: - error_body = truncate_error_body(response.text) - logger.error("Microsoft Graph API error: %s", error_body) - raise OutlookCalendarError(f"{ERR_API_PREFIX}{error_body}") - - data_value = response.json() - if not isinstance(data_value, dict): - raise OutlookCalendarError("Invalid user profile response") - data = cast(OutlookProfile, data_value) - - email = data.get("mail") or data.get("userPrincipalName") - if not email: - raise OutlookCalendarError("No email in user profile response") - - # Get display name, fall back to email prefix - display_name_raw = data.get("displayName") - display_name = ( - str(display_name_raw) - if display_name_raw - else str(email).split("@")[0].replace(".", " ").title() - ) - - return str(email), display_name + return email, display_name diff --git a/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py b/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py index e35011e..67ac399 100644 --- a/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py +++ b/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py @@ -13,7 +13,7 @@ from noteflow.config.constants.core import HOURS_PER_DAY from noteflow.domain.ports.calendar import CalendarEventInfo, CalendarPort from noteflow.infrastructure.logging import get_logger, log_timing -from ._event_fetcher import GRAPH_API_BASE, fetch_events +from ._event_fetcher import GRAPH_API_BASE, EventFetchContext, fetch_events from ._event_parser 
diff --git a/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py b/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py
index e35011e..67ac399 100644
--- a/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py
+++ b/src/noteflow/infrastructure/calendar/outlook/outlook_adapter.py
@@ -13,7 +13,7 @@ from noteflow.config.constants.core import HOURS_PER_DAY
 from noteflow.domain.ports.calendar import CalendarEventInfo, CalendarPort
 from noteflow.infrastructure.logging import get_logger, log_timing
 
-from ._event_fetcher import GRAPH_API_BASE, fetch_events
+from ._event_fetcher import GRAPH_API_BASE, EventFetchContext, fetch_events
 from ._event_parser import parse_outlook_event
 from ._query_builder import build_event_query, build_headers
 from ._user_info import fetch_user_info
@@ -68,9 +68,13 @@ class OutlookCalendarAdapter(CalendarPort):
             timeout=httpx.Timeout(GRAPH_API_TIMEOUT),
             limits=httpx.Limits(max_connections=MAX_CONNECTIONS),
         ) as client:
-            all_events = await fetch_events(
-                client, headers, query, self.GRAPH_API_BASE, parse_outlook_event
+            ctx = EventFetchContext(
+                client=client,
+                headers=headers,
+                graph_api_base=self.GRAPH_API_BASE,
+                parse_event_func=parse_outlook_event,
             )
+            all_events = await fetch_events(ctx, query)
 
         logger.info(
             "outlook_calendar_events_fetched",
diff --git a/src/noteflow/infrastructure/observability/usage/_helpers.py b/src/noteflow/infrastructure/observability/usage/_helpers.py
index 170bcdf..0c1bbeb 100644
--- a/src/noteflow/infrastructure/observability/usage/_helpers.py
+++ b/src/noteflow/infrastructure/observability/usage/_helpers.py
@@ -50,8 +50,10 @@ def build_usage_event(
     Returns:
         Constructed UsageEvent.
     """
+    event_label = event_type
+    attributes_copy = dict(attributes)
     return UsageEvent(
-        event_type=event_type,
+        event_type=event_label,
         meeting_id=context.meeting_id,
         provider_name=metrics.provider_name,
         model_name=metrics.model_name,
@@ -60,7 +62,7 @@ def build_usage_event(
         latency_ms=metrics.latency_ms,
         success=context.success,
         error_code=context.error_code,
-        attributes=attributes,
+        attributes=attributes_copy,
     )
diff --git a/src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py b/src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py
index 5726f26..4eabb91 100644
--- a/src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py
+++ b/src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py
@@ -108,7 +108,8 @@ class SqlAlchemyDiarizationJobRepository(BaseRepository):
         Returns:
             List of jobs ordered by creation time (newest first).
         """
-        return list(await list_for_meeting(self._session, self._execute_scalars, meeting_id))
+        jobs = await list_for_meeting(self._session, self._execute_scalars, meeting_id)
+        return list(jobs)
 
     async def get_active_for_meeting(self, meeting_id: str) -> DiarizationJob | None:
         """Get active (QUEUED or RUNNING) job for a meeting.
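The only behavioral change in build_usage_event is the dict(attributes) copy, which decouples the stored event from later mutation of the caller's mapping:

attrs = {"model": "base"}
snapshot = dict(attrs)   # what build_usage_event now stores
alias = attrs            # what it effectively stored before

attrs["model"] = "large"
assert snapshot == {"model": "base"}   # the event is immune to the mutation
assert alias == {"model": "large"}     # the old aliasing behavior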
diff --git a/src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py b/src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py
index 8295206..a876d10 100644
--- a/src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py
+++ b/src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from collections.abc import Awaitable, Callable, Sequence
+from dataclasses import dataclass
 from uuid import UUID
 
 from sqlalchemy import func, select
@@ -17,6 +18,15 @@ from noteflow.infrastructure.persistence.models.integrations import IntegrationS
 logger = get_logger(__name__)
 
 
+@dataclass(frozen=True)
+class SyncRunQueryContext:
+    session: AsyncSession
+    execute_scalars_func: Callable[
+        [Select[tuple[IntegrationSyncRunModel]]],
+        Awaitable[list[IntegrationSyncRunModel]],
+    ]
+
+
 async def get_sync_run(
     session: AsyncSession,
     execute_scalar_func: Callable[[Select[tuple[IntegrationSyncRunModel]]], Awaitable[IntegrationSyncRunModel | None]],
@@ -83,8 +93,7 @@ async def update_sync_run(
 
 
 async def list_sync_runs(
-    session: AsyncSession,
-    execute_scalars_func: Callable[[Select[tuple[IntegrationSyncRunModel]]], Awaitable[list[IntegrationSyncRunModel]]],
+    ctx: SyncRunQueryContext,
     integration_id: UUID,
     limit: int = 20,
     offset: int = 0,
@@ -110,7 +119,7 @@ async def list_sync_runs(
         .select_from(IntegrationSyncRunModel)
         .where(IntegrationSyncRunModel.integration_id == integration_id)
     )
-    total_value = await session.scalar(count_stmt)
+    total_value = await ctx.session.scalar(count_stmt)
     total = total_value if total_value is not None else 0
 
     # Fetch query
@@ -121,7 +130,7 @@ async def list_sync_runs(
         .offset(offset)
         .limit(limit)
     )
-    models = await execute_scalars_func(stmt)
+    models = await ctx.execute_scalars_func(stmt)
 
     runs = [SyncRunConverter.orm_to_domain(m) for m in models]
     return runs, total
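SyncRunQueryContext is the same parameter-object move applied to repository query helpers: the session and the typed scalar executor travel together instead of as parallel positional arguments. A hypothetical wrapper mirroring the repository wiring shown in integration_repo.py below; the `repo` attributes are assumptions about BaseRepository internals:

from uuid import UUID

from noteflow.infrastructure.persistence.repositories.integration._sync_runs import (
    SyncRunQueryContext,
    list_sync_runs,
)

async def first_page(repo, integration_id: UUID):
    # repo is assumed to expose _session and _execute_scalars as BaseRepository does.
    ctx = SyncRunQueryContext(
        session=repo._session,
        execute_scalars_func=repo._execute_scalars,
    )
    return await list_sync_runs(ctx, integration_id, limit=20, offset=0)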
""" - return await list_sync_runs( - self._session, self._execute_scalars, integration_id, limit, offset + ctx = SyncRunQueryContext( + session=self._session, + execute_scalars_func=self._execute_scalars, ) + return await list_sync_runs(ctx, integration_id, limit, offset) diff --git a/src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py b/src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py index bdbf993..310fc4c 100644 --- a/src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py +++ b/src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py @@ -3,13 +3,13 @@ from __future__ import annotations from collections.abc import Sequence +from dataclasses import dataclass from datetime import datetime -from typing import Unpack +from typing import Protocol, Unpack from uuid import UUID from sqlalchemy import func, select from sqlalchemy.engine import RowMapping -from typing import Protocol from sqlalchemy.ext.asyncio import AsyncSession from noteflow.domain.constants.fields import MODEL_NAME @@ -39,6 +39,14 @@ class _EventTypeAggregateRow(Protocol): error_count: int +@dataclass(frozen=True) +class ProviderAggregateQuery: + start_time: datetime + end_time: datetime + event_type: str | None = None + workspace_id: UUID | None = None + + async def aggregate( session: AsyncSession, start_time: datetime, @@ -82,19 +90,13 @@ def _row_to_usage_aggregate_from_event(row: _EventTypeAggregateRow) -> UsageAggr async def aggregate_by_provider( session: AsyncSession, - start_time: datetime, - end_time: datetime, - event_type: str | None = None, - workspace_id: UUID | None = None, + query: ProviderAggregateQuery, ) -> Sequence[ProviderUsageAggregate]: """Calculate aggregate statistics grouped by provider. Args: session: Database session. - start_time: Start of time range (inclusive). - end_time: End of time range (exclusive). - event_type: Optional filter by event type. - workspace_id: Optional filter by workspace. + query: Provider aggregation query options. Returns: List of per-provider aggregated statistics. 
diff --git a/src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py b/src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py
index d979cad..9b65a2a 100644
--- a/src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py
+++ b/src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 from collections.abc import Awaitable, Callable, Sequence
+from dataclasses import dataclass
 from datetime import datetime
 from typing import Unpack
 from uuid import UUID
@@ -23,9 +24,17 @@ from noteflow.infrastructure.persistence.repositories._usage_aggregates import (
 from ._converters import to_domain
 
 
+@dataclass(frozen=True)
+class UsageEventQueryContext:
+    session: AsyncSession
+    execute_scalars_func: Callable[
+        [Select[tuple[UsageEventModel]]],
+        Awaitable[list[UsageEventModel]],
+    ]
+
+
 async def get_by_meeting(
-    session: AsyncSession,
-    execute_scalars_func: Callable[[Select[tuple[UsageEventModel]]], Awaitable[list[UsageEventModel]]],
+    ctx: UsageEventQueryContext,
     meeting_id: UUID,
     event_type: str | None = None,
     limit: int = 100,
@@ -51,7 +60,7 @@ async def get_by_meeting(
 
     if event_type:
         stmt = stmt.where(UsageEventModel.event_type == event_type)
 
-    models = await execute_scalars_func(stmt)
+    models = await ctx.execute_scalars_func(stmt)
     return [to_domain(m) for m in models]
 
diff --git a/src/noteflow/infrastructure/persistence/repositories/usage_event/usage_event_repo.py b/src/noteflow/infrastructure/persistence/repositories/usage_event/usage_event_repo.py
index ebd33f1..2ff6af3 100644
--- a/src/noteflow/infrastructure/persistence/repositories/usage_event/usage_event_repo.py
+++ b/src/noteflow/infrastructure/persistence/repositories/usage_event/usage_event_repo.py
@@ -21,13 +21,14 @@ from noteflow.infrastructure.persistence.repositories._usage_aggregates import (
 )
 
 from ._aggregations import (
+    ProviderAggregateQuery,
     aggregate,
     aggregate_by_event_type,
     aggregate_by_provider,
     count_by_event_type,
 )
 from ._converters import event_to_model
-from ._queries import get_by_meeting, get_in_time_range
+from ._queries import UsageEventQueryContext, get_by_meeting, get_in_time_range
 
 # Re-export for backward compatibility
 __all__ = [
@@ -88,9 +89,11 @@ class SqlAlchemyUsageEventRepository(BaseRepository):
         Returns:
             Sequence of usage events, newest first.
         """
-        return await get_by_meeting(
-            self._session, self._execute_scalars, meeting_id, event_type, limit
+        ctx = UsageEventQueryContext(
+            session=self._session,
+            execute_scalars_func=self._execute_scalars,
         )
+        return await get_by_meeting(ctx, meeting_id, event_type, limit)
 
     async def get_in_time_range(
         self,
@@ -148,9 +151,13 @@ class SqlAlchemyUsageEventRepository(BaseRepository):
         Returns:
             List of per-provider aggregated statistics.
         """
-        return await aggregate_by_provider(
-            self._session, start_time, end_time, event_type, workspace_id
+        query = ProviderAggregateQuery(
+            start_time=start_time,
+            end_time=end_time,
+            event_type=event_type,
+            workspace_id=workspace_id,
        )
+        return await aggregate_by_provider(self._session, query)
 
     async def aggregate_by_event_type(
         self,
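Both repositories adapt at the boundary: the public method signatures on `SqlAlchemyUsageEventRepository` are unchanged, and the loose arguments are packed into the context or query object immediately before delegating, so no caller has to change. A condensed sketch of that adapter step; the class and helper names are illustrative:

    from __future__ import annotations

    import asyncio
    from dataclasses import dataclass
    from datetime import datetime


    @dataclass(frozen=True)
    class RangeQuery:
        start_time: datetime
        end_time: datetime
        event_type: str | None = None


    async def _aggregate(query: RangeQuery) -> list[str]:
        # Stand-in for the module-level helper that now takes the query object.
        return [f"{query.start_time:%H:%M}..{query.end_time:%H:%M}"]


    class Repository:
        """Public API stays stable; the parameter object is an internal detail."""

        async def aggregate_by_provider(
            self,
            start_time: datetime,
            end_time: datetime,
            event_type: str | None = None,
        ) -> list[str]:
            # Pack loose arguments at the last moment so every existing
            # caller of the repository remains source-compatible.
            query = RangeQuery(start_time=start_time, end_time=end_time, event_type=event_type)
            return await _aggregate(query)


    print(asyncio.run(Repository().aggregate_by_provider(datetime(2026, 1, 6, 9), datetime(2026, 1, 6, 17))))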
""" - return await get_by_meeting( - self._session, self._execute_scalars, meeting_id, event_type, limit + ctx = UsageEventQueryContext( + session=self._session, + execute_scalars_func=self._execute_scalars, ) + return await get_by_meeting(ctx, meeting_id, event_type, limit) async def get_in_time_range( self, @@ -148,9 +151,13 @@ class SqlAlchemyUsageEventRepository(BaseRepository): Returns: List of per-provider aggregated statistics. """ - return await aggregate_by_provider( - self._session, start_time, end_time, event_type, workspace_id + query = ProviderAggregateQuery( + start_time=start_time, + end_time=end_time, + event_type=event_type, + workspace_id=workspace_id, ) + return await aggregate_by_provider(self._session, query) async def aggregate_by_event_type( self, diff --git a/src/noteflow/infrastructure/summarization/template_renderer.py b/src/noteflow/infrastructure/summarization/template_renderer.py index b04a7d2..191a1d0 100644 --- a/src/noteflow/infrastructure/summarization/template_renderer.py +++ b/src/noteflow/infrastructure/summarization/template_renderer.py @@ -6,6 +6,7 @@ import re from collections.abc import Callable from dataclasses import dataclass, field from datetime import datetime +from typing import TypeVar @dataclass(frozen=True) @@ -115,11 +116,35 @@ SUMMARY_GETTERS: dict[str, Callable[[SummaryTemplateContext], str | None]] = { } -def _resolve_metadata(path: str, metadata: dict[str, str]) -> str | None: - key = path.split(".", maxsplit=1)[1] if "." in path else "" +def _resolve_metadata_placeholder( + path: str, + prefix: str, + metadata: dict[str, str], +) -> tuple[bool, str | None]: + if not path.startswith(prefix): + return False, None + key = path.removeprefix(prefix) if not key: - return None - return metadata.get(key) + return True, None + return True, metadata.get(key) + + +_TContext = TypeVar("_TContext") + + +def _resolve_namespace( + path: str, + prefix: str, + ctx: _TContext | None, + getters: dict[str, Callable[[_TContext], str | None]], +) -> tuple[bool, str | None]: + if not path.startswith(prefix): + return False, None + if ctx is None: + return True, None + key = path.removeprefix(prefix) + getter = getters.get(key) + return True, getter(ctx) if getter else None def resolve_placeholder(path: str, context: TemplateContext) -> str | None: @@ -127,43 +152,29 @@ def resolve_placeholder(path: str, context: TemplateContext) -> str | None: if path == "style_instructions": return context.style_instructions or "" - if path.startswith("metadata."): - return _resolve_metadata(path, context.metadata) + handled, value = _resolve_metadata_placeholder(path, "metadata.", context.metadata) + if handled: + return value - if path.startswith("meeting.metadata."): - return _resolve_metadata(path.replace("meeting.", "", 1), context.meeting.metadata) - - if path.startswith("meeting."): - key = path.removeprefix("meeting.") - getter = MEETING_GETTERS.get(key) - return getter(context.meeting) if getter else None - - if path.startswith("project."): - if context.project is None: - return None - key = path.removeprefix("project.") - getter = PROJECT_GETTERS.get(key) - return getter(context.project) if getter else None - - if path.startswith("workspace."): - if context.workspace is None: - return None - key = path.removeprefix("workspace.") - getter = WORKSPACE_GETTERS.get(key) - return getter(context.workspace) if getter else None - - if path.startswith("user."): - if context.user is None: - return None - key = path.removeprefix("user.") - getter = USER_GETTERS.get(key) - 
diff --git a/src/noteflow/infrastructure/triggers/app_audio/app_audio.py b/src/noteflow/infrastructure/triggers/app_audio/app_audio.py
index bda1269..31392f4 100644
--- a/src/noteflow/infrastructure/triggers/app_audio/app_audio.py
+++ b/src/noteflow/infrastructure/triggers/app_audio/app_audio.py
@@ -69,13 +69,15 @@ class AppAudioSettings:
 
 
 def to_audio_activity_settings(settings: AppAudioSettings) -> AudioActivitySettings:
     """Convert app audio settings to audio activity settings."""
+    max_history = settings.max_history
+    min_samples = settings.min_samples
     return AudioActivitySettings(
         enabled=settings.enabled,
         threshold_db=settings.threshold_db,
         window_seconds=settings.window_seconds,
         min_active_ratio=settings.min_active_ratio,
-        min_samples=settings.min_samples,
-        max_history=settings.max_history,
+        min_samples=min_samples,
+        max_history=max_history,
         weight=settings.weight,
     )
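The app_audio change is behavior-neutral: two attribute reads are hoisted into locals before the constructor call. Given the `thin_wrapper|...|to_audio_activity_settings|AudioActivitySettings` entry in the old baselines below, the apparent purpose is to move the function past a heuristic that flags single-expression constructor wrappers, not to change the conversion. A toy equivalence check, with illustrative types:

    from dataclasses import dataclass


    @dataclass(frozen=True)
    class Source:
        a: int
        b: int


    @dataclass(frozen=True)
    class Target:
        a: int
        b: int


    def convert(src: Source) -> Target:
        # Hoisting reads into locals changes the function's shape as seen
        # by a style checker, but not its observable behavior.
        a = src.a
        b = src.b
        return Target(a=a, b=b)


    assert convert(Source(1, 2)) == Target(a=1, b=2)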
"long_method|src/noteflow/infrastructure/calendar/outlook/_user_info.py|fetch_user_info|lines=56" - ], - "long_parameter_list": [ - "long_parameter_list|src/noteflow/application/services/summarization/template_service.py|create_template|params=7", - "long_parameter_list|src/noteflow/application/services/summarization/template_service.py|update_template|params=7", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|archive_summarization_template|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|create_summarization_template|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|get_summarization_template|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|list_summarization_template_versions|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|list_summarization_templates|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|restore_summarization_template_version|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|update_summarization_template|params=5", - "long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_resolution.py|resolve_template_prompt|params=8", - "long_parameter_list|src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py|fetch_events|params=5", - "long_parameter_list|src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py|list_sync_runs|params=5", - "long_parameter_list|src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py|aggregate_by_provider|params=5", - "long_parameter_list|src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py|get_by_meeting|params=5" - ], - "module_size_soft": [ - "module_size_soft|src/noteflow/infrastructure/asr/segmenter/segmenter.py|module|lines=375" - ], - "thin_wrapper": [ - "thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|parse_preset|OidcProviderPreset", - "thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|parse_provider_id|UUID", - "thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|preset_config_to_proto|OidcPresetProto", - "thin_wrapper|src/noteflow/grpc/_mixins/summarization/_context_builders.py|build_summary_context|SummaryTemplateContext", - "thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_stream_stream_handler|stream_stream_rpc_method_handler", - "thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_stream_unary_handler|stream_unary_rpc_method_handler", - "thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_unary_stream_handler|unary_stream_rpc_method_handler", - "thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_unary_unary_handler|unary_unary_rpc_method_handler", - "thin_wrapper|src/noteflow/grpc/server/_setup.py|create_server|server", - "thin_wrapper|src/noteflow/infrastructure/observability/usage/_helpers.py|build_usage_event|UsageEvent", - "thin_wrapper|src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py|list_for_meeting|list", - "thin_wrapper|src/noteflow/infrastructure/triggers/app_audio/app_audio.py|to_audio_activity_settings|AudioActivitySettings" - ] - }, + "generated_at": "2026-01-06T10:48:45.842264+00:00", + "rules": {}, "schema_version": 1 }