Files
noteflow/src/noteflow/grpc/mixins/calendar.py

321 lines
11 KiB
Python

"""Calendar integration mixin for gRPC service."""
from __future__ import annotations
from typing import TYPE_CHECKING
from noteflow.application.services.calendar import CalendarService, CalendarServiceError
from noteflow.domain.constants.fields import CALENDAR
from noteflow.domain.entities.integration import IntegrationStatus
from noteflow.domain.ports.calendar import CalendarEventInfo, OAuthConnectionInfo
from noteflow.domain.value_objects import OAuthProvider
from noteflow.infrastructure.logging import get_logger
from ..proto import noteflow_pb2
from .errors import (
abort_internal,
abort_invalid_argument,
abort_unavailable,
)
logger = get_logger(__name__)
_ERR_CALENDAR_NOT_ENABLED = "Calendar integration not enabled"
if TYPE_CHECKING:
from ._types import GrpcContext
from .protocols import ServicerHost
def _calendar_event_to_proto(event: CalendarEventInfo) -> noteflow_pb2.CalendarEvent:
"""Convert a domain CalendarEventInfo to protobuf message.
Args:
event: The domain calendar event entity.
Returns:
The protobuf CalendarEvent message.
"""
return noteflow_pb2.CalendarEvent(
id=event.id,
title=event.title,
start_time=int(event.start_time.timestamp()),
end_time=int(event.end_time.timestamp()),
location=event.location or "",
attendees=list(event.attendees),
meeting_url=event.meeting_url or "",
is_recurring=event.is_recurring,
provider=event.provider,
)
def _build_oauth_connection(
info: OAuthConnectionInfo,
integration_type: str,
) -> noteflow_pb2.OAuthConnection:
"""Build OAuthConnection proto from connection info."""
return noteflow_pb2.OAuthConnection(
provider=info.provider,
status=info.status,
email=info.email or "",
expires_at=int(info.expires_at.timestamp()) if info.expires_at else 0,
error_message=info.error_message or "",
integration_type=integration_type,
)
async def require_calendar_service(
host: ServicerHost,
context: GrpcContext,
operation: str,
) -> CalendarService:
"""Return calendar service or abort with UNAVAILABLE.
Returns the CalendarService instance for type-safe usage after the check.
"""
if host.calendar_service is not None:
return host.calendar_service
logger.warning(f"{operation}_unavailable", reason="service_not_enabled")
await abort_unavailable(context, _ERR_CALENDAR_NOT_ENABLED)
raise # Unreachable but helps type checker
class CalendarMixin:
"""Mixin providing calendar integration functionality.
Requires host to implement ServicerHost protocol with _calendar_service.
Provides OAuth flow and calendar event fetching for Google and Outlook.
"""
async def ListCalendarEvents(
self: ServicerHost,
request: noteflow_pb2.ListCalendarEventsRequest,
context: GrpcContext,
) -> noteflow_pb2.ListCalendarEventsResponse:
"""List upcoming calendar events from connected providers."""
service = await require_calendar_service(self, context, "calendar_list_events")
provider = request.provider or None
hours_ahead = request.hours_ahead if request.hours_ahead > 0 else None
limit = request.limit if request.limit > 0 else None
logger.debug(
"calendar_list_events_request",
provider=provider,
hours_ahead=hours_ahead,
limit=limit,
)
workspace_id = self.get_operation_context(context).workspace_id
try:
events = await service.list_calendar_events(
provider=provider,
hours_ahead=hours_ahead,
limit=limit,
workspace_id=workspace_id,
)
except CalendarServiceError as e:
logger.error("calendar_list_events_failed", error=str(e), provider=provider)
await abort_internal(context, str(e))
raise # Unreachable but helps type checker
proto_events = [_calendar_event_to_proto(event) for event in events]
logger.info(
"calendar_list_events_success",
provider=provider,
event_count=len(proto_events),
)
return noteflow_pb2.ListCalendarEventsResponse(
events=proto_events,
total_count=len(proto_events),
)
async def GetCalendarProviders(
self: ServicerHost,
request: noteflow_pb2.GetCalendarProvidersRequest,
context: GrpcContext,
) -> noteflow_pb2.GetCalendarProvidersResponse:
"""Get available calendar providers with authentication status."""
service = await require_calendar_service(self, context, "calendar_providers")
logger.debug("calendar_get_providers_request")
providers: list[noteflow_pb2.CalendarProvider] = []
for provider_name, display_name in [
(OAuthProvider.GOOGLE.value, "Google Calendar"),
(OAuthProvider.OUTLOOK.value, "Microsoft Outlook"),
]:
status: OAuthConnectionInfo = await service.get_connection_status(
provider_name,
workspace_id=self.get_operation_context(context).workspace_id,
)
is_authenticated = status.status == IntegrationStatus.CONNECTED.value
providers.append(
noteflow_pb2.CalendarProvider(
name=provider_name,
is_authenticated=is_authenticated,
display_name=display_name,
)
)
logger.debug(
"calendar_provider_status",
provider=provider_name,
is_authenticated=is_authenticated,
status=status.status,
)
authenticated_count = sum(
bool(provider.is_authenticated) for provider in providers
)
logger.info(
"calendar_get_providers_success",
total_providers=len(providers),
authenticated_count=authenticated_count,
)
return noteflow_pb2.GetCalendarProvidersResponse(providers=providers)
async def InitiateOAuth(
self: ServicerHost,
request: noteflow_pb2.InitiateOAuthRequest,
context: GrpcContext,
) -> noteflow_pb2.InitiateOAuthResponse:
"""Start OAuth flow for a calendar provider."""
service = await require_calendar_service(self, context, "oauth_initiate")
logger.debug(
"oauth_initiate_request",
provider=request.provider,
has_redirect_uri=bool(request.redirect_uri),
)
workspace_id = self.get_operation_context(context).workspace_id
try:
auth_url, state = await service.initiate_oauth(
provider=request.provider,
redirect_uri=request.redirect_uri or None,
workspace_id=workspace_id,
)
except CalendarServiceError as e:
logger.error(
"oauth_initiate_failed",
provider=request.provider,
error=str(e),
)
await abort_invalid_argument(context, str(e))
raise # Unreachable but helps type checker
logger.info(
"oauth_initiate_success",
provider=request.provider,
state=state,
)
return noteflow_pb2.InitiateOAuthResponse(
auth_url=auth_url,
state=state,
)
async def CompleteOAuth(
self: ServicerHost,
request: noteflow_pb2.CompleteOAuthRequest,
context: GrpcContext,
) -> noteflow_pb2.CompleteOAuthResponse:
"""Complete OAuth flow with authorization code."""
service = await require_calendar_service(self, context, "oauth_complete")
logger.debug(
"oauth_complete_request",
provider=request.provider,
state=request.state,
)
workspace_id = self.get_operation_context(context).workspace_id
try:
integration_id = await service.complete_oauth(
provider=request.provider,
code=request.code,
state=request.state,
workspace_id=workspace_id,
)
except CalendarServiceError as e:
logger.warning(
"oauth_complete_failed",
provider=request.provider,
error=str(e),
)
return noteflow_pb2.CompleteOAuthResponse(
success=False,
error_message=str(e),
)
status = await service.get_connection_status(request.provider, workspace_id=workspace_id)
logger.info(
"oauth_complete_success",
provider=request.provider,
email=status.email,
integration_id=str(integration_id),
)
return noteflow_pb2.CompleteOAuthResponse(
success=True,
provider_email=status.email or "",
integration_id=str(integration_id),
)
async def GetOAuthConnectionStatus(
self: ServicerHost,
request: noteflow_pb2.GetOAuthConnectionStatusRequest,
context: GrpcContext,
) -> noteflow_pb2.GetOAuthConnectionStatusResponse:
"""Get OAuth connection status for a provider."""
service = await require_calendar_service(self, context, "oauth_status")
logger.debug(
"oauth_status_request",
provider=request.provider,
integration_type=request.integration_type or CALENDAR,
)
info = await service.get_connection_status(
request.provider,
workspace_id=self.get_operation_context(context).workspace_id,
)
logger.info(
"oauth_status_retrieved",
provider=request.provider,
status=info.status,
has_email=bool(info.email),
has_error=bool(info.error_message),
)
return noteflow_pb2.GetOAuthConnectionStatusResponse(
connection=_build_oauth_connection(info, request.integration_type or CALENDAR)
)
async def DisconnectOAuth(
self: ServicerHost,
request: noteflow_pb2.DisconnectOAuthRequest,
context: GrpcContext,
) -> noteflow_pb2.DisconnectOAuthResponse:
"""Disconnect OAuth integration and revoke tokens."""
service = await require_calendar_service(self, context, "oauth_disconnect")
logger.debug("oauth_disconnect_request", provider=request.provider)
success = await service.disconnect(
request.provider,
workspace_id=self.get_operation_context(context).workspace_id,
)
if success:
logger.info("oauth_disconnect_success", provider=request.provider)
else:
logger.warning("oauth_disconnect_failed", provider=request.provider)
return noteflow_pb2.DisconnectOAuthResponse(success=success)