Files
biz-bud/tests/conftest.py
Travis Vasceannie 8ad47a7640 Modernize research graph metadata for LangGraph v1 (#60)
* Modernize research graph metadata for LangGraph v1

* Update src/biz_bud/core/langgraph/graph_builder.py

Co-authored-by: qodo-merge-pro[bot] <151058649+qodo-merge-pro[bot]@users.noreply.github.com>

---------

Co-authored-by: qodo-merge-pro[bot] <151058649+qodo-merge-pro[bot]@users.noreply.github.com>
2025-09-19 03:01:18 -04:00

1011 lines
33 KiB
Python

"""Root pytest configuration with hierarchical fixtures."""
import asyncio
import asyncio
import importlib
import importlib.util
import os
import sys
import tempfile
from pathlib import Path
from types import ModuleType
from typing import Any, AsyncGenerator, Generator, TypeVar, cast
from unittest.mock import AsyncMock, Mock
import pytest
from _pytest.config import Config
# ---------------------------------------------------------------------------
# Optional third-party dependency shims
# ---------------------------------------------------------------------------
#
# The production environment installs the full Anthropic SDK. However, the
# lightweight test environments that power unit tests in CI do not ship the
# dependency by default. Several core modules import Anthropic exception types
# at module import time, so we provide a minimal stub here to keep those imports
# working without the real package. The stub exposes the subset of exceptions
# that the codebase interacts with and emulates the handful of attributes that
# tests assert against (message, response, body, retry_after, etc.).
try: # pragma: no cover - import guard for optional dependency
import anthropic # type: ignore # noqa: F401
except ModuleNotFoundError: # pragma: no cover - executed only in lightweight envs
anthropic_stub = ModuleType("anthropic")
class _AnthropicError(Exception):
"""Base stub for Anthropic exceptions used in tests."""
def __init__(
self,
message: str | None = None,
*,
response: Any | None = None,
body: Any | None = None,
status_code: int | None = None,
**extra: Any,
) -> None:
self.message = message or self.__class__.__name__
self.response = response
self.body = body
self.status_code = status_code
for key, value in extra.items():
setattr(self, key, value)
super().__init__(self.message)
class APIError(_AnthropicError):
"""Stub Anthropic APIError."""
class AuthenticationError(APIError):
"""Stub Anthropic AuthenticationError."""
class RateLimitError(APIError):
"""Stub Anthropic RateLimitError with retry metadata support."""
def __init__(
self,
message: str | None = None,
*,
retry_after: float | None = None,
**kwargs: Any,
) -> None:
super().__init__(message, **kwargs)
self.retry_after = retry_after
class APITimeoutError(APIError, TimeoutError):
"""Stub Anthropic APITimeoutError inheriting from TimeoutError."""
def __init__(self, message: str | None = None, **kwargs: Any) -> None:
APIError.__init__(self, message, **kwargs)
TimeoutError.__init__(self, self.message)
anthropic_stub.APIError = APIError
anthropic_stub.AuthenticationError = AuthenticationError
anthropic_stub.RateLimitError = RateLimitError
anthropic_stub.APITimeoutError = APITimeoutError
anthropic_stub.__all__ = [
"APIError",
"AuthenticationError",
"RateLimitError",
"APITimeoutError",
]
sys.modules["anthropic"] = anthropic_stub
# pytest-asyncio is part of the development dependency set, but it may be absent
# in lightweight execution environments used for kata validation. Provide a
# minimal stand-in so that ``pytest_plugins = ["pytest_asyncio"]`` continues to
# work without the real package.
try: # pragma: no cover - optional dependency
from dotenv import load_dotenv
except ModuleNotFoundError: # pragma: no cover - fallback implementation
dotenv_stub = ModuleType("dotenv")
def load_dotenv(*_: Any, **__: Any) -> None:
return None
def dotenv_values(*_: Any, **__: Any) -> dict[str, str]:
"""Return an empty mapping when python-dotenv is unavailable."""
return {}
dotenv_stub.load_dotenv = load_dotenv
dotenv_stub.dotenv_values = dotenv_values
sys.modules["dotenv"] = dotenv_stub
# Prepend the absolute path to the 'src' directory to sys.path
src_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))
if src_path not in sys.path:
sys.path.insert(0, src_path)
# Add the project root directory (parent of 'tests' and 'src') to sys.path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(1, str(project_root)) # Insert after 'src' to preserve order
# ---------------------------------------------------------------------------
# Optional dependency stubs
# ---------------------------------------------------------------------------
def _install_stub(package: str) -> None:
"""Load a lightweight stub module or package from ``tests/stubs``."""
stubs_root = project_root / "tests" / "stubs"
package_root = stubs_root / package
module_path: Path | None = None
search_locations: list[str] | None = None
if package_root.is_dir():
candidate = package_root / "__init__.py"
if not candidate.is_file(): # pragma: no cover - defensive guard
return
module_path = candidate
search_locations = [str(package_root)]
else:
single_file = stubs_root / f"{package}.py"
if not single_file.is_file(): # pragma: no cover - defensive guard
return
module_path = single_file
spec = importlib.util.spec_from_file_location(
package,
module_path,
submodule_search_locations=search_locations,
)
if spec is None or spec.loader is None: # pragma: no cover - defensive guard
return
module = importlib.util.module_from_spec(spec)
sys.modules[package] = module
spec.loader.exec_module(module)
def _ensure_optional_dependency(package: str) -> None:
"""Import a package, falling back to the local stub when unavailable."""
try:
importlib.import_module(package)
except ModuleNotFoundError:
_install_stub(package)
for optional_package in (
"langgraph",
"langchain_core",
"langchain_anthropic",
"langchain_openai",
"pydantic",
"nltk",
"rich",
"aiohttp",
"aiofiles",
"asyncpg",
"qdrant_client",
"bs4",
"r2r",
"pythonjsonlogger",
"dateutil",
"docling",
"httpx",
"requests",
"openai",
"numpy",
"yaml",
"pandas",
"pytest_asyncio",
):
_ensure_optional_dependency(optional_package)
# Type variable for generic service typing
T = TypeVar("T")
# Configure pytest plugins
pytest_plugins = ["pytest_asyncio"]
# Import all fixtures from helpers to make them available globally
# Import factories and assertions for easy access
from tests.helpers.assertions.custom_assertions import * # noqa: F401, F403, E402
from tests.helpers.factories.state_factories import * # noqa: F401, F403, E402
from tests.helpers.fixtures.config_fixtures import * # noqa: F401, F403, E402
from tests.helpers.fixtures.factory_fixtures import * # noqa: F401, F403, E402
from tests.helpers.fixtures.mock_fixtures import * # noqa: F401, F403, E402
from tests.helpers.fixtures.state_fixtures import * # noqa: F401, F403, E402
from tests.helpers.mocks.mock_builders import * # noqa: F401, F403, E402
def pytest_addoption(parser: pytest.Parser) -> None:
"""Register no-op coverage options so pytest invocations succeed without pytest-cov."""
cov_group = parser.getgroup("cov")
cov_group.addoption(
"--cov",
action="append",
dest="cov",
default=[],
help="stubbed coverage target (no-op)",
)
cov_group.addoption(
"--cov-report",
action="append",
dest="cov_report",
default=[],
help="stubbed coverage report option (no-op)",
)
cov_group.addoption(
"--cov-fail-under",
action="store",
dest="cov_fail_under",
default=None,
help="stubbed coverage threshold (no-op)",
)
parser.addini(
"asyncio_default_fixture_loop_scope",
"stubbed asyncio loop scope option",
default="function",
)
parser.addini("asyncio_mode", "stubbed asyncio mode", default="auto")
def pytest_configure(config: Config) -> None:
"""Configure pytest with custom settings."""
# Add custom markers
config.addinivalue_line(
"markers", "slow: marks tests as slow (deselect with '-m \"not slow\"')"
)
config.addinivalue_line("markers", "integration: marks tests as integration tests")
config.addinivalue_line("markers", "unit: marks tests as unit tests")
config.addinivalue_line("markers", "e2e: marks tests as end-to-end tests")
config.addinivalue_line("markers", "web: marks tests that require internet access")
config.addinivalue_line(
"markers", "browser: marks tests that require browser automation"
)
# Add crash test markers
config.addinivalue_line("markers", "crash: marks tests as crash tests")
config.addinivalue_line("markers", "memory: marks tests as memory-related")
config.addinivalue_line("markers", "network: marks tests as network-related")
config.addinivalue_line("markers", "database: marks tests as database-related")
config.addinivalue_line("markers", "concurrent: marks tests as concurrency-related")
@pytest.hookimpl(tryfirst=True)
def pytest_pyfunc_call(pyfuncitem: pytest.Function) -> bool | None:
"""Execute ``async`` tests marked with ``@pytest.mark.asyncio`` using a local loop."""
marker = pyfuncitem.get_closest_marker("asyncio")
if marker is None:
return None
loop = asyncio.new_event_loop()
try:
kwargs = {name: pyfuncitem.funcargs[name] for name in pyfuncitem._fixtureinfo.argnames} # type: ignore[attr-defined]
loop.run_until_complete(pyfuncitem.obj(**kwargs))
finally:
loop.close()
return True
@pytest.fixture(scope="session")
def anyio_backend() -> str:
"""Use asyncio for async tests."""
return "asyncio"
@pytest.fixture(scope="session", autouse=True)
def setup_test_environment() -> None:
"""Set up test environment variables."""
# Load environment variables from .env file
from dotenv import load_dotenv
load_dotenv()
# Override specific values for testing
os.environ["ENVIRONMENT"] = "test"
os.environ["LOG_LEVEL"] = "WARNING" # Reduce log noise in tests
os.environ["DISABLE_TELEMETRY"] = "true"
@pytest.fixture(scope="function")
def clean_state() -> dict[str, Any]:
"""Provide a clean state dictionary for testing."""
return {
"messages": [],
"errors": [],
"metadata": {},
"step_count": 0,
"workflow_status": "initialized",
}
# ================================
# CENTRALIZED COMMON FIXTURES
# ================================
# These fixtures replace duplicates found across multiple conftest.py files
@pytest.fixture(scope="session")
def event_loop_policy():
"""Set cross-platform event loop policy for all tests."""
if os.name == "nt" and hasattr(asyncio, "WindowsProactorEventLoopPolicy"):
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) # type: ignore[attr-defined]
return asyncio.get_event_loop_policy()
@pytest.fixture(scope="session")
def temp_dir_session() -> Generator[Path, None, None]:
"""Provide a session-scoped temporary directory for expensive setup."""
with tempfile.TemporaryDirectory(prefix="bizbudz_test_session_") as tmpdir:
yield Path(tmpdir)
@pytest.fixture(scope="function")
def temp_dir() -> Generator[Path, None, None]:
"""Provide a function-scoped temporary directory for test isolation."""
with tempfile.TemporaryDirectory(prefix="bizbudz_test_") as tmpdir:
yield Path(tmpdir)
@pytest.fixture
def mock_logger() -> Mock:
"""Provide a comprehensive mock logger for all tests."""
logger = Mock()
logger.debug = Mock()
logger.info = Mock()
logger.warning = Mock()
logger.error = Mock()
logger.exception = Mock()
logger.critical = Mock()
logger.setLevel = Mock()
logger.getEffectiveLevel = Mock(return_value=20) # INFO level
logger.isEnabledFor = Mock(return_value=True)
logger.handlers = []
logger.level = 20 # INFO level
return logger
@pytest.fixture
async def mock_http_client() -> AsyncMock:
"""Provide a standard mock HTTP client for async tests."""
client = AsyncMock()
client.get = AsyncMock(return_value={"status": "ok"})
client.post = AsyncMock(return_value={"id": "123"})
client.put = AsyncMock(return_value={"status": "updated"})
client.delete = AsyncMock(return_value={"status": "deleted"})
client.request = AsyncMock(return_value={"status": "success"})
client.close = AsyncMock()
return client
@pytest.fixture
async def mock_aiohttp_session() -> AsyncGenerator[AsyncMock, None]:
"""Provide a mock aiohttp ClientSession with context manager support."""
mock_session = AsyncMock()
mock_session.__aenter__ = AsyncMock(return_value=mock_session)
mock_session.__aexit__ = AsyncMock(return_value=None)
mock_session.closed = False
# Set up default successful response
default_response = AsyncMock()
default_response.status = 200
default_response.text = AsyncMock(return_value='{"success": true}')
default_response.json = AsyncMock(return_value={"success": True})
default_response.raise_for_status = Mock()
default_response.__aenter__ = AsyncMock(return_value=default_response)
default_response.__aexit__ = AsyncMock(return_value=None)
mock_session.get = AsyncMock(return_value=default_response)
mock_session.post = AsyncMock(return_value=default_response)
mock_session.put = AsyncMock(return_value=default_response)
mock_session.delete = AsyncMock(return_value=default_response)
mock_session.close = AsyncMock()
yield mock_session
@pytest.fixture
def error_info_factory():
"""Create ErrorInfo TypedDict instances with standard defaults."""
def _create(
error_type: str = "TestError",
message: str = "Test error message",
severity: str = "error",
category: str = "test",
context: dict[str, Any] | None = None,
node: str = "test_node",
) -> dict[str, Any]:
"""Create a standardized ErrorInfo structure.
Args:
error_type: Type of error
message: Error message
severity: Error severity level
category: Error category
context: Additional context data
node: Node where error occurred
Returns:
ErrorInfo TypedDict structure
"""
return {
"message": message,
"node": node,
"details": {
"type": error_type,
"message": message,
"severity": severity,
"category": category,
"timestamp": "2024-01-01T00:00:00Z",
"context": context or {},
"traceback": None,
},
}
return _create
@pytest.fixture
def benchmark_timer():
"""Provide simple timer for performance benchmarking across all tests."""
import time
class Timer:
def __init__(self):
self.start_time: float | None = None
self.end_time: float | None = None
def __enter__(self):
self.start_time = time.time()
return self
def __exit__(self, *args):
self.end_time = time.time()
@property
def elapsed(self) -> float:
if self.start_time and self.end_time:
return self.end_time - self.start_time
return 0.0
return Timer
# ================================
# CENTRALIZED SERVICE FIXTURES
# ================================
# These fixtures provide standardized mocking for service factories
# and configurations across all test types
@pytest.fixture(scope="session")
def app_config():
"""Provide application configuration for all tests.
This fixture loads the application configuration once per session
and can be used by unit, integration, and e2e tests.
"""
from biz_bud.core.config.loader import load_config
return load_config()
@pytest.fixture(scope="function")
def mock_service_factory(app_config):
"""Create a comprehensive mock ServiceFactory for tests.
This fixture provides a consistent mock ServiceFactory that returns
mock services for all common service types. It combines the best
features from both unit test and integration test implementations.
Returns:
ServiceFactory: A mock factory with all service methods mocked
"""
from unittest.mock import AsyncMock
from biz_bud.services.factory import ServiceFactory
factory = ServiceFactory(app_config)
# Mock all common service methods to return AsyncMocks
factory.get_llm_client = AsyncMock()
factory.get_db_service = AsyncMock()
factory.get_vector_store = AsyncMock()
factory.get_redis_cache = AsyncMock()
factory.get_semantic_extraction = AsyncMock()
factory.get_service = AsyncMock()
factory.cleanup = AsyncMock()
factory.lifespan = AsyncMock()
# Set up common LLM client return values
mock_llm = AsyncMock()
mock_llm.llm_chat = AsyncMock(return_value="Mock LLM response")
mock_llm.llm_json = AsyncMock(return_value={"result": "mock"})
mock_llm.call_model_lc = AsyncMock()
mock_llm.llm = AsyncMock()
mock_llm.llm.ainvoke = AsyncMock(return_value=AsyncMock(content="Mock response"))
mock_llm.llm.bind_tools = lambda tools: mock_llm.llm
factory.get_llm_client.return_value = mock_llm
# Set up database service
mock_db = AsyncMock()
mock_db.execute_query = AsyncMock(return_value=[])
mock_db.insert_data = AsyncMock(return_value={"id": "mock-id"})
factory.get_db_service.return_value = mock_db
# Set up vector store
mock_vector_store = AsyncMock()
mock_vector_store.semantic_search = AsyncMock(return_value=[])
mock_vector_store.add_documents = AsyncMock(return_value=True)
factory.get_vector_store.return_value = mock_vector_store
# Set up Redis cache
mock_redis = AsyncMock()
mock_redis.get = AsyncMock(return_value=None)
mock_redis.set = AsyncMock(return_value=True)
factory.get_redis_cache.return_value = mock_redis
# Generic service getter for other services
async def mock_get_service(service_class: type[T]) -> T:
"""Return appropriate mock based on service class."""
from biz_bud.services.llm import LangchainLLMClient
if service_class == LangchainLLMClient:
return cast(T, mock_llm)
# For other services, return a generic mock
mock_service = AsyncMock()
mock_service.initialize = AsyncMock()
mock_service.cleanup = AsyncMock()
return cast(T, mock_service)
# Use setattr to avoid type checker issues with method replacement
setattr(factory, "get_service", mock_get_service)
# Set up context manager support
factory.__aenter__ = AsyncMock(return_value=factory)
factory.__aexit__ = AsyncMock(return_value=None)
return factory
@pytest.fixture(scope="function")
def mock_service_factory_minimal():
"""Create a minimal mock ServiceFactory for simple unit tests.
This is a lighter-weight version of mock_service_factory for tests
that don't need full service mocking.
"""
from unittest.mock import AsyncMock, Mock
factory = Mock()
factory.get_llm_client = AsyncMock(return_value=AsyncMock())
factory.get_service = AsyncMock(return_value=AsyncMock())
factory.cleanup = AsyncMock()
return factory
# ================================
# API CONFIGURATION FIXTURES
# ================================
# Configuration fixtures for API testing
@pytest.fixture
def api_key_config() -> dict[str, str]:
"""Provide API key configuration for testing.
Returns standardized API key configuration that can be used
across all client tests. Keys are prefixed with 'test_' to
ensure they're not accidentally used in production.
"""
return {
"api_key": "fake-api-key-for-testing-only-12345",
"anthropic_api_key": "fake-anthropic-key-for-testing-only-67890",
"openai_api_key": "fake-openai-key-for-testing-only-abcde",
"google_api_key": "fake-google-key-for-testing-only-fghij",
"perplexity_api_key": "fake-perplexity-key-for-testing-only-klmno",
"tavily_api_key": "fake-tavily-key-for-testing-only-pqrst",
"jina_api_key": "fake-jina-key-for-testing-only-uvwxy",
"firecrawl_api_key": "fake-firecrawl-key-for-testing-only-z1234",
}
@pytest.fixture
def base_url_config() -> dict[str, str]:
"""Provide base URL configuration for testing.
Returns standardized base URLs that can be used across
all client tests. URLs point to localhost or httpbin
for safe testing.
"""
return {
"base_url": "http://localhost:8080",
"api_base_url": "http://localhost:8080/api/v1",
"anthropic_base_url": "https://httpbin.org/anything/anthropic",
"openai_base_url": "https://httpbin.org/anything/openai",
"google_base_url": "https://httpbin.org/anything/google",
"perplexity_base_url": "https://httpbin.org/anything/perplexity",
"tavily_base_url": "https://httpbin.org/anything/tavily",
"jina_base_url": "https://httpbin.org/anything/jina",
"firecrawl_base_url": "https://httpbin.org/anything/firecrawl",
}
@pytest.fixture
def test_config(api_key_config: dict[str, str], base_url_config: dict[str, str]) -> dict[str, Any]:
"""Provide combined test configuration for easy access.
Combines API keys and base URLs into a single configuration
dictionary for convenience in tests.
"""
return {
"api_keys": api_key_config,
"base_urls": base_url_config,
"timeout": 30.0,
"max_retries": 3,
"user_agent": "biz-bud-test-client/1.0.0",
}
# ================================
# HTTPX CLIENT FIXTURES
# ================================
# Modern HTTP client fixtures using HTTPX
@pytest.fixture
async def mock_httpx_client() -> AsyncGenerator[AsyncMock, None]:
"""Provide a mock HTTPX AsyncClient for testing.
This fixture creates a comprehensive mock of httpx.AsyncClient
with all common HTTP methods (GET, POST, PUT, DELETE, PATCH)
and proper async context manager support.
"""
client = AsyncMock()
# Set up context manager methods
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=None)
# Set up HTTP methods with default responses
default_response = AsyncMock()
default_response.status_code = 200
default_response.headers = {"Content-Type": "application/json"}
default_response.text = '{"status": "success"}'
default_response.json = AsyncMock(return_value={"status": "success"})
default_response.content = b'{"status": "success"}'
default_response.raise_for_status = Mock()
default_response.is_success = True
default_response.is_error = False
client.get = AsyncMock(return_value=default_response)
client.post = AsyncMock(return_value=default_response)
client.put = AsyncMock(return_value=default_response)
client.delete = AsyncMock(return_value=default_response)
client.patch = AsyncMock(return_value=default_response)
client.head = AsyncMock(return_value=default_response)
client.options = AsyncMock(return_value=default_response)
client.request = AsyncMock(return_value=default_response)
# Set up stream method for streaming responses
async def mock_stream(*args, **kwargs):
stream_response = AsyncMock()
stream_response.status_code = 200
stream_response.headers = {"Content-Type": "application/json"}
stream_response.aiter_bytes = AsyncMock(return_value=[b'{"chunk": 1}', b'{"chunk": 2}'])
stream_response.aiter_text = AsyncMock(return_value=['{"chunk": 1}', '{"chunk": 2}'])
stream_response.aiter_lines = AsyncMock(return_value=['{"chunk": 1}', '{"chunk": 2}'])
return stream_response
client.stream = mock_stream
client.close = AsyncMock()
client.is_closed = False
yield client
@pytest.fixture
def httpx_client_factory(api_key_config: dict[str, str], base_url_config: dict[str, str]):
"""Factory for creating configured HTTPX clients.
This factory can create mock or real HTTPX clients with
proper configuration for different services.
"""
def _create_client(
service: str = "default",
mock: bool = True,
**kwargs: Any
) -> Any:
"""Create an HTTPX client for the specified service.
Args:
service: Service name (e.g., 'anthropic', 'openai')
mock: Whether to return a mock client or real client
**kwargs: Additional client configuration
Returns:
Configured HTTPX client (mock or real)
"""
if mock:
from unittest.mock import AsyncMock
return AsyncMock()
# For real clients (integration tests)
import httpx
base_url = base_url_config.get(f"{service}_base_url", base_url_config["base_url"])
api_key = api_key_config.get(f"{service}_api_key", api_key_config["api_key"])
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
"User-Agent": "biz-bud-test-client/1.0.0",
}
headers.update(kwargs.pop("headers", {}))
return httpx.AsyncClient(
base_url=base_url,
headers=headers,
timeout=30.0,
**kwargs
)
return _create_client
# ================================
# STANDARDIZED MOCK RESPONSES
# ================================
# Consistent mock HTTP responses for testing
@pytest.fixture
def mock_response_factory():
"""Create mock HTTP responses with various status codes and scenarios."""
import json
from unittest.mock import AsyncMock, Mock
def _create_response(
status: int = 200,
json_data: dict[str, Any] | None = None,
text: str = "",
headers: dict[str, str] | None = None,
raise_on_status: bool = False,
content_type: str = "application/json",
encoding: str = "utf-8",
simulate_network_delay: bool = False,
stream_chunks: list[str] | None = None,
) -> AsyncMock:
"""Create a mock HTTP response with comprehensive scenarios.
Args:
status: HTTP status code
json_data: JSON response data
text: Text response data
headers: Response headers
raise_on_status: Whether to raise on status check
content_type: Response content type
encoding: Response encoding
simulate_network_delay: Whether to simulate network delay
stream_chunks: List of chunks for streaming responses
Returns:
Mock HTTP response object with comprehensive scenarios
"""
response = AsyncMock()
response.status = status
response.status_code = status # Some libraries use status_code
response.encoding = encoding
# Set up headers with defaults
default_headers = {"Content-Type": content_type}
if headers:
default_headers.update(headers)
response.headers = default_headers
# Handle different content types
if content_type == "application/json":
response_text = json.dumps(json_data) if json_data else text or "{}"
response_json = json_data or {}
else:
response_text = text or ""
response_json = json_data or {}
# Set up response methods
if simulate_network_delay:
import asyncio
async def delayed_text():
await asyncio.sleep(0.1) # Simulate network delay
return response_text
async def delayed_json():
await asyncio.sleep(0.1)
return response_json
response.text = delayed_text
response.json = delayed_json
else:
response.text = AsyncMock(return_value=response_text)
response.json = AsyncMock(return_value=response_json)
response.read = AsyncMock(return_value=response_text.encode(encoding))
response.content = response_text.encode(encoding)
# Set up status flags
response.is_success = 200 <= status < 300
response.is_error = status >= 400
response.is_redirect = 300 <= status < 400
response.is_client_error = 400 <= status < 500
response.is_server_error = status >= 500
# Set up streaming support
if stream_chunks:
async def aiter_bytes():
for chunk in stream_chunks or []:
yield chunk.encode(encoding)
async def aiter_text():
for chunk in stream_chunks or []:
yield chunk
async def aiter_lines():
for chunk in stream_chunks or []:
yield chunk
response.aiter_bytes = aiter_bytes
response.aiter_text = aiter_text
response.aiter_lines = aiter_lines
# Set up error handling
if raise_on_status and status >= 400:
if status == 404:
response.raise_for_status = Mock(side_effect=FileNotFoundError(f"HTTP {status}: Not Found"))
elif status == 429:
response.raise_for_status = Mock(side_effect=ConnectionError(f"HTTP {status}: Too Many Requests"))
elif status == 500:
response.raise_for_status = Mock(side_effect=RuntimeError(f"HTTP {status}: Internal Server Error"))
elif status == 503:
response.raise_for_status = Mock(side_effect=ConnectionError(f"HTTP {status}: Service Unavailable"))
else:
response.raise_for_status = Mock(side_effect=RuntimeError(f"HTTP {status}"))
else:
response.raise_for_status = Mock()
# Add context manager support
response.__aenter__ = AsyncMock(return_value=response)
response.__aexit__ = AsyncMock(return_value=None)
# Add additional properties commonly used in tests
response.url = "https://example.com/api/test"
response.request = Mock()
response.history = []
response.cookies = {}
response.elapsed = Mock()
response.elapsed.total_seconds = Mock(return_value=0.5)
return response
return _create_response
@pytest.fixture
def mock_response_200(mock_response_factory):
"""Create a mock successful (200) HTTP response."""
return mock_response_factory(
status=200,
json_data={"status": "success", "data": {"message": "OK"}},
text='{"status": "success", "data": {"message": "OK"}}',
)
@pytest.fixture
def mock_response_404(mock_response_factory):
"""Create a mock not found (404) HTTP response."""
return mock_response_factory(
status=404,
json_data={"error": "Not found"},
text="Not found",
raise_on_status=True,
)
@pytest.fixture
def mock_response_500(mock_response_factory):
"""Create a mock server error (500) HTTP response."""
return mock_response_factory(
status=500,
json_data={"error": "Internal server error"},
text="Internal server error",
raise_on_status=True,
)
@pytest.fixture
def mock_response_timeout(mock_response_factory):
"""Create a mock timeout response for testing timeout scenarios."""
return mock_response_factory(
status=408,
json_data={"error": "Request timeout"},
text="Request timeout",
simulate_network_delay=True,
raise_on_status=True,
)
@pytest.fixture
def mock_response_rate_limit(mock_response_factory):
"""Create a mock rate limit (429) HTTP response."""
return mock_response_factory(
status=429,
json_data={"error": "Too many requests"},
text="Too many requests",
headers={"Retry-After": "60"},
raise_on_status=True,
)
@pytest.fixture
def mock_response_streaming(mock_response_factory):
"""Create a mock streaming response."""
return mock_response_factory(
status=200,
content_type="text/plain",
stream_chunks=[
"chunk 1\n",
"chunk 2\n",
"chunk 3\n",
],
)
@pytest.fixture
def mock_response_large_json(mock_response_factory):
"""Create a mock response with large JSON payload."""
large_data = {
"items": [{"id": i, "name": f"item_{i}", "data": f"content_{i}"} for i in range(100)],
"metadata": {"total": 100, "page": 1, "per_page": 100},
}
return mock_response_factory(
status=200,
json_data=large_data,
)
@pytest.fixture
def mock_response_html(mock_response_factory):
"""Create a mock HTML response."""
html_content = """
<html>
<head><title>Test Page</title></head>
<body>
<h1>Test Content</h1>
<p>This is test HTML content for scraping.</p>
</body>
</html>
"""
return mock_response_factory(
status=200,
text=html_content,
content_type="text/html",
)
@pytest.fixture
def mock_response_xml(mock_response_factory):
"""Create a mock XML response."""
xml_content = """<?xml version="1.0" encoding="UTF-8"?>
<root>
<item id="1">
<name>Test Item</name>
<value>123</value>
</item>
</root>
"""
return mock_response_factory(
status=200,
text=xml_content,
content_type="application/xml",
)