Files
noteflow/tests/integration/test_webhook_integration.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default…` (the rest of this commit message is truncated in the listing)
2026-01-15 15:58:06 +00:00

234 lines
8.3 KiB
Python

"""Integration tests for webhook triggering in gRPC service.
Tests the complete webhook flow from gRPC operations to webhook delivery.
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, TypedDict
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4
import grpc
import pytest
from noteflow.application.services.webhooks import WebhookService
from noteflow.domain.entities import Meeting, Segment
from noteflow.domain.webhooks import (
DeliveryResult,
WebhookConfig,
WebhookDelivery,
WebhookEventType,
)
from noteflow.grpc.config.config import ServicesConfig
from noteflow.grpc.proto import noteflow_pb2
from noteflow.grpc.service import NoteFlowServicer
from noteflow.infrastructure.persistence.unit_of_work import SqlAlchemyUnitOfWork
from noteflow.infrastructure.webhooks import WebhookExecutor
if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def _create_recording_meeting(
    session_factory: async_sessionmaker[AsyncSession],
    meetings_dir: Path,
    title: str,
) -> Meeting:
    """Store and commit a new meeting whose recording has been started.

    Args:
        session_factory: Session factory handed to ``SqlAlchemyUnitOfWork``.
        meetings_dir: Directory handed to ``SqlAlchemyUnitOfWork``.
        title: Title given to the new meeting.

    Returns:
        The ``Meeting`` instance that was created and committed.
    """
    async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as unit_of_work:
        recording = Meeting.create(title=title)
        # Start recording before the meeting is handed to the repository.
        recording.start_recording()
        await unit_of_work.meetings.create(recording)
        await unit_of_work.commit()
    return recording
def _create_failing_webhook_service(mock_executor: MagicMock) -> WebhookService:
    """Build a webhook service whose executor raises on every delivery.

    Args:
        mock_executor: Executor mock; its ``deliver`` attribute is replaced
            with an ``AsyncMock`` that raises ``RuntimeError``.

    Returns:
        A ``WebhookService`` with one webhook registered for
        ``MEETING_COMPLETED`` events.
    """
    delivery_error = RuntimeError("Webhook server unreachable")
    mock_executor.deliver = AsyncMock(side_effect=delivery_error)

    failing_service = WebhookService(executor=mock_executor)
    failing_config = WebhookConfig.create(
        workspace_id=uuid4(),
        url="https://failing.example.com/webhook",
        events=[WebhookEventType.MEETING_COMPLETED],
    )
    failing_service.register_webhook(failing_config)
    return failing_service
class CapturedWebhookCall(TypedDict):
    """One delivery recorded by the capturing mock executor."""

    # URL taken from the webhook config the delivery was made for.
    config_url: str
    # Event type passed to the executor.
    event_type: WebhookEventType
    # Payload passed to the executor.
    payload: dict[str, str]
class MockGrpcContext:
    """Stand-in for a gRPC servicer context that records abort calls."""

    def __init__(self) -> None:
        """Start with no abort recorded."""
        self.abort_details: str | None = None
        self.abort_code: grpc.StatusCode | None = None
        self.aborted = False

    async def abort(self, code: grpc.StatusCode, details: str) -> None:
        """Remember the abort arguments, then raise ``grpc.RpcError``.

        Args:
            code: Status code passed by the caller.
            details: Detail message passed by the caller.

        Raises:
            grpc.RpcError: Always, after the arguments have been stored.
        """
        self.abort_code, self.abort_details = code, details
        self.aborted = True
        # Raising stops the handler, simulating gRPC abort behavior.
        raise grpc.RpcError()
@pytest.fixture
def captured_webhook_calls() -> list[CapturedWebhookCall]:
    """Provide an empty list that collects captured webhook calls."""
    recorded: list[CapturedWebhookCall] = []
    return recorded
@pytest.fixture
def mock_webhook_executor(captured_webhook_calls: list[CapturedWebhookCall]) -> MagicMock:
    """Provide an executor mock that records deliveries and reports success.

    Each ``deliver`` call appends a ``CapturedWebhookCall`` to
    ``captured_webhook_calls`` and returns a ``WebhookDelivery`` built from a
    ``DeliveryResult`` with ``status_code=200``.
    """

    async def record_delivery(
        config: WebhookConfig,
        event_type: WebhookEventType,
        payload: dict[str, str],
    ) -> WebhookDelivery:
        call: CapturedWebhookCall = {
            "config_url": config.url,
            "event_type": event_type,
            "payload": payload,
        }
        captured_webhook_calls.append(call)
        return WebhookDelivery.create(
            webhook_id=config.id,
            event_type=event_type,
            payload=payload,
            result=DeliveryResult(status_code=200),
        )

    fake_executor = MagicMock(spec=WebhookExecutor)
    fake_executor.deliver = AsyncMock(side_effect=record_delivery)
    fake_executor.close = AsyncMock()
    return fake_executor
@pytest.fixture
def webhook_service_with_config(
    mock_webhook_executor: MagicMock,
) -> WebhookService:
    """Provide a webhook service with one webhook already registered.

    The webhook subscribes to ``MEETING_COMPLETED``, ``RECORDING_STARTED`` and
    ``RECORDING_STOPPED`` and delivers through ``mock_webhook_executor``.
    """
    configured_service = WebhookService(executor=mock_webhook_executor)
    configured_service.register_webhook(
        WebhookConfig.create(
            workspace_id=uuid4(),
            url="https://test.example.com/webhook",
            events=[
                WebhookEventType.MEETING_COMPLETED,
                WebhookEventType.RECORDING_STARTED,
                WebhookEventType.RECORDING_STOPPED,
            ],
            name="Integration Test Webhook",
        )
    )
    return configured_service
@pytest.mark.integration
class TestStopMeetingTriggersWebhook:
    """Check webhook delivery when a meeting is stopped through the servicer."""

    async def test_stop_meeting_triggers_meeting_completed_webhook(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        webhook_service_with_config: WebhookService,
        captured_webhook_calls: list[CapturedWebhookCall],
    ) -> None:
        """StopMeeting delivers RECORDING_STOPPED and MEETING_COMPLETED webhooks."""
        # Arrange: a recording meeting with a single stored segment.
        async with SqlAlchemyUnitOfWork(session_factory, meetings_dir) as uow:
            meeting = Meeting.create(title="Webhook Integration Test")
            meeting.start_recording()
            await uow.meetings.create(meeting)
            segment = Segment(0, "Test segment", 0.0, 5.0, meeting_id=meeting.id)
            await uow.segments.add(meeting.id, segment)
            await uow.commit()
            meeting_id = str(meeting.id)

        # Act: stop the meeting through a servicer wired to the webhook service.
        services = ServicesConfig(webhook_service=webhook_service_with_config)
        servicer = NoteFlowServicer(session_factory=session_factory, services=services)
        request = noteflow_pb2.StopMeetingRequest(meeting_id=meeting_id)
        result = await servicer.StopMeeting(request, MockGrpcContext())

        # Assert: meeting stopped and exactly two deliveries were captured.
        assert result.state == noteflow_pb2.MEETING_STATE_STOPPED, f"got {result.state}"
        assert len(captured_webhook_calls) == 2, f"got {len(captured_webhook_calls)}"

        event_types = {call["event_type"] for call in captured_webhook_calls}
        assert WebhookEventType.RECORDING_STOPPED in event_types, f"not in {event_types}"
        assert WebhookEventType.MEETING_COMPLETED in event_types, f"not in {event_types}"

        completed: list[CapturedWebhookCall] = [
            call
            for call in captured_webhook_calls
            if call["event_type"] == WebhookEventType.MEETING_COMPLETED
        ]
        assert len(completed) == 1, "should have one MEETING_COMPLETED"

        payload = completed[0]["payload"]
        assert payload["meeting_id"] == meeting_id, "meeting_id mismatch"
        assert payload["title"] == "Webhook Integration Test", "title mismatch"

    async def test_stop_meeting_with_failed_webhook_still_succeeds(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
        mock_webhook_executor: MagicMock,
    ) -> None:
        """A raising webhook executor does not prevent the meeting from stopping."""
        failing_service = _create_failing_webhook_service(mock_webhook_executor)
        meeting = await _create_recording_meeting(
            session_factory, meetings_dir, "Webhook Failure Test"
        )
        services = ServicesConfig(webhook_service=failing_service)
        servicer = NoteFlowServicer(session_factory=session_factory, services=services)

        result = await servicer.StopMeeting(
            noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id)),
            MockGrpcContext(),
        )

        assert result.state == noteflow_pb2.MEETING_STATE_STOPPED, (
            f"expected meeting state STOPPED despite webhook failure, got {result.state}"
        )
@pytest.mark.integration
class TestNoWebhookServiceGracefulDegradation:
    """Test that operations work without webhook service configured."""

    async def test_stop_meeting_works_withoutwebhook_service(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        meetings_dir: Path,
    ) -> None:
        """Meeting operations work when no webhook service is configured.

        The servicer is built with ``webhook_service=None`` and StopMeeting
        is expected to still report ``MEETING_STATE_STOPPED``.
        """
        # Reuse the module's setup helper instead of repeating the
        # create/start/persist/commit sequence inline.
        meeting = await _create_recording_meeting(
            session_factory, meetings_dir, "No Webhooks Test"
        )
        servicer = NoteFlowServicer(
            session_factory=session_factory,
            services=ServicesConfig(webhook_service=None),
        )
        request = noteflow_pb2.StopMeetingRequest(meeting_id=str(meeting.id))
        result = await servicer.StopMeeting(request, MockGrpcContext())
        assert result.state == noteflow_pb2.MEETING_STATE_STOPPED, (
            f"expected meeting state STOPPED without webhook service, got {result.state}"
        )