Files
noteflow/tests/grpc/conftest.py
Travis Vasceannie 1ce24cdf7b feat: reorganize Claude hooks and add RAG documentation structure with error handling policies
- Moved all hookify configuration files from `.claude/` to `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as fallback on parse failures to prevent data corruption
  - `block-default
2026-01-15 15:58:06 +00:00

133 lines
3.8 KiB
Python

"""Pytest fixtures for gRPC tests.
Provides shared fixtures for gRPC servicer testing including mock contexts,
repositories, and servicer host implementations.
Note: Common fixtures like `mock_grpc_context` are inherited from tests/conftest.py.
Only gRPC-specific fixtures are defined here.
"""
from __future__ import annotations
from unittest.mock import AsyncMock
from uuid import uuid4
import pytest
from noteflow.domain.entities import Meeting
from noteflow.domain.value_objects import MeetingId, MeetingState
from noteflow.domain.webhooks import (
DeliveryResult,
WebhookConfig,
WebhookDelivery,
WebhookEventType,
)
@pytest.fixture
def mock_meetings_repo() -> AsyncMock:
    """Provide a stand-in meetings repository for servicer tests.

    Returns:
        AsyncMock whose ``get`` attribute is an AsyncMock that resolves to None.
    """
    # Constructor keywords are applied as attributes on the new mock, so the
    # ``get`` stub is attached exactly as a plain attribute assignment would.
    return AsyncMock(get=AsyncMock(return_value=None))
@pytest.fixture
def mock_preferences_repo() -> AsyncMock:
    """Provide a stand-in preferences repository for servicer tests.

    Returns:
        AsyncMock with ``get_all_with_metadata`` (resolves to an empty list),
        ``set_bulk`` and ``delete`` stubbed as AsyncMocks.
    """
    stubs = {
        "get_all_with_metadata": AsyncMock(return_value=[]),
        "set_bulk": AsyncMock(),
        "delete": AsyncMock(),
    }
    preferences = AsyncMock()
    # configure_mock sets each entry as an attribute on the mock.
    preferences.configure_mock(**stubs)
    return preferences
@pytest.fixture
def mock_webhook_repo() -> AsyncMock:
    """Provide a stand-in webhook repository for servicer tests.

    Returns:
        AsyncMock with the webhook repository methods stubbed: lookups resolve
        to None or an empty list, ``delete`` resolves to False, and the
        remaining methods are bare AsyncMocks.
    """
    method_stubs = (
        ("create", AsyncMock()),
        ("get_by_id", AsyncMock(return_value=None)),
        ("get_all", AsyncMock(return_value=[])),
        ("get_all_enabled", AsyncMock(return_value=[])),
        ("update", AsyncMock()),
        ("delete", AsyncMock(return_value=False)),
        ("add_delivery", AsyncMock()),
        ("get_deliveries", AsyncMock(return_value=[])),
    )
    webhooks = AsyncMock()
    for method_name, stub in method_stubs:
        setattr(webhooks, method_name, stub)
    return webhooks
@pytest.fixture
def sample_webhook_config() -> WebhookConfig:
    """Build a WebhookConfig for tests with every keyword argument supplied.

    Returns:
        The result of ``WebhookConfig.create`` for a freshly generated
        workspace ID, subscribed to the MEETING_COMPLETED event.
    """
    workspace = uuid4()
    subscribed_events = [WebhookEventType.MEETING_COMPLETED]
    return WebhookConfig.create(
        workspace_id=workspace,
        url="https://example.com/webhook",
        events=subscribed_events,
        name="Test Webhook",
        secret="test-secret",
        timeout_ms=15000,
        max_retries=5,
    )
@pytest.fixture
def sample_webhook_delivery(sample_webhook_config: WebhookConfig) -> WebhookDelivery:
    """Build a WebhookDelivery tied to the sample webhook config.

    Args:
        sample_webhook_config: Config fixture whose ``id`` is used as the
            delivery's webhook ID.

    Returns:
        The result of ``WebhookDelivery.create`` for a MEETING_COMPLETED event
        carrying a DeliveryResult with status code 200.
    """
    outcome = DeliveryResult(
        status_code=200,
        response_body='{"ok": true}',
        duration_ms=150,
    )
    completed = WebhookEventType.MEETING_COMPLETED
    # A new random meeting ID is generated for each delivery payload.
    body = {"event": "meeting.completed", "meeting_id": str(uuid4())}
    return WebhookDelivery.create(
        webhook_id=sample_webhook_config.id,
        event_type=completed,
        payload=body,
        result=outcome,
    )
def create_test_meeting(
    meeting_id: MeetingId | None = None,
    title: str = "Test Meeting",
    state: MeetingState = MeetingState.CREATED,
) -> Meeting:
    """Build a Meeting for tests and advance it toward a target state.

    The meeting is created with ``Meeting.create`` and then driven forward by
    calling its lifecycle methods in order (``start_recording``,
    ``stop_recording``, ``complete``), returning as soon as the requested
    state needs no further calls.

    Args:
        meeting_id: When not None, assigned to ``meeting.id`` after creation.
        title: Title passed to ``Meeting.create`` (default "Test Meeting").
        state: Target state (default CREATED). Only RECORDING, STOPPING,
            STOPPED and COMPLETED trigger lifecycle calls; any other value
            returns the meeting exactly as ``Meeting.create`` produced it.

    Returns:
        The Meeting after the lifecycle calls selected by ``state``.
    """
    entity = Meeting.create(title=title)
    if meeting_id is not None:
        entity.id = meeting_id

    # NOTE(review): STOPPING and STOPPED trigger the same calls below --
    # confirm against Meeting.stop_recording() which state that leaves.
    stopping_or_later = {
        MeetingState.STOPPING,
        MeetingState.STOPPED,
        MeetingState.COMPLETED,
    }
    recording_or_later = stopping_or_later | {MeetingState.RECORDING}

    # Guard clauses: bail out as soon as the target needs no more transitions.
    if state not in recording_or_later:
        return entity
    entity.start_recording()

    if state not in stopping_or_later:
        return entity
    entity.stop_recording()

    if state == MeetingState.COMPLETED:
        entity.complete()
    return entity