# File: noteflow/src/noteflow/grpc/_mixins/summarization.py
# Snapshot: 2026-01-06 08:03:04 +00:00
#
# Size: 209 lines, 8.3 KiB
# Language: Python

"""Summarization mixin for gRPC service."""
from __future__ import annotations
from typing import TYPE_CHECKING, cast
from noteflow.domain.entities import Segment, Summary
from noteflow.domain.ports.unit_of_work import UnitOfWork
from noteflow.domain.summarization import ProviderUnavailableError
from noteflow.domain.value_objects import MeetingId
from noteflow.infrastructure.logging import get_logger
from noteflow.infrastructure.summarization._parsing import build_style_prompt
from ..proto import noteflow_pb2
from ._types import GrpcContext
from .converters import parse_meeting_id_or_abort, summary_to_proto
from .errors import ENTITY_MEETING, abort_failed_precondition, abort_not_found
if TYPE_CHECKING:
    # These names are referenced only inside annotations. The module enables
    # postponed annotation evaluation (`from __future__ import annotations`),
    # so they are needed by the type checker but never at runtime.
    from collections.abc import Callable

    from noteflow.application.services.summarization import SummarizationService
    from noteflow.application.services.webhook_service import WebhookService
    from noteflow.domain.entities import Meeting

# Module-level logger shared by every method of the mixin below.
logger = get_logger(__name__)
class SummarizationMixin:
    """Mixin providing summarization functionality.

    Requires host to implement SummarizationServicer protocol.
    Works with both database and memory backends via RepositoryProvider.
    """

    # Collaborators supplied by the host servicer. Both services are optional:
    # every method below checks for ``None`` before using them.
    summarization_service: SummarizationService | None
    webhook_service: WebhookService | None
    # Factory whose result is used as an async-context-manager UnitOfWork.
    create_repository_provider: Callable[..., object]

    async def GenerateSummary(
        self,
        request: noteflow_pb2.GenerateSummaryRequest,
        context: GrpcContext,
    ) -> noteflow_pb2.Summary:
        """Generate meeting summary using SummarizationService with fallback.

        The potentially slow summarization step is executed outside the
        repository context to avoid holding connections while waiting on LLMs.

        Args:
            request: Carries ``meeting_id``, ``force_regenerate`` and the
                optional style ``options``.
            context: gRPC context handed to the abort helpers.

        Returns:
            The stored summary converted to its proto form. An already
            existing summary is returned untouched unless
            ``request.force_regenerate`` is set.
        """
        meeting_id = await parse_meeting_id_or_abort(request.meeting_id, context)
        style_prompt = self._build_style_prompt_from_request(request)

        # 1) Load meeting, existing summary, and segments in a short transaction
        meeting, existing, segments = await self._load_meeting_context(
            meeting_id, request, context
        )
        if existing and not request.force_regenerate:
            return summary_to_proto(existing)

        # 2) Run summarization outside repository context (slow LLM call)
        summary = await self.summarize_or_placeholder(meeting_id, segments, style_prompt)

        # 3) Persist in a fresh transaction
        async with cast(UnitOfWork, self.create_repository_provider()) as repo:
            saved = await repo.summaries.save(summary)
            await repo.commit()

        # Webhook delivery is best-effort and never fails the RPC.
        await self._trigger_summary_webhook(meeting, saved)
        return summary_to_proto(saved)

    def _build_style_prompt_from_request(
        self,
        request: noteflow_pb2.GenerateSummaryRequest,
    ) -> str | None:
        """Build style prompt from proto options if provided.

        Returns:
            ``None`` when the request has no ``options`` field or when the
            built prompt is empty; otherwise the prompt string.
        """
        if not request.HasField("options"):
            return None
        options = request.options
        # Falsy option values are passed on as None.
        prompt = build_style_prompt(
            tone=options.tone or None,
            format_style=options.format or None,
            verbosity=options.verbosity or None,
        )
        return prompt or None  # Convert empty string to None

    async def _load_meeting_context(
        self,
        meeting_id: MeetingId,
        request: noteflow_pb2.GenerateSummaryRequest,
        context: GrpcContext,
    ) -> tuple[Meeting, Summary | None, list[Segment]]:
        """Load meeting, existing summary, and segments.

        All three reads share one unit of work. If the meeting does not exist
        the RPC is aborted through ``abort_not_found``.

        Returns:
            ``(meeting, existing_summary_or_None, segments)``.

        Raises:
            RuntimeError: Only if ``abort_not_found`` returns instead of
                raising, which is not expected to happen.
        """
        async with cast(UnitOfWork, self.create_repository_provider()) as repo:
            meeting = await repo.meetings.get(meeting_id)
            if meeting is None:
                await abort_not_found(context, ENTITY_MEETING, request.meeting_id)
                # Unreachable but helps type checker. An explicit exception is
                # used instead of a bare `raise`, which outside an `except`
                # block fails with an opaque "No active exception" RuntimeError
                # or re-raises whatever unrelated exception is being handled.
                raise RuntimeError("unreachable: abort_not_found did not raise")
            existing = await repo.summaries.get_by_meeting(meeting.id)
            segments = list(await repo.segments.get_by_meeting(meeting.id))
        return meeting, existing, segments

    async def _trigger_summary_webhook(self, meeting: Meeting, summary: Summary) -> None:
        """Trigger summary.generated webhook (fire-and-forget).

        Does nothing when no webhook service is configured. Otherwise the
        summary is attached to ``meeting`` and the service is notified; any
        exception is logged and swallowed.
        """
        if self.webhook_service is None:
            return
        try:
            meeting.summary = summary
            await self.webhook_service.trigger_summary_generated(meeting)
        # INTENTIONAL BROAD HANDLER: Fire-and-forget webhook
        # - Webhook failures must never block summarization RPC
        except Exception:
            logger.exception("Failed to trigger summary.generated webhooks")

    async def summarize_or_placeholder(
        self,
        meeting_id: MeetingId,
        segments: list[Segment],
        style_prompt: str | None = None,
    ) -> Summary:
        """Try to summarize via service, fallback to placeholder on failure.

        A placeholder summary is returned when no summarization service is
        configured, when the provider is unavailable, or when the service
        raises ``TimeoutError``, ``RuntimeError`` or ``ValueError``. Other
        exceptions propagate to the caller.
        """
        if self.summarization_service is None:
            logger.warning("SummarizationService not configured; using placeholder summary")
            return self.generate_placeholder_summary(meeting_id, segments)

        try:
            result = await self.summarization_service.summarize(
                meeting_id=meeting_id,
                segments=segments,
                style_prompt=style_prompt,
            )
            summary = result.summary
            logger.info(
                "Generated summary using %s/%s",
                summary.provider_name,
                summary.model_name,
            )
            return summary
        except ProviderUnavailableError as exc:
            # Logged without a traceback, at warning level.
            logger.warning("Summarization provider unavailable; using placeholder: %s", exc)
        except (TimeoutError, RuntimeError, ValueError) as exc:
            logger.exception(
                "Summarization failed (%s); using placeholder summary", type(exc).__name__
            )

        # Both handlers above fall through to the placeholder.
        return self.generate_placeholder_summary(meeting_id, segments)

    def generate_placeholder_summary(
        self,
        meeting_id: MeetingId,
        segments: list[Segment],
    ) -> Summary:
        """Generate a lightweight placeholder summary when summarization fails.

        The executive summary is the space-joined segment text, cut to the
        first 200 characters followed by ``...`` when longer, or
        ``"No transcript available."`` when there is no text. Key points and
        action items are empty; provider/model are ``placeholder``/``v0``.
        """
        full_text = " ".join(s.text for s in segments)
        executive = f"{full_text[:200]}..." if len(full_text) > 200 else full_text
        executive = executive or "No transcript available."
        return Summary(
            meeting_id=meeting_id,
            executive_summary=executive,
            key_points=[],
            action_items=[],
            provider_name="placeholder",
            model_name="v0",
        )

    async def GrantCloudConsent(
        self,
        request: noteflow_pb2.GrantCloudConsentRequest,
        context: GrpcContext,
    ) -> noteflow_pb2.GrantCloudConsentResponse:
        """Grant consent for cloud-based summarization.

        Aborts the RPC through ``abort_failed_precondition`` when no
        summarization service is configured.

        Raises:
            RuntimeError: Only if the abort helper returns instead of raising.
        """
        if self.summarization_service is None:
            await abort_failed_precondition(
                context,
                "Summarization service not available",
            )
            # Unreachable but helps type checker; explicit instead of a bare
            # `raise`, which is not valid outside an `except` block.
            raise RuntimeError("unreachable: abort_failed_precondition did not raise")
        await self.summarization_service.grant_cloud_consent()
        logger.info("Cloud consent granted")
        return noteflow_pb2.GrantCloudConsentResponse()

    async def RevokeCloudConsent(
        self,
        request: noteflow_pb2.RevokeCloudConsentRequest,
        context: GrpcContext,
    ) -> noteflow_pb2.RevokeCloudConsentResponse:
        """Revoke consent for cloud-based summarization.

        Aborts the RPC through ``abort_failed_precondition`` when no
        summarization service is configured.

        Raises:
            RuntimeError: Only if the abort helper returns instead of raising.
        """
        if self.summarization_service is None:
            await abort_failed_precondition(
                context,
                "Summarization service not available",
            )
            # Unreachable but helps type checker; explicit instead of a bare
            # `raise`, which is not valid outside an `except` block.
            raise RuntimeError("unreachable: abort_failed_precondition did not raise")
        await self.summarization_service.revoke_cloud_consent()
        logger.info("Cloud consent revoked")
        return noteflow_pb2.RevokeCloudConsentResponse()

    async def GetCloudConsentStatus(
        self,
        request: noteflow_pb2.GetCloudConsentStatusRequest,
        context: GrpcContext,
    ) -> noteflow_pb2.GetCloudConsentStatusResponse:
        """Return current cloud consent status.

        Unlike the grant/revoke RPCs this one does not abort when the
        summarization service is missing; it reports consent as not granted.
        """
        if self.summarization_service is None:
            # Default to not granted if service unavailable
            return noteflow_pb2.GetCloudConsentStatusResponse(consent_granted=False)
        return noteflow_pb2.GetCloudConsentStatusResponse(
            consent_granted=self.summarization_service.settings.cloud_consent_granted,
        )