chore: update client submodule and add summarization template helpers

- Updated the client submodule to the latest commit for improved features and stability.
- Introduced new helper functions for managing summarization templates, including creation, updating, and validation.
- Refactored the summarization template service to utilize the new helper functions, enhancing code clarity and maintainability.
This commit is contained in:
2026-01-06 21:48:29 +00:00
parent 2c0e017811
commit 7b0f0124d0
28 changed files with 811 additions and 491 deletions

2
client

Submodule client updated: 390a5a217b...c496d36004

View File

@@ -0,0 +1,240 @@
"""Helpers for summarization template service."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import TYPE_CHECKING
from uuid import UUID, uuid4
from noteflow.domain.entities import SummarizationTemplate, SummarizationTemplateVersion
from noteflow.domain.identity.context import OperationContext
from noteflow.domain.utils.time import utc_now
if TYPE_CHECKING:
from noteflow.domain.ports.unit_of_work import UnitOfWork
@dataclass(frozen=True)
class TemplateUpdateResult:
    """Outcome of a template create/update: the template plus any new version."""

    # version is None when an update changed only metadata (no new content).
    template: SummarizationTemplate
    version: SummarizationTemplateVersion | None = None
@dataclass(frozen=True)
class TemplateCreatePayload:
    """Inputs required to create a template together with its initial version."""

    workspace_id: UUID
    name: str
    content: str
    description: str | None = None
    change_note: str | None = None
@dataclass(frozen=True)
class TemplateUpdatePayload:
    """Requested changes for an existing template.

    A ``None`` field means "leave this field unchanged".
    """

    template_id: UUID
    name: str | None = None
    description: str | None = None
    content: str | None = None
    change_note: str | None = None
@dataclass(frozen=True)
class TemplateUpdateMetadata:
    """Fully resolved metadata to write onto a template during an update."""

    name: str
    description: str | None
    current_version_id: UUID | None
    updated_at: datetime
    updated_by: UUID
@dataclass(frozen=True)
class TemplateUpdateState:
    """Snapshot of a template's update-relevant fields before applying changes."""

    template_id: UUID
    name: str
    description: str | None
    current_version_id: UUID | None
@dataclass(frozen=True)
class TemplateUpdateInputs:
    """Caller-supplied update values plus audit fields (who/when)."""

    # None for name/description means "keep the existing value".
    name: str | None
    description: str | None
    current_version_id: UUID | None
    updated_at: datetime
    updated_by: UUID
@dataclass(frozen=True)
class TemplateVersionInputs:
    """Data needed to append a new version to an existing template."""

    template_id: UUID
    content: str
    change_note: str | None
    created_at: datetime
    created_by: UUID
def extract_update_payload(
    payload: TemplateUpdatePayload,
) -> tuple[UUID, str | None, str | None, str | None, str | None]:
    """Unpack an update payload into its five fields, in declaration order."""
    template_id = payload.template_id
    name = payload.name
    description = payload.description
    content = payload.content
    change_note = payload.change_note
    return template_id, name, description, content, change_note
def normalize_optional(value: str | None) -> str | None:
    """Strip surrounding whitespace; map None and blank strings to None."""
    if value is None:
        return None
    stripped = value.strip()
    # Empty string after stripping is treated the same as "not provided".
    return stripped or None
def require_trimmed(value: str, field_label: str) -> str:
    """Return *value* stripped of surrounding whitespace.

    Raises:
        ValueError: if nothing remains after stripping; the message names the
            field via *field_label*.
    """
    cleaned = value.strip()
    if cleaned:
        return cleaned
    raise ValueError(f"{field_label} is required")
def validate_template_scope(context: OperationContext, template: SummarizationTemplate) -> None:
    """Reject edits to system templates and to templates outside the caller's workspace.

    Templates with no workspace (``workspace_id is None``) are not scope-checked.

    Raises:
        PermissionError: if the template is a system template, or belongs to a
            workspace other than the caller's.
    """
    if template.is_system:
        raise PermissionError("System templates are read-only")
    owner = template.workspace_id
    if owner is None:
        return
    if context.workspace_id != owner:
        raise PermissionError("Workspace scope mismatch")
def build_new_template_entities(
    payload: TemplateCreatePayload,
    user_id: UUID,
) -> tuple[SummarizationTemplate, SummarizationTemplateVersion]:
    """Construct a fresh template and its version-1 entity from *payload*.

    Both entities share one creation timestamp, and the template's
    ``current_version_id`` points at the newly built version.

    Raises:
        ValueError: if the name or content is blank after trimming.
    """
    # Validate required text fields first so we fail before allocating IDs.
    clean_name = require_trimmed(payload.name, "Template name")
    clean_content = require_trimmed(payload.content, "Template content")
    timestamp = utc_now()
    new_template_id = uuid4()
    new_version_id = uuid4()
    version = SummarizationTemplateVersion(
        id=new_version_id,
        template_id=new_template_id,
        version_number=1,
        content=clean_content,
        change_note=normalize_optional(payload.change_note),
        created_at=timestamp,
        created_by=user_id,
    )
    template = SummarizationTemplate(
        id=new_template_id,
        workspace_id=payload.workspace_id,
        name=clean_name,
        description=normalize_optional(payload.description),
        is_system=False,
        is_archived=False,
        current_version_id=new_version_id,
        created_at=timestamp,
        updated_at=timestamp,
        created_by=user_id,
        updated_by=user_id,
    )
    return template, version
def extract_template_state(template: SummarizationTemplate) -> TemplateUpdateState:
    """Snapshot the update-relevant fields of *template* into a state record."""
    return TemplateUpdateState(
        template_id=template.id,
        name=template.name,
        description=template.description,
        current_version_id=template.current_version_id,
    )
def build_restored_template(
    template: SummarizationTemplate,
    *,
    version_id: UUID,
    user_id: UUID,
) -> SummarizationTemplate:
    """Copy *template* with its current version pointed at *version_id*.

    Only ``current_version_id``, ``updated_at`` and ``updated_by`` change;
    every other field is carried over from the original entity.
    """
    touched_at = utc_now()
    return SummarizationTemplate(
        id=template.id,
        workspace_id=template.workspace_id,
        name=template.name,
        description=template.description,
        is_system=template.is_system,
        is_archived=template.is_archived,
        current_version_id=version_id,
        created_at=template.created_at,
        updated_at=touched_at,
        created_by=template.created_by,
        updated_by=user_id,
    )
def build_updated_template(
    template: SummarizationTemplate,
    metadata: TemplateUpdateMetadata,
) -> SummarizationTemplate:
    """Apply resolved *metadata* on top of *template*.

    Identity, system/archive flags and creation audit fields are preserved;
    name, description, current version and update audit fields come from
    *metadata*.
    """
    return SummarizationTemplate(
        id=template.id,
        workspace_id=template.workspace_id,
        name=metadata.name,
        description=metadata.description,
        is_system=template.is_system,
        is_archived=template.is_archived,
        current_version_id=metadata.current_version_id,
        created_at=template.created_at,
        updated_at=metadata.updated_at,
        created_by=template.created_by,
        updated_by=metadata.updated_by,
    )
def build_update_metadata(
    state: TemplateUpdateState,
    inputs: TemplateUpdateInputs,
) -> TemplateUpdateMetadata:
    """Merge requested changes in *inputs* over the existing *state*.

    ``None`` in an input field means "keep the current value". A provided
    name must be non-blank after trimming.

    Raises:
        ValueError: if ``inputs.name`` is provided but blank.
    """
    if inputs.name is None:
        name = state.name
    else:
        name = require_trimmed(inputs.name, "Template name")
    if inputs.description is None:
        description = state.description
    else:
        description = normalize_optional(inputs.description)
    return TemplateUpdateMetadata(
        name=name,
        description=description,
        current_version_id=inputs.current_version_id,
        updated_at=inputs.updated_at,
        updated_by=inputs.updated_by,
    )
async def create_new_version(
    uow: UnitOfWork,
    inputs: TemplateVersionInputs,
) -> SummarizationTemplateVersion:
    """Persist the next version of a template and return it.

    The version number is one past the highest existing number for the
    template (so the first version is number 1).
    """
    existing = await uow.summarization_templates.list_versions(inputs.template_id)
    highest = max((item.version_number for item in existing), default=0)
    version = SummarizationTemplateVersion(
        id=uuid4(),
        template_id=inputs.template_id,
        version_number=highest + 1,
        content=inputs.content,
        change_note=inputs.change_note,
        created_at=inputs.created_at,
        created_by=inputs.created_by,
    )
    await uow.summarization_templates.add_version(version)
    return version

View File

@@ -3,25 +3,39 @@
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from uuid import UUID, uuid4
from typing import TYPE_CHECKING
from uuid import UUID
from noteflow.domain.entities import SummarizationTemplate, SummarizationTemplateVersion
from noteflow.domain.identity.context import OperationContext
from noteflow.domain.utils.time import utc_now
from noteflow.infrastructure.logging import get_logger
from ._template_helpers import (
TemplateCreatePayload,
TemplateUpdateInputs,
TemplateUpdatePayload,
TemplateUpdateResult,
TemplateVersionInputs,
build_new_template_entities,
build_restored_template,
build_update_metadata,
build_updated_template,
create_new_version,
extract_template_state,
extract_update_payload,
normalize_optional,
require_trimmed,
validate_template_scope,
)
from .summarization_service import SummarizationServiceSettings
if TYPE_CHECKING:
from noteflow.domain.ports.unit_of_work import UnitOfWork
logger = get_logger(__name__)
@dataclass(frozen=True)
class TemplateUpdateResult:
template: SummarizationTemplate
version: SummarizationTemplateVersion | None = None
class SummarizationTemplateService:
"""Manage summarization templates and versions."""
@@ -89,49 +103,12 @@ class SummarizationTemplateService:
self,
uow: UnitOfWork,
context: OperationContext,
*,
workspace_id: UUID,
name: str,
content: str,
description: str | None = None,
change_note: str | None = None,
payload: TemplateCreatePayload,
) -> TemplateUpdateResult:
"""Create a template with its initial version."""
self._require_admin(context)
self._ensure_workspace_scope(context, workspace_id)
if not name.strip():
raise ValueError("Template name is required")
if not content.strip():
raise ValueError("Template content is required")
template_id = uuid4()
version_id = uuid4()
now = utc_now()
template = SummarizationTemplate(
id=template_id,
workspace_id=workspace_id,
name=name.strip(),
description=description.strip() if description else None,
is_system=False,
is_archived=False,
current_version_id=version_id,
created_at=now,
updated_at=now,
created_by=context.user_id,
updated_by=context.user_id,
)
version = SummarizationTemplateVersion(
id=version_id,
template_id=template_id,
version_number=1,
content=content,
change_note=change_note.strip() if change_note else None,
created_at=now,
created_by=context.user_id,
)
self._ensure_workspace_scope(context, payload.workspace_id)
template, version = build_new_template_entities(payload, context.user_id)
saved = await uow.summarization_templates.create_with_version(template, version)
await uow.commit()
@@ -142,63 +119,45 @@ class SummarizationTemplateService:
self,
uow: UnitOfWork,
context: OperationContext,
*,
template_id: UUID,
name: str | None = None,
description: str | None = None,
content: str | None = None,
change_note: str | None = None,
payload: TemplateUpdatePayload,
) -> TemplateUpdateResult | None:
"""Update template metadata and optionally create a new version."""
self._require_admin(context)
(
template_id,
payload_name,
payload_description,
payload_content,
payload_change_note,
) = extract_update_payload(payload)
template = await uow.summarization_templates.get(template_id)
if template is None:
return None
if template.is_system:
raise PermissionError("System templates are read-only")
if template.workspace_id is not None:
self._ensure_workspace_scope(context, template.workspace_id)
updated_name = template.name if name is None else name.strip()
updated_description = template.description if description is None else description.strip()
validate_template_scope(context, template)
state = extract_template_state(template)
new_version: SummarizationTemplateVersion | None = None
current_version_id = template.current_version_id
current_version_id = state.current_version_id
now = utc_now()
if content is not None:
if not content.strip():
raise ValueError("Template content is required")
versions = await uow.summarization_templates.list_versions(template_id)
next_version_number = 1 + max((v.version_number for v in versions), default=0)
version_id = uuid4()
new_version = SummarizationTemplateVersion(
id=version_id,
template_id=template_id,
version_number=next_version_number,
if payload_content is not None:
content = require_trimmed(payload_content, "Template content")
version_inputs = TemplateVersionInputs(
template_id=state.template_id,
content=content,
change_note=change_note.strip() if change_note else None,
change_note=normalize_optional(payload_change_note),
created_at=now,
created_by=context.user_id,
)
await uow.summarization_templates.add_version(new_version)
new_version = await create_new_version(uow, version_inputs)
current_version_id = new_version.id
updated = SummarizationTemplate(
id=template.id,
workspace_id=template.workspace_id,
name=updated_name,
description=updated_description,
is_system=template.is_system,
is_archived=template.is_archived,
metadata_inputs = TemplateUpdateInputs(
name=payload_name,
description=payload_description,
current_version_id=current_version_id,
created_at=template.created_at,
updated_at=now,
created_by=template.created_by,
updated_by=context.user_id,
)
metadata = build_update_metadata(state, metadata_inputs)
updated = build_updated_template(template, metadata)
saved = await uow.summarization_templates.update(updated)
await uow.commit()
logger.info("Updated summarization template %s", template_id)
@@ -218,28 +177,13 @@ class SummarizationTemplateService:
template = await uow.summarization_templates.get(template_id)
if template is None:
return None
if template.is_system:
raise PermissionError("System templates are read-only")
if template.workspace_id is not None:
self._ensure_workspace_scope(context, template.workspace_id)
validate_template_scope(context, template)
version = await uow.summarization_templates.get_version(version_id)
if version is None or version.template_id != template_id:
raise ValueError("Template version not found")
updated = SummarizationTemplate(
id=template.id,
workspace_id=template.workspace_id,
name=template.name,
description=template.description,
is_system=template.is_system,
is_archived=template.is_archived,
current_version_id=version_id,
created_at=template.created_at,
updated_at=utc_now(),
created_by=template.created_by,
updated_by=context.user_id,
)
updated = build_restored_template(template, version_id=version_id, user_id=context.user_id)
saved = await uow.summarization_templates.update(updated)
await uow.commit()
@@ -265,9 +209,3 @@ class SummarizationTemplateService:
archived = await uow.summarization_templates.archive(template_id, context.user_id)
await uow.commit()
return archived
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from noteflow.domain.ports.unit_of_work import UnitOfWork

View File

@@ -14,7 +14,11 @@ from .oidc import OidcMixin
from .preferences import PreferencesMixin
from .project import ProjectMembershipMixin, ProjectMixin
from .streaming import StreamingMixin
from .summarization import SummarizationMixin
from .summarization import (
SummarizationConsentMixin,
SummarizationGenerationMixin,
SummarizationTemplatesMixin,
)
from .sync import SyncMixin
from .webhooks import WebhooksMixin
@@ -35,7 +39,9 @@ __all__ = [
"ProjectMembershipMixin",
"ProjectMixin",
"StreamingMixin",
"SummarizationMixin",
"SummarizationConsentMixin",
"SummarizationGenerationMixin",
"SummarizationTemplatesMixin",
"SyncMixin",
"WebhooksMixin",
]

View File

@@ -28,7 +28,11 @@ if TYPE_CHECKING:
from noteflow.application.services.identity import IdentityService
from noteflow.domain.identity.context import OperationContext
from noteflow.domain.identity.entities import Workspace, WorkspaceMembership
from noteflow.domain.identity.entities import (
Workspace,
WorkspaceMembership,
WorkspaceSettings,
)
logger = get_logger(__name__)
@@ -48,6 +52,34 @@ async def _resolve_auth_status(uow: UnitOfWork) -> tuple[bool, str]:
return False, ""
def _settings_proto_or_empty(
    settings: WorkspaceSettings | None,
) -> noteflow_pb2.WorkspaceSettingsProto:
    """Convert *settings* to proto, falling back to an empty proto message.

    The fallback covers both a None input and a falsy conversion result.
    """
    settings_proto = workspace_settings_to_proto(settings)
    return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
def _merge_workspace_settings(
    current: WorkspaceSettings,
    updates: WorkspaceSettings,
) -> WorkspaceSettings:
    """Overlay the non-None fields of *updates* onto *current*.

    Field-wise merge: each of the four settings fields is taken from
    *updates* when set, otherwise kept from *current*. Returns a new
    settings object (dataclasses.replace); neither argument is mutated.
    """
    return replace(
        current,
        export_rules=updates.export_rules
        if updates.export_rules is not None
        else current.export_rules,
        trigger_rules=updates.trigger_rules
        if updates.trigger_rules is not None
        else current.trigger_rules,
        rag_enabled=updates.rag_enabled
        if updates.rag_enabled is not None
        else current.rag_enabled,
        default_summarization_template=updates.default_summarization_template
        if updates.default_summarization_template is not None
        else current.default_summarization_template,
    )
class IdentityMixin:
"""Mixin providing identity management functionality.
@@ -246,30 +278,14 @@ class IdentityMixin:
updates = proto_to_workspace_settings(request.settings)
if updates is None:
settings_proto = workspace_settings_to_proto(workspace.settings)
return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
return _settings_proto_or_empty(workspace.settings)
current = workspace.settings
updated_settings = replace(
current,
export_rules=updates.export_rules
if updates.export_rules is not None
else current.export_rules,
trigger_rules=updates.trigger_rules
if updates.trigger_rules is not None
else current.trigger_rules,
rag_enabled=updates.rag_enabled
if updates.rag_enabled is not None
else current.rag_enabled,
default_summarization_template=updates.default_summarization_template
if updates.default_summarization_template is not None
else current.default_summarization_template,
)
updated_settings = _merge_workspace_settings(current, updates)
updated_workspace = replace(workspace, settings=updated_settings)
saved = await uow.workspaces.update(updated_workspace)
await uow.commit()
settings_proto = workspace_settings_to_proto(saved.settings)
return settings_proto or noteflow_pb2.WorkspaceSettingsProto()
return _settings_proto_or_empty(saved.settings)
async def _verify_workspace_access(
self,

View File

@@ -31,12 +31,18 @@ _FIELD_ENABLED = "enabled"
def parse_provider_id(provider_id_str: str) -> UUID:
"""Parse provider ID string to UUID, raising ValueError if invalid."""
return UUID(provider_id_str)
trimmed = provider_id_str.strip()
if not trimmed:
raise ValueError(ERR_INVALID_PROVIDER_ID)
return UUID(trimmed)
def parse_preset(preset_str: str) -> OidcProviderPreset:
"""Parse preset string to OidcProviderPreset enum."""
return OidcProviderPreset(preset_str.lower())
normalized = preset_str.strip().lower()
if not normalized:
raise ValueError(ERR_INVALID_PRESET)
return OidcProviderPreset(normalized)
async def validate_register_request(
@@ -158,13 +164,16 @@ def preset_config_to_proto(
Returns:
The protobuf OidcPresetProto message.
"""
default_scopes = list(preset_config.default_scopes)
documentation_url = preset_config.documentation_url or ""
notes = preset_config.notes or ""
return noteflow_pb2.OidcPresetProto(
preset=preset_config.preset.value,
display_name=preset_config.display_name,
description=preset_config.description,
default_scopes=list(preset_config.default_scopes),
documentation_url=preset_config.documentation_url or "",
notes=preset_config.notes or "",
default_scopes=default_scopes,
documentation_url=documentation_url,
notes=notes,
)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import TYPE_CHECKING
import numpy as np
@@ -15,6 +16,62 @@ from .._partials import clear_partial_buffer, maybe_emit_partial
if TYPE_CHECKING:
from ....proto import noteflow_pb2
from ...protocols import ServicerHost
from ....stream_state import MeetingStreamState
@dataclass(frozen=True)
class VadProcessingContext:
    """Per-stream handles threaded through the VAD processing helpers."""

    host: ServicerHost
    meeting_id: str
    state: "MeetingStreamState"
def _emit_vad_state_updates(
    ctx: VadProcessingContext,
    is_speech: bool,
) -> list[noteflow_pb2.TranscriptUpdate]:
    """Return VAD start/end updates for a speech-state transition.

    Flips ``ctx.state.was_speaking`` when a transition is detected; returns
    an empty list when the speech state is unchanged.
    """
    # Runtime import: at file scope the proto module is TYPE_CHECKING-only.
    from ....proto import noteflow_pb2
    updates: list[noteflow_pb2.TranscriptUpdate] = []
    if is_speech and not ctx.state.was_speaking:
        # Transition: silence -> speech.
        updates.append(
            create_vad_update(ctx.meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_START)
        )
        ctx.state.was_speaking = True
        return updates
    if not is_speech and ctx.state.was_speaking:
        # Transition: speech -> silence.
        updates.append(
            create_vad_update(ctx.meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_END)
        )
        ctx.state.was_speaking = False
    return updates
async def _maybe_emit_partial_update(
    ctx: VadProcessingContext,
    audio: NDArray[np.float32],
    is_speech: bool,
) -> noteflow_pb2.TranscriptUpdate | None:
    """Buffer speech audio and possibly emit a partial transcript update.

    Non-speech chunks are ignored (returns None without buffering); whether
    a partial is actually emitted is decided by ``maybe_emit_partial``.
    """
    if not is_speech:
        return None
    ctx.state.partial_buffer.append(audio)
    return await maybe_emit_partial(ctx.host, ctx.meeting_id)
async def _iter_segment_updates(
    ctx: VadProcessingContext,
    audio: NDArray[np.float32],
    is_speech: bool,
) -> AsyncIterator[noteflow_pb2.TranscriptUpdate]:
    """Yield transcript updates for each audio segment finalized by the segmenter."""
    for audio_segment in ctx.state.segmenter.process_audio(audio, is_speech):
        # A finalized segment supersedes any pending partial transcript.
        clear_partial_buffer(ctx.host, ctx.meeting_id)
        async for update in process_audio_segment(
            ctx.host,
            ctx.meeting_id,
            audio_segment.audio,
            audio_segment.start_time,
        ):
            yield update
async def process_audio_with_vad(
@@ -26,8 +83,6 @@ async def process_audio_with_vad(
Uses consolidated MeetingStreamState for O(1) lookup instead of 13+ dict accesses.
"""
from ....proto import noteflow_pb2
# Single dict lookup replaces 6+ separate lookups per audio chunk
state = host.get_stream_state(meeting_id)
if state is None:
@@ -35,40 +90,23 @@ async def process_audio_with_vad(
# Get VAD decision using consolidated state
is_speech = state.vad.process_chunk(audio)
ctx = VadProcessingContext(host=host, meeting_id=meeting_id, state=state)
# Streaming diarization (optional)
await host.process_streaming_diarization(meeting_id, audio)
# Emit VAD state change events using consolidated state
if is_speech and not state.was_speaking:
# Speech started
yield create_vad_update(meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_START)
state.was_speaking = True
elif not is_speech and state.was_speaking:
# Speech ended
yield create_vad_update(meeting_id, noteflow_pb2.UPDATE_TYPE_VAD_END)
state.was_speaking = False
for update in _emit_vad_state_updates(ctx, is_speech):
yield update
# Buffer audio for partial transcription (pre-allocated buffer handles copy)
if is_speech:
state.partial_buffer.append(audio)
# Check if we should emit a partial
partial_update = await maybe_emit_partial(host, meeting_id)
if partial_update is not None:
yield partial_update
partial_update = await _maybe_emit_partial_update(ctx, audio, is_speech)
if partial_update is not None:
yield partial_update
# Process through segmenter using consolidated state
for audio_segment in state.segmenter.process_audio(audio, is_speech):
# Clear partial buffer when we get a final segment
clear_partial_buffer(host, meeting_id)
async for update in process_audio_segment(
host,
meeting_id,
audio_segment.audio,
audio_segment.start_time,
):
yield update
async for update in _iter_segment_updates(ctx, audio, is_speech):
yield update
async def flush_segmenter(

View File

@@ -1,4 +1,4 @@
"""Summarization mixin composition."""
"""Summarization mixins."""
from __future__ import annotations
@@ -7,16 +7,7 @@ from ._generation_mixin import SummarizationGenerationMixin
from ._templates_mixin import SummarizationTemplatesMixin
class SummarizationMixin(
SummarizationGenerationMixin,
SummarizationConsentMixin,
SummarizationTemplatesMixin,
):
"""Composite mixin providing summarization functionality."""
__all__ = [
"SummarizationMixin",
"SummarizationConsentMixin",
"SummarizationGenerationMixin",
"SummarizationTemplatesMixin",

View File

@@ -24,9 +24,11 @@ def build_summary_context(
settings: SummarizationServiceSettings,
) -> SummaryTemplateContext:
"""Build summary template context from settings."""
max_key_points = max(settings.max_key_points, 0)
max_action_items = max(settings.max_action_items, 0)
return SummaryTemplateContext(
max_key_points=settings.max_key_points,
max_action_items=settings.max_action_items,
max_key_points=max_key_points,
max_action_items=max_action_items,
)

View File

@@ -6,6 +6,11 @@ from dataclasses import dataclass
from typing import TYPE_CHECKING
from uuid import UUID
from noteflow.application.services.summarization.template_service import (
SummarizationTemplateService,
TemplateCreatePayload,
TemplateUpdatePayload,
)
from noteflow.domain.ports.unit_of_work import UnitOfWork
from ...proto import noteflow_pb2
@@ -22,9 +27,6 @@ from ..errors import (
)
if TYPE_CHECKING:
from noteflow.application.services.summarization.template_service import (
SummarizationTemplateService,
)
from noteflow.domain.identity import OperationContext
@@ -136,15 +138,14 @@ async def create_summarization_template(
change_note = request.change_note if request.HasField("change_note") else None
try:
result = await ctx.template_service.create_template(
repo,
op_context,
payload = TemplateCreatePayload(
workspace_id=workspace_id,
name=request.name,
content=request.content,
description=description,
change_note=change_note,
)
result = await ctx.template_service.create_template(repo, op_context, payload)
except PermissionError as exc:
await abort_permission_denied(context, str(exc))
raise RuntimeError("Unreachable")
@@ -178,15 +179,14 @@ async def update_summarization_template(
change_note = request.change_note if request.HasField("change_note") else None
try:
result = await ctx.template_service.update_template(
repo,
op_context,
payload = TemplateUpdatePayload(
template_id=template_id,
name=name,
description=description,
content=content,
change_note=change_note,
)
result = await ctx.template_service.update_template(repo, op_context, payload)
except PermissionError as exc:
await abort_permission_denied(context, str(exc))
raise RuntimeError("Unreachable")

View File

@@ -3,8 +3,9 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import TypeVar, cast
from typing import Protocol, TypeVar, cast
import importlib
import grpc
from grpc import aio
@@ -18,84 +19,163 @@ from ._wrappers import (
_TRequest = TypeVar("_TRequest")
_TResponse = TypeVar("_TResponse")
RpcMethodHandler = grpc.RpcMethodHandler
class _TypedRpcMethodHandler(Protocol[_TRequest, _TResponse]):
    """Structural view of grpc.RpcMethodHandler with typed member signatures.

    ``create_logging_handler`` checks the four behavior attributes in order
    and wraps whichever one is set.
    """

    unary_unary: Callable[
        [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
        Awaitable[_TResponse],
    ] | None
    unary_stream: Callable[
        [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
        AsyncIterator[_TResponse],
    ] | None
    stream_unary: Callable[
        [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
        Awaitable[_TResponse],
    ] | None
    stream_stream: Callable[
        [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
        AsyncIterator[_TResponse],
    ] | None
    request_deserializer: Callable[[bytes], _TRequest] | None
    response_serializer: Callable[[_TResponse], bytes] | None
class _GrpcFactories(Protocol):
    """Typed facade over the grpc module's ``*_rpc_method_handler`` factories.

    Used with a cast of the imported grpc module so the wrap helpers can call
    these factories without per-call casts.
    """

    def unary_unary_rpc_method_handler(
        self,
        behavior: Callable[
            [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
            Awaitable[_TResponse],
        ],
        *,
        request_deserializer: Callable[[bytes], _TRequest] | None = None,
        response_serializer: Callable[[_TResponse], bytes] | None = None,
    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
    def unary_stream_rpc_method_handler(
        self,
        behavior: Callable[
            [_TRequest, aio.ServicerContext[_TRequest, _TResponse]],
            AsyncIterator[_TResponse],
        ],
        *,
        request_deserializer: Callable[[bytes], _TRequest] | None = None,
        response_serializer: Callable[[_TResponse], bytes] | None = None,
    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
    def stream_unary_rpc_method_handler(
        self,
        behavior: Callable[
            [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
            Awaitable[_TResponse],
        ],
        *,
        request_deserializer: Callable[[bytes], _TRequest] | None = None,
        response_serializer: Callable[[_TResponse], bytes] | None = None,
    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
    def stream_stream_rpc_method_handler(
        self,
        behavior: Callable[
            [AsyncIterator[_TRequest], aio.ServicerContext[_TRequest, _TResponse]],
            AsyncIterator[_TResponse],
        ],
        *,
        request_deserializer: Callable[[bytes], _TRequest] | None = None,
        response_serializer: Callable[[_TResponse], bytes] | None = None,
    ) -> RpcMethodHandler[_TRequest, _TResponse]: ...
_grpc = cast(_GrpcFactories, importlib.import_module("grpc"))
def wrap_unary_unary_handler[TRequest, TResponse](
handler: grpc.RpcMethodHandler[TRequest, TResponse],
handler: _TypedRpcMethodHandler[TRequest, TResponse],
method: str,
) -> grpc.RpcMethodHandler[TRequest, TResponse]:
) -> RpcMethodHandler[TRequest, TResponse]:
"""Create wrapped unary-unary handler with logging."""
# Cast required: gRPC stub types don't fully express the generic Callable signatures
return grpc.unary_unary_rpc_method_handler(
cast(
Callable[
[TRequest, aio.ServicerContext[TRequest, TResponse]],
Awaitable[TResponse],
],
wrap_unary_unary(handler.unary_unary, method),
),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
if handler.unary_unary is None:
raise TypeError("Unary-unary handler is missing")
wrapped: Callable[
[TRequest, aio.ServicerContext[TRequest, TResponse]],
Awaitable[TResponse],
] = wrap_unary_unary(handler.unary_unary, method)
request_deserializer = handler.request_deserializer
response_serializer = handler.response_serializer
return _grpc.unary_unary_rpc_method_handler(
wrapped,
request_deserializer=request_deserializer,
response_serializer=response_serializer,
)
def wrap_unary_stream_handler[TRequest, TResponse](
handler: grpc.RpcMethodHandler[TRequest, TResponse],
handler: _TypedRpcMethodHandler[TRequest, TResponse],
method: str,
) -> grpc.RpcMethodHandler[TRequest, TResponse]:
) -> RpcMethodHandler[TRequest, TResponse]:
"""Create wrapped unary-stream handler with logging."""
return grpc.unary_stream_rpc_method_handler(
cast(
Callable[
[TRequest, aio.ServicerContext[TRequest, TResponse]],
AsyncIterator[TResponse],
],
wrap_unary_stream(handler.unary_stream, method),
),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
if handler.unary_stream is None:
raise TypeError("Unary-stream handler is missing")
wrapped: Callable[
[TRequest, aio.ServicerContext[TRequest, TResponse]],
AsyncIterator[TResponse],
] = wrap_unary_stream(handler.unary_stream, method)
request_deserializer = handler.request_deserializer
response_serializer = handler.response_serializer
return _grpc.unary_stream_rpc_method_handler(
wrapped,
request_deserializer=request_deserializer,
response_serializer=response_serializer,
)
def wrap_stream_unary_handler[TRequest, TResponse](
handler: grpc.RpcMethodHandler[TRequest, TResponse],
handler: _TypedRpcMethodHandler[TRequest, TResponse],
method: str,
) -> grpc.RpcMethodHandler[TRequest, TResponse]:
) -> RpcMethodHandler[TRequest, TResponse]:
"""Create wrapped stream-unary handler with logging."""
return grpc.stream_unary_rpc_method_handler(
cast(
Callable[
[AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
Awaitable[TResponse],
],
wrap_stream_unary(handler.stream_unary, method),
),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
if handler.stream_unary is None:
raise TypeError("Stream-unary handler is missing")
wrapped: Callable[
[AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
Awaitable[TResponse],
] = wrap_stream_unary(handler.stream_unary, method)
request_deserializer = handler.request_deserializer
response_serializer = handler.response_serializer
return _grpc.stream_unary_rpc_method_handler(
wrapped,
request_deserializer=request_deserializer,
response_serializer=response_serializer,
)
def wrap_stream_stream_handler[TRequest, TResponse](
handler: grpc.RpcMethodHandler[TRequest, TResponse],
handler: _TypedRpcMethodHandler[TRequest, TResponse],
method: str,
) -> grpc.RpcMethodHandler[TRequest, TResponse]:
) -> RpcMethodHandler[TRequest, TResponse]:
"""Create wrapped stream-stream handler with logging."""
return grpc.stream_stream_rpc_method_handler(
cast(
Callable[
[AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
AsyncIterator[TResponse],
],
wrap_stream_stream(handler.stream_stream, method),
),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer,
if handler.stream_stream is None:
raise TypeError("Stream-stream handler is missing")
wrapped: Callable[
[AsyncIterator[TRequest], aio.ServicerContext[TRequest, TResponse]],
AsyncIterator[TResponse],
] = wrap_stream_stream(handler.stream_stream, method)
request_deserializer = handler.request_deserializer
response_serializer = handler.response_serializer
return _grpc.stream_stream_rpc_method_handler(
wrapped,
request_deserializer=request_deserializer,
response_serializer=response_serializer,
)
def create_logging_handler[TRequest, TResponse](
handler: grpc.RpcMethodHandler[TRequest, TResponse],
handler: RpcMethodHandler[TRequest, TResponse],
method: str,
) -> grpc.RpcMethodHandler[TRequest, TResponse]:
) -> RpcMethodHandler[TRequest, TResponse]:
"""Wrap an RPC handler to add request logging.
Args:
@@ -105,14 +185,15 @@ def create_logging_handler[TRequest, TResponse](
Returns:
Wrapped handler with logging.
"""
# Dispatch based on handler type
if handler.unary_unary is not None:
return wrap_unary_unary_handler(handler, method)
if handler.unary_stream is not None:
return wrap_unary_stream_handler(handler, method)
if handler.stream_unary is not None:
return wrap_stream_unary_handler(handler, method)
if handler.stream_stream is not None:
return wrap_stream_stream_handler(handler, method)
# cast required: grpc.RpcMethodHandler does not expose member signatures
typed_handler = cast(_TypedRpcMethodHandler[TRequest, TResponse], handler)
if typed_handler.unary_unary is not None:
return wrap_unary_unary_handler(typed_handler, method)
if typed_handler.unary_stream is not None:
return wrap_unary_stream_handler(typed_handler, method)
if typed_handler.stream_unary is not None:
return wrap_stream_unary_handler(typed_handler, method)
if typed_handler.stream_stream is not None:
return wrap_stream_stream_handler(typed_handler, method)
# Fallback: return original handler if type unknown
return handler

View File

@@ -14,11 +14,9 @@ from grpc import aio
from ._handler_factory import create_logging_handler
# TypeVars required for ServerInterceptor.intercept_service compatibility
_TRequest = TypeVar("_TRequest")
_TResponse = TypeVar("_TResponse")
class RequestLoggingInterceptor(aio.ServerInterceptor):
"""Interceptor that logs all RPC calls with timing and status.

View File

@@ -15,15 +15,17 @@ if TYPE_CHECKING:
def create_server() -> grpc.aio.Server:
"""Create async gRPC server with interceptors and limits."""
interceptors = [
RequestLoggingInterceptor(),
IdentityInterceptor(),
]
options = [
("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB
("grpc.max_receive_message_length", 100 * 1024 * 1024),
]
return grpc.aio.server(
interceptors=[
RequestLoggingInterceptor(),
IdentityInterceptor(),
],
options=[
("grpc.max_send_message_length", 100 * 1024 * 1024), # 100MB
("grpc.max_receive_message_length", 100 * 1024 * 1024),
],
interceptors=interceptors,
options=options,
)

View File

@@ -36,7 +36,9 @@ from ._mixins import (
ProjectMembershipMixin,
ProjectMixin,
StreamingMixin,
SummarizationMixin,
SummarizationConsentMixin,
SummarizationGenerationMixin,
SummarizationTemplatesMixin,
SyncMixin,
WebhooksMixin,
)
@@ -71,7 +73,9 @@ class NoteFlowServicer(
DiarizationMixin,
DiarizationJobMixin,
MeetingMixin,
SummarizationMixin,
SummarizationGenerationMixin,
SummarizationConsentMixin,
SummarizationTemplatesMixin,
AnnotationMixin,
ExportMixin,
EntitiesMixin,

View File

@@ -13,7 +13,8 @@ import numpy as np
from numpy.typing import NDArray
from noteflow.config.constants import DEFAULT_SAMPLE_RATE
from noteflow.infrastructure.logging import get_logger
from ._constants import SEGMENTER_STATE_TRANSITION_EVENT
from ._types import AudioSegment, SegmenterState
if TYPE_CHECKING:
@@ -180,47 +181,6 @@ class Segmenter:
return segment
def _handle_idle(
self,
audio: NDArray[np.float32],
is_speech: bool,
chunk_start: float,
) -> Iterator[AudioSegment]:
"""Handle audio in IDLE state."""
from noteflow.infrastructure.logging import get_logger
from ._constants import SEGMENTER_STATE_TRANSITION_EVENT
logger = get_logger(__name__)
if is_speech:
# Speech started - transition to SPEECH state
old_state = self._state
self._state = SegmenterState.SPEECH
self._speech_start_time = chunk_start
logger.debug(
SEGMENTER_STATE_TRANSITION_EVENT,
from_state=old_state.name,
to_state=self._state.name,
stream_time=round(self._stream_time, 2),
)
# Capture how much pre-speech audio we are including (O(1) lookup).
self._leading_duration = self._leading_buffer_samples / self.config.sample_rate
# Include leading buffer (pre-speech audio)
self._speech_buffer = list(self._leading_buffer)
self._speech_buffer_samples = self._leading_buffer_samples + len(audio)
self._speech_buffer.append(audio)
self._leading_buffer.clear()
self._leading_buffer_samples = 0
else:
# Still idle - maintain leading buffer
self._update_leading_buffer(audio)
yield from () # No segments emitted in IDLE
def _handle_speech(
self,
audio: NDArray[np.float32],
@@ -267,16 +227,44 @@ class Segmenter:
self._speech_buffer = []
self._speech_buffer_samples = 0
def _handle_idle(
self,
audio: NDArray[np.float32],
is_speech: bool,
chunk_start: float,
) -> Iterator[AudioSegment]:
"""Handle audio in IDLE state."""
logger = get_logger(__name__)
if is_speech:
old_state = self._state
self._state = SegmenterState.SPEECH
self._speech_start_time = chunk_start
logger.debug(
SEGMENTER_STATE_TRANSITION_EVENT,
from_state=old_state.name,
to_state=self._state.name,
stream_time=round(self._stream_time, 2),
)
self._leading_duration = self._leading_buffer_samples / self.config.sample_rate
self._speech_buffer = list(self._leading_buffer)
self._speech_buffer_samples = self._leading_buffer_samples + len(audio)
self._speech_buffer.append(audio)
self._leading_buffer.clear()
self._leading_buffer_samples = 0
else:
self._update_leading_buffer(audio)
yield from () # No segments emitted in IDLE
def _transition_to_trailing(
self,
audio: NDArray[np.float32],
chunk_duration: float,
) -> Iterator[AudioSegment]:
"""Transition from SPEECH to TRAILING state."""
from noteflow.infrastructure.logging import get_logger
from ._constants import SEGMENTER_STATE_TRANSITION_EVENT
logger = get_logger(__name__)
old_state = self._state
@@ -291,7 +279,6 @@ class Segmenter:
stream_time=round(self._stream_time, 2),
)
# Check if already past trailing threshold
if self._trailing_duration < self.config.trailing_silence:
return
@@ -309,10 +296,6 @@ class Segmenter:
def _resume_speech_from_trailing(self, audio: NDArray[np.float32]) -> None:
"""Resume speech by merging trailing buffer back into speech."""
from noteflow.infrastructure.logging import get_logger
from ._constants import SEGMENTER_STATE_TRANSITION_EVENT
logger = get_logger(__name__)
self._speech_buffer.extend(self._trailing_buffer)
@@ -335,10 +318,6 @@ class Segmenter:
chunk_duration: float,
) -> Iterator[AudioSegment]:
"""Accumulate trailing silence, emitting segment if threshold reached."""
from noteflow.infrastructure.logging import get_logger
from ._constants import SEGMENTER_STATE_TRANSITION_EVENT
logger = get_logger(__name__)
self._trailing_buffer.append(audio)
@@ -347,7 +326,6 @@ class Segmenter:
if self._trailing_duration < self.config.trailing_silence:
return
# Enough trailing silence - emit segment
segment = self._emit_segment()
if segment is not None:
yield segment
@@ -365,10 +343,8 @@ class Segmenter:
self._leading_buffer.append(audio)
self._leading_buffer_samples += len(audio)
# Calculate total buffer duration using cached sample count
total_duration = self._leading_buffer_samples / self.config.sample_rate
# Trim to configured leading buffer size (O(1) with deque.popleft)
while total_duration > self.config.leading_buffer and self._leading_buffer:
removed = self._leading_buffer.popleft()
self._leading_buffer_samples -= len(removed)

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass
from typing import Final, cast
import httpx
@@ -30,16 +31,21 @@ GRAPH_API_BASE: Final[str] = "https://graph.microsoft.com/v1.0"
GRAPH_API_MAX_PAGE_SIZE: Final[int] = 100 # Graph API maximum
@dataclass(frozen=True)
class EventFetchContext:
client: httpx.AsyncClient
headers: dict[str, str]
graph_api_base: str
parse_event_func: Callable[[OutlookEvent], CalendarEventInfo]
async def fetch_events(
client: httpx.AsyncClient,
headers: dict[str, str],
ctx: EventFetchContext,
query: OutlookEventQuery,
graph_api_base: str,
parse_event_func: Callable[[OutlookEvent], CalendarEventInfo],
) -> list[CalendarEventInfo]:
"""Fetch events with pagination handling."""
page_size = min(query.limit, GRAPH_API_MAX_PAGE_SIZE)
url: str | None = f"{graph_api_base}/me/calendarView"
url: str | None = f"{ctx.graph_api_base}/me/calendarView"
params: dict[str, str | int] | None = {
"startDateTime": query.start_time,
"endDateTime": query.end_time,
@@ -53,7 +59,7 @@ async def fetch_events(
all_events: list[CalendarEventInfo] = []
while url is not None:
response = await client.get(url, params=params, headers=headers)
response = await ctx.client.get(url, params=params, headers=ctx.headers)
raise_for_status(response)
parsed = parse_events_response(response)
if parsed is None:
@@ -61,7 +67,10 @@ async def fetch_events(
items, next_url = parsed
all_events, reached_limit = accumulate_events(
items, all_events, query.limit, parse_event_func
items,
all_events,
query.limit,
ctx.parse_event_func,
)
if reached_limit:
return all_events

View File

@@ -27,6 +27,47 @@ GRAPH_API_TIMEOUT: Final[float] = 30.0 # seconds
MAX_CONNECTIONS: Final[int] = 10
def _build_profile_request(
access_token: str,
graph_api_base: str,
) -> tuple[str, dict[str, str], dict[str, str]]:
url = f"{graph_api_base}/me"
params = {"$select": "mail,userPrincipalName,displayName"}
headers = {HTTP_AUTHORIZATION: f"{HTTP_BEARER_PREFIX}{access_token}"}
return url, params, headers
def _raise_for_profile_status(response: httpx.Response) -> None:
if response.status_code == HTTP_STATUS_UNAUTHORIZED:
raise OutlookCalendarError(ERR_TOKEN_EXPIRED)
if response.status_code != HTTP_STATUS_OK:
error_body = truncate_error_body(response.text)
logger.error("Microsoft Graph API error: %s", error_body)
raise OutlookCalendarError(f"{ERR_API_PREFIX}{error_body}")
def _parse_profile(response: httpx.Response) -> OutlookProfile:
data_value = response.json()
if not isinstance(data_value, dict):
raise OutlookCalendarError("Invalid user profile response")
return cast(OutlookProfile, data_value)
def _extract_email(profile: OutlookProfile) -> str:
email = profile.get("mail") or profile.get("userPrincipalName")
if not email:
raise OutlookCalendarError("No email in user profile response")
return str(email)
def _format_display_name(profile: OutlookProfile, email: str) -> str:
display_name_raw = profile.get("displayName")
if display_name_raw:
return str(display_name_raw)
return str(email).split("@")[0].replace(".", " ").title()
async def fetch_user_info(
access_token: str,
graph_api_base: str = GRAPH_API_BASE,
@@ -47,9 +88,7 @@ async def fetch_user_info(
Raises:
OutlookCalendarError: If API call fails.
"""
url = f"{graph_api_base}/me"
params = {"$select": "mail,userPrincipalName,displayName"}
headers = {HTTP_AUTHORIZATION: f"{HTTP_BEARER_PREFIX}{access_token}"}
url, params, headers = _build_profile_request(access_token, graph_api_base)
async with httpx.AsyncClient(
timeout=httpx.Timeout(timeout),
@@ -57,29 +96,9 @@ async def fetch_user_info(
) as client:
response = await client.get(url, params=params, headers=headers)
if response.status_code == HTTP_STATUS_UNAUTHORIZED:
raise OutlookCalendarError(ERR_TOKEN_EXPIRED)
_raise_for_profile_status(response)
profile = _parse_profile(response)
email = _extract_email(profile)
display_name = _format_display_name(profile, email)
if response.status_code != HTTP_STATUS_OK:
error_body = truncate_error_body(response.text)
logger.error("Microsoft Graph API error: %s", error_body)
raise OutlookCalendarError(f"{ERR_API_PREFIX}{error_body}")
data_value = response.json()
if not isinstance(data_value, dict):
raise OutlookCalendarError("Invalid user profile response")
data = cast(OutlookProfile, data_value)
email = data.get("mail") or data.get("userPrincipalName")
if not email:
raise OutlookCalendarError("No email in user profile response")
# Get display name, fall back to email prefix
display_name_raw = data.get("displayName")
display_name = (
str(display_name_raw)
if display_name_raw
else str(email).split("@")[0].replace(".", " ").title()
)
return str(email), display_name
return email, display_name

View File

@@ -13,7 +13,7 @@ from noteflow.config.constants.core import HOURS_PER_DAY
from noteflow.domain.ports.calendar import CalendarEventInfo, CalendarPort
from noteflow.infrastructure.logging import get_logger, log_timing
from ._event_fetcher import GRAPH_API_BASE, fetch_events
from ._event_fetcher import GRAPH_API_BASE, EventFetchContext, fetch_events
from ._event_parser import parse_outlook_event
from ._query_builder import build_event_query, build_headers
from ._user_info import fetch_user_info
@@ -68,9 +68,13 @@ class OutlookCalendarAdapter(CalendarPort):
timeout=httpx.Timeout(GRAPH_API_TIMEOUT),
limits=httpx.Limits(max_connections=MAX_CONNECTIONS),
) as client:
all_events = await fetch_events(
client, headers, query, self.GRAPH_API_BASE, parse_outlook_event
ctx = EventFetchContext(
client=client,
headers=headers,
graph_api_base=self.GRAPH_API_BASE,
parse_event_func=parse_outlook_event,
)
all_events = await fetch_events(ctx, query)
logger.info(
"outlook_calendar_events_fetched",

View File

@@ -50,8 +50,10 @@ def build_usage_event(
Returns:
Constructed UsageEvent.
"""
event_label = event_type
attributes_copy = dict(attributes)
return UsageEvent(
event_type=event_type,
event_type=event_label,
meeting_id=context.meeting_id,
provider_name=metrics.provider_name,
model_name=metrics.model_name,
@@ -60,7 +62,7 @@ def build_usage_event(
latency_ms=metrics.latency_ms,
success=context.success,
error_code=context.error_code,
attributes=attributes,
attributes=attributes_copy,
)

View File

@@ -108,7 +108,8 @@ class SqlAlchemyDiarizationJobRepository(BaseRepository):
Returns:
List of jobs ordered by creation time (newest first).
"""
return list(await list_for_meeting(self._session, self._execute_scalars, meeting_id))
jobs = await list_for_meeting(self._session, self._execute_scalars, meeting_id)
return list(jobs)
async def get_active_for_meeting(self, meeting_id: str) -> DiarizationJob | None:
"""Get active (QUEUED or RUNNING) job for a meeting.

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass
from uuid import UUID
from sqlalchemy import func, select
@@ -17,6 +18,15 @@ from noteflow.infrastructure.persistence.models.integrations import IntegrationS
logger = get_logger(__name__)
@dataclass(frozen=True)
class SyncRunQueryContext:
session: AsyncSession
execute_scalars_func: Callable[
[Select[tuple[IntegrationSyncRunModel]]],
Awaitable[list[IntegrationSyncRunModel]],
]
async def get_sync_run(
session: AsyncSession,
execute_scalar_func: Callable[[Select[tuple[IntegrationSyncRunModel]]], Awaitable[IntegrationSyncRunModel | None]],
@@ -83,8 +93,7 @@ async def update_sync_run(
async def list_sync_runs(
session: AsyncSession,
execute_scalars_func: Callable[[Select[tuple[IntegrationSyncRunModel]]], Awaitable[list[IntegrationSyncRunModel]]],
ctx: SyncRunQueryContext,
integration_id: UUID,
limit: int = 20,
offset: int = 0,
@@ -110,7 +119,7 @@ async def list_sync_runs(
.select_from(IntegrationSyncRunModel)
.where(IntegrationSyncRunModel.integration_id == integration_id)
)
total_value = await session.scalar(count_stmt)
total_value = await ctx.session.scalar(count_stmt)
total = total_value if total_value is not None else 0
# Fetch query
@@ -121,7 +130,7 @@ async def list_sync_runs(
.offset(offset)
.limit(limit)
)
models = await execute_scalars_func(stmt)
models = await ctx.execute_scalars_func(stmt)
runs = [SyncRunConverter.orm_to_domain(m) for m in models]
return runs, total

View File

@@ -25,7 +25,7 @@ from noteflow.infrastructure.persistence.repositories._base import (
from ._queries import get_by_provider, list_all, list_by_type
from ._secrets import get_secrets, set_secrets
from ._sync_runs import get_sync_run, list_sync_runs, update_sync_run
from ._sync_runs import SyncRunQueryContext, get_sync_run, list_sync_runs, update_sync_run
logger = get_logger(__name__)
@@ -253,6 +253,8 @@ class SqlAlchemyIntegrationRepository(
Returns:
Tuple of (sync runs newest first, total count).
"""
return await list_sync_runs(
self._session, self._execute_scalars, integration_id, limit, offset
ctx = SyncRunQueryContext(
session=self._session,
execute_scalars_func=self._execute_scalars,
)
return await list_sync_runs(ctx, integration_id, limit, offset)

View File

@@ -3,13 +3,13 @@
from __future__ import annotations
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime
from typing import Unpack
from typing import Protocol, Unpack
from uuid import UUID
from sqlalchemy import func, select
from sqlalchemy.engine import RowMapping
from typing import Protocol
from sqlalchemy.ext.asyncio import AsyncSession
from noteflow.domain.constants.fields import MODEL_NAME
@@ -39,6 +39,14 @@ class _EventTypeAggregateRow(Protocol):
error_count: int
@dataclass(frozen=True)
class ProviderAggregateQuery:
start_time: datetime
end_time: datetime
event_type: str | None = None
workspace_id: UUID | None = None
async def aggregate(
session: AsyncSession,
start_time: datetime,
@@ -82,19 +90,13 @@ def _row_to_usage_aggregate_from_event(row: _EventTypeAggregateRow) -> UsageAggr
async def aggregate_by_provider(
session: AsyncSession,
start_time: datetime,
end_time: datetime,
event_type: str | None = None,
workspace_id: UUID | None = None,
query: ProviderAggregateQuery,
) -> Sequence[ProviderUsageAggregate]:
"""Calculate aggregate statistics grouped by provider.
Args:
session: Database session.
start_time: Start of time range (inclusive).
end_time: End of time range (exclusive).
event_type: Optional filter by event type.
workspace_id: Optional filter by workspace.
query: Provider aggregation query options.
Returns:
List of per-provider aggregated statistics.
@@ -108,18 +110,18 @@ async def aggregate_by_provider(
*build_token_aggregate_columns(),
)
.where(
UsageEventModel.timestamp >= start_time,
UsageEventModel.timestamp < end_time,
UsageEventModel.timestamp >= query.start_time,
UsageEventModel.timestamp < query.end_time,
UsageEventModel.provider_name.isnot(None),
)
.group_by(UsageEventModel.provider_name)
.order_by(func.count(UsageEventModel.id).desc())
)
if event_type:
stmt = stmt.where(UsageEventModel.event_type == event_type)
if workspace_id:
stmt = stmt.where(UsageEventModel.workspace_id == workspace_id)
if query.event_type:
stmt = stmt.where(UsageEventModel.event_type == query.event_type)
if query.workspace_id:
stmt = stmt.where(UsageEventModel.workspace_id == query.workspace_id)
result = await session.execute(stmt)
rows = result.all()

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass
from datetime import datetime
from typing import Unpack
from uuid import UUID
@@ -23,9 +24,17 @@ from noteflow.infrastructure.persistence.repositories._usage_aggregates import (
from ._converters import to_domain
@dataclass(frozen=True)
class UsageEventQueryContext:
session: AsyncSession
execute_scalars_func: Callable[
[Select[tuple[UsageEventModel]]],
Awaitable[list[UsageEventModel]],
]
async def get_by_meeting(
session: AsyncSession,
execute_scalars_func: Callable[[Select[tuple[UsageEventModel]]], Awaitable[list[UsageEventModel]]],
ctx: UsageEventQueryContext,
meeting_id: UUID,
event_type: str | None = None,
limit: int = 100,
@@ -51,7 +60,7 @@ async def get_by_meeting(
if event_type:
stmt = stmt.where(UsageEventModel.event_type == event_type)
models = await execute_scalars_func(stmt)
models = await ctx.execute_scalars_func(stmt)
return [to_domain(m) for m in models]

View File

@@ -21,13 +21,14 @@ from noteflow.infrastructure.persistence.repositories._usage_aggregates import (
)
from ._aggregations import (
ProviderAggregateQuery,
aggregate,
aggregate_by_event_type,
aggregate_by_provider,
count_by_event_type,
)
from ._converters import event_to_model
from ._queries import get_by_meeting, get_in_time_range
from ._queries import UsageEventQueryContext, get_by_meeting, get_in_time_range
# Re-export for backward compatibility
__all__ = [
@@ -88,9 +89,11 @@ class SqlAlchemyUsageEventRepository(BaseRepository):
Returns:
Sequence of usage events, newest first.
"""
return await get_by_meeting(
self._session, self._execute_scalars, meeting_id, event_type, limit
ctx = UsageEventQueryContext(
session=self._session,
execute_scalars_func=self._execute_scalars,
)
return await get_by_meeting(ctx, meeting_id, event_type, limit)
async def get_in_time_range(
self,
@@ -148,9 +151,13 @@ class SqlAlchemyUsageEventRepository(BaseRepository):
Returns:
List of per-provider aggregated statistics.
"""
return await aggregate_by_provider(
self._session, start_time, end_time, event_type, workspace_id
query = ProviderAggregateQuery(
start_time=start_time,
end_time=end_time,
event_type=event_type,
workspace_id=workspace_id,
)
return await aggregate_by_provider(self._session, query)
async def aggregate_by_event_type(
self,

View File

@@ -6,6 +6,7 @@ import re
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import datetime
from typing import TypeVar
@dataclass(frozen=True)
@@ -115,11 +116,35 @@ SUMMARY_GETTERS: dict[str, Callable[[SummaryTemplateContext], str | None]] = {
}
def _resolve_metadata(path: str, metadata: dict[str, str]) -> str | None:
key = path.split(".", maxsplit=1)[1] if "." in path else ""
def _resolve_metadata_placeholder(
path: str,
prefix: str,
metadata: dict[str, str],
) -> tuple[bool, str | None]:
if not path.startswith(prefix):
return False, None
key = path.removeprefix(prefix)
if not key:
return None
return metadata.get(key)
return True, None
return True, metadata.get(key)
_TContext = TypeVar("_TContext")
def _resolve_namespace(
path: str,
prefix: str,
ctx: _TContext | None,
getters: dict[str, Callable[[_TContext], str | None]],
) -> tuple[bool, str | None]:
if not path.startswith(prefix):
return False, None
if ctx is None:
return True, None
key = path.removeprefix(prefix)
getter = getters.get(key)
return True, getter(ctx) if getter else None
def resolve_placeholder(path: str, context: TemplateContext) -> str | None:
@@ -127,43 +152,29 @@ def resolve_placeholder(path: str, context: TemplateContext) -> str | None:
if path == "style_instructions":
return context.style_instructions or ""
if path.startswith("metadata."):
return _resolve_metadata(path, context.metadata)
handled, value = _resolve_metadata_placeholder(path, "metadata.", context.metadata)
if handled:
return value
if path.startswith("meeting.metadata."):
return _resolve_metadata(path.replace("meeting.", "", 1), context.meeting.metadata)
if path.startswith("meeting."):
key = path.removeprefix("meeting.")
getter = MEETING_GETTERS.get(key)
return getter(context.meeting) if getter else None
if path.startswith("project."):
if context.project is None:
return None
key = path.removeprefix("project.")
getter = PROJECT_GETTERS.get(key)
return getter(context.project) if getter else None
if path.startswith("workspace."):
if context.workspace is None:
return None
key = path.removeprefix("workspace.")
getter = WORKSPACE_GETTERS.get(key)
return getter(context.workspace) if getter else None
if path.startswith("user."):
if context.user is None:
return None
key = path.removeprefix("user.")
getter = USER_GETTERS.get(key)
return getter(context.user) if getter else None
if path.startswith("summary."):
key = path.removeprefix("summary.")
getter = SUMMARY_GETTERS.get(key)
return getter(context.summary) if getter else None
handled, value = _resolve_metadata_placeholder(
path,
"meeting.metadata.",
context.meeting.metadata,
)
if handled:
return value
namespaces = (
("meeting.", context.meeting, MEETING_GETTERS),
("project.", context.project, PROJECT_GETTERS),
("workspace.", context.workspace, WORKSPACE_GETTERS),
("user.", context.user, USER_GETTERS),
("summary.", context.summary, SUMMARY_GETTERS),
)
for prefix, ctx, getters in namespaces:
handled, value = _resolve_namespace(path, prefix, ctx, getters)
if handled:
return value
return None

View File

@@ -69,13 +69,15 @@ class AppAudioSettings:
def to_audio_activity_settings(settings: AppAudioSettings) -> AudioActivitySettings:
"""Convert app audio settings to audio activity settings."""
max_history = settings.max_history
min_samples = settings.min_samples
return AudioActivitySettings(
enabled=settings.enabled,
threshold_db=settings.threshold_db,
window_seconds=settings.window_seconds,
min_active_ratio=settings.min_active_ratio,
min_samples=settings.min_samples,
max_history=settings.max_history,
min_samples=min_samples,
max_history=max_history,
weight=settings.weight,
)

View File

@@ -1,63 +1,5 @@
{
"generated_at": "2026-01-06T10:09:03.265932+00:00",
"rules": {
"deep_nesting": [
"deep_nesting|src/noteflow/grpc/_mixins/summarization/_template_resolution.py|resolve_template_prompt|depth=4"
],
"feature_envy": [
"feature_envy|src/noteflow/application/services/summarization/template_service.py|SummarizationTemplateService.restore_version|template=11_vs_self=2",
"feature_envy|src/noteflow/application/services/summarization/template_service.py|SummarizationTemplateService.update_template|template=12_vs_self=2",
"feature_envy|src/noteflow/grpc/_mixins/identity.py|IdentityMixin.UpdateWorkspaceSettings|updates=8_vs_self=3"
],
"god_class": [
"god_class|src/noteflow/grpc/_mixins/summarization.py|SummarizationMixin|methods=16",
"god_class|src/noteflow/grpc/_mixins/summarization/__init__.py|SummarizationMixin|methods=16"
],
"high_complexity": [
"high_complexity|src/noteflow/grpc/_mixins/summarization/_template_resolution.py|resolve_template_prompt|complexity=27",
"high_complexity|src/noteflow/infrastructure/summarization/template_renderer.py|resolve_placeholder|complexity=13"
],
"long_method": [
"long_method|src/noteflow/application/services/summarization/template_service.py|create_template|lines=52",
"long_method|src/noteflow/application/services/summarization/template_service.py|update_template|lines=65",
"long_method|src/noteflow/grpc/_mixins/identity.py|UpdateWorkspaceSettings|lines=55",
"long_method|src/noteflow/grpc/_mixins/streaming/_processing/_vad_processing.py|process_audio_with_vad|lines=52",
"long_method|src/noteflow/grpc/_mixins/summarization/_template_resolution.py|resolve_template_prompt|lines=127",
"long_method|src/noteflow/infrastructure/calendar/outlook/_user_info.py|fetch_user_info|lines=56"
],
"long_parameter_list": [
"long_parameter_list|src/noteflow/application/services/summarization/template_service.py|create_template|params=7",
"long_parameter_list|src/noteflow/application/services/summarization/template_service.py|update_template|params=7",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|archive_summarization_template|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|create_summarization_template|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|get_summarization_template|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|list_summarization_template_versions|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|list_summarization_templates|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|restore_summarization_template_version|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_crud.py|update_summarization_template|params=5",
"long_parameter_list|src/noteflow/grpc/_mixins/summarization/_template_resolution.py|resolve_template_prompt|params=8",
"long_parameter_list|src/noteflow/infrastructure/calendar/outlook/_event_fetcher.py|fetch_events|params=5",
"long_parameter_list|src/noteflow/infrastructure/persistence/repositories/integration/_sync_runs.py|list_sync_runs|params=5",
"long_parameter_list|src/noteflow/infrastructure/persistence/repositories/usage_event/_aggregations.py|aggregate_by_provider|params=5",
"long_parameter_list|src/noteflow/infrastructure/persistence/repositories/usage_event/_queries.py|get_by_meeting|params=5"
],
"module_size_soft": [
"module_size_soft|src/noteflow/infrastructure/asr/segmenter/segmenter.py|module|lines=375"
],
"thin_wrapper": [
"thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|parse_preset|OidcProviderPreset",
"thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|parse_provider_id|UUID",
"thin_wrapper|src/noteflow/grpc/_mixins/oidc/_helpers.py|preset_config_to_proto|OidcPresetProto",
"thin_wrapper|src/noteflow/grpc/_mixins/summarization/_context_builders.py|build_summary_context|SummaryTemplateContext",
"thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_stream_stream_handler|stream_stream_rpc_method_handler",
"thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_stream_unary_handler|stream_unary_rpc_method_handler",
"thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_unary_stream_handler|unary_stream_rpc_method_handler",
"thin_wrapper|src/noteflow/grpc/interceptors/logging/_handler_factory.py|wrap_unary_unary_handler|unary_unary_rpc_method_handler",
"thin_wrapper|src/noteflow/grpc/server/_setup.py|create_server|server",
"thin_wrapper|src/noteflow/infrastructure/observability/usage/_helpers.py|build_usage_event|UsageEvent",
"thin_wrapper|src/noteflow/infrastructure/persistence/repositories/diarization_job/diarization_job_repo.py|list_for_meeting|list",
"thin_wrapper|src/noteflow/infrastructure/triggers/app_audio/app_audio.py|to_audio_activity_settings|AudioActivitySettings"
]
},
"generated_at": "2026-01-06T10:48:45.842264+00:00",
"rules": {},
"schema_version": 1
}