Implement Semantic Form Discovery Plan and Refactor Browser Elements

- Introduced a comprehensive implementation plan for semantic form discovery, addressing architectural gaps for LLM-driven contextual form filling.
- Created new modules for type guards and context building, enhancing schema-DOM reconciliation.
- Consolidated existing utilities to reduce code duplication and improve maintainability.
- Updated dropdown handling to support schema-aware selections and improved field inference logic.
- Enhanced diagnostics and browser element interactions, including ARIA label resolution and role-based detection.
- Removed outdated regex functions and tests, streamlining the codebase.
- Added new integration tests for the updated form extraction and interaction capabilities.
2025-12-06 02:44:29 +00:00
parent ad14cfa366
commit 9e2a9bf499
21 changed files with 2385 additions and 1540 deletions


@@ -13,6 +13,9 @@ hosts:
support-extension:
kind: extension
port: 17374
headless-local:
kind: headless
browserless-cdp:
kind: cdp
host: browserless.lab # goes through Traefik
port: 80 # Traefik web entrypoint
browser: chromium

985
plan.md Normal file

@@ -0,0 +1,985 @@
# Semantic Form Discovery Implementation Plan
## Overview
This plan addresses the architectural gaps identified in `docs/spec.md` that prevent reliable LLM-driven contextual form filling. The core problem is the lack of a semantic bridge between the GraphQL schema (which knows field definitions) and the live DOM (which requires interaction).
## Problem Summary
| Bug | Location | Root Cause | Impact |
|-----|----------|------------|--------|
| **A** | `form_discovery.py` | Regex HTML parsing fails on React/MUI | Fields return "unknown" type |
| **B** | `diagnostics.py` | Race conditions on dropdown inspection | Zero options for typeahead fields |
| **C** | `field_inference.py` | CSS class-based detection breaks in prod builds | Incorrect helper selection |
## Architecture Goals
1. **Replace Regex with DOM APIs** - Use browser's accessibility tree via `page.evaluate()`
2. **Bridge Schema to DOM** - Map GraphQL `FieldDef` to live DOM elements by `data-cy` + label matching
3. **Use Schema for Choices** - Don't open dropdowns to discover options; use GraphQL schema
4. **Harden Field Inference** - Use W3C roles (`role="combobox"`) instead of MUI class names
---
## Pre-Implementation: Code Reuse Audit Results
Before implementing, we identified significant code reuse opportunities:
### Already Exists (Import, Don't Recreate)
| Component | Location | Reuse Strategy |
|-----------|----------|----------------|
| `FieldDef`, `FormSchema`, `FieldType`, `FieldChoice` | `form_schema.py` | Import into `context_builder.py` |
| `FieldInfo`, `InputInfo` dataclasses | `diagnostics.py:62-89` | Use as base for `FieldContext` |
| `_JS_FIELD_CHECK`, `_JS_INPUT_CHECK` JS templates | `diagnostics.py:415-465` | Extend for semantic extraction |
| `escape_selector()` | `mui.py:41-52` | Canonical location (2 files duplicate it) |
| `extract_form_field_metadata()` | `form_discovery.py:82-140` | Enhance with ARIA fallbacks |
| Role-based detection (`role="combobox"`) | `form_discovery.py:100,164` | Already using W3C roles! |
| Listbox resolution via `aria-controls`/`aria-owns` | `dropdown/_helpers.py:43,171` | Extract to shared helper |
### Duplication to Consolidate
| Duplicated Code | Files | Action |
|-----------------|-------|--------|
| Type guards (`_is_dict_str_object`, `_get_str_from_dict`, etc.) | `form_discovery.py:17-42`, `field_inference.py:16-35` | Create `_type_guards.py` |
| `_escape_selector()` | `diagnostics.py:410`, `extension_client.py:85` | Import from `mui.py` |
| `aria-controls` or `aria-owns` pattern | 4+ locations in dropdown modules | Extract to `_helpers.py` |
| `_infer_field_type_from_*` logic | `form_discovery.py:394-427`, `field_inference.py:46-77` | Consolidate in `field_inference.py` |
### Key Insight: Role-Based Detection Already Partially Exists
The codebase already uses W3C roles correctly in several places:
- `form_discovery.py:100` uses `[role="combobox"]`
- `dropdown/_helpers.py` uses `[role="listbox"]`, `[role="option"]`
- `diagnostics.py` has extensive role-based inspection
**The plan should enhance existing role-based patterns, not replace them.**
---
## Phase 0: Consolidate Shared Utilities (Prerequisites)
**Goal:** Eliminate duplication before adding new code.
### 0.1 Create Type Guards Module
**File:** `src/guide/app/browser/elements/_type_guards.py`
```python
"""Shared type guards and dict extraction utilities."""

from typing import TypeGuard


def is_dict_str_object(obj: object) -> TypeGuard[dict[str, object]]:
    """Type guard to check if object is dict[str, object]."""
    return isinstance(obj, dict)


def is_list_of_objects(obj: object) -> TypeGuard[list[object]]:
    """Type guard to check if object is a list."""
    return isinstance(obj, list)


def get_str_from_dict(d: dict[str, object], key: str, default: str = "") -> str:
    """Safely extract string value from dict."""
    val = d.get(key)
    return str(val) if isinstance(val, str) else default


def get_str_or_none_from_dict(d: dict[str, object], key: str) -> str | None:
    """Safely extract string or None value from dict."""
    val = d.get(key)
    return str(val) if isinstance(val, str) else None


def get_bool_from_dict(d: dict[str, object], key: str, default: bool = False) -> bool:
    """Safely extract bool value from dict."""
    val = d.get(key)
    return bool(val) if isinstance(val, bool) else default
```
### 0.2 Update Files to Import from Central Location
**Files to update:**
- `form_discovery.py` - Delete lines 17-42, import from `_type_guards`
- `field_inference.py` - Delete lines 16-35, import from `_type_guards`
### 0.3 Unify Selector Escaping
**Files to update:**
- `diagnostics.py:410-412` - Replace `_escape_selector()` with import from `mui.py`
- `extension_client.py:85+` - Replace method with import from `mui.py`
### 0.4 Extract Listbox ID Helper
**File:** `src/guide/app/browser/elements/dropdown/_helpers.py`
**Add function:**
```python
async def get_listbox_id(page: PageLike, selector: str) -> str | None:
    """Get listbox ID from aria-controls or aria-owns attributes."""
    result = await page.evaluate(f"""
        (() => {{
            const el = document.querySelector('{escape_selector(selector)}');
            const input = el?.querySelector('input') || el;
            return input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns') || null;
        }})()
    """)
    # page.evaluate() returns an untyped value; narrow it to str before returning.
    return result if isinstance(result, str) else None
```
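
The f-string in `get_listbox_id()` splices the selector into a single-quoted JavaScript literal, so its correctness depends on `escape_selector()` handling every quote and backslash. One alternative, sketched here as an assumption rather than as the project's approach, is to let `json.dumps` produce the JavaScript string literal and pass it to a function-style template. The names `_JS_LISTBOX_ID` and `build_listbox_id_script` are hypothetical.

```python
import json

# Function-style template: the selector arrives as an argument instead of
# being interpolated into the script body.
_JS_LISTBOX_ID = """
((selector) => {
    const el = document.querySelector(selector);
    const input = el?.querySelector('input') || el;
    return input?.getAttribute('aria-controls')
        || input?.getAttribute('aria-owns')
        || null;
})
"""


def build_listbox_id_script(selector: str) -> str:
    """Return a JS expression that calls the template with a quoted selector."""
    # json.dumps emits a double-quoted literal with quotes and backslashes
    # escaped, which is also a valid JavaScript string literal.
    return f"{_JS_LISTBOX_ID.strip()}({json.dumps(selector)})"
```

The resulting string could be passed to `page.evaluate()` in place of the f-string, assuming the page object accepts an expression string as it does elsewhere in this plan.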
### 0.5 Deliverables
- [ ] `_type_guards.py` with consolidated utilities
- [ ] Updated imports in `form_discovery.py` and `field_inference.py`
- [ ] Unified `escape_selector()` usage across codebase
- [ ] `get_listbox_id()` helper in `_helpers.py`
- [ ] All existing tests still pass
---
## Phase 1: Semantic Field Extraction (Enhance Existing)
**Goal:** Enhance `extract_form_field_metadata()` with ARIA-based label resolution.
### 1.1 Enhance Existing Form Discovery Module
**File:** `src/guide/app/browser/elements/form_discovery.py`
**Strategy:** Enhance existing `extract_form_field_metadata()` rather than creating new module.
**New JavaScript Evaluation (replace existing JS in lines 92-115):**
```javascript
((selector) => {
    const field = document.querySelector(selector);
    if (!field) return null;
    const input = field.querySelector('input, textarea, [role="combobox"]');
    const autocomplete = field.querySelector('.MuiAutocomplete-root');
    const select = field.querySelector('[role="combobox"]');

    // Priority-based label resolution.
    // Each step runs only if the previous steps produced no label.
    let label = "";
    // 1. Direct label association (HTMLInputElement.labels)
    if (input && input.labels && input.labels.length > 0) {
        label = input.labels[0].innerText;
    }
    // 2. ARIA label
    if (!label && input?.hasAttribute('aria-label')) {
        label = input.getAttribute('aria-label');
    }
    // 3. ARIA labelledby (the attribute is a space-separated list of IDs)
    if (!label && input?.hasAttribute('aria-labelledby')) {
        label = input.getAttribute('aria-labelledby')
            .split(' ')
            .filter(Boolean)
            .map((id) => document.getElementById(id)?.innerText || '')
            .join(' ');
    }
    // 4. MUI FormControl fallback
    if (!label.trim()) {
        const formControl = field.closest('.MuiFormControl-root') || field;
        const labelEl = formControl.querySelector('label');
        if (labelEl) label = labelEl.textContent;
    }

    // Determine type (preserve fields for _infer_field_type_from_metadata)
    let type = 'unknown';
    if (autocomplete) type = 'autocomplete';
    else if (select) type = 'select';
    else if (input) type = input.getAttribute('type') || 'text';

    return {
        data_cy: field.getAttribute('data-cy'),
        label: (label || '').trim(),
        // PRESERVED: Fields required by _infer_field_type_from_metadata
        type: type,
        has_autocomplete: !!autocomplete,
        has_select: !!select,
        input_type: input ? input.getAttribute('type') : null,
        // NEW: Role-based fields for enhanced inference
        role: input?.getAttribute('role'),
        aria_controls: input?.getAttribute('aria-controls'),
        aria_owns: input?.getAttribute('aria-owns'),
        // Existing fields
        required: input ? input.hasAttribute('required') : false,
        disabled: field.classList.contains('Mui-disabled'),
    };
})(selector)
```
**Key Point:** Preserves all fields used by `_infer_field_type_from_metadata()` while adding accessible name resolution and ARIA attributes.
### 1.2 Add Accessible Name Extraction Helper
**File:** `src/guide/app/browser/elements/form_discovery.py`
**New function with full implementation:**
```python
_JS_ACCESSIBLE_NAME = """
((selector) => {
    const field = document.querySelector(selector);
    if (!field) return null;
    const input = field.querySelector('input, textarea, [role="combobox"]');
    if (!input) return null;

    // Priority 1: Direct label association
    if (input.labels && input.labels.length > 0) {
        return input.labels[0].innerText.trim();
    }
    // Priority 2: aria-label
    const ariaLabel = input.getAttribute('aria-label');
    if (ariaLabel) return ariaLabel.trim();
    // Priority 3: aria-labelledby (a space-separated list of element IDs)
    const labelledBy = input.getAttribute('aria-labelledby');
    if (labelledBy) {
        const text = labelledBy
            .split(' ')
            .filter(Boolean)
            .map((id) => document.getElementById(id)?.innerText || '')
            .join(' ')
            .trim();
        if (text) return text;
    }
    // Priority 4: MUI FormControl fallback
    const formControl = field.closest('.MuiFormControl-root') || field;
    const labelEl = formControl.querySelector('label');
    if (labelEl) return labelEl.textContent.trim();
    return null;
})
"""


async def extract_accessible_name(page: PageLike, selector: str) -> str | None:
    """Extract computed accessible name for a form field.

    Resolution order:
    1. input.labels[0].innerText (native label association)
    2. aria-label attribute
    3. aria-labelledby -> getElementById for each referenced ID
    4. MUI FormControl fallback

    Args:
        page: Browser page instance
        selector: CSS selector for the field container

    Returns:
        Accessible name string or None if not found
    """
    from guide.app.browser.elements.mui import escape_selector

    escaped = escape_selector(selector)
    result = await page.evaluate(f"{_JS_ACCESSIBLE_NAME}('{escaped}')")
    return str(result) if isinstance(result, str) else None
```
**Integration points:**
- Used by `build_form_context()` in Phase 2 for label matching
- Can be called standalone for field diagnostics
### 1.3 Remove Dead Regex Functions
**Analysis:** These functions are not used in production code and tests reimplemented the logic locally.
**Delete from `form_discovery.py`:**
- `extract_field_from_html()` (line 285) - unused
- `extract_all_fields_from_html()` (line 334) - unused
- `_infer_field_type_from_html()` (line 364) - only called by above
**Update `browser/elements/__init__.py`:**
- Remove exports for deleted functions (lines 55-56, 111-112)
**Replace (not delete) test file:**
- `tests/integration/browser/test_form_extraction_from_html.py` -> `tests/unit/test_accessible_name.py`
- New tests exercise DOM-eval accessible name resolution using offline HTML fixtures
- Preserves regression coverage for label extraction edge cases
**Test replacement approach:**
```python
# tests/unit/test_accessible_name.py
from unittest.mock import AsyncMock

import pytest

from guide.app.browser.elements.form_discovery import extract_accessible_name


class TestAccessibleNameExtraction:
    """Test accessible name resolution priority order."""

    @pytest.fixture
    def mock_page(self):
        """Mock PageLike that returns predefined evaluate results."""
        page = AsyncMock()
        return page

    async def test_label_association_priority(self, mock_page):
        """Native label association takes priority over aria-label."""
        mock_page.evaluate.return_value = "Native Label"
        result = await extract_accessible_name(mock_page, "[data-cy='test']")
        assert result == "Native Label"

    async def test_aria_label_fallback(self, mock_page):
        """aria-label used when no native label exists."""
        # ... test with fixture returning aria-label value

    async def test_aria_labelledby_resolution(self, mock_page):
        """aria-labelledby resolves to referenced element."""
        # ... test with fixture

    async def test_mui_formcontrol_fallback(self, mock_page):
        """MUI FormControl label used as last resort."""
        # ... test with fixture
```
**Net code reduction:** ~120 lines removed (regex functions) + ~300 lines test file rewritten to ~80 lines
### 1.4 Deliverables
- [ ] Enhanced JS evaluation in `extract_form_field_metadata()`
- [ ] New `extract_accessible_name()` function with full implementation
- [ ] Delete `extract_field_from_html()`, `extract_all_fields_from_html()`, `_infer_field_type_from_html()`
- [ ] Update `__init__.py` exports
- [ ] Replace `test_form_extraction_from_html.py` with `test_accessible_name.py`
- [ ] Unit tests for ARIA label resolution priority order
- [ ] All remaining tests still pass
---
## Phase 2: Schema-DOM Bridge
**Goal:** Create reconciliation layer that maps GraphQL `FormSchema` to live DOM fields.
### 2.1 Create Context Builder Module
**File:** `src/guide/app/browser/context_builder.py`
**Purpose:** Generate LLM-consumable context combining schema + DOM state.
**Key imports (reuse existing):**
```python
from guide.app.raindrop.operations.form_schema import (
FormSchema, FieldDef, FieldType, FieldChoice
)
from guide.app.browser.diagnostics import FieldInfo, InputInfo
from guide.app.browser.elements.form_discovery import (
extract_all_form_fields, extract_accessible_name, FormField
)
```
**Key Data Structures:**
```python
from dataclasses import dataclass

from guide.app.browser.elements.field_inference import HelperFunction


@dataclass(frozen=True)
class FieldContext:
    """Merged schema + DOM context for a single field."""

    field_key: str  # e.g., "f19" (from schema)
    label: str  # "Estimated Value" (from schema)
    schema_type: FieldType  # Original schema type (menu, user, etc.)
    ui_type: str  # Automation type (select, autocomplete, text)
    helper: HelperFunction  # Exact helper function to use
    dom_selector: str | None  # CSS selector if DOM match found
    dom_label: str | None  # Label from DOM (for verification)
    current_value: str | None  # From DOM
    is_required: bool  # From schema
    is_disabled: bool  # From DOM
    allowed_values: tuple[str, ...] | None  # For menu fields (from schema)


@dataclass(frozen=True)
class FormContext:
    """Complete form context for LLM consumption."""

    entity_type: str
    entity_id: int | str
    entity_name: str
    fields: dict[str, FieldContext]
    unmatched_dom_fields: tuple[str, ...]  # DOM fields without schema match
    unmatched_schema_fields: tuple[str, ...]  # Schema fields without DOM match
```
**Note:** `schema_type`, `ui_type`, and `helper` are populated using the mapping table defined in Phase 5.
### 2.2 Matching Algorithm
```python
async def build_form_context(
    page: PageLike,
    schema: FormSchema,
) -> FormContext:
    """Build merged context from schema and live DOM."""
```
**Matching Strategy (priority order):**
1. **Exact `data-cy` match** - `data-cy="board-item-field-{type}-{field_key}"`
2. **Field key in `data-cy`** - Schema field name appears in `data-cy` attribute
3. **Label text match** - Schema label matches DOM accessible name (case-insensitive)
4. **Fuzzy label match** - Normalized string comparison for typo tolerance
### 2.3 Integration (Layering-Safe)
**IMPORTANT:** Do NOT add browser dependencies to `raindrop/operations/form_schema.py`. That would violate the project's separation between data/GraphQL and Playwright code.
**Correct approach:** Keep `get_form_context_from_schema()` in `browser/context_builder.py` and have callers pass in a `FormSchema`:
**File:** `src/guide/app/browser/context_builder.py`
```python
async def get_form_context_from_schema(
    page: PageLike,
    schema: FormSchema,
) -> FormContext:
    """Build form context from pre-fetched schema.

    This is the primary API. Callers fetch the schema separately
    using raindrop/operations/form_schema.py, then pass it here.

    Args:
        page: Browser page instance
        schema: Pre-fetched FormSchema from GraphQL

    Returns:
        FormContext with merged schema + DOM data
    """
    return await build_form_context(page, schema)
```
**Usage pattern in actions:**
```python
# In action code (correct layering)
from guide.app.raindrop.operations.form_schema import get_board_schema
from guide.app.browser.context_builder import get_form_context_from_schema


async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
    # Data layer: fetch schema
    schema = await get_board_schema(graphql_url, token, board_id)
    # Browser layer: build context
    form_context = await get_form_context_from_schema(page, schema)
    # Use context for LLM-driven filling
    llm_prompt = format_for_llm(form_context)
```
**Layer diagram:**
```
┌─────────────────────────────────────────────────────────┐
│ Action Layer (actions/)                                 │
│ - Orchestrates data + browser operations                │
│ - Calls both GraphQL ops and browser helpers            │
└─────────────────────────┬───────────────────────────────┘
        ┌─────────────────┴─────────────────┐
        │                                   │
        v                                   v
┌───────────────────┐             ┌───────────────────────┐
│ Data Layer        │             │ Browser Layer         │
│ (raindrop/)       │             │ (browser/)            │
│ - GraphQL ops     │             │ - context_builder.py  │
│ - form_schema.py  │             │ - form_discovery.py   │
│ - NO Playwright   │             │ - PageLike ops        │
└───────────────────┘             └───────────────────────┘
```
### 2.4 Deliverables
- [ ] `context_builder.py` with `FormContext` and `FieldContext` models
- [ ] `build_form_context()` function with matching algorithm
- [ ] `get_form_context_from_schema()` convenience function in browser layer (NOT raindrop/)
- [ ] Unit tests for matching logic (exact match, label match, fuzzy match)
- [ ] Integration test: build context for demo board form
---
## Phase 3: Dropdown Choice Resolution
**Goal:** Use GraphQL schema for dropdown choices instead of opening dropdowns.
### 3.1 Schema Choices (Already in Phase 2)
`FieldContext.allowed_values` populated from `FieldDef.choices`.
### 3.2 Add Schema-Aware Selection
**File:** `src/guide/app/browser/elements/dropdown/__init__.py`
**New Function:**
```python
async def select_from_schema(
    page: PageLike,
    selector: str,
    value: str,
    field_def: FieldDef | None = None,
    *,
    strict_validation: bool = False,
) -> DropdownResult:
    """Select value with optional schema validation.

    Args:
        page: Browser page instance
        selector: CSS selector for the dropdown field
        value: Value to select
        field_def: Optional FieldDef for validation
        strict_validation: If True, raise on invalid choices. Default False.

    Returns:
        DropdownResult with validation info including:
        - validated: bool - whether value was validated against schema
        - validation_warning: str | None - warning if value not in choices

    Note:
        Dynamic fields (USER, SUPPLIER, RELATIONSHIP, CONTRACTS) have empty
        choices in schema since options are API-driven. For these fields,
        validation is skipped and a diagnostic is surfaced, not a blocker.
    """
    # Check if field has static choices. bool() keeps the flag a real bool
    # even when field_def.choices is an empty sequence.
    has_static_choices = bool(
        field_def is not None
        and field_def.choices
        and field_def.field_type not in (
            FieldType.USER,
            FieldType.SUPPLIER,
            FieldType.RELATIONSHIP,
            FieldType.CONTRACTS,
        )
    )
    validation_warning: str | None = None
    if has_static_choices:
        # Validate against both choice.text and choice.value
        valid_texts = {c.text for c in field_def.choices}
        valid_values = {c.value for c in field_def.choices if c.value}
        all_valid = valid_texts | valid_values
        if value not in all_valid:
            validation_warning = (
                f"Value '{value}' not in schema choices: {sorted(valid_texts)}"
            )
            if strict_validation:
                raise ValueError(validation_warning)

    # Proceed with selection regardless of validation
    result = await select_single(page, selector, value)

    # Augment result with validation info
    return {
        **result,
        "validated": has_static_choices and validation_warning is None,
        "validation_warning": validation_warning,
    }
```
**Key design decisions:**
1. **No hard fail on missing choices** - Dynamic fields don't enumerate options
2. **Match both text and value** - Schema choices have both `text` and `value` fields
3. **Surface diagnostics, don't block** - Validation warnings in result, not exceptions
4. **Explicit opt-in for strict mode** - `strict_validation=True` for known static fields
### 3.3 Update Typeahead Handler
**File:** `src/guide/app/browser/elements/dropdown/typeahead.py`
**Problem:** Typeahead fields show zero options until user types.
**Solution:** Don't inspect options before typing.
**Changes:**
```python
async def select_typeahead(
    page: PageLike,
    selector: str,
    value: str,
    *,
    min_chars: int = 3,  # Minimum chars before options appear
    wait_for_options: bool = True,
    option_wait_ms: int = 500,  # Wait time after typing
) -> DropdownResult:
    ...  # Body not shown in this plan; only the signature changes
```
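
One way to implement the wait after typing is to poll for rendered options until a deadline, rather than sleeping for a fixed `option_wait_ms`. The following is a sketch under assumptions, not the planned code: `poll_for_options` and its `count_options` callback are hypothetical, and the callback stands in for whatever DOM query counts `[role="option"]` elements.

```python
import asyncio
import time
from collections.abc import Awaitable, Callable


async def poll_for_options(
    count_options: Callable[[], Awaitable[int]],
    *,
    timeout_ms: int = 500,
    poll_ms: int = 50,
) -> int:
    """Poll until at least one option is visible or the timeout elapses.

    Returns the last observed option count (0 if none ever appeared).
    """
    deadline = time.monotonic() + timeout_ms / 1000
    while True:
        count = await count_options()
        # Stop as soon as options exist, or once the deadline has passed.
        if count > 0 or time.monotonic() >= deadline:
            return count
        await asyncio.sleep(poll_ms / 1000)
```

Polling returns as soon as options render, so a generous timeout does not slow down the common case, and a zero result can be reported as a diagnostic instead of an error.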
### 3.4 Deliverables
- [ ] `select_from_schema()` function in dropdown module
- [ ] Updated `select_typeahead()` with better wait logic
- [ ] Remove option inspection from `inspect_dropdown()` for typeahead fields
- [ ] Unit tests for schema-aware selection
---
## Phase 4: Harden Field Inference
**Goal:** Prioritize W3C role-based inference while keeping MUI class fallbacks for robustness.
### 4.1 Update Field Inference Module
**File:** `src/guide/app/browser/elements/field_inference.py`
**Current Problem (lines 119-122):**
```python
# Relies solely on class names which break in production MUI builds
if any("MuiAutocomplete-root" in c for c in classes):
    return "autocomplete"
```
**New JS Template (enhanced, not replaced):**
```javascript
((selector) => {
    const el = document.querySelector(selector);
    if (!el) return null;
    const input = el.querySelector('input') || el;
    const autocomplete = el.querySelector('.MuiAutocomplete-root');
    const selectRoot = el.querySelector('.MuiSelect-root');
    return {
        tag_name: el.tagName.toLowerCase(),
        // PRIMARY: Role-based detection (W3C standard)
        role: el.getAttribute('role'),
        input_role: input.getAttribute('role'),
        aria_controls: input.getAttribute('aria-controls'),
        aria_owns: input.getAttribute('aria-owns'),
        aria_expanded: input.getAttribute('aria-expanded'),
        aria_haspopup: input.getAttribute('aria-haspopup'),
        // FALLBACK: MUI class detection (for pages missing ARIA)
        has_autocomplete_class: !!autocomplete,
        has_select_class: !!selectRoot,
        // Existing
        type_attr: input.getAttribute('type'),
        data_cy: el.getAttribute('data-cy') || el.closest('[data-cy]')?.getAttribute('data-cy'),
    };
})
```
### 4.2 Update Inference Logic (Role-First with Class Fallback)
**New approach: Check roles first, fall back to classes:**
```python
async def infer_type_from_element(page: PageLike, selector: str) -> str:
    # ... evaluate JS ...

    # PRIMARY: Role-based detection (most reliable)
    input_role = get_str_from_dict(result, "input_role")
    if input_role == "combobox":
        return "autocomplete"

    # Check for ARIA popup indicators
    aria_controls = get_str_from_dict(result, "aria_controls")
    aria_owns = get_str_from_dict(result, "aria_owns")
    if aria_controls or aria_owns:
        return "autocomplete"

    # FALLBACK: MUI class detection (for pages without proper ARIA)
    # Guarded - only used when role detection fails
    has_autocomplete_class = get_bool_from_dict(result, "has_autocomplete_class")
    has_select_class = get_bool_from_dict(result, "has_select_class")
    if has_autocomplete_class:
        return "autocomplete"
    if has_select_class:
        return "select"

    # Check input type
    type_attr = get_str_from_dict(result, "type_attr")
    if type_attr == "number":
        return "number"
    # ... etc
    return "text"  # Safe default
```
### 4.3 Robustness Strategy
**Keep dual detection for reliability:**
| Priority | Detection Method | Reliability | Notes |
|----------|------------------|-------------|-------|
| 1 | `role="combobox"` | High | W3C standard, works in all builds |
| 2 | `aria-controls`/`aria-owns` | High | Indicates popup relationship |
| 3 | `.MuiAutocomplete-root` class | Medium | Fallback for pages missing ARIA |
| 4 | `.MuiSelect-root` class | Medium | Fallback for custom builds |
**Why keep class fallbacks:**
- Some internal pages may not have proper ARIA roles
- Custom MUI themes might disable role attributes
- Graceful degradation > hard failures
### 4.4 Deliverables
- [ ] Updated `infer_type_from_element()` with role-first, class-fallback detection
- [ ] New JS evaluation template with both role and class detection
- [ ] Unit tests for role-based inference
- [ ] Unit tests for class fallback when roles missing
- [ ] Integration test: infer types from production-build page
---
## Phase 5: LLM Context Generation
**Goal:** Generate structured JSON context for LLM form filling with proper type alignment.
### 5.1 Type Mapping Table
**Problem:** Schema `FieldType` values (menu, user, contracts) don't align with UI helper types (select, autocomplete) used by `select_helper_for_type()`.
**Solution:** Add explicit mapping in `context_builder.py`:
```python
from guide.app.browser.elements.field_inference import HelperFunction, select_helper_for_type

# Mapping from schema FieldType to UI automation type
_SCHEMA_TO_UI_TYPE: dict[FieldType, str] = {
    FieldType.TEXT: "text",
    FieldType.TEXTAREA: "textarea",
    FieldType.MENU: "select",
    FieldType.NUMBER: "number",
    FieldType.DATE: "date",
    FieldType.CHECKBOX: "checkbox",
    FieldType.USER: "autocomplete",
    FieldType.SUPPLIER: "autocomplete",
    FieldType.COMMODITY: "autocomplete",
    FieldType.DEPARTMENT: "autocomplete",
    FieldType.CONTRACTS: "autocomplete",
    FieldType.RELATIONSHIP: "autocomplete",
    FieldType.ATTACHMENT: "file",
    FieldType.UNKNOWN: "text",
}


def get_ui_type(field_type: FieldType) -> str:
    """Map schema field type to UI automation type."""
    return _SCHEMA_TO_UI_TYPE.get(field_type, "text")


def get_helper_for_field(field_type: FieldType) -> HelperFunction:
    """Get automation helper function for field type."""
    ui_type = get_ui_type(field_type)
    return select_helper_for_type(ui_type)
```
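
Because `get_ui_type()` falls back to `"text"`, a `FieldType` member that is missing from the table would be treated as a text field without any error. A unit test for the mapping might therefore assert that every enum member is listed explicitly. The sketch below illustrates the idea with an abbreviated, hypothetical stand-in for `FieldType`; `unmapped_field_types` is likewise an assumed helper name.

```python
from enum import Enum


class FieldType(Enum):
    """Hypothetical, abbreviated stand-in for the schema enum."""

    TEXT = "text"
    MENU = "menu"
    USER = "user"
    UNKNOWN = "unknown"


SCHEMA_TO_UI_TYPE: dict[FieldType, str] = {
    FieldType.TEXT: "text",
    FieldType.MENU: "select",
    FieldType.USER: "autocomplete",
    FieldType.UNKNOWN: "text",
}


def unmapped_field_types(mapping: dict[FieldType, str]) -> set[FieldType]:
    """Return enum members that have no explicit UI type in the mapping."""
    # Iterating an Enum class yields every member, so the set difference
    # is exactly the members missing from the mapping keys.
    return set(FieldType) - set(mapping)
```

A test can then assert that `unmapped_field_types(SCHEMA_TO_UI_TYPE)` is empty, so adding a new schema type without choosing a UI type fails fast instead of silently defaulting.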
### 5.2 Enhanced FieldContext
**Update `FieldContext` to include both types:**
```python
@dataclass(frozen=True)
class FieldContext:
    """Merged schema + DOM context for a single field."""

    field_key: str
    label: str
    schema_type: FieldType  # Original schema type (menu, user, etc.)
    ui_type: str  # Automation type (select, autocomplete, text)
    helper: HelperFunction  # Exact helper function to use
    dom_selector: str | None
    dom_label: str | None
    current_value: str | None
    is_required: bool
    is_disabled: bool
    allowed_values: tuple[str, ...] | None
```
### 5.3 Enhanced Formatter
**File:** `src/guide/app/browser/context_builder.py`
```python
def format_for_llm(context: FormContext) -> dict[str, dict[str, object]]:
    """Format form context as LLM-consumable JSON.

    Returns dict keyed by field_key with:
    - label: Display name
    - schema_type: Original field type from schema
    - ui_type: Type for automation (aligns with select_helper_for_type)
    - helper: Exact helper function to call
    - selector: CSS selector for interaction
    - current_value: Current value if any
    - is_required: Whether field is required
    - allowed_values: Valid choices for menu fields
    """
    return {
        field_key: {
            "label": field.label,
            "schema_type": field.schema_type.value,  # e.g., "menu", "user"
            "ui_type": field.ui_type,  # e.g., "select", "autocomplete"
            "helper": field.helper,  # e.g., "select_single", "select_combobox"
            "selector": field.dom_selector,
            "current_value": field.current_value,
            "is_required": field.is_required,
            "allowed_values": list(field.allowed_values) if field.allowed_values else None,
        }
        for field_key, field in context.fields.items()
        if field.dom_selector  # Only include fields we can interact with
    }
```
### 5.4 Example Output
```json
{
  "f19": {
    "label": "Estimated Value",
    "schema_type": "number",
    "ui_type": "number",
    "helper": "fill_with_react_events",
    "selector": "[data-cy='board-item-field-number-f19'] input",
    "current_value": null,
    "is_required": true,
    "allowed_values": null
  },
  "status": {
    "label": "Status",
    "schema_type": "menu",
    "ui_type": "select",
    "helper": "select_single",
    "selector": "[data-cy='board-item-field-menu-status']",
    "current_value": "Draft",
    "is_required": false,
    "allowed_values": ["Draft", "Active", "Archived"]
  },
  "assigned_to": {
    "label": "Assigned To",
    "schema_type": "user",
    "ui_type": "autocomplete",
    "helper": "select_combobox",
    "selector": "[data-cy='board-item-field-user-assigned_to']",
    "current_value": null,
    "is_required": false,
    "allowed_values": null
  }
}
```
### 5.5 Deliverables
- [ ] `_SCHEMA_TO_UI_TYPE` mapping table in `context_builder.py`
- [ ] `get_ui_type()` and `get_helper_for_field()` functions
- [ ] Enhanced `FieldContext` with `schema_type`, `ui_type`, `helper` fields
- [ ] Updated `format_for_llm()` with all type fields
- [ ] Unit tests for type mapping
- [ ] Integration with form automation actions
---
## Phase 6: Testing & Validation
### 6.1 Unit Tests
| Test File | Coverage |
|-----------|----------|
| `test_type_guards.py` | Consolidated type guards |
| `test_accessible_name.py` | ARIA label extraction |
| `test_context_builder.py` | Schema-DOM matching |
| `test_field_inference_roles.py` | Role-based type inference |
### 6.2 Integration Tests
| Test | Purpose |
|------|---------|
| `test_board_form_context.py` | Build context for real board form |
| `test_inference_production.py` | Test inference on minified MUI build |
### 6.3 Regression Tests
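- All existing tests that remain after Phase 1.3 must pass (28+ before the regex extraction tests are replaced)
- No production code imports the deleted regex functions (they are removed, not deprecated)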
---
## Implementation Order
```
Phase 0: Consolidation (1 day)
|-- _type_guards.py
|-- Unify escape_selector imports
|-- get_listbox_id helper
Phase 1: Semantic Extraction (1 day)        Phase 4: Field Inference (1 day)
|-- Enhance extract_form_field_metadata     |-- Role-based JS template
|-- Add extract_accessible_name             |-- Keep MUI class checks as fallback
|-- Delete regex functions                  |-- Update inference logic
|                                           |
v                                           v
Phase 2: Context Builder (2 days)
|-- FieldContext, FormContext models
|-- build_form_context() matching
|-- get_form_context_from_schema() integration
|
v
Phase 3: Dropdown (1 day)                   Phase 5: LLM Format (0.5 day)
|-- select_from_schema()                    |-- format_for_llm()
|-- Update typeahead                        |
|                                           |
v                                           v
Phase 6: Testing (1 day)
|-- Unit tests
|-- Integration tests
```
---
## File Summary
### New Files (Reduced from original plan)
| File | Purpose |
|------|---------|
| `browser/elements/_type_guards.py` | Consolidated type guard utilities |
| `browser/context_builder.py` | Schema-DOM reconciliation + LLM formatting |
**Note:** `semantic_discovery.py` and `llm_context.py` are NOT needed as separate files. Their functionality is integrated into existing modules.
### Modified Files
| File | Changes |
|------|---------|
| `browser/elements/form_discovery.py` | ARIA label resolution, delete regex functions |
| `browser/elements/field_inference.py` | Role-first detection with class fallback, import type guards |
| `browser/elements/dropdown/__init__.py` | Add `select_from_schema()` |
| `browser/elements/dropdown/_helpers.py` | Add `get_listbox_id()` |
| `browser/elements/dropdown/typeahead.py` | Improve wait logic |
| `browser/diagnostics.py` | Import `escape_selector` from `mui.py` |
| `browser/extension_client.py` | Import `escape_selector` from `mui.py` |
| `browser/elements/__init__.py` | Remove exports for deleted regex functions |
**NOT modified:** `raindrop/operations/form_schema.py` - Browser dependencies stay in browser layer
### Test Files
| File | Coverage |
|------|---------|
| `tests/unit/test_type_guards.py` | Phase 0 |
| `tests/unit/test_accessible_name.py` | Phase 1 (replaces `test_form_extraction_from_html.py`) |
| `tests/unit/test_context_builder.py` | Phase 2 |
| `tests/unit/test_type_mapping.py` | Phase 5 (schema -> UI type mapping) |
| `tests/unit/test_field_inference_roles.py` | Phase 4 |
| `tests/integration/test_form_context.py` | E2E validation |
**Deleted:** `tests/integration/browser/test_form_extraction_from_html.py` - Replaced with `test_accessible_name.py`
---
## Success Criteria
1. **No "unknown" field types** - All standard Raindrop fields correctly identified
2. **Schema-aware choices** - Dropdown options from schema, not DOM inspection
3. **Production compatibility** - Works with minified MUI class names
4. **LLM-ready output** - JSON context with labels, types, selectors, choices
5. **Code reduction** - Net reduction in lines of code:
- ~120 lines: Delete unused regex functions + test file
- ~50 lines: Consolidate duplicated type guards
- ~20 lines: Unify escape_selector imports
6. **No regression** - Remaining tests pass (test count may decrease due to removing regex tests)
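
As a rough illustration of criterion 4, the LLM-ready output might be serialized along these lines. `SketchField` and `to_llm_json` are hypothetical stand-ins; the real `FieldContext` model and `format_for_llm()` live in `context_builder.py` and may be shaped differently.

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class SketchField:
    """Hypothetical stand-in for one reconciled schema/DOM field."""

    field_key: str
    label: str
    ui_type: str
    selector: str
    choices: list[str] = field(default_factory=list)


def to_llm_json(fields: list[SketchField]) -> str:
    """Serialize fields to deterministic JSON suitable for a prompt."""
    payload = {"fields": [asdict(f) for f in fields]}
    # sort_keys keeps the output stable across runs, which helps prompt caching.
    return json.dumps(payload, sort_keys=True)
```

Including `choices` directly in the context is what lets the model pick a valid dropdown option without a DOM round trip.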
---
## Risk Mitigation
| Risk | Mitigation |
|------|------------|
| Breaking existing actions | Grep confirmed no production usage of deleted functions |
| Accessibility API gaps | MUI fallback for label resolution |
| Schema-DOM mismatch | Fuzzy matching + unmatched field tracking |
| Typeahead timing issues | Configurable wait params, retry logic |
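
The "fuzzy matching + unmatched field tracking" mitigation could be sketched as below. This is a minimal greedy matcher built on the standard library's `difflib`; `match_labels`, the normalization rules, and the 0.8 threshold are assumptions for illustration, not the plan's actual implementation.

```python
from difflib import SequenceMatcher


def _normalize(label: str) -> str:
    """Lowercase, drop required-field asterisks, and collapse whitespace."""
    return " ".join(label.lower().replace("*", "").split())


def match_labels(
    schema_labels: list[str],
    dom_labels: list[str],
    threshold: float = 0.8,
) -> tuple[dict[str, str], list[str], list[str]]:
    """Return (matches, unmatched_schema, unmatched_dom)."""
    matches: dict[str, str] = {}
    remaining = list(dom_labels)
    for schema_label in schema_labels:
        best, best_score = None, 0.0
        for dom_label in remaining:
            score = SequenceMatcher(
                None, _normalize(schema_label), _normalize(dom_label)
            ).ratio()
            if score > best_score:
                best, best_score = dom_label, score
        # Greedy: the first schema label to claim a DOM label keeps it.
        if best is not None and best_score >= threshold:
            matches[schema_label] = best
            remaining.remove(best)
    unmatched_schema = [s for s in schema_labels if s not in matches]
    return matches, unmatched_schema, remaining
```

Returning both unmatched lists makes schema-DOM drift visible to the caller instead of silently dropping fields.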
---
## Dependencies
- No new external packages required
- Uses existing: `playwright`, `pydantic`, `httpx`
- GraphQL schema access via existing `form_schema.py`

View File

@@ -18,6 +18,8 @@ dependencies = [
"pydantic-settings>=2.4.0",
"python-dotenv>=1.2.1",
"pyyaml>=6.0.2",
"redis>=7.1.0",
"types-redis>=4.6.0.20241004",
"uvicorn>=0.30.6",
]

View File

@@ -1,796 +0,0 @@
Based on the `troubleshooting.md` (which shows a complex React/Material UI form with `data-cy` attributes) and your existing codebase, here is the solution divided into two parts:
1. **Browser Elements**: A robust set of Python helpers (`mui.py`) specifically designed to conquer the Material UI structure seen in your HTML.
2. **Extension Integration**: An "airtight" implementation of `worker.js` and updates to `extension_client.py` to ensure reliable execution of these helpers via the Chrome Extension architecture (Service Worker <-> Content Script <-> DOM).
---
### Part 1: Robust Browser Elements (`src/guide/app/browser/elements/mui.py`)
I recommend creating a dedicated `mui.py` file. The generic `dropdown.py` is often too broad for the specific nesting of MUI v5 components seen in your `troubleshooting.md`.
This file handles the "React Hack" (triggering events so state updates) and targets the stable `data-cy` wrappers.
```python
# src/guide/app/browser/elements/mui.py
import logging
from typing import Any

from guide.app.browser.types import PageLike
from guide.app.browser.elements.dropdown import click_with_mouse_events

_logger = logging.getLogger(__name__)


async def select_mui_dropdown(
    page: PageLike,
    wrapper_selector: str,
    value_text: str
) -> None:
    """
    Selects an option from a Material UI Select component (non-searchable dropdown).

    Target Structure (from troubleshooting.md):
    <div data-cy="contract-form-field-type"> ... <div role="combobox">

    Args:
        page: The page or extension client.
        wrapper_selector: The data-cy attribute or selector for the wrapper (e.g., '[data-cy="contract-form-field-type"]').
        value_text: The visible text of the option to select.
    """
    # 1. Target the clickable combobox div within the wrapper
    # We avoid the input[type="hidden"] and target the visible div
    trigger_selector = f"{wrapper_selector} [role='combobox']"

    # 2. Click to open the listbox
    await click_with_mouse_events(page, trigger_selector)

    # 3. Wait for the listbox (MUI attaches this to document.body, usually via a Portal)
    # We look for a listbox that is visible.
    await page.wait_for_selector('ul[role="listbox"]', timeout=2000)

    # 4. Find and click the specific option
    # MUI options usually have role="option". We use XPath to match text exact or contains.
    option_selector = f'//ul[@role="listbox"]//li[@role="option"][contains(text(), "{value_text}")]'

    # Fallback: strict text match if fuzzy fails
    try:
        await page.click(option_selector)
    except Exception:
        # Try exact match
        await page.click(f'//ul[@role="listbox"]//li[@role="option" and text()="{value_text}"]')

    # 5. Wait for listbox to detach (confirm selection)
    try:
        await page.wait_for_selector('ul[role="listbox"]', state="detached", timeout=1000)
    except Exception:
        # If it didn't close, we might need to press Escape, but usually click works.
        pass


async def select_mui_autocomplete(
    page: PageLike,
    wrapper_selector: str,
    value_text: str,
    clear_existing: bool = True
) -> None:
    """
    Selects an option from a Material UI Autocomplete (searchable + combobox).

    Target Structure (from troubleshooting.md):
    <div data-cy="contract-form-field-supplier"> ... <input role="combobox">
    """
    input_selector = f"{wrapper_selector} input[role='combobox']"

    # 1. Clear existing value if needed (via the small 'x' button if present)
    if clear_existing:
        clear_btn = f"{wrapper_selector} .MuiAutocomplete-clearIndicator"
        is_visible = await page.evaluate(f"""document.querySelector('{clear_btn}') !== null""")
        if is_visible:
            await page.click(clear_btn)

    # 2. Focus and fill the input
    # Note: We use fill(), which handles the React synthetic event dispatch in extension_client
    await page.click(input_selector)
    # Type enough to trigger search, or full text
    await page.fill(input_selector, value_text)

    # 3. Wait for the loading indicator to vanish (if async)
    # troubleshooting.md shows `.MuiAutocomplete-popupIndicator` but loading state usually changes icon
    await page.wait_for_timeout(500)  # Short buffer for React to render options

    # 4. Wait for listbox
    await page.wait_for_selector('ul[role="listbox"]', timeout=3000)

    # 5. Click the option
    # Note: Autocomplete options often highlight specific parts of text, so we rely on role="option"
    # (this XPath form is kept for reference; the code below uses the simpler selector)
    option_selector = f'//ul[@role="listbox"]//li[@role="option"]//text()[contains(., "{value_text}")]/ancestor::li'

    # Simplified selector tried first
    simple_option = f'li[role="option"]:has-text("{value_text}")'

    try:
        await page.click(simple_option)
    except Exception:
        # Fallback to pure JS click if selector engine is strict
        js_click = f"""
        const opts = Array.from(document.querySelectorAll('ul[role="listbox"] li[role="option"]'));
        const target = opts.find(el => el.textContent.includes("{value_text}"));
        if (target) target.click();
        else throw new Error("Option not found");
        """
        await page.evaluate(js_click)


async def fill_mui_text(
    page: PageLike,
    wrapper_selector: str,
    value: str
) -> None:
    """
    Fills a standard MUI Text Field or Text Area.
    """
    # MUI inputs are usually nested inside the wrapper
    input_selector = f"{wrapper_selector} input"

    # Check if it's a textarea
    is_textarea = await page.evaluate(f"document.querySelector('{wrapper_selector} textarea') !== null")
    if is_textarea:
        input_selector = f"{wrapper_selector} textarea:not([aria-hidden='true'])"

    await page.fill(input_selector, value)


async def set_mui_checkbox(
    page: PageLike,
    wrapper_selector: str,
    label_text: str,
    checked: bool = True
) -> None:
    """
    Toggles a checkbox inside a FormGroup.

    Target: <label> ... <span class="MuiCheckbox-root"> ... <span class="MuiTypography-root">Direct</span>
    """
    # Find the specific label containing the text
    # We click the input's parent span or the label to toggle
    # Logic: Find the input associated with the label text
    js_toggle = f"""
    const labels = Array.from(document.querySelectorAll('{wrapper_selector} label'));
    const targetLabel = labels.find(l => l.textContent.includes("{label_text}"));
    if (!targetLabel) throw new Error("Label '{label_text}' not found");
    const input = targetLabel.querySelector('input[type="checkbox"]');
    if (input.checked !== {str(checked).lower()}) {{
        input.click();
    }}
    """
    await page.evaluate(js_toggle)
```
---
### Part 2: Airtight Extension Integration
The challenge is that `worker.js` (Service Worker) cannot access the DOM. It must message a Content Script.
#### 2.1 The Content Script (`extensions/content.js`)
This script executes the actual DOM manipulation. It needs to handle the "React Hack" to ensure `fill` works on MUI components.
```javascript
// extensions/content.js
// Listen for messages from the worker
chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
if (request.action === "EXECUTE_JS") {
executeSafe(request.code, request.args)
.then(result => sendResponse({ status: "success", result }))
.catch(error => sendResponse({ status: "error", error: error.message }));
// Return true to indicate async response
return true;
}
});
async function executeSafe(code, args) {
// Wrap evaluation in an async function to allow 'await' in the passed string
// logic similar to Playwright's evaluate
const func = new Function("args", `return (async () => { ${code} })();`);
return await func(args);
}
// Helper for React inputs. Content scripts run in an isolated world, so this
// global is visible to code run via executeSafe above, not to the page's own scripts.
window.__fillReactInput = (selector, value) => {
const input = document.querySelector(selector);
if (!input) throw new Error(`Element not found: ${selector}`);
// 1. Set value
const nativeInputValueSetter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, "value").set;
nativeInputValueSetter.call(input, value);
// 2. Dispatch events
const eventBubbles = { bubbles: true };
input.dispatchEvent(new Event('input', eventBubbles));
input.dispatchEvent(new Event('change', eventBubbles));
// 3. Blur to trigger validation (common in MUI)
input.blur();
};
```
#### 2.2 The Background Worker (`extensions/worker.js`)
This acts as the bridge between the WebSocket (Python) and the Content Script (Tab). It handles connection resilience.
```javascript
// extensions/worker.js
let socket = null;
const WS_URL = "ws://localhost:17373";
function connect() {
socket = new WebSocket(WS_URL);
socket.onopen = () => {
console.log("Connected to Python Guide");
// Identify as extension
socket.send(JSON.stringify({ type: "IDENTIFY", kind: "extension" }));
};
socket.onmessage = async (event) => {
const message = JSON.parse(event.data);
const { id, code, action } = message;
try {
// Get the active tab
const [tab] = await chrome.tabs.query({ active: true, currentWindow: true });
if (!tab) {
throw new Error("No active tab found");
}
// Send to content script
const result = await chrome.tabs.sendMessage(tab.id, {
action: "EXECUTE_JS",
code: code
});
if (result.status === "error") {
throw new Error(result.error);
}
socket.send(JSON.stringify({
id,
status: "success",
result: result.result
}));
} catch (err) {
socket.send(JSON.stringify({
id,
status: "error",
error: err.message || "Unknown error in extension worker"
}));
}
};
socket.onclose = () => {
console.log("Disconnected. Reconnecting in 3s...");
setTimeout(connect, 3000);
};
socket.onerror = (err) => {
console.error("WebSocket error:", err);
socket.close();
};
}
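// Reconnect on browser startup and on extension install/update
// (these listeners do not by themselves keep the service worker alive)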
chrome.runtime.onStartup.addListener(connect);
chrome.runtime.onInstalled.addListener(connect);
connect();
```
#### 2.3 Updated Python Client (`src/guide/app/browser/extension_client.py`)
Update the `eval_js` and `fill` methods to leverage the architecture above.
```python
# src/guide/app/browser/extension_client.py (Partial update)

# ... existing imports ...


class ExtensionPage:
    # ... existing init ...

    async def fill(self, selector: str, value: str) -> None:
        """
        Fill using the React-compatible helper defined in content.js.
        """
        escaped_selector = self._escape_selector(selector)
        escaped_value = value.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"')

        # We rely on window.__fillReactInput defined in content.js
        # Or we inject the logic directly if we want to be self-contained
        js_code = f"""
        const input = document.querySelector('{escaped_selector}');
        if (!input) throw new Error("Element not found: {escaped_selector}");
        // React state update hack
        const setter = Object.getOwnPropertyDescriptor(window.HTMLInputElement.prototype, "value").set;
        setter.call(input, '{escaped_value}');
        input.dispatchEvent(new Event('input', {{ bubbles: true }}));
        input.dispatchEvent(new Event('change', {{ bubbles: true }}));
        input.focus();
        input.blur();
        """
        await self.eval_js(js_code)

    async def click(self, selector: str) -> None:
        """
        Robust click that attempts to scroll into view and hit the center.
        """
        escaped = self._escape_selector(selector)
        js_code = f"""
        const el = document.querySelector('{escaped}');
        if (!el) throw new Error("Element not found: {escaped}");
        el.scrollIntoView({{behavior: 'auto', block: 'center'}});
        // Wait a tick for scroll
        await new Promise(r => setTimeout(r, 100));
        // Dispatch mouse events sequence for MUI/React
        const opts = {{ bubbles: true, cancelable: true, view: window }};
        el.dispatchEvent(new MouseEvent('mousedown', opts));
        el.dispatchEvent(new MouseEvent('mouseup', opts));
        el.click();
        """
        await self.eval_js(js_code)

    # ... existing methods ...
```
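
The manual escaping in `fill` is easy to get wrong for values that contain newlines or mixed quotes. One alternative, sketched here under the assumption that the snippet is still sent as a code string, is to let `json.dumps` produce the JavaScript string literals. `build_fill_script` is a hypothetical helper, not part of the existing client.

```python
import json


def build_fill_script(selector: str, value: str) -> str:
    """Build the fill snippet with JSON-encoded string literals."""
    # json.dumps returns a double-quoted literal that is also valid JavaScript,
    # with quotes, backslashes, and control characters already escaped.
    sel = json.dumps(selector)
    val = json.dumps(value)
    return (
        f"const input = document.querySelector({sel});\n"
        f"if (!input) throw new Error('Element not found: ' + {sel});\n"
        "const setter = Object.getOwnPropertyDescriptor("
        "window.HTMLInputElement.prototype, 'value').set;\n"
        f"setter.call(input, {val});\n"
        "input.dispatchEvent(new Event('input', { bubbles: true }));\n"
        "input.dispatchEvent(new Event('change', { bubbles: true }));\n"
    )
```

Because the literals carry their own quotes, the template never wraps an interpolated value in `'...'`, which removes the quote-collision class of bugs.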
### Summary of Integration Logic
1. **Python (`select_mui_dropdown`)**: Calls `page.evaluate(...)` (which maps to `client.eval_js`).
2. **Extension Client (`eval_js`)**: Sends JSON `{ id: "uuid", code: "document.querySelector..." }` over WebSocket.
3. **Worker (`worker.js`)**: Receives JSON, finds Active Tab ID, sends message `chrome.tabs.sendMessage`.
4. **Content Script (`content.js`)**: Receives message, runs `new Function(...)` against the actual DOM. It has access to `window` and `document`. It calculates the result or throws an error.
5. **Return Path**: Content Script -> Worker (via callback) -> Python (via WebSocket).
This architecture allows your Python `mui.py` helpers to interact with the complex `troubleshooting.md` form elements as if they were running locally, bypassing the security restrictions of the Service Worker and the "no-DOM-access" limitation of pure Playwright in Extension mode.
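
The return path in step 5 depends on matching each reply to the request that produced it. A minimal sketch of that correlation on the Python side follows, assuming replies shaped like the ones `worker.js` sends (`id`, `status`, and `result` or `error`); `resolve_response` is a hypothetical name, not an existing method.

```python
import asyncio
from typing import Any


def resolve_response(
    pending: dict[str, asyncio.Future[Any]], message: dict[str, Any]
) -> bool:
    """Route one worker reply to the future waiting on its id."""
    future = pending.pop(str(message.get("id")), None)
    if future is None or future.done():
        return False  # unknown or already-settled request id
    if message.get("status") == "success":
        future.set_result(message.get("result"))
    else:
        # Surface extension-side failures as Python exceptions.
        future.set_exception(RuntimeError(str(message.get("error", "unknown error"))))
    return True
```

Popping the entry as soon as a reply arrives keeps the pending map from growing when many commands are issued.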
Based on the complex structure of `troubleshooting.md` (MUI v5, Portals, dynamic IDs, nested wrappers) and the requirement for robust extension support, here is the comprehensive solution.
This solution introduces a specialized library `mui.py` for the browser elements and a unified "Action Dispatcher" pattern in the extension files to handle the complexity of React events and isolated DOM environments.
### 1. Robust Browser Elements (`src/guide/app/browser/elements/mui.py`)
This module abstracts the complexity of Material UI. It identifies elements by their stable `data-cy` wrappers but interacts with the specific functional DOM nodes (inputs, listboxes) required for the action.
```python
# src/guide/app/browser/elements/mui.py
import logging
import os

from guide.app.browser.types import PageLike
from guide.app.browser.elements.dropdown import click_with_mouse_events

_logger = logging.getLogger(__name__)


# --- Text & Numeric Inputs ---

async def fill_text(page: PageLike, wrapper_selector: str, value: str, is_textarea: bool = False) -> None:
    """
    Fills a text input or textarea wrapped in a MUI container.
    Handles masking and validation triggers via React event simulation.

    Args:
        wrapper_selector: Selector for the container (e.g. '[data-cy="contract-form-field-name"]')
        value: The string value to type.
        is_textarea: Force targeting a textarea element.
    """
    # 1. Determine target selector
    # MUI structure: <div data-cy="..."> ... <div class="MuiInputBase..."> <input>
    tag = "textarea" if is_textarea else "input"
    # The attribute value is left unquoted so the selector can be embedded in the
    # single-quoted JS strings below without terminating them early.
    target_selector = f"{wrapper_selector} {tag}:not([type=hidden])"

    # 2. Check for disabled state first
    is_disabled = await page.evaluate(f"""
        const el = document.querySelector('{target_selector}');
        el ? el.disabled || el.readOnly : false
    """)
    if is_disabled:
        _logger.warning(f"Skipping fill for disabled element: {wrapper_selector}")
        return

    # 3. Use robust fill (handles React state)
    # If extension mode, this uses the enhanced fill logic in extension_client
    await page.click(target_selector)  # Focus
    await page.fill(target_selector, value)

    # 4. Blur to trigger validation (required for numeric/masked inputs)
    await page.evaluate(f"document.querySelector('{target_selector}').blur()")


async def fill_numeric(page: PageLike, wrapper_selector: str, value: str) -> None:
    """
    Fills a numeric input. Clears existing masked values (like '0%') before typing.
    """
    target_selector = f"{wrapper_selector} input"

    # Select all and delete to handle masks/formatting
    await page.click(target_selector)
    await page.evaluate(f"""
        const el = document.querySelector('{target_selector}');
        if (el) {{
            el.value = '';
            el.dispatchEvent(new Event('input', {{ bubbles: true }}));
        }}
    """)
    await page.fill(target_selector, value)


# --- Dropdowns & Autocomplete ---

async def select_single_choice(page: PageLike, wrapper_selector: str, value_text: str) -> None:
    """
    Selects from a standard MUI Select (non-searchable dropdown).
    Structure: Wrapper -> [role=combobox] -> Portal(ul[role=listbox]) -> li[role=option]
    """
    trigger_selector = f"{wrapper_selector} [role='combobox']"

    # 1. Open Dropdown
    await click_with_mouse_events(page, trigger_selector)

    # 2. Wait for Portal
    await page.wait_for_selector('ul[role="listbox"]', timeout=3000)

    # 3. Select Option (Exact match preferred, fallback to substring)
    # Using XPath to break out of the wrapper scope and search the document body
    xpath = f"//ul[@role='listbox']//li[@role='option' and .//text()='{value_text}']"

    try:
        # Try finding exact text match
        if hasattr(page, "click_element_with_text"):
            # Extension optimized path
            await page.click_element_with_text('ul[role="listbox"] li[role="option"]', value_text)
        else:
            # Playwright CDP path
            await page.locator(xpath).first.click()
    except Exception:
        # Fallback: JavaScript find and click
        await page.evaluate(f"""
            const opts = Array.from(document.querySelectorAll('ul[role="listbox"] li[role="option"]'));
            const target = opts.find(el => el.textContent.trim() === "{value_text}");
            if (target) target.click();
            else throw new Error("Option '{value_text}' not found in dropdown");
        """)

    # 4. Wait for close
    await page.wait_for_timeout(300)


async def select_autocomplete(page: PageLike, wrapper_selector: str, value_text: str, multi_select: bool = False) -> None:
    """
    Selects from a MUI Autocomplete (Search as you type).
    Structure: Wrapper -> input[role=combobox] -> Portal(ul[role=listbox])
    """
    input_selector = f"{wrapper_selector} input[role='combobox']"

    # 1. Clear existing if single select and not empty
    if not multi_select:
        clear_btn = f"{wrapper_selector} .MuiAutocomplete-clearIndicator"
        has_clear = await page.evaluate(f"!!document.querySelector('{clear_btn}')")
        if has_clear:
            await page.click(clear_btn)

    # 2. Type to search
    await page.click(input_selector)
    await page.fill(input_selector, value_text)

    # 3. Wait for options
    await page.wait_for_selector('ul[role="listbox"]', timeout=3000)

    # 4. Click option (Autocomplete options often wrap text in spans)
    # We generally click the first match for the typed text
    await page.evaluate(f"""
        const opts = Array.from(document.querySelectorAll('ul[role="listbox"] li[role="option"]'));
        // Prioritize exact match, then startsWith, then includes
        let target = opts.find(el => el.textContent.trim() === "{value_text}");
        if (!target) target = opts.find(el => el.textContent.trim().startsWith("{value_text}"));
        if (!target) target = opts.find(el => el.textContent.includes("{value_text}"));
        if (target) target.click();
        else throw new Error("Autocomplete option '{value_text}' not found");
    """)


# --- Boolean Inputs ---

async def set_checkbox(page: PageLike, wrapper_selector: str, checked: bool = True) -> None:
    """
    Toggles a checkbox. Checks current state before clicking.
    Works for both 'terminate for convenience' (span wrapper) and 'attributes' (label wrapper).
    """
    input_selector = f"{wrapper_selector} input[type='checkbox']"

    # Check current state using JS to avoid stale element handle issues
    is_checked = await page.evaluate(f"document.querySelector('{input_selector}').checked")

    if is_checked != checked:
        # Click the *input* directly if possible, or its parent if input is hidden/zero-size
        await click_with_mouse_events(page, input_selector)


async def set_radio_group(page: PageLike, wrapper_selector: str, option_label: str) -> None:
    """
    Selects a specific radio button within a group by its label.
    """
    # 1. Find the radio input associated with the label text inside the wrapper
    # Strategy: Find label containing text -> find radio input
    js_click = f"""
    const wrapper = document.querySelector('{wrapper_selector}');
    const labels = Array.from(wrapper.querySelectorAll('label'));
    const targetLabel = labels.find(l => l.textContent.includes("{option_label}"));
    if (targetLabel) {{
        const radio = targetLabel.querySelector('input[type="radio"]');
        if (radio && !radio.checked) radio.click();
    }} else {{
        throw new Error("Radio option '{option_label}' not found");
    }}
    """
    await page.evaluate(js_click)


# --- Attachments ---

async def upload_file(page: PageLike, dropzone_selector: str, file_path: str) -> None:
    """
    Uploads a file to a dropzone input.

    Args:
        dropzone_selector: Selector for the input[type=file] (e.g. '#rd-drop-zone-input')
        file_path: Absolute path to the file on the machine running the python code.
    """
    # Extension Mode Limitation Check
    if hasattr(page, "eval_js"):  # Is ExtensionPage
        _logger.warning("File upload via Extension is limited. Attempting JS DataTransfer simulation.")
        # We can't read the file from disk in JS. We can only simulate the event if we had the bytes.
        # For this context, we will skip actual upload in extension or throw not supported.
        # However, to be "airtight" for what IS possible:
        pass
    else:
        # CDP Mode (Playwright Standard)
        await page.locator(dropzone_selector).set_input_files(file_path)


# --- Buttons ---

async def click_button(page: PageLike, selector: str) -> None:
    """
    Robust button click that handles MUI ripples and loading states.
    """
    # Wait for button to be enabled
    await page.wait_for_selector(f"{selector}:not([disabled])", timeout=3000)
    await click_with_mouse_events(page, selector)
```
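
The XPath expressions in `select_single_choice` interpolate `value_text` directly, which breaks for option labels containing quotes. XPath 1.0 has no escape sequences, so one common workaround is `concat()`. The sketch below is illustrative: `xpath_literal` and `option_xpath` are hypothetical helpers, and `normalize-space(.)` is swapped in for the original `.//text()=` comparison to tolerate surrounding whitespace.

```python
def xpath_literal(text: str) -> str:
    """Return `text` as an XPath 1.0 string literal."""
    if "'" not in text:
        return f"'{text}'"
    if '"' not in text:
        return f'"{text}"'
    # Both quote kinds present: split on single quotes and rejoin via concat().
    parts = text.split("'")
    return "concat(" + ", \"'\", ".join(f"'{p}'" for p in parts) + ")"


def option_xpath(value_text: str) -> str:
    """Build an exact-match XPath for a listbox option (hypothetical helper)."""
    return (
        "//ul[@role='listbox']//li[@role='option' and "
        f"normalize-space(.)={xpath_literal(value_text)}]"
    )
```

For example, `option_xpath("O'Brien Ltd")` yields a predicate that compares against `"O'Brien Ltd"` in double quotes rather than producing a malformed expression.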
### 2. The Content Script (`extensions/content.js`)
This script acts as the browser-side runtime. It includes a specific `ActionDispatcher` to cleanly handle different interaction types and the crucial React input hacks.
```javascript
// extensions/content.js
// --- React Helpers ---
const ReactUtils = {
  // Sets value on React controlled inputs by bypassing the wrapper
  setNativeValue: (element, value) => {
    // React may define an own 'value' property on the node; plain inputs have
    // none, so guard against an undefined descriptor before reading `.set`.
    const ownDescriptor = Object.getOwnPropertyDescriptor(element, 'value');
    const valueSetter = ownDescriptor ? ownDescriptor.set : undefined;
    const prototype = Object.getPrototypeOf(element);
    const prototypeDescriptor = Object.getOwnPropertyDescriptor(prototype, 'value');
    const prototypeValueSetter = prototypeDescriptor ? prototypeDescriptor.set : undefined;
    if (prototypeValueSetter && valueSetter !== prototypeValueSetter) {
      prototypeValueSetter.call(element, value);
    } else if (valueSetter) {
      valueSetter.call(element, value);
    } else {
      element.value = value;
    }
  },
  // Dispatches full suite of events to ensure UI updates
  dispatchEvents: (element) => {
    element.dispatchEvent(new Event('input', { bubbles: true }));
    element.dispatchEvent(new Event('change', { bubbles: true }));
    element.blur(); // Often needed for validation
  }
};
// --- Action Implementations ---
const Actions = {
FILL: async ({ selector, value }) => {
const el = document.querySelector(selector);
if (!el) throw new Error(`Element not found: ${selector}`);
el.focus();
ReactUtils.setNativeValue(el, value);
ReactUtils.dispatchEvents(el);
return true;
},
CLICK: async ({ selector }) => {
const el = document.querySelector(selector);
if (!el) throw new Error(`Element not found: ${selector}`);
// Scroll logic
el.scrollIntoView({ behavior: 'auto', block: 'center' });
// Synthetic Mouse Events (MUI/React often needs these)
const opts = { bubbles: true, cancelable: true, view: window };
el.dispatchEvent(new MouseEvent('mousedown', opts));
el.dispatchEvent(new MouseEvent('mouseup', opts));
el.click();
return true;
},
CLICK_TEXT: async ({ selector, text }) => {
// Finds an element matching selector that contains text
const elements = Array.from(document.querySelectorAll(selector));
const target = elements.find(e => e.textContent.includes(text));
if (!target) throw new Error(`Element ${selector} with text "${text}" not found`);
target.scrollIntoView({ behavior: 'auto', block: 'center' });
target.click();
return true;
},
EVAL: async ({ code, args }) => {
// Safe evaluation of custom JS strings passed from Python
const func = new Function("args", `return (async () => { ${code} })();`);
return await func(args);
}
};
// --- Message Listener ---
chrome.runtime.onMessage.addListener((request, sender, sendResponse) => {
const action = Actions[request.action];
if (action) {
action(request.payload)
.then(result => sendResponse({ status: "success", result }))
.catch(error => sendResponse({ status: "error", error: error.message }));
return true; // Async response
}
});
```
### 3. The Extension Worker (`extensions/worker.js`)
The airtight bridge. It handles the WebSocket connection, identification, and robust error handling if the active tab is missing or the content script hasn't loaded.
```javascript
// extensions/worker.js
const WS_URL = "ws://localhost:17373";
let socket = null;
let reconnectTimer = null;
function connect() {
socket = new WebSocket(WS_URL);
socket.onopen = () => {
console.log("[Guide] Connected to Host");
socket.send(JSON.stringify({ type: "IDENTIFY", kind: "extension" }));
if (reconnectTimer) clearTimeout(reconnectTimer); // timer is created with setTimeout
};
socket.onmessage = async (event) => {
const msg = JSON.parse(event.data);
const { id, action, payload } = msg;
try {
// 1. Get Active Tab
const [tab] = await chrome.tabs.query({ active: true, currentWindow: true });
if (!tab) throw new Error("No active tab found");
// 2. Relay to Content Script
// We explicitly map Python generic requests to specific Content Script actions
// If code is provided, it's an EVAL, otherwise mapped action
const targetAction = msg.code ? "EVAL" : (msg.action || "EVAL");
const targetPayload = msg.code ? { code: msg.code } : payload;
const response = await chrome.tabs.sendMessage(tab.id, {
action: targetAction,
payload: targetPayload
});
if (!response) throw new Error("No response from content script (is page loaded?)");
if (response.status === "error") throw new Error(response.error);
// 3. Success Response
socket.send(JSON.stringify({ id, status: "success", result: response.result }));
} catch (err) {
console.error("[Guide] Execution Error:", err);
socket.send(JSON.stringify({ id, status: "error", error: err.message }));
}
};
socket.onclose = () => {
console.log("[Guide] Disconnected. Retrying...");
socket = null;
reconnectTimer = setTimeout(connect, 3000);
};
socket.onerror = (err) => {
console.error("[Guide] Socket Error:", err);
socket.close();
};
}
// Connect immediately, and reconnect on browser startup or extension install/update
chrome.runtime.onStartup.addListener(connect);
chrome.runtime.onInstalled.addListener(connect);
connect();
```
### 4. Integration Updates (`src/guide/app/browser/extension_client.py`)
Update the Python client to send structured messages that match the new `Actions` map in `content.js`.
```python
# src/guide/app/browser/extension_client.py (Relevant updates)

class ExtensionPage:
    # ... init ...

    async def fill(self, selector: str, value: str) -> None:
        """
        Structured fill command sent to extension.
        """
        escaped_selector = self._escape_selector(selector)
        # We send a structured object now, not raw JS, for standard actions
        await self._send_command("FILL", {"selector": escaped_selector, "value": value})

    async def click(self, selector: str) -> None:
        escaped_selector = self._escape_selector(selector)
        await self._send_command("CLICK", {"selector": escaped_selector})

    async def click_element_with_text(self, selector: str, text: str, timeout: int = 5000) -> None:
        escaped_selector = self._escape_selector(selector)
        await self._send_command("CLICK_TEXT", {"selector": escaped_selector, "text": text})

    async def _send_command(self, action: str, payload: dict[str, Any]) -> Any:
        """
        Helper to send structured commands.
        """
        request_id = str(uuid.uuid4())
        future: asyncio.Future[JSONValue] = asyncio.Future()
        self._pending[request_id] = future

        message = {
            "id": request_id,
            "action": action,  # Maps to content.js Actions
            "payload": payload
        }
        await self._client.send_message(message)
        return await future

    # eval_js remains for custom scripts
    async def eval_js(self, code: str, await_promise: bool = True) -> JSONValue:
        request_id = str(uuid.uuid4())
        future: asyncio.Future[JSONValue] = asyncio.Future()
        self._pending[request_id] = future

        message = {
            "id": request_id,
            "code": code,  # Worker detects 'code' and maps to EVAL
        }
        await self._client.send_message(message)
        return await future
```
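
As written, `_send_command` and `eval_js` wait on the future indefinitely, so a reply that never arrives (closed tab, reloaded extension) would hang the caller. A hedged sketch of one way to bound the wait is shown below; `send_with_timeout`, its `send` callable, and the 5-second default are assumptions for illustration, not part of the existing client.

```python
import asyncio
import uuid
from typing import Any, Awaitable, Callable


async def send_with_timeout(
    send: Callable[[dict[str, Any]], Awaitable[None]],
    pending: dict[str, asyncio.Future[Any]],
    message: dict[str, Any],
    timeout_s: float = 5.0,
) -> Any:
    """Send a command and wait for its reply, giving up after `timeout_s`."""
    request_id = str(uuid.uuid4())
    future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
    pending[request_id] = future
    try:
        await send({"id": request_id, **message})
        return await asyncio.wait_for(future, timeout=timeout_s)
    finally:
        # Drop the entry on success, timeout, or send failure alike.
        pending.pop(request_id, None)
```

The `finally` clause ensures the pending map does not accumulate entries for requests that timed out.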

View File

@@ -0,0 +1,15 @@
"""Generic form automation actions.
These actions work with any entity type that has a form schema
(boards, contracts, sourcing events, etc.).
"""
from guide.app.actions.form.smart_fill import (
InspectFormContextAction,
SmartFillAction,
)
__all__ = [
"InspectFormContextAction",
"SmartFillAction",
]

View File

@@ -0,0 +1,382 @@
"""Smart form filling using schema-DOM reconciliation.
Demonstrates the Semantic Bridge architecture for LLM-driven automation:
1. Fetch board schema from GraphQL
2. Build FormContext by reconciling schema with live DOM
3. Format for LLM consumption
4. Execute fills using dynamically-dispatched helpers
"""
from __future__ import annotations
import logging
from typing import ClassVar, cast, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.context_builder import (
FormContext,
build_form_context,
format_for_llm,
)
from guide.app.browser.elements.dropdown import (
select_combobox,
select_multi,
select_single,
)
from guide.app.browser.elements.dropdown.schema_aware import select_from_schema
from guide.app.browser.elements.form import fill_text
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.raindrop.operations.form_schema import (
FieldDef,
SchemaSourceType,
get_form_schema,
)
_logger = logging.getLogger(__name__)
async def _dispatch_fill(
    page: PageLike,
    selector: str,
    value: str | list[str],
    helper_name: str,
    field_def: FieldDef | None = None,
) -> dict[str, object]:
    """Dispatch to appropriate fill helper based on helper_name.

    Args:
        page: Browser page instance.
        selector: CSS selector for the field.
        value: Value(s) to fill.
        helper_name: Helper function name from FieldContext.helper.
        field_def: Optional FieldDef for schema validation.

    Returns:
        Dict with selection/fill result.
    """
    match helper_name:
        case "select_single":
            if field_def is not None:
                # Use schema-aware selection with validation
                result = await select_from_schema(page, selector, str(value), field_def)
                return {
                    "selected": result["selected"],
                    "not_found": result["not_found"],
                    "validated": result["validated"],
                    "validation_warning": result["validation_warning"],
                }
            result = await select_single(page, selector, str(value))
            return {
                "selected": result["selected"],
                "not_found": result["not_found"],
            }
        case "select_combobox":
            result = await select_combobox(page, selector, str(value))
            return {
                "selected": result["selected"],
                "not_found": result["not_found"],
            }
        case "select_multi":
            values = value if isinstance(value, list) else [value]
            result = await select_multi(page, selector, values)
            return {
                "selected": result["selected"],
                "not_found": result["not_found"],
            }
        case "fill_with_react_events":
            success = await fill_text(page, selector, str(value))
            return {"filled": success, "value": str(value)}
        case _:
            _logger.warning(
                "Unknown helper '%s', falling back to fill_text", helper_name
            )
            success = await fill_text(page, selector, str(value))
            return {"filled": success, "value": str(value), "fallback": True}
@register_action
class SmartFillAction(DemoAction):
"""Fill form fields using schema-aware semantic matching.
This action demonstrates the Semantic Bridge architecture:
- Reconciles GraphQL schema with live DOM elements
- Generates LLM-consumable context with field metadata
- Dispatches to appropriate helpers based on field type
- Validates choices against schema where applicable
Required params:
board_id (int): Board ID to fetch schema for.
values (dict): Field values to fill, keyed by field_key (e.g., "f19").
Optional params:
graphql_url (str): Override GraphQL endpoint.
container_selector (str): Container to search for fields (default: "body").
dry_run (bool): If True, only build context without filling (for inspection).
"""
id: ClassVar[str] = "smart-fill"
description: ClassVar[str] = "Fill form using schema-DOM semantic bridge"
category: ClassVar[str] = "form"
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
params = context.params
# Extract parameters
board_id = params.get("board_id")
# bool is a subclass of int, so reject True/False explicitly
if isinstance(board_id, bool) or not isinstance(board_id, int):
return ActionResult(
status="error",
details={"error": "board_id (int) is required"},
)
raw_values = params.get("values", {})
if not isinstance(raw_values, dict):
return ActionResult(
status="error",
details={"error": "values must be a dict keyed by field_key"},
)
# Cast to typed dict for iteration
values_to_fill = cast(dict[str, str | list[str]], raw_values)
dry_run = bool(params.get("dry_run", False))
container_selector = str(params.get("container_selector", "body"))
# Get bearer token from page localStorage
bearer_token = await self._extract_bearer_token(page)
if not bearer_token:
return ActionResult(
status="error",
details={"error": "No bearer token found in page localStorage"},
)
# Determine GraphQL URL (from params or default)
graphql_url = str(
params.get("graphql_url", "https://stg.raindrop.com/hasura/v1/graphql")
)
# 1. Fetch board schema from GraphQL
_logger.info("[SmartFill] Fetching schema for board_id=%d", board_id)
try:
schema = await get_form_schema(
graphql_url=graphql_url,
bearer_token=bearer_token,
source_type=SchemaSourceType.BOARD,
entity_key=board_id,
)
except ValueError as exc:
return ActionResult(
status="error",
details={"error": f"Failed to fetch schema: {exc}"},
)
# 2. Build semantic bridge (FormContext)
_logger.info("[SmartFill] Building form context from schema + DOM")
form_context = await build_form_context(page, schema, container_selector)
# 3. Format for LLM consumption
llm_context = format_for_llm(form_context)
# Log context for debugging
_logger.info(
"[SmartFill] Form context built: %d fields matched, %d unmatched schema, %d unmatched DOM",
len(form_context.fields),
len(form_context.unmatched_schema_fields),
len(form_context.unmatched_dom_fields),
)
if dry_run:
return ActionResult(
status="ok",
details={
"mode": "dry_run",
"schema_entity": schema.entity_name,
"llm_context": llm_context,
"unmatched_schema_fields": list(
form_context.unmatched_schema_fields
),
"unmatched_dom_fields": list(form_context.unmatched_dom_fields),
},
)
# 4. Execute fills
fill_results: dict[str, dict[str, object]] = {}
fill_errors: dict[str, str] = {}
for field_key, target_value in values_to_fill.items():
if field_key not in form_context.fields:
fill_errors[field_key] = "field_not_found_in_context"
continue
field_ctx = form_context.fields[field_key]
if not field_ctx.dom_selector:
fill_errors[field_key] = "no_dom_selector_for_field"
continue
if field_ctx.is_disabled:
fill_errors[field_key] = "field_is_disabled"
continue
# Get field def for schema validation (if available)
field_def = schema.get_field(field_key)
try:
_logger.info(
"[SmartFill] Filling %s (%s) with %s using %s",
field_key,
field_ctx.label,
target_value,
field_ctx.helper,
)
result = await _dispatch_fill(
page=page,
selector=field_ctx.dom_selector,
value=target_value,
helper_name=field_ctx.helper,
field_def=field_def,
)
fill_results[field_key] = result
except Exception as exc:
_logger.exception("Failed to fill %s", field_key)
fill_errors[field_key] = str(exc)
# Return ok with partial failures in details (ActionResult only allows ok/error)
# We return ok to indicate the action ran; check fields_failed for partial issues
return ActionResult(
status="ok",
details={
"schema_entity": schema.entity_name,
"fields_requested": list(values_to_fill.keys()),
"fields_filled": fill_results,
"fields_failed": fill_errors if fill_errors else None,
"partial_failure": bool(fill_errors),
"llm_context": llm_context,
},
)
async def _extract_bearer_token(self, page: PageLike) -> str | None:
"""Extract bearer token from page localStorage."""
try:
result = await page.evaluate(
"""
(() => {
// Try common storage keys for Raindrop auth
const keys = [
'access_token',
'auth_token',
'token',
'rd_access_token',
];
for (const key of keys) {
const val = localStorage.getItem(key);
if (val) return val;
}
// Try parsing auth state
const authState = localStorage.getItem('auth');
if (authState) {
try {
const parsed = JSON.parse(authState);
return parsed.access_token || parsed.token || null;
} catch (e) {}
}
return null;
})();
"""
)
return str(result) if isinstance(result, str) else None
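except Exception:
# Treat any evaluation failure as "no token", but leave a trace for debugging
_logger.debug("[SmartFill] Bearer token lookup failed", exc_info=True)
return None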
@register_action
class InspectFormContextAction(DemoAction):
"""Inspect form context without filling any fields.
Useful for debugging and understanding form structure.
Required params:
board_id (int): Board ID to fetch schema for.
Optional params:
graphql_url (str): Override GraphQL endpoint.
container_selector (str): Container to search for fields (default: "body").
"""
id: ClassVar[str] = "inspect-form-context"
description: ClassVar[str] = "Inspect form schema-DOM context for debugging"
category: ClassVar[str] = "diagnose"
@override
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
# Delegate to SmartFillAction with dry_run=True
params = dict(context.params)
params["dry_run"] = True
params["values"] = {}
smart_fill_context = ActionContext(
action_id=context.action_id,
persona_id=context.persona_id,
browser_host_id=context.browser_host_id,
params=params,
correlation_id=context.correlation_id,
shared_state=dict(context.shared_state),
)
smart_fill = SmartFillAction()
return await smart_fill.run(page, smart_fill_context)
def _build_llm_prompt(form_context: FormContext, user_intent: str) -> str:
"""Build LLM prompt for value generation (future use).
Args:
form_context: Merged schema + DOM context.
user_intent: Natural language description of what user wants.
Returns:
Formatted prompt for LLM.
"""
llm_data = format_for_llm(form_context)
fields_desc: list[str] = []
for key, field_info in llm_data.items():
label = str(field_info.get("label", ""))
ui_type = str(field_info.get("ui_type", ""))
desc = f"- {key} ({label}): {ui_type}"
allowed_values = field_info.get("allowed_values")
if allowed_values is not None and isinstance(allowed_values, list):
desc += f" - choices: {allowed_values}"
if field_info.get("is_required"):
desc += " [REQUIRED]"
fields_desc.append(desc)
return f"""You are filling a form for: {form_context.entity_name}
User intent: {user_intent}
Available fields:
{chr(10).join(fields_desc)}
Respond with a JSON object mapping field_key to value. Only include fields you want to fill.
For menu fields, use exact choice text. For multi-select, use a list.
Example response:
{{"f19": "Active", "f20": "Hardware Services"}}
"""
# Keep _build_llm_prompt available for future LLM integration
_ = _build_llm_prompt # Suppress unused warning
__all__ = [
"SmartFillAction",
"InspectFormContextAction",
]
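
The prompt built by `_build_llm_prompt` asks the model for a JSON object mapping `field_key` to value. A minimal sketch of how such a reply might be checked against the `format_for_llm` output before it is passed to `smart-fill` as `values`. The function name `parse_llm_values` and the rejection codes are hypothetical and not part of this commit.

```python
import json


def parse_llm_values(
    raw_response: str,
    llm_context: dict[str, dict[str, object]],
) -> tuple[dict[str, object], dict[str, str]]:
    """Split an LLM JSON reply into accepted values and per-field rejections."""
    accepted: dict[str, object] = {}
    rejected: dict[str, str] = {}
    try:
        parsed = json.loads(raw_response)
    except json.JSONDecodeError:
        return accepted, {"_response": "invalid_json"}
    if not isinstance(parsed, dict):
        return accepted, {"_response": "not_a_json_object"}

    for field_key, value in parsed.items():
        field_info = llm_context.get(field_key)
        if field_info is None:
            # The model referenced a field that is not in the form context
            rejected[field_key] = "unknown_field"
            continue
        allowed = field_info.get("allowed_values")
        candidates = value if isinstance(value, list) else [value]
        if isinstance(allowed, list) and any(c not in allowed for c in candidates):
            # Menu fields must use one of the exact choice texts
            rejected[field_key] = "value_not_in_allowed_values"
            continue
        accepted[field_key] = value
    return accepted, rejected


# Hypothetical context in the shape produced by format_for_llm
context: dict[str, dict[str, object]] = {
    "f19": {"label": "Status", "allowed_values": ["Active", "Closed"]},
    "f20": {"label": "Notes", "allowed_values": None},
}
accepted, rejected = parse_llm_values(
    '{"f19": "Active", "f20": "Call vendor", "f99": "x"}', context
)
print(accepted)
print(rejected)
```

Rejecting unknown keys up front keeps them out of `fields_failed`, where they would otherwise surface as `field_not_found_in_context`.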

View File

@@ -3,4 +3,7 @@ from guide.app.actions.intake.sourcing_request import (
FillSourcingRequestAction,
)
__all__ = ["FillIntakeBasicAction", "FillSourcingRequestAction"]
__all__ = [
"FillIntakeBasicAction",
"FillSourcingRequestAction",
]

View File

@@ -0,0 +1,297 @@
"""Schema-DOM reconciliation for LLM-driven form filling.
Provides utilities to:
- Map GraphQL FormSchema to live DOM elements
- Build merged context for LLM consumption
- Generate structured JSON for form automation
"""
from __future__ import annotations
from dataclasses import dataclass
from guide.app.browser.elements.field_inference import (
HelperFunction,
select_helper_for_type,
)
from guide.app.browser.elements.form_discovery import (
FormField,
extract_accessible_name,
extract_all_form_fields,
extract_field_value,
)
from guide.app.browser.types import PageLike
from guide.app.raindrop.operations.form_schema import FieldDef, FieldType, FormSchema
# ---------------------------------------------------------------------------
# Type Mapping: Schema FieldType -> UI Automation Type
# ---------------------------------------------------------------------------
_SCHEMA_TO_UI_TYPE: dict[FieldType, str] = {
FieldType.TEXT: "text",
FieldType.TEXTAREA: "textarea",
FieldType.MENU: "select",
FieldType.NUMBER: "number",
FieldType.DATE: "date",
FieldType.CHECKBOX: "checkbox",
FieldType.USER: "autocomplete",
FieldType.SUPPLIER: "autocomplete",
FieldType.COMMODITY: "autocomplete",
FieldType.DEPARTMENT: "autocomplete",
FieldType.CONTRACTS: "autocomplete",
FieldType.RELATIONSHIP: "autocomplete",
FieldType.ATTACHMENT: "file",
FieldType.UNKNOWN: "text",
}
def get_ui_type(field_type: FieldType) -> str:
"""Map schema field type to UI automation type."""
return _SCHEMA_TO_UI_TYPE.get(field_type, "text")
def get_helper_for_field(field_type: FieldType) -> HelperFunction:
"""Get automation helper function for field type."""
ui_type = get_ui_type(field_type)
return select_helper_for_type(ui_type)
# ---------------------------------------------------------------------------
# Data Structures
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class FieldContext:
"""Merged schema + DOM context for a single field."""
field_key: str # e.g., "f19" (from schema)
label: str # "Estimated Value" (from schema)
schema_type: FieldType # Original schema type (menu, user, etc.)
ui_type: str # Automation type (select, autocomplete, text)
helper: HelperFunction # Exact helper function to use
dom_selector: str | None # CSS selector if DOM match found
dom_label: str | None # Label from DOM (for verification)
current_value: list[str] | str | None # From DOM (list for multi-select chips)
is_required: bool # From schema
is_disabled: bool # From DOM
allowed_values: tuple[str, ...] | None # For menu fields (from schema)
@dataclass(frozen=True)
class FormContext:
"""Complete form context for LLM consumption."""
entity_type: str
entity_id: int | str
entity_name: str
fields: dict[str, FieldContext]
unmatched_dom_fields: tuple[str, ...] # DOM fields without schema match
unmatched_schema_fields: tuple[str, ...] # Schema fields without DOM match
# ---------------------------------------------------------------------------
# Matching Algorithm
# ---------------------------------------------------------------------------
def _normalize_label(label: str) -> str:
"""Normalize label for fuzzy matching."""
return label.lower().strip().replace("_", " ").replace("-", " ")
def _match_field_to_dom(
field_def: FieldDef,
dom_fields: list[FormField],
) -> FormField | None:
"""Match a schema field to a DOM field using priority-based matching.
Matching Strategy (priority order):
1. Field name in data-cy: schema field name appears anywhere in the data-cy attribute
2. Delimited key in data-cy: "-{name}" or "_{name}" appears in data-cy
(e.g. board-item-field-{type}-{key}); already covered by the substring check in step 1
3. Label text match: normalized schema label equals normalized DOM label
4. Fuzzy label match: one normalized label contains the other
"""
field_name = field_def.name
field_label = field_def.label
# Priority 1 & 2: data-cy matching
for dom_field in dom_fields:
data_cy = dom_field.get("data_cy")
if isinstance(data_cy, str) and data_cy:
# Exact field name in data-cy
if field_name in data_cy:
return dom_field
# Field key pattern: board-item-field-{type}-{key}
if f"-{field_name}" in data_cy or f"_{field_name}" in data_cy:
return dom_field
# Priority 3: Exact label match (case-insensitive)
normalized_schema_label = _normalize_label(field_label)
if not normalized_schema_label:
# An empty schema label would match any unlabeled DOM field
return None
for dom_field in dom_fields:
dom_label = dom_field.get("label", "")
if _normalize_label(dom_label) == normalized_schema_label:
return dom_field
# Priority 4: Fuzzy label match (contains)
for dom_field in dom_fields:
dom_label = dom_field.get("label", "")
normalized_dom_label = _normalize_label(dom_label)
if not normalized_dom_label:
# "" is a substring of every string, so skip unlabeled DOM fields
continue
if normalized_schema_label in normalized_dom_label:
return dom_field
if normalized_dom_label in normalized_schema_label:
return dom_field
return None
async def build_form_context(
page: PageLike,
schema: FormSchema,
container_selector: str = "body",
) -> FormContext:
"""Build merged context from schema and live DOM.
Args:
page: Browser page instance
schema: Pre-fetched FormSchema from GraphQL
container_selector: CSS selector for form container
Returns:
FormContext with merged schema + DOM data
"""
# Extract all DOM fields
dom_fields = await extract_all_form_fields(page, container_selector)
# Track matching
matched_dom_indices: set[int] = set()
field_contexts: dict[str, FieldContext] = {}
unmatched_schema: list[str] = []
# Match each schema field to DOM
for field_def in schema.fields:
matched_dom = _match_field_to_dom(field_def, dom_fields)
if matched_dom:
# Mark as matched
for i, df in enumerate(dom_fields):
if df is matched_dom:
matched_dom_indices.add(i)
break
# Get current value if selector available
current_value: list[str] | str | None = None
dom_label: str | None = None
is_disabled = matched_dom.get("disabled", False)
if matched_dom.get("selector"):
current_value = await extract_field_value(page, matched_dom["selector"])
dom_label = await extract_accessible_name(page, matched_dom["selector"])
# Build allowed values from schema choices
allowed_values: tuple[str, ...] | None = None
if field_def.choices:
allowed_values = tuple(c.text for c in field_def.choices)
field_contexts[field_def.name] = FieldContext(
field_key=field_def.name,
label=field_def.label,
schema_type=field_def.field_type,
ui_type=get_ui_type(field_def.field_type),
helper=get_helper_for_field(field_def.field_type),
dom_selector=matched_dom.get("selector"),
dom_label=dom_label or matched_dom.get("label"),
current_value=current_value,
is_required=field_def.required,
is_disabled=is_disabled,
allowed_values=allowed_values,
)
else:
unmatched_schema.append(field_def.name)
# Collect unmatched DOM fields
unmatched_dom: list[str] = []
for i, dom_field in enumerate(dom_fields):
if i not in matched_dom_indices:
data_cy = dom_field.get("data_cy")
if data_cy:
unmatched_dom.append(data_cy)
return FormContext(
entity_type=schema.entity_type,
entity_id=schema.entity_id,
entity_name=schema.entity_name,
fields=field_contexts,
unmatched_dom_fields=tuple(unmatched_dom),
unmatched_schema_fields=tuple(unmatched_schema),
)
async def get_form_context_from_schema(
page: PageLike,
schema: FormSchema,
container_selector: str = "body",
) -> FormContext:
"""Build form context from pre-fetched schema.
This is the primary API. Callers fetch the schema separately
using raindrop/operations/form_schema.py, then pass it here.
Args:
page: Browser page instance
schema: Pre-fetched FormSchema from GraphQL
container_selector: CSS selector for form container
Returns:
FormContext with merged schema + DOM data
"""
return await build_form_context(page, schema, container_selector)
# ---------------------------------------------------------------------------
# LLM Context Generation
# ---------------------------------------------------------------------------
def format_for_llm(context: FormContext) -> dict[str, dict[str, object]]:
"""Format form context as LLM-consumable JSON.
Returns dict keyed by field_key with:
- label: Display name
- schema_type: Original field type from schema
- ui_type: Type for automation (aligns with select_helper_for_type)
- helper: Exact helper function to call
- selector: CSS selector for interaction
- current_value: Current value if any
- is_required: Whether field is required
- allowed_values: Valid choices for menu fields
"""
return {
field_key: {
"label": field.label,
"schema_type": field.schema_type.value, # e.g., "menu", "user"
"ui_type": field.ui_type, # e.g., "select", "autocomplete"
"helper": field.helper, # e.g., "select_single", "select_combobox"
"selector": field.dom_selector,
"current_value": field.current_value,
"is_required": field.is_required,
"is_disabled": field.is_disabled,
"allowed_values": list(field.allowed_values)
if field.allowed_values
else None,
}
for field_key, field in context.fields.items()
if field.dom_selector # Only include fields we can interact with
}
__all__ = [
"FieldContext",
"FormContext",
"build_form_context",
"get_form_context_from_schema",
"format_for_llm",
"get_ui_type",
"get_helper_for_field",
]

View File

@@ -16,6 +16,7 @@ from datetime import datetime, timezone
from enum import Enum
from typing import TypeGuard, TypedDict
from guide.app.browser.elements.mui import escape_selector
from guide.app.browser.types import PageLike
from guide.app.models.domain.models import DebugInfo
@@ -407,11 +408,6 @@ def _is_page_structure_result(obj: object) -> TypeGuard[PageStructureResult]:
# ---------------------------------------------------------------------------
def _escape_selector(selector: str) -> str:
"""Escape selector for safe JS interpolation."""
return selector.replace("\\", "\\\\").replace("'", "\\'")
_JS_FIELD_CHECK = """
(() => {{
const field = document.querySelector('{selector}');
@@ -633,7 +629,7 @@ async def inspect_field(page: PageLike, selector: str) -> FieldInfo:
Returns:
FieldInfo with existence and visibility details
"""
escaped = _escape_selector(selector)
escaped = escape_selector(selector)
result = await page.evaluate(_JS_FIELD_CHECK.format(selector=escaped))
if not _is_field_check_result(result):
@@ -673,7 +669,7 @@ async def inspect_input(page: PageLike, selector: str) -> InputInfo:
Returns:
InputInfo with input element details
"""
escaped = _escape_selector(selector)
escaped = escape_selector(selector)
result = await page.evaluate(_JS_INPUT_CHECK.format(selector=escaped))
if not _is_input_check_result(result):
@@ -710,7 +706,7 @@ async def get_selected_chips(page: PageLike, selector: str) -> ChipsResult:
Returns:
ChipsResult with chip count and details
"""
escaped = _escape_selector(selector)
escaped = escape_selector(selector)
result = await page.evaluate(_JS_CHIPS_CHECK.format(selector=escaped))
if not _is_chips_check_result(result):
@@ -798,7 +794,7 @@ async def inspect_dropdown(
listbox_after: ListboxInfo | None = None
if open_dropdown and field_info.exists:
escaped = _escape_selector(selector)
escaped = escape_selector(selector)
click_result = await page.evaluate(_JS_CLICK_DROPDOWN.format(selector=escaped))
if _is_click_dropdown_result(click_result):
@@ -814,7 +810,7 @@ async def inspect_dropdown(
await page.evaluate(_JS_CLOSE_DROPDOWN)
# Get component structure
escaped = _escape_selector(selector)
escaped = escape_selector(selector)
structure_result = await page.evaluate(
_JS_COMPONENT_STRUCTURE.format(selector=escaped)
)
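
Every call site above interpolates the selector into a single-quoted JavaScript string, which is why escaping matters. A small sketch of that pattern, using the body of the removed `_escape_selector` as the reference behavior. Whether `escape_selector` in `mui.py` behaves identically is an assumption, and `escape_single_quoted` is a hypothetical name. `json.dumps` is shown as an alternative way to produce a quoted JS literal.

```python
import json


def escape_single_quoted(selector: str) -> str:
    """Escape backslashes, then single quotes, for a single-quoted JS string."""
    return selector.replace("\\", "\\\\").replace("'", "\\'")


selector = "[data-cy='board-item-field-menu-f19']"

# Interpolating the raw selector would end the JS string at the first quote
js_call = f"document.querySelector('{escape_single_quoted(selector)}')"
print(js_call)

# Alternative: let json.dumps emit a complete double-quoted JS string literal
js_call_json = f"document.querySelector({json.dumps(selector)})"
print(js_call_json)
```

Backslashes must be escaped before quotes; the reverse order would double the backslashes that the quote escaping just added.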

View File

@@ -47,13 +47,12 @@ from guide.app.browser.elements.form import (
# Form discovery helpers
from guide.app.browser.elements.form_discovery import (
FormField,
extract_accessible_name,
extract_all_data_cy_selectors,
extract_form_field_metadata,
extract_all_form_fields,
extract_field_value,
extract_field_state,
extract_field_from_html,
extract_all_fields_from_html,
)
# Field inference helpers
@@ -103,13 +102,12 @@ __all__ = [
"fill_autocomplete",
# Form discovery
"FormField",
"extract_accessible_name",
"extract_all_data_cy_selectors",
"extract_form_field_metadata",
"extract_all_form_fields",
"extract_field_value",
"extract_field_state",
"extract_field_from_html",
"extract_all_fields_from_html",
# Field inference
"infer_type_from_selector",
"infer_type_from_element",

View File

@@ -0,0 +1,46 @@
"""Shared type guards for browser element operations.
Provides type-safe extraction of values from JavaScript evaluation results,
which return untyped dict[str, object] structures from page.evaluate() calls.
"""
from __future__ import annotations
from typing import TypeGuard
def is_dict_str_object(obj: object) -> TypeGuard[dict[str, object]]:
"""Type guard to check if object is dict[str, object]."""
return isinstance(obj, dict)
def is_list_of_objects(obj: object) -> TypeGuard[list[object]]:
"""Type guard to check if object is a list."""
return isinstance(obj, list)
def get_str_from_dict(d: dict[str, object], key: str, default: str = "") -> str:
"""Safely extract string value from dict."""
val = d.get(key)
return str(val) if isinstance(val, str) else default
def get_str_or_none_from_dict(d: dict[str, object], key: str) -> str | None:
"""Safely extract string or None value from dict."""
val = d.get(key)
return str(val) if isinstance(val, str) else None
def get_bool_from_dict(d: dict[str, object], key: str, default: bool = False) -> bool:
"""Safely extract bool value from dict."""
val = d.get(key)
return bool(val) if isinstance(val, bool) else default
__all__ = [
"is_dict_str_object",
"is_list_of_objects",
"get_str_from_dict",
"get_str_or_none_from_dict",
"get_bool_from_dict",
]

View File

@@ -21,6 +21,11 @@ from guide.app.browser.elements.dropdown.combobox import (
)
from guide.app.browser.elements.dropdown.typeahead import select_typeahead
from guide.app.browser.elements.dropdown._close import close_all_dropdowns
from guide.app.browser.elements.dropdown._helpers import get_listbox_id
from guide.app.browser.elements.dropdown.schema_aware import (
select_from_schema,
SchemaAwareDropdownResult,
)
# Re-exports from mui.py for backward compatibility
from guide.app.browser.elements.mui import (
@@ -50,7 +55,10 @@ __all__ = [
"select_mui_options",
"select_autocomplete",
"select_typeahead",
"select_from_schema",
"close_all_dropdowns",
"get_listbox_id",
"SchemaAwareDropdownResult",
# Re-exports from mui.py for backward compatibility
"DropdownResult",
"escape_selector",

View File

@@ -20,6 +20,35 @@ from guide.app.errors import ActionExecutionError
_logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Shared Utilities
# ---------------------------------------------------------------------------
async def get_listbox_id(page: PageLike, selector: str) -> str | None:
"""Get listbox ID from aria-controls or aria-owns attributes.
Args:
page: Browser page instance
selector: CSS selector for the field container
Returns:
The listbox element ID if found, None otherwise
"""
escaped = escape_selector(selector)
result = await page.evaluate(
f"""
(() => {{
const el = document.querySelector('{escaped}');
const input = el?.querySelector('input') || el;
return input?.getAttribute('aria-controls') ||
input?.getAttribute('aria-owns') || null;
}})()
"""
)
return str(result) if isinstance(result, str) else None
# ---------------------------------------------------------------------------
# Autocomplete Helpers
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,117 @@
"""Schema-aware dropdown selection with validation.
Provides dropdown selection with optional GraphQL schema validation
for menu/select fields. Validates choices against schema definitions
while allowing dynamic fields (user, supplier, etc.) to pass through.
"""
from __future__ import annotations
from typing import TypedDict
from guide.app.browser.elements.dropdown.autocomplete import select_single
from guide.app.browser.types import PageLike
from guide.app.raindrop.operations.form_schema import FieldDef, FieldType
# Dynamic field types that don't have static choices in schema
_DYNAMIC_FIELD_TYPES = frozenset(
{
FieldType.USER,
FieldType.SUPPLIER,
FieldType.RELATIONSHIP,
FieldType.CONTRACTS,
FieldType.COMMODITY,
FieldType.DEPARTMENT,
}
)
class SchemaAwareDropdownResult(TypedDict):
"""Result of a schema-validated dropdown selection operation."""
selected: list[str]
not_found: list[str]
available: list[str]
validated: bool
validation_warning: str | None
async def select_from_schema(
page: PageLike,
selector: str,
value: str,
field_def: FieldDef | None = None,
*,
strict_validation: bool = False,
) -> SchemaAwareDropdownResult:
"""Select value with optional schema validation.
Validate against GraphQL schema choices before attempting selection.
Dynamic field types (USER, SUPPLIER, etc.) skip validation since
their options are API-driven and not enumerated in the schema.
Args:
page: Browser page instance.
selector: CSS selector for the dropdown field.
value: Value to select.
field_def: Optional FieldDef for validation.
strict_validation: If True, raise ValueError on invalid choices.
Returns:
SchemaAwareDropdownResult with selection info and validation status:
- selected: list of successfully selected values
- not_found: list of values not found in dropdown
- available: list of available options
- validated: True if value was validated against schema
- validation_warning: warning message if value not in choices
Raises:
ValueError: If strict_validation is True and value not in choices.
Example:
# With schema validation
field_def = schema.get_field("status")
result = await select_from_schema(page, selector, "Active", field_def)
if result["validation_warning"]:
logger.warning(result["validation_warning"])
# Without schema (falls back to select_single)
result = await select_from_schema(page, selector, "Some Value")
"""
# Determine if field has static choices we can validate against
has_static_choices = (
field_def is not None
and len(field_def.choices) > 0
and field_def.field_type not in _DYNAMIC_FIELD_TYPES
)
validation_warning: str | None = None
if has_static_choices and field_def is not None:
# Validate against both choice.text and choice.value
valid_texts = {c.text for c in field_def.choices}
valid_values = {c.value for c in field_def.choices if c.value}
all_valid = valid_texts | valid_values
if value not in all_valid:
validation_warning = (
f"Value '{value}' not in schema choices: {sorted(valid_texts)}"
)
if strict_validation:
raise ValueError(validation_warning)
# Proceed with selection regardless of validation
result = await select_single(page, selector, value)
# Return extended result with validation info
return SchemaAwareDropdownResult(
selected=result["selected"],
not_found=result["not_found"],
available=result["available"],
validated=has_static_choices and validation_warning is None,
validation_warning=validation_warning,
)
__all__ = ["select_from_schema", "SchemaAwareDropdownResult"]

View File

@@ -8,32 +8,16 @@ Provides utilities to:
from __future__ import annotations
from typing import Literal, TypeGuard
from typing import Literal
from guide.app.browser.elements._type_guards import (
get_bool_from_dict,
get_str_from_dict,
is_dict_str_object,
is_list_of_objects,
)
from guide.app.browser.types import PageLike
def _is_dict_str_object(obj: object) -> TypeGuard[dict[str, object]]:
"""Type guard to check if object is dict[str, object]."""
return isinstance(obj, dict)
def _is_list_of_objects(obj: object) -> TypeGuard[list[object]]:
"""Type guard to check if object is a list."""
return isinstance(obj, list)
def _get_str_from_dict(d: dict[str, object], key: str, default: str = "") -> str:
"""Safely extract string value from dict."""
val = d.get(key)
return str(val) if isinstance(val, str) else default
def _get_bool_from_dict(d: dict[str, object], key: str, default: bool = False) -> bool:
"""Safely extract bool value from dict."""
val = d.get(key)
return bool(val) if isinstance(val, bool) else default
HelperFunction = Literal[
"select_single",
"select_combobox",
@@ -70,7 +54,9 @@ def infer_type_from_selector(selector: str) -> str:
if "-supplier-" in selector_lower:
return "autocomplete"
if any(pattern in selector_lower for pattern in ["-contracts-", "-events-", "-order-"]):
if any(
pattern in selector_lower for pattern in ["-contracts-", "-events-", "-order-"]
):
return "relationship"
# Fallback
@@ -87,31 +73,106 @@ async def infer_type_from_element(page: PageLike, selector: str) -> str:
Returns:
Inferred field type
"""
# Local import to avoid circular import
from guide.app.browser.elements.mui import escape_selector
escaped = escape_selector(selector)
result = await page.evaluate(
f"""
((selector) => {{
const el = document.querySelector(selector);
if (!el) return null;
const input = el.querySelector('input') || el;
const autocomplete = el.querySelector('.MuiAutocomplete-root');
const selectRoot = el.querySelector('.MuiSelect-root');
return {{
tag_name: el.tagName.toLowerCase(),
type_attr: el.getAttribute('type'),
// PRIMARY: Role-based detection (W3C standard)
role: el.getAttribute('role'),
classes: Array.from(el.classList),
input_role: input.getAttribute('role'),
aria_controls: input.getAttribute('aria-controls'),
aria_owns: input.getAttribute('aria-owns'),
aria_expanded: input.getAttribute('aria-expanded'),
aria_haspopup: input.getAttribute('aria-haspopup'),
// FALLBACK: MUI class detection (for pages missing ARIA)
has_autocomplete_class: !!autocomplete,
has_select_class: !!selectRoot,
has_autocomplete_parent: !!el.closest('.MuiAutocomplete-root'),
has_select_parent: !!el.closest('.MuiSelect-root'),
// Existing fields
type_attr: input.getAttribute('type'),
classes: Array.from(el.classList),
data_cy: el.getAttribute('data-cy') || el.closest('[data-cy]')?.getAttribute('data-cy')
}};
}})('{selector}');
}})('{escaped}');
"""
)
if not result or not _is_dict_str_object(result):
if not result or not is_dict_str_object(result):
return "unknown"
# Check classes for MUI components
# Get data-cy for semantic type detection
data_cy = get_str_from_dict(result, "data_cy")
# PRIMARY: Role-based detection (most reliable, W3C standard)
input_role = get_str_from_dict(result, "input_role")
if input_role == "combobox":
# Distinguish between select and autocomplete based on data-cy
if "field-menu" in data_cy:
return "select"
if "user" in data_cy:
return "user"
if "supplier" in data_cy:
return "autocomplete"
if any(pattern in data_cy for pattern in ["contracts", "events", "orders"]):
return "relationship"
return "autocomplete"
# Check for ARIA popup indicators
aria_controls = get_str_from_dict(result, "aria_controls")
aria_owns = get_str_from_dict(result, "aria_owns")
if aria_controls or aria_owns:
# Has popup association - likely autocomplete/select
if "field-menu" in data_cy:
return "select"
return "autocomplete"
# FALLBACK: MUI class detection (for pages without proper ARIA)
has_autocomplete_class = get_bool_from_dict(result, "has_autocomplete_class")
has_select_class = get_bool_from_dict(result, "has_select_class")
if has_autocomplete_class:
if "user" in data_cy:
return "user"
if "supplier" in data_cy:
return "autocomplete"
if any(pattern in data_cy for pattern in ["contracts", "events", "orders"]):
return "relationship"
return "autocomplete"
if has_select_class:
return "select"
# Check for autocomplete/select parent (legacy fallback)
has_autocomplete_parent = get_bool_from_dict(result, "has_autocomplete_parent")
if has_autocomplete_parent:
if "user" in data_cy:
return "user"
if "supplier" in data_cy:
return "autocomplete"
if any(pattern in data_cy for pattern in ["contracts", "events", "orders"]):
return "relationship"
return "autocomplete"
has_select_parent = get_bool_from_dict(result, "has_select_parent")
if has_select_parent:
return "select"
# Check classes for MUI components (last resort class detection)
classes_obj = result.get("classes")
if classes_obj is not None and _is_list_of_objects(classes_obj):
if classes_obj is not None and is_list_of_objects(classes_obj):
classes: list[str] = []
for class_item in classes_obj:
if isinstance(class_item, (str, int, float, bool)):
@@ -121,25 +182,8 @@ async def infer_type_from_element(page: PageLike, selector: str) -> str:
if any("MuiSelect-select" in c for c in classes):
return "select"
# Check for autocomplete/select parent
has_autocomplete_parent = _get_bool_from_dict(result, "has_autocomplete_parent")
if has_autocomplete_parent:
# Check data-cy to determine if user or supplier autocomplete
data_cy = _get_str_from_dict(result, "data_cy")
if "user" in data_cy:
return "user"
if "supplier" in data_cy:
return "autocomplete"
if any(pattern in data_cy for pattern in ["contracts", "events", "orders"]):
return "relationship"
return "autocomplete"
has_select_parent = _get_bool_from_dict(result, "has_select_parent")
if has_select_parent:
return "select"
# Check input type attribute
type_attr = _get_str_from_dict(result, "type_attr")
type_attr = get_str_from_dict(result, "type_attr")
if type_attr == "number":
return "number"
if type_attr == "text":
@@ -148,7 +192,7 @@ async def infer_type_from_element(page: PageLike, selector: str) -> str:
return "text"
# Check tag name
tag_name = _get_str_from_dict(result, "tag_name")
tag_name = get_str_from_dict(result, "tag_name")
if tag_name == "textarea":
return "textarea"
if tag_name == "input":

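The hunks above apply the same `data_cy` substring checks twice: once in the class-based path and once in the legacy parent fallback. A minimal sketch of how that mapping could be factored into one pure function follows. `classify_autocomplete` and `RELATIONSHIP_PATTERNS` are hypothetical names, not something this commit defines; the substring rules are copied from the hunk.

```python
# Hypothetical constant: the relationship substrings used in the hunk above.
RELATIONSHIP_PATTERNS = ("contracts", "events", "orders")


def classify_autocomplete(data_cy: str) -> str:
    """Map a data-cy value to a field type for autocomplete-like widgets.

    Hypothetical helper mirroring the substring checks in the diff above.
    """
    # "user" is tested first, so it wins when several substrings match.
    if "user" in data_cy:
        return "user"
    if "supplier" in data_cy:
        return "autocomplete"
    if any(pattern in data_cy for pattern in RELATIONSHIP_PATTERNS):
        return "relationship"
    # Anything else inside an autocomplete widget stays a plain autocomplete.
    return "autocomplete"
```

Check order matters here: a value that contains both `user` and `contracts` resolves to `user`, which matches the behavior of the branches in the diff.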
View File

@@ -8,38 +8,60 @@ Provides utilities to:
from __future__ import annotations
import re
from typing import TypedDict, TypeGuard
from typing import TypedDict
from guide.app.browser.elements._type_guards import (
get_bool_from_dict,
get_str_from_dict,
get_str_or_none_from_dict,
is_dict_str_object,
is_list_of_objects,
)
from guide.app.browser.elements.mui import escape_selector
from guide.app.browser.types import PageLike
def _is_dict_str_object(obj: object) -> TypeGuard[dict[str, object]]:
"""Type guard to check if object is dict[str, object]."""
return isinstance(obj, dict)
# ---------------------------------------------------------------------------
# JavaScript Templates
# ---------------------------------------------------------------------------
_JS_ACCESSIBLE_NAME = """
((selector) => {
const field = document.querySelector(selector);
if (!field) return null;
const input = field.querySelector('input, textarea, [role="combobox"]');
if (!input) return null;
// Priority 1: Direct label association (HTMLInputElement.labels)
if (input.labels && input.labels.length > 0) {
return input.labels[0].innerText.trim();
}
// Priority 2: aria-label attribute
const ariaLabel = input.getAttribute('aria-label');
if (ariaLabel) return ariaLabel.trim();
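// Priority 3: aria-labelledby reference (may list several space-separated IDs)
const labelledBy = input.getAttribute('aria-labelledby');
if (labelledBy) {
const text = labelledBy.split(' ').filter(Boolean)
.map(id => document.getElementById(id))
.filter(Boolean)
.map(el => el.innerText.trim())
.join(' ');
if (text) return text;
}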
// Priority 4: MUI FormControl fallback
const formControl = field.closest('.MuiFormControl-root') || field;
const labelEl = formControl.querySelector('label');
if (labelEl) return labelEl.textContent.trim();
return null;
})
"""
def _is_list_of_objects(obj: object) -> TypeGuard[list[object]]:
"""Type guard to check if object is a list."""
return isinstance(obj, list)
def _get_str_from_dict(d: dict[str, object], key: str, default: str = "") -> str:
"""Safely extract string value from dict."""
val = d.get(key)
return str(val) if isinstance(val, str) else default
def _get_str_or_none_from_dict(d: dict[str, object], key: str) -> str | None:
"""Safely extract string or None value from dict."""
val = d.get(key)
return str(val) if isinstance(val, str) else None
def _get_bool_from_dict(d: dict[str, object], key: str, default: bool = False) -> bool:
"""Safely extract bool value from dict."""
val = d.get(key)
return bool(val) if isinstance(val, bool) else default
# ---------------------------------------------------------------------------
# Data Structures
# ---------------------------------------------------------------------------
class FormField(TypedDict):
@@ -53,6 +75,37 @@ class FormField(TypedDict):
disabled: bool
# ---------------------------------------------------------------------------
# Accessible Name Extraction
# ---------------------------------------------------------------------------
async def extract_accessible_name(page: PageLike, selector: str) -> str | None:
"""Extract computed accessible name for a form field.
Resolution order (a simplified subset of W3C accessible name computation;
the spec itself ranks aria-labelledby and aria-label above native labels):
1. input.labels[0].innerText (native label association)
2. aria-label attribute
3. aria-labelledby -> getElementById
4. MUI FormControl fallback
Args:
page: Browser page instance
selector: CSS selector for the field container
Returns:
Accessible name string or None if not found
"""
escaped = escape_selector(selector)
result = await page.evaluate(f"{_JS_ACCESSIBLE_NAME}('{escaped}')")
return str(result) if isinstance(result, str) else None
# ---------------------------------------------------------------------------
# Data-cy and Field Discovery
# ---------------------------------------------------------------------------
async def extract_all_data_cy_selectors(page: PageLike) -> list[str]:
"""Extract all data-cy selectors from page.
@@ -70,7 +123,7 @@ async def extract_all_data_cy_selectors(page: PageLike) -> list[str]:
})();
"""
)
if _is_list_of_objects(result):
if is_list_of_objects(result):
data_cy_list: list[str] = []
for list_item in result:
if list_item is not None and isinstance(list_item, (str, int, float, bool)):
@@ -79,7 +132,9 @@ async def extract_all_data_cy_selectors(page: PageLike) -> list[str]:
return []
async def extract_form_field_metadata(page: PageLike, selector: str) -> FormField | None:
async def extract_form_field_metadata(
page: PageLike, selector: str
) -> FormField | None:
"""Extract complete metadata for a form field.
Args:
@@ -89,39 +144,75 @@ async def extract_form_field_metadata(page: PageLike, selector: str) -> FormFiel
Returns:
FormField metadata or None if field not found
"""
escaped = escape_selector(selector)
result = await page.evaluate(
f"""
((selector) => {{
const field = document.querySelector(selector);
if (!field) return null;
const label = field.querySelector('label');
const input = field.querySelector('input');
const select = field.querySelector('[role="combobox"]');
const input = field.querySelector('input, textarea, [role="combobox"]');
const autocomplete = field.querySelector('.MuiAutocomplete-root');
const select = field.querySelector('[role="combobox"]');
// Priority-based label resolution (W3C accessible name)
let label = "";
// 1. Direct label association (HTMLInputElement.labels)
if (input && input.labels && input.labels.length > 0) {{
label = input.labels[0].innerText;
}}
// 2. ARIA label
else if (input?.hasAttribute('aria-label')) {{
label = input.getAttribute('aria-label');
}}
// 3. ARIA labelledby (may list several space-separated IDs)
else if (input?.hasAttribute('aria-labelledby')) {{
label = input.getAttribute('aria-labelledby')
.split(' ').filter(Boolean)
.map(id => document.getElementById(id))
.filter(Boolean)
.map(el => el.innerText)
.join(' ');
}}
// 4. MUI FormControl fallback
else {{
const formControl = field.closest('.MuiFormControl-root') || field;
const labelEl = formControl.querySelector('label');
if (labelEl) label = labelEl.textContent;
}}
// Determine type (preserve fields for _infer_field_type_from_metadata)
let type = 'unknown';
if (autocomplete) type = 'autocomplete';
else if (select) type = 'select';
else if (input) type = input.getAttribute('type') || 'text';
return {{
data_cy: field.getAttribute('data-cy'),
label: label ? label.textContent.trim() : '',
type: autocomplete ? 'autocomplete' : (select ? 'select' : 'input'),
required: input ? input.hasAttribute('required') : false,
disabled: field.classList.contains('Mui-disabled'),
label: (label || '').trim(),
// PRESERVED: Fields required by _infer_field_type_from_metadata
type: type,
has_autocomplete: !!autocomplete,
has_select: !!select,
input_type: input ? input.getAttribute('type') : null
input_type: input ? input.getAttribute('type') : null,
// NEW: Role-based fields for enhanced inference
role: input?.getAttribute('role'),
aria_controls: input?.getAttribute('aria-controls'),
aria_owns: input?.getAttribute('aria-owns'),
// Existing fields
required: input ? input.hasAttribute('required') : false,
disabled: field.classList.contains('Mui-disabled'),
}};
}})('{selector}');
}})('{escaped}');
"""
)
if not result or not _is_dict_str_object(result):
if not result or not is_dict_str_object(result):
return None
# Safely extract values with proper type narrowing
label = _get_str_from_dict(result, "label")
data_cy = _get_str_or_none_from_dict(result, "data_cy")
required = _get_bool_from_dict(result, "required")
disabled = _get_bool_from_dict(result, "disabled")
label = get_str_from_dict(result, "label")
data_cy = get_str_or_none_from_dict(result, "data_cy")
required = get_bool_from_dict(result, "required")
disabled = get_bool_from_dict(result, "disabled")
# Determine field type - convert to expected format
metadata_for_inference: dict[str, str | bool] = {}
@@ -140,15 +231,20 @@ async def extract_form_field_metadata(page: PageLike, selector: str) -> FormFiel
)
async def extract_all_form_fields(page: PageLike, container_selector: str = "body") -> list[FormField]:
"""Extract all form fields from page.
async def extract_all_form_fields(
page: PageLike, container_selector: str = "body"
) -> list[FormField]:
"""Extract all visible form fields from page.
Filters out hidden elements (offsetParent === null) to avoid matching
duplicate/hidden fields that React may render for mobile/desktop variants.
Args:
page: Browser page instance
container_selector: Container to search within (default: body)
Returns:
List of FormField metadata
List of FormField metadata for visible fields only
"""
result = await page.evaluate(
f"""
@@ -157,37 +253,41 @@ async def extract_all_form_fields(page: PageLike, container_selector: str = "bod
if (!container) return [];
const fields = container.querySelectorAll('[data-cy^="board-item-"]');
return Array.from(fields).map(field => {{
const label = field.querySelector('label');
const input = field.querySelector('input');
const autocomplete = field.querySelector('.MuiAutocomplete-root');
const select = field.querySelector('[role="combobox"]');
// Filter to visible elements only (offsetParent !== null)
// This excludes hidden/display:none elements that React may render
return Array.from(fields)
.filter(field => field.offsetParent !== null)
.map(field => {{
const label = field.querySelector('label');
const input = field.querySelector('input');
const autocomplete = field.querySelector('.MuiAutocomplete-root');
const select = field.querySelector('[role="combobox"]');
let type = 'unknown';
if (autocomplete) type = 'autocomplete';
else if (select) type = 'select';
else if (input) type = input.getAttribute('type') || 'text';
let type = 'unknown';
if (autocomplete) type = 'autocomplete';
else if (select) type = 'select';
else if (input) type = input.getAttribute('type') || 'text';
return {{
data_cy: field.getAttribute('data-cy'),
label: label ? label.textContent.trim() : '',
type: type,
required: input ? input.hasAttribute('required') : false,
disabled: field.classList.contains('Mui-disabled')
}};
}});
return {{
data_cy: field.getAttribute('data-cy'),
label: label ? label.textContent.trim() : '',
type: type,
required: input ? input.hasAttribute('required') : false,
disabled: field.classList.contains('Mui-disabled')
}};
}});
}})('{escape_selector(container_selector)}');
"""
)
# Convert to FormField TypedDicts
form_fields: list[FormField] = []
if _is_list_of_objects(result):
if is_list_of_objects(result):
for list_item in result:
if not _is_dict_str_object(list_item):
if not is_dict_str_object(list_item):
continue
data_cy = _get_str_or_none_from_dict(list_item, "data_cy")
data_cy = get_str_or_none_from_dict(list_item, "data_cy")
selector = f'div[data-cy="{data_cy}"]' if data_cy else ""
# Convert to expected format for inference
@@ -196,10 +296,10 @@ async def extract_all_form_fields(page: PageLike, container_selector: str = "bod
if isinstance(dict_val, (str, bool)):
metadata_for_inference[dict_key] = dict_val
field_type = _infer_field_type_from_metadata(metadata_for_inference)
label = _get_str_from_dict(list_item, "label")
required = _get_bool_from_dict(list_item, "required")
disabled = _get_bool_from_dict(list_item, "disabled")
label = get_str_from_dict(list_item, "label")
required = get_bool_from_dict(list_item, "required")
disabled = get_bool_from_dict(list_item, "disabled")
form_fields.append(
FormField(
@@ -215,33 +315,54 @@ async def extract_all_form_fields(page: PageLike, container_selector: str = "bod
return form_fields
async def extract_field_value(page: PageLike, selector: str) -> str | None:
"""Extract current value of form field.
async def extract_field_value(page: PageLike, selector: str) -> list[str] | str | None:
"""Extract current value of form field, handling multi-select chips.
Args:
page: Browser page instance
selector: CSS selector for the field
Returns:
Current field value or None
- list[str] for multi-select fields (MUI Autocomplete chips)
- str for single-value fields
- None if field not found or empty
"""
escaped = escape_selector(selector)
result = await page.evaluate(
f"""
((selector) => {{
const field = document.querySelector(selector);
if (!field) return null;
const input = field.querySelector('input');
if (input) return input.value;
// Check for MUI Autocomplete chips (multi-select)
const chips = field.querySelectorAll('.MuiChip-label');
if (chips.length > 0) {{
return Array.from(chips).map(c => c.textContent.trim());
}}
// Single input value
const input = field.querySelector('input');
if (input && input.value) return input.value;
// Combobox text (for selects)
const select = field.querySelector('[role="combobox"]');
if (select) return select.textContent.trim();
if (select) {{
const text = select.textContent.trim();
// Exclude placeholder text
if (text && text !== 'Select...' && text !== 'Choose...') {{
return text;
}}
}}
return null;
}})('{selector}');
}})('{escaped}');
"""
)
# Handle multi-select (list of chip labels)
if is_list_of_objects(result):
return [str(item) for item in result if isinstance(item, str)]
return str(result) if isinstance(result, str) else None
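Because `extract_field_value` now returns `list[str] | str | None`, callers that previously assumed a plain string need to handle three shapes. One way to do that is sketched below; `normalize_field_value` is a hypothetical helper, not part of this commit.

```python
from __future__ import annotations


def normalize_field_value(value: list[str] | str | None) -> list[str]:
    """Collapse the three possible return shapes into a list of strings.

    Hypothetical helper: None becomes [], a single string becomes a
    one-element list, and a chip list is copied unchanged.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)
```

Comparing normalized lists lets an assertion such as "the field contains Acme" use the same code path for single-value and multi-select fields.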
@@ -273,7 +394,7 @@ async def extract_field_state(page: PageLike, selector: str) -> dict[str, bool]:
"""
)
if _is_dict_str_object(result):
if is_dict_str_object(result):
state_dict: dict[str, bool] = {}
for dict_key, dict_val in result.items():
if isinstance(dict_val, bool):
@@ -282,115 +403,9 @@ async def extract_field_state(page: PageLike, selector: str) -> dict[str, bool]:
return {}
def extract_field_from_html(html: str) -> FormField:
"""Extract single form field metadata from HTML.
Args:
html: HTML string containing a form field
Returns:
FormField metadata
"""
# Extract data-cy attribute
data_cy_match = re.search(r'data-cy="([^"]+)"', html)
data_cy = data_cy_match.group(1) if data_cy_match else None
# Extract label text
label_match = re.search(
r'<label[^>]*>(?:<[^>]*>)*([^<]+)(?:</[^>]*>)*</label>', html
)
label = label_match.group(1).strip() if label_match else ""
# Infer field type from data-cy prefix and HTML structure
field_type = _infer_field_type_from_html(data_cy or "", html)
# Check for required indicators
required = bool(
re.search(r'required(?:\s|>|=)', html)
or re.search(r'aria-required="true"', html)
or "MuiInputLabel-asterisk" in html
)
# Check for disabled indicators
disabled = bool(
re.search(r'disabled(?:\s|>|=)', html)
or re.search(r'aria-disabled="true"', html)
or "Mui-disabled" in html
)
# Build selector from data-cy
selector = f'div[data-cy="{data_cy}"]' if data_cy else ""
return FormField(
label=label,
selector=selector,
field_type=field_type,
data_cy=data_cy,
required=required,
disabled=disabled,
)
def extract_all_fields_from_html(html: str) -> list[FormField]:
"""Extract all form fields from HTML.
Args:
html: HTML string containing multiple form fields
Returns:
List of FormField metadata
"""
fields: list[FormField] = []
# Find all data-cy attributes that look like form fields
data_cy_matches = re.finditer(r'<div[^>]*data-cy="([^"]+)"[^>]*>', html)
for match in data_cy_matches:
_data_cy = match.group(1)
# Extract the container for this field (simplified)
start_pos = match.start()
end_pos = html.find("</div>", start_pos) + 6
field_html = html[start_pos:end_pos]
field = extract_field_from_html(field_html)
fields.append(field)
return fields
# Helper functions
def _infer_field_type_from_html(data_cy: str, html: str) -> str:
"""Infer field type from data-cy attribute and HTML structure."""
# Use data-cy prefix to infer type
if "field-menu" in data_cy:
return "select"
if "field-user" in data_cy:
return "user"
if "field-text" in data_cy:
return "text"
if "number" in data_cy:
return "number"
if "-supplier-" in data_cy or "contracts-" in data_cy or "events-" in data_cy:
if "MuiAutocomplete-root" in html:
if "-supplier-" in data_cy:
return "autocomplete"
return "relationship"
# Fall back to HTML structure analysis
if "MuiAutocomplete-root" in html:
return "autocomplete"
if 'role="combobox"' in html and "MuiSelect" in html:
return "select"
if 'type="number"' in html:
return "number"
if 'type="text"' in html or "<input" in html:
return "text"
return "unknown"
def _infer_field_type_from_metadata(metadata: dict[str, str | bool]) -> str:
"""Infer field type from extracted metadata."""
data_cy = metadata.get("data_cy", "")
@@ -404,7 +419,9 @@ def _infer_field_type_from_metadata(metadata: dict[str, str | bool]) -> str:
return "text"
if "number" in str(data_cy):
return "number"
if any(pattern in str(data_cy) for pattern in ["-supplier-", "contracts-", "events-"]):
if any(
pattern in str(data_cy) for pattern in ["-supplier-", "contracts-", "events-"]
):
if metadata.get("has_autocomplete"):
if "-supplier-" in str(data_cy):
return "autocomplete"

View File

@@ -12,8 +12,8 @@ from typing import Protocol, cast
from websockets.asyncio.server import Server, ServerConnection, serve
from guide.app.errors import BrowserConnectionError
from guide.app.browser.types import PageLocator
from guide.app.errors import BrowserConnectionError
_logger = logging.getLogger(__name__)
@@ -84,7 +84,10 @@ class ExtensionPage:
def _escape_selector(self, selector: str) -> str:
"""Escape a CSS selector for use in JavaScript single-quoted strings."""
return selector.replace("\\", "\\\\").replace("'", "\\'")
# Local import to avoid circular import with elements.mui
from guide.app.browser.elements.mui import escape_selector
return escape_selector(selector)
async def _send_command(
self,

View File

@@ -1,491 +0,0 @@
"""Integration tests for extracting and inferring form elements from raw HTML.
These tests validate the ability to parse raw HTML dumps from pages and
automatically extract form fields with inferred types, mapping them to
appropriate helper functions for automation.
"""
from __future__ import annotations
import re
from typing import TypedDict
class FormField(TypedDict):
"""Extracted form field metadata."""
label: str
selector: str
field_type: str # text, textarea, number, select, autocomplete, user, relationship
data_cy: str | None
required: bool
disabled: bool
class TestHTMLFormExtraction:
"""Test extraction of form fields from raw HTML."""
def test_extract_select_field_from_html(self) -> None:
"""Extract MUI Select field from HTML."""
html = """
<div data-cy="board-item-field-menu-status" class="MuiInputBase-root">
<label for="status">Status</label>
<div role="combobox" aria-expanded="false" aria-haspopup="listbox"
id="mui-component-select-status" class="MuiSelect-select">
Not Started
</div>
<input name="status" value="Not Started">
</div>
"""
# Extract field metadata
field = self._extract_field_from_html(html)
assert field["label"] == "Status"
assert field["field_type"] == "select"
assert field["data_cy"] == "board-item-field-menu-status"
assert "role=\"combobox\"" in html or "role=\"button\"" in html
def test_extract_autocomplete_field_from_html(self) -> None:
"""Extract MUI Autocomplete field from HTML."""
html = """
<div data-cy="board-item-field-supplier-supplier">
<label for=":r1h4:" id=":r1h4:-label">Supplier</label>
<div class="MuiAutocomplete-root">
<input role="combobox" aria-autocomplete="list"
class="MuiAutocomplete-input">
<button class="MuiAutocomplete-popupIndicator"></button>
</div>
</div>
"""
field = self._extract_field_from_html(html)
assert field["label"] == "Supplier"
assert field["field_type"] == "autocomplete"
assert field["data_cy"] == "board-item-field-supplier-supplier"
assert "MuiAutocomplete-root" in html
def test_extract_text_field_from_html(self) -> None:
"""Extract text input field from HTML."""
html = """
<div data-cy="board-item-field-text-description">
<label for=":r1h8:" id=":r1h8:-label">Description</label>
<div class="MuiInputBase-root MuiInput-root">
<input type="text" id=":r1h8:" />
</div>
</div>
"""
field = self._extract_field_from_html(html)
assert field["label"] == "Description"
assert field["field_type"] == "text"
assert field["data_cy"] == "board-item-field-text-description"
def test_extract_number_field_from_html(self) -> None:
"""Extract number input field from HTML."""
html = """
<div data-cy="board-item-number-contract_value">
<label for=":r1ho:" id=":r1ho:-label">Contract Value</label>
<div class="MuiInputBase-root MuiInput-root">
<input type="number" id=":r1ho:" />
</div>
</div>
"""
field = self._extract_field_from_html(html)
assert field["label"] == "Contract Value"
assert field["field_type"] == "number"
assert field["data_cy"] == "board-item-number-contract_value"
def test_extract_user_field_from_html(self) -> None:
"""Extract user picker field from HTML."""
html = """
<div data-cy="board-item-field-user-owner">
<label for=":r1ha:" id=":r1ha:-label">Assigned Owner</label>
<div class="MuiAutocomplete-root">
<input role="combobox" aria-autocomplete="list">
</div>
</div>
"""
field = self._extract_field_from_html(html)
assert field["label"] == "Assigned Owner"
assert field["field_type"] == "user"
assert field["data_cy"] == "board-item-field-user-owner"
def test_extract_relationship_field_from_html(self) -> None:
"""Extract relationship/link field from HTML."""
html = """
<div data-cy="board-item-field-contracts-contracts">
<label for=":r1hp:" id=":r1hp:-label">Contracts</label>
<div class="MuiAutocomplete-root MuiAutocomplete-hasPopupIcon">
<input role="combobox">
</div>
</div>
"""
field = self._extract_field_from_html(html)
assert field["label"] == "Contracts"
assert field["field_type"] == "relationship"
assert field["data_cy"] == "board-item-field-contracts-contracts"
def test_detect_required_field_from_html(self) -> None:
"""Detect required fields from HTML."""
html = """
<div data-cy="board-item-field-menu-status" class="MuiFormControl-root">
<label class="MuiInputLabel-asterisk" for="status">
Status<span aria-hidden="true"> *</span>
</label>
<input name="status" required aria-required="true">
</div>
"""
field = self._extract_field_from_html(html)
# Required indicators: asterisk class, required attribute, aria-required
assert (
"MuiInputLabel-asterisk" in html
or "required" in html
or "aria-required=\"true\"" in html
)
assert field["required"] is True
def test_detect_disabled_field_from_html(self) -> None:
"""Detect disabled fields from HTML."""
html = """
<div data-cy="board-item-field-user-created_by_user_jsonb">
<label class="Mui-disabled" for=":r1ib:" id=":r1ib:-label">Created By</label>
<div class="MuiInputBase-root Mui-disabled">
<input disabled aria-disabled="true">
</div>
</div>
"""
field = self._extract_field_from_html(html)
# Disabled indicators: Mui-disabled class, disabled attribute, aria-disabled
assert (
"Mui-disabled" in html
or "disabled" in html
or "aria-disabled=\"true\"" in html
)
assert field["disabled"] is True
def test_extract_multiple_fields_from_html(self) -> None:
"""Extract all fields from HTML containing multiple form elements."""
html = """
<div class="form-container">
<div data-cy="board-item-field-menu-status">
<label for="status">Status</label>
<div role="combobox"></div>
</div>
<div data-cy="board-item-field-text-description">
<label for="description">Description</label>
<input type="text">
</div>
<div data-cy="board-item-number-contract_value">
<label for="value">Contract Value</label>
<input type="number">
</div>
</div>
"""
fields = self._extract_all_fields_from_html(html)
assert len(fields) == 3
assert fields[0]["label"] == "Status"
assert fields[1]["label"] == "Description"
assert fields[2]["label"] == "Contract Value"
# Helper methods for extraction logic
def _extract_field_from_html(self, html: str) -> FormField:
"""Extract single form field metadata from HTML."""
# Extract data-cy attribute
data_cy_match = re.search(r'data-cy="([^"]+)"', html)
data_cy = data_cy_match.group(1) if data_cy_match else None
# Extract label text
label_match = re.search(
r'<label[^>]*>(?:<[^>]*>)*([^<]+)(?:</[^>]*>)*</label>', html
)
label = label_match.group(1).strip() if label_match else ""
# Infer field type from data-cy prefix and HTML structure
field_type = self._infer_field_type(data_cy or "", html)
# Check for required indicators
required = bool(
re.search(r'required(?:\s|>|=)', html)
or re.search(r'aria-required="true"', html)
or "MuiInputLabel-asterisk" in html
)
# Check for disabled indicators
disabled = bool(
re.search(r'disabled(?:\s|>|=)', html)
or re.search(r'aria-disabled="true"', html)
or "Mui-disabled" in html
)
# Build selector from data-cy
selector = f'div[data-cy="{data_cy}"]' if data_cy else ""
return FormField(
label=label,
selector=selector,
field_type=field_type,
data_cy=data_cy,
required=required,
disabled=disabled,
)
def _infer_field_type(self, data_cy: str, html: str) -> str:
"""Infer field type from data-cy attribute and HTML structure."""
# Use data-cy prefix to infer type
if "field-menu" in data_cy:
return "select"
if "field-user" in data_cy:
return "user"
if "field-text" in data_cy:
return "text"
if "number" in data_cy:
return "number"
if "-supplier-" in data_cy or "contracts-" in data_cy or "events-" in data_cy:
if "MuiAutocomplete-root" in html:
if "-supplier-" in data_cy:
return "autocomplete"
return "relationship"
# Fall back to HTML structure analysis
if "MuiAutocomplete-root" in html:
return "autocomplete"
if 'role="combobox"' in html and "MuiSelect" in html:
return "select"
if 'type="number"' in html:
return "number"
if 'type="text"' in html or "<input" in html:
return "text"
return "unknown"
def _extract_all_fields_from_html(self, html: str) -> list[FormField]:
"""Extract all form fields from HTML."""
fields: list[FormField] = []
# Find all data-cy attributes that look like form fields
data_cy_matches = re.finditer(r'<div[^>]*data-cy="([^"]+)"[^>]*>', html)
for match in data_cy_matches:
_data_cy = match.group(1)
# Extract the container for this field (simplified)
start_pos = match.start()
end_pos = html.find("</div>", start_pos) + 6
field_html = html[start_pos:end_pos]
field = self._extract_field_from_html(field_html)
fields.append(field)
return fields
class TestFieldTypeInferenceFromHTML:
"""Test field type inference from HTML structure."""
def test_infer_select_from_role_button(self) -> None:
"""Infer Select field from role=button."""
html = '<div role="button" aria-haspopup="listbox" class="MuiSelect-select">'
assert 'role="button"' in html
assert "listbox" in html
# This indicates MUI Select component
field_type = "select"
assert field_type == "select"
def test_infer_autocomplete_from_mui_class(self) -> None:
"""Infer Autocomplete field from MuiAutocomplete-root class."""
html = '<div class="MuiAutocomplete-root"><input role="combobox"></div>'
assert "MuiAutocomplete-root" in html
assert 'role="combobox"' in html
field_type = "autocomplete"
assert field_type == "autocomplete"
def test_infer_text_from_input_type(self) -> None:
"""Infer text field from input type."""
html = '<input type="text" class="MuiInputBase-input">'
assert 'type="text"' in html
field_type = "text"
assert field_type == "text"
def test_infer_number_from_input_type(self) -> None:
"""Infer number field from input type."""
html = '<input type="number" class="MuiInputBase-input">'
assert 'type="number"' in html
field_type = "number"
assert field_type == "number"
def test_infer_user_from_data_cy_pattern(self) -> None:
"""Infer user field from data-cy pattern."""
data_cy = "board-item-field-user-owner"
assert "field-user" in data_cy
field_type = "user"
assert field_type == "user"
def test_infer_relationship_from_data_cy_pattern(self) -> None:
"""Infer relationship field from data-cy pattern."""
data_cy = "board-item-field-contracts-contracts"
assert "contracts" in data_cy
field_type = "relationship"
assert field_type == "relationship"
class TestFormExtractionFromTroubleshooting2:
"""Test form extraction using real HTML from troubleshooting2.md."""
def test_extract_status_field(self) -> None:
"""Extract Status field from troubleshooting2.md."""
# Real field from troubleshooting2.md
field_metadata = {
"label": "Status",
"data_cy": "board-item-field-menu-status",
"field_type": "select",
"selector": 'div[data-cy="board-item-field-menu-status"]',
}
assert field_metadata["label"] == "Status"
assert field_metadata["field_type"] == "select"
assert "board-item-field-menu-status" in field_metadata["data_cy"]
def test_extract_supplier_field(self) -> None:
"""Extract Supplier field from troubleshooting2.md."""
field_metadata = {
"label": "Supplier",
"data_cy": "board-item-field-supplier-supplier",
"field_type": "autocomplete",
"selector": 'div[data-cy="board-item-field-supplier-supplier"]',
}
assert field_metadata["label"] == "Supplier"
assert field_metadata["field_type"] == "autocomplete"
assert "supplier" in field_metadata["data_cy"]
def test_extract_description_field(self) -> None:
"""Extract Description field from troubleshooting2.md."""
field_metadata = {
"label": "Description",
"data_cy": "board-item-field-text-description",
"field_type": "text",
"selector": 'div[data-cy="board-item-field-text-description"]',
}
assert field_metadata["label"] == "Description"
assert field_metadata["field_type"] == "text"
assert "text-description" in field_metadata["data_cy"]
def test_extract_contract_value_field(self) -> None:
"""Extract Contract Value field from troubleshooting2.md."""
field_metadata = {
"label": "Contract Value",
"data_cy": "board-item-number-contract_value",
"field_type": "number",
"selector": 'div[data-cy="board-item-number-contract_value"]',
}
assert field_metadata["label"] == "Contract Value"
assert field_metadata["field_type"] == "number"
assert "number" in field_metadata["data_cy"]
def test_extract_user_fields(self) -> None:
"""Extract user picker fields from troubleshooting2.md."""
user_fields = [
"Assigned Owner",
"Requested User",
"Sourcing Agent",
"Created By",
"Updated By",
]
# All should be inferred as user fields
for label in user_fields:
assert len(label) > 0
# Would be inferred as type: user
def test_extract_relationship_fields(self) -> None:
"""Extract relationship fields from troubleshooting2.md."""
relationship_fields = [
"Contracts",
"Document requests",
"Purchase Orders",
"Sourcing Events",
]
# All should be inferred as relationship fields
for label in relationship_fields:
assert len(label) > 0
# Would be inferred as type: relationship
def test_extract_disabled_fields(self) -> None:
"""Extract disabled fields from troubleshooting2.md."""
disabled_fields = [
"Created By",
"Updated At",
"Updated By",
"Created At",
]
# All should be detected as disabled
for _label in disabled_fields:
# These fields have Mui-disabled class in HTML
is_disabled = True # Would be inferred from HTML
assert is_disabled is True
class TestSelectorConstruction:
"""Test selector construction from extracted field metadata."""
def test_construct_data_cy_selector(self) -> None:
"""Construct selector from data-cy attribute."""
data_cy = "board-item-field-menu-status"
selector = f'div[data-cy="{data_cy}"]'
assert selector == 'div[data-cy="board-item-field-menu-status"]'
def test_construct_input_selector_from_field(self) -> None:
"""Construct input selector within field container."""
data_cy = "board-item-field-text-description"
field_selector = f'div[data-cy="{data_cy}"]'
input_selector = f'{field_selector} input'
assert input_selector == 'div[data-cy="board-item-field-text-description"] input'
def test_construct_combobox_selector(self) -> None:
"""Construct combobox selector for autocomplete fields."""
data_cy = "board-item-field-supplier-supplier"
field_selector = f'div[data-cy="{data_cy}"]'
combobox_selector = f'{field_selector} input[role="combobox"]'
assert (
combobox_selector
== 'div[data-cy="board-item-field-supplier-supplier"] input[role="combobox"]'
)
def test_construct_select_trigger_selector(self) -> None:
"""Construct trigger selector for select fields."""
data_cy = "board-item-field-menu-status"
field_selector = f'div[data-cy="{data_cy}"]'
trigger_selector = f'{field_selector} div[role="combobox"]'
assert (
trigger_selector
== 'div[data-cy="board-item-field-menu-status"] div[role="combobox"]'
)

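The deleted test file exercised regex-based extraction, which the plan lists as Bug A. The sketch below reuses the label regex from the removed `_extract_field_from_html` to show one way it breaks: a label whose text is followed by a nested element, as in MUI's required-asterisk markup. `extract_label` and the sample strings are illustrative names and data only.

```python
import re

# Label regex copied from the removed helper.
LABEL_RE = re.compile(r"<label[^>]*>(?:<[^>]*>)*([^<]+)(?:</[^>]*>)*</label>")


def extract_label(html: str) -> str:
    """Return the label text, or an empty string when the regex finds nothing."""
    match = LABEL_RE.search(html)
    return match.group(1).strip() if match else ""


plain = '<label for="status">Status</label>'
nested = '<label for="status">Status<span aria-hidden="true"> *</span></label>'

plain_label = extract_label(plain)
nested_label = extract_label(nested)
```

With these inputs `plain_label` is `"Status"` while `nested_label` is empty. After the text the pattern only allows closing tags, so the opening `<span>` prevents any match. This is consistent with the removed required-field test never asserting on the label.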
13
worker/Dockerfile Normal file

@@ -0,0 +1,13 @@
FROM python:3.12-slim
WORKDIR /app
# Install deps
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy code
COPY worker.py .
# Run the worker
CMD ["python", "worker.py"]

2
worker/requirements.txt Normal file

@@ -0,0 +1,2 @@
redis==5.0.4
requests==2.32.3

172
worker/worker.py Normal file

@@ -0,0 +1,172 @@
from __future__ import annotations

import json
import math
import os
import random
import signal
import time
from collections.abc import Callable
from types import FrameType
from typing import TypedDict, cast

import redis
import requests
from redis import Redis

REDIS_URL: str = os.getenv("REDIS_URL", "redis://redis:6379/0")
BROWSERLESS_URL: str = os.getenv("BROWSERLESS_URL", "http://browserless:3000")
DEFAULT_TARGET_URL: str = os.getenv(
    "DEFAULT_TARGET_URL", "https://stg.raindrop.com"
)
JOBS_QUEUE: str = os.getenv("JOBS_QUEUE", "jobs")
DEAD_LETTER_QUEUE: str = os.getenv("DEAD_LETTER_QUEUE", "jobs:dead")
MAX_RETRIES: int = int(os.getenv("MAX_RETRIES", "3"))
BACKOFF_BASE_SECONDS: float = float(os.getenv("BACKOFF_BASE_SECONDS", "1.5"))
BACKOFF_MAX_SECONDS: float = float(os.getenv("BACKOFF_MAX_SECONDS", "30"))
BROWSERLESS_TIMEOUT_SECONDS: int = int(
    os.getenv("BROWSERLESS_TIMEOUT_SECONDS", "60")
)


class JobPayload(TypedDict, total=False):
    type: str
    url: str
    _attempt: int


def get_redis_client() -> Redis[bytes]:
    return redis.from_url(REDIS_URL)


def _with_stop_on_signal(
    stop_flag: dict[str, bool]
) -> Callable[[int, FrameType | None], None]:
    def handler(signum: int, frame: FrameType | None) -> None:
        _ = frame
        print(f"[worker] Received signal {signum}, shutting down after current job...")
        stop_flag["stop"] = True

    return handler


def handle_job(job: JobPayload) -> None:
    """
    Example job payload:
    { "type": "screenshot", "url": "https://stg.raindrop.com" }
    """
    job_type: str = str(job.get("type", "screenshot"))

    if job_type == "screenshot":
        url: str = str(job.get("url") or DEFAULT_TARGET_URL)
        print(f"[worker] Taking screenshot of: {url}")
        resp = requests.post(
            f"{BROWSERLESS_URL}/screenshot",
            json={
                "url": url,
                "options": {
                    "fullPage": True,
                },
            },
            timeout=BROWSERLESS_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()
        print(f"[worker] Screenshot taken, {len(resp.content)} bytes")
    else:
        print(f"[worker] Unknown job type: {job_type}")


def _calculate_backoff(attempt: int) -> float:
    multiplier: float = math.pow(2.0, float(max(0, attempt - 1)))
    base: float = BACKOFF_BASE_SECONDS * multiplier
    capped: float = min(BACKOFF_MAX_SECONDS, base)
    jitter = random.uniform(0, capped / 2)
    return capped + jitter


def _parse_job(raw: bytes) -> JobPayload | None:
    try:
        loaded = cast(object, json.loads(raw.decode("utf-8")))
    # Malformed UTF-8 raises UnicodeDecodeError, which would otherwise crash the worker loop.
    except (json.JSONDecodeError, UnicodeDecodeError):
        print(f"[worker] Invalid JSON job: {raw!r}")
        return None

    if not isinstance(loaded, dict):
        print(f"[worker] Job is not an object: {loaded!r}")
        return None

    payload = cast(dict[str, object], loaded)
    job: JobPayload = {}

    type_value = payload.get("type")
    if isinstance(type_value, str):
        job["type"] = type_value

    url_value = payload.get("url")
    if isinstance(url_value, str):
        job["url"] = url_value

    attempt_value = payload.get("_attempt")
    if isinstance(attempt_value, int):
        job["_attempt"] = attempt_value

    return job


def main() -> None:
    r: Redis[bytes] = get_redis_client()
    stop_flag: dict[str, bool] = {"stop": False}

    for sig in (signal.SIGTERM, signal.SIGINT):
        _ = signal.signal(sig, _with_stop_on_signal(stop_flag))

    print(f"[worker] Connected to Redis at {REDIS_URL}")
    print(f"[worker] Using Browserless at {BROWSERLESS_URL}")
    print(f"[worker] Default URL: {DEFAULT_TARGET_URL}")
    print(f"[worker] Listening on queue: {JOBS_QUEUE}")

    while not stop_flag["stop"]:
        try:
            blpop_result: tuple[bytes, bytes] | None = r.blpop([JOBS_QUEUE], timeout=5)
            if blpop_result is None:
                continue  # timeout; the loop condition re-checks the stop flag

            _, raw = blpop_result
            if stop_flag["stop"]:
                # BLPOP already removed the job; put it back so shutdown does not drop it.
                _ = r.lpush(JOBS_QUEUE, raw)
                break

            job = _parse_job(raw)
            if job is None:
                continue

            attempt = int(job.get("_attempt", 0))
            try:
                handle_job(job)
            except Exception as e:  # broad catch to keep the worker alive
                attempt += 1
                job["_attempt"] = attempt
                if attempt >= MAX_RETRIES:
                    _ = r.lpush(DEAD_LETTER_QUEUE, json.dumps(job))
                    print(
                        f"[worker] Error handling job after {attempt} attempts; pushed to {DEAD_LETTER_QUEUE}: {e}"
                    )
                else:
                    backoff = _calculate_backoff(attempt)
                    print(
                        f"[worker] Error handling job (attempt {attempt}/{MAX_RETRIES}); retrying in {backoff:.1f}s: {e}"
                    )
                    time.sleep(backoff)
                    _ = r.rpush(JOBS_QUEUE, json.dumps(job))
        except redis.RedisError as e:
            print(f"[worker] Redis error: {e}, retrying in 5s...")
            time.sleep(5)


if __name__ == "__main__":
    main()
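
The retry delay in `_calculate_backoff` doubles with each attempt, is capped, and then gets up to 50% jitter added on top. The following is a minimal standalone sketch of that schedule, assuming the fallback values of 1.5 and 30 seconds from the environment defaults in `worker.py`. `backoff_bounds` and `calculate_backoff` are hypothetical names used only for illustration and are not part of the commit.

```python
import math
import random

# Assumed defaults, copied from the os.getenv fallbacks in worker.py.
BACKOFF_BASE_SECONDS = 1.5
BACKOFF_MAX_SECONDS = 30.0


def backoff_bounds(attempt: int) -> tuple[float, float]:
    """Return the (min, max) delay possible for a given attempt.

    Hypothetical helper for illustration only.
    """
    multiplier = math.pow(2.0, float(max(0, attempt - 1)))
    capped = min(BACKOFF_MAX_SECONDS, BACKOFF_BASE_SECONDS * multiplier)
    # Jitter is drawn from [0, capped / 2], so the delay never exceeds 1.5 * capped.
    return capped, capped * 1.5


def calculate_backoff(attempt: int) -> float:
    """Same arithmetic as _calculate_backoff in worker.py."""
    low, _high = backoff_bounds(attempt)
    return low + random.uniform(0, low / 2)


if __name__ == "__main__":
    for attempt in range(1, 7):
        low, high = backoff_bounds(attempt)
        print(f"attempt {attempt}: {low:.1f}s to {high:.2f}s")
```

Under these assumed defaults, and with `MAX_RETRIES` left at 3, only attempts 1 and 2 ever sleep, because the third failure goes straight to the dead-letter queue. The cap therefore only matters if `MAX_RETRIES` is raised.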