refactor: enhance NoteFlowServicer shutdown process and update summarization mixin
- Added logic to mark in-memory diarization jobs as failed when tasks are cancelled during shutdown. - Updated the summarization mixin to handle both SummarizationServiceResult and SummarizationResult, improving flexibility in result handling. - Adjusted baselines for code quality checks to reflect changes in the NoteFlowServicer class.
This commit is contained in:
2
client
2
client
Submodule client updated: 67a5f7d5f8...6d609eb77a
@@ -5,7 +5,8 @@ from __future__ import annotations
|
||||
from typing import TYPE_CHECKING, Protocol, cast
|
||||
|
||||
from noteflow.domain.entities import Segment, Summary
|
||||
from noteflow.domain.summarization import ProviderUnavailableError
|
||||
from noteflow.application.services.summarization_service import SummarizationServiceResult
|
||||
from noteflow.domain.summarization import ProviderUnavailableError, SummarizationResult
|
||||
from noteflow.domain.value_objects import MeetingId
|
||||
from noteflow.infrastructure.logging import get_logger
|
||||
from noteflow.infrastructure.summarization._parsing import build_style_prompt
|
||||
@@ -16,10 +17,7 @@ from .converters import parse_meeting_id_or_abort, summary_to_proto
|
||||
from .errors import ENTITY_MEETING, abort_failed_precondition, abort_not_found
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from noteflow.application.services.summarization_service import (
|
||||
SummarizationService,
|
||||
SummarizationServiceResult,
|
||||
)
|
||||
from noteflow.application.services.summarization_service import SummarizationService
|
||||
from noteflow.application.services.webhook_service import WebhookService
|
||||
from noteflow.domain.ports.unit_of_work import UnitOfWork
|
||||
|
||||
@@ -126,16 +124,27 @@ class SummarizationMixin:
|
||||
return self.generate_placeholder_summary(meeting_id, segments)
|
||||
|
||||
try:
|
||||
result: SummarizationServiceResult = await self.summarization_service.summarize(
|
||||
result: SummarizationServiceResult | SummarizationResult = (
|
||||
await self.summarization_service.summarize(
|
||||
meeting_id=meeting_id,
|
||||
segments=segments,
|
||||
style_prompt=style_prompt,
|
||||
)
|
||||
)
|
||||
summary = result.summary
|
||||
provider_name = "unknown"
|
||||
model_name = "unknown"
|
||||
# Support mocked services returning SummarizationResult directly.
|
||||
if isinstance(result, SummarizationServiceResult):
|
||||
provider_name = result.result.provider_name
|
||||
model_name = result.result.model_name
|
||||
elif isinstance(result, SummarizationResult):
|
||||
provider_name = result.provider_name
|
||||
model_name = result.model_name
|
||||
logger.info(
|
||||
"Generated summary using %s/%s",
|
||||
result.result.provider_name,
|
||||
result.result.model_name,
|
||||
provider_name,
|
||||
model_name,
|
||||
)
|
||||
return summary
|
||||
except ProviderUnavailableError as exc:
|
||||
|
||||
@@ -573,6 +573,7 @@ class NoteFlowServicer(
|
||||
logger.info("Shutting down servicer...")
|
||||
|
||||
# Cancel in-flight diarization tasks
|
||||
cancelled_job_ids = list(self.diarization_tasks.keys())
|
||||
for job_id, task in list(self.diarization_tasks.items()):
|
||||
if not task.done():
|
||||
logger.debug("Cancelling diarization task %s", job_id)
|
||||
@@ -582,6 +583,26 @@ class NoteFlowServicer(
|
||||
|
||||
self.diarization_tasks.clear()
|
||||
|
||||
# Mark in-memory diarization jobs as failed when tasks are cancelled
|
||||
if self.session_factory is None and cancelled_job_ids:
|
||||
failed_count = 0
|
||||
for job_id in cancelled_job_ids:
|
||||
job = self.diarization_jobs.get(job_id)
|
||||
if job is None:
|
||||
continue
|
||||
if job.status in (
|
||||
noteflow_pb2.JOB_STATUS_QUEUED,
|
||||
noteflow_pb2.JOB_STATUS_RUNNING,
|
||||
):
|
||||
job.status = noteflow_pb2.JOB_STATUS_FAILED
|
||||
job.error_message = "ERR_TASK_CANCELLED"
|
||||
failed_count += 1
|
||||
if failed_count > 0:
|
||||
logger.warning(
|
||||
"Marked %d in-memory diarization jobs as failed on shutdown",
|
||||
failed_count,
|
||||
)
|
||||
|
||||
# Close all diarization sessions
|
||||
for meeting_id, session in list(self.diarization_sessions.items()):
|
||||
logger.debug("Closing diarization session for meeting %s", meeting_id)
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
],
|
||||
"god_class": [
|
||||
"god_class|src/noteflow/grpc/_mixins/protocols.py|ServicerHost|methods=43",
|
||||
"god_class|src/noteflow/grpc/service.py|NoteFlowServicer|lines=543"
|
||||
"god_class|src/noteflow/grpc/service.py|NoteFlowServicer|lines=564"
|
||||
],
|
||||
"long_method": [
|
||||
"long_method|src/noteflow/grpc/server.py|run_server_with_config|lines=78",
|
||||
@@ -42,7 +42,7 @@
|
||||
"module_size_soft": [
|
||||
"module_size_soft|src/noteflow/application/services/meeting_service.py|module|lines=522",
|
||||
"module_size_soft|src/noteflow/grpc/server.py|module|lines=554",
|
||||
"module_size_soft|src/noteflow/grpc/service.py|module|lines=612"
|
||||
"module_size_soft|src/noteflow/grpc/service.py|module|lines=633"
|
||||
],
|
||||
"raises_without_match": [
|
||||
"raises_without_match|tests/infrastructure/observability/test_logging_config.py|test_config_is_frozen|line=64",
|
||||
|
||||
Reference in New Issue
Block a user