- Moved all hookify configuration files from `.claude/` to the `.claude/hooks/` subdirectory for better organization
- Added four new blocking hooks to prevent common error-handling anti-patterns:
  - `block-broad-exception-handler`: Prevents catching generic `Exception` with only logging
  - `block-datetime-now-fallback`: Blocks returning `datetime.now()` as a fallback on parse failures to prevent data corruption
  - `block-default
133 lines
3.8 KiB
Python
133 lines
3.8 KiB
Python
"""Pytest fixtures for gRPC tests.
|
|
|
|
Provides shared fixtures for gRPC servicer testing including mock contexts,
|
|
repositories, and servicer host implementations.
|
|
|
|
Note: Common fixtures like `mock_grpc_context` are inherited from tests/conftest.py.
|
|
Only gRPC-specific fixtures are defined here.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock
|
|
from uuid import uuid4
|
|
|
|
import pytest
|
|
|
|
from noteflow.domain.entities import Meeting
|
|
from noteflow.domain.value_objects import MeetingId, MeetingState
|
|
from noteflow.domain.webhooks import (
|
|
DeliveryResult,
|
|
WebhookConfig,
|
|
WebhookDelivery,
|
|
WebhookEventType,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def mock_meetings_repo() -> AsyncMock:
    """Provide a stand-in meetings repository.

    Returns:
        AsyncMock whose ``get`` attribute is an AsyncMock resolving to None.
    """
    meetings_repo = AsyncMock()
    # configure_mock performs the same setattr as a direct assignment.
    meetings_repo.configure_mock(get=AsyncMock(return_value=None))
    return meetings_repo
|
|
|
|
|
|
@pytest.fixture
def mock_preferences_repo() -> AsyncMock:
    """Provide a stand-in preferences repository.

    Returns:
        AsyncMock exposing ``get_all_with_metadata`` (resolves to an empty
        list) plus ``set_bulk`` and ``delete`` as plain AsyncMocks.
    """
    preferences_repo = AsyncMock()
    preferences_repo.get_all_with_metadata = AsyncMock(return_value=[])
    # These two methods need no preset return value.
    for method_name in ("set_bulk", "delete"):
        setattr(preferences_repo, method_name, AsyncMock())
    return preferences_repo
|
|
|
|
|
|
@pytest.fixture
def mock_webhook_repo() -> AsyncMock:
    """Provide a stand-in webhook repository.

    Returns:
        AsyncMock with these attributes preconfigured as AsyncMocks:
        ``create``, ``update`` and ``add_delivery`` (no preset result),
        ``get_by_id`` (None), ``get_all``, ``get_all_enabled`` and
        ``get_deliveries`` (empty list each), and ``delete`` (False).
    """
    webhook_repo = AsyncMock()
    webhook_repo.configure_mock(
        create=AsyncMock(),
        get_by_id=AsyncMock(return_value=None),
        get_all=AsyncMock(return_value=[]),
        get_all_enabled=AsyncMock(return_value=[]),
        update=AsyncMock(),
        delete=AsyncMock(return_value=False),
        add_delivery=AsyncMock(),
        get_deliveries=AsyncMock(return_value=[]),
    )
    return webhook_repo
|
|
|
|
|
|
@pytest.fixture
def sample_webhook_config() -> WebhookConfig:
    """Build a sample WebhookConfig with name, secret, timeout and retries set.

    The config belongs to a freshly generated workspace ID and subscribes
    only to the MEETING_COMPLETED event.
    """
    workspace = uuid4()
    subscribed_events = [WebhookEventType.MEETING_COMPLETED]
    config = WebhookConfig.create(
        workspace_id=workspace,
        url="https://example.com/webhook",
        events=subscribed_events,
        name="Test Webhook",
        secret="test-secret",
        timeout_ms=15000,
        max_retries=5,
    )
    return config
|
|
|
|
|
|
@pytest.fixture
def sample_webhook_delivery(sample_webhook_config: WebhookConfig) -> WebhookDelivery:
    """Build a sample WebhookDelivery tied to ``sample_webhook_config``.

    The delivery records a MEETING_COMPLETED event with a 200 response,
    a small JSON body and a 150 ms duration; the payload carries a freshly
    generated meeting ID.
    """
    ok_result = DeliveryResult(
        status_code=200,
        response_body='{"ok": true}',
        duration_ms=150,
    )
    event_payload = {"event": "meeting.completed", "meeting_id": str(uuid4())}
    return WebhookDelivery.create(
        webhook_id=sample_webhook_config.id,
        event_type=WebhookEventType.MEETING_COMPLETED,
        payload=event_payload,
        result=ok_result,
    )
|
|
|
|
|
|
def create_test_meeting(
    meeting_id: MeetingId | None = None,
    title: str = "Test Meeting",
    state: MeetingState = MeetingState.CREATED,
) -> Meeting:
    """Build a Meeting for tests and advance it to the requested state.

    The meeting is created via ``Meeting.create`` and then moved forward by
    calling ``start_recording``, ``stop_recording`` and ``complete`` in that
    order, stopping as soon as the target state has been covered.

    Args:
        meeting_id: ID to assign to the meeting; when None the ID produced
            by ``Meeting.create`` is kept.
        title: Meeting title (default "Test Meeting").
        state: Target meeting state (default CREATED). States other than
            RECORDING, STOPPING, STOPPED and COMPLETED trigger no transition.

    Returns:
        The Meeting after the applicable transitions have been applied.
    """
    meeting = Meeting.create(title=title)
    if meeting_id is not None:
        meeting.id = meeting_id

    # States that require recording to have been started.
    needs_start = {
        MeetingState.RECORDING,
        MeetingState.STOPPING,
        MeetingState.STOPPED,
        MeetingState.COMPLETED,
    }
    if state not in needs_start:
        return meeting

    meeting.start_recording()
    if state == MeetingState.RECORDING:
        return meeting

    # STOPPING, STOPPED and COMPLETED all require stop_recording().
    meeting.stop_recording()
    if state == MeetingState.COMPLETED:
        meeting.complete()
    return meeting
|