Update documentation, refactor string handling, and enhance logging

- Updated the development server documentation to reflect the new port configuration.
- Refactored string handling by replacing the centralized registry with dedicated selectors for better modularity and type safety.
- Enhanced logging throughout the application by integrating loguru for structured logging and improved context handling.
- Removed outdated files and streamlined the codebase for better maintainability.
- Added new HTML parsing utilities using BeautifulSoup to improve DOM traversal and element extraction.
- Updated various components to utilize the new string selectors, ensuring consistency across the codebase.
This commit is contained in:
2025-12-07 14:16:27 +00:00
parent 5cd1fa3532
commit f502286450
28 changed files with 870 additions and 1384 deletions

View File

@@ -34,7 +34,7 @@ python -m guide
HOST=127.0.0.1 PORT=9000 python -m guide
# View API docs
# Navigate to http://localhost:8765/docs
# Navigate to http://localhost:8000/docs
# Key endpoints:
# GET /healthz # liveness check

View File

@@ -1,170 +0,0 @@
# MUI Autocomplete Dropdown Close - Failed Strategies
## Problem Statement
MUI Autocomplete multi-select dropdowns stay open by design for multiple selections. After selecting options, the dropdown must be closed before interacting with other form fields. Otherwise, subsequent fields see the wrong listbox options.
## Environment
- Browser automation via Chrome Extension (WebSocket-based, not CDP)
- Extension uses `dispatchEvent` for all interactions (events have `isTrusted: false`)
- MUI v5 Autocomplete with multi-select enabled
- React controlled components
---
## Failed Strategies
### 1. Simple Blur
```javascript
input.blur();
document.body.focus();
```
**Result:** Dropdown stays open. MUI Autocomplete does not close on blur alone for multi-select.
---
### 2. Tab Key Event
```javascript
const eventProps = {
key: 'Tab',
code: 'Tab',
keyCode: 9,
bubbles: true,
cancelable: true,
composed: true
};
input.dispatchEvent(new KeyboardEvent('keydown', eventProps));
input.dispatchEvent(new KeyboardEvent('keyup', eventProps));
input.blur();
```
**Result:** Dropdown stays open. Tab key alone doesn't trigger close for multi-select autocomplete.
---
### 3. Click Popup Indicator (Toggle)
```python
await page.click(f"{field_selector} .MuiAutocomplete-popupIndicator")
```
**Result:** Inconsistent. Sometimes works, sometimes opens a different dropdown. The popup indicator is a toggle button - if the dropdown is somehow in a weird state, clicking it may re-open instead of close.
---
### 4. Click Neutral Element (h2, Dialog Title)
```python
await page.click("h2")
await page.click(".MuiDialogTitle-root")
await page.click(".MuiDialogContent-root")
```
**Result:** Dropdown stays open. The click events are dispatched but MUI ClickAwayListener doesn't respond. Likely because programmatic events have `isTrusted: false`.
---
### 5. Mousedown Event on Document (ClickAwayListener)
```javascript
const event = new MouseEvent('mousedown', {
bubbles: true,
cancelable: true,
view: window,
clientX: rect.left + 5,
clientY: rect.top + 5,
button: 0
});
target.dispatchEvent(event); // target = h2, dialog, body
```
**Result:** Dropdown stays open. MUI ClickAwayListener likely checks `event.isTrusted` which is `false` for programmatically dispatched events.
---
### 6. Escape Key to Input
```javascript
input.focus();
const eventProps = {
key: 'Escape',
code: 'Escape',
keyCode: 27,
bubbles: true,
cancelable: true,
composed: true
};
input.dispatchEvent(new KeyboardEvent('keydown', eventProps));
input.dispatchEvent(new KeyboardEvent('keyup', eventProps));
input.blur();
```
**Result:** CLOSES THE PARENT MODAL. The Escape event bubbles up and closes the dialog, not just the dropdown. This breaks the entire form flow.
---
### 7. Escape Key to Active Element (from _open_dropdown)
```javascript
const active = document.activeElement;
active.dispatchEvent(new KeyboardEvent('keydown', { key: 'Escape', ... }));
```
**Result:** Same as #6 - closes the parent modal.
---
### 8. Remove Listbox from DOM
```javascript
const listbox = document.querySelector('[role="listbox"]');
listbox.remove();
```
**Result:** CRASHES REACT. React expects to manage the DOM - removing elements it controls causes "Cannot read properties of null" errors and breaks the form.
---
### 9. Set aria-expanded to false
```javascript
input.setAttribute('aria-expanded', 'false');
```
**Result:** Visual state doesn't change. MUI manages this attribute internally via React state. Setting it directly has no effect on the actual dropdown visibility.
---
## Key Insights
1. **`isTrusted` Check**: MUI ClickAwayListener likely checks `event.isTrusted`. All programmatically dispatched events have `isTrusted: false`, so they're ignored.
2. **React State Ownership**: MUI Autocomplete dropdown visibility is controlled by React state, not DOM attributes. Direct DOM manipulation doesn't work.
3. **Multi-select Behavior**: Unlike single-select, multi-select autocomplete is designed to stay open for selecting multiple items. Standard close mechanisms are intentionally disabled.
4. **Escape Bubbling**: Escape key events bubble to parent elements, closing modals instead of just dropdowns.
5. **Extension Limitation**: Chrome extension automation via `dispatchEvent` cannot produce trusted events. Only actual user input or CDP Input.dispatchMouseEvent can create trusted events.
---
## Successful Solution ✅
**Mark ALL listboxes as closed with data attribute + CSS hiding + filter in queries**
```javascript
// In close logic (after select_multi completes):
const listboxes = document.querySelectorAll('[role="listbox"]:not([data-dropdown-closed])');
listboxes.forEach(listbox => {
listbox.setAttribute('data-dropdown-closed', 'true');
listbox.style.setProperty('display', 'none', 'important');
listbox.style.setProperty('visibility', 'hidden', 'important');
listbox.style.setProperty('pointer-events', 'none', 'important');
});
// In all listbox queries:
const listbox = document.querySelector('[role="listbox"]:not([data-dropdown-closed])');
```
**Why it works:**
1. **Data attribute marking**: `data-dropdown-closed="true"` survives React re-renders (React doesn't remove unknown attributes)
2. **CSS with !important**: Prevents MUI styles from overwriting our hiding
3. **Filter in ALL queries**: `check_listbox_visible`, `_get_options`, `_get_listbox_options` all use `:not([data-dropdown-closed])` selector
4. **Mark ALL listboxes**: Not just the one for current field - catches any orphaned/stale listboxes
**Key insight:** The issue wasn't that we couldn't close the dropdown - it's that we couldn't prevent subsequent code from finding the stale listbox. By marking and filtering, we make stale listboxes invisible to our automation code.
---
## Previously Attempted (Did Not Work)
### CSS Hiding Only (Without Data Attribute)
```javascript
listbox.style.display = 'none';
```
**Result:** React re-renders could remove the style, and queries would still find the element.

File diff suppressed because one or more lines are too long

View File

@@ -1,201 +1,89 @@
This is a sophisticated automation codebase blending Playwright/CDP with high-level abstraction layers. However, to achieve your goal of **LLM-driven contextual filling** (where an LLM decides *what* to put in a field based on its label/context rather than hardcoded `fill_text` calls), the current implementation has significant gaps in how it "perceives" the form.
# Spec Validation (2025-12-07)
Here is a critique of the current approach, specific bug identifications, and a strategy to bridge the gap.
### 1. Critique: The "Uncanny Valley" of Field Discovery
The codebase currently sits between two approaches:
1. **Explicit Definitions:** `raindrop/operations/form_schema.py` (GraphQL schema).
2. **Heuristic DOM Scraping:** `browser/elements/form_discovery.py` (Regex/HTML parsing).
**The Core Problem:** There is no reliable bridge between the *Schema* (which knows that field `f19` is "Estimated Value" and requires a number) and the *DOM* (which is just a `<div>` with `data-cy="board-item-field-number-f19"`).
To allow an LLM to generate values, you need to construct a **Context Object** that says:
> "Field 'Description' (DOM Selector: `[data-cy=...]`) is a Text Area. It is currently empty. It corresponds to the 'Description' field in the Schema."
#### Weaknesses in Current Logic:
* **Regex HTML Parsing is Fragile:** `extract_field_from_html` in `form_discovery.py` attempts to parse raw HTML strings with Regex to find labels and attributes. This is prone to failure with React/MUI because:
* The `label` is often a sibling or wrapped in a `FormControl`, not always nested cleanly inside the captured HTML snippet.
* MUI uses portals for dropdowns; the options aren't inside the captured HTML snippet of the field.
* **Dependency on `data-cy` structure:** The logic heavily infers types based on the string format of `data-cy` (e.g., splitting by hyphens). If the frontend team changes `board-item-field-text-...` to `field-input-...`, the inference engine breaks entirely.
* **Disconnect from ARIA:** The discovery logic looks for visual labels via Regex but ignores the Accessibility Tree. The most reliable way to identify a field for an LLM is the **Accessible Name** (computed from `label`, `aria-label`, `aria-labelledby`), which is what screen readers (and intelligent agents) should use.
### 2. Bugs & Fragile Code Identification
#### Bug A: Regex Parsing of React/MUI Forms
**File:** `src/guide/app/browser/elements/form_discovery.py`
**Function:** `extract_field_from_html`
Findings below are grounded in current source with inline excerpts.
## Functional Gaps
- Docling config is cosmetic: `extract_ui_elements` keeps `_docling_url`/`_api_key` params but the docstring says they are “Unused (maintained for signature compatibility)”, and the body only runs `_extract_elements_from_html(...)` on captured HTML (no HTTP call).
```python
# CURRENT BROKEN LOGIC
label_match = re.search(
r'<label[^>]*>(.*?)</label>',
html,
re.IGNORECASE | re.DOTALL
)
# src/guide/app/browser/diagnostics.py
async def extract_ui_elements(..., _docling_url: str | None = None, _api_key: str | None = None):
"""... docling_url: Unused (maintained for signature compatibility)"""
html = await capture_html(page)
ui_elements = _extract_elements_from_html(html)
```
**Why it fails:** In MUI, the `<label>` tag often has a generated `id` and uses `for="input-id"`. The input is often *not* inside the label, or the label text is split across spans. Furthermore, `extract_all_fields_from_html` slices HTML based on `data-cy`. If the `<label>` is *outside* that sliced `div` (which happens in grid layouts), you lose the label context.
#### Bug B: Dropdown Option Discovery Timing
**File:** `src/guide/app/browser/diagnostics.py`
**Function:** `inspect_dropdown`
The code attempts to open the dropdown to inspect options:
- OTP callbacks are processlocal: the store is an inmemory dict plus a modulelevel singleton. Requests and callbacks on different workers wont see the same `_requests`.
```python
click_result = await page.evaluate(_JS_CLICK_DROPDOWN.format(selector=escaped))
# ... then inspects listbox ...
# src/guide/app/auth/otp_callback.py
class OtpCallbackStore:
_requests: dict[str, OtpRequest]
...
_store: OtpCallbackStore | None = None
```
**The Bug:**
1. **Race Condition:** MUI Autocomplete animations take time (often 200-300ms). The script inspects the DOM immediately after the click. While there is a `wait_for_role_option` helper elsewhere, `diagnostics.py` seems to rely on immediate execution in `inspect_listbox`.
2. **Stale References:** If the dropdown is "Type-to-search" (`browser/elements/dropdown/typeahead.py`), opening it yields **zero options** until typing occurs. An LLM querying this field will think it has no valid choices.
#### Bug C: Boolean Logic in Field Inference
**File:** `src/guide/app/browser/elements/field_inference.py`
**Function:** `infer_type_from_element`
- GraphQL client rejects partial successes: any `errors` entry triggers `GraphQLOperationError`, even if `data` is present.
```python
# CURRENT
classes_obj = result.get("classes")
# ...
has_autocomplete_parent = _get_bool_from_dict(result, "has_autocomplete_parent")
# src/guide/app/raindrop/graphql.py
if validated.has_errors:
raise errors.GraphQLOperationError(
validated.first_error_message or "GraphQL operation failed",
details={"errors": error_details},
)
```
**The Bug:** It relies on the presence of specific class names in the DOM. In production builds of MUI/React, class names are often mangled (e.g., `css-1h51icj-MuiAutocomplete`). Unless you are strictly checking for the static `Mui*` classes (which can be disabled in themes), this inference will return "unknown".
### 3. Proposed Solution: The "Semantic Bridge"
To fix this and enable LLM context generation, do not rely on Regex. Use the Browser to compute the **Computed Accessible Name** and map it to the **GraphQL Schema**.
#### Step 1: Replace Regex Discovery with DOM Evaluation
Replace `extract_field_from_html` with a Javascript evaluation that asks the browser "What is the label for this input?".
**Update:** `src/guide/app/browser/elements/form_discovery.py`
## Configuration & Portability
- Demonstration board bindings are hardcoded to a single environment.
```python
_JS_EXTRACT_SEMANTIC_FIELDS = """
(() => {
const fields = [];
// Find all potential inputs
const inputs = document.querySelectorAll('input, textarea, [role="button"], [role="combobox"]');
inputs.forEach(el => {
// Skip hidden inputs
if (el.type === 'hidden' || el.offsetParent === null) return;
// 1. Calculate Label (Bridge logical to visual)
let label = "";
// Try direct labels
if (el.labels && el.labels.length > 0) {
label = el.labels[0].innerText;
}
// Try ARIA
else if (el.hasAttribute('aria-label')) {
label = el.getAttribute('aria-label');
}
else if (el.hasAttribute('aria-labelledby')) {
const labelId = el.getAttribute('aria-labelledby');
const labelEl = document.getElementById(labelId);
if (labelEl) label = labelEl.innerText;
}
// Try MUI specific traversal (Fallback)
else {
const parent = el.closest('.MuiFormControl-root');
if (parent) {
const labelEl = parent.querySelector('label');
if (labelEl) label = labelEl.innerText;
}
}
// 2. Determine Type & Data-CY
const wrapper = el.closest('[data-cy]');
const dataCy = wrapper ? wrapper.getAttribute('data-cy') : (el.getAttribute('data-cy') || "");
fields.push({
selector: dataCy ? `[data-cy="${dataCy}"]` : uniqueSelector(el), // pseudo-code for selector gen
data_cy: dataCy,
label: label.trim(),
tag_name: el.tagName.toLowerCase(),
type_attr: el.getAttribute('type'),
role: el.getAttribute('role'),
value: el.value,
is_required: el.required || el.getAttribute('aria-required') === 'true',
is_disabled: el.disabled || el.getAttribute('aria-disabled') === 'true'
});
});
return fields;
})()
"""
async def extract_semantic_form_fields(page: PageLike) -> list[FormField]:
"""Extracts fields using DOM accessibility APIs instead of Regex."""
return await page.evaluate(_JS_EXTRACT_SEMANTIC_FIELDS)
# src/guide/app/raindrop/operations/demonstration.py
DEMONSTRATION_BOARD_ID = 596
DEMONSTRATION_INSTANCE_ID = 107
```
#### Step 2: Bridge Schema and Discovery
You have the "Perfect" schema in `GetBoardDefinition` (GraphQL) and the "Actual" DOM in the function above. You need a reconciliation step.
Create a new utility `src/guide/app/browser/context_builder.py`:
- Redis URL currently unused: even with `RAINDROP_DEMO_REDIS_URL=redis://192.168.50.210:6379/4`, `AppSettings` has no Redis-related fields; only the documented keys (raindrop URLs, browser hosts, docling_*, session_*, n8n_*) are parsed, so the value is ignored at runtime.
```python
async def build_llm_form_context(page: PageLike, board_schema: FormSchema) -> dict:
# 1. Get live DOM fields
dom_fields = await extract_semantic_form_fields(page)
# 2. Map DOM to Schema
# Raindrop seems to use 'f1', 'f2' keys in schema that map to 'data-cy' in DOM
context_map = {}
for schema_field in board_schema.fields:
# Find matching DOM element
# Heuristic: Match data-cy ending with field name or match Label text
match = next((f for f in dom_fields if schema_field.name in (f['data_cy'] or "")), None)
if not match:
# Fallback: Match by Label text (fuzzy)
match = next((f for f in dom_fields if schema_field.label.lower() in f['label'].lower()), None)
field_context = {
"field_name": schema_field.label, # "Estimated Value"
"description": f"Type: {schema_field.field_type}", # "Type: number"
"is_required": schema_field.required,
"current_value": match['value'] if match else None,
"dom_selector": match['selector'] if match else None,
}
# CRITICAL: Inject Choices for LLM
if schema_field.field_type == 'menu':
field_context['allowed_values'] = [c.text for c in schema_field.choices]
context_map[schema_field.name] = field_context
return context_map
# src/guide/app/core/config.py
model_config = SettingsConfigDict(env_prefix="RAINDROP_DEMO_", ...)
# defined fields: raindrop_base_url, raindrop_graphql_url, browser_hosts, docling_*, session_*, n8n_* (no redis)
```
### 4. Summary of Actions to Take
1. **Refactor `form_discovery.py`**: Stop parsing HTML strings with Regex. It is the root cause of "unknown" fields. Use `page.evaluate` to inspect the computed accessibility tree (`labels`, `aria-labelledby`).
2. **Fix `dropdown` logic**: Don't rely on opening dropdowns to find choices for the LLM. Use the GraphQL definition (`GetBoardDefinition`) to get the allowed values, and pass those to the LLM. Only use the DOM to *verify* the value was set.
3. **Harden `field_inference.py`**:
* Stop checking for `has_autocomplete_parent` via classes.
* Instead, check `role="combobox"` on the input or its direct parent. This is the W3C standard and works across MUI versions.
4. **Create the Bridge**: Use the code snippet in Step 2 to generate a JSON object that acts as the prompt for your LLM.
**Prompt Example for LLM (generated from proposed logic):**
```json
{
"f19": {
"label": "Estimated Value",
"type": "number",
"selector": "[data-cy='board-item-field-number-f19'] input",
"allowed_values": null
},
"status": {
"label": "Status",
"type": "menu",
"selector": "[data-cy='board-item-field-menu-status']",
"allowed_values": ["Draft", "Active", "Archived"]
}
}
## Security Notes
- JWT expiry is parsed without signature verification and feeds offline session validation; a forged token with a farfuture `exp` could keep a cached session “valid” until TTL expires.
```python
# src/guide/app/auth/session_manager.py
token_expires_at = self.parse_jwt_expiry(token.value)
...
if session.token_expires_at:
token_remaining = session.token_expires_at - now
```
## Performance Considerations
- Accordion collapsing issues one Playwright hop per button (`buttons.nth(index)` inside a loop), which scales poorly on pages with many accordions.
```python
# src/guide/app/browser/elements/layout.py
for index in range(count):
button: PageLocator = buttons.nth(index)
if await icon.count() > 0:
await button.click(timeout=max_wait)
```
- Form discovery walks every `[data-cy^="board-item-"]` node and marshals its metadata in one evaluate call; large boards will produce heavy payloads.
```javascript
// src/guide/app/browser/elements/form_discovery.py
const fields = container.querySelectorAll('[data-cy^=\"board-item-\"]');
return Array.from(fields)
.filter(field => field.offsetParent !== null)
```
## DX / DI Footnote
- Action DI requires constructor params to exist in the DI context unless theyre varargs or have defaults; decorated `__init__` without `functools.wraps` can break injection.
```python
# src/guide/app/actions/base.py
for param_name, param in sig.parameters.items():
if param_name in self._di_context:
kwargs[param_name] = self._di_context[param_name]
else:
is_var_param = param.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
has_default = cast(object, param.default) != Parameter.empty
if not is_var_param and not has_default:
raise errors.ActionExecutionError(...)
```
*Instruction:* "Generate a JSON of values to fill based on these field definitions."

173
plan.md
View File

@@ -1,154 +1,45 @@
Guide codebase validation with excerpts from current sources (updated 2025-12-07).
Validated assessment (updated 2025-12-07)
## Architecture confirmations
- **Semantic Bridge** — `src/guide/app/actions/form/smart_fill.py:1-121` documents the schema→DOM→LLM pipeline and dispatch helpers.
```python
# src/guide/app/actions/form/smart_fill.py:1-8,165-188
"""Smart form filling using schema-DOM reconciliation."""
_logger.info("[SmartFill] Fetching schema for board_id=%d", board_id)
form_context = await build_form_context(page, schema, container_selector)
llm_context = format_for_llm(form_context)
```
- **Terminator Bridge extension** — `src/guide/app/browser/extension_client.py:1-5` describes the extension-backed bridge used when CDP refresh is problematic.
```python
# src/guide/app/browser/extension_client.py:1-5
"""WebSocket server for Terminator Bridge browser extension.
Provides a Playwright-like API that translates Python calls into JavaScript
executed via the browser extension..."""
```
- **Dual-mode pool (headless/CDP/extension)** — `src/guide/app/browser/pool.py:79-110` routes allocation based on `host_config.kind`.
```python
# src/guide/app/browser/pool.py:79-110
if self.host_config.kind == HostKind.EXTENSION:
return await self._allocate_extension_page()
if self.host_config.kind == HostKind.CDP:
return await self._allocate_cdp_page()
return await self._allocate_headless_page(...)
```
Executive context
- `create_app` wires settings, persona/board stores, session manager, action registry, browser pool/client, and GraphQL client into FastAPI app state (src/guide/app/main.py:22-59).
- CDP isolation reuses a cached context/page but clears state whenever `host_config.isolate` is true (src/guide/app/browser/pool.py:207-231).
## Findings (validated with code excerpts)
### 1) CDP context reuse contradicts isolation promise (High)
Docstring promises “No page/context pooling” while CDP path caches and reuses the same context/page.
Findings (grounded in code)
1) CDP isolation already clears storage (no bug to fix)
```python
# src/guide/app/browser/pool.py:1-11,74-200
"""Per action: Fresh BrowserContext for complete isolation
- No page/context pooling: Each action gets a clean slate"""
self._cdp_context: BrowserContext | None = None
# src/guide/app/browser/pool.py:207-231
if self.host_config.isolate:
await self._clear_cdp_storage(self._cdp_context, self._cdp_page)
...
if self._cdp_context is None or self._cdp_page is None:
context = browser.contexts[0]
self._cdp_context = context
self._cdp_page = non_devtools_pages[-1]
await context.clear_cookies()
await page.evaluate("localStorage.clear(); sessionStorage.clear();")
```
Note: permissions are not cleared; add `clear_permissions()` if cross-action permission reset is required.
2) Replace regex-only HTML parsing with BeautifulSoup
```python
# src/guide/app/browser/diagnostics.py:1060-1105
input_pattern = r'<input[^>]*?(?:name|id|data-(?:cy|test(?:id)?))\\s*=\\s*["\\\']([^"\\\']+)["\\\'][^>]*>'
...
return self._cdp_context, self._cdp_page, False # reused on subsequent calls
```
Effect: cookies/storage persist across actions for CDP hosts despite isolation claims.
### 2) Dropdown wait scopes `[role="option"]` globally (Medium)
`ensure_listbox` waits for any option in the DOM, which can pick a stale or unrelated dropdown when multiple are present.
```python
# src/guide/app/browser/elements/mui.py:301-317
async def ensure_listbox(...):
if hasattr(page, "wait_for_selector"):
_ = await page.wait_for_selector('[role="option"]', timeout=timeout_ms)
return True
return await wait_for_role_option(page, timeout_ms)
```
### 3) Runtime type checks rely on casts after JSON parsing (Medium)
GraphQL and OTP flows cast parsed JSON without structural validation; unexpected shapes can slip through until attribute access.
```python
# src/guide/app/raindrop/graphql.py:129-165
payload = response.json()
typed_payload = cast(Mapping[str, JSONValue], payload)
data_node_raw: JSONValue | None = typed_payload.get("data")
...
data_node = cast(Mapping[str, JSONValue], data_node_raw)
```
```python
# src/guide/app/actions/auth/request_otp.py:254-272
raw: object = cast(object, response.json())
if not isinstance(raw, dict):
return None
data = cast(dict[str, object], raw)
```
### 4) Selector escaping scattered via aliases/wrappers (Low)
Utility lives in one place but is re-exported or wrapped in several modules.
```python
# src/guide/app/browser/utils.py:8-19
def escape_selector(selector: str) -> str:
return selector.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')
# src/guide/app/browser/elements/mui.py:40-44
from guide.app.browser.utils import escape_selector, escape_js_string
_escape_selector = escape_selector
# src/guide/app/browser/extension_client.py:89-93
def _escape_selector(self, selector: str) -> str:
from guide.app.browser.utils import escape_selector
return escape_selector(selector)
```
### 5) GraphQL query definitions split between code and .graphql files (Medium)
Hard-coded mutations persist alongside file-loaded queries.
```python
# src/guide/app/raindrop/operations/board_items.py:38-52
_CREATE_BOARD_ITEM_MUTATION = """
mutation CreateBoardItem($object: board_item_insert_input!) { ... }
"""
```
```python
# src/guide/app/raindrop/generated/queries.py:1-137 (loading .graphql)
_intake_queries = _load_queries("intake.graphql")
GET_INTAKE_REQUEST = _validate_query_loaded(...)
```
Refactor path: move remaining `_CREATE_*` strings into `raindrop/queries/*.graphql`.
### 6) Wait logic/timeouts diverge between Playwright and extension paths (Low)
WaitMixin centralizes timeouts, but ExtensionPage bakes its own defaults instead of using `Timeouts`.
```python
# src/guide/app/browser/mixins/wait.py:34-72
await wait_for_selector(self.page, selector, timeout_ms=timeout_ms, timeouts=self.timeouts)
```
```python
# src/guide/app/browser/extension_client.py:363-385
async def wait_for_selector(..., timeout: float | None = None, ...):
timeout_ms = int(timeout) if timeout else 5000
selector_escaped = self._escape_selector(selector)
for match in re.finditer(button_pattern, html_content, re.IGNORECASE):
...
```
Consider threading `Timeouts.extension_*` through extension helpers for consistency.
HTML parsing via regex will break on multiline attributes or nested quotes; switch to BeautifulSoup (preferred lightweight option) to extract inputs/buttons/selects/role="button" elements.
### 7) Naming overlap of element helpers (Low)
`form.py` contains thin input wrappers, while `form_automation.py` performs discovery and smart filling, leading to ambiguous imports.
3) Trim duplicated selector registry and remove underscored alias exports
```python
# src/guide/app/browser/elements/form.py:12-55
async def fill_text(...): result = await fill_with_react_events(...)
async def fill_autocomplete(...): result = await select_single(...)
# src/guide/app/strings/registry.py:51-96
description_field: ClassVar[str] = IntakeSelectors.DESCRIPTION_FIELD
...
page_header_title: ClassVar[str] = IntakeSelectors.PAGE_HEADER_TITLE
```
```python
# src/guide/app/browser/elements/form_automation.py:32-70
fields = await extract_all_form_fields(page, container_selector)
for field in fields:
success = await fill_field(page, field, value)
```
Renaming `form.py` to `inputs.py` (or similar) would better separate atomic vs. orchestration utilities.
Every selector is re-assigned in `IntakeStrings` (and peers), doubling maintenance versus exposing the selector classes directly. Also `__all__` re-exports underscored aliases in `browser/elements/mui.py` and `dropdown/__init__.py`; plan: expose only canonical names and drop leading-underscore exports to tighten API surface.
### 8) PageHelpers concentrates multiple mixins (Note)
`PageHelpers` inherits Wait, Diagnostics, and Interaction mixins, making it a broad façade that could drift into “god object” territory as features grow.
4) Extension client hardcodes timeouts instead of using settings
```python
# src/guide/app/browser/helpers.py:13-27
class PageHelpers(WaitMixin, DiagnosticsMixin, InteractionMixin):
def __init__(self, page: PageLike, *, timeouts: Timeouts | None = None) -> None:
self.page = page
self.timeouts = timeouts or Timeouts()
# src/guide/app/browser/extension_client.py:381-389
async def wait_for_selector(..., timeout: float | None = None, ...):
"""... default: 5000"""
timeout_ms = int(timeout) if timeout else 5000
```
Consider composing smaller helpers in action code to reduce coupling.
## Recommended next steps
- Decide whether CDP hosts should preserve or isolate state; if isolation is desired, create fresh contexts/pages per request or clear storage between calls.
- Scope dropdown waits to the active listbox (e.g., `aria-controls={listbox_id}`) to avoid cross-dropdown leakage.
- Introduce lightweight runtime validation (Pydantic `TypeAdapter` or explicit `TypeGuard`) before casting JSON payloads.
- Finish migrating GraphQL strings into `.graphql` files and import through `generated/queries.py`.
- Thread `Timeouts` config into extension wait helpers for consistent behavior.
- Rename `form.py` to convey “input primitives” and keep `form_automation.py` for orchestration.
`AppSettings.Timeouts` centralizes browser/extension timeouts (src/guide/app/core/config.py:70-104), but the extension client still defaults to 5000ms; consider plumbing `settings.timeouts` through for consistency.

View File

@@ -9,10 +9,12 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"bs4>=0.0.2",
"faker>=38.2.0",
"fastapi>=0.121.3",
"graphql-core>=3.2.0",
"httpx>=0.27.0",
"loguru>=0.7.3",
"playwright>=1.56.0",
"pydantic>=2.12.4",
"pydantic-settings>=2.4.0",

View File

@@ -14,7 +14,8 @@ from guide.app.browser.elements import (
from guide.app.browser.elements.dropdown import select_multi, click_with_mouse_events
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
from guide.app.strings.demo_texts.contract import ContractTexts
from guide.app.strings.selectors.contract import ContractFormSelectors
from guide.app import errors
_logger = logging.getLogger(__name__)
@@ -322,7 +323,7 @@ class FillContractFormAction(DemoAction):
# Dropdowns & autocompletes (one field at a time)
# Contract type uses select_single (same pattern as sourcing intake)
contract_type_selector = primary_selector(
app_strings.contract.contract_type_field
ContractFormSelectors.CONTRACT_TYPE_FIELD
)
exists = await page.evaluate(
f"(function(){{return document.querySelector('{contract_type_selector}') !== null; }})();"
@@ -369,7 +370,7 @@ class FillContractFormAction(DemoAction):
contract_type_result = await select_single(
page,
contract_type_selector,
app_strings.contract.contract_type_request,
ContractTexts.CONTRACT_TYPE,
)
_logger.info(
"[FORM-FILL] Contract type attempt %d: selected=%s not_found=%s available=%s",
@@ -456,7 +457,7 @@ class FillContractFormAction(DemoAction):
# Commodities (multi) - use select_multi directly like sourcing intake
# Extract primary selector (first from comma-separated list) for proper popup button selector
commodities_field_selector = primary_selector(
app_strings.contract.contract_commodities_field
ContractFormSelectors.CONTRACT_COMMODITIES_FIELD
)
# Verify element exists before attempting selection
@@ -477,7 +478,7 @@ class FillContractFormAction(DemoAction):
commodities_result = await select_multi(
page,
commodities_field_selector,
list(app_strings.contract.contract_commodities_request),
list(ContractTexts.CONTRACT_COMMODITIES),
)
_logger.info(
"[FORM-FILL] Contract commodities selected=%s not_found=%s available=%s",
@@ -502,7 +503,7 @@ class FillContractFormAction(DemoAction):
# Supplier contact - type-to-search autocomplete (requires typing to trigger API)
supplier_contact_selector = primary_selector(
app_strings.contract.supplier_contact_field
ContractFormSelectors.SUPPLIER_CONTACT_FIELD
)
# Wait for field to become available (not disabled)
@@ -524,7 +525,7 @@ class FillContractFormAction(DemoAction):
await scroll_into_view(supplier_contact_selector)
await page.wait_for_timeout(150)
contact_value = app_strings.contract.supplier_contact_request
contact_value = ContractTexts.SUPPLIER_CONTACT
supplier_result = await select_typeahead(
page,
supplier_contact_selector,
@@ -558,14 +559,14 @@ class FillContractFormAction(DemoAction):
# Entity and Regions - single select like sourcing intake
entity_selector = primary_selector(
app_strings.contract.entity_and_regions_field
ContractFormSelectors.ENTITY_AND_REGIONS_FIELD
)
await scroll_into_view(entity_selector)
await page.wait_for_timeout(150)
entity_result = await select_single(
page,
entity_selector,
app_strings.contract.entity_and_regions_request,
ContractTexts.ENTITY_AND_REGIONS,
)
selections["entity_and_regions"] = {
"selected": entity_result["selected"],
@@ -580,14 +581,14 @@ class FillContractFormAction(DemoAction):
# Renewal Type - single select
renewal_type_selector = primary_selector(
app_strings.contract.renewal_type_field
ContractFormSelectors.RENEWAL_TYPE_FIELD
)
await scroll_into_view(renewal_type_selector)
await page.wait_for_timeout(150)
renewal_type_result = await select_single(
page,
renewal_type_selector,
app_strings.contract.renewal_type_request,
ContractTexts.RENEWAL_TYPE,
)
selections["renewal_type"] = {
"selected": renewal_type_result["selected"],
@@ -601,13 +602,13 @@ class FillContractFormAction(DemoAction):
await deselect_field(renewal_type_selector)
# Currency - single select
currency_selector = primary_selector(app_strings.contract.currency_field)
currency_selector = primary_selector(ContractFormSelectors.CURRENCY_FIELD)
await scroll_into_view(currency_selector)
await page.wait_for_timeout(150)
currency_result = await select_single(
page,
currency_selector,
app_strings.contract.currency_request,
ContractTexts.CURRENCY,
)
selections["currency"] = {
"selected": currency_result["selected"],
@@ -620,14 +621,14 @@ class FillContractFormAction(DemoAction):
# Payment Terms - single select
payment_terms_selector = primary_selector(
app_strings.contract.payment_terms_field
ContractFormSelectors.PAYMENT_TERMS_FIELD
)
await scroll_into_view(payment_terms_selector)
await page.wait_for_timeout(150)
payment_terms_result = await select_single(
page,
payment_terms_selector,
app_strings.contract.payment_terms_request,
ContractTexts.PAYMENT_TERMS,
)
selections["payment_terms"] = {
"selected": payment_terms_result["selected"],
@@ -642,14 +643,14 @@ class FillContractFormAction(DemoAction):
# Payment Schedule - single select
payment_schedule_selector = primary_selector(
app_strings.contract.payment_schedule_field
ContractFormSelectors.PAYMENT_SCHEDULE_FIELD
)
await scroll_into_view(payment_schedule_selector)
await page.wait_for_timeout(150)
payment_schedule_result = await select_single(
page,
payment_schedule_selector,
app_strings.contract.payment_schedule_request,
ContractTexts.PAYMENT_SCHEDULE,
)
selections["payment_schedule"] = {
"selected": payment_schedule_result["selected"],
@@ -664,14 +665,14 @@ class FillContractFormAction(DemoAction):
# Business Contact - type-to-search autocomplete
business_contact_selector = primary_selector(
app_strings.contract.business_contact_field
ContractFormSelectors.BUSINESS_CONTACT_FIELD
)
await scroll_into_view(business_contact_selector)
await page.wait_for_timeout(150)
business_contact_result = await select_typeahead(
page,
business_contact_selector,
app_strings.contract.business_contact_request,
ContractTexts.BUSINESS_CONTACT,
min_chars=3,
wait_ms=3000,
)
@@ -688,14 +689,14 @@ class FillContractFormAction(DemoAction):
# Managing Department - single select
managing_dept_selector = primary_selector(
app_strings.contract.managing_department_field
ContractFormSelectors.MANAGING_DEPARTMENT_FIELD
)
await scroll_into_view(managing_dept_selector)
await page.wait_for_timeout(150)
managing_dept_result = await select_single(
page,
managing_dept_selector,
app_strings.contract.managing_department_request,
ContractTexts.MANAGING_DEPARTMENT,
)
selections["managing_department"] = {
"selected": managing_dept_result["selected"],
@@ -710,14 +711,14 @@ class FillContractFormAction(DemoAction):
# Funding Department - single select
funding_dept_selector = primary_selector(
app_strings.contract.funding_department_field
ContractFormSelectors.FUNDING_DEPARTMENT_FIELD
)
await scroll_into_view(funding_dept_selector)
await page.wait_for_timeout(150)
funding_dept_result = await select_single(
page,
funding_dept_selector,
app_strings.contract.funding_department_request,
ContractTexts.FUNDING_DEPARTMENT,
)
selections["funding_department"] = {
"selected": funding_dept_result["selected"],
@@ -735,16 +736,16 @@ class FillContractFormAction(DemoAction):
"effective_date",
fill_date(
page,
app_strings.contract.effective_date_field,
app_strings.contract.effective_date_request,
ContractFormSelectors.EFFECTIVE_DATE_FIELD,
ContractTexts.EFFECTIVE_DATE,
),
)
await attempt(
"end_date",
fill_date(
page,
app_strings.contract.end_date_field,
app_strings.contract.end_date_request,
ContractFormSelectors.END_DATE_FIELD,
ContractTexts.END_DATE,
),
)
@@ -753,16 +754,16 @@ class FillContractFormAction(DemoAction):
"renewal_increase",
fill_text(
page,
app_strings.contract.renewal_increase_field,
app_strings.contract.renewal_increase_request,
ContractFormSelectors.RENEWAL_INCREASE_FIELD,
ContractTexts.RENEWAL_INCREASE,
),
)
await attempt(
"renewal_alert_days",
fill_text(
page,
app_strings.contract.renewal_alert_days_field,
app_strings.contract.renewal_alert_days_request,
ContractFormSelectors.RENEWAL_ALERT_DAYS_FIELD,
ContractTexts.RENEWAL_ALERT_DAYS,
),
)
# Notices not present on this view; skip to avoid selector errors
@@ -770,61 +771,61 @@ class FillContractFormAction(DemoAction):
"total_value",
fill_text(
page,
app_strings.contract.total_value_field,
app_strings.contract.total_value_request,
ContractFormSelectors.TOTAL_VALUE,
ContractTexts.TOTAL_VALUE,
),
)
await attempt(
"budget",
fill_text(
page,
app_strings.contract.budget_field,
app_strings.contract.budget_request,
ContractFormSelectors.BUDGET_FIELD,
ContractTexts.BUDGET,
),
)
await attempt(
"project_name",
fill_if_present(
app_strings.contract.project_name_field,
app_strings.contract.project_name_request,
ContractFormSelectors.PROJECT_NAME_FIELD,
ContractTexts.PROJECT_NAME,
),
)
await attempt(
"master_project_name",
fill_if_present(
app_strings.contract.master_project_name_field,
app_strings.contract.master_project_name_request,
ContractFormSelectors.MASTER_PROJECT_NAME_FIELD,
ContractTexts.MASTER_PROJECT_NAME,
),
)
await attempt(
"rebate",
fill_text(
page,
app_strings.contract.rebate_field,
app_strings.contract.rebate_request,
ContractFormSelectors.REBATE_FIELD,
ContractTexts.REBATE,
),
)
await attempt(
"saving",
fill_text(
page,
app_strings.contract.saving_field,
app_strings.contract.saving_request,
ContractFormSelectors.SAVING_FIELD,
ContractTexts.SAVING,
),
)
await attempt(
"breach_notification",
fill_if_present(
app_strings.contract.breach_notification_field,
app_strings.contract.breach_notification_request,
ContractFormSelectors.BREACH_NOTIFICATION_FIELD,
ContractTexts.BREACH_NOTIFICATION,
),
)
# Boolean toggle
if app_strings.contract.terminate_for_convenience_request:
if ContractTexts.TERMINATE_FOR_CONVENIENCE:
try:
await page.click(
app_strings.contract.terminate_for_convenience_toggle
ContractFormSelectors.TERMINATE_FOR_CONVENIENCE_TOGGLE
)
except Exception as exc: # noqa: BLE001
_logger.warning(
@@ -833,18 +834,18 @@ class FillContractFormAction(DemoAction):
await set_input_value(
"business_continuity",
app_strings.contract.business_continuity_field,
app_strings.contract.business_continuity_request,
ContractFormSelectors.BUSINESS_CONTINUITY_FIELD,
ContractTexts.BUSINESS_CONTINUITY,
)
await set_input_value(
"customer_data",
app_strings.contract.customer_data_field,
app_strings.contract.customer_data_request,
ContractFormSelectors.CUSTOMER_DATA_FIELD,
ContractTexts.CUSTOMER_DATA,
)
await set_input_value(
"reseller",
app_strings.contract.reseller_field,
app_strings.contract.reseller_request,
ContractFormSelectors.RESELLER_FIELD,
ContractTexts.RESELLER,
)
return ActionResult(

View File

@@ -10,7 +10,7 @@ from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.helpers import PageHelpers, AccordionCollapseResult
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
from guide.app.strings.selectors.common import CommonSelectors
@register_action
@@ -52,7 +52,7 @@ class CollapseAccordionsDemoAction(DemoAction):
# Get selector from params or use default
selector: str = _coerce_to_str(
context.params.get("selector"),
app_strings.common.page_header_accordion,
CommonSelectors.PAGE_HEADER_ACCORDION,
)
# Get timeout from params or use default

View File

@@ -9,7 +9,7 @@ with the messaging UI (e.g., board view with chat panel).
import asyncio
import logging
from guide.app.browser.extension_client import ExtensionClient, ExtensionPage
from guide.app.strings.registry import MessagingStrings
from guide.app.strings.selectors.messaging import MessagingSelectors
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
_logger = logging.getLogger(__name__)
@@ -17,14 +17,14 @@ _logger = logging.getLogger(__name__)
# XPath selectors to validate
MESSAGING_SELECTORS: dict[str, str] = {
"notification_indicator": MessagingStrings.notification_indicator,
"modal_wrapper": MessagingStrings.modal_wrapper,
"modal_close_button": MessagingStrings.modal_close_button,
"chat_messages_container": MessagingStrings.chat_messages_container,
"chat_flyout_button": MessagingStrings.chat_flyout_button,
"chat_conversations_tab": MessagingStrings.chat_conversations_tab,
"chat_input": MessagingStrings.chat_input,
"send_button": MessagingStrings.send_button,
"notification_indicator": MessagingSelectors.NOTIFICATION_INDICATOR,
"modal_wrapper": MessagingSelectors.MODAL_WRAPPER,
"modal_close_button": MessagingSelectors.MODAL_CLOSE_BUTTON,
"chat_messages_container": MessagingSelectors.CHAT_MESSAGES_CONTAINER,
"chat_flyout_button": MessagingSelectors.CHAT_FLYOUT_BUTTON,
"chat_conversations_tab": MessagingSelectors.CHAT_CONVERSATIONS_TAB,
"chat_input": MessagingSelectors.CHAT_INPUT,
"send_button": MessagingSelectors.SEND_BUTTON,
}

View File

@@ -11,7 +11,8 @@ from guide.app.browser.elements.mui import DropdownResult
from guide.app.browser.helpers import PageHelpers
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
from guide.app.strings.demo_texts.intake import IntakeTexts
from guide.app.strings.selectors.intake import IntakeSelectors
@register_action
@@ -24,9 +25,9 @@ class FillIntakeBasicAction(DemoAction):
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
description_val = app_strings.intake.conveyor_belt_request
await page.fill(app_strings.intake.description_field, description_val)
await page.click(app_strings.intake.next_button)
description_val = IntakeTexts.CONVEYOR_BELT_REQUEST
await page.fill(IntakeSelectors.DESCRIPTION_FIELD, description_val)
await page.click(IntakeSelectors.NEXT_BUTTON)
return ActionResult(details={"message": "Intake filled"})
@@ -54,66 +55,66 @@ class FillSourcingRequestAction(DemoAction):
results: dict[str, DropdownResult] = {
"commodities": await select_multi(
page,
app_strings.intake.commodity_field,
list(app_strings.intake.commodity_request),
IntakeSelectors.COMMODITY_FIELD,
list(IntakeTexts.COMMODITY_REQUEST),
)
}
# Planned (single)
results["planned"] = await select_single(
page,
app_strings.intake.planned_field,
app_strings.intake.planned_request,
IntakeSelectors.PLANNED_FIELD,
IntakeTexts.PLANNED_REQUEST,
)
# Regions (multi)
results["regions"] = await select_multi(
page,
app_strings.intake.regions_field,
list(app_strings.intake.regions_request),
IntakeSelectors.REGIONS_FIELD,
list(IntakeTexts.REGIONS_REQUEST),
)
# OpEx/CapEx (combobox)
results["opex_capex"] = await select_combobox(
page,
app_strings.intake.opex_capex_field,
app_strings.intake.opex_capex_request,
IntakeSelectors.OPEX_CAPEX_FIELD,
IntakeTexts.OPEX_CAPEX_REQUEST,
)
# Target date
await fill_date(
page,
app_strings.intake.target_date_field,
app_strings.intake.target_date_request,
IntakeSelectors.TARGET_DATE_FIELD,
IntakeTexts.TARGET_DATE_REQUEST,
)
# Text areas
await fill_textarea(
page,
app_strings.intake.description_textarea,
app_strings.intake.description_request,
IntakeSelectors.DESCRIPTION_TEXTAREA,
IntakeTexts.DESCRIPTION_REQUEST,
)
await fill_textarea(
page,
app_strings.intake.desired_supplier_name_textarea,
app_strings.intake.desired_supplier_name_request,
IntakeSelectors.DESIRED_SUPPLIER_NAME_TEXTAREA,
IntakeTexts.DESIRED_SUPPLIER_NAME_REQUEST,
)
await fill_textarea(
page,
app_strings.intake.desired_supplier_contact_textarea,
app_strings.intake.desired_supplier_contact_request,
IntakeSelectors.DESIRED_SUPPLIER_CONTACT_TEXTAREA,
IntakeTexts.DESIRED_SUPPLIER_CONTACT_REQUEST,
)
await fill_textarea(
page,
app_strings.intake.reseller_textarea,
app_strings.intake.reseller_request,
IntakeSelectors.RESELLER_TEXTAREA,
IntakeTexts.RESELLER_REQUEST,
)
# Entity (autocomplete single-select)
results["entity"] = await select_single(
page,
app_strings.intake.entity_field,
app_strings.intake.entity_request,
IntakeSelectors.ENTITY_FIELD,
IntakeTexts.ENTITY_REQUEST,
)
return ActionResult(

View File

@@ -7,7 +7,7 @@ from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.helpers import PageHelpers
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import MessagingStrings
from guide.app.strings.selectors.messaging import MessagingSelectors
@register_action
@@ -67,13 +67,13 @@ class RespondToMessageAction(DemoAction):
await self._verify_chat_visible(page, helpers, step="before_typing")
# 5. Type message into chat input
await page.fill(MessagingStrings.chat_input, message)
await page.fill(MessagingSelectors.CHAT_INPUT, message)
# 6. Verify chat is still visible before sending
await self._verify_chat_visible(page, helpers, step="before_send")
# 7. Send message
await page.click(MessagingStrings.send_button)
await page.click(MessagingSelectors.SEND_BUTTON)
# 8. Wait for network activity to settle
await helpers.wait_for_network_idle()
@@ -105,18 +105,18 @@ class RespondToMessageAction(DemoAction):
Returns:
True if modal was dismissed, False otherwise.
"""
modal = page.locator(MessagingStrings.modal_wrapper)
modal = page.locator(MessagingSelectors.MODAL_WRAPPER)
modal_count = await modal.count()
if modal_count > 0:
close_button = page.locator(MessagingStrings.modal_close_button)
close_button = page.locator(MessagingSelectors.MODAL_CLOSE_BUTTON)
close_count = await close_button.count()
if close_count > 0:
await close_button.first.click()
# Wait for modal to close
_ = await page.wait_for_selector(
MessagingStrings.modal_wrapper,
MessagingSelectors.MODAL_WRAPPER,
state="hidden",
timeout=5000,
)
@@ -139,7 +139,7 @@ class RespondToMessageAction(DemoAction):
Returns:
True if chat was expanded, False if already visible.
"""
chat_container = page.locator(MessagingStrings.chat_messages_container)
chat_container = page.locator(MessagingSelectors.CHAT_MESSAGES_CONTAINER)
container_count = await chat_container.count()
if container_count > 0:
@@ -147,13 +147,13 @@ class RespondToMessageAction(DemoAction):
return False
# Click flyout button to expand
flyout_button = page.locator(MessagingStrings.chat_flyout_button)
flyout_button = page.locator(MessagingSelectors.CHAT_FLYOUT_BUTTON)
flyout_count = await flyout_button.count()
if flyout_count == 0:
raise errors.ActionExecutionError(
"Chat flyout button not found",
details={"selector": MessagingStrings.chat_flyout_button},
details={"selector": MessagingSelectors.CHAT_FLYOUT_BUTTON},
)
await flyout_button.first.click()
@@ -163,7 +163,7 @@ class RespondToMessageAction(DemoAction):
await self._verify_chat_visible(page, helpers, step="after_flyout_click")
# Switch to conversations tab
conversations_tab = page.locator(MessagingStrings.chat_conversations_tab)
conversations_tab = page.locator(MessagingSelectors.CHAT_CONVERSATIONS_TAB)
tab_count = await conversations_tab.count()
if tab_count > 0:
@@ -191,7 +191,7 @@ class RespondToMessageAction(DemoAction):
Raises:
ActionExecutionError: If chat panel cannot be made visible.
"""
chat_container = page.locator(MessagingStrings.chat_messages_container)
chat_container = page.locator(MessagingSelectors.CHAT_MESSAGES_CONTAINER)
container_count = await chat_container.count()
if container_count > 0:
@@ -199,7 +199,7 @@ class RespondToMessageAction(DemoAction):
return
# Chat not visible - attempt recovery by clicking flyout button
flyout_button = page.locator(MessagingStrings.chat_flyout_button)
flyout_button = page.locator(MessagingSelectors.CHAT_FLYOUT_BUTTON)
flyout_count = await flyout_button.count()
if flyout_count > 0:
@@ -216,8 +216,8 @@ class RespondToMessageAction(DemoAction):
f"Chat panel not visible at step '{step}'",
details={
"step": step,
"container_selector": MessagingStrings.chat_messages_container,
"flyout_selector": MessagingStrings.chat_flyout_button,
"container_selector": MessagingSelectors.CHAT_MESSAGES_CONTAINER,
"flyout_selector": MessagingSelectors.CHAT_FLYOUT_BUTTON,
},
)

View File

@@ -3,7 +3,8 @@ from typing import ClassVar, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
from guide.app.strings.demo_texts.suppliers import SupplierTexts
from guide.app.strings.selectors.sourcing import SourcingSelectors
@register_action
@@ -14,8 +15,8 @@ class AddThreeSuppliersAction(DemoAction):
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
suppliers = app_strings.sourcing.default_trio
suppliers = SupplierTexts.DEFAULT_TRIO
for supplier in suppliers:
await page.fill(app_strings.sourcing.supplier_search_input, supplier)
await page.click(app_strings.sourcing.add_supplier_button)
await page.fill(SourcingSelectors.SUPPLIER_SEARCH_INPUT, supplier)
await page.click(SourcingSelectors.ADD_SUPPLIER_BUTTON)
return ActionResult(details={"added_suppliers": list(suppliers)})

View File

@@ -1,8 +1,9 @@
import logging
from typing import Annotated, Protocol, cast
from fastapi import APIRouter, Depends, Request
from fastapi import FastAPI
from loguru import logger
from guide.app.actions.registry import ActionRegistry
from guide.app.auth import DummyMfaCodeProvider, ensure_persona
from guide.app.browser.client import BrowserClient
@@ -21,8 +22,6 @@ from guide.app.models.domain import (
)
from guide.app.models.personas import DemoPersona, PersonaStore
_logger = logging.getLogger(__name__)
def _resolve_target_host(
payload: ActionRequest,
@@ -95,7 +94,7 @@ async def _capture_error_diagnostics(
}
return await capture_all_diagnostics(page_like, docling_config=docling_config)
except Exception as diag_exc:
_logger.warning(f"Failed to capture diagnostics: {diag_exc}")
logger.warning("Failed to capture diagnostics: {}", diag_exc)
return None
@@ -132,6 +131,7 @@ def _build_success_envelope(
result=result.details,
)
router = APIRouter()

View File

@@ -1,18 +1,20 @@
import json
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import cast
from loguru import logger
from guide.app.auth.mfa import MfaCodeProvider
from guide.app.browser.types import PageLike
from guide.app.core.config import Timeouts
from guide.app.errors import PersonaError
from guide.app.models.personas.models import DemoPersona
from guide.app.strings.registry import app_strings
from guide.app.strings.selectors.auth import Auth0ErrorSelectors
from guide.app.strings.selectors.auth import Auth0ErrorSelectors, AuthSelectors
from guide.app.utils.jwt import is_jwt_format as _is_jwt_format
_logger = logging.getLogger(__name__)
# Module-level default timeouts for backward compatibility
_DEFAULT_TIMEOUTS = Timeouts()
async def _check_for_auth_errors(
@@ -36,8 +38,8 @@ async def _check_for_auth_errors(
error_el = page.locator(selector)
if await error_el.count() > 0:
error_text = await error_el.first.text_content()
_logger.warning(
"%s error: %s (selector: %s)",
logger.warning(
"{} error: {} (selector: {})",
context,
error_text,
selector,
@@ -114,39 +116,39 @@ async def detect_current_persona(page: PageLike) -> str | None:
that may contain user email information.
"""
tokens = await discover_auth_tokens(page)
_logger.debug(
"Discovered %d auth-related localStorage keys: %s",
logger.debug(
"Discovered {} auth-related localStorage keys: {}",
len(tokens),
list(tokens.keys()),
)
if not tokens:
_logger.debug("No auth tokens found in localStorage")
logger.debug("No auth tokens found in localStorage")
return None
# First priority: Plain 'email' key (direct email string)
if "email" in tokens:
email_value = tokens["email"]
if "@" in email_value:
_logger.info(
"Detected persona from localStorage 'email' key: %s", email_value
logger.info(
"Detected persona from localStorage 'email' key: {}", email_value
)
return email_value
# Second priority: Auth0 SPA SDK user keys (@@user@@ keys contain profile info)
for key, value in tokens.items():
if "@@auth0spajs@@" in key and "@@user@@" in key:
_logger.debug("Found Auth0 SPA SDK user key: %s", key)
logger.debug("Found Auth0 SPA SDK user key: {}", key)
if email := _extract_email_from_auth0_user(value):
_logger.info("Detected persona from Auth0 user cache: %s", email)
logger.info("Detected persona from Auth0 user cache: {}", email)
return email
# Third priority: Any auth-related key containing user email
for key, value in tokens.items():
if email := _extract_email_from_json(value):
_logger.info("Detected persona from localStorage key '%s': %s", key, email)
logger.info("Detected persona from localStorage key '{}': {}", key, email)
return email
_logger.debug("No email found in any localStorage keys")
logger.debug("No email found in any localStorage keys")
return None
@@ -157,48 +159,50 @@ async def login_with_mfa(
login_url: str | None = None,
) -> None:
"""Log in with MFA. Only proceeds if email input exists after navigation."""
email_input = page.locator(app_strings.auth.email_input)
email_input = page.locator(AuthSelectors.EMAIL_INPUT)
# Check if we need to navigate to the login page
if await email_input.count() == 0:
if login_url:
_logger.debug("Navigating to login URL: %s", login_url)
logger.debug("Navigating to login URL: {}", login_url)
_response = await page.goto(login_url)
del _response
# Check again after navigation - user might already be logged in
if await email_input.count() == 0:
_logger.debug(
logger.debug(
"No email input found after navigation - user already logged in"
)
return
else:
_logger.debug("No login URL and no email input - user already logged in")
logger.debug("No login URL and no email input - user already logged in")
return
_logger.info("Starting MFA login for: %s", email)
await page.fill(app_strings.auth.email_input, email)
await page.click(app_strings.auth.send_code_button)
_logger.debug("Sent MFA code request, waiting for code")
logger.info("Starting MFA login for: {}", email)
await page.fill(AuthSelectors.EMAIL_INPUT, email)
await page.click(AuthSelectors.SEND_CODE_BUTTON)
logger.debug("Sent MFA code request, waiting for code")
code = mfa_provider.get_code(email)
await page.fill(app_strings.auth.code_input, code)
await page.click(app_strings.auth.submit_button)
_logger.info("MFA login submitted for: %s", email)
await page.fill(AuthSelectors.CODE_INPUT, code)
await page.click(AuthSelectors.SUBMIT_BUTTON)
logger.info("MFA login submitted for: {}", email)
async def logout(page: PageLike) -> None:
"""Log out if the logout button exists."""
logout_btn = page.locator(app_strings.auth.logout_button)
logout_btn = page.locator(AuthSelectors.LOGOUT_BUTTON)
if await logout_btn.count() > 0:
_logger.info("Logging out current user")
logger.info("Logging out current user")
await logout_btn.click()
else:
_logger.debug("No logout button found - user may not be logged in")
logger.debug("No logout button found - user may not be logged in")
async def login_with_verification_code(
page: PageLike,
verification_code: str,
expected_email: str | None = None,
*,
timeouts: Timeouts | None = None,
) -> bool:
"""Authenticate via verification code input on Auth0 page.
@@ -209,6 +213,7 @@ async def login_with_verification_code(
page: Browser page already on the Auth0 verification code page.
verification_code: 6-digit code from email.
expected_email: If provided, validate logged-in user matches this email.
timeouts: Optional Timeouts instance for centralized configuration.
Returns:
True if authentication successful, False otherwise.
@@ -216,34 +221,37 @@ async def login_with_verification_code(
from guide.app.browser.wait import wait_for_stable_page
from guide.app.strings.selectors.login import LoginSelectors
_logger.info("Starting verification code login flow")
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
logger.info("Starting verification code login flow")
# Wait for code input field
code_input = page.locator(LoginSelectors.VERIFICATION_CODE_INPUT)
try:
await code_input.wait_for(state="visible", timeout=5000)
await code_input.wait_for(
state="visible", timeout=effective_timeouts.element_default
)
except Exception as exc:
_logger.error("Verification code input not found: %s", exc)
logger.error("Verification code input not found: {}", exc)
return False
# Fill verification code
_logger.info("Filling verification code")
logger.info("Filling verification code")
await code_input.fill(verification_code)
# Click submit button
submit_btn = page.locator(LoginSelectors.VERIFICATION_CODE_SUBMIT)
if await submit_btn.count() > 0:
_logger.info("Clicking verification code submit button")
logger.info("Clicking verification code submit button")
await submit_btn.first.click()
else:
_logger.warning("No verification code submit button found")
logger.warning("No verification code submit button found")
return False
# Wait for auth redirect
await wait_for_stable_page(page, stability_check_ms=8000)
# Log current URL after submission
_logger.info("URL after verification code submission: %s", page.url)
logger.info("URL after verification code submission: {}", page.url)
# Check for error messages
if await _check_for_auth_errors(
@@ -255,17 +263,17 @@ async def login_with_verification_code(
if expected_email:
current = await detect_current_persona(page)
if current and current.lower() == expected_email.lower():
_logger.info("Verification code login successful for: %s", expected_email)
logger.info("Verification code login successful for: {}", expected_email)
return True
_logger.warning(
"Verification code login validation failed - expected: %s, detected: %s",
logger.warning(
"Verification code login validation failed - expected: {}, detected: {}",
expected_email,
current,
)
return False
_logger.info("Verification code login completed (no email validation requested)")
logger.info("Verification code login completed (no email validation requested)")
return True
@@ -289,14 +297,14 @@ async def login_with_otp_url(
"""
from guide.app.browser.wait import wait_for_stable_page
_logger.info("Starting OTP login flow")
_logger.info("OTP URL: %s", otp_url)
logger.info("Starting OTP login flow")
logger.info("OTP URL: {}", otp_url)
# Navigate to OTP URL
try:
_ = await page.goto(otp_url)
except Exception as exc:
_logger.error("Failed to navigate to OTP URL: %s", exc)
logger.error("Failed to navigate to OTP URL: {}", exc)
return False
# Wait for page to stabilize
@@ -313,7 +321,7 @@ async def login_with_otp_url(
for selector in Auth0ErrorSelectors.LOGIN_BUTTON_SELECTORS:
login_btn = page.locator(selector)
if await login_btn.count() > 0:
_logger.info("Clicking OTP confirmation button: %s", selector)
logger.info("Clicking OTP confirmation button: {}", selector)
await login_btn.first.click()
button_clicked = True
# Wait for auth redirect after clicking - Auth0 can be slow
@@ -321,10 +329,10 @@ async def login_with_otp_url(
break
if not button_clicked:
_logger.info("No OTP confirmation button found - may have auto-redirected")
logger.info("No OTP confirmation button found - may have auto-redirected")
# Log current URL after click attempt to diagnose redirect issues
_logger.info("URL after OTP login flow: %s", page.url)
logger.info("URL after OTP login flow: {}", page.url)
# Check for errors after clicking (in case click triggered error)
if await _check_for_auth_errors(
@@ -337,15 +345,15 @@ async def login_with_otp_url(
# Debug: log current URL and localStorage keys
try:
current_url = page.url
_logger.info("Post-login URL: %s", current_url)
logger.info("Post-login URL: {}", current_url)
ls_keys = await page.evaluate("Object.keys(localStorage)")
_logger.info("localStorage keys: %s", ls_keys)
logger.info("localStorage keys: {}", ls_keys)
except Exception as debug_exc:
_logger.warning("Debug info collection failed: %s", debug_exc)
logger.warning("Debug info collection failed: {}", debug_exc)
current = await detect_current_persona(page)
if current and current.lower() == expected_email.lower():
_logger.info("OTP login successful for: %s", expected_email)
logger.info("OTP login successful for: {}", expected_email)
return True
# Capture page content on failure for debugging
@@ -354,22 +362,22 @@ async def login_with_otp_url(
body_text = await page.evaluate(
"document.body?.innerText?.substring(0, 500) || ''"
)
_logger.warning(
"OTP login validation failed - expected: %s, detected: %s, page_title: %s, body_preview: %s",
logger.warning(
"OTP login validation failed - expected: {}, detected: {}, page_title: {}, body_preview: {}",
expected_email,
current,
page_title,
body_text,
)
except Exception:
_logger.warning(
"OTP login validation failed - expected: %s, detected: %s",
logger.warning(
"OTP login validation failed - expected: {}, detected: {}",
expected_email,
current,
)
return False
_logger.info("OTP login completed (no email validation requested)")
logger.info("OTP login completed (no email validation requested)")
return True
@@ -380,15 +388,15 @@ async def ensure_persona(
login_url: str | None = None,
) -> None:
"""Ensure the browser is logged in as the specified persona."""
_logger.debug("Ensuring persona: %s", persona.email)
logger.debug("Ensuring persona: {}", persona.email)
current = await detect_current_persona(page)
if current and current.lower() == persona.email.lower():
_logger.debug("Already logged in as: %s", persona.email)
logger.debug("Already logged in as: {}", persona.email)
return
if current:
_logger.info("Switching persona from %s to %s", current, persona.email)
logger.info("Switching persona from {} to {}", current, persona.email)
else:
_logger.info("Logging in as persona: %s", persona.email)
logger.info("Logging in as persona: {}", persona.email)
await logout(page)
await login_with_mfa(page, persona.email, mfa_provider, login_url=login_url)
@@ -542,11 +550,11 @@ async def discover_auth_tokens(page: PageLike) -> dict[str, str]:
all_keys = await page.evaluate(_JS_LIST_ALL_LOCALSTORAGE_KEYS)
if isinstance(all_keys, list):
keys_list = cast(list[str], all_keys)
_logger.debug("All localStorage keys (%d): %s", len(keys_list), keys_list)
logger.debug("All localStorage keys ({}): {}", len(keys_list), keys_list)
result = await page.evaluate(_JS_DISCOVER_AUTH_TOKENS)
if not isinstance(result, dict):
_logger.debug("localStorage scan returned non-dict result")
logger.debug("localStorage scan returned non-dict result")
return {}
result_dict = cast(dict[str, object], result)
return {
@@ -554,7 +562,7 @@ async def discover_auth_tokens(page: PageLike) -> dict[str, str]:
}
except Exception as exc:
# Handle cases where page.evaluate fails (restricted pages, closed pages)
_logger.debug("localStorage access failed: %s", exc)
logger.debug("localStorage access failed: {}", exc)
return {}

View File

@@ -1,10 +1,10 @@
"""Core session management service."""
import logging
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, cast
from guide.app.utils.jwt import parse_jwt_expiry as _parse_jwt_expiry
from loguru import logger
from playwright.async_api import BrowserContext
@@ -18,8 +18,6 @@ from guide.app.models.domain import ActionResult
from guide.app.models.personas.models import DemoPersona
from guide.app.utils.urls import extract_base_url
_logger = logging.getLogger(__name__)
_JS_GET_ALL_LOCAL_STORAGE = """
(() => {
const items = {};
@@ -214,8 +212,8 @@ class SessionManager:
return
_ = await page.evaluate(_JS_SET_LOCAL_STORAGE, session.local_storage)
_logger.debug(
"Injected %d localStorage items for persona '%s'",
logger.debug(
"Injected {} localStorage items for persona '{}'",
len(session.local_storage),
session.persona_id,
)
@@ -237,8 +235,8 @@ class SessionManager:
# Cast cookies to SetCookieParam sequence - structure is compatible
cookies_typed = cast("list[SetCookieParam]", session.cookies)
await context.add_cookies(cookies_typed)
_logger.debug(
"Injected %d cookies for persona '%s'",
logger.debug(
"Injected {} cookies for persona '{}'",
len(session.cookies),
session.persona_id,
)
@@ -274,13 +272,13 @@ class SessionManager:
"""
session = self.load_session(persona.id)
if not session:
_logger.debug("No saved session found for persona %s", persona.id)
logger.debug("No saved session found for persona {}", persona.id)
return None
validation = self.validate_offline(session)
if not validation.is_valid:
_logger.info(
"Saved session for %s is invalid: %s",
logger.info(
"Saved session for {} is invalid: {}",
persona.id,
validation.reason,
)
@@ -291,8 +289,8 @@ class SessionManager:
try:
base_url = extract_base_url(session.origin_url)
except ValueError as exc:
_logger.warning(
"Invalid origin URL in session for %s: %s",
logger.warning(
"Invalid origin URL in session for {}: {}",
persona.id,
exc,
)
@@ -314,7 +312,7 @@ class SessionManager:
current = await detect_current_persona(page)
if current and current.lower() == persona.email.lower():
_logger.info("Restored session for persona %s", persona.id)
logger.info("Restored session for persona {}", persona.id)
return ActionResult(
details={
"persona_id": persona.id,
@@ -325,8 +323,8 @@ class SessionManager:
)
# Session injection failed - invalidate it
_logger.warning(
"Session injection failed for %s (expected: %s, got: %s)",
logger.warning(
"Session injection failed for {} (expected: {}, got: {})",
persona.id,
persona.email,
current,
@@ -380,8 +378,8 @@ class SessionManager:
for c in cookies
]
# ExtensionPage doesn't support cookie extraction - session will be localStorage-only
_logger.warning(
"Cookie extraction not supported for %s - session will use localStorage only",
logger.warning(
"Cookie extraction not supported for {} - session will use localStorage only",
type(page).__name__,
)
return []

View File

@@ -1,9 +1,9 @@
import contextlib
import logging
from collections.abc import AsyncIterator
from pathlib import Path
from typing import TYPE_CHECKING
from loguru import logger
from playwright.async_api import Page
from guide.app.browser.extension_client import ExtensionPage
@@ -14,8 +14,6 @@ if TYPE_CHECKING:
else:
StorageState = dict[str, object] # Runtime fallback
_logger = logging.getLogger(__name__)
class BrowserClient:
"""Provides page access via a persistent browser pool with context isolation.
@@ -61,19 +59,19 @@ class BrowserClient:
ConfigError: If the host_id is invalid or not configured
BrowserConnectionError: If the browser connection fails
"""
_logger.info("[BrowserClient] open_page called for host_id: %s", host_id)
logger.info("[BrowserClient] open_page called for host_id: {}", host_id)
context, page, should_close = await self.pool.allocate_context_and_page(
host_id, storage_state=storage_state
)
_logger.info(
"[BrowserClient] Got page from pool, should_close: %s", should_close
logger.info(
"[BrowserClient] Got page from pool, should_close: {}", should_close
)
try:
yield page
finally:
_logger.info("[BrowserClient] Cleaning up, should_close: %s", should_close)
logger.info("[BrowserClient] Cleaning up, should_close: {}", should_close)
# Only close context for headless mode (not CDP/extension)
if should_close and context is not None:
with contextlib.suppress(Exception):

View File

@@ -15,10 +15,14 @@ from guide.app.browser.elements.mui import (
check_listbox_visible,
click_with_mouse_events,
)
from guide.app.core.config import Timeouts
from guide.app.errors import ActionExecutionError
_logger = logging.getLogger(__name__)
# Module-level default timeouts for backward compatibility
_DEFAULT_TIMEOUTS = Timeouts()
# ---------------------------------------------------------------------------
# Shared Utilities
@@ -97,13 +101,18 @@ async def is_dropdown_open(page: PageLike, field_selector: str) -> bool:
async def open_dropdown(
page: PageLike, field_selector: str, popup_button_selector: str
page: PageLike,
field_selector: str,
popup_button_selector: str,
*,
timeouts: Timeouts | None = None,
) -> None:
"""Open an autocomplete dropdown.
Checks if already open first to avoid toggling closed.
Closes any other open dropdown before opening this one.
"""
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
if await is_dropdown_open(page, field_selector):
_logger.debug("[Dropdown] Already open, skipping click for: %s", field_selector)
return
@@ -168,7 +177,9 @@ async def open_dropdown(
try:
with contextlib.suppress(Exception):
_ = await page.wait_for_selector(popup_button_selector, timeout=1000)
_ = await page.wait_for_selector(
popup_button_selector, timeout=effective_timeouts.dropdown_field
)
popup_selector_js = escape_selector(popup_button_selector)
can_click = await page.evaluate(
f"""
@@ -184,11 +195,15 @@ async def open_dropdown(
raise ActionExecutionError("Popup button not available")
except Exception:
with contextlib.suppress(Exception):
_ = await page.wait_for_selector(f"{field_selector} input", timeout=1000)
_ = await page.wait_for_selector(
f"{field_selector} input", timeout=effective_timeouts.dropdown_field
)
await page.click(f"{field_selector} input")
await send_key(page, "ArrowDown")
with contextlib.suppress(Exception):
_ = await page.wait_for_selector(field_selector, timeout=1000)
_ = await page.wait_for_selector(
field_selector, timeout=effective_timeouts.dropdown_field
)
await page.click(field_selector)
await send_key(page, "ArrowDown")
await page.wait_for_timeout(100)
@@ -321,14 +336,19 @@ async def open_combobox_dropdown(
combobox_selector: str,
field_selector: str,
popup_button_selector: str,
*,
timeouts: Timeouts | None = None,
) -> bool:
"""Open combobox dropdown using multiple strategies. Returns True if opened."""
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
if await check_listbox_visible(page):
_logger.debug("[Dropdown] Combobox already open, skipping click")
return True
try:
_ = await page.wait_for_selector(combobox_selector, timeout=1000)
_ = await page.wait_for_selector(
combobox_selector, timeout=effective_timeouts.dropdown_field
)
_ = await click_with_mouse_events(page, combobox_selector)
await page.wait_for_timeout(800)
@@ -362,7 +382,9 @@ async def open_combobox_dropdown(
for icon_sel in icon_selectors:
try:
_ = await page.wait_for_selector(icon_sel, timeout=500)
_ = await page.wait_for_selector(
icon_sel, timeout=effective_timeouts.dropdown_icon
)
_ = await click_with_mouse_events(page, icon_sel, focus_first=False)
await page.wait_for_timeout(400)
if await check_listbox_visible(page):
@@ -659,15 +681,22 @@ async def force_set_combobox_value(
async def open_mui_dropdown(
page: PageLike, field_selector: str, popup_button_selector: str
page: PageLike,
field_selector: str,
popup_button_selector: str,
*,
timeouts: Timeouts | None = None,
) -> bool:
"""Open MUI dropdown by clicking popup button or field. Returns True if clicked."""
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
if await check_listbox_visible(page):
_logger.debug("[Dropdown] MUI dropdown already open, skipping click")
return True
with contextlib.suppress(Exception):
_ = await page.wait_for_selector(popup_button_selector, timeout=500)
_ = await page.wait_for_selector(
popup_button_selector, timeout=effective_timeouts.dropdown_icon
)
clicked = await click_with_mouse_events(
page, popup_button_selector, focus_first=True
)
@@ -675,7 +704,9 @@ async def open_mui_dropdown(
return True
with contextlib.suppress(Exception):
_ = await page.wait_for_selector(field_selector, timeout=500)
_ = await page.wait_for_selector(
field_selector, timeout=effective_timeouts.dropdown_icon
)
clicked = await click_with_mouse_events(page, field_selector, focus_first=True)
if clicked:
return True

View File

@@ -17,9 +17,13 @@ from typing import TypedDict
from guide.app.browser.types import PageLike
from guide.app.browser.utils import escape_js_string, escape_selector
from guide.app.core.config import Timeouts
_logger = logging.getLogger(__name__)
# Module-level default timeouts for backward compatibility
_DEFAULT_TIMEOUTS = Timeouts()
# ---------------------------------------------------------------------------
# Shared Types
@@ -270,20 +274,26 @@ async def clear_and_fill_with_react_events(
async def wait_for_role_option(
page: PageLike,
timeout_ms: int = 600,
timeout_ms: int | None = None,
*,
listbox_id: str | None = None,
timeouts: Timeouts | None = None,
) -> bool:
"""Wait for role=option elements to appear in the DOM.
Args:
page: PageLike instance
timeout_ms: Maximum wait time in milliseconds
timeout_ms: Maximum wait time in milliseconds (default from Timeouts.listbox_wait)
listbox_id: Optional listbox ID to scope the search (prevents cross-dropdown leakage)
timeouts: Optional Timeouts instance for centralized configuration
Returns:
True if options appeared, False if timeout
"""
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
effective_timeout_ms = (
timeout_ms if timeout_ms is not None else effective_timeouts.listbox_wait
)
start = asyncio.get_event_loop().time()
# Build scoped query based on listbox_id
@@ -306,7 +316,7 @@ async def wait_for_role_option(
})();
"""
while (asyncio.get_event_loop().time() - start) * 1000 < timeout_ms:
while (asyncio.get_event_loop().time() - start) * 1000 < effective_timeout_ms:
exists = await page.evaluate(query)
if exists:
return True
@@ -320,9 +330,10 @@ _wait_for_role_option = wait_for_role_option
async def ensure_listbox(
page: PageLike,
timeout_ms: int = 600,
timeout_ms: int | None = None,
*,
listbox_id: str | None = None,
timeouts: Timeouts | None = None,
) -> bool:
"""Ensure a listbox with options is visible.
@@ -332,25 +343,38 @@ async def ensure_listbox(
Args:
page: PageLike instance
timeout_ms: Maximum wait time in milliseconds
timeout_ms: Maximum wait time in milliseconds (default from Timeouts.listbox_wait)
listbox_id: Optional listbox ID from aria-controls to scope the search
timeouts: Optional Timeouts instance for centralized configuration
Returns:
True if listbox is visible, False otherwise
"""
effective_timeouts = timeouts or _DEFAULT_TIMEOUTS
effective_timeout_ms = (
timeout_ms if timeout_ms is not None else effective_timeouts.listbox_wait
)
# When scoped to a specific listbox, skip generic wait_for_selector
# and use our scoped polling directly
if listbox_id:
return await wait_for_role_option(page, timeout_ms, listbox_id=listbox_id)
return await wait_for_role_option(
page,
effective_timeout_ms,
listbox_id=listbox_id,
timeouts=effective_timeouts,
)
# Unscoped fallback: try Playwright wait then polling
with contextlib.suppress(Exception):
if hasattr(page, "wait_for_selector"):
# Use scoped selector to avoid stale options
selector = '[role="listbox"]:not([data-dropdown-closed]) [role="option"]:not([data-dropdown-stale])'
_ = await page.wait_for_selector(selector, timeout=timeout_ms)
_ = await page.wait_for_selector(selector, timeout=effective_timeout_ms)
return True
return await wait_for_role_option(page, timeout_ms)
return await wait_for_role_option(
page, effective_timeout_ms, timeouts=effective_timeouts
)
# Backward compatibility alias

View File

@@ -7,18 +7,16 @@ executed via the browser extension, avoiding CDP page refresh issues.
import asyncio
import contextlib
import json
import logging
import uuid
from typing import Protocol, cast
from loguru import logger
from websockets.asyncio.server import Server, ServerConnection, serve
from guide.app.browser.types import PageLocator
from guide.app.core.config import DEFAULT_EXTENSION_PORT
from guide.app.core.config import DEFAULT_EXTENSION_PORT, Timeouts
from guide.app.errors import ActionExecutionError, BrowserConnectionError
_logger = logging.getLogger(__name__)
# JSON-serializable values that can be returned from JavaScript evaluation
type JSONValue = (
str | int | float | bool | None | dict[str, JSONValue] | list[JSONValue]
@@ -76,8 +74,11 @@ class ExtensionPage:
title = await page.evaluate("document.title")
"""
def __init__(self, client: "ExtensionClient") -> None:
def __init__(
self, client: "ExtensionClient", *, timeouts: Timeouts | None = None
) -> None:
self._client: ExtensionClient = client
self._timeouts: Timeouts = timeouts or Timeouts()
self._pending: dict[str, asyncio.Future[JSONValue]] = {}
self._current_url: str = ""
@@ -97,7 +98,7 @@ class ExtensionPage:
action: str,
payload: dict[str, JSONValue],
*,
timeout: float = 30.0,
timeout: float | None = None,
) -> JSONValue:
"""Send structured command to extension via ActionDispatcher.
@@ -126,19 +127,22 @@ class ExtensionPage:
"payload": payload,
}
_logger.debug(f"[Extension] Sending {action}: {payload}")
logger.debug("[Extension] Sending {}: {}", action, payload)
await self._client.send_message(cast(dict[str, JSONValue], message))
effective_timeout = (
timeout if timeout is not None else self._timeouts.extension_command_s
)
try:
return await asyncio.wait_for(future, timeout=timeout)
return await asyncio.wait_for(future, timeout=effective_timeout)
except asyncio.TimeoutError as e:
_ = self._pending.pop(request_id, None)
raise BrowserConnectionError(
f"Extension command timeout after {timeout}s: {action}",
f"Extension command timeout after {effective_timeout}s: {action}",
details={
"action": action,
"payload": payload,
"timeout_seconds": timeout,
"timeout_seconds": effective_timeout,
},
) from e
@@ -285,7 +289,7 @@ class ExtensionPage:
return await self.eval_js(expression)
async def click_element_with_text(
self, selector: str, text: str, timeout: int = 5000
self, selector: str, text: str, timeout: int | None = None
) -> None:
"""Click an element matching selector that contains specific text.
@@ -294,15 +298,18 @@ class ExtensionPage:
Args:
selector: CSS selector for elements to search
text: Text content to match
timeout: Maximum time to wait in milliseconds (default 5000)
timeout: Maximum time to wait in milliseconds (default from Timeouts.element_default)
Raises:
Exception: If no matching element found within timeout
"""
effective_timeout_ms = (
timeout if timeout is not None else self._timeouts.element_default
)
_ = await self._send_command(
"CLICK_TEXT",
{"selector": selector, "text": text},
timeout=timeout / 1000, # Convert ms to seconds
timeout=effective_timeout_ms / 1000, # Convert ms to seconds
)
async def wait_for_timeout(self, timeout: int) -> None:
@@ -326,7 +333,7 @@ class ExtensionPage:
_ = await self._send_command(
"trusted_click",
{"x": x, "y": y},
timeout=10.0,
timeout=self._timeouts.extension_trusted_click_s,
)
async def goto(
@@ -385,7 +392,7 @@ class ExtensionPage:
Returns:
None (for compatibility with Playwright)
"""
timeout_ms = int(timeout) if timeout else 5000
timeout_ms = int(timeout) if timeout else self._timeouts.element_default
poll_interval_ms = 100 # Poll every 100ms like Playwright
wait_state = state or "attached"
_ = strict # Reserved for parameter compatibility
@@ -491,16 +498,17 @@ class ExtensionPage:
"awaitPromise": await_promise,
}
_logger.debug(f"[Extension] Sending eval: {code[:100]}...")
logger.debug("[Extension] Sending eval: {}...", code[:100])
await self._client.send_message(cast(dict[str, JSONValue], message))
eval_timeout = self._timeouts.extension_command_s
try:
return await asyncio.wait_for(future, timeout=30.0)
return await asyncio.wait_for(future, timeout=eval_timeout)
except asyncio.TimeoutError as e:
_ = self._pending.pop(request_id, None)
raise BrowserConnectionError(
f"Extension eval timeout after 30s: {code[:100]}",
details={"code_preview": code[:100], "timeout_seconds": 30.0},
f"Extension eval timeout after {eval_timeout}s: {code[:100]}",
details={"code_preview": code[:100], "timeout_seconds": eval_timeout},
) from e
def handle_response(self, data: dict[str, JSONValue]) -> None:
@@ -790,10 +798,15 @@ class ExtensionClient:
"""
def __init__(
self, host: str = "0.0.0.0", port: int = DEFAULT_EXTENSION_PORT
self,
host: str = "0.0.0.0",
port: int = DEFAULT_EXTENSION_PORT,
*,
timeouts: Timeouts | None = None,
) -> None:
self._host: str = host
self._port: int = port
self._timeouts: Timeouts = timeouts or Timeouts()
self._server: Server | None = None
self._ws: ServerConnection | None = None
self._page: ExtensionPage | None = None
@@ -813,7 +826,7 @@ class ExtensionClient:
async def start(self) -> None:
"""Start the WebSocket server and wait for extension to connect."""
_logger.info(f"Starting WebSocket server at {self._host}:{self._port}")
logger.info("Starting WebSocket server at {}:{}", self._host, self._port)
try:
self._server = await serve(self._handle_connection, self._host, self._port)
@@ -823,20 +836,27 @@ class ExtensionClient:
details={"host": self._host, "port": self._port, "reason": str(exc)},
) from exc
_logger.info(f"WebSocket server listening at ws://{self._host}:{self._port}")
_logger.info("Waiting for extension to connect...")
logger.info("WebSocket server listening at ws://{}:{}", self._host, self._port)
logger.info("Waiting for extension to connect...")
# Wait for extension to connect (with timeout)
connection_timeout = self._timeouts.extension_connection_s
try:
_ = await asyncio.wait_for(self._connected.wait(), timeout=10.0)
_logger.info("Extension connected successfully")
_ = await asyncio.wait_for(
self._connected.wait(), timeout=connection_timeout
)
logger.info("Extension connected successfully")
except (
asyncio.TimeoutError
) as exc: # pragma: no cover - covered by unit test raising connection error
await self.close()
raise BrowserConnectionError(
"Extension did not connect within 10 seconds. Make sure Chrome is running with the Terminator Bridge extension loaded.",
details={"host": self._host, "port": self._port, "timeout_seconds": 10},
f"Extension did not connect within {connection_timeout} seconds. Make sure Chrome is running with the Terminator Bridge extension loaded.",
details={
"host": self._host,
"port": self._port,
"timeout_seconds": connection_timeout,
},
) from exc
async def close(self) -> None:
@@ -848,7 +868,7 @@ class ExtensionClient:
if self._server:
self._server.close()
await self._server.wait_closed()
_logger.info("WebSocket server closed")
logger.info("WebSocket server closed")
async def get_page(self) -> ExtensionPage:
"""Get the page interface for browser automation.
@@ -872,38 +892,43 @@ class ExtensionClient:
Args:
websocket: WebSocket connection from extension
"""
_logger.info("Extension connected")
logger.info("Extension connected")
self._ws = websocket
self._page = ExtensionPage(self)
self._page = ExtensionPage(self, timeouts=self._timeouts)
# Set connected flag only AFTER page is initialized to prevent race condition
self._connected.set()
ws_timeout = self._timeouts.websocket_receive_s
try:
# Use timeout to detect hung connections
while True:
try:
# 60s timeout - if no message in 60s, check connection health
message = await asyncio.wait_for(websocket.recv(), timeout=60.0)
# Timeout triggers ping to check connection health
message = await asyncio.wait_for(
websocket.recv(), timeout=ws_timeout
)
try:
data = cast(dict[str, JSONValue], json.loads(message))
if self._page:
self._page.handle_response(data)
except json.JSONDecodeError:
_logger.warning("Invalid JSON from extension: %s", message)
logger.warning("Invalid JSON from extension: {}", message)
except Exception as e:
_logger.error("Error handling extension message: %s", e)
logger.error("Error handling extension message: {}", e)
except asyncio.TimeoutError:
# No message in 60s - check if connection is still alive
_logger.debug("No message from extension for 60s, sending ping")
# No message in timeout period - check if connection is still alive
logger.debug(
"No message from extension for {}s, sending ping", ws_timeout
)
try:
_ = await websocket.ping()
except Exception:
_logger.warning("Extension ping failed, connection may be dead")
logger.warning("Extension ping failed, connection may be dead")
break
except Exception as e:
_logger.error("Extension connection error: %s", e)
logger.error("Extension connection error: {}", e)
finally:
_logger.info("Extension disconnected")
logger.info("Extension disconnected")
# Ensure WebSocket is properly closed before clearing reference
with contextlib.suppress(Exception):
if self._ws:

View File

@@ -0,0 +1,218 @@
"""HTML parsing utilities using BeautifulSoup for reliable DOM traversal.
Provides robust HTML element extraction using BeautifulSoup instead of regex patterns.
This module is designed to extract interactive UI elements (inputs, buttons, selects,
textareas) and build CSS selectors with a defined priority order.
"""
from typing import Final
from bs4 import BeautifulSoup, Tag
# Selector attribute priority order (highest to lowest)
SELECTOR_PRIORITY: Final[tuple[str, ...]] = (
"data-cy",
"data-testid",
"data-test",
"id",
"name",
"aria-label",
)
def extract_form_elements(html: str) -> dict[str, str]:
"""Extract interactive UI elements with CSS selectors via DOM traversal.
Finds all input, button, select, textarea, and role="button" elements
and maps them to semantic names with CSS selectors.
Args:
html: HTML string to parse
Returns:
Dictionary mapping semantic element names to CSS selectors
"""
soup = BeautifulSoup(html, "html.parser")
elements: dict[str, str] = {}
name_counts: dict[str, int] = {}
# Standard form elements
for tag in soup.find_all(["input", "button", "select", "textarea"]):
_add_element(tag, elements, name_counts)
# Role=button elements (custom buttons implemented with divs/spans)
for tag in soup.find_all(attrs={"role": "button"}):
_add_element(tag, elements, name_counts, prefix="ROLE_BUTTON_")
return elements
def extract_modal_content(html: str) -> str | None:
"""Extract displayed modal content, ignoring hidden modals.
Searches for modal elements using common patterns (modal-wrapper class,
modal ID, role=dialog, MuiModal class) and returns the first visible one.
Args:
html: HTML string to parse
Returns:
HTML string of the modal content if found and visible, None otherwise
"""
soup = BeautifulSoup(html, "html.parser")
# Check for modals in priority order using direct CSS class/attribute checks
# 1. Check for modal-wrapper class
for modal in soup.find_all("div", class_=True):
classes = modal.get("class")
if isinstance(classes, list) and any(
"modal-wrapper" in str(c) for c in classes
):
if not _is_hidden(modal):
return str(modal)
# 2. Check for modal in ID
for modal in soup.find_all("div", id=True):
elem_id = modal.get("id")
if isinstance(elem_id, str) and "modal" in elem_id.lower():
if not _is_hidden(modal):
return str(modal)
# 3. Check for role=dialog
for modal in soup.find_all("div", attrs={"role": "dialog"}):
if not _is_hidden(modal):
return str(modal)
# 4. Check for MuiModal class
for modal in soup.find_all("div", class_=True):
classes = modal.get("class")
if isinstance(classes, list) and any("MuiModal" in str(c) for c in classes):
if not _is_hidden(modal):
return str(modal)
return None
def build_selector(tag: Tag) -> str | None:
"""Build CSS selector using priority order.
Creates a CSS selector from the element's attributes, preferring
data-cy > data-testid > data-test > id > name > aria-label.
Args:
tag: BeautifulSoup Tag element
Returns:
CSS selector string or None if no suitable attribute found
"""
for attr in SELECTOR_PRIORITY:
value = tag.get(attr)
if value:
# Handle list values (e.g., class attributes)
if isinstance(value, list):
value = value[0] if value else None
if value:
if attr == "id":
return f"#{value}"
return f'[{attr}="{value}"]'
return None
def _is_hidden(elem: Tag) -> bool:
"""Check if element is hidden via style, aria-hidden, or hidden attribute.
Args:
elem: BeautifulSoup Tag element
Returns:
True if element appears to be hidden
"""
# Check inline style for display:none
style = elem.get("style", "")
if isinstance(style, str) and "display:" in style and "none" in style:
return True
# Check aria-hidden
if elem.get("aria-hidden") == "true":
return True
# Check hidden attribute
return elem.has_attr("hidden")
def _get_element_identifier(tag: Tag) -> str | None:
"""Get the primary identifier for an element based on selector priority.
Args:
tag: BeautifulSoup Tag element
Returns:
The identifier string or None if no suitable attribute found
"""
for attr in SELECTOR_PRIORITY:
value = tag.get(attr)
if value:
if isinstance(value, list):
value = value[0] if value else None
if value:
return str(value)
return None
def _normalize_name(name: str) -> str:
"""Normalize a name to a valid identifier format.
Args:
name: Raw name string
Returns:
Normalized uppercase name with safe characters
"""
return name.upper().replace("-", "_").replace(".", "_").replace(" ", "_")
def _add_element(
tag: Tag,
elements: dict[str, str],
name_counts: dict[str, int],
prefix: str = "",
) -> None:
"""Add element to dict with duplicate handling.
Extracts the selector and semantic name, handles duplicates by adding
numeric suffixes.
Args:
tag: BeautifulSoup Tag element
elements: Output dictionary to update
name_counts: Counter dict for duplicate handling
prefix: Optional prefix for semantic name (e.g., "ROLE_BUTTON_")
"""
selector = build_selector(tag)
if not selector:
return
identifier = _get_element_identifier(tag)
if not identifier:
return
# Build semantic name with tag prefix for non-input elements
tag_prefix = ""
if tag.name != "input":
tag_prefix = f"{tag.name.upper()}_"
base_name = f"{prefix}{tag_prefix}{_normalize_name(identifier)}"
# Handle duplicates with numeric suffixes
if base_name not in name_counts:
name_counts[base_name] = 1
elements[base_name] = selector
else:
name_counts[base_name] += 1
elements[f"{base_name}_{name_counts[base_name]}"] = selector
__all__ = [
"SELECTOR_PRIORITY",
"extract_form_elements",
"extract_modal_content",
"build_selector",
]

View File

@@ -17,10 +17,10 @@ Isolation Modes:
import asyncio
import contextlib
import logging
from pathlib import Path
from typing import TYPE_CHECKING, TypeAlias
from loguru import logger
from playwright.async_api import (
Browser,
BrowserContext,
@@ -47,9 +47,6 @@ from guide.app.browser.extension_client import ExtensionClient, ExtensionPage
PageLike: TypeAlias = Page | ExtensionPage
_logger = logging.getLogger(__name__)
class BrowserInstance:
"""Manages a single browser connection and its lifecycle.
@@ -124,7 +121,7 @@ class BrowserInstance:
async def _allocate_extension_page(self) -> tuple[None, ExtensionPage, bool]:
"""Allocate extension page - no caching needed unlike CDP mode."""
_logger.info(f"[EXTENSION-{self.host_id}] allocate_context_and_page called")
logger.info("[EXTENSION-{}] allocate_context_and_page called", self.host_id)
if self.extension_client is None:
raise errors.BrowserConnectionError(
f"Extension client not initialized for host {self.host_id}",
@@ -133,24 +130,24 @@ class BrowserInstance:
# Get fresh page each time - no caching needed for extension mode
# (CDP caches to avoid page refresh on context/page queries)
page = await self.extension_client.get_page()
_logger.info(f"[EXTENSION-{self.host_id}] Returning fresh extension page")
logger.info("[EXTENSION-{}] Returning fresh extension page", self.host_id)
return None, page, False
async def _allocate_cdp_page(self) -> tuple[BrowserContext, PageLike, bool]:
_logger.info(f"[CDP-{self.host_id}] allocate_context_and_page called")
logger.info("[CDP-{}] allocate_context_and_page called", self.host_id)
# Check if cached page is still valid
if self._cdp_page is not None and self._cdp_page.is_closed():
_logger.warning(
f"[CDP-{self.host_id}] Cached page is closed, clearing cache"
logger.warning(
"[CDP-{}] Cached page is closed, clearing cache", self.host_id
)
self._cdp_context = None
self._cdp_page = None
# Check if browser connection is still alive
if self.browser is not None and not self.browser.is_connected():
_logger.warning(
f"[CDP-{self.host_id}] Browser disconnected, clearing cache"
logger.warning(
"[CDP-{}] Browser disconnected, clearing cache", self.host_id
)
self._cdp_context = None
self._cdp_page = None
@@ -161,11 +158,11 @@ class BrowserInstance:
if self._cdp_context is None or self._cdp_page is None:
browser = self._require_browser()
_logger.info(
f"[CDP-{self.host_id}] First access - querying browser.contexts"
logger.info(
"[CDP-{}] First access - querying browser.contexts", self.host_id
)
contexts = browser.contexts
_logger.info(f"[CDP-{self.host_id}] Got {len(contexts)} contexts")
logger.info("[CDP-{}] Got {} contexts", self.host_id, len(contexts))
if not contexts:
raise errors.BrowserConnectionError(
f"No contexts available in CDP browser for host {self.host_id}",
@@ -174,8 +171,11 @@ class BrowserInstance:
context = contexts[0]
pages = context.pages
_logger.info(
f"[CDP-{self.host_id}] Got {len(pages)} pages: {[p.url for p in pages]}"
logger.info(
"[CDP-{}] Got {} pages: {}",
self.host_id,
len(pages),
[p.url for p in pages],
)
if not pages:
raise errors.BrowserConnectionError(
@@ -194,10 +194,10 @@ class BrowserInstance:
self._cdp_context = context
self._cdp_page = non_devtools_pages[-1]
_logger.info(f"[CDP-{self.host_id}] Cached page: {self._cdp_page.url}")
logger.info("[CDP-{}] Cached page: {}", self.host_id, self._cdp_page.url)
else:
_logger.info(
f"[CDP-{self.host_id}] Using cached page: {self._cdp_page.url}"
logger.info(
"[CDP-{}] Using cached page: {}", self.host_id, self._cdp_page.url
)
# Assert non-None for type narrowing (values were just assigned above)
@@ -226,7 +226,7 @@ class BrowserInstance:
This provides session isolation without full context recreation,
avoiding the page refresh that would occur from re-querying contexts.
"""
_logger.debug(f"[CDP-{self.host_id}] Clearing storage for isolation")
logger.debug("[CDP-{}] Clearing storage for isolation", self.host_id)
await context.clear_cookies()
await page.evaluate("localStorage.clear(); sessionStorage.clear();")
@@ -280,7 +280,7 @@ class BrowserPool:
# Warn if no browser hosts configured
if not self.settings.browser_hosts:
_logger.warning(
logger.warning(
"No browser hosts configured. Actions requiring browser access will fail."
)
@@ -294,15 +294,15 @@ class BrowserPool:
self._instances[host_id] = instance
# Eagerly cache the page reference to avoid querying on first request
_ = await instance.allocate_context_and_page()
_logger.info(
f"Eagerly connected to CDP host '{host_id}' and cached page"
logger.info(
"Eagerly connected to CDP host '{}' and cached page", host_id
)
except Exception as exc:
_logger.warning(
f"Failed to eagerly connect to CDP host '{host_id}': {exc}"
logger.warning(
"Failed to eagerly connect to CDP host '{}': {}", host_id, exc
)
_logger.info("Browser pool initialized")
logger.info("Browser pool initialized")
async def close(self) -> None:
"""Close all browser connections and the Playwright instance."""
@@ -319,7 +319,7 @@ class BrowserPool:
with contextlib.suppress(Exception):
await self._playwright.stop()
self._playwright = None
_logger.info("Browser pool closed")
logger.info("Browser pool closed")
async def allocate_context_and_page(
self,
@@ -376,8 +376,10 @@ class BrowserPool:
)
except (TargetClosedError, errors.BrowserConnectionError) as exc:
# Connection is stale - evict and reconnect once
_logger.warning(
f"Stale connection detected for host '{resolved_id}', reconnecting: {exc}"
logger.warning(
"Stale connection detected for host '{}', reconnecting: {}",
resolved_id,
exc,
)
await self._evict_instance(resolved_id)
instance = await self._create_instance(resolved_id, host_config)
@@ -392,7 +394,7 @@ class BrowserPool:
instance = self._instances.pop(host_id)
with contextlib.suppress(Exception):
await instance.close()
_logger.info(f"Evicted stale browser instance for host '{host_id}'")
logger.info("Evicted stale browser instance for host '{}'", host_id)
async def _create_instance(
self, host_id: str, host_config: BrowserHostConfig
@@ -428,8 +430,10 @@ class BrowserPool:
instance = BrowserInstance(
host_id, host_config, browser=None, extension_client=extension_client
)
_logger.info(
f"Created extension client instance for host '{host_id}' ({host_config.kind})"
logger.info(
"Created extension client instance for host '{}' ({})",
host_id,
host_config.kind,
)
return instance
@@ -443,8 +447,8 @@ class BrowserPool:
browser = await self._launch_headless(host_config)
instance = BrowserInstance(host_id, host_config, browser=browser)
_logger.info(
f"Created browser instance for host '{host_id}' ({host_config.kind})"
logger.info(
"Created browser instance for host '{}' ({})", host_id, host_config.kind
)
return instance
@@ -476,7 +480,7 @@ class BrowserPool:
try:
browser = await self._playwright.chromium.connect_over_cdp(target_url)
_logger.info(f"Connected to CDP endpoint: {target_url}")
logger.info("Connected to CDP endpoint: {}", target_url)
return browser
except Exception as exc:
raise errors.BrowserConnectionError(
@@ -493,8 +497,8 @@ class BrowserPool:
browser_type = self._resolve_browser_type(host_config.browser)
try:
browser = await browser_type.launch(headless=True)
_logger.info(
f"Launched headless browser: {host_config.browser or 'chromium'}"
logger.info(
"Launched headless browser: {}", host_config.browser or "chromium"
)
return browser
except Exception as exc:

View File

@@ -88,6 +88,8 @@ class Timeouts(BaseModel):
"""Timeout for dropdown listbox appearance."""
dropdown_field: int = 1000
"""Timeout for dropdown field interactions."""
dropdown_icon: int = 500
"""Timeout for dropdown icon/indicator interactions."""
combobox_listbox: int = 2500
"""Extended timeout for combobox listbox population."""

View File

@@ -1,8 +1,15 @@
"""Centralized loguru logging configuration.
Provides structured JSON file logging and pretty console output with
request context injection via context variables.
"""
import contextvars
import json
import logging
import sys
from typing import override
from pathlib import Path
from loguru import logger
class _ContextVars:
@@ -22,66 +29,106 @@ class _ContextVars:
)
class ContextJsonFormatter(logging.Formatter):
"""JSON formatter that includes request context variables in every log entry."""
class InterceptHandler(logging.Handler):
"""Redirect standard library logging to loguru."""
@override
def format(self, record: logging.LogRecord) -> str:
"""Format the log record as JSON with context variables."""
log_data: dict[str, object] = {
"timestamp": self.formatTime(record, datefmt="%Y-%m-%dT%H:%M:%S"),
"level": record.levelname,
"logger": record.name,
"module": record.module,
"line": record.lineno,
"msg": record.getMessage(),
}
def emit(self, record: logging.LogRecord) -> None:
"""Intercept logging records and forward to loguru."""
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno
# Add request context fields if set
frame, depth = logging.currentframe(), 2
while frame is not None and frame.f_code.co_filename == logging.__file__:
frame = frame.f_back
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
def setup_logger() -> None:
"""Configure loguru with console and file sinks.
Console: Pretty colored output to stderr (INFO level)
File: JSON-structured logs to logs/app_logs.log (DEBUG level, 10MB rotation, 7 day retention)
Also intercepts standard library logging (e.g., from requests, urllib) and redirects to loguru.
Context variables (correlation_id, action_id, persona_id, host_id) are automatically
included in JSON logs via loguru's bind() mechanism.
"""
# Remove default handler
logger.remove()
# Console sink: pretty colored output
logger.add(
sys.stderr,
colorize=True,
format="<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
level="INFO",
)
# File sink: JSON-structured logs with context injection
log_dir = Path("logs")
log_dir.mkdir(exist_ok=True)
log_file = log_dir / "app_logs.log"
def inject_context(record: object) -> bool:
"""Filter function that injects context variables into record before serialization.
Args:
record: Loguru Record object with dict-like access to attributes.
"""
# Loguru Record objects have an 'extra' attribute that is a dict
# Access it via getattr for type safety
extra_attr = getattr(record, "extra", None)
if extra_attr is None:
return True
# Ensure we have a dict before mutating it
if not isinstance(extra_attr, dict):
return True
# Directly mutate the extra dict that loguru provides
# The isinstance check ensures it's a dict, so we can safely assign to it
if correlation_id := _ContextVars.correlation_id.get():
log_data["correlation_id"] = correlation_id
extra_attr["correlation_id"] = correlation_id
if action_id := _ContextVars.action_id.get():
log_data["action_id"] = action_id
extra_attr["action_id"] = action_id
if persona_id := _ContextVars.persona_id.get():
log_data["persona_id"] = persona_id
extra_attr["persona_id"] = persona_id
if host_id := _ContextVars.host_id.get():
log_data["host_id"] = host_id
extra_attr["host_id"] = host_id
return True # Always log this record
# Add exception info if present
if record.exc_info:
log_data["exc_info"] = self.formatException(record.exc_info)
logger.add(
str(log_file),
rotation="10 MB",
retention="7 days",
level="DEBUG",
serialize=True, # JSON formatting
enqueue=True, # Thread-safe
filter=inject_context,
)
return json.dumps(log_data)
# Intercept standard library logging
logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
# Suppress noisy third-party loggers
for logger_name in ["httpx", "httpcore", "playwright"]:
logging.getLogger(logger_name).setLevel(logging.WARNING)
class LoggingManager:
"""Manages structured JSON logging with request context injection."""
"""Manages structured logging with request context injection."""
context: type[_ContextVars] = _ContextVars
@staticmethod
def configure(level: int | str = logging.INFO) -> None:
"""Configure JSON logging with structured output and context injection.
All log entries will include:
- ISO timestamp
- Log level
- Logger name, module, line number
- Request context (if set via context variables)
Args:
level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
"""
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.handlers.clear()
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(ContextJsonFormatter())
root_logger.addHandler(handler)
# Backward compatibility: expose configure_logging function
def configure_logging(level: int | str = logging.INFO) -> None:
"""Configure JSON logging (wrapper for LoggingManager.configure)."""
LoggingManager.configure(level)
"""Configure logging (wrapper for setup_logger).
Note: Level parameter is ignored as loguru handles levels per sink.
"""
setup_logger()

View File

@@ -11,7 +11,7 @@ from guide.app.auth import SessionManager, SessionStorage
from guide.app.browser.client import BrowserClient
from guide.app.browser.pool import BrowserPool
from guide.app.core.config import AppSettings, load_settings
from guide.app.core.logging import configure_logging
from guide.app.core.logging import setup_logger
from guide.app.api import router as api_router
from guide.app import errors
from guide.app.models.boards import BoardStore
@@ -20,7 +20,7 @@ from guide.app.raindrop import GraphQLClient
def create_app() -> FastAPI:
configure_logging()
setup_logger()
settings = load_settings()
persona_store = PersonaStore(settings)

View File

@@ -1,8 +1,8 @@
import logging
from collections.abc import Mapping
from typing import TypeGuard
import httpx
from loguru import logger
from pydantic import ValidationError
from guide.app import errors
@@ -12,8 +12,6 @@ from guide.app.core.config import AppSettings
from guide.app.models.personas.models import DemoPersona
from guide.app.raindrop.validation import validate_graphql_response_json
_logger = logging.getLogger(__name__)
class GraphQLClient:
"""GraphQL client with connection pooling.
@@ -79,9 +77,9 @@ class GraphQLClient:
extracted = await extract_bearer_token(page)
if extracted:
bearer_token = extracted.value
_logger.debug("Auto-extracted bearer token from page localStorage")
logger.debug("Auto-extracted bearer token from page localStorage")
else:
_logger.warning(
logger.warning(
"No bearer token found in page localStorage - GraphQL request will be unauthenticated"
)

View File

@@ -1,478 +0,0 @@
"""Static type-safe registry for UI strings, selectors, and labels.
This module provides a flattened, hierarchy-based structure enabling IDE autocompletion,
rename refactoring, and type safety.
GraphQL queries are now maintained separately in raindrop/generated/queries.py
and loaded from .graphql files for better maintainability.
Usage:
from guide.app.strings import app_strings
# Selectors (type-safe, autocomplete-friendly)
selector = app_strings.intake.description_field
# Labels
label = app_strings.intake.description_placeholder
# Demo text
text = app_strings.intake.conveyor_belt_request
# GraphQL queries (use guide.app.raindrop.generated.queries instead)
from guide.app.raindrop.generated.queries import GET_INTAKE_REQUEST
"""
from typing import ClassVar
from guide.app.strings.demo_texts.contract import ContractTexts
from guide.app.strings.demo_texts.intake import IntakeTexts
from guide.app.strings.demo_texts.suppliers import SupplierTexts
from guide.app.strings.labels.auth import AuthLabels
from guide.app.strings.labels.contract import (
ContractBoardLabels,
ContractFilterLabels,
ContractFormLabels,
)
from guide.app.strings.labels.intake import IntakeLabels
from guide.app.strings.labels.sourcing import SourcingLabels
from guide.app.strings.selectors.auth import AuthSelectors
from guide.app.strings.selectors.common import CommonSelectors
from guide.app.strings.selectors.contract import (
ContractBoardSelectors,
ContractDashboardFilters,
ContractFormSelectors,
)
from guide.app.strings.selectors.intake import IntakeSelectors
from guide.app.strings.selectors.messaging import MessagingSelectors
from guide.app.strings.selectors.navigation import NavigationSelectors
from guide.app.strings.selectors.sourcing import SourcingSelectors
class IntakeStrings:
"""Intake flow strings: selectors, labels, and demo text.
Provides direct access to all intake-related UI constants.
"""
# Selectors - General
description_field: ClassVar[str] = IntakeSelectors.DESCRIPTION_FIELD
next_button: ClassVar[str] = IntakeSelectors.NEXT_BUTTON
# Selectors - Sourcing Request Form
form: ClassVar[str] = IntakeSelectors.FORM
requester_field: ClassVar[str] = IntakeSelectors.REQUESTER_FIELD
assigned_owner_field: ClassVar[str] = IntakeSelectors.ASSIGNED_OWNER_FIELD
legal_contact_field: ClassVar[str] = IntakeSelectors.LEGAL_CONTACT_FIELD
commodity_field: ClassVar[str] = IntakeSelectors.COMMODITY_FIELD
planned_field: ClassVar[str] = IntakeSelectors.PLANNED_FIELD
regions_field: ClassVar[str] = IntakeSelectors.REGIONS_FIELD
opex_capex_field: ClassVar[str] = IntakeSelectors.OPEX_CAPEX_FIELD
description_text_field: ClassVar[str] = IntakeSelectors.DESCRIPTION_TEXT_FIELD
description_textarea: ClassVar[str] = IntakeSelectors.DESCRIPTION_TEXTAREA
reseller_field: ClassVar[str] = IntakeSelectors.RESELLER_FIELD
reseller_textarea: ClassVar[str] = IntakeSelectors.RESELLER_TEXTAREA
target_date_field: ClassVar[str] = IntakeSelectors.TARGET_DATE_FIELD
entity_field: ClassVar[str] = IntakeSelectors.ENTITY_FIELD
desired_supplier_name_field: ClassVar[str] = (
IntakeSelectors.DESIRED_SUPPLIER_NAME_FIELD
)
desired_supplier_name_textarea: ClassVar[str] = (
IntakeSelectors.DESIRED_SUPPLIER_NAME_TEXTAREA
)
desired_supplier_contact_field: ClassVar[str] = (
IntakeSelectors.DESIRED_SUPPLIER_CONTACT_FIELD
)
desired_supplier_contact_textarea: ClassVar[str] = (
IntakeSelectors.DESIRED_SUPPLIER_CONTACT_TEXTAREA
)
select_document_field: ClassVar[str] = IntakeSelectors.SELECT_DOCUMENT_FIELD
drop_zone_container: ClassVar[str] = IntakeSelectors.DROP_ZONE_CONTAINER
drop_zone_input: ClassVar[str] = IntakeSelectors.DROP_ZONE_INPUT
back_button: ClassVar[str] = IntakeSelectors.BACK_BUTTON
submit_button: ClassVar[str] = IntakeSelectors.SUBMIT_BUTTON
close_button: ClassVar[str] = IntakeSelectors.CLOSE_BUTTON
page_header: ClassVar[str] = IntakeSelectors.PAGE_HEADER
page_header_card: ClassVar[str] = IntakeSelectors.PAGE_HEADER_CARD
page_header_title: ClassVar[str] = IntakeSelectors.PAGE_HEADER_TITLE
# Labels - Legacy
next_button_label: ClassVar[str] = IntakeLabels.NEXT_BUTTON
legacy_description_placeholder: ClassVar[str] = (
IntakeLabels.LEGACY_DESCRIPTION_PLACEHOLDER
)
# Labels - Sourcing Request Form
requester_label: ClassVar[str] = IntakeLabels.REQUESTER
assigned_owner_label: ClassVar[str] = IntakeLabels.ASSIGNED_OWNER
commodity_label: ClassVar[str] = IntakeLabels.COMMODITY
planned_label: ClassVar[str] = IntakeLabels.PLANNED
legal_contact_label: ClassVar[str] = IntakeLabels.LEGAL_CONTACT
regions_label: ClassVar[str] = IntakeLabels.REGIONS
opex_capex_label: ClassVar[str] = IntakeLabels.OPEX_CAPEX
description_label: ClassVar[str] = IntakeLabels.DESCRIPTION
description_placeholder: ClassVar[str] = IntakeLabels.DESCRIPTION_PLACEHOLDER
target_date_label: ClassVar[str] = IntakeLabels.TARGET_DATE
reseller_label: ClassVar[str] = IntakeLabels.RESELLER
entity_label: ClassVar[str] = IntakeLabels.ENTITY
desired_supplier_name_label: ClassVar[str] = IntakeLabels.DESIRED_SUPPLIER_NAME
desired_supplier_contact_label: ClassVar[str] = (
IntakeLabels.DESIRED_SUPPLIER_CONTACT
)
select_document_label: ClassVar[str] = IntakeLabels.SELECT_DOCUMENT
back_button_label: ClassVar[str] = IntakeLabels.BACK_BUTTON
submit_button_label: ClassVar[str] = IntakeLabels.SUBMIT_BUTTON
close_button_label: ClassVar[str] = IntakeLabels.CLOSE_BUTTON
form_title: ClassVar[str] = IntakeLabels.FORM_TITLE
board_title: ClassVar[str] = IntakeLabels.BOARD_TITLE
# Demo text
conveyor_belt_request: ClassVar[str] = IntakeTexts.CONVEYOR_BELT_REQUEST
alt_request: ClassVar[str] = IntakeTexts.ALT_REQUEST
commodity_request: ClassVar[tuple[str, ...]] = IntakeTexts.COMMODITY_REQUEST
planned_request: ClassVar[str] = IntakeTexts.PLANNED_REQUEST
regions_request: ClassVar[tuple[str, ...]] = IntakeTexts.REGIONS_REQUEST
opex_capex_request: ClassVar[str] = IntakeTexts.OPEX_CAPEX_REQUEST
description_request: ClassVar[str] = IntakeTexts.DESCRIPTION_REQUEST
target_date_request: ClassVar[str] = IntakeTexts.TARGET_DATE_REQUEST
desired_supplier_name_request: ClassVar[str] = (
IntakeTexts.DESIRED_SUPPLIER_NAME_REQUEST
)
desired_supplier_contact_request: ClassVar[str] = (
IntakeTexts.DESIRED_SUPPLIER_CONTACT_REQUEST
)
reseller_request: ClassVar[str] = IntakeTexts.RESELLER_REQUEST
entity_request: ClassVar[str] = IntakeTexts.ENTITY_REQUEST
class ContractStrings:
"""Contract flow strings: selectors, labels, and demo text."""
# Form selectors
save_button: ClassVar[str] = ContractFormSelectors.SAVE_BUTTON
menu_button: ClassVar[str] = ContractFormSelectors.MENU_BUTTON
error_info_button: ClassVar[str] = ContractFormSelectors.ERROR_INFO_BUTTON
close_button: ClassVar[str] = ContractFormSelectors.CLOSE_BUTTON
drop_zone_input: ClassVar[str] = ContractFormSelectors.DROP_ZONE_INPUT
contract_type_field: ClassVar[str] = ContractFormSelectors.CONTRACT_TYPE_FIELD
contract_commodities_field: ClassVar[str] = (
ContractFormSelectors.CONTRACT_COMMODITIES_FIELD
)
supplier_contact_field: ClassVar[str] = ContractFormSelectors.SUPPLIER_CONTACT_FIELD
classification_field: ClassVar[str] = ContractFormSelectors.CLASSIFICATION_FIELD
entity_and_regions_field: ClassVar[str] = (
ContractFormSelectors.ENTITY_AND_REGIONS_FIELD
)
renewal_type_field: ClassVar[str] = ContractFormSelectors.RENEWAL_TYPE_FIELD
renewal_increase_field: ClassVar[str] = ContractFormSelectors.RENEWAL_INCREASE_FIELD
renewal_alert_days_field: ClassVar[str] = (
ContractFormSelectors.RENEWAL_ALERT_DAYS_FIELD
)
effective_date_field: ClassVar[str] = ContractFormSelectors.EFFECTIVE_DATE_FIELD
end_date_field: ClassVar[str] = ContractFormSelectors.END_DATE_FIELD
required_notice_for_nonrenewal_field: ClassVar[str] = (
ContractFormSelectors.REQUIRED_NOTICE_FOR_NONRENEWAL_FIELD
)
terminate_for_convenience_toggle: ClassVar[str] = (
ContractFormSelectors.TERMINATE_FOR_CONVENIENCE_TOGGLE
)
required_notice_for_termination_field: ClassVar[str] = (
ContractFormSelectors.REQUIRED_NOTICE_FOR_TERMINATION_FIELD
)
total_value_field: ClassVar[str] = ContractFormSelectors.TOTAL_VALUE
annualized_value_field: ClassVar[str] = ContractFormSelectors.ANNUALIZED_VALUE
budget_field: ClassVar[str] = ContractFormSelectors.BUDGET_FIELD
cc1_allocation_field: ClassVar[str] = ContractFormSelectors.CC1_ALLOCATION_FIELD
rebate_field: ClassVar[str] = ContractFormSelectors.REBATE_FIELD
saving_field: ClassVar[str] = ContractFormSelectors.SAVING_FIELD
currency_field: ClassVar[str] = ContractFormSelectors.CURRENCY_FIELD
payment_terms_field: ClassVar[str] = ContractFormSelectors.PAYMENT_TERMS_FIELD
payment_schedule_field: ClassVar[str] = ContractFormSelectors.PAYMENT_SCHEDULE_FIELD
business_contact_field: ClassVar[str] = ContractFormSelectors.BUSINESS_CONTACT_FIELD
managing_department_field: ClassVar[str] = (
ContractFormSelectors.MANAGING_DEPARTMENT_FIELD
)
funding_department_field: ClassVar[str] = (
ContractFormSelectors.FUNDING_DEPARTMENT_FIELD
)
reseller_field: ClassVar[str] = ContractFormSelectors.RESELLER_FIELD
project_name_field: ClassVar[str] = ContractFormSelectors.PROJECT_NAME_FIELD
master_project_name_field: ClassVar[str] = (
ContractFormSelectors.MASTER_PROJECT_NAME_FIELD
)
business_continuity_field: ClassVar[str] = (
ContractFormSelectors.BUSINESS_CONTINUITY_FIELD
)
customer_data_field: ClassVar[str] = ContractFormSelectors.CUSTOMER_DATA_FIELD
breach_notification_field: ClassVar[str] = (
ContractFormSelectors.BREACH_NOTIFICATION_FIELD
)
# Board/grid selectors
grid_view_button: ClassVar[str] = ContractBoardSelectors.GRID_VIEW_BUTTON
chart_view_button: ClassVar[str] = ContractBoardSelectors.CHART_VIEW_BUTTON
active_filter_button: ClassVar[str] = ContractBoardSelectors.ACTIVE_FILTER_BUTTON
archived_filter_button: ClassVar[str] = (
ContractBoardSelectors.ARCHIVED_FILTER_BUTTON
)
quick_filter: ClassVar[str] = ContractBoardSelectors.QUICK_FILTER
add_row_button: ClassVar[str] = ContractBoardSelectors.ADD_ROW_BUTTON
# Dashboard filters
filter_30_days: ClassVar[str] = ContractDashboardFilters.FILTER_30_DAYS
filter_60_days: ClassVar[str] = ContractDashboardFilters.FILTER_60_DAYS
filter_90_days: ClassVar[str] = ContractDashboardFilters.FILTER_90_DAYS
filter_sep_2025: ClassVar[str] = ContractDashboardFilters.FILTER_SEP_2025
filter_oct_2025: ClassVar[str] = ContractDashboardFilters.FILTER_OCT_2025
filter_nov_2025: ClassVar[str] = ContractDashboardFilters.FILTER_NOV_2025
filter_nda: ClassVar[str] = ContractDashboardFilters.FILTER_NDA
filter_msa: ClassVar[str] = ContractDashboardFilters.FILTER_MSA
filter_order_form: ClassVar[str] = ContractDashboardFilters.FILTER_ORDER_FORM
filter_sow: ClassVar[str] = ContractDashboardFilters.FILTER_SOW
filter_draft: ClassVar[str] = ContractDashboardFilters.FILTER_DRAFT
filter_pending_approval: ClassVar[str] = (
ContractDashboardFilters.FILTER_PENDING_APPROVAL
)
filter_ready_for_signature: ClassVar[str] = (
ContractDashboardFilters.FILTER_READY_FOR_SIGNATURE
)
filter_pending_signature: ClassVar[str] = (
ContractDashboardFilters.FILTER_PENDING_SIGNATURE
)
filter_direct: ClassVar[str] = ContractDashboardFilters.FILTER_DIRECT
filter_minority_supplier: ClassVar[str] = (
ContractDashboardFilters.FILTER_MINORITY_SUPPLIER
)
filter_preferred: ClassVar[str] = ContractDashboardFilters.FILTER_PREFERRED
filter_sell_side: ClassVar[str] = ContractDashboardFilters.FILTER_SELL_SIDE
# Labels
save_label: ClassVar[str] = ContractFormLabels.SAVE
menu_label: ClassVar[str] = ContractFormLabels.MENU
error_info_label: ClassVar[str] = ContractFormLabels.ERROR_INFO
close_label: ClassVar[str] = ContractFormLabels.CLOSE
drop_zone_label: ClassVar[str] = ContractFormLabels.DROP_ZONE
increase_percent_label: ClassVar[str] = ContractFormLabels.INCREASE_PERCENT
renewal_notification_label: ClassVar[str] = ContractFormLabels.RENEWAL_NOTIFICATION
total_value_label: ClassVar[str] = ContractFormLabels.TOTAL_VALUE
annualized_value_label: ClassVar[str] = ContractFormLabels.ANNUALIZED_VALUE
budget_label: ClassVar[str] = ContractFormLabels.BUDGET
cc1_allocation_label: ClassVar[str] = ContractFormLabels.CC1_ALLOCATION
rebate_label: ClassVar[str] = ContractFormLabels.REBATE
saving_label: ClassVar[str] = ContractFormLabels.SAVING
contract_type_label: ClassVar[str] = ContractFormLabels.CONTRACT_TYPE
contract_commodities_label: ClassVar[str] = ContractFormLabels.CONTRACT_COMMODITIES
supplier_contact_label: ClassVar[str] = ContractFormLabels.SUPPLIER_CONTACT
classification_label: ClassVar[str] = ContractFormLabels.CLASSIFICATION
entity_and_regions_label: ClassVar[str] = ContractFormLabels.ENTITY_AND_REGIONS
renewal_type_label: ClassVar[str] = ContractFormLabels.RENEWAL_TYPE
renewal_increase_label: ClassVar[str] = ContractFormLabels.RENEWAL_INCREASE
renewal_alert_days_label: ClassVar[str] = ContractFormLabels.RENEWAL_ALERT_DAYS
effective_date_label: ClassVar[str] = ContractFormLabels.EFFECTIVE_DATE
end_date_label: ClassVar[str] = ContractFormLabels.END_DATE
required_notice_for_nonrenewal_label: ClassVar[str] = (
ContractFormLabels.REQUIRED_NOTICE_FOR_NONRENEWAL
)
terminate_for_convenience_label: ClassVar[str] = (
ContractFormLabels.TERMINATE_FOR_CONVENIENCE
)
required_notice_for_termination_label: ClassVar[str] = (
ContractFormLabels.REQUIRED_NOTICE_FOR_TERMINATION
)
currency_label: ClassVar[str] = ContractFormLabels.CURRENCY
payment_terms_label: ClassVar[str] = ContractFormLabels.PAYMENT_TERMS
payment_schedule_label: ClassVar[str] = ContractFormLabels.PAYMENT_SCHEDULE
business_contact_label: ClassVar[str] = ContractFormLabels.BUSINESS_CONTACT
managing_department_label: ClassVar[str] = ContractFormLabels.MANAGING_DEPARTMENT
funding_department_label: ClassVar[str] = ContractFormLabels.FUNDING_DEPARTMENT
project_name_label: ClassVar[str] = ContractFormLabels.PROJECT_NAME
master_project_name_label: ClassVar[str] = ContractFormLabels.MASTER_PROJECT_NAME
business_continuity_label: ClassVar[str] = ContractFormLabels.BUSINESS_CONTINUITY
customer_data_label: ClassVar[str] = ContractFormLabels.CUSTOMER_DATA
breach_notification_label: ClassVar[str] = ContractFormLabels.BREACH_NOTIFICATION
reseller_label: ClassVar[str] = ContractFormLabels.RESELLER
# Board labels
grid_view_label: ClassVar[str] = ContractBoardLabels.GRID_VIEW
chart_view_label: ClassVar[str] = ContractBoardLabels.CHART_VIEW
active_contracts_label: ClassVar[str] = ContractBoardLabels.ACTIVE_CONTRACTS
archived_contracts_label: ClassVar[str] = ContractBoardLabels.ARCHIVED_CONTRACTS
quick_filter_label: ClassVar[str] = ContractBoardLabels.QUICK_FILTER
add_row_label: ClassVar[str] = ContractBoardLabels.ADD_ROW
# Filter labels
expiring_30_days_label: ClassVar[str] = ContractFilterLabels.EXPIRING_30_DAYS
expiring_60_days_label: ClassVar[str] = ContractFilterLabels.EXPIRING_60_DAYS
expiring_90_days_label: ClassVar[str] = ContractFilterLabels.EXPIRING_90_DAYS
sep_2025_label: ClassVar[str] = ContractFilterLabels.SEP_2025
oct_2025_label: ClassVar[str] = ContractFilterLabels.OCT_2025
nov_2025_label: ClassVar[str] = ContractFilterLabels.NOV_2025
nda_label: ClassVar[str] = ContractFilterLabels.NDA
msa_label: ClassVar[str] = ContractFilterLabels.MSA
order_form_label: ClassVar[str] = ContractFilterLabels.ORDER_FORM
sow_label: ClassVar[str] = ContractFilterLabels.SOW
draft_label: ClassVar[str] = ContractFilterLabels.DRAFT
pending_approval_label: ClassVar[str] = ContractFilterLabels.PENDING_APPROVAL
ready_for_signature_label: ClassVar[str] = ContractFilterLabels.READY_FOR_SIGNATURE
pending_signature_label: ClassVar[str] = ContractFilterLabels.PENDING_SIGNATURE
direct_label: ClassVar[str] = ContractFilterLabels.DIRECT
minority_supplier_label: ClassVar[str] = ContractFilterLabels.MINORITY_SUPPLIER
preferred_label: ClassVar[str] = ContractFilterLabels.PREFERRED
sell_side_label: ClassVar[str] = ContractFilterLabels.SELL_SIDE
# Demo text
contract_type_request: ClassVar[str] = ContractTexts.CONTRACT_TYPE
contract_commodities_request: ClassVar[tuple[str, ...]] = (
ContractTexts.CONTRACT_COMMODITIES
)
supplier_contact_request: ClassVar[str] = ContractTexts.SUPPLIER_CONTACT
classification_request: ClassVar[str] = ContractTexts.CLASSIFICATION
entity_and_regions_request: ClassVar[str] = ContractTexts.ENTITY_AND_REGIONS
renewal_type_request: ClassVar[str] = ContractTexts.RENEWAL_TYPE
renewal_increase_request: ClassVar[str] = ContractTexts.RENEWAL_INCREASE
renewal_alert_days_request: ClassVar[str] = ContractTexts.RENEWAL_ALERT_DAYS
effective_date_request: ClassVar[str] = ContractTexts.EFFECTIVE_DATE
end_date_request: ClassVar[str] = ContractTexts.END_DATE
required_notice_for_nonrenewal_request: ClassVar[str] = (
ContractTexts.REQUIRED_NOTICE_FOR_NONRENEWAL
)
terminate_for_convenience_request: ClassVar[bool] = (
ContractTexts.TERMINATE_FOR_CONVENIENCE
)
required_notice_for_termination_request: ClassVar[str] = (
ContractTexts.REQUIRED_NOTICE_FOR_TERMINATION
)
currency_request: ClassVar[str] = ContractTexts.CURRENCY
payment_terms_request: ClassVar[str] = ContractTexts.PAYMENT_TERMS
total_value_request: ClassVar[str] = ContractTexts.TOTAL_VALUE
payment_schedule_request: ClassVar[str] = ContractTexts.PAYMENT_SCHEDULE
business_contact_request: ClassVar[str] = ContractTexts.BUSINESS_CONTACT
managing_department_request: ClassVar[str] = ContractTexts.MANAGING_DEPARTMENT
funding_department_request: ClassVar[str] = ContractTexts.FUNDING_DEPARTMENT
budget_request: ClassVar[str] = ContractTexts.BUDGET
project_name_request: ClassVar[str] = ContractTexts.PROJECT_NAME
master_project_name_request: ClassVar[str] = ContractTexts.MASTER_PROJECT_NAME
business_continuity_request: ClassVar[str] = ContractTexts.BUSINESS_CONTINUITY
customer_data_request: ClassVar[str] = ContractTexts.CUSTOMER_DATA
rebate_request: ClassVar[str] = ContractTexts.REBATE
saving_request: ClassVar[str] = ContractTexts.SAVING
breach_notification_request: ClassVar[str] = ContractTexts.BREACH_NOTIFICATION
reseller_request: ClassVar[str] = ContractTexts.RESELLER
class SourcingStrings:
"""Sourcing flow strings: selectors, labels, and demo text.
Provides direct access to all sourcing-related UI constants.
"""
# Selectors
supplier_search_input: ClassVar[str] = SourcingSelectors.SUPPLIER_SEARCH_INPUT
add_supplier_button: ClassVar[str] = SourcingSelectors.ADD_SUPPLIER_BUTTON
supplier_row: ClassVar[str] = SourcingSelectors.SUPPLIER_ROW
# Labels
suppliers_tab: ClassVar[str] = SourcingLabels.SUPPLIERS_TAB
add_button: ClassVar[str] = SourcingLabels.ADD_BUTTON
# Demo text
default_trio: ClassVar[list[str]] = SupplierTexts.DEFAULT_TRIO
notes: ClassVar[str] = SupplierTexts.NOTES
class NavigationStrings:
"""Navigation flow strings: selectors.
Provides direct access to navigation-related UI constants.
"""
# Selectors
global_search: ClassVar[str] = NavigationSelectors.GLOBAL_SEARCH
first_result: ClassVar[str] = NavigationSelectors.FIRST_RESULT
class AuthStrings:
"""Authentication flow strings: selectors and labels.
Provides direct access to all auth-related UI constants.
"""
# Selectors
email_input: ClassVar[str] = AuthSelectors.EMAIL_INPUT
send_code_button: ClassVar[str] = AuthSelectors.SEND_CODE_BUTTON
code_input: ClassVar[str] = AuthSelectors.CODE_INPUT
submit_button: ClassVar[str] = AuthSelectors.SUBMIT_BUTTON
logout_button: ClassVar[str] = AuthSelectors.LOGOUT_BUTTON
current_user_display: ClassVar[str] = AuthSelectors.CURRENT_USER_DISPLAY
# Labels
login_email_label: ClassVar[str] = AuthLabels.LOGIN_EMAIL_LABEL
login_send_code_button: ClassVar[str] = AuthLabels.LOGIN_SEND_CODE_BUTTON
login_verify_code_label: ClassVar[str] = AuthLabels.LOGIN_VERIFY_CODE_LABEL
logout_label: ClassVar[str] = AuthLabels.LOGOUT_LABEL
current_user_display_prefix: ClassVar[str] = AuthLabels.CURRENT_USER_DISPLAY_PREFIX
class CommonStrings:
"""Common UI strings: selectors for cross-domain elements.
Provides direct access to selectors for generic UI patterns used
across multiple domains (accordions, modals, etc.).
"""
# Selectors
page_header_accordion: ClassVar[str] = CommonSelectors.PAGE_HEADER_ACCORDION
class MessagingStrings:
"""Messaging flow strings: selectors for chat panel interactions."""
# New XPath-based selectors (primary)
notification_indicator: ClassVar[str] = MessagingSelectors.NOTIFICATION_INDICATOR
modal_wrapper: ClassVar[str] = MessagingSelectors.MODAL_WRAPPER
modal_close_button: ClassVar[str] = MessagingSelectors.MODAL_CLOSE_BUTTON
chat_messages_container: ClassVar[str] = MessagingSelectors.CHAT_MESSAGES_CONTAINER
chat_flyout_button: ClassVar[str] = MessagingSelectors.CHAT_FLYOUT_BUTTON
chat_conversations_tab: ClassVar[str] = MessagingSelectors.CHAT_CONVERSATIONS_TAB
# Legacy selectors (fallback)
chat_button: ClassVar[str] = MessagingSelectors.CHAT_BUTTON
chat_panel: ClassVar[str] = MessagingSelectors.CHAT_PANEL
chat_input: ClassVar[str] = MessagingSelectors.CHAT_INPUT
send_button: ClassVar[str] = MessagingSelectors.SEND_BUTTON
class AppStrings:
"""Root registry for all application strings.
Provides hierarchical, type-safe access to selectors, labels, and demo texts.
Each namespace (intake, contract, sourcing, navigation, auth, common) exposes nested classes.
GraphQL queries are maintained separately in raindrop/generated/queries.py
and loaded from .graphql files for better maintainability.
"""
intake: ClassVar[type[IntakeStrings]] = IntakeStrings
contract: ClassVar[type[ContractStrings]] = ContractStrings
sourcing: ClassVar[type[SourcingStrings]] = SourcingStrings
navigation: ClassVar[type[NavigationStrings]] = NavigationStrings
auth: ClassVar[type[AuthStrings]] = AuthStrings
common: ClassVar[type[CommonStrings]] = CommonStrings
messaging: ClassVar[type[MessagingStrings]] = MessagingStrings
# Module-level instance for convenience
app_strings = AppStrings()
__all__ = [
"AppStrings",
"app_strings",
"IntakeStrings",
"ContractStrings",
"SourcingStrings",
"NavigationStrings",
"AuthStrings",
"CommonStrings",
"MessagingStrings",
]

View File

@@ -146,32 +146,32 @@ async def test_sourcing_request_field_selectors_match_html(
action_context: ActionContext,
) -> None:
"""Verify selectors match the HTML structure in docs/sourcing_intake.md."""
from guide.app.strings.registry import app_strings
from guide.app.strings.selectors.intake import IntakeSelectors
action = FillSourcingRequestAction()
await action.run(mock_page_with_helpers, action_context)
# Validate critical selectors used (from app_strings.intake)
# Validate critical selectors used (from IntakeSelectors)
_ = action_context.params # Unused but validates structure
# These selectors must match docs/sourcing_intake.md HTML structure
expected_selectors = {
"commodity_field": '[data-cy="board-item-field-commodities-commodity"]',
"planned_field": '[data-cy="board-item-field-menu-planned"]',
"regions_field": '[data-cy="board-item-field-regions-f32"]',
"opex_capex_field": '[data-cy="board-item-field-menu-opex_capex"]',
"target_date_field": 'input[name="target_date"]',
"description_textarea": 'textarea[name="description"]',
"reseller_textarea": 'textarea[name="reseller"]',
"desired_supplier_name_textarea": 'textarea[name="f45"]',
"desired_supplier_contact_textarea": 'textarea[name="f46"]',
"entity_field": '[data-cy="board-item-field-entity-f31"]',
"COMMODITY_FIELD": '[data-cy="board-item-field-commodities-commodity"]',
"PLANNED_FIELD": '[data-cy="board-item-field-menu-planned"]',
"REGIONS_FIELD": '[data-cy="board-item-field-regions-f32"]',
"OPEX_CAPEX_FIELD": '[data-cy="board-item-field-menu-opex_capex"]',
"TARGET_DATE_FIELD": 'input[name="target_date"]',
"DESCRIPTION_TEXTAREA": 'textarea[name="description"]',
"RESELLER_TEXTAREA": 'textarea[name="reseller"]',
"DESIRED_SUPPLIER_NAME_TEXTAREA": 'textarea[name="f45"]',
"DESIRED_SUPPLIER_CONTACT_TEXTAREA": 'textarea[name="f46"]',
"ENTITY_FIELD": '[data-cy="board-item-field-entity-f31"]',
}
# Verify selectors from strings registry match expected values
# Verify selectors from IntakeSelectors match expected values
for field, expected_selector in expected_selectors.items():
actual_selector = getattr(app_strings.intake, field)
actual_selector = getattr(IntakeSelectors, field)
assert actual_selector == expected_selector, (
f"Selector mismatch for {field}: "
f"expected '{expected_selector}', got '{actual_selector}'"

View File

@@ -1,74 +1,72 @@
"""Unit tests for strings registry access patterns."""
"""Unit tests for strings selector access patterns."""
import pytest
class TestStringsRegistryAccess:
"""Test string registry access patterns."""
class TestIntakeSelectorsAccess:
"""Test intake selector access patterns."""
def test_registry_initializes(self) -> None:
"""Test that AppStrings registry initializes without errors."""
from guide.app.strings.registry import AppStrings
def test_intake_selectors_initializes(self) -> None:
"""Test that IntakeSelectors class initializes without errors."""
from guide.app.strings.selectors.intake import IntakeSelectors
app_strings = AppStrings()
assert app_strings is not None
def test_intake_module_exists(self) -> None:
"""Test that intake module is accessible in registry."""
from guide.app.strings.registry import AppStrings
app_strings = AppStrings()
assert hasattr(app_strings, "intake")
assert IntakeSelectors is not None
def test_intake_selectors_accessible(self) -> None:
"""Test that intake selectors are directly accessible."""
from guide.app.strings.registry import AppStrings
from guide.app.strings.selectors.intake import IntakeSelectors
app_strings = AppStrings()
intake = app_strings.intake
assert intake is not None
# Verify flattened access pattern (no nested .selectors wrapper)
assert hasattr(intake, "description_field")
assert hasattr(intake, "next_button")
def test_intake_labels_accessible(self) -> None:
"""Test that intake labels are directly accessible."""
from guide.app.strings.registry import AppStrings
app_strings = AppStrings()
intake = app_strings.intake
assert intake is not None
# Verify flattened access pattern (no nested .labels wrapper)
assert hasattr(intake, "description_placeholder")
# Verify direct UPPERCASE access pattern
assert hasattr(IntakeSelectors, "DESCRIPTION_FIELD")
assert hasattr(IntakeSelectors, "NEXT_BUTTON")
def test_string_values_are_non_empty(self) -> None:
"""Test that string values are non-empty and accessible."""
from guide.app.strings.registry import AppStrings
from guide.app.strings.selectors.intake import IntakeSelectors
app_strings = AppStrings()
intake = app_strings.intake
# Access selectors
description_field = intake.description_field
description_field = IntakeSelectors.DESCRIPTION_FIELD
assert isinstance(description_field, str)
assert len(description_field) > 0
def test_registry_has_multiple_modules(self) -> None:
"""Test that registry contains multiple string modules."""
from guide.app.strings.registry import AppStrings
app_strings = AppStrings()
# Check for main module sections
assert hasattr(app_strings, "intake")
assert hasattr(app_strings, "sourcing")
@pytest.mark.parametrize(
"field_name",
["description_field", "description_placeholder", "next_button"],
["DESCRIPTION_FIELD", "NEXT_BUTTON", "COMMODITY_FIELD"],
)
def test_intake_fields_exist(self, field_name: str) -> None:
"""Test that expected intake fields exist in registry."""
from guide.app.strings.registry import AppStrings
"""Test that expected intake fields exist in IntakeSelectors."""
from guide.app.strings.selectors.intake import IntakeSelectors
app_strings = AppStrings()
intake = app_strings.intake
assert hasattr(intake, field_name), f"Field {field_name} not found in intake"
assert hasattr(IntakeSelectors, field_name), f"Field {field_name} not found"
class TestContractSelectorsAccess:
"""Test contract selector access patterns."""
def test_contract_selectors_accessible(self) -> None:
"""Test that contract selectors are directly accessible."""
from guide.app.strings.selectors.contract import ContractFormSelectors
assert hasattr(ContractFormSelectors, "CONTRACT_TYPE_FIELD")
assert hasattr(ContractFormSelectors, "SAVE_BUTTON")
class TestAuthSelectorsAccess:
"""Test auth selector access patterns."""
def test_auth_selectors_accessible(self) -> None:
"""Test that auth selectors are directly accessible."""
from guide.app.strings.selectors.auth import AuthSelectors
assert hasattr(AuthSelectors, "EMAIL_INPUT")
assert hasattr(AuthSelectors, "LOGOUT_BUTTON")
class TestMessagingSelectorsAccess:
"""Test messaging selector access patterns."""
def test_messaging_selectors_accessible(self) -> None:
"""Test that messaging selectors are directly accessible."""
from guide.app.strings.selectors.messaging import MessagingSelectors
assert hasattr(MessagingSelectors, "CHAT_INPUT")
assert hasattr(MessagingSelectors, "SEND_BUTTON")