This commit is contained in:
2025-11-22 11:40:31 +00:00
parent 9474f81ce9
commit 64d19d07f4
7 changed files with 293 additions and 459 deletions

View File

@@ -1,184 +1,73 @@
This is an exceptionally high-quality prototype. You have moved past "scripting" into proper **software engineering for automation**. The separation of concerns, type safety, and architectural patterns used here are usually only seen in mature automation frameworks, not early scaffolds.
This is a high-quality prototype. You have moved beyond simple scripting and built a structured **Automation-as-a-Service** architecture. The use of FastAPI to wrap Playwright, combined with a Command Pattern for actions and a sophisticated browser pool, indicates a strong engineering mindset.
Here is a detailed review across your requested dimensions:
Here is my review of the architecture, scalability, consistency, organization, and comprehensiveness.
### 1. Architecture & Organization
**Verdict:** Strong, Modular, and Professional.
---
* **The "Strings" Registry (`src/guide/app/strings/`):**
This is the standout feature of your architecture. Instead of scattering CSS selectors and magic strings throughout your logic, you have a centralized, type-safe registry (`AppStrings`).
* *Why its good:* If a `data-test` attribute changes in the frontend, you update it in one place. The nested class structure (`app_strings.intake.selectors...`) provides IDE autocompletion, which drastically reduces developer error.
* **The Action Pattern (`src/guide/app/actions/`):**
Using a Command Pattern (via the `DemoAction` protocol) combined with a generic Registry is excellent. It decouples the *execution* of a demo step from the *API trigger*.
* *Recommendation:* Currently, `ActionContext` is passed in. Ensure that if you need to chain actions (e.g., Login -> Navigate -> Submit), the context allows state to persist or pass between actions.
* **Dependency Injection:**
Leveraging FastAPIs `Depends` system to inject the `ActionRegistry`, `BrowserClient`, and `AppSettings` is the correct way to handle state in a web application. It makes unit testing significantly easier because you can mock these dependencies.
### 1. Architecture & Design Patterns
### 2. Scalability (The Browser Pool)
**Verdict:** Designed for Performance.
**Strengths:**
* **Command Pattern (`DemoAction`):** The decision to treat automation steps as discrete, self-contained "Actions" (`src/guide/app/actions/base.py`) is excellent. It decouples the *execution logic* from the *trigger mechanism* (API).
* **Composite Actions:** The `CompositeAction` class allows you to chain atomic actions (Login -> Intake -> Supplier) while maintaining shared state (`ActionContext`). This is crucial for complex user flows without monolithic scripts.
* **Dependency Injection:** Implementing your own DI container in `ActionRegistry` (`src/guide/app/actions/registry.py`) allows you to inject dependencies like `PersonaStore` or `LoginUrl` dynamically. This makes testing individual actions much easier.
* **Separation of Concerns:**
* **Models:** Pure data (Pydantic).
* **Browser:** Low-level Playwright management.
* **Actions:** Business logic.
* **Raindrop:** External API integration.
* **Connection Reuse (`src/guide/app/browser/pool.py`):**
Most automation prototypes launch a new browser per request, which is slow and resource-heavy. Your `BrowserPool` implementation maintains persistent connections (CDP or Headless) and simply allocates pages (tabs) or contexts. This is critical for low-latency responses.
* **CDP vs. Headless:**
Supporting both Remote CDP (for debugging or connecting to existing sessions) and Headless (for server-side execution) via configuration (`HostKind`) is a mature design choice.
* **Risk Area - Page Reclaiming:**
In `_pick_raindrop_page`, you look for an existing page with "raindrop.io".
* *Critique:* In a concurrent environment (multiple requests hitting the same browser host), "picking the first page" is dangerous. You might grab a page currently being used by another request.
* *Fix:* You should likely use **Browser Contexts** strictly. Every API request should spin up a `browser.new_context()`, do its work, and then `context.close()`. This isolates cookies/storage per request and prevents cross-contamination.
**Critique / Risks:**
* **Hybrid Browser/Context Management (`pool.py`):**
* In `BrowserInstance.allocate_page`, you distinguish between "CDP" (reuse existing pages) and "Headless" (new context).
* **Risk:** Reusing a `Page` object (CDP mode) across different "Personas" or requests is dangerous. Even with your locking mechanism, data leakage (cookies, local storage, session tokens) is highly likely unless you are strictly wiping the browser context between `acquire` calls.
* **Recommendation:** In Playwright, the unit of isolation is the **`BrowserContext`**, not the `Page`. Even over CDP, you should ideally create a new *Context* for every request to ensure full isolation, rather than recycling Pages.
### 3. Consistency & Code Quality
**Verdict:** High Discipline.
### 2. Scalability
* **Type Safety:** You are using `typing.Protocol`, `TypeVar`, and Pydantic models (`src/guide/app/models/`) extensively. This makes the codebase robust and self-documenting.
* **Config Management:** The layered configuration (Env Vars -> YAML -> JSON overrides) in `core/config.py` is production-ready. It allows you to deploy this container anywhere without code changes.
* **Error Handling:** You have a specific exception hierarchy (`GuideError`) and a centralized handler. This is much better than generic `try/except Exception` blocks.
**Strengths:**
* **Connection Pooling:** The `BrowserPool` avoids the heavy startup cost of launching a browser for every request.
* **Async First:** The entire stack is `async`/`await`, allowing the Python web server to handle concurrent incoming API requests efficiently while waiting on browser I/O.
### 4. Comprehensiveness
**Verdict:** Good Scaffold, missing Logic "Glue".
**Bottlenecks:**
* **Stateful Service:** Your service is stateful. The `BrowserPool` and `PersonaStore` live in memory.
* *Issue:* If you deploy this to Kubernetes with 2+ replicas, they will not share browser connections. If a user flow requires step 1 on Node A and step 2 on Node B, it will fail because the `shared_state` and browser session are local to Node A.
* *Fix:* Ensure sticky sessions (if using a load balancer) or design the API to be stateless (passing full context/cookies back and forth to the client), though the latter is hard with browser automation.
* **Resource Contention:** Browsers are memory hogs. The `MAX_CONTEXTS_PER_BROWSER = 10` limit is a good guardrail, but 10 concurrent Chromium contexts can easily consume 2-4GB of RAM.
* **Auth:** You have the "Happy Path" scaffolding (`DummyMfaCodeProvider`). The logic to scrape the "Current User" from the DOM to verify login status (`detect_current_persona`) is a smart, resilient touch.
* **External API (Raindrop):** You have a clean separation between UI actions (Playwright) and API actions (GraphQL).
* *Observation:* The `GraphQLClient` currently doesn't seem to share auth state with the Browser session. In many apps, you need the Browser's cookies to authorize the direct GraphQL API calls. You might need a bridge to extract cookies from Playwright and inject them into `httpx` headers.
### 3. Organization & File Structure
### 5. Specific Recommendations for Improvement
**Strengths:**
* **`src/guide/app/strings/registry.py`:** This is a standout feature. Centralizing selectors, labels, and texts into a typed registry acts as a "Page Object Model" layer. It prevents "magic strings" scattered across logic files and makes refactoring UI changes trivial.
* **`raindrop/generated` & `.graphql` files:** Storing GraphQL queries in `.graphql` files (`src/guide/app/raindrop/queries/`) is much cleaner than embedding strings in Python code.
#### A. Refine Browser Concurrency
In `src/guide/app/browser/pool.py`, the current logic for CDP hosts tries to reuse pages:
```python
# Current
pages = list(self.browser.contexts[0].pages)
```
**Recommendation:** Even for CDP, try to create ephemeral contexts if the browser supports it. If you must reuse a specific page (e.g., "The user is watching this specific tab"), ensure your `BrowserInstance` has a locking mechanism so two API requests don't drive the same page simultaneously.
**Critique:**
* **GraphQL Parsing:** In `queries.py`, you are using Regex (`re.findall`) to parse GraphQL files.
* *Risk:* Regex parsing of code is fragile. If a query contains comments or complex nesting, this might break.
* *Recommendation:* Use a proper GraphQL code generator (like `ariadne-codegen` or `gql`) to generate Pydantic models and query strings at build time.
* **Action Auto-discovery:** The logic in `_discover_action_modules` works by scanning the file system. While convenient, this can cause issues if the directory structure changes or if run in environments (like PyInstaller/Docker optimized builds) where file walking behaves differently.
#### B. Action Chaining / Orchestration
Currently, the API executes **one** action per request (`POST /actions`).
**Recommendation:** As the demo gets complex, you will want "Playbooks". You might need a `CompositeAction` that takes a list of Action IDs and executes them in sequence.
```python
# Future Concept
class OnboardingFlowAction(DemoAction):
async def run(self, page, context):
await self.registry.get("auth.login").run(page, context)
await self.registry.get("intake.basic").run(page, context)
```
### 4. Consistency & Code Quality
#### C. Resilience Utilities
You have `utils/retry.py`, which is good.
**Recommendation:** Add a specific `DOMRetry` or `Wait` utility. Playwright has auto-waiting, but often for demos, you need "visual stability" checks (waiting for animations to stop) which are distinct from "element is present" checks.
**Strengths:**
* **Type Hinting:** Extensive use of `typing` and `Pydantic` ensures data passing is rigid and predictable.
* **Error Handling:** The custom exception hierarchy (`GuideError`, `BrowserConnectionError`, etc.) in `errors/exceptions.py` is clean and allows for specific HTTP error responses.
* **Diagnostics:** The `DebugInfo` capture (screenshot/HTML/logs) in `diagnostics.py` is essential for headless automation.
#### D. Logging / Observability
**Recommendation:** Since this runs headless, when it fails, you have zero visibility.
* Add a mechanism in your `ActionEnvelope` to capture a **screenshot** or **HTML dump** if `status == "error"`.
* Return this (base64 encoded) or a link to it in the API response for easier debugging.
**Consistency Nits:**
* **Configuration Loading:** The config loader supports YAML, Env Vars, and JSON overrides. This is comprehensive but complex. Ensure you have strict precedence rules (which you seem to have) to avoid debugging nightmares where a setting is coming from an unexpected JSON env var.
### Summary
This is **not** a throwaway script; it is a microservice designed for automation. The foundations (Config, Registry, Pydantic, DI) are solid. Focus your next steps on **concurrency safety** (browser contexts) and **observability** (screenshots on failure).
### 5. Comprehensiveness
Yes, there are a few areas of **redundancy** and **duplicity** in the codebase. Some of it appears to be intentional boilerplate (to support strict typing), but some of it creates unnecessary maintenance overhead where you have to update two files to change one thing.
**Missing Elements (for a production-ready system):**
1. **Testing:** I see no unit or integration tests in the file list. Since you have Dependency Injection, you should be able to mock the `Page` object and test your Actions.
2. **Observability:** You have `logging.py`, but for an automation system, **Tracing** (e.g., OpenTelemetry) is vital. You want to see a trace span for "API Request" -> "Composite Action" -> "Child Action" -> "Playwright Click".
3. **Security (MFA):** `DummyMfaCodeProvider` is fine for a prototype. However, if this is for a demo, ensure the "Prod" implementation handles MFA secrets securely (e.g., via AWS Secrets Manager or Vault), not just environment variables.
Here are the specific areas of redundancy:
### Summary & Recommendations
### 1. The "Strings Registry" Double-Wrapping (High Boilerplate)
You have defined your selectors/labels in constant classes, and then you **re-define** pointers to them in `registry.py`.
**Verdict:** The scaffold is **excellent**. It is over-engineered for a simple script but perfectly engineered for a robust, long-term automation platform.
* **Source:** `src/guide/app/strings/selectors/intake.py`
* **Redundancy:** `src/guide/app/strings/registry.py`
**The Issue:**
In `registry.py`, you have this pattern:
```python
class IntakeStrings:
class _Selectors:
# You are manually mapping the variable AGAIN
description_field: ClassVar[str] = IntakeSelectors.DESCRIPTION_FIELD
```
If you add a new selector to `IntakeSelectors`, you must also open `registry.py` and add the pointer to expose it via `app_strings`.
**Fix:**
You can eliminate the `registry.py` mapping classes by simply importing the original classes and aliasing them in the root `__init__.py` or `registry.py`.
```python
# src/guide/app/strings/registry.py
from .selectors.intake import IntakeSelectors
from .labels.intake import IntakeLabels
class IntakeNamespace:
selectors = IntakeSelectors
labels = IntakeLabels
class AppStrings:
intake = IntakeNamespace
```
*Result: You get the same autocomplete usage (`app_strings.intake.selectors.DESCRIPTION_FIELD`), but you only define the variable once.*
### 2. Configuration vs. Domain Models (Data Mirroring)
You have two Pydantic models that represent the exact same data structure.
* **File 1:** `src/guide/app/core/config.py` -> `class PersonaConfig`
* **File 2:** `src/guide/app/models/personas/models.py` -> `class DemoPersona`
**The Issue:**
Both classes have `id`, `role`, `email`, `login_method`, `browser_host_id`.
In `src/guide/app/models/personas/store.py`, you explicitly map one to the other:
```python
# Redundant mapping logic
DemoPersona(
id=p.id,
role=PersonaRole(p.role), # Enum conversion is the only real work here
email=p.email,
...
)
```
**Fix:**
Since `DemoPersona` is the domain object, it can inherit from `PersonaConfig` or you can use `PersonaConfig` directly until you actually need domain-specific logic that doesn't belong in the config.
```python
# src/guide/app/models/personas/models.py
from guide.app.core.config import PersonaConfig
class DemoPersona(PersonaConfig):
# Add any runtime-only fields here
pass
```
### 3. GraphQL Definitions (Manual Sync)
This is a common issue in Python GraphQL implementations.
* **Queries:** `src/guide/app/strings/graphql/*.py` (The strings)
* **Types:** `src/guide/app/raindrop/types.py` (The Pydantic models)
**The Issue:**
If you add a field to the `GET_INTAKE_REQUEST` string, you **must** manually update `IntakeRequestData` in `types.py`. This is "Process Redundancy" and is error-prone.
**Fix:**
In a prototype, this is fine. In production, use a tool like **Ariadne Code Gen** or **Turms**. These tools read your `.graphql` query files and *generate* the Pydantic models automatically. This removes `types.py` from manual maintenance entirely.
### 4. Action Registration Pathways (Dual Logic)
You have two ways to register actions, which creates architectural ambiguity.
* **Pathway A (Auto):** `@register_action` decorator in `base.py`.
* **Pathway B (Manual):** Instantiating actions in `registry.py` inside `default_registry`.
**The Issue:**
In `default_registry()`, you have logic to load `@register_action` classes, but you *also* manually instantiate `LoginAsPersonaAction` because it requires dependencies (`PersonaStore`).
This means you have two "sources of truth" for what actions exist in the system.
**Fix:**
Standardize on **Dependency Injection via Factory**.
Remove the distinction. Make *all* actions registered via the class map. Modify the `ActionRegistry.get` method to inspect the class constructor; if the class needs `PersonaStore`, inject it automatically from the context (similar to how `pytest` fixtures work, or simpler manual DI).
Or, simpler: Just register the *factories* for everything, even the simple ones.
### 5. Browser Host Definitions
* **File:** `src/guide/app/models/domain/models.py` defines `BrowserHostsResponse`
* **File:** `src/guide/app/core/config.py` defines `BrowserHostConfig`
`BrowserHostsResponse` essentially wraps a dict of `BrowserHostConfig`.
```python
# models.py
class BrowserHostsResponse(BaseModel):
browser_hosts: dict[str, BrowserHostConfig]
```
This isn't strictly "bad" redundancy (it's a DTO wrapping a Config object), but strictly speaking, `BrowserHostConfig` is being used as both an internal configuration schema and a public API schema. If you ever want to hide a field (like an internal password in the host config) from the API response, you will accidentally leak it.
**Recommendation:** Create a specific `BrowserHostDTO` for the API response that *only* includes the fields the frontend needs (id, kind), and map to it. Currently, you are reusing the Config object, which is efficient but couples your backend config structure to your frontend API contract.
### Summary
The only "Must Fix" to reduce typing effort is **#1 (Strings Registry)**. The rest are architectural trade-offs typical in robust systems, which you can likely live with for now.
**Top 3 Recommendations:**
1. **Fix Browser Isolation:** Review `BrowserInstance.allocate_page`. Move towards creating a fresh `BrowserContext` for every single Action Context/Session, even when using CDP. Do not reuse Pages to avoid state pollution between demo users.
2. **Replace Regex GraphQL:** Switch from regex parsing in `queries.py` to a standard library or code generator.
3. **Add Structured Logging:** Implement JSON logging with `correlation_id` (which you already generate in `ActionContext`) included in every log line. This will be a lifesaver when debugging failed parallel demos.

View File

@@ -10,13 +10,14 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"fastapi>=0.121.3",
"graphql-core>=3.2.0",
"httpx>=0.27.0",
"playwright>=1.56.0",
"pydantic>=2.12.4",
"pydantic-settings>=2.4.0",
"python-dotenv>=1.2.1",
"pyyaml>=6.0.2",
"uvicorn>=0.30.6",
"httpx>=0.27.0",
]
[tool.hatch.build.targets.wheel]

View File

@@ -7,6 +7,7 @@ from guide.app.auth import DummyMfaCodeProvider, ensure_persona
from guide.app.browser.client import BrowserClient
from guide.app import errors
from guide.app.core.config import AppSettings
from guide.app.core.logging import LoggingManager
from guide.app.models.domain import (
ActionContext,
ActionEnvelope,
@@ -84,6 +85,12 @@ async def execute_action(
params=payload.params,
)
# Set request context variables for logging
_ = LoggingManager.context.correlation_id.set(context.correlation_id)
_ = LoggingManager.context.action_id.set(action_id)
_ = LoggingManager.context.persona_id.set(persona.id if persona else None)
_ = LoggingManager.context.host_id.set(target_host_id)
mfa_provider = DummyMfaCodeProvider()
try:

View File

@@ -9,16 +9,14 @@ from guide.app.browser.pool import BrowserPool
class BrowserClient:
"""Provides page access via a persistent browser pool with context isolation.
This client uses the BrowserPool to efficiently manage connections and provides
concurrent access safety through per-page context locks. Each request acquires
a context lock before using the page and releases it when done, preventing
concurrent state corruption.
This client uses the BrowserPool to obtain fresh browser contexts for each
request. Each context is isolated and closed after use to prevent state
pollution between actions.
Context lifecycle:
- Creation time: Tracked when context is first acquired
- Last access time: Updated on acquire/release
- Idle timeout: 5 minutes (enforced by background cleanup)
- Max limit: 10 contexts per browser (enforced on acquire)
- Creation: Fresh context allocated from pool on request
- Usage: Exclusive use during action execution
- Cleanup: Context closed immediately after use
"""
def __init__(self, pool: BrowserPool) -> None:
@@ -31,30 +29,30 @@ class BrowserClient:
@contextlib.asynccontextmanager
async def open_page(self, host_id: str | None = None) -> AsyncIterator[Page]:
"""Get a page from the pool with context isolation and concurrent access safety.
"""Get a fresh page from the pool with guaranteed isolation.
The page is obtained from the pool's persistent browser connection. The context
manager acquires an exclusive lock for the page duration, preventing concurrent
access. Last access time is updated to track idle timeout.
Allocates a new context and page for this request. The context is closed
after the with block completes, ensuring complete isolation from other
requests.
Args:
host_id: The host identifier, or None for the default host
Yields:
A Playwright Page instance
A Playwright Page instance with a fresh, isolated context
The page context lock is held for the duration of the with block, ensuring
only one request uses the page at a time.
Raises:
ConfigError: If the host_id is invalid or not configured
BrowserConnectionError: If the browser connection fails
"""
page = await self.pool.get_page(host_id)
_, lock = await self.pool.acquire_page_context(page, host_id)
async with lock:
context, page = await self.pool.allocate_context_and_page(host_id)
try:
yield page
finally:
# Cleanup occurs via background task checking idle timeout
pass
# Explicitly close the context to ensure complete cleanup
# and prevent state leakage to subsequent requests
with contextlib.suppress(Exception):
await context.close()
__all__ = ["BrowserClient"]

View File

@@ -5,16 +5,13 @@ expensive overhead of launching/connecting to browsers on each request.
Architecture:
- BrowserPool: Manages the lifecycle of browser instances by host
- Per CDP host: Single persistent connection, multiple pages
- Per Headless host: Single persistent browser, multiple contexts
- PageContextPool: Manages context lifecycle with TTL (5-minute idle) and concurrency limits (10 max)
- Per host: Single persistent browser connection
- Per action: Fresh BrowserContext for complete isolation
- No page/context pooling: Each action gets a clean slate
"""
import asyncio
import contextlib
import logging
from datetime import datetime, timedelta, timezone
from typing import NamedTuple
from playwright.async_api import (
Browser,
@@ -30,122 +27,12 @@ from guide.app import errors
_logger = logging.getLogger(__name__)
# Constants for context lifecycle management
CONTEXT_IDLE_TIMEOUT = timedelta(minutes=5)
MAX_CONTEXTS_PER_BROWSER = 10
class PageContextMetadata(NamedTuple):
"""Metadata for a page context instance."""
page_id: str
context_index: int
creation_time: datetime
last_access_time: datetime
access_count: int
lock: asyncio.Lock
def _now_utc() -> datetime:
"""Get current UTC time."""
return datetime.now(timezone.utc)
class PageContextPool:
"""Manages the lifecycle of browser contexts for a single page.
Tracks context metadata (creation time, last access, usage count) and enforces:
- Concurrent access safety via asyncio.Lock per context
- Context TTL (5-minute idle timeout)
- Max context limit (10 per browser)
"""
def __init__(self) -> None:
"""Initialize the page context pool."""
self._contexts: dict[str, PageContextMetadata] = {}
self._counter: int = 0
self._pool_lock: asyncio.Lock = asyncio.Lock()
async def acquire(self, page: Page) -> tuple[str, asyncio.Lock]:
"""Acquire or create a context for a page.
Returns:
Tuple of (context_id, lock) for the page
"""
async with self._pool_lock:
page_id = id(page).__str__()
now = _now_utc()
if page_id in self._contexts:
metadata = self._contexts[page_id]
# Update access metadata
self._contexts[page_id] = PageContextMetadata(
page_id=metadata.page_id,
context_index=metadata.context_index,
creation_time=metadata.creation_time,
last_access_time=now,
access_count=metadata.access_count + 1,
lock=metadata.lock,
)
return (page_id, self._contexts[page_id].lock)
# Create new context
if len(self._contexts) >= MAX_CONTEXTS_PER_BROWSER:
# Clean up expired contexts to make room
_ = self._cleanup_expired_unlocked()
if len(self._contexts) >= MAX_CONTEXTS_PER_BROWSER:
msg = f"Max contexts ({MAX_CONTEXTS_PER_BROWSER}) exceeded for page"
raise errors.GuideError(msg)
self._counter += 1
metadata = PageContextMetadata(
page_id=page_id,
context_index=self._counter,
creation_time=now,
last_access_time=now,
access_count=1,
lock=asyncio.Lock(),
)
self._contexts[page_id] = metadata
return (page_id, metadata.lock)
async def cleanup_expired(self) -> int:
"""Remove expired contexts (older than idle timeout).
Returns:
Number of contexts cleaned up
"""
async with self._pool_lock:
return self._cleanup_expired_unlocked()
def _cleanup_expired_unlocked(self) -> int:
"""Remove expired contexts (must be called with lock held)."""
now = _now_utc()
expired = [
page_id
for page_id, metadata in self._contexts.items()
if now - metadata.last_access_time > CONTEXT_IDLE_TIMEOUT
]
for page_id in expired:
del self._contexts[page_id]
if expired:
_logger.debug(f"Cleaned up {len(expired)} expired contexts")
return len(expired)
def get_stats(self) -> dict[str, int]:
"""Get pool statistics for monitoring."""
return {"contexts": len(self._contexts), "counter": self._counter}
class BrowserInstance:
"""Manages a single browser connection and its lifecycle.
Tracks page context metadata (creation time, last access, usage count) and enforces
concurrent access safety via asyncio.Lock per page, context TTL (5-minute idle),
and max context limits (10 per browser).
Creates fresh contexts for each request to ensure complete isolation
between actions. No context pooling or reuse.
"""
def __init__(
@@ -161,79 +48,30 @@ class BrowserInstance:
self.host_id: str = host_id
self.host_config: BrowserHostConfig = host_config
self.browser: Browser = browser
self._contexts: list[BrowserContext] = []
self._page_context_pool: PageContextPool = PageContextPool()
async def allocate_page(self) -> Page:
"""Allocate a new page from the browser.
async def allocate_context_and_page(self) -> tuple[BrowserContext, Page]:
"""Allocate a fresh context and page for this request.
For CDP hosts, uses the existing page pool.
For headless hosts, creates a new context and page.
"""
if self.host_config.kind == HostKind.CDP:
# CDP: reuse existing pages from Raindrop browser
return self._pick_raindrop_page()
# Headless: create a new context and page
context = await self.browser.new_context()
self._contexts.append(context)
return await context.new_page()
async def acquire_page_context(self, page: Page) -> tuple[str, asyncio.Lock]:
"""Acquire a context lock for a page (concurrent access safety).
Args:
page: The Playwright page instance
Both CDP and headless modes create new contexts for complete isolation.
Returns:
Tuple of (context_id, lock) for the page
"""
return await self._page_context_pool.acquire(page)
async def cleanup_expired_contexts(self) -> int:
"""Clean up expired (idle timeout) page contexts.
Returns:
Number of contexts cleaned up
"""
return await self._page_context_pool.cleanup_expired()
def _pick_raindrop_page(self) -> Page:
"""Find and return an existing Raindrop page from the browser.
Tuple of (context, page) - caller must close context when done
Raises:
BrowserConnectionError: If no pages are available in the browser
BrowserConnectionError: If context/page creation fails
"""
raindrop_url_snippet = "raindrop.io" # Common URL pattern
pages: list[Page] = []
for context in self.browser.contexts:
pages.extend(context.pages)
pages = pages or (
list(self.browser.contexts[0].pages) if self.browser.contexts else []
)
# Try to find a Raindrop page, fall back to any page
if not pages:
try:
context = await self.browser.new_context()
page = await context.new_page()
return context, page
except Exception as exc:
raise errors.BrowserConnectionError(
f"No pages available in {self.host_id} browser"
)
# Try to find a page with Raindrop URL
raindrop_page = next(
(
page
for page in reversed(pages)
if raindrop_url_snippet in (page.url or "")
),
None,
)
return raindrop_page or pages[-1]
f"Failed to allocate page for host {self.host_id}",
details={"host_id": self.host_id, "host_kind": self.host_config.kind},
) from exc
async def close(self) -> None:
"""Close all contexts and the browser connection."""
for context in self._contexts:
with contextlib.suppress(Exception):
await context.close()
self._contexts.clear()
"""Close the browser connection."""
with contextlib.suppress(Exception):
await self.browser.close()
@@ -241,9 +79,9 @@ class BrowserInstance:
class BrowserPool:
"""Manages browser instances across multiple hosts.
Maintains one persistent browser connection per host, allocating pages
on demand and managing the lifecycle of connections. Also manages page
context lifecycle with TTL enforcement and concurrent access safety.
Maintains one persistent browser connection per host. Browser connections are
reused, but contexts are created fresh for each request to ensure complete
isolation between actions.
"""
def __init__(self, settings: AppSettings) -> None:
@@ -256,22 +94,16 @@ class BrowserPool:
self._instances: dict[str, BrowserInstance] = {}
self._playwright: Playwright | None = None
self._closed: bool = False
self._cleanup_task: asyncio.Task[None] | None = None
async def initialize(self) -> None:
"""Initialize the browser pool.
Starts the Playwright instance and the background cleanup task.
Browser connections are created lazily on first request to avoid startup delays.
Starts the Playwright instance. Browser connections are created lazily
on first request to avoid startup delays.
"""
if self._playwright is not None:
return
self._playwright = await async_playwright().start()
# Start background cleanup task for expired contexts
if self._cleanup_task is None or self._cleanup_task.done():
self._cleanup_task = asyncio.create_task(
self._cleanup_expired_contexts_loop()
)
_logger.info("Browser pool initialized")
async def close(self) -> None:
@@ -280,12 +112,6 @@ class BrowserPool:
return
self._closed = True
# Cancel cleanup task
if self._cleanup_task and not self._cleanup_task.done():
_ = self._cleanup_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._cleanup_task
for instance in self._instances.values():
with contextlib.suppress(Exception):
await instance.close()
@@ -297,8 +123,10 @@ class BrowserPool:
self._playwright = None
_logger.info("Browser pool closed")
async def get_page(self, host_id: str | None = None) -> Page:
"""Get a page from the pool for the specified host.
async def allocate_context_and_page(
self, host_id: str | None = None
) -> tuple[BrowserContext, Page]:
"""Allocate a fresh context and page for the specified host.
Lazily creates browser connections on first request per host.
@@ -306,7 +134,7 @@ class BrowserPool:
host_id: The host identifier, or None for the default host
Returns:
A Playwright Page instance
Tuple of (context, page) - caller must close context when done
Raises:
ConfigError: If the host_id is invalid or not configured
@@ -330,42 +158,7 @@ class BrowserPool:
instance = await self._create_instance(resolved_id, host_config)
self._instances[resolved_id] = instance
return await self._instances[resolved_id].allocate_page()
async def acquire_page_context(
self, page: Page, host_id: str | None = None
) -> tuple[str, asyncio.Lock]:
"""Acquire a context lock for a page (concurrent access safety).
Args:
page: The Playwright page instance
host_id: The host identifier, or None for the default host
Returns:
Tuple of (context_id, lock) for the page
"""
resolved_id = host_id or self.settings.default_browser_host_id
if resolved_id not in self._instances:
raise errors.ConfigError(f"Unknown browser host '{resolved_id}'")
return await self._instances[resolved_id].acquire_page_context(page)
async def _cleanup_expired_contexts_loop(self) -> None:
"""Background task that periodically cleans up expired page contexts.
Runs every 30 seconds to enforce context TTL (5-minute idle timeout).
"""
try:
while not self._closed:
await asyncio.sleep(30)
for instance in self._instances.values():
with contextlib.suppress(Exception):
cleaned = await instance.cleanup_expired_contexts()
if cleaned:
_logger.debug(
f"Cleaned up {cleaned} expired contexts from host '{instance.host_id}'"
)
except asyncio.CancelledError:
_logger.debug("Context cleanup task cancelled")
return await self._instances[resolved_id].allocate_context_and_page()
async def _create_instance(
self, host_id: str, host_config: BrowserHostConfig
@@ -433,4 +226,4 @@ class BrowserPool:
raise errors.ConfigError(f"Unsupported browser type '{browser}'")
__all__ = ["BrowserPool", "BrowserInstance", "PageContextPool", "PageContextMetadata"]
__all__ = ["BrowserPool", "BrowserInstance"]

View File

@@ -1,14 +1,87 @@
import contextvars
import json
import logging
import sys
from typing import override
def configure_logging(
level: int | str = logging.INFO, correlation_id: str | None = None
) -> None:
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(message)s",
class _ContextVars:
"""Container for request-scoped logging context variables."""
correlation_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"correlation_id", default=None
)
if correlation_id:
_ = logging.LoggerAdapter(
logging.getLogger(), {"correlation_id": correlation_id}
action_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"action_id", default=None
)
persona_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"persona_id", default=None
)
host_id: contextvars.ContextVar[str | None] = contextvars.ContextVar(
"host_id", default=None
)
class ContextJsonFormatter(logging.Formatter):
"""JSON formatter that includes request context variables in every log entry."""
@override
def format(self, record: logging.LogRecord) -> str:
"""Format the log record as JSON with context variables."""
log_data: dict[str, object] = {
"timestamp": self.formatTime(record, datefmt="%Y-%m-%dT%H:%M:%S"),
"level": record.levelname,
"logger": record.name,
"module": record.module,
"line": record.lineno,
"msg": record.getMessage(),
}
# Add request context fields if set
if correlation_id := _ContextVars.correlation_id.get():
log_data["correlation_id"] = correlation_id
if action_id := _ContextVars.action_id.get():
log_data["action_id"] = action_id
if persona_id := _ContextVars.persona_id.get():
log_data["persona_id"] = persona_id
if host_id := _ContextVars.host_id.get():
log_data["host_id"] = host_id
# Add exception info if present
if record.exc_info:
log_data["exc_info"] = self.formatException(record.exc_info)
return json.dumps(log_data)
class LoggingManager:
"""Manages structured JSON logging with request context injection."""
context: type[_ContextVars] = _ContextVars
@staticmethod
def configure(level: int | str = logging.INFO) -> None:
"""Configure JSON logging with structured output and context injection.
All log entries will include:
- ISO timestamp
- Log level
- Logger name, module, line number
- Request context (if set via context variables)
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.handlers.clear()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(ContextJsonFormatter())
root_logger.addHandler(handler)
# Backward compatibility: expose configure_logging function
def configure_logging(level: int | str = logging.INFO) -> None:
"""Configure JSON logging (wrapper for LoggingManager.configure)."""
LoggingManager.configure(level)

View File

@@ -4,15 +4,15 @@ This module provides query definitions loaded from external .graphql files,
keeping queries separate from Python code for better maintainability.
"""
import re
from pathlib import Path
from typing import cast
from graphql import OperationDefinitionNode, parse
def _load_queries(filename: str) -> dict[str, str]:
"""Load GraphQL queries from a file.
"""Load GraphQL queries from a file using proper GraphQL parser.
Each query/mutation must have an explicit operation name (e.g., 'query GetUser' or 'mutation CreateUser').
Each query/mutation must have an explicit operation name.
Returns a dict mapping operation names to complete query strings.
Args:
@@ -23,6 +23,7 @@ def _load_queries(filename: str) -> dict[str, str]:
Raises:
FileNotFoundError: If the query file doesn't exist
GraphQLError: If the GraphQL syntax is invalid
"""
query_file = Path(__file__).parent.parent / "queries" / filename
if not query_file.exists():
@@ -30,34 +31,106 @@ def _load_queries(filename: str) -> dict[str, str]:
raise FileNotFoundError(msg)
content = query_file.read_text(encoding="utf-8")
# Parse the entire document with proper GraphQL parser
# Raises graphql.GraphQLError if syntax is invalid
document = parse(content)
queries: dict[str, str] = {}
# Split by 'query' or 'mutation' keywords followed by operation name
pattern = r"((?:query|mutation)\s+\w+\s*\([^)]*\)?\s*\{[^}]*\})"
matches: list[str] = cast(list[str], re.findall(pattern, content, re.DOTALL))
for match_str in matches:
match_stripped = match_str.strip()
if lines := match_stripped.split("\n"):
first_line = lines[0]
if op_match := re.search(
r"(?:query|mutation)\s+(\w+)", first_line
):
op_name = op_match[1]
queries[op_name] = match_stripped
# Extract each operation definition
for definition in document.definitions:
if isinstance(definition, OperationDefinitionNode) and definition.name:
op_name = definition.name.value
# Extract the source text for this operation from the original content
source_text = _extract_operation_source(content, op_name)
if source_text:
queries[op_name] = source_text
return queries
def _extract_operation_source(content: str, op_name: str) -> str:
"""Extract the source text of an operation from GraphQL content.
Args:
content: The full GraphQL file content
op_name: The operation name to extract
Returns:
The operation source text (trimmed)
"""
lines = content.split("\n")
start_idx: int | None = None
end_idx: int | None = None
brace_count = 0
for i, line in enumerate(lines):
stripped = line.strip()
# Look for operation definition line
if start_idx is None and op_name in stripped:
if stripped.startswith(("query ", "mutation ")):
start_idx = i
brace_count = stripped.count("{") - stripped.count("}")
continue
# Count braces if we found the start
if start_idx is not None:
if i > start_idx:
brace_count += stripped.count("{") - stripped.count("}")
# When braces balance, we've found the end
if brace_count == 0:
end_idx = i
break
return (
"\n".join(lines[start_idx : end_idx + 1]).strip()
if start_idx is not None and end_idx is not None
else ""
)
def _validate_query_loaded(query_dict: dict[str, str], name: str, value: str) -> str:
"""Validate that a required query is loaded.
Args:
query_dict: Dictionary of loaded queries
name: Operation name to look for
value: Current value (empty string if not loaded)
Returns:
The query string
Raises:
RuntimeError: If the query is not found or is empty
"""
if not value or not query_dict.get(name):
msg = f"Required GraphQL operation '{name}' not found or is empty"
raise RuntimeError(msg)
return value
# Load all intake queries
_intake_queries = _load_queries("intake.graphql")
GET_INTAKE_REQUEST = _intake_queries.get("GetIntakeRequest", "")
CREATE_INTAKE_REQUEST = _intake_queries.get("CreateIntakeRequest", "")
GET_INTAKE_REQUEST = _validate_query_loaded(
_intake_queries, "GetIntakeRequest", _intake_queries.get("GetIntakeRequest", "")
)
CREATE_INTAKE_REQUEST = _validate_query_loaded(
_intake_queries,
"CreateIntakeRequest",
_intake_queries.get("CreateIntakeRequest", ""),
)
# Load all sourcing queries
_sourcing_queries = _load_queries("sourcing.graphql")
LIST_SUPPLIERS = _sourcing_queries.get("ListSuppliers", "")
ADD_SUPPLIER = _sourcing_queries.get("AddSupplier", "")
LIST_SUPPLIERS = _validate_query_loaded(
_sourcing_queries, "ListSuppliers", _sourcing_queries.get("ListSuppliers", "")
)
ADD_SUPPLIER = _validate_query_loaded(
_sourcing_queries, "AddSupplier", _sourcing_queries.get("AddSupplier", "")
)
__all__ = [