Add logout action and enhance OTP request with parameter metadata

- Introduced LogoutAction for clearing browser session state, including localStorage cleanup and optional logout button interaction.
- Added comprehensive CLAUDE_ENHANCED.md documentation covering architecture patterns, dependency injection, browser pool management, and development workflows.
- Enhanced RequestOtpAction with ActionParamInfo metadata for better API documentation and validation.
- Updated auth module exports to include the
This commit is contained in:
2025-12-08 22:45:08 +00:00
parent 07e7f4bfff
commit 891df0c924
24 changed files with 1922 additions and 250 deletions

359
CLAUDE_ENHANCED.md Normal file
View File

@@ -0,0 +1,359 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
A FastAPI-based guided demo platform that automates browser interactions with Raindrop using Playwright and browser extensions. The app executes data-driven actions (stored in `ActionRegistry`) on behalf of personas that target configured browser hosts (CDP, headless, or extension). All configuration is externalized via YAML files and environment overrides.
**Core Architecture:**
- **Dependency Injection System** - Auto-wires actions with dependencies through constructor parameter matching
- **Session Management** - JWT verification, localStorage injection, and offline session validation
- **Multi-Mode Browser Pool** - CDP (cached pages), headless (fresh contexts), and extension modes
- **Form Field Intelligence** - Type inference, selector escaping, and helper dispatch for smart form filling
- **GraphQL Integration** - Auto token extraction, connection pooling, and structured error handling
**Entry Point:** `python -m guide` (runs `src/guide/main.py``guide.app.main:app`)
**Python Version:** 3.12+
**Key Dependencies:** FastAPI, Playwright, Pydantic v2, PyYAML, httpx
## Essential Commands
```bash
# Install dependencies
uv sync
# Type checking (required before commits)
basedpyright src
# Compile sanity check
python -m compileall src/guide
# Run development server (default: localhost:8765)
python -m guide
# or with custom host/port:
HOST=127.0.0.1 PORT=9000 python -m guide
# View API docs
# Navigate to http://localhost:8000/docs
# Key endpoints:
# GET /healthz # liveness check
# GET /actions # list action metadata
# POST /actions/{id}/execute # execute action; returns ActionEnvelope with correlation_id
# GET /config/browser-hosts # view current default + host map
```
## Code Structure
**Root module:** `src/guide/app/`
- **`actions/`** — Demo action implementations with auto-discovery via `@register_action` decorator. Auto-wired via `ActionRegistry` with dependency injection.
- **`auth/`** — Session management with JWT verification, localStorage injection, and MFA provider interfaces.
- **`browser/`** — Multi-mode browser automation core:
- `pool.py``BrowserPool` manages persistent browser instances per host with context allocation
- `client.py``BrowserClient` wraps BrowserPool with async context manager pattern
- `elements/` — Reusable UI interaction helpers with type inference and selector escaping
- `utils.py` — Cross-cutting utilities for JavaScript injection safety
- **`core/`** — App bootstrap: `config.py` (AppSettings with Pydantic v2), `logging.py` (structured logging)
- **`errors/`** — `GuideError` hierarchy with consistent HTTP response mapping
- **`raindrop/`** — GraphQL client with auto token extraction and connection pooling
- **`strings/`** — Centralized selectors, labels, and copy with domain-keyed lookups
- **`models/`** — Domain and persona models using Pydantic v2
- **`utils/`** — Shared helpers including retry logic and JWT verification
- **`api/`** — FastAPI routers mapping requests to ActionRegistry
## Architecture Patterns
### Application Startup & Dependency Wiring
The application follows a structured initialization flow in `main.py`:
1. **Entry point** (`main.py:17`) - Uvicorn loads FastAPI app from `guide.app.main:app`
2. **Load configuration** (`main.py:27`) - Settings loaded from YAML files and environment variables
3. **Create stores** (`main.py:28-29`) - PersonaStore and BoardStore from configuration
4. **Session manager** (`main.py:39`) - Creates SessionManager with storage and TTL
5. **Action registry** (`main.py:47`) - Auto-discovers actions via `actions/registry.py`
6. **Browser pool** (`main.py:55`) - Multi-mode browser instance management
7. **Keep-alive service** (`main.py:59`) - Background task to prevent CDP timeouts
**Dependency Injection Context:**
```python
registry = default_registry(
persona_store, # PersonaStore(settings)
settings.raindrop_base_url, # Raindrop GraphQL endpoint
# Additional dependencies injected by parameter name matching
)
```
### Action Execution with Dependency Injection
Actions are resolved and executed through a structured pipeline:
1. **Request handling** (`actions.py:40`) - POST to `/actions/{action_id}` endpoint
2. **Registry resolution** (`base.py:349`) - ActionRegistry.get() looks up action by ID
3. **Instantiation with DI** (`base.py:321`) - Inspects constructor signature and injects matching dependencies
4. **Page allocation** (`client.py:40`) - BrowserClient.open_page() delegates to pool
5. **Action execution** (`actions.py:64`) - Calls action.run() with page and context
**Dependency Injection Example:**
```python
@register_action
class LoginAsPersonaAction(DemoAction):
def __init__(self, session_manager: SessionManager):
# session_manager injected by matching parameter name
self._session_manager = session_manager
```
### Browser Pool Context Allocation
The browser pool supports three modes with different allocation strategies:
**CDP Mode (browserless):**
- Queries `browser.contexts` on first use
- Caches page reference to avoid refresh on subsequent requests
- Optional storage clearing if `isolate` flag enabled
**Headless Mode:**
- Creates fresh browser context for complete isolation
- New page allocated per request
**Extension Mode:**
- Uses WebSocket-based Terminator Bridge extension
- Avoids CDP page refresh issues entirely
- Supports cross-network operation
**Context Allocation Flow:**
```python
# BrowserClient delegates to pool
context, page, should_close = await pool.allocate_context_and_page(
host_id, storage_state=storage_state
)
# Pool routes to appropriate BrowserInstance by host_id
# Returns context, page, and cleanup flag
```
### Session Restoration with JWT Verification
Session management provides robust offline validation and restoration:
1. **Load session** (`session_manager.py:317`) - SessionStorage reads JSON from `.sessions/{persona_id}`
2. **Offline validation** (`session_manager.py:151`) - Checks TTL expiry and JWT token expiry
3. **Navigate to origin** (`session_manager.py:368`) - Page.goto(base_url) to establish context
4. **Token extraction** (`session.py:555`) - Scans localStorage for Auth0 SPA SDK keys
5. **JWT verification** (`utils/jwt.py:114`) - Fetches JWKS and verifies RS256 signature
6. **localStorage injection** (`session_manager.py:276`) - Restores session data into browser
7. **Persona verification** (`session.py:118`) - Detects current logged-in persona
**Reusable Token Extraction:**
```python
# Common pattern across modules
from guide.app.auth.session import extract_bearer_token
token = await extract_bearer_token(page)
# Scans localStorage for Auth0 keys and parses nested JSON structure
```
### Form Field Filling with Type Inference
Smart form filling reconciles GraphQL schemas with live DOM elements:
1. **Schema fetch** - Retrieves GraphQL schema for target board
2. **Form context build** (`context_builder.py:165`) - Extracts all form fields and matches to schema
3. **Type inference** (`field_inference.py:79`) - Inspects DOM for ARIA roles and MUI classes
4. **Helper selection** (`field_inference.py:243`) - Maps field types to helper functions
5. **Dispatch execution** - Routes to appropriate fill helper based on inferred type
**Field Type Inference:**
```python
# Reusable pattern for DOM inspection
field_type = await infer_type_from_element(page, selector)
# Returns: select_single, select_combobox, fill_with_react_events, etc.
helper = select_helper_for_type(field_type)
```
**Selector Safety:**
```python
# Critical utility for JavaScript injection safety
from guide.app.browser.utils import escape_selector
escaped = escape_selector(selector) # Escapes \, ', " for JS strings
```
### GraphQL Query Execution with Token Handling
GraphQL client provides robust token management and error handling:
1. **Auto token extraction** (`graphql.py:77`) - Reuses extract_bearer_token from auth/session
2. **Connection pooling** (`graphql.py:30`) - Persistent httpx.AsyncClient for efficiency
3. **Response validation** (`graphql.py:139`) - Pydantic TypeAdapter for structural validation
4. **Error handling** (`graphql.py:166`) - Raises typed exceptions with structured details
5. **Retry logic** (`utils/retry.py:83`) - Exponential backoff decorator for resilience
**GraphQL Execution Pattern:**
```python
# Auto token discovery from page context
client = GraphQLClient(base_url=settings.raindrop_base_url)
result = await client.execute(
query=SOME_QUERY,
bearer_token=None, # Auto-extracted from page if available
variables={"id": board_id}
)
```
### Reusable Utilities Distribution
**Selector Escaping Utility:**
- Core definition in `browser/utils.py` (`escape_selector()`)
- Used across multiple modules for safe JavaScript string interpolation
- Critical for preventing XSS and selector injection attacks
**Retry Logic:**
- Exponential backoff implementation in `utils/retry.py`
- Used in GraphQL client and other network operations
- Configurable retry attempts and backoff strategy
**JWT Verification:**
- Auth0 JWKS fetching and RS256 signature verification
- Reusable across session management and token validation
- Handles key rotation and token expiry gracefully
## Development Workflow
1. **Edit code** (actions, browser logic, GraphQL ops, etc.)
2. **Run type check:** `basedpyright src` (catches generic types, missing annotations)
3. **Sanity compile:** `python -m compileall src/guide` (syntax check)
4. **Smoke test:** `python -m guide` then hit `/docs` or manual test via curl
5. **Review error handling:** ensure `GuideError` subclasses are raised, not generic exceptions
6. **Commit** with scoped, descriptive message
## Type & Linting Standards
- **Python 3.12+:** Use PEP 604 unions (`str | None`), built-in generics (`list[str]`, `dict[str, JSONValue]`)
- **Ban `Any` and `# type: ignore`:** Use type guards or Protocol instead
- **Pydantic v2:** Explicit types, model_validate for parsing, model_copy for immutable updates
- **Type checker:** Pyright (via basedpyright)
- **Docstrings:** Imperative style, document public APIs, include usage examples
## Error Handling & Logging
- Always raise `GuideError` subclasses (not generic `Exception`); routers translate to HTTP responses
- Log via `core/logging` (structured, levelled). Include persona/action IDs and host targets for traceability
- For browser flows, use Playwright traces (enabled by default in `BrowserClient`); disable only intentionally
- Validate external inputs early; surface schema/connection issues as `GuideError`
## Testing & Quality Gates
- **Minimum gate:** `basedpyright src` + `python -m compileall src/guide` before merge
- **Test Coverage:** Comprehensive unit and integration suites in `tests/` directory
- **Test Structure:**
- `tests/unit/` — Unit tests for strings registry, models, action registration
- `tests/integration/` — Integration tests for BrowserClient, BrowserPool, browser lifecycle
- `conftest.py` — Shared fixtures for mock objects and test setup
- Mock Playwright/GraphQL in tests; avoid real network/CDP calls
- Require deterministic fixtures; document any env vars needed in test module docstring
## Performance & Footprint
- Keep browser sessions short-lived; close contexts to avoid handle leaks
- Cache expensive GraphQL lookups (per-request OK, global only if safe)
- Don't widen dependencies without justification; stick to project pins in `pyproject.toml`
- Promptly close Playwright contexts/browser handles (wrapped in contextmanager; keep action code lean)
## Action Registration & Dependency Injection
### Registering a New Action
Use the `@register_action` decorator to auto-discover and register actions:
```python
from actions.base import DemoAction, register_action
@register_action
class MyAction(DemoAction):
id = "my-action"
description = "Does something cool"
category = "demo"
# Optional: Declare dependencies (auto-injected by ActionRegistry)
def __init__(self, session_manager: SessionManager, persona_store: PersonaStore):
self._session_manager = session_manager
self.persona_store = persona_store
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
# Implementation using injected dependencies
restored = await self._session_manager.restore_session(page, context.persona)
if not restored:
# Handle session restoration failure
pass
```
**Auto-Discovery:** `ActionRegistry` uses `pkgutil.walk_packages()` to discover all modules in `actions/` and collect all `@register_action` decorated classes.
**Dependency Injection:** Parameters in `__init__` are matched by name against DI context dict during action instantiation.
### Multi-Step Workflows (CompositeAction)
For workflows spanning multiple steps with shared state:
```python
@register_action
class MyWorkflow(CompositeAction):
id = "my-workflow"
description = "Multi-step workflow"
category = "demo"
child_actions = ("step1-action", "step2-action", "step3-action")
async def on_step_complete(self, step_id: str, result: ActionResult) -> None:
# Update shared_state after each step
# Accessed in child actions via context.shared_state dict
pass
```
### Browser Interaction Patterns
**Safe Selector Usage:**
```python
from guide.app.browser.utils import escape_selector
# Always escape selectors for JavaScript injection
escaped_selector = escape_selector(selector)
result = await page.evaluate(f"document.querySelector('{escaped_selector}')")
```
**Form Field Type Inference:**
```python
from guide.app.browser.elements.field_inference import infer_type_from_element, select_helper_for_type
# Infer field type from DOM element
field_type = await infer_type_from_element(page, selector)
helper_name = select_helper_for_type(field_type)
# Dispatch to appropriate helper
if helper_name == "select_single":
await select_single(page, selector, value)
elif helper_name == "fill_with_react_events":
await fill_text(page, selector, value)
```
**GraphQL with Auto Token Management:**
```python
from guide.app.raindrop.graphql import GraphQLClient
from guide.app.auth.session import extract_bearer_token
# Token automatically extracted from page if available
client = GraphQLClient(base_url=settings.raindrop_base_url)
result = await client.execute(query=GET_BOARD, variables={"id": board_id})
```
## Quick Checklist (New Feature)
- [ ] Add action in `actions/` submodule with `@register_action` decorator
- [ ] Declare dependencies in `__init__` for automatic injection
- [ ] Use `PageLike` type for page parameter to support all browser modes
- [ ] Use `escape_selector()` for any JavaScript string interpolation
- [ ] For form interactions, use type inference and helper dispatch patterns
- [ ] For GraphQL operations, rely on auto token extraction from page context
- [ ] Run `basedpyright src` + `python -m compileall src/guide` for validation
- [ ] Test via `python -m guide` + navigate to `http://localhost:8765/docs`
- [ ] Review error handling; raise `GuideError` subclasses, not generic exceptions
- [ ] Commit with descriptive message following established patterns

View File

@@ -1,4 +1,5 @@
from guide.app.actions.auth.login import LoginAsPersonaAction
from guide.app.actions.auth.logout import LogoutAction
from guide.app.actions.auth.request_otp import RequestOtpAction
__all__ = ["LoginAsPersonaAction", "RequestOtpAction"]
__all__ = ["LoginAsPersonaAction", "LogoutAction", "RequestOtpAction"]

View File

@@ -0,0 +1,103 @@
"""Logout action to clear browser session state."""
from typing import ClassVar, cast, override
from loguru import logger
from guide.app.actions.base import DemoAction, register_action
from guide.app.auth.session import logout
from guide.app.browser.types import PageLike
from guide.app.core.config import AppSettings
from guide.app.models.domain import ActionContext, ActionResult
_JS_CLEAR_LOCAL_STORAGE = """
(() => {
try {
const count = localStorage.length;
localStorage.clear();
return { cleared: count, error: null };
} catch (e) {
return { cleared: 0, error: e.message };
}
})();
"""
@register_action
class LogoutAction(DemoAction):
"""Log out current user and clear browser session state.
Clears localStorage and optionally clicks logout button.
Use this to reset browser state before logging in as a different persona.
Request params:
navigate_first: URL to navigate to before clearing (optional)
click_logout: Whether to click logout button if found (default: true)
"""
id: ClassVar[str] = "auth.logout"
description: ClassVar[str] = "Log out and clear browser session state."
category: ClassVar[str] = "auth"
_settings: AppSettings
def __init__(self, settings: AppSettings) -> None:
"""Initialize logout action.
Args:
settings: Application settings.
"""
self._settings = settings
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Execute logout and clear session state.
Args:
page: Browser page instance.
context: Action context with params.
Returns:
ActionResult with logout status.
"""
navigate_url = context.params.get("navigate_first")
click_logout = context.params.get("click_logout", True)
# Navigate first if URL provided (needed for localStorage access)
if navigate_url and isinstance(navigate_url, str):
logger.info("Navigating to {} before logout", navigate_url)
_ = await page.goto(navigate_url)
elif page.url == "about:blank":
# Navigate to app base URL so we can access localStorage
base_url = self._settings.raindrop_base_url
logger.info("Navigating to {} for localStorage access", base_url)
_ = await page.goto(base_url)
# Clear localStorage
result = await page.evaluate(_JS_CLEAR_LOCAL_STORAGE)
cleared_count = 0
if isinstance(result, dict):
result_dict = cast(dict[str, object], result)
raw_cleared = result_dict.get("cleared")
if isinstance(raw_cleared, int):
cleared_count = raw_cleared
logger.info("Cleared {} localStorage items", cleared_count)
# Click logout button if requested
logout_clicked = False
if click_logout:
await logout(page)
logout_clicked = True
return ActionResult(
details={
"status": "logged_out",
"local_storage_cleared": cleared_count,
"logout_clicked": logout_clicked,
"url": page.url,
}
)
__all__ = ["LogoutAction"]

View File

@@ -22,7 +22,7 @@ from guide.app.auth import (
from guide.app.browser.helpers import PageHelpers
from guide.app.browser.types import PageLike
from guide.app.core.config import AppSettings
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.models.domain import ActionContext, ActionParamInfo, ActionResult
from guide.app.models.personas import DemoPersona, PersonaResolver
from guide.app.strings.selectors.login import LoginSelectors
@@ -291,6 +291,7 @@ class RequestOtpAction(DemoAction):
email: Email address to request OTP for (required)
callback_base_url: Base URL for callback endpoint (optional, defaults to localhost)
force_fresh_login: Skip session restoration (optional, default: false)
switch_user: Logout current user first before login (optional, default: false)
Requires:
- RAINDROP_DEMO_N8N_WEBHOOK_URL environment variable
@@ -302,6 +303,85 @@ class RequestOtpAction(DemoAction):
)
category: ClassVar[str] = "auth"
long_description: ClassVar[str | None] = """
Authenticates a user via Auth0 passwordless (magic link) flow with n8n webhook integration.
**Flow:**
1. Checks for cached session - if valid and not expired, restores it instantly
2. Navigates to login page and enters email address
3. Triggers OTP email via Auth0
4. Sends webhook to n8n with correlation_id for email retrieval
5. Waits for n8n to find the magic link and call back
6. Completes login using the magic link URL
7. Saves session to disk for future requests (avoids re-authentication)
**Session Caching:**
Sessions are cached to `.sessions/{persona_id}.session.json` and reused until expired.
Use `force_fresh_login: true` to bypass cache, or `switch_user: true` to logout first.
**n8n Integration:**
Requires n8n workflow listening at RAINDROP_DEMO_N8N_WEBHOOK_URL that:
- Receives webhook with `correlation_id`, `email`, and `callback_url`
- Finds the OTP email in inbox
- Extracts magic link URL
- POSTs to callback_url with `{"correlation_id": "...", "otp_url": "..."}`
""".strip()
params_info: ClassVar[list[ActionParamInfo]] = [
ActionParamInfo(
name="email",
description="Email address to authenticate. Must be a valid Auth0 user.",
required=True,
example="travis@raindrop.com",
),
ActionParamInfo(
name="switch_user",
description="Logout current user before login. Use when switching between personas on the same browser.",
required=False,
default="false",
example="true",
),
ActionParamInfo(
name="force_fresh_login",
description="Skip session cache and force fresh OTP authentication.",
required=False,
default="false",
example="true",
),
ActionParamInfo(
name="callback_base_url",
description="Base URL for n8n callback endpoint. Defaults to RAINDROP_DEMO_CALLBACK_BASE_URL.",
required=False,
default="http://localhost:8765",
example="https://demo.example.com",
),
]
example_request: ClassVar[dict[str, object] | None] = {
"persona_id": None,
"host_id": "browserless-cdp",
"params": {
"email": "travis@raindrop.com",
"switch_user": True,
},
}
example_response: ClassVar[dict[str, object] | None] = {
"status": "success",
"action_id": "auth.request_otp",
"correlation_id": "a5edbff3-99b3-4e86-aab5-1fcbdc910171",
"result": {
"email": "travis@raindrop.com",
"status": "logged_in",
"correlation_id": "a5edbff3-99b3-4e86-aab5-1fcbdc910171",
},
}
requires: ClassVar[list[str]] = [
"RAINDROP_DEMO_N8N_WEBHOOK_URL - n8n webhook endpoint for OTP retrieval",
"n8n workflow configured to find OTP emails and callback",
]
_settings: AppSettings
_session_manager: SessionManager
_persona_resolver: PersonaResolver
@@ -345,7 +425,43 @@ class RequestOtpAction(DemoAction):
details={"provided_params": list(context.params.keys())},
)
force_fresh = context.params.get("force_fresh_login", False)
switch_user = context.params.get("switch_user", False)
# switch_user implies force_fresh_login (skip session restoration)
force_fresh = context.params.get("force_fresh_login", False) or switch_user
# If switching users, logout first to clear session state
if switch_user:
_logger.info("switch_user=True, clearing session before login")
base_url = self._settings.raindrop_base_url
helpers = PageHelpers(page)
# Navigate to app first
_ = await page.goto(base_url, wait_until="networkidle")
await helpers.wait_for_stable()
# Try to click logout button (may be in a menu)
logout_btn = page.locator('[data-test="auth-logout"]')
if await logout_btn.count() > 0:
_logger.info("Clicking logout button")
await logout_btn.click()
await helpers.wait_for_network_idle()
else:
# Logout button might be in user menu - try to find and click menu first
user_menu = page.locator('[data-test="user-menu"], [data-test="profile-menu"], button:has-text("Account")')
if await user_menu.count() > 0:
_logger.info("Opening user menu to find logout")
await user_menu.first.click()
await helpers.wait_for_network_idle()
# Now try logout button again
if await logout_btn.count() > 0:
await logout_btn.click()
await helpers.wait_for_network_idle()
# Clear localStorage to invalidate any remaining session
_ = await page.evaluate(
"(() => { const c = localStorage.length; localStorage.clear(); return c; })()"
)
_logger.info("Session cleared for user switch")
# Resolve persona from email for session management
persona: DemoPersona | None = None

View File

@@ -5,7 +5,12 @@ from typing import ClassVar, cast, override
from guide.app import errors
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionMetadata, ActionResult
from guide.app.models.domain import (
ActionContext,
ActionMetadata,
ActionParamInfo,
ActionResult,
)
class DemoAction(ABC):
@@ -13,12 +18,31 @@ class DemoAction(ABC):
Actions inherit from this class to be discoverable and executable
by the ActionRegistry.
Required class variables:
id: Unique action identifier (e.g., 'auth.request_otp')
description: Brief one-line description
category: Category for grouping (e.g., 'auth', 'intake')
Optional class variables for rich documentation:
long_description: Detailed multi-line description with usage notes
params_info: List of ActionParamInfo documenting accepted parameters
example_request: Example request payload dict
example_response: Example response payload dict
requires: List of requirements (e.g., env vars, config)
"""
id: ClassVar[str]
description: ClassVar[str]
category: ClassVar[str]
# Optional documentation fields
long_description: ClassVar[str | None] = None
params_info: ClassVar[list[ActionParamInfo]] = []
example_request: ClassVar[dict[str, object] | None] = None
example_response: ClassVar[dict[str, object] | None] = None
requires: ClassVar[list[str]] = []
@abstractmethod
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Execute the action and return a result."""
@@ -351,6 +375,19 @@ class ActionRegistry:
raise errors.ActionExecutionError(f"Unknown action '{action_id}'")
def _build_metadata(self, action: DemoAction) -> ActionMetadata:
"""Build ActionMetadata from an action instance."""
return ActionMetadata(
id=action.id,
description=action.description,
category=action.category,
long_description=getattr(action, "long_description", None),
params=getattr(action, "params_info", []),
example_request=getattr(action, "example_request", None),
example_response=getattr(action, "example_response", None),
requires=getattr(action, "requires", []),
)
def list_metadata(self) -> list[ActionMetadata]:
"""List metadata for all registered actions.
@@ -364,39 +401,21 @@ class ActionRegistry:
# Add explicit instances
for action in self._actions.values():
metadata.append(
ActionMetadata(
id=action.id,
description=action.description,
category=action.category,
)
)
metadata.append(self._build_metadata(action))
seen_ids.add(action.id)
# Add factory functions
for factory in self._factories.values():
action = factory()
if action.id not in seen_ids:
metadata.append(
ActionMetadata(
id=action.id,
description=action.description,
category=action.category,
)
)
metadata.append(self._build_metadata(action))
seen_ids.add(action.id)
# Add globally registered actions
for action_cls in get_registered_actions().values():
if action_cls.id not in seen_ids:
action = self._instantiate_with_di(action_cls)
metadata.append(
ActionMetadata(
id=action.id,
description=action.description,
category=action.category,
)
)
metadata.append(self._build_metadata(action))
seen_ids.add(action.id)
return metadata

View File

@@ -1,124 +1,76 @@
"""Email notification response playbook.
Receives n8n webhook payload and executes:
1. Session awareness check (detect if already logged in as correct user)
2. Conditional login (only if is_login=True and not already logged in)
- If no OTP URL provided, triggers OTP request flow via webhook callback
3. Conditional message response (only if is_message=True)
Receives n8n webhook payload for message notifications and executes:
1. Navigate directly to conversation URL (access_url)
2. Check if redirected to login (session invalid)
3. If redirected, authenticate via OTP and retry access_url
4. Dismiss any blocking modals
5. Parse message context for LLM reply generation
6. Submit generated reply
"""
import logging
from typing import ClassVar, override
from urllib.parse import parse_qs, urlparse
from loguru import logger
from guide.app import errors
from guide.app.actions.base import (
ActionRegistry,
ConditionalCompositeAction,
register_action,
from guide.app.actions.base import ActionRegistry, DemoAction, register_action
from guide.app.auth.session import detect_current_persona
from guide.app.browser.elements._type_guards import (
get_str_from_dict,
is_dict_str_object,
is_list_of_objects,
)
from guide.app.auth import detect_current_persona
from guide.app.browser.helpers import PageHelpers
from guide.app.browser.types import PageLike
from guide.app.core.config import AppSettings
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.models.personas import PersonaResolver
_logger = logging.getLogger(__name__)
from guide.app.strings.selectors.messaging import MessagingSelectors
from guide.app.utils.llm import ChatMessage, LLMClient
@register_action
class EmailNotificationResponsePlaybook(ConditionalCompositeAction):
"""Playbook for responding to email notifications via n8n.
Receives n8n payload and conditionally executes login and message steps.
class EmailNotificationResponsePlaybook(DemoAction):
"""Playbook for responding to message notification emails via n8n.
Request params (from n8n):
user_email: str - Email of user to authenticate as (required)
is_login: bool - Whether to perform login step (default: True)
is_message: bool - Whether to send message (default: True)
access_url: str - OTP URL for authentication (optional - triggers OTP request if missing)
message: str - Message text to send (required if is_message)
access_url: str - Conversation URL from notification email (required)
callback_base_url: str - Base URL for OTP callback (optional)
Session awareness:
- Detects if already logged in as correct user
- Skips login if session matches user_email
- If no OTP URL provided, triggers OTP request and waits for callback
Flow:
1. Navigate directly to access_url
2. Check if redirected to /login (session invalid)
3. If redirected, authenticate via OTP then retry access_url
4. Dismiss blocking modal if present
5. Parse message context for LLM reply generation
6. Submit generated reply
Browser host:
- Uses browserless-cdp (pass browser_host_id in request)
"""
id: ClassVar[str] = "playbook.email_notification_response"
description: ClassVar[str] = "Respond to email notification: login + message"
description: ClassVar[str] = "Respond to message notification: navigate + auth if needed + reply"
category: ClassVar[str] = "playbooks"
child_actions: ClassVar[tuple[str, ...]] = (
"auth.login_as_persona",
"messaging.respond",
)
_persona_resolver: PersonaResolver
_registry: ActionRegistry
_settings: AppSettings
def __init__(
self,
registry: ActionRegistry,
persona_resolver: PersonaResolver,
settings: AppSettings,
) -> None:
"""Initialize playbook with dependencies.
Args:
registry: ActionRegistry for resolving child actions.
persona_resolver: Resolver for email-based persona lookup.
settings: Application settings for n8n webhook configuration.
"""
super().__init__(registry)
self._persona_resolver = persona_resolver
"""Initialize playbook with dependencies."""
self._registry = registry
self._settings = settings
@override
async def should_execute_step(
self,
step_id: str,
context: ActionContext,
) -> bool:
"""Determine if step should execute based on n8n params.
Args:
step_id: The action ID of the step.
context: Current action context with params and shared_state.
Returns:
True to execute the step, False to skip.
"""
params = context.params
if step_id == "auth.login_as_persona":
is_login = params.get("is_login", True)
already_logged_in = context.shared_state.get("session_reused", False)
otp_flow_completed = context.shared_state.get("otp_flow_completed", False)
# Skip if already logged in OR if OTP flow handled login
return bool(is_login) and not already_logged_in and not otp_flow_completed
if step_id == "messaging.respond":
return bool(params.get("is_message", True))
return True
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Execute email notification playbook.
Args:
page: The browser page instance.
context: Action context with n8n params.
Returns:
ActionResult with combined status from all steps.
Raises:
ActionExecutionError: If required params are missing.
"""
"""Execute email notification response playbook."""
# Validate required params upfront
user_email = context.params.get("user_email")
if not user_email or not isinstance(user_email, str):
raise errors.ActionExecutionError(
@@ -126,61 +78,117 @@ class EmailNotificationResponsePlaybook(ConditionalCompositeAction):
details={"provided_params": list(context.params.keys())},
)
# Session awareness: check if already logged in as this user
current_email = await detect_current_persona(page)
if current_email and current_email.lower() == user_email.lower():
_logger.info("Already logged in as: %s", user_email)
context.shared_state["logged_in_email"] = current_email
context.shared_state["session_reused"] = True
else:
# Not logged in - check if we need OTP request flow
access_url = context.params.get("access_url")
is_login = context.params.get("is_login", True)
access_url = context.params.get("access_url")
if not access_url or not isinstance(access_url, str):
raise errors.ActionExecutionError(
"'access_url' param is required",
details={"provided_params": list(context.params.keys())},
)
if is_login and not access_url:
# No OTP URL provided - trigger OTP request flow
_logger.info(
"No OTP URL provided, triggering OTP request flow for: %s",
user_email,
helpers = PageHelpers(page)
auth_was_required = False
# Step 1: Navigate directly to access_url
logger.info("Step 1: Navigating directly to: {}", access_url)
_ = await page.goto(access_url, wait_until="networkidle")
await helpers.wait_for_stable()
# Step 2: Check if we need to authenticate
# - Login page shown (no session)
# - Wrong user logged in (need to switch)
current_url = page.url
title_result = await page.evaluate("document.title")
page_title = str(title_result) if title_result else ""
on_login_page = "/login" in current_url or "login" in page_title.lower()
# Check current logged-in user via localStorage
current_user = await detect_current_persona(page)
wrong_user = bool(current_user and current_user.lower() != user_email.lower())
needs_auth = on_login_page or wrong_user
if needs_auth:
if wrong_user:
logger.info(
"Step 2: Wrong user logged in (current={}, need={})",
current_user, user_email
)
otp_result = await self._trigger_otp_flow(page, context, user_email)
if otp_result.status == "error":
return otp_result
context.shared_state["otp_flow_completed"] = True
else:
logger.info("Step 2: Login required (title={})", page_title)
# Forward params to login action (email-based resolution)
context.params["email"] = user_email
if access_url := context.params.get("access_url"):
context.params["url"] = access_url
# Step 3: Authenticate via OTP (handles logout if needed)
logger.info("Step 3: Authenticating as {}", user_email)
auth_result = await self._authenticate(
page, context, user_email, switch_user=wrong_user
)
if auth_result.status == "error":
return auth_result
return await super().run(page, context)
auth_was_required = True
async def _trigger_otp_flow(
# Retry navigation to access_url after auth
logger.info("Step 3b: Retrying navigation to: {}", access_url)
_ = await page.goto(access_url, wait_until="networkidle")
await helpers.wait_for_stable()
else:
logger.info("Step 2: Correct user already logged in ({})", current_user)
# Step 4: Dismiss blocking modal if present
logger.info("Step 4: Checking for blocking modal")
modal_dismissed = await self._dismiss_modal_if_present(page, helpers)
# Step 5: Ensure chat panel is visible
logger.info("Step 5: Ensuring chat panel is visible")
chat_expanded = await self._ensure_chat_visible(page, helpers)
# Step 6: Parse message context for LLM reply generation
logger.info("Step 6: Parsing message context")
message_context = await self._parse_message_context(page)
# Step 7: Generate and submit reply
logger.info("Step 7: Generating and submitting reply")
reply_result = await self._generate_and_send_reply(
page, helpers, message_context, access_url
)
if reply_result.status == "error":
return reply_result
# Merge reply details into final result
reply_details = reply_result.details or {}
message_verified = reply_details.get("message_verified", False)
return ActionResult(
details={
"user_email": user_email,
"access_url": access_url,
"auth_was_required": auth_was_required,
"modal_dismissed": modal_dismissed,
"chat_expanded": chat_expanded,
"messages_found": message_context.get("message_count", 0),
"generated_reply": reply_details.get("generated_reply"),
"text_entered": reply_details.get("text_entered"),
"message_verified": message_verified,
}
)
async def _authenticate(
self,
page: PageLike,
context: ActionContext,
email: str,
*,
switch_user: bool = False,
) -> ActionResult:
"""Trigger OTP request flow via auth.request_otp action.
Args:
page: Browser page instance.
context: Action context.
email: Email to request OTP for.
Returns:
ActionResult from OTP request action.
"""
"""Authenticate user via request_otp action."""
try:
otp_action = self.registry.get("auth.request_otp")
otp_action = self._registry.get("auth.request_otp")
except errors.ActionExecutionError:
return ActionResult(
status="error",
error="auth.request_otp action not available",
details={"hint": "Ensure auth.request_otp is registered"},
)
# Forward callback_base_url if provided
otp_context = ActionContext(
action_id="auth.request_otp",
persona_id=context.persona_id,
@@ -190,25 +198,388 @@ class EmailNotificationResponsePlaybook(ConditionalCompositeAction):
"callback_base_url": context.params.get(
"callback_base_url", self._settings.callback_base_url
),
"switch_user": switch_user,
},
)
_logger.info("Executing OTP request flow for: %s", email)
return await otp_action.run(page, otp_context)
@override
async def on_step_complete(self, step_id: str, result: ActionResult) -> None:
"""Process step results and update shared state.
async def _dismiss_modal_if_present(
self,
page: PageLike,
helpers: PageHelpers,
) -> bool:
"""Dismiss blocking modals if present after navigation."""
dismissed = False
# Check for board item editor dialog first (common blocker)
board_dialog = page.locator(MessagingSelectors.BOARD_ITEM_DIALOG)
if await board_dialog.count() > 0:
logger.info("Board item editor dialog detected, attempting to close")
# Click the close button
close_btn = page.locator(MessagingSelectors.BOARD_ITEM_DIALOG_CLOSE)
try:
if await close_btn.count() > 0:
await close_btn.click()
await helpers.wait_for_stable()
# Wait for dialog to close
try:
_ = await page.wait_for_selector(
MessagingSelectors.BOARD_ITEM_DIALOG,
state="hidden",
timeout=3000,
)
dismissed = True
logger.info("Board item editor dialog closed via close button")
except Exception:
logger.warning("Dialog still visible after clicking close")
else:
logger.warning("Close button not found for board item dialog")
except Exception as exc:
logger.warning("Failed to close board item dialog: {}", exc)
# Check for modal wrapper
modal = page.locator(MessagingSelectors.MODAL_WRAPPER)
modal_count = await modal.count()
if modal_count > 0:
close_button = page.locator(MessagingSelectors.MODAL_CLOSE_BUTTON)
close_count = await close_button.count()
if close_count > 0:
logger.info("Dismissing blocking modal")
await close_button.first.click()
_ = await page.wait_for_selector(
MessagingSelectors.MODAL_WRAPPER,
state="hidden",
timeout=5000,
)
await helpers.wait_for_stable()
dismissed = True
return dismissed
async def _ensure_chat_visible(
self,
page: PageLike,
helpers: PageHelpers,
) -> bool:
"""Ensure chat panel is visible, expanding flyout if needed."""
chat_container = page.locator(MessagingSelectors.CHAT_MESSAGES_CONTAINER)
container_count = await chat_container.count()
if container_count > 0:
logger.info("Chat panel already visible")
return False
# Click flyout button to expand
logger.info("Chat not visible, clicking flyout button")
flyout_button = page.locator(MessagingSelectors.CHAT_FLYOUT_BUTTON)
flyout_count = await flyout_button.count()
if flyout_count == 0:
logger.warning("Chat flyout button not found")
return False
await flyout_button.first.click()
await helpers.wait_for_stable()
# Switch to conversations tab if present
conversations_tab = page.locator(MessagingSelectors.CHAT_CONVERSATIONS_TAB)
tab_count = await conversations_tab.count()
if tab_count > 0:
logger.info("Switching to conversations tab")
await conversations_tab.first.click()
await helpers.wait_for_stable()
return True
async def _parse_message_context(self, page: PageLike) -> dict[str, object]:
"""Parse the message panel to extract conversation context.
Extracts the previous message(s) for LLM context.
Uses the virtualized list structure:
#chat-messages-container > div > div > div:nth-child(n) > div
"""
# Check if chat panel is visible
chat_container = page.locator(MessagingSelectors.CHAT_MESSAGES_CONTAINER)
container_visible = await chat_container.count() > 0
if not container_visible:
logger.warning("Chat messages container not visible")
return {"error": "chat_not_visible", "messages": []}
# Wait for messages to render in the virtualized list
message_selector = MessagingSelectors.CHAT_MESSAGE_ITEM
message_locator = page.locator(message_selector)
try:
# Give virtualized list time to render (up to 5s)
await message_locator.first.wait_for(state="visible", timeout=5000)
logger.info("Messages rendered in chat container")
except Exception:
logger.warning("No messages rendered after waiting")
# Extract messages from chat container
messages: list[dict[str, str]] = []
try:
message_elements = await page.evaluate(
"""
(selector) => {
const container = document.querySelector('#chat-messages-container');
if (!container) return { error: 'container_not_found' };
const messageNodes = document.querySelectorAll(selector);
const messages = [];
messageNodes.forEach((node) => {
const text = node.textContent?.trim() || '';
if (text) {
messages.push({
sender: 'unknown',
text: text.slice(0, 500),
});
}
});
return {
messageCount: messageNodes.length,
messages: messages.slice(-10),
};
}
""",
message_selector,
)
# Parse the response
if is_dict_str_object(message_elements):
raw_messages = message_elements.get("messages")
if is_list_of_objects(raw_messages):
for elem in raw_messages:
if is_dict_str_object(elem):
text = get_str_from_dict(elem, "text", "")
if text:
messages.append({
"sender": get_str_from_dict(elem, "sender", "unknown"),
"text": text,
})
return {
"chat_visible": container_visible,
"messages": messages,
"message_count": len(messages),
}
except Exception as exc:
logger.warning("Failed to extract messages: {}", exc)
return {
"chat_visible": container_visible,
"messages": messages,
"message_count": len(messages),
}
async def _generate_and_send_reply(
self,
page: PageLike,
helpers: PageHelpers,
message_context: dict[str, object],
access_url: str,
) -> ActionResult:
"""Generate reply via LLM and send it.
Args:
step_id: The completed step action ID.
result: The step's ActionResult.
"""
assert self.context is not None
page: Browser page.
helpers: PageHelpers instance.
message_context: Parsed message context from chat panel.
access_url: Original access URL (contains board ID).
if step_id == "auth.login_as_persona" and result.details:
if status := result.details.get("status"):
self.context.shared_state["login_status"] = status
Returns:
ActionResult with reply details.
"""
# Extract messages from context
raw_messages = message_context.get("messages")
messages: list[dict[str, str]] = []
if is_list_of_objects(raw_messages):
for m in raw_messages:
if is_dict_str_object(m):
messages.append({
"sender": get_str_from_dict(m, "sender", "unknown"),
"text": get_str_from_dict(m, "text", ""),
})
if not messages:
logger.warning("No messages found for context - cannot generate reply")
return ActionResult(
status="error",
error="No message context available for reply generation",
details=message_context,
)
# Extract board ID from URL for reply input selector
board_id = self._extract_board_id(access_url)
if not board_id:
return ActionResult(
status="error",
error="Could not extract board ID from access_url",
details={"access_url": access_url},
)
logger.info("Generating reply for board {} with {} messages", board_id, len(messages))
# Build conversation for LLM
llm_client = LLMClient.from_settings(self._settings)
conversation = self._build_llm_conversation(messages)
try:
llm_response = await llm_client.chat_completion(
conversation,
temperature=0.7,
max_tokens=500,
)
generated_reply = llm_response.content.strip()
logger.debug("LLM generated reply: {}", generated_reply)
except errors.LLMError as exc:
logger.exception("LLM request failed: {}", exc.message)
return ActionResult(
status="error",
error=f"LLM request failed: {exc.message}",
details={"messages_count": len(messages), **exc.details},
)
if not generated_reply:
return ActionResult(
status="error",
error="LLM returned empty reply",
details={"messages_count": len(messages)},
)
# Type reply into input field - use Playwright's fill which handles Slate editors
reply_input_selector = f"#{board_id}"
reply_input = page.locator(reply_input_selector)
text_entered = False
try:
# Click to focus the input first
await reply_input.click(timeout=5000)
# Use Playwright's fill which properly handles contenteditable
await reply_input.fill(generated_reply)
await helpers.wait_for_stable()
# Verify the text was actually entered
actual_text = await reply_input.text_content()
text_entered = bool(actual_text and generated_reply[:20] in actual_text)
if not text_entered:
logger.warning(
"Text entry verification failed - expected: {}, actual: {}",
generated_reply[:50],
(actual_text or "")[:50],
)
except Exception as exc:
logger.error("Failed to type reply into {}: {}", reply_input_selector, exc)
return ActionResult(
status="error",
error=f"Failed to type reply: {exc}",
details={
"selector": reply_input_selector,
"generated_reply": generated_reply,
},
)
# Click the send button
send_button = page.locator(MessagingSelectors.CHAT_SEND_BUTTON)
try:
await send_button.click(timeout=5000)
await helpers.wait_for_stable()
except Exception as exc:
logger.error("Send button click failed: {}", exc)
return ActionResult(
status="error",
error=f"Send button click failed: {exc}",
details={
"selector": MessagingSelectors.CHAT_SEND_BUTTON,
"generated_reply": generated_reply,
},
)
# Verify message was sent by checking if it appears in the message container
await page.wait_for_timeout(self._settings.timeouts.combobox_listbox)
verify_result = await page.evaluate(
"""(expectedText) => {
const messages = document.querySelectorAll("[data-cy^='chat-message-']");
const lastMessage = messages[messages.length - 1];
if (!lastMessage) return { verified: false };
const text = lastMessage.textContent || '';
return { verified: text.includes(expectedText.slice(0, 30)) };
}""",
generated_reply,
)
message_verified = (
is_dict_str_object(verify_result) and verify_result.get("verified") is True
)
if not message_verified:
logger.warning("Message verification failed after send")
return ActionResult(
details={
"board_id": board_id,
"messages_count": len(messages),
"generated_reply": generated_reply,
"text_entered": text_entered,
"message_verified": message_verified,
}
)
def _extract_board_id(self, url: str) -> str | None:
"""Extract board ID from access URL.
The board ID is in the 'id' or 'form_item' query parameter.
Example: ...?form_item=PL01-800&id=PL01-800... -> 'PL01-800'
"""
try:
parsed = urlparse(url)
params = parse_qs(parsed.query)
# Try 'id' first, then 'form_item'
if "id" in params and params["id"]:
return params["id"][0]
if "form_item" in params and params["form_item"]:
return params["form_item"][0]
except Exception as exc:
logger.warning("Failed to parse URL for board ID: {}", exc)
return None
def _build_llm_conversation(
self, messages: list[dict[str, str]]
) -> list[ChatMessage]:
"""Build LLM conversation from chat messages.
Creates a system prompt and includes recent messages as context.
"""
system_prompt = """You are a helpful assistant responding to workplace messages.
Generate a brief, professional reply to the most recent message in the conversation.
Keep your response concise (1-3 sentences) and contextually appropriate.
Do not include any greeting like "Hi" or sign-off - just the reply content."""
# Format messages as conversation context
context_parts: list[str] = []
for msg in messages:
sender = msg.get("sender", "Unknown")
text = msg.get("text", "")
context_parts.append(f"{sender}: {text}")
user_message = f"""Here is the recent conversation:
{chr(10).join(context_parts)}
Please generate a brief, professional reply to the most recent message."""
return [
ChatMessage(role="system", content=system_prompt),
ChatMessage(role="user", content=user_message),
]
__all__ = ["EmailNotificationResponsePlaybook"]

View File

@@ -6,6 +6,7 @@ from guide.app.api.routes import (
config,
diagnostics,
health,
keepalive,
otp_callback,
sessions,
)
@@ -16,6 +17,7 @@ router.include_router(actions.router)
router.include_router(boards.router)
router.include_router(config.router)
router.include_router(diagnostics.router)
router.include_router(keepalive.router)
router.include_router(sessions.router)
router.include_router(otp_callback.router)

View File

@@ -1,3 +1,19 @@
from guide.app.api.routes import actions, boards, config, diagnostics, health, sessions
from guide.app.api.routes import (
actions,
boards,
config,
diagnostics,
health,
keepalive,
sessions,
)
__all__ = ["actions", "boards", "config", "diagnostics", "health", "sessions"]
__all__ = [
"actions",
"boards",
"config",
"diagnostics",
"health",
"keepalive",
"sessions",
]

View File

@@ -0,0 +1,58 @@
"""Keep-alive status endpoint for monitoring browserless session health."""
from typing import Annotated, Protocol, cast
from fastapi import APIRouter, Depends, FastAPI, Request
from guide.app.browser.keepalive import KeepAliveService
from guide.app.models.domain import (
KeepAliveHostStatusDTO,
KeepAliveStatusResponse,
)
class AppStateProtocol(Protocol):
"""Protocol for app state with keep-alive service."""
keepalive_service: KeepAliveService
def _keepalive_service(request: Request) -> KeepAliveService:
"""Extract keep-alive service from app state."""
app = cast(FastAPI, request.app)
state = cast(AppStateProtocol, cast(object, app.state))
return state.keepalive_service
KeepAliveDep = Annotated[KeepAliveService, Depends(_keepalive_service)]
router = APIRouter(prefix="/keep-alive", tags=["keep-alive"])
@router.get("/status", response_model=KeepAliveStatusResponse)
async def get_keepalive_status(service: KeepAliveDep) -> KeepAliveStatusResponse:
"""Get keep-alive status for all browserless hosts.
Returns the current status of the keep-alive service including:
- Status of each browserless host (last ping time, connection state, errors)
- Service configuration (ping interval)
- Whether the service is currently running
"""
status = service.get_status()
hosts = [
KeepAliveHostStatusDTO(
host_id=host.host_id,
last_ping=host.last_ping.isoformat() if host.last_ping else None,
is_connected=host.is_connected,
last_error=host.last_error,
)
for host in status.hosts
]
return KeepAliveStatusResponse(
hosts=hosts,
interval_seconds=status.interval_seconds,
is_running=status.is_running,
started_at=status.started_at.isoformat() if status.started_at else None,
)

View File

@@ -47,13 +47,15 @@ class OtpRequestData:
def to_json(self) -> str:
"""Serialize to JSON string."""
return json.dumps({
"correlation_id": self.correlation_id,
"email": self.email,
"created_at": self.created_at,
"otp_url": self.otp_url,
"error": self.error,
})
return json.dumps(
{
"correlation_id": self.correlation_id,
"email": self.email,
"created_at": self.created_at,
"otp_url": self.otp_url,
"error": self.error,
}
)
@classmethod
def from_json(cls, data: str) -> "OtpRequestData":
@@ -120,11 +122,11 @@ class MemoryOtpStore:
_lock: asyncio.Lock
_default_timeout: float
def __init__(self, default_timeout: float = 120.0) -> None:
def __init__(self, default_timeout: float = 300.0) -> None:
"""Initialize the callback store.
Args:
default_timeout: Default timeout in seconds for waiting.
default_timeout: Default timeout in seconds for waiting (5 min default).
"""
self._requests = {}
self._lock = asyncio.Lock()
@@ -304,8 +306,8 @@ class RedisOtpStore:
def __init__(
self,
redis_url: str,
default_timeout: float = 120.0,
ttl_seconds: int = 300,
default_timeout: float = 300.0,
ttl_seconds: int = 600,
poll_interval: float = 0.5,
key_prefix: str = "otp_callback:",
) -> None:
@@ -313,8 +315,8 @@ class RedisOtpStore:
Args:
redis_url: Redis connection URL.
default_timeout: Default timeout in seconds for waiting.
ttl_seconds: TTL for Redis keys (auto-cleanup).
default_timeout: Default timeout in seconds for waiting (5 min default).
ttl_seconds: TTL for Redis keys (auto-cleanup, 10 min default).
poll_interval: Polling interval in seconds.
key_prefix: Prefix for Redis keys.
"""
@@ -356,7 +358,9 @@ class RedisOtpStore:
try:
key = self._get_key(correlation_id)
_ = await client.setex(key, self._ttl_seconds, data.to_json())
_logger.info("Registered OTP request in Redis: %s for %s", correlation_id, email)
_logger.info(
"Registered OTP request in Redis: %s for %s", correlation_id, email
)
finally:
await client.close()
@@ -390,7 +394,9 @@ class RedisOtpStore:
start_time = asyncio.get_event_loop().time()
_logger.info(
"Waiting for OTP callback (Redis): %s (timeout=%ss)", correlation_id, timeout
"Waiting for OTP callback (Redis): %s (timeout=%ss)",
correlation_id,
timeout,
)
client = await self._get_redis()
@@ -405,8 +411,9 @@ class RedisOtpStore:
raw_data = await client.get(key)
if not raw_data:
raise ValueError(
f"No pending request for correlation_id: {correlation_id}"
# Key may have expired (TTL) or was never registered
raise TimeoutError(
f"OTP request expired or not found for {correlation_id} (key may have exceeded TTL before callback arrived)"
)
data = OtpRequestData.from_json(raw_data)
@@ -423,7 +430,9 @@ class RedisOtpStore:
f"OTP callback received but no URL for {correlation_id}"
)
_logger.info("OTP callback received (Redis) for: %s", correlation_id)
_logger.info(
"OTP callback received (Redis) for: %s", correlation_id
)
return data.otp_url
# Not fulfilled yet, wait and poll again
@@ -453,7 +462,9 @@ class RedisOtpStore:
try:
raw_data = await client.get(key)
if not raw_data:
_logger.warning("No pending request in Redis for callback: %s", correlation_id)
_logger.warning(
"No pending request in Redis for callback: %s", correlation_id
)
return False
data = OtpRequestData.from_json(raw_data)

View File

@@ -325,6 +325,11 @@ class SessionManager:
and verifies the login state. Automatically invalidates expired
or invalid sessions.
Warning:
If the base URL redirects to a different origin (e.g., SSO provider),
localStorage injection will fail silently. Ensure origin_url remains
on the application domain when capturing sessions.
Args:
page: Browser page instance.
persona: Persona to restore session for.
@@ -369,6 +374,9 @@ class SessionManager:
# This triggers the app to read localStorage without clearing it like goto() would
_ = await page.evaluate("window.location.reload()")
# Wait for page to finish loading after reload
await page.wait_for_load_state("networkidle")
# Validate session is actually working
from guide.app.auth.session import detect_current_persona

View File

@@ -97,14 +97,20 @@ class Accordion:
then clicks only those (reduces N+2 queries to 2+X where X = expanded count).
"""
# Single query to get count and expanded indices
total_count, expanded_indices = await self._get_expanded_indices(buttons_selector)
total_count, expanded_indices = await self._get_expanded_indices(
buttons_selector
)
if total_count == 0:
return {"collapsed_count": 0, "total_found": 0, "failed_indices": []}
if not expanded_indices:
_logger.debug("No expanded accordions found among %d buttons", total_count)
return {"collapsed_count": 0, "total_found": total_count, "failed_indices": []}
return {
"collapsed_count": 0,
"total_found": total_count,
"failed_indices": [],
}
buttons = self.page.locator(buttons_selector)
collapsed_count = 0

View File

@@ -0,0 +1,274 @@
"""Keep-alive service for browserless CDP sessions.
This module provides a background task that periodically refreshes browserless
CDP browser sessions to prevent session timeouts. It proactively establishes
connections to browserless hosts and reloads pages every 9 minutes.
"""
import asyncio
import contextlib
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import TYPE_CHECKING
from loguru import logger
from playwright.async_api import Page
from guide.app.core.config import HostKind
if TYPE_CHECKING:
from guide.app.browser.pool import BrowserPool
DEFAULT_KEEPALIVE_INTERVAL_SECONDS = 540 # 9 minutes
@dataclass
class KeepAliveHostStatus:
"""Status of a single browserless host."""
host_id: str
last_ping: datetime | None = None
is_connected: bool = False
last_error: str | None = None
@dataclass
class KeepAliveServiceStatus:
"""Overall status of the keep-alive service."""
hosts: list[KeepAliveHostStatus] = field(default_factory=list)
interval_seconds: int = DEFAULT_KEEPALIVE_INTERVAL_SECONDS
is_running: bool = False
started_at: datetime | None = None
class KeepAliveService:
"""Background service that keeps browserless CDP sessions alive.
Periodically pings browserless CDP hosts by reloading pages to prevent
session timeouts. Proactively establishes connections if not yet connected.
"""
_pool: "BrowserPool"
_interval: int
_task: asyncio.Task[None] | None
_status: dict[str, KeepAliveHostStatus]
_started_at: datetime | None
_running: bool
def __init__(
self,
browser_pool: "BrowserPool",
interval_seconds: int = DEFAULT_KEEPALIVE_INTERVAL_SECONDS,
) -> None:
"""Initialize the keep-alive service.
Args:
browser_pool: The browser pool to manage
interval_seconds: Interval between keep-alive pings (default: 540 = 9 minutes)
"""
self._pool = browser_pool
self._interval = interval_seconds
self._task = None
self._status = {}
self._started_at = None
self._running = False
def is_browserless_host(self, host_id: str) -> bool:
"""Check if a host is a browserless CDP host.
Args:
host_id: The host identifier to check
Returns:
True if the host is a browserless CDP host
"""
if "browserless" not in host_id.lower():
return False
host_config = self._pool.settings.browser_hosts.get(host_id)
if not host_config:
return False
return host_config.kind == HostKind.CDP
def get_browserless_host_ids(self) -> list[str]:
"""Get all browserless CDP host IDs from configuration.
Returns:
List of browserless CDP host IDs
"""
return [
host_id
for host_id in self._pool.settings.browser_hosts
if self.is_browserless_host(host_id)
]
async def _allocate_and_reload(self, host_id: str) -> None:
"""Allocate a page from the pool and reload it.
Args:
host_id: The host identifier
"""
context, page, should_close = await self._pool.allocate_context_and_page(
host_id
)
if isinstance(page, Page):
_ = await page.reload(timeout=30000)
if should_close and context is not None:
await context.close()
def _update_status(
self, host_id: str, *, is_connected: bool, error: str | None = None
) -> None:
"""Update the status for a host.
Args:
host_id: The host identifier
is_connected: Whether the host is connected
error: Error message if any
"""
self._status[host_id] = KeepAliveHostStatus(
host_id=host_id,
last_ping=datetime.now(UTC),
is_connected=is_connected,
last_error=error,
)
async def ping_host(self, host_id: str) -> None:
"""Refresh a single browserless host by reloading the page.
Proactively establishes connection if not yet connected.
Args:
host_id: The host identifier to ping
"""
logger.debug("[KEEPALIVE] Pinging host '{}'", host_id)
try:
instance = self._pool.get_instance(host_id)
# Proactively establish connection if not yet connected
if instance is None or instance.browser is None:
logger.info(
"[KEEPALIVE] Host '{}' not connected, establishing connection",
host_id,
)
await self._allocate_and_reload(host_id)
self._update_status(host_id, is_connected=True)
logger.info("[KEEPALIVE] Host '{}' connected and pinged", host_id)
return
# Reconnect if browser connection is stale
if not instance.browser.is_connected():
logger.warning(
"[KEEPALIVE] Host '{}' disconnected, reconnecting", host_id
)
await self._allocate_and_reload(host_id)
self._update_status(host_id, is_connected=True)
logger.info("[KEEPALIVE] Host '{}' reconnected and pinged", host_id)
return
# Reload existing cached page or allocate new one
cached_page = self._pool.get_cached_cdp_page(host_id)
if cached_page is not None:
_ = await cached_page.reload(timeout=30000)
logger.debug("[KEEPALIVE] Reloaded cached page for host '{}'", host_id)
else:
await self._allocate_and_reload(host_id)
self._update_status(host_id, is_connected=True)
except Exception as exc:
error_msg = str(exc)
logger.error("[KEEPALIVE] Failed to ping host '{}': {}", host_id, error_msg)
self._update_status(host_id, is_connected=False, error=error_msg)
async def ping_all(self) -> None:
"""Ping all browserless CDP hosts."""
host_ids = self.get_browserless_host_ids()
if not host_ids:
logger.debug("[KEEPALIVE] No browserless hosts configured")
return
logger.info("[KEEPALIVE] Pinging {} browserless hosts", len(host_ids))
for host_id in host_ids:
await self.ping_host(host_id)
async def _run_loop(self) -> None:
"""Background loop that runs keep-alive pings at the configured interval."""
logger.info(
"[KEEPALIVE] Starting keep-alive loop (interval: {}s)", self._interval
)
# Initial ping on startup
await self.ping_all()
while True:
await asyncio.sleep(self._interval)
await self.ping_all()
def start(self) -> None:
"""Start the background keep-alive task."""
if self._task is not None and not self._task.done():
logger.warning("[KEEPALIVE] Already running")
return
self._running = True
self._started_at = datetime.now(UTC)
self._task = asyncio.create_task(self._run_loop())
logger.info("[KEEPALIVE] Service started")
async def stop(self) -> None:
"""Stop and cleanup the background keep-alive task."""
self._running = False
if self._task is None:
return
_ = self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None
logger.info("[KEEPALIVE] Service stopped")
def get_status(self) -> KeepAliveServiceStatus:
"""Get the current status of the keep-alive service.
Returns:
Status including all tracked hosts and service state
"""
host_ids = self.get_browserless_host_ids()
# Ensure all browserless hosts have status entries
hosts: list[KeepAliveHostStatus] = []
for host_id in host_ids:
if host_id in self._status:
hosts.append(self._status[host_id])
else:
hosts.append(
KeepAliveHostStatus(
host_id=host_id,
last_ping=None,
is_connected=False,
last_error=None,
)
)
return KeepAliveServiceStatus(
hosts=hosts,
interval_seconds=self._interval,
is_running=self._running,
started_at=self._started_at,
)
__all__ = [
"KeepAliveService",
"KeepAliveHostStatus",
"KeepAliveServiceStatus",
"DEFAULT_KEEPALIVE_INTERVAL_SECONDS",
]

View File

@@ -238,6 +238,11 @@ class BrowserInstance:
)
return self.browser
@property
def cached_cdp_page(self) -> Page | None:
"""Get the cached CDP page if available."""
return self._cdp_page
async def close(self) -> None:
"""Close the browser connection."""
with contextlib.suppress(Exception):
@@ -384,9 +389,22 @@ class BrowserPool:
await self._evict_instance(resolved_id)
instance = await self._create_instance(resolved_id, host_config)
self._instances[resolved_id] = instance
return await instance.allocate_context_and_page(
storage_state=storage_state
)
try:
return await instance.allocate_context_and_page(
storage_state=storage_state
)
except (TargetClosedError, errors.BrowserConnectionError) as retry_exc:
# Second failure - evict again to prevent lockout on future requests
await self._evict_instance(resolved_id)
raise errors.BrowserConnectionError(
f"Host '{resolved_id}' unavailable after reconnection attempt",
details={
"host_id": resolved_id,
"host_kind": host_config.kind.value,
"original_error": str(exc),
"retry_error": str(retry_exc),
},
) from retry_exc
async def _evict_instance(self, host_id: str) -> None:
"""Evict and close a stale browser instance."""
@@ -523,5 +541,30 @@ class BrowserPool:
return self._playwright.webkit
raise errors.ConfigError(f"Unsupported browser type '{browser}'")
def get_instance(self, host_id: str) -> BrowserInstance | None:
"""Get the browser instance for a host if it exists.
Args:
host_id: The host identifier
Returns:
The BrowserInstance if it exists, None otherwise
"""
return self._instances.get(host_id)
def get_cached_cdp_page(self, host_id: str) -> Page | None:
"""Get the cached CDP page for a host if available.
Args:
host_id: The host identifier
Returns:
The cached Page if available, None otherwise
"""
instance = self._instances.get(host_id)
if instance is None:
return None
return instance.cached_cdp_page
__all__ = ["BrowserPool", "BrowserInstance"]

View File

@@ -175,9 +175,9 @@ class AppSettings(BaseSettings):
# n8n Integration (use RAINDROP_DEMO_N8N_* env vars)
n8n_webhook_url: str | None = Field(default=None)
"""Webhook URL for n8n OTP request notifications."""
n8n_otp_callback_timeout: int = Field(default=120)
"""Timeout in seconds waiting for OTP callback from n8n."""
n8n_otp_email_delay: float = Field(default=5.0)
n8n_otp_callback_timeout: int = Field(default=300)
"""Timeout in seconds waiting for OTP callback from n8n (5 min default)."""
n8n_otp_email_delay: float = Field(default=15.0)
"""Delay in seconds after triggering OTP email before notifying n8n."""
callback_base_url: str = Field(default="http://localhost:8765")
"""Base URL for callback endpoints (used by n8n to call back)."""
@@ -200,6 +200,18 @@ class AppSettings(BaseSettings):
demonstration_instance_id: int = Field(default=107)
"""Instance ID for the Demonstration board. Override for different environments."""
# LLM Configuration (OpenAI-compatible endpoint)
llm_base_url: str = Field(default="http://bifrost.lab/v1")
"""Base URL for OpenAI-compatible LLM endpoint."""
llm_model: str = Field(
default="fireworks_ai/accounts/fireworks/models/qwen3-235b-a22b-instruct-2507"
)
"""Model identifier for LLM requests."""
llm_api_key: str | None = Field(default=None)
"""API key for LLM endpoint (optional if endpoint doesn't require auth)."""
llm_timeout_s: float = Field(default=60.0)
"""Timeout in seconds for LLM requests."""
def _load_yaml_file(path: Path) -> dict[str, object]:
"""Load YAML file, handling missing PyYAML gracefully.

View File

@@ -46,7 +46,9 @@ class InterceptHandler(logging.Handler):
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
logger.opt(depth=depth, exception=record.exc_info).log(
level, record.getMessage()
)
def setup_logger() -> None:
@@ -77,7 +79,7 @@ def setup_logger() -> None:
def inject_context(record: object) -> bool:
"""Filter function that injects context variables into record before serialization.
Args:
record: Loguru Record object with dict-like access to attributes.
"""
@@ -86,11 +88,11 @@ def setup_logger() -> None:
extra_attr = getattr(record, "extra", None)
if extra_attr is None:
return True
# Ensure we have a dict before mutating it
if not isinstance(extra_attr, dict):
return True
# Directly mutate the extra dict that loguru provides
# The isinstance check ensures it's a dict, so we can safely assign to it
if correlation_id := _ContextVars.correlation_id.get():

View File

@@ -6,6 +6,7 @@ from guide.app.errors.exceptions import (
GraphQLOperationError,
GraphQLTransportError,
GuideError,
LLMError,
MfaError,
PersonaError,
)
@@ -21,6 +22,7 @@ __all__ = [
"ActionExecutionError",
"GraphQLTransportError",
"GraphQLOperationError",
"LLMError",
"guide_error_handler",
"unhandled_error_handler",
]

View File

@@ -43,3 +43,7 @@ class GraphQLOperationError(GuideError):
class DoclingExtractionError(GuideError):
code: str = "DOCLING_EXTRACTION_FAILED"
class LLMError(GuideError):
code: str = "LLM_ERROR"

View File

@@ -10,6 +10,7 @@ from guide.app.actions.registry import default_registry
from guide.app.auth import SessionManager, SessionStorage
from guide.app.auth.otp_callback import configure_otp_store
from guide.app.browser.client import BrowserClient
from guide.app.browser.keepalive import KeepAliveService
from guide.app.browser.pool import BrowserPool
from guide.app.core.config import AppSettings, load_settings
from guide.app.core.logging import setup_logger
@@ -54,6 +55,9 @@ def create_app() -> FastAPI:
browser_pool = BrowserPool(settings)
browser_client = BrowserClient(browser_pool)
# Create keep-alive service for browserless sessions (9 minute interval)
keepalive_service = KeepAliveService(browser_pool)
# Create GraphQL client with connection pooling
graphql_client = GraphQLClient(settings)
@@ -62,6 +66,7 @@ def create_app() -> FastAPI:
app.state.action_registry = registry
app.state.browser_client = browser_client
app.state.browser_pool = browser_pool
app.state.keepalive_service = keepalive_service
app.state.graphql_client = graphql_client
app.state.persona_store = persona_store
app.state.board_store = board_store
@@ -73,16 +78,20 @@ def create_app() -> FastAPI:
# Startup/shutdown lifecycle using modern lifespan context manager
@contextlib.asynccontextmanager
async def lifespan(_app: FastAPI):
"""Manage browser pool and GraphQL client lifecycle."""
"""Manage browser pool, keep-alive service, and GraphQL client lifecycle."""
try:
await browser_pool.initialize()
# Start keep-alive service after pool is initialized
keepalive_service.start()
except Exception:
# Ensure GraphQL client is closed if pool init fails
# Ensure GraphQL client is closed if init fails
await graphql_client.aclose()
raise
try:
yield
finally:
# Stop keep-alive service before closing pool
await keepalive_service.stop()
await graphql_client.aclose()
await browser_pool.close()

View File

@@ -2,6 +2,7 @@ from guide.app.models.domain.models import (
ActionContext,
ActionEnvelope,
ActionMetadata,
ActionParamInfo,
ActionRequest,
ActionResponse,
ActionResult,
@@ -9,12 +10,15 @@ from guide.app.models.domain.models import (
BrowserHostDTO,
BrowserHostsResponse,
DebugInfo,
KeepAliveHostStatusDTO,
KeepAliveStatusResponse,
)
__all__ = [
"ActionContext",
"ActionEnvelope",
"ActionMetadata",
"ActionParamInfo",
"ActionRequest",
"ActionResponse",
"ActionResult",
@@ -22,4 +26,6 @@ __all__ = [
"BrowserHostDTO",
"BrowserHostsResponse",
"DebugInfo",
"KeepAliveHostStatusDTO",
"KeepAliveStatusResponse",
]

View File

@@ -60,12 +60,40 @@ class ActionResult(BaseModel):
error: str | None = None
class ActionParamInfo(BaseModel):
"""Documentation for a single action parameter."""
name: str
"""Parameter name as used in params dict."""
description: str
"""Human-readable description of the parameter."""
required: bool = False
"""Whether this parameter is required."""
default: str | None = None
"""Default value if not provided (as string for display)."""
example: str | None = None
"""Example value for documentation."""
class ActionMetadata(BaseModel):
"""Metadata about an action for discovery."""
"""Metadata about an action for discovery and documentation."""
id: str
"""Unique action identifier (e.g., 'auth.request_otp')."""
description: str
"""Brief one-line description of the action."""
category: str
"""Action category for grouping (e.g., 'auth', 'intake', 'sourcing')."""
long_description: str | None = None
"""Detailed multi-line description with usage notes."""
params: list[ActionParamInfo] = Field(default_factory=list)
"""Documentation for accepted parameters."""
example_request: dict[str, object] | None = None
"""Example request payload for documentation."""
example_response: dict[str, object] | None = None
"""Example successful response for documentation."""
requires: list[str] = Field(default_factory=list)
"""List of requirements (e.g., 'RAINDROP_DEMO_N8N_WEBHOOK_URL')."""
class ActionResponse(BaseModel):
@@ -117,3 +145,29 @@ class BrowserHostsResponse(BaseModel):
default_browser_host_id: str
browser_hosts: dict[str, BrowserHostDTO]
class KeepAliveHostStatusDTO(BaseModel):
"""Status of a single browserless host for API response."""
host_id: str
"""The host identifier."""
last_ping: str | None = None
"""ISO 8601 timestamp of last successful ping, or None if never pinged."""
is_connected: bool = False
"""Whether the host is currently connected."""
last_error: str | None = None
"""Error message from last failed ping attempt, or None if last ping succeeded."""
class KeepAliveStatusResponse(BaseModel):
"""Response model for keep-alive status endpoint."""
hosts: list[KeepAliveHostStatusDTO]
"""Status of all browserless hosts."""
interval_seconds: int
"""Interval between keep-alive pings in seconds."""
is_running: bool
"""Whether the keep-alive service is currently running."""
started_at: str | None = None
"""ISO 8601 timestamp when the service was started, or None if not running."""

View File

@@ -133,32 +133,39 @@ mutation UnarchiveEntity(${config.key_field}: {config.key_type.value}!) {{
"""
async def archive_entity(
async def _modify_archive_state(
graphql_url: str,
bearer_token: str,
entity_type: EntityType,
key_value: str | int,
*,
archive: bool,
) -> ArchiveResult:
"""Archive any entity by type and key.
"""Internal implementation for archive/unarchive operations.
Args:
graphql_url: GraphQL endpoint URL.
bearer_token: Bearer token for authentication.
entity_type: Type of entity to archive.
entity_type: Type of entity to modify.
key_value: Primary key value (uuid string or int id).
archive: True to archive, False to unarchive.
Returns:
ArchiveResult with archive confirmation.
ArchiveResult with operation confirmation.
Raises:
ValueError: If entity not found or archive fails.
ValueError: If entity not found or operation fails.
KeyError: If entity type is not configured.
"""
config = ENTITY_CONFIGS.get(entity_type)
if not config:
raise KeyError(f"Unknown entity type: {entity_type}")
mutation = _build_archive_mutation(config)
mutation = (
_build_archive_mutation(config)
if archive
else _build_unarchive_mutation(config)
)
variables = {config.key_field: key_value}
async with httpx.AsyncClient() as client:
@@ -208,6 +215,32 @@ async def archive_entity(
)
async def archive_entity(
graphql_url: str,
bearer_token: str,
entity_type: EntityType,
key_value: str | int,
) -> ArchiveResult:
"""Archive any entity by type and key.
Args:
graphql_url: GraphQL endpoint URL.
bearer_token: Bearer token for authentication.
entity_type: Type of entity to archive.
key_value: Primary key value (uuid string or int id).
Returns:
ArchiveResult with archive confirmation.
Raises:
ValueError: If entity not found or archive fails.
KeyError: If entity type is not configured.
"""
return await _modify_archive_state(
graphql_url, bearer_token, entity_type, key_value, archive=True
)
async def unarchive_entity(
graphql_url: str,
bearer_token: str,
@@ -229,57 +262,8 @@ async def unarchive_entity(
ValueError: If entity not found or unarchive fails.
KeyError: If entity type is not configured.
"""
config = ENTITY_CONFIGS.get(entity_type)
if not config:
raise KeyError(f"Unknown entity type: {entity_type}")
mutation = _build_unarchive_mutation(config)
variables = {config.key_field: key_value}
async with httpx.AsyncClient() as client:
resp = await client.post(
graphql_url,
json={"query": mutation, "variables": variables},
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {bearer_token}",
},
timeout=30.0,
)
response_data = cast(dict[str, object], resp.json())
# Check for errors
if errors := response_data.get("errors"):
error_list = cast(list[dict[str, object]], errors)
first_error = error_list[0] if error_list else {}
raise ValueError(
f"GraphQL error: {first_error.get('message', 'Unknown error')}"
)
# Extract result
data_section = response_data.get("data")
if not isinstance(data_section, dict):
raise ValueError("Unexpected response format: missing data section")
update_key = f"update_{config.table_name}"
update_result = cast(dict[str, object], data_section).get(update_key)
if not isinstance(update_result, dict):
raise ValueError(f"Unexpected response format: missing {update_key}")
returning = cast(dict[str, object], update_result).get("returning")
if not isinstance(returning, list):
raise ValueError(f"Entity with {config.key_field}={key_value} not found")
returning_list = cast(list[dict[str, object]], returning)
if len(returning_list) == 0:
raise ValueError(f"Entity with {config.key_field}={key_value} not found")
item = returning_list[0]
return ArchiveResult(
entity_type=entity_type,
key_field=config.key_field,
key_value=key_value,
is_archived=bool(item.get("is_archived", False)),
data=item,
return await _modify_archive_state(
graphql_url, bearer_token, entity_type, key_value, archive=False
)

View File

@@ -31,10 +31,26 @@ class MessagingSelectors:
)
"""Close button to dismiss blocking modal."""
# Board item editor dialog (may block chat input)
BOARD_ITEM_DIALOG: ClassVar[str] = "#dialog_boardItemEditor"
"""Board item editor dialog that may intercept chat interactions."""
BOARD_ITEM_DIALOG_CLOSE: ClassVar[str] = "#page-header-container-close-button > button"
"""Close button for board item editor dialog."""
# Chat panel elements
CHAT_MESSAGES_CONTAINER: ClassVar[str] = 'xpath=//*[@id="chat-messages-container"]'
"""Chat messages container - if visible, chat panel is already open."""
CHAT_MESSAGE_ITEM: ClassVar[str] = "[data-cy^='chat-message-']"
"""Individual message items (data-cy='chat-message-0', chat-message-1, etc.)."""
CHAT_REPLY_CONTAINER: ClassVar[str] = "#rdslate-editor-container"
"""Parent container for the reply input editor."""
CHAT_SEND_BUTTON: ClassVar[str] = "#send-button"
"""Send button to submit chat message."""
CHAT_FLYOUT_BUTTON: ClassVar[str] = "xpath=/html/body/div/div/button"
"""Button to expand the chat flyout panel."""

196
src/guide/app/utils/llm.py Normal file
View File

@@ -0,0 +1,196 @@
"""LLM client for OpenAI-compatible endpoints.
Provides async chat completion requests using httpx.
"""
from dataclasses import dataclass
from typing import cast
import httpx
from loguru import logger
from guide.app.core.config import AppSettings
from guide.app.errors import LLMError
@dataclass(frozen=True, slots=True)
class ChatMessage:
"""Chat message for LLM conversation."""
role: str
content: str
@dataclass(frozen=True, slots=True)
class LLMResponse:
"""Response from LLM completion request."""
content: str
model: str
finish_reason: str | None
usage: dict[str, int] | None
class LLMClient:
"""Async client for OpenAI-compatible chat completions."""
_base_url: str
_model: str
_api_key: str | None
_timeout: float
def __init__(
self,
base_url: str,
model: str,
api_key: str | None = None,
timeout: float = 60.0,
) -> None:
"""Initialize LLM client.
Args:
base_url: Base URL for the API (e.g., 'http://bifrost.lab/v1').
model: Model identifier for requests.
api_key: Optional API key for authentication.
timeout: Request timeout in seconds.
"""
self._base_url = base_url.rstrip("/")
self._model = model
self._api_key = api_key
self._timeout = timeout
@classmethod
def from_settings(cls, settings: AppSettings) -> "LLMClient":
"""Create client from application settings."""
return cls(
base_url=settings.llm_base_url,
model=settings.llm_model,
api_key=settings.llm_api_key,
timeout=settings.llm_timeout_s,
)
async def chat_completion(
self,
messages: list[ChatMessage],
*,
temperature: float = 0.7,
max_tokens: int | None = None,
) -> LLMResponse:
"""Send chat completion request.
Args:
messages: List of chat messages for conversation context.
temperature: Sampling temperature (0.0-2.0).
max_tokens: Maximum tokens in response (optional).
Returns:
LLMResponse with generated content.
Raises:
LLMError: On network, API, or response parsing errors.
"""
headers: dict[str, str] = {"Content-Type": "application/json"}
if self._api_key:
headers["Authorization"] = f"Bearer {self._api_key}"
payload: dict[str, object] = {
"model": self._model,
"messages": [{"role": m.role, "content": m.content} for m in messages],
"temperature": temperature,
}
if max_tokens is not None:
payload["max_tokens"] = max_tokens
url = f"{self._base_url}/chat/completions"
logger.debug("LLM request to {} with model {}", url, self._model)
try:
async with httpx.AsyncClient(timeout=self._timeout) as client:
response = await client.post(url, json=payload, headers=headers)
_ = response.raise_for_status()
except httpx.TimeoutException as exc:
raise LLMError(
f"LLM request timed out after {self._timeout}s",
details={"url": url, "model": self._model, "timeout": self._timeout},
) from exc
except httpx.HTTPStatusError as exc:
raise LLMError(
f"LLM API returned error: {exc.response.status_code}",
details={
"url": url,
"model": self._model,
"status_code": exc.response.status_code,
"response_text": exc.response.text[:500],
},
) from exc
except httpx.RequestError as exc:
raise LLMError(
f"LLM request failed: {exc}",
details={"url": url, "model": self._model},
) from exc
data = cast(dict[str, object], response.json())
# Extract response content from OpenAI format
choices_raw = data.get("choices", [])
if not isinstance(choices_raw, list) or not choices_raw:
raise LLMError(
"No choices in LLM response",
details={"response_keys": list(data.keys())},
)
choices_list = cast(list[object], choices_raw)
first_choice_raw = choices_list[0]
if not isinstance(first_choice_raw, dict):
raise LLMError(
"Invalid choice format in LLM response",
details={"choice_type": type(first_choice_raw).__name__},
)
first_choice = cast(dict[str, object], first_choice_raw)
message_raw = first_choice.get("message", {})
if not isinstance(message_raw, dict):
raise LLMError(
"Invalid message format in LLM response",
details={"message_type": type(message_raw).__name__},
)
message = cast(dict[str, object], message_raw)
content_raw = message.get("content", "")
content: str = str(content_raw) if content_raw else ""
# Handle potential thinking tags (some models include these)
if "<think>" in content and "</think>" in content:
# Extract content after thinking block
think_end = content.find("</think>")
if think_end != -1:
content = content[think_end + len("</think>") :].strip()
usage_data = data.get("usage")
usage: dict[str, int] | None = None
if isinstance(usage_data, dict):
usage_typed = cast(dict[str, object], usage_data)
prompt_val = usage_typed.get("prompt_tokens", 0)
completion_val = usage_typed.get("completion_tokens", 0)
total_val = usage_typed.get("total_tokens", 0)
usage = {
"prompt_tokens": int(prompt_val) if isinstance(prompt_val, int | float) else 0,
"completion_tokens": int(completion_val) if isinstance(completion_val, int | float) else 0,
"total_tokens": int(total_val) if isinstance(total_val, int | float) else 0,
}
model_raw = data.get("model", self._model)
model_str: str = str(model_raw) if model_raw else self._model
finish_reason_raw = first_choice.get("finish_reason")
finish_reason: str | None = str(finish_reason_raw) if finish_reason_raw else None
return LLMResponse(
content=content,
model=model_str,
finish_reason=finish_reason,
usage=usage,
)
__all__ = ["ChatMessage", "LLMClient", "LLMResponse"]