feat(grpc): add DeleteMeetings bulk delete endpoint

- Add DeleteMeetings RPC to proto schema with request/response messages
- Implement Python backend handler in MeetingMixin
- Extract bulk delete logic to _bulk_delete_ops.py module
- Skip meetings in RECORDING or STOPPING state
- Return aggregated results with succeeded/failed/skipped IDs
- Add comprehensive logging for bulk operations

Refs: mass-delete-meetings plan tasks 1-2
This commit is contained in:
2026-01-26 09:50:00 +00:00
parent bd48505249
commit 6d4725db1d
6 changed files with 669 additions and 442 deletions

View File

@@ -0,0 +1,98 @@
"""Bulk meeting deletion operations for gRPC service."""
from __future__ import annotations
from typing import TYPE_CHECKING
from noteflow.domain.errors import DomainError
from noteflow.domain.value_objects import MeetingState
from noteflow.infrastructure.logging import get_logger
from ..converters import parse_meeting_id_or_abort
from .._repository_protocols import MeetingRepositoryProvider
if TYPE_CHECKING:
from .._types import GrpcContext
logger = get_logger(__name__)
def _is_active_meeting(state: MeetingState) -> bool:
"""Check if meeting is actively recording or stopping."""
return state in (MeetingState.RECORDING, MeetingState.STOPPING)
async def aggregate_bulk_delete_results(
repo: MeetingRepositoryProvider,
meeting_ids: list[str],
context: GrpcContext,
) -> tuple[list[str], list[str], list[str]]:
"""Aggregate deletion results for multiple meetings.
Returns (succeeded_ids, failed_ids, skipped_ids).
"""
succeeded_ids: list[str] = []
failed_ids: list[str] = []
skipped_ids: list[str] = []
for meeting_id_str in meeting_ids:
succeeded, failed, skipped = await process_bulk_delete(repo, meeting_id_str, context)
if succeeded:
succeeded_ids.append(succeeded)
if failed:
failed_ids.append(failed)
if skipped:
skipped_ids.append(skipped)
return succeeded_ids, failed_ids, skipped_ids
async def process_bulk_delete(
repo: MeetingRepositoryProvider,
meeting_id_str: str,
context: GrpcContext,
) -> tuple[str | None, str | None, str | None]:
"""Process deletion of a single meeting.
Returns (succeeded_id, failed_id, skipped_id) - exactly one will be non-None.
"""
try:
meeting_id = await parse_meeting_id_or_abort(meeting_id_str, context)
meeting = await repo.meetings.get(meeting_id)
if meeting is None:
logger.warning(
"DeleteMeetings: meeting not found",
meeting_id=meeting_id_str,
)
return None, meeting_id_str, None
if _is_active_meeting(meeting.state):
logger.debug(
"DeleteMeetings: skipping active meeting",
meeting_id=meeting_id_str,
state=meeting.state.value,
)
return None, None, meeting_id_str
success = await repo.meetings.delete(meeting_id)
if success:
logger.debug(
"DeleteMeetings: meeting deleted",
meeting_id=meeting_id_str,
)
return meeting_id_str, None, None
logger.warning(
"DeleteMeetings: delete failed",
meeting_id=meeting_id_str,
)
return None, meeting_id_str, None
except DomainError as e:
logger.exception(
"DeleteMeetings: domain error",
meeting_id=meeting_id_str,
error=str(e),
)
return None, meeting_id_str, None

View File

@@ -31,6 +31,7 @@ from ._stop_ops import (
transition_to_stopped,
wait_for_stream_exit,
)
from ._bulk_delete_ops import aggregate_bulk_delete_results
if TYPE_CHECKING:
from collections.abc import Callable
@@ -255,3 +256,41 @@ class MeetingMixin:
await repo.commit()
logger.info("Meeting deleted", meeting_id=request.meeting_id)
return noteflow_pb2.DeleteMeetingResponse(success=True)
async def DeleteMeetings(
self,
request: noteflow_pb2.DeleteMeetingsRequest,
context: GrpcContext,
) -> noteflow_pb2.DeleteMeetingsResponse:
"""Delete multiple meetings in bulk.
Skips meetings that are actively recording or stopping.
Returns aggregated results with succeeded, failed, and skipped IDs.
"""
logger.info(
"DeleteMeetings requested",
count=len(request.meeting_ids),
)
async with cast(MeetingRepositoryProvider, self.create_repository_provider()) as repo:
succeeded_ids, failed_ids, skipped_ids = (
await aggregate_bulk_delete_results(
repo, list(request.meeting_ids), context
)
)
await repo.commit()
logger.info(
"Bulk delete complete",
succeeded_count=len(succeeded_ids),
failed_count=len(failed_ids),
skipped_count=len(skipped_ids),
)
return noteflow_pb2.DeleteMeetingsResponse(
deleted_count=len(succeeded_ids),
succeeded_ids=succeeded_ids,
failed_ids=failed_ids,
skipped_ids=skipped_ids,
error_message="",
)

View File

@@ -19,6 +19,7 @@ service NoteFlowService {
rpc ListMeetings(ListMeetingsRequest) returns (ListMeetingsResponse);
rpc GetMeeting(GetMeetingRequest) returns (Meeting);
rpc DeleteMeeting(DeleteMeetingRequest) returns (DeleteMeetingResponse);
rpc DeleteMeetings(DeleteMeetingsRequest) returns (DeleteMeetingsResponse);
// Summary generation
rpc GenerateSummary(GenerateSummaryRequest) returns (Summary);
@@ -413,6 +414,28 @@ message DeleteMeetingResponse {
bool success = 1;
}
message DeleteMeetingsRequest {
// Meeting IDs to delete
repeated string meeting_ids = 1;
}
message DeleteMeetingsResponse {
// Number of meetings successfully deleted
int32 deleted_count = 1;
// Meeting IDs that were successfully deleted
repeated string succeeded_ids = 2;
// Meeting IDs that failed to delete
repeated string failed_ids = 3;
// Meeting IDs that were skipped (e.g., active recordings)
repeated string skipped_ids = 4;
// Error message if batch operation failed
string error_message = 5;
}
// =============================================================================
// Summary Messages
// =============================================================================

File diff suppressed because one or more lines are too long

View File

@@ -393,6 +393,26 @@ class DeleteMeetingResponse(_message.Message):
success: bool
def __init__(self, success: bool = ...) -> None: ...
class DeleteMeetingsRequest(_message.Message):
__slots__ = ("meeting_ids",)
MEETING_IDS_FIELD_NUMBER: _ClassVar[int]
meeting_ids: _containers.RepeatedScalarFieldContainer[str]
def __init__(self, meeting_ids: _Optional[_Iterable[str]] = ...) -> None: ...
class DeleteMeetingsResponse(_message.Message):
__slots__ = ("deleted_count", "succeeded_ids", "failed_ids", "skipped_ids", "error_message")
DELETED_COUNT_FIELD_NUMBER: _ClassVar[int]
SUCCEEDED_IDS_FIELD_NUMBER: _ClassVar[int]
FAILED_IDS_FIELD_NUMBER: _ClassVar[int]
SKIPPED_IDS_FIELD_NUMBER: _ClassVar[int]
ERROR_MESSAGE_FIELD_NUMBER: _ClassVar[int]
deleted_count: int
succeeded_ids: _containers.RepeatedScalarFieldContainer[str]
failed_ids: _containers.RepeatedScalarFieldContainer[str]
skipped_ids: _containers.RepeatedScalarFieldContainer[str]
error_message: str
def __init__(self, deleted_count: _Optional[int] = ..., succeeded_ids: _Optional[_Iterable[str]] = ..., failed_ids: _Optional[_Iterable[str]] = ..., skipped_ids: _Optional[_Iterable[str]] = ..., error_message: _Optional[str] = ...) -> None: ...
class Summary(_message.Message):
__slots__ = ("meeting_id", "executive_summary", "key_points", "action_items", "generated_at", "model_version")
MEETING_ID_FIELD_NUMBER: _ClassVar[int]

View File

@@ -3,7 +3,7 @@
import grpc
import warnings
import noteflow_pb2 as noteflow__pb2
from . import noteflow_pb2 as noteflow__pb2
GRPC_GENERATED_VERSION = '1.76.0'
GRPC_VERSION = grpc.__version__
@@ -68,6 +68,11 @@ class NoteFlowServiceStub(object):
request_serializer=noteflow__pb2.DeleteMeetingRequest.SerializeToString,
response_deserializer=noteflow__pb2.DeleteMeetingResponse.FromString,
_registered_method=True)
self.DeleteMeetings = channel.unary_unary(
'/noteflow.NoteFlowService/DeleteMeetings',
request_serializer=noteflow__pb2.DeleteMeetingsRequest.SerializeToString,
response_deserializer=noteflow__pb2.DeleteMeetingsResponse.FromString,
_registered_method=True)
self.GenerateSummary = channel.unary_unary(
'/noteflow.NoteFlowService/GenerateSummary',
request_serializer=noteflow__pb2.GenerateSummaryRequest.SerializeToString,
@@ -565,6 +570,12 @@ class NoteFlowServiceServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def DeleteMeetings(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GenerateSummary(self, request, context):
"""Summary generation
"""
@@ -1164,6 +1175,11 @@ def add_NoteFlowServiceServicer_to_server(servicer, server):
request_deserializer=noteflow__pb2.DeleteMeetingRequest.FromString,
response_serializer=noteflow__pb2.DeleteMeetingResponse.SerializeToString,
),
'DeleteMeetings': grpc.unary_unary_rpc_method_handler(
servicer.DeleteMeetings,
request_deserializer=noteflow__pb2.DeleteMeetingsRequest.FromString,
response_serializer=noteflow__pb2.DeleteMeetingsResponse.SerializeToString,
),
'GenerateSummary': grpc.unary_unary_rpc_method_handler(
servicer.GenerateSummary,
request_deserializer=noteflow__pb2.GenerateSummaryRequest.FromString,
@@ -1791,6 +1807,33 @@ class NoteFlowService(object):
metadata,
_registered_method=True)
@staticmethod
def DeleteMeetings(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/noteflow.NoteFlowService/DeleteMeetings',
noteflow__pb2.DeleteMeetingsRequest.SerializeToString,
noteflow__pb2.DeleteMeetingsResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def GenerateSummary(request,
target,