Enhance browser automation by introducing PageLike protocol for unified page handling across Playwright and extension contexts. Update actions and helpers to utilize PageLike, improving dropdown and form interactions. Add new browser element helpers for streamlined UI automation.

This commit is contained in:
2025-11-24 02:46:12 +00:00
parent 648c657e04
commit 2fad7ccbee
22 changed files with 851 additions and 544 deletions

View File

@@ -2,7 +2,7 @@
// Fires at document_start on all pages, and on focus/visibility/page-show.
(function () {
const MSG = { type: "terminator_content_handshake" };
function sendHandshake() {
const sendHandshake = () => {
try {
chrome.runtime.sendMessage(MSG, () => {
// ignore response
@@ -10,7 +10,7 @@
} catch (_) {
// Ignore if not allowed on special pages
}
}
};
// Initial handshake as early as possible
sendHandshake();
@@ -20,7 +20,9 @@
document.addEventListener(
"visibilitychange",
() => {
if (document.visibilityState === "visible") sendHandshake();
if (document.visibilityState === "visible") {
sendHandshake();
}
},
{ capture: false, passive: true }
);

View File

@@ -21,7 +21,9 @@ self.disableTerminatorDebug = () => {
debugEnabled = false;
};
function log(...args) {
if (!debugEnabled) return;
if (!debugEnabled) {
return;
}
console.log("[TerminatorBridge]", ...args);
}
@@ -85,7 +87,9 @@ function connect() {
log("Invalid JSON", event.data);
return;
}
if (!msg || !msg.action) return;
if (!msg || !msg.action) {
return;
}
if (msg.action === "eval") {
const { id, code, awaitPromise = true } = msg;
@@ -152,7 +156,9 @@ function ensureConnected() {
// Ensure we have an active WS connection on first message from content script
chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
try {
if (!message || message.type !== "terminator_content_handshake") return;
if (!message || message.type !== "terminator_content_handshake") {
return;
}
log("Received handshake from content script", {
tab: sender.tab && sender.tab.id,
});
@@ -172,7 +178,9 @@ chrome.runtime.onMessage.addListener((message, sender, sendResponse) => {
// Inject a lightweight handshake into the active tab when it changes/updates
async function injectHandshake(tabId) {
try {
if (typeof tabId !== "number") return;
if (typeof tabId !== "number") {
return;
}
await chrome.scripting.executeScript({
target: { tabId },
func: () => {
@@ -221,7 +229,9 @@ chrome.alarms.onAlarm.addListener((alarm) => {
chrome.tabs.onActivated.addListener(async () => {
ensureConnected();
const tabId = await getActiveTabIdSafe();
if (tabId != null) injectHandshake(tabId);
if (tabId != null) {
injectHandshake(tabId);
}
});
chrome.tabs.onUpdated.addListener(async (tabId, changeInfo) => {
if (
@@ -245,7 +255,7 @@ chrome.tabs.onRemoved.addListener((tabId) => {
// Clean up when debugger is manually detached (user clicked Cancel)
chrome.debugger.onDetach.addListener((source, reason) => {
const tabId = source.tabId;
const {tabId} = source;
if (attachedTabs.has(tabId)) {
attachedTabs.delete(tabId);
enabledTabs.delete(tabId); // Also clean enabled domains state
@@ -255,7 +265,9 @@ chrome.debugger.onDetach.addListener((source, reason) => {
});
function scheduleReconnect() {
if (reconnectTimer) return;
if (reconnectTimer) {
return;
}
const delay = currentReconnectDelayMs;
currentReconnectDelayMs = Math.min(
@@ -326,7 +338,9 @@ async function getActiveTabId() {
active: true,
lastFocusedWindow: true,
});
if (!tab || tab.id == null) throw new Error("No active tab");
if (!tab || tab.id == null) {
throw new Error("No active tab");
}
// Check if the tab URL is accessible for debugging
const url = tab.url || "";
@@ -351,9 +365,15 @@ async function getActiveTabId() {
function formatRemoteObject(obj) {
try {
if (obj === null || obj === undefined) return null;
if (Object.prototype.hasOwnProperty.call(obj, "value")) return obj.value;
if (obj.description !== undefined) return obj.description;
if (obj === null || obj === undefined) {
return null;
}
if (Object.prototype.hasOwnProperty.call(obj, "value")) {
return obj.value;
}
if (obj.description !== undefined) {
return obj.description;
}
return obj.type || null;
} catch (_) {
return null;
@@ -498,7 +518,7 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
* after that we'll get the `frameId` of the iframe to execute
raw js code inside the iframe's document to avoid CORS issue
* rewrite the raw js code so it removes the unneccesary `IFRAMESELCTOR` from eval code
* run user code safely by creating a Function rather than eval (still executes arbitrary code)
* run user code safely via eval inside an isolated function scope
*/
if (code.includes("IFRAMESELCTOR")) {
const frames = await chrome.webNavigation.getAllFrames({ tabId });
@@ -567,11 +587,15 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
const u2 = new URL(iframeSrc);
const host1 = u1.hostname.replace(/^www\./, "").toLowerCase();
const host2 = u2.hostname.replace(/^www\./, "").toLowerCase();
if (host1 !== host2) return false;
if (host1 !== host2) {
return false;
}
const p1 = u1.pathname.split("/").filter(Boolean);
const p2 = u2.pathname.split("/").filter(Boolean);
for (let i = 0; i < Math.min(3, p1.length, p2.length); i++) {
if (p1[i] !== p2[i]) return false;
if (p1[i] !== p2[i]) {
return false;
}
}
return true;
} catch {
@@ -601,16 +625,14 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
target: { tabId: tabId, frameIds: [frameId] },
world: "MAIN",
func: (userScript, shouldAwait) => {
return (async () => {
const fn = new Function(userScript);
const result = fn();
if (shouldAwait && result instanceof Promise) {
return await result;
}
const runUserScript = () => (0, eval)(userScript);
const result = runUserScript();
if (shouldAwait && result instanceof Promise) {
return result;
})();
}
return result;
},
args: [rewritten, !!awaitPromise],
args: [rewritten, Boolean(awaitPromise)],
});
if (!results || results.length === 0) {
@@ -628,15 +650,15 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
const perfStart = performance.now();
const timings = {};
// Auto-detect async IIFEs and set awaitPromise accordingly
const originalCode = code;
let shouldAwaitPromise = awaitPromise;
if (isAsyncIIFE(originalCode)) {
log("Detected async IIFE, forcing awaitPromise=true");
awaitPromise = true;
shouldAwaitPromise = true;
}
// Auto-detect and wrap code with top-level returns
code = wrapCodeIfNeeded(code);
const processedCode = wrapCodeIfNeeded(originalCode);
// Only attach if we haven't before
if (!attachedTabs.has(tabId)) {
@@ -973,7 +995,9 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
// Listen for console/log/exception events for this tab while the eval runs
onEvent = (source, method, params) => {
try {
if (!source || source.tabId !== tabId) return;
if (!source || source.tabId !== tabId) {
return;
}
if (method === "Runtime.consoleAPICalled") {
const level = params.type || "log";
const args = (params.args || []).map((a) => formatRemoteObject(a));
@@ -1011,8 +1035,8 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
try {
evalResult = await sendCommand(tabId, "Runtime.evaluate", {
expression: code,
awaitPromise: !!awaitPromise,
expression: processedCode,
awaitPromise: Boolean(shouldAwaitPromise),
returnByValue: true,
userGesture: true,
});
@@ -1027,7 +1051,7 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
// Check if we got "Illegal return statement" error and haven't wrapped yet
if (
exceptionDetails &&
code !== originalCode && // Already wrapped, don't retry
processedCode !== originalCode && // Already wrapped, don't retry
(exceptionDetails.text?.includes("Illegal return statement") ||
exceptionDetails.exception?.description?.includes(
"Illegal return statement",
@@ -1037,7 +1061,7 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
log("Still got 'Illegal return statement' after wrapping, not retrying");
} else if (
exceptionDetails &&
code === originalCode && // Not wrapped yet
processedCode === originalCode && // Not wrapped yet
(exceptionDetails.text?.includes("Illegal return statement") ||
exceptionDetails.exception?.description?.includes(
"Illegal return statement",
@@ -1055,7 +1079,7 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
evalResult = await sendCommand(tabId, "Runtime.evaluate", {
expression: wrappedCode,
awaitPromise: !!awaitPromise,
awaitPromise: Boolean(shouldAwaitPromise),
returnByValue: true,
userGesture: true,
});
@@ -1148,7 +1172,9 @@ async function evalInTab(tabId, code, awaitPromise, evalId) {
function debuggerAttach(tabId) {
return new Promise((resolve, reject) => {
chrome.debugger.attach({ tabId }, "1.3", (err) => {
if (chrome.runtime.lastError) return reject(chrome.runtime.lastError);
if (chrome.runtime.lastError) {
return reject(chrome.runtime.lastError);
}
resolve();
});
});
@@ -1164,7 +1190,9 @@ function sendCommand(tabId, method, params) {
return new Promise((resolve, reject) => {
chrome.debugger.sendCommand({ tabId }, method, params, (result) => {
const err = chrome.runtime.lastError;
if (err) return reject(err);
if (err) {
return reject(err);
}
resolve(result || {});
});
});

View File

@@ -30,6 +30,8 @@ dev = [
"pytest>=8.0.0",
"pytest-asyncio>=0.24.0",
"pytest-cov>=5.0.0",
"ruff>=0.14.6",
"pyrefly>=0.42.3",
]
[tool.pytest.ini_options]

View File

@@ -1,10 +1,9 @@
from playwright.async_api import Page
from typing import ClassVar, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.auth import DummyMfaCodeProvider, ensure_persona
from guide.app import errors
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.models.personas import PersonaStore
@@ -25,7 +24,7 @@ class LoginAsPersonaAction(DemoAction):
self._login_url = login_url
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
if context.persona_id is None:
raise errors.PersonaError("persona_id is required for login action")
persona = self._personas.get(context.persona_id)

View File

@@ -3,9 +3,8 @@ from collections.abc import Callable, Iterable, Mapping
from inspect import Parameter, signature
from typing import ClassVar, override, cast
from playwright.async_api import Page
from guide.app import errors
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionMetadata, ActionResult
@@ -21,7 +20,7 @@ class DemoAction(ABC):
category: ClassVar[str]
@abstractmethod
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Execute the action and return a result."""
...
@@ -84,7 +83,7 @@ class CompositeAction(DemoAction):
self.context: ActionContext | None = None
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Execute all child actions in sequence.
Args:

View File

@@ -6,10 +6,9 @@ browser interactions with minimal imports.
from typing import ClassVar, override
from playwright.async_api import Page
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.helpers import PageHelpers, AccordionCollapseResult
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
@@ -30,7 +29,7 @@ class CollapseAccordionsDemoAction(DemoAction):
category: ClassVar[str] = "demo"
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Collapse accordions on the current page.
Parameters:
@@ -94,9 +93,7 @@ class CollapseAccordionsDemoAction(DemoAction):
def _coerce_to_str(value: object, default: str) -> str:
"""Coerce a value to str, or return default if None."""
if value is None:
return default
return str(value)
return default if value is None else str(value)
def _coerce_to_int(value: object, default: int) -> int:
@@ -107,9 +104,7 @@ def _coerce_to_int(value: object, default: int) -> int:
return value
if isinstance(value, str):
return int(value)
if isinstance(value, float):
return int(value)
return default
return int(value) if isinstance(value, float) else default
__all__ = ["CollapseAccordionsDemoAction"]

View File

@@ -1,10 +1,9 @@
"""Diagnostic action to inspect page structure via extension client."""
from typing import ClassVar, override
from playwright.async_api import Page
from typing import ClassVar, cast, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
@@ -17,7 +16,7 @@ class DiagnosePageAction(DemoAction):
category: ClassVar[str] = "diagnostic"
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Run diagnostic JavaScript to inspect page structure."""
diagnostic_js = """
(() => {
@@ -55,4 +54,8 @@ class DiagnosePageAction(DemoAction):
result = await page.evaluate(diagnostic_js)
return ActionResult(details=result)
if not isinstance(result, dict):
return ActionResult(details={"result": result})
# Cast to dict[str, object] to satisfy type checker
details = cast(dict[str, object], result)
return ActionResult(details=details)

View File

@@ -1,10 +1,9 @@
import contextlib
from playwright.async_api import Page
from typing import ClassVar, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.helpers import PageHelpers
from guide.app.browser.elements.dropdown import select_combobox, select_multi, select_single
from guide.app.browser.elements.form import fill_date, fill_textarea
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
@@ -18,7 +17,7 @@ class FillIntakeBasicAction(DemoAction):
category: ClassVar[str] = "intake"
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
description_val = app_strings.intake.conveyor_belt_request
await page.fill(app_strings.intake.description_field, description_val)
await page.click(app_strings.intake.next_button)
@@ -34,7 +33,7 @@ class FillSourcingRequestAction(DemoAction):
category: ClassVar[str] = "intake"
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
"""Fill the sourcing request form with demo data.
Handles multi-select commodities, regions, and all other form fields.
@@ -42,213 +41,93 @@ class FillSourcingRequestAction(DemoAction):
import logging
_logger = logging.getLogger(__name__)
_logger.info("[FORM-FILL] Starting form fill action")
# Initialize PageHelpers for enhanced dropdown handling
helpers = PageHelpers(page)
# Fill commodities (multi-select autocomplete) using keyboard navigation
_logger.info("[FORM-FILL] Starting commodity fill")
commodities_result = await helpers.select_dropdown_options(
field_selector=app_strings.intake.commodity_field,
target_values=list(app_strings.intake.commodity_request),
close_after=True
# Commodities (multi)
commodities_result = await select_multi(
page,
app_strings.intake.commodity_field,
list(app_strings.intake.commodity_request),
)
_logger.info(
"[FORM-FILL] Commodities selected=%s not_found=%s available=%s",
commodities_result["selected"],
commodities_result["not_found"],
commodities_result["available_options"],
commodities_result["available"],
)
# Fill planned (single-select dropdown)
planned_result = await helpers.select_dropdown_options(
field_selector=app_strings.intake.planned_field,
target_values=[app_strings.intake.planned_request],
close_after=True,
# Planned (single)
planned_result = await select_single(
page,
app_strings.intake.planned_field,
app_strings.intake.planned_request,
)
_logger.info(
"[FORM-FILL] Planned selected=%s not_found=%s available=%s",
planned_result["selected"],
planned_result["not_found"],
planned_result["available_options"],
planned_result["available"],
)
# Fill regions (multi-select dropdown) using keyboard navigation
regions_result = await helpers.select_dropdown_options(
field_selector=app_strings.intake.regions_field,
target_values=list(app_strings.intake.regions_request),
close_after=True
# Regions (multi)
regions_result = await select_multi(
page,
app_strings.intake.regions_field,
list(app_strings.intake.regions_request),
)
_logger.info(
"[FORM-FILL] Regions selected=%s not_found=%s available=%s",
regions_result["selected"],
regions_result["not_found"],
regions_result["available_options"],
regions_result["available"],
)
# Fill OpEx/CapEx (single-select dropdown)
# OpEx/CapEx uses MUI Select (combobox), not Autocomplete; handle explicitly
opex_capex_target = app_strings.intake.opex_capex_request
opex_capex_field = app_strings.intake.opex_capex_field
opex_capex_result: dict[str, list[str]] = {"selected": [], "not_found": [], "available_options": []}
try:
already_script = """
(() => {
const field = document.querySelector('__FIELD__');
if (!field) return false;
const native = field.querySelector('input[name="opex_capex"]');
const val = native ? (native.value || '').toLowerCase() : '';
return val === '__TARGET_LOWER__';
})();
""".replace('__FIELD__', opex_capex_field).replace('__TARGET_LOWER__', opex_capex_target.lower())
already = await page.evaluate(already_script)
if already:
opex_capex_result["selected"] = [opex_capex_target]
else:
try:
await page.click(f"{opex_capex_field} svg[data-testid='ArrowDropDownIcon']")
opened = True
except Exception:
open_script = """
(() => {
const field = document.querySelector('__FIELD__');
if (!field) return false;
const combo = field.querySelector('[role="combobox"]');
if (!combo) return false;
combo.click();
return true;
})();
""".replace('__FIELD__', opex_capex_field)
opened = await page.evaluate(open_script)
if not opened:
raise RuntimeError("OpEx/CapEx combobox not opened")
await page.wait_for_timeout(200)
option_texts = await page.evaluate(
"""
(() => Array.from(document.querySelectorAll('[role="option"]')).map(o => (o.textContent || '').trim()))();
"""
)
_logger.info("[FORM-FILL] OpEx/CapEx available options: %s", option_texts)
opex_capex_result["available_options"] = option_texts
select_script = """
(() => {
const target = '__TARGET__'.toLowerCase();
const options = Array.from(document.querySelectorAll('[role="option"]'));
const canonical = {
'opex': 'OpEx',
'capex': 'CapEx',
'both': 'Both',
};
const desired = (canonical[target] || '__DESIRED__').toLowerCase();
let match = options.find(opt => (opt.textContent || '').trim().toLowerCase() === desired);
if (!match) {
match = options.find(opt => (opt.getAttribute('data-value') || '').trim().toLowerCase() === desired);
}
if (!match) return false;
match.click();
return true;
})();
""".replace('__TARGET__', opex_capex_target).replace('__DESIRED__', opex_capex_target)
selected = await page.evaluate(select_script)
await page.wait_for_timeout(150)
if selected:
opex_capex_result["selected"] = [opex_capex_target]
else:
opex_capex_result["not_found"] = [opex_capex_target]
except Exception as exc:
_logger.error("[FORM-FILL] OpEx/CapEx selection failed: %s", exc)
opex_capex_result["not_found"] = [opex_capex_target]
# Final fallback: force-set the native input if still not selected
if not opex_capex_result["selected"]:
with contextlib.suppress(Exception):
force_script = """
(() => {
const field = document.querySelector('__FIELD__');
if (!field) return false;
const native = field.querySelector('input[name="opex_capex"]');
const combo = field.querySelector('[role="combobox"]');
const val = '__VAL__';
if (native) {
native.value = val;
native.dispatchEvent(new Event('input', { bubbles: true }));
native.dispatchEvent(new Event('change', { bubbles: true }));
}
if (combo) {
combo.textContent = val;
}
return true;
})();
""".replace('__FIELD__', opex_capex_field).replace('__VAL__', opex_capex_target)
forced = await page.evaluate(force_script)
if forced:
opex_capex_result["selected"] = [opex_capex_target]
opex_capex_result["not_found"] = []
# OpEx/CapEx (combobox)
opex_capex_result = await select_combobox(
page,
app_strings.intake.opex_capex_field,
app_strings.intake.opex_capex_request,
)
_logger.info(
"[FORM-FILL] OpEx/CapEx selected=%s not_found=%s available=%s",
opex_capex_result["selected"],
opex_capex_result["not_found"],
opex_capex_result["available_options"],
opex_capex_result["available"],
)
# Fill target date (text input with MM/DD/YYYY placeholder)
await page.fill(
# Target date
await fill_date(
page,
app_strings.intake.target_date_field,
app_strings.intake.target_date_request,
)
# Fill description (required textarea)
await page.fill(
# Text areas
await fill_textarea(
page,
app_strings.intake.description_textarea,
app_strings.intake.description_request
app_strings.intake.description_request,
)
# Fill desired supplier name (textarea)
await page.fill(
await fill_textarea(
page,
app_strings.intake.desired_supplier_name_textarea,
app_strings.intake.desired_supplier_name_request
app_strings.intake.desired_supplier_name_request,
)
# Fill desired supplier contact (textarea)
await page.fill(
await fill_textarea(
page,
app_strings.intake.desired_supplier_contact_textarea,
app_strings.intake.desired_supplier_contact_request
app_strings.intake.desired_supplier_contact_request,
)
# Fill reseller (textarea)
await page.fill(
await fill_textarea(
page,
app_strings.intake.reseller_textarea,
app_strings.intake.reseller_request
app_strings.intake.reseller_request,
)
# Fill entity (autocomplete)
await page.evaluate(f"""
(() => {{
const entityContainer = document.querySelector('{app_strings.intake.entity_field}');
const entityInput = entityContainer?.querySelector('input');
if (!entityInput) return false;
entityInput.click();
return true;
}})();
""")
await page.wait_for_timeout(300)
entity_field_selector = f'{app_strings.intake.entity_field} input'
await page.fill(entity_field_selector, app_strings.intake.entity_request)
# Click the matching option using in-page JS (works in extension mode)
await page.evaluate(f"""
(() => {{
const target = '{app_strings.intake.entity_request}'.trim();
const options = Array.from(document.querySelectorAll('[role="option"]'));
const match = options.find(opt => opt.textContent.trim() === target);
if (!match) throw new Error('Entity option not found: ' + target);
match.click();
return true;
}})();
""")
# Entity (autocomplete single-select)
await select_single(
page,
app_strings.intake.entity_field,
app_strings.intake.entity_request,
)
return ActionResult(
details={

View File

@@ -1,8 +1,7 @@
from playwright.async_api import Page
from typing import ClassVar, override
from guide.app.actions.base import DemoAction, register_action
from guide.app.browser.types import PageLike
from guide.app.models.domain import ActionContext, ActionResult
from guide.app.strings.registry import app_strings
@@ -14,7 +13,7 @@ class AddThreeSuppliersAction(DemoAction):
category: ClassVar[str] = "sourcing"
@override
async def run(self, page: Page, context: ActionContext) -> ActionResult:
async def run(self, page: PageLike, context: ActionContext) -> ActionResult:
suppliers = app_strings.sourcing.default_trio
for supplier in suppliers:
await page.fill(app_strings.sourcing.supplier_search_input, supplier)

View File

@@ -5,6 +5,7 @@ from fastapi import FastAPI
from guide.app.actions.registry import ActionRegistry
from guide.app.auth import DummyMfaCodeProvider, ensure_persona
from guide.app.browser.client import BrowserClient
from guide.app.browser.types import PageLike
from guide.app import errors
from guide.app.core.config import AppSettings
from guide.app.core.logging import LoggingManager
@@ -95,11 +96,12 @@ async def execute_action(
try:
async with browser_client.open_page(target_host_id) as page:
page_like: PageLike = cast(PageLike, page)
if persona:
await ensure_persona(
page, persona, mfa_provider, login_url=settings.raindrop_base_url
page_like, persona, mfa_provider, login_url=settings.raindrop_base_url
)
result = await action.run(page, context)
result = await action.run(page_like, context)
except errors.GuideError as exc:
return ActionEnvelope(
status=ActionStatus.ERROR,

View File

@@ -1,11 +1,10 @@
from playwright.async_api import Page
from guide.app.auth.mfa import MfaCodeProvider
from guide.app.browser.types import PageLike
from guide.app.models.personas.models import DemoPersona
from guide.app.strings.registry import app_strings
async def detect_current_persona(page: Page) -> str | None:
async def detect_current_persona(page: PageLike) -> str | None:
"""Return the email/identifier of the currently signed-in user, if visible."""
element = page.locator(app_strings.auth.current_user_display)
if await element.count() == 0:
@@ -20,7 +19,7 @@ async def detect_current_persona(page: Page) -> str | None:
async def login_with_mfa(
page: Page, email: str, mfa_provider: MfaCodeProvider, login_url: str | None = None
page: PageLike, email: str, mfa_provider: MfaCodeProvider, login_url: str | None = None
) -> None:
"""Log in with MFA. Only proceeds if email input exists after navigation."""
email_input = page.locator(app_strings.auth.email_input)
@@ -45,7 +44,7 @@ async def login_with_mfa(
await page.click(app_strings.auth.submit_button)
async def logout(page: Page) -> None:
async def logout(page: PageLike) -> None:
"""Log out if the logout button exists."""
logout_btn = page.locator(app_strings.auth.logout_button)
if await logout_btn.count() > 0:
@@ -53,7 +52,7 @@ async def logout(page: Page) -> None:
async def ensure_persona(
page: Page,
page: PageLike,
persona: DemoPersona,
mfa_provider: MfaCodeProvider,
login_url: str | None = None,

View File

@@ -3,6 +3,7 @@ from collections.abc import AsyncIterator
from playwright.async_api import Page
from guide.app.browser.extension_client import ExtensionPage
from guide.app.browser.pool import BrowserPool
@@ -28,7 +29,9 @@ class BrowserClient:
self.pool: BrowserPool = pool
@contextlib.asynccontextmanager
async def open_page(self, host_id: str | None = None) -> AsyncIterator[Page]:
async def open_page(
self, host_id: str | None = None
) -> AsyncIterator[Page | ExtensionPage]:
"""Get a fresh page from the pool with guaranteed isolation.
For headless mode: allocates a new context and page, closes context after use.

View File

@@ -7,12 +7,11 @@ when actions fail, enabling better debugging in headless/CI environments.
import base64
from datetime import datetime, timezone
from playwright.async_api import Page
from guide.app.browser.types import PageLike
from guide.app.models.domain.models import DebugInfo
async def capture_screenshot(page: Page) -> str | None:
async def capture_screenshot(page: PageLike) -> str | None:
"""Capture page screenshot as base64-encoded PNG.
Args:
@@ -29,7 +28,7 @@ async def capture_screenshot(page: Page) -> str | None:
return None
async def capture_html(page: Page) -> str | None:
async def capture_html(page: PageLike) -> str | None:
"""Capture page HTML content.
Args:
@@ -45,7 +44,7 @@ async def capture_html(page: Page) -> str | None:
return None
async def capture_console_logs(_page: Page) -> list[str]:
async def capture_console_logs(_page: PageLike) -> list[str]:
"""Capture console messages logged during page lifecycle.
Note: This captures messages that were emitted during the page's
@@ -64,7 +63,7 @@ async def capture_console_logs(_page: Page) -> list[str]:
return []
async def capture_all_diagnostics(page: Page) -> DebugInfo:
async def capture_all_diagnostics(page: PageLike) -> DebugInfo:
"""Capture all diagnostic information (screenshot, HTML, logs).
Attempts to capture screenshot, HTML, and console logs. If any

View File

@@ -0,0 +1,14 @@
"""Convenient exports for browser element helpers (extension-friendly)."""
from guide.app.browser.elements.dropdown import select_multi, select_single, select_combobox
from guide.app.browser.elements.form import fill_text, fill_textarea, fill_date, fill_autocomplete
__all__ = [
"select_multi",
"select_single",
"select_combobox",
"fill_text",
"fill_textarea",
"fill_date",
"fill_autocomplete",
]

View File

@@ -0,0 +1,324 @@
"""Dropdown helpers optimized for extension mode.
Provides simple coroutines for multi/single selects and MUI Select combobox fields.
Avoids Playwright-only APIs where possible; uses short waits and auto-closes.
"""
import asyncio
import logging
import contextlib
from typing import TypedDict, cast
from guide.app.browser.types import PageLike
_logger = logging.getLogger(__name__)
class DropdownResult(TypedDict):
selected: list[str]
not_found: list[str]
available: list[str]
async def _send_key(page: PageLike, key: str) -> None:
keycode_map = {"ArrowDown": 40, "ArrowUp": 38, "Enter": 13, "Escape": 27, "Tab": 9}
keycode = keycode_map.get(key, 0)
await page.evaluate(
f"""
(() => {{
const event = new KeyboardEvent('keydown', {{
key: '{key}',
code: '{key}',
keyCode: {keycode},
which: {keycode},
bubbles: true,
cancelable: true,
composed: true
}});
if (document.activeElement) {{
document.activeElement.dispatchEvent(event);
}}
return true;
}})();
"""
)
async def _wait_for_role_option(page: PageLike, timeout_ms: int = 1200) -> bool:
start = asyncio.get_event_loop().time()
while (asyncio.get_event_loop().time() - start) * 1000 < timeout_ms:
exists = await page.evaluate("document.querySelector('[role=\"option\"]') !== null")
if exists:
return True
await asyncio.sleep(0.05)
return False
async def _ensure_listbox(page: PageLike, timeout_ms: int = 1200) -> bool:
with contextlib.suppress(Exception):
if hasattr(page, "wait_for_selector"):
await page.wait_for_selector('[role="option"]', timeout=timeout_ms)
return True
return await _wait_for_role_option(page, timeout_ms)
async def _open_dropdown(page: PageLike, field_selector: str, popup_button_selector: str) -> None:
try:
await page.click(popup_button_selector)
except Exception:
await page.click(field_selector)
await _send_key(page, "ArrowDown")
await page.wait_for_timeout(100)
await page.wait_for_timeout(50)
async def _get_options(page: PageLike, field_selector: str) -> list[dict[str, str | int]] | None:
field_selector_js = field_selector.replace("'", "\\'")
result = await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector_js}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
const optionNodes = listbox ? Array.from(listbox.querySelectorAll('[role="option"]')) : [];
return optionNodes.map((opt, index) => ({{ index: index, text: (opt.textContent || '').trim() }}));
}})()
"""
)
return cast(list[dict[str, str | int]] | None, result)
async def select_multi(page: PageLike, field_selector: str, values: list[str]) -> DropdownResult:
popup_button_selector = f"{field_selector} .MuiAutocomplete-popupIndicator"
await _open_dropdown(page, field_selector, popup_button_selector)
await _ensure_listbox(page)
options = await _get_options(page, field_selector)
available_option_texts: list[str] = []
if options:
available_option_texts.extend([
str(opt["text"]) for opt in options
if "text" in opt
])
selected: list[str] = []
not_found: list[str] = []
for target_value in values:
escaped = target_value.replace("'", "\\'").replace('"', '\\"')
if not await _ensure_listbox(page):
await _open_dropdown(page, field_selector, popup_button_selector)
await _ensure_listbox(page)
options = await _get_options(page, field_selector)
clicked = await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
if (!listbox) return false;
const options = Array.from(listbox.querySelectorAll('[role="option"]'));
const target = '{escaped}'.toLowerCase();
const match = options.find(opt => {{
const text = (opt.textContent || '').trim();
const lower = text.toLowerCase();
const firstSeg = lower.split(' - ')[0];
const secondSeg = lower.split(' - ')[1] || '';
return lower === target || firstSeg === target || secondSeg === target;
}});
if (!match) return false;
match.click();
return true;
}})();
"""
)
if not clicked:
await _open_dropdown(page, field_selector, popup_button_selector)
await _ensure_listbox(page)
clicked = await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
if (!listbox) return false;
const options = Array.from(listbox.querySelectorAll('[role="option"]'));
const target = '{escaped}'.toLowerCase();
const match = options.find(opt => {{
const text = (opt.textContent || '').trim();
const lower = text.toLowerCase();
const firstSeg = lower.split(' - ')[0];
const secondSeg = lower.split(' - ')[1] || '';
return lower === target || firstSeg === target || secondSeg === target;
}});
if (!match) return false;
match.click();
return true;
}})();
"""
)
if clicked:
selected.append(target_value)
else:
not_found.append(target_value)
if not_found:
await _open_dropdown(page, field_selector, popup_button_selector)
await _ensure_listbox(page)
unresolved: list[str] = []
for val in list(not_found):
escaped = val.replace("'", "\\'").replace('"', '\\"')
clicked = await page.evaluate(
f"""
(() => {{
const target = '{escaped}'.toLowerCase();
const options = Array.from(document.querySelectorAll('[role="option"]'));
const match = options.find(opt => (opt.textContent || '').trim().toLowerCase().includes(target));
if (!match) return false;
match.click();
return true;
}})();
"""
)
if clicked:
selected.append(val)
else:
unresolved.append(val)
not_found = unresolved
# Auto-close
with contextlib.suppress(Exception):
await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector}');
const input = field ? field.querySelector('input') : null;
if (input) input.blur();
const popupBtn = field ? field.querySelector('.MuiAutocomplete-popupIndicator') : null;
if (popupBtn) popupBtn.click();
return true;
}})();
"""
)
await page.wait_for_timeout(50)
with contextlib.suppress(Exception):
await _send_key(page, "Tab")
await page.wait_for_timeout(50)
return {"selected": selected, "not_found": not_found, "available": available_option_texts}
async def select_single(page: PageLike, field_selector: str, value: str) -> DropdownResult:
return await select_multi(page, field_selector, [value])
async def select_combobox(
page: PageLike,
field_selector: str,
value: str,
*,
wait_ms: int = 1200,
) -> DropdownResult:
popup_button_selector = f"{field_selector} svg[data-testid='ArrowDropDownIcon']"
selected = False
available: list[str] = []
# Open dropdown (icon, then combobox fallback)
try:
await page.click(popup_button_selector)
except Exception:
with contextlib.suppress(Exception):
await page.click(f"{field_selector} [role='combobox']")
await page.wait_for_timeout(80)
# Try clicking option by data-value/text
try:
start = asyncio.get_event_loop().time()
while (asyncio.get_event_loop().time() - start) * 1000 < wait_ms:
options_result = await page.evaluate(
"""
(() => Array.from(document.querySelectorAll('[role="option"]'))
.map(o => ({ text: (o.textContent || '').trim(), data: o.getAttribute('data-value') || '' })))();
"""
)
if options := cast(list[dict[str, str]] | None, options_result):
available = [opt.get("text", "") for opt in options]
break
await asyncio.sleep(0.05)
if available:
selected = await page.evaluate(
f"""
(() => {{
const target = '{value}'.toLowerCase();
const options = Array.from(document.querySelectorAll('[role="option"]'));
let match = options.find(opt => (opt.getAttribute('data-value') || '').trim().toLowerCase() === target);
if (!match) {{
match = options.find(opt => (opt.textContent || '').trim().toLowerCase() === target);
}}
if (!match) return false;
match.click();
return true;
}})();
"""
) or False
except Exception:
selected = False
# Force-set as last resort if still not selected
if not selected:
try:
selected = await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector}');
if (!field) return false;
const native = field.querySelector('input');
const combo = field.querySelector('[role="combobox"]');
const val = '{value}';
if (native) {{
native.value = val;
native.dispatchEvent(new Event('input', {{ bubbles: true }}));
native.dispatchEvent(new Event('change', {{ bubbles: true }}));
native.dispatchEvent(new Event('blur', {{ bubbles: true }}));
}}
if (combo) {{
combo.textContent = val;
}}
return true;
}})();
"""
) or False
except Exception:
selected = False
# Auto-close
with contextlib.suppress(Exception):
await page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector}');
const input = field ? field.querySelector('input') : null;
if (input) input.blur();
const icon = field ? field.querySelector('svg[data-testid="ArrowDropDownIcon"]') : null;
if (icon) icon.click();
return true;
}})();
"""
)
await page.wait_for_timeout(50)
return {
"selected": [value] if selected else [],
"not_found": [] if selected else [value],
"available": available,
}

View File

@@ -0,0 +1,21 @@
"""Form fill helpers (extension-friendly)."""
from guide.app.browser.elements import dropdown
from guide.app.browser.types import PageLike
async def fill_text(page: PageLike, selector: str, value: str) -> None:
await page.fill(selector, value)
async def fill_textarea(page: PageLike, selector: str, value: str) -> None:
await page.fill(selector, value)
async def fill_date(page: PageLike, selector: str, value: str) -> None:
await page.fill(selector, value)
async def fill_autocomplete(page: PageLike, field_selector: str, value: str) -> None:
await dropdown.select_single(page, field_selector, value)

View File

@@ -13,6 +13,7 @@ from typing import Protocol, cast
from websockets.asyncio.server import Server, ServerConnection, serve
from guide.app.errors import BrowserConnectionError
from guide.app.browser.types import PageLocator
_logger = logging.getLogger(__name__)
@@ -248,7 +249,14 @@ class ExtensionPage:
"""
await asyncio.sleep(timeout / 1000)
async def goto(self, url: str, wait_until: str = "load", timeout: float = 30000) -> None:
async def goto(
self,
url: str,
*,
timeout: float | None = None,
wait_until: str | None = None,
referer: str | None = None,
) -> None:
"""No-op navigation for extension mode.
In extension mode, the user manually navigates their browser to the desired
@@ -259,13 +267,14 @@ class ExtensionPage:
Args:
url: URL to navigate to (ignored)
wait_until: When to consider navigation complete (ignored)
timeout: Maximum navigation time in milliseconds (ignored)
wait_until: When to consider navigation complete (ignored)
referer: Referer header value (ignored)
"""
# Do absolutely nothing - user has already navigated to the correct page
pass
def locator(self, selector: str) -> "ExtensionLocator":
def locator(self, selector: str) -> "PageLocator":
"""Create a locator for the given selector.
Args:
@@ -274,7 +283,90 @@ class ExtensionPage:
Returns:
ExtensionLocator instance
"""
return ExtensionLocator(self, selector)
return cast(PageLocator, ExtensionLocator(self, selector))
async def wait_for_selector(
self,
selector: str,
*,
timeout: float | None = None,
state: str | None = None,
strict: bool | None = None,
) -> object | None:
"""Wait for an element matching selector to appear.
Args:
selector: CSS selector
timeout: Maximum time to wait in milliseconds (default: 5000)
state: State to wait for (ignored for now)
strict: Whether to use strict mode (ignored)
Returns:
None (for compatibility with Playwright)
"""
timeout_ms = int(timeout) if timeout else 5000
js_code = f"""
const selector = '{selector}';
const timeout = {timeout_ms};
const startTime = Date.now();
return new Promise((resolve, reject) => {{
const interval = setInterval(() => {{
const el = document.querySelector(selector);
if (el) {{
clearInterval(interval);
resolve(true);
}} else if (Date.now() - startTime > timeout) {{
clearInterval(interval);
reject(new Error(`Timeout waiting for selector: ${{selector}}`));
}}
}}, 100);
}});
"""
try:
await self.eval_js(js_code, await_promise=True)
return None
except Exception:
return None
async def wait_for_load_state(
self,
state: str | None = None,
*,
timeout: float | None = None,
) -> None:
"""Wait for page load state (no-op for extension mode).
Args:
state: Load state to wait for (ignored)
timeout: Maximum time to wait (ignored)
"""
# Extension mode doesn't need to wait for load states
pass
async def content(self) -> str:
"""Get page HTML content.
Returns:
HTML content of the page
"""
js_code = "return document.documentElement.outerHTML;"
result = await self.eval_js(js_code)
return str(result) if result else ""
async def screenshot(self, *, full_page: bool = False) -> bytes:
"""Capture page screenshot (not supported in extension mode).
Args:
full_page: Whether to capture full page (ignored)
Returns:
Empty bytes (screenshot not supported in extension mode)
Raises:
NotImplementedError: Screenshots not supported in extension mode
"""
raise NotImplementedError("Screenshots not supported in extension mode")
async def eval_js(self, code: str, await_promise: bool = True) -> JSONValue:
"""Execute JavaScript code via extension and return result.
@@ -343,18 +435,18 @@ class ExtensionLocator:
self._page = page
self._selector = selector
def locator(self, sub_selector: str) -> "ExtensionLocator":
def locator(self, selector: str) -> "PageLocator":
"""Find a child element within this locator.
Args:
sub_selector: CSS selector relative to parent
selector: CSS selector relative to parent
Returns:
New ExtensionLocator for the child
"""
# Combine selectors with descendant combinator
combined = f"{self._selector} {sub_selector}"
return ExtensionLocator(self._page, combined)
combined = f"{self._selector} {selector}"
return cast(PageLocator, ExtensionLocator(self._page, combined))
async def click(self) -> None:
"""Click the located element."""
@@ -427,13 +519,39 @@ class ExtensionLocator:
result = await self._page.eval_js(js_code)
return int(result) if isinstance(result, (int, float)) else 0
async def text_content(self) -> str | None:
"""Get the text content of the located element.
Returns:
Text content of the element, or None if element not found
"""
js_code = f"""
const el = document.querySelector('{self._selector}');
return el ? el.textContent : null;
"""
result = await self._page.eval_js(js_code)
return str(result) if result is not None else None
def nth(self, index: int) -> "PageLocator":
"""Get the nth matching element.
Args:
index: Zero-based index of the element
Returns:
ExtensionLocator for the nth element (for now, returns self)
"""
# For now, querySelector returns first match, so we return self
# In the future, this could be enhanced to support nth-child
return cast(PageLocator, self)
@property
def first(self) -> "ExtensionLocator":
def first(self) -> "PageLocator":
"""Get the first matching element.
For now, returns self as querySelector returns first match.
"""
return self
return cast(PageLocator, self)
class ExtensionClient:

View File

@@ -7,12 +7,16 @@ Provides a stateful wrapper around Playwright Page with:
- Fluent API for common interaction sequences
"""
import contextlib
import logging
from typing import TypedDict
from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from guide.app import errors
from guide.app.browser.elements import dropdown
from guide.app.browser.types import PageLike, PageLocator
from guide.app.browser.wait import (
wait_for_network_idle,
wait_for_navigation,
@@ -65,13 +69,13 @@ class PageHelpers:
)
"""
def __init__(self, page: Page) -> None:
def __init__(self, page: PageLike) -> None:
"""Initialize helpers with a Playwright page.
Args:
page: The Playwright page instance to wrap
"""
self.page: Page = page
self.page: PageLike = page
# --- Wait utilities (wrapped for convenience) ---
@@ -216,208 +220,12 @@ class PageHelpers:
target_values: list[str],
close_after: bool = True,
) -> dict[str, list[str]]:
"""Select options from a dropdown using keyboard navigation.
Opens a dropdown by focusing the input and pressing ArrowDown,
dynamically discovers available options, then navigates to and
selects each target value using arrow keys and Enter.
Works with both single-select and multi-select dropdowns.
Args:
field_selector: CSS selector for the dropdown field container
target_values: List of option values to select
close_after: Click outside dropdown to close after selection (default: True)
Returns:
Dict with keys:
- selected: List of values that were successfully selected
- not_found: List of values that were not in the dropdown options
- available_options: List of all available option values
Raises:
ActionExecutionError: If dropdown cannot be opened or no options found
"""
# Helper to (re)open the dropdown
async def _open_dropdown():
_logger.info(f"[DROPDOWN] Opening dropdown via popup button: {popup_button_selector}")
try:
await self.page.click(popup_button_selector)
except Exception:
await self.page.click(field_selector)
await self._send_key("ArrowDown")
await self.page.wait_for_timeout(100)
# Open the MUI Autocomplete dropdown
popup_button_selector = f"{field_selector} .MuiAutocomplete-popupIndicator"
await _open_dropdown()
async def _ensure_listbox():
try:
if hasattr(self.page, "wait_for_selector"):
await self.page.wait_for_selector('[role="option"]', timeout=1200) # type: ignore[arg-type]
else:
await self._wait_for_role_option(1200)
return True
except Exception:
return False
await _ensure_listbox()
# Dynamically discover available options, scoping to the listbox associated with this field
field_selector_js = field_selector.replace("'", "\\'")
async def _get_options():
return await self.page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector_js}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
const optionNodes = listbox ? Array.from(listbox.querySelectorAll('[role="option"]')) : [];
return optionNodes.map((opt, index) => ({{ index: index, text: (opt.textContent || '').trim() }}));
}})()
"""
)
options = await _get_options()
_logger.info(f"[DROPDOWN] After popup button click: found {len(options) if options else 0} options")
if not options:
raise errors.ActionExecutionError(
"No dropdown options found",
details={
"field_selector": field_selector,
"popup_button_selector": popup_button_selector,
},
)
available_option_texts = [opt["text"] for opt in options]
selected_values: list[str] = []
not_found_values: list[str] = []
# Select each target value by clicking the matching option inside the scoped listbox
for target_value in target_values:
escaped_value = target_value.replace("'", "\\'").replace('"', '\\"')
# Ensure listbox is present/open; always refresh options for each target
if not await _ensure_listbox():
await _open_dropdown()
await _ensure_listbox()
options = await _get_options()
clicked = await self.page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector_js}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
if (!listbox) return false;
const options = Array.from(listbox.querySelectorAll('[role="option"]'));
const target = '{escaped_value}'.toLowerCase();
const match = options.find(opt => {{
const text = (opt.textContent || '').trim();
const lower = text.toLowerCase();
const firstSeg = lower.split(' - ')[0];
const secondSeg = lower.split(' - ')[1] || '';
return lower === target || firstSeg === target || secondSeg === target;
}});
if (!match) return false;
match.click();
return true;
}})()
"""
)
# Retry once after reopening listbox if initial click failed
if not clicked:
await _open_dropdown()
await _ensure_listbox()
options = await _get_options()
clicked = await self.page.evaluate(
f"""
(() => {{
const field = document.querySelector('{field_selector_js}');
const input = field ? field.querySelector('input') : null;
const listboxId = input?.getAttribute('aria-controls') || input?.getAttribute('aria-owns');
const listbox = listboxId ? document.getElementById(listboxId) : document.querySelector('[role="listbox"]');
if (!listbox) return false;
const options = Array.from(listbox.querySelectorAll('[role="option"]'));
const target = '{escaped_value}'.toLowerCase();
const match = options.find(opt => {{
const text = (opt.textContent || '').trim();
const lower = text.toLowerCase();
const firstSeg = lower.split(' - ')[0];
const secondSeg = lower.split(' - ')[1] || '';
return lower === target || firstSeg === target || secondSeg === target;
}});
if (!match) return false;
match.click();
return true;
}})()
"""
)
if clicked:
selected_values.append(target_value)
else:
not_found_values.append(target_value)
# Fallback: if any targets not found, try a global search across all role=option nodes
if not_found_values:
await _open_dropdown()
await _ensure_listbox()
unresolved: list[str] = []
for target_value in list(not_found_values):
escaped_value = target_value.replace("'", "\\'").replace('"', '\\"')
clicked = await self.page.evaluate(
f"""
(() => {{
const target = '{escaped_value}'.toLowerCase();
const options = Array.from(document.querySelectorAll('[role="option"]'));
const match = options.find(opt => (opt.textContent || '').trim().toLowerCase().includes(target));
if (!match) return false;
match.click();
return true;
}})()
"""
)
if clicked:
selected_values.append(target_value)
else:
unresolved.append(target_value)
not_found_values = unresolved
# Close dropdown if requested (avoid closing the surrounding modal)
if close_after:
try:
close_script = """
(() => {
const field = document.querySelector('{FIELD}');
const input = field ? field.querySelector('input') : null;
if (input) {
input.blur();
}
const popupBtn = field ? field.querySelector('.MuiAutocomplete-popupIndicator') : null;
if (popupBtn) popupBtn.click();
return true;
})();
""".replace("{FIELD}", field_selector_js)
await self.page.evaluate(close_script)
except Exception:
pass
await self.page.wait_for_timeout(100)
try:
await self._send_key("Tab")
except Exception:
pass
await self.page.wait_for_timeout(100)
# Delegate to dropdown helper for consistency
result = await dropdown.select_multi(self.page, field_selector, target_values)
return {
"selected": selected_values,
"not_found": not_found_values,
"available_options": available_option_texts,
"selected": result["selected"],
"not_found": result["not_found"],
"available_options": result.get("available", []),
}
async def _send_key(self, key: str) -> None:
@@ -459,7 +267,7 @@ class PageHelpers:
async def _collapse_open_listboxes(self) -> None:
"""Hide any visible role=listbox elements without dismissing modals."""
try:
with contextlib.suppress(Exception):
await self.page.evaluate(
"""
(() => {
@@ -473,8 +281,6 @@ class PageHelpers:
})();
"""
)
except Exception:
pass
async def _wait_for_role_option(self, timeout_ms: int = 3000) -> None:
"""Wait for any element with role="option" using JS polling.
@@ -549,9 +355,9 @@ class PageHelpers:
# Material-UI uses KeyboardArrowUpOutlinedIcon when expanded
for i in range(count):
try:
button = buttons.nth(i)
button: PageLocator = buttons.nth(i)
# Check if this specific button contains the up icon
up_icon = button.locator(
up_icon: PageLocator = button.locator(
'svg[data-testid="KeyboardArrowUpOutlinedIcon"]'
)
# Fast check: if icon exists, button is expanded

View File

@@ -12,6 +12,7 @@ Architecture:
import contextlib
import logging
from typing import TypeAlias
from playwright.async_api import (
Browser,
@@ -25,6 +26,8 @@ from guide.app.core.config import AppSettings, BrowserHostConfig, HostKind
from guide.app import errors
from guide.app.browser.extension_client import ExtensionClient, ExtensionPage
PageLike: TypeAlias = Page | ExtensionPage
_logger = logging.getLogger(__name__)
@@ -61,7 +64,7 @@ class BrowserInstance:
# Cache extension page
self._extension_page: ExtensionPage | None = None
async def allocate_context_and_page(self) -> tuple[BrowserContext | None, Page, bool]:
async def allocate_context_and_page(self) -> tuple[BrowserContext | None, PageLike, bool]:
"""Allocate a fresh context and page for this request.
For CDP mode: returns cached context and page from initial connection.
@@ -79,70 +82,12 @@ class BrowserInstance:
"""
try:
if self.host_config.kind == HostKind.EXTENSION:
_logger.info(f"[EXTENSION-{self.host_id}] allocate_context_and_page called")
# Get extension page from extension client
if self._extension_page is None:
_logger.info(f"[EXTENSION-{self.host_id}] First access - getting page from extension client")
if self.extension_client is None:
raise errors.BrowserConnectionError(
f"Extension client not initialized for host {self.host_id}",
details={"host_id": self.host_id},
)
self._extension_page = await self.extension_client.get_page()
_logger.info(f"[EXTENSION-{self.host_id}] Cached extension page")
else:
_logger.info(f"[EXTENSION-{self.host_id}] Using cached extension page")
_logger.info(f"[EXTENSION-{self.host_id}] Returning extension page (no context)")
# Return None context, extension page (cast as Page), don't close
return None, self._extension_page, False # type: ignore[return-value]
return await self._allocate_extension_page()
if self.host_config.kind == HostKind.CDP:
_logger.info(f"[CDP-{self.host_id}] allocate_context_and_page called")
# Return cached CDP context/page to avoid re-querying (which causes refresh)
if self._cdp_context is None or self._cdp_page is None:
_logger.info(f"[CDP-{self.host_id}] First access - querying browser.contexts")
# First time: get context and page, then cache them
assert self.browser is not None
contexts = self.browser.contexts
_logger.info(f"[CDP-{self.host_id}] Got {len(contexts)} contexts")
if not contexts:
raise errors.BrowserConnectionError(
f"No contexts available in CDP browser for host {self.host_id}",
details={"host_id": self.host_id},
)
context = contexts[0]
_logger.info(f"[CDP-{self.host_id}] Querying context.pages")
pages = context.pages
_logger.info(f"[CDP-{self.host_id}] Got {len(pages)} pages: {[p.url for p in pages]}")
if not pages:
raise errors.BrowserConnectionError(
f"No pages available in CDP browser context for host {self.host_id}",
details={"host_id": self.host_id},
)
# Find the active (most recently used) non-devtools page
non_devtools_pages = [p for p in pages if not p.url.startswith("devtools://")]
if not non_devtools_pages:
raise errors.BrowserConnectionError(
f"No application pages found in CDP browser (only devtools pages)",
details={"host_id": self.host_id, "pages": [p.url for p in pages]},
)
# Cache the context and page for reuse
self._cdp_context = context
self._cdp_page = non_devtools_pages[-1]
_logger.info(f"[CDP-{self.host_id}] Cached page: {self._cdp_page.url}")
else:
_logger.info(f"[CDP-{self.host_id}] Using cached page: {self._cdp_page.url}")
return await self._allocate_cdp_page()
_logger.info(f"[CDP-{self.host_id}] Returning cached context and page")
# Return cached references (doesn't trigger refresh)
return self._cdp_context, self._cdp_page, False # Don't close CDP contexts
# Headless mode: create new context for isolation
assert self.browser is not None
context = await self.browser.new_context()
page = await context.new_page()
return context, page, True # Close headless contexts
return await self._allocate_headless_page()
except errors.BrowserConnectionError:
raise
except Exception as exc:
@@ -151,6 +96,75 @@ class BrowserInstance:
details={"host_id": self.host_id, "host_kind": self.host_config.kind},
) from exc
async def _allocate_extension_page(self) -> tuple[None, ExtensionPage, bool]:
_logger.info(f"[EXTENSION-{self.host_id}] allocate_context_and_page called")
if self._extension_page is None:
if self.extension_client is None:
raise errors.BrowserConnectionError(
f"Extension client not initialized for host {self.host_id}",
details={"host_id": self.host_id},
)
_logger.info(
f"[EXTENSION-{self.host_id}] First access - getting page from extension client"
)
self._extension_page = await self.extension_client.get_page()
_logger.info(f"[EXTENSION-{self.host_id}] Cached extension page")
_logger.info(f"[EXTENSION-{self.host_id}] Returning extension page (no context)")
return None, self._extension_page, False
async def _allocate_cdp_page(self) -> tuple[BrowserContext, PageLike, bool]:
_logger.info(f"[CDP-{self.host_id}] allocate_context_and_page called")
if self._cdp_context is None or self._cdp_page is None:
browser = self._require_browser()
_logger.info(f"[CDP-{self.host_id}] First access - querying browser.contexts")
contexts = browser.contexts
_logger.info(f"[CDP-{self.host_id}] Got {len(contexts)} contexts")
if not contexts:
raise errors.BrowserConnectionError(
f"No contexts available in CDP browser for host {self.host_id}",
details={"host_id": self.host_id},
)
context = contexts[0]
pages = context.pages
_logger.info(f"[CDP-{self.host_id}] Got {len(pages)} pages: {[p.url for p in pages]}")
if not pages:
raise errors.BrowserConnectionError(
f"No pages available in CDP browser context for host {self.host_id}",
details={"host_id": self.host_id},
)
non_devtools_pages = [p for p in pages if not p.url.startswith("devtools://")]
if not non_devtools_pages:
raise errors.BrowserConnectionError(
"No application pages found in CDP browser (only devtools pages)",
details={"host_id": self.host_id, "pages": [p.url for p in pages]},
)
self._cdp_context = context
self._cdp_page = non_devtools_pages[-1]
_logger.info(f"[CDP-{self.host_id}] Cached page: {self._cdp_page.url}")
else:
_logger.info(f"[CDP-{self.host_id}] Using cached page: {self._cdp_page.url}")
assert self._cdp_context is not None and self._cdp_page is not None
return self._cdp_context, self._cdp_page, False
async def _allocate_headless_page(self) -> tuple[BrowserContext, Page, bool]:
browser = self._require_browser()
context = await browser.new_context()
page = await context.new_page()
return context, page, True
def _require_browser(self) -> Browser:
if self.browser is None:
raise errors.BrowserConnectionError(
f"Browser not initialized for host {self.host_id}",
details={"host_id": self.host_id},
)
return self.browser
async def close(self) -> None:
"""Close the browser connection."""
with contextlib.suppress(Exception):
@@ -225,7 +239,7 @@ class BrowserPool:
async def allocate_context_and_page(
self, host_id: str | None = None
) -> tuple[BrowserContext | None, Page, bool]:
) -> tuple[BrowserContext | None, PageLike, bool]:
"""Allocate a fresh context and page for the specified host.
Lazily creates browser connections on first request per host.

View File

@@ -0,0 +1,103 @@
"""Type definitions for browser automation interfaces."""
from typing import Literal, Protocol
class PageLocator(Protocol):
"""Protocol for locator objects returned by page.locator()."""
async def count(self) -> int:
"""Get the count of matching elements."""
...
@property
def first(self) -> "PageLocator":
"""Get the first matching element."""
...
def nth(self, index: int) -> "PageLocator":
"""Get the nth matching element."""
...
def locator(self, selector: str) -> "PageLocator":
"""Create a locator for a descendant element."""
...
async def text_content(self) -> str | None:
"""Get the text content of the element."""
...
async def click(self) -> None:
"""Click the element."""
...
class PageLike(Protocol):
"""Protocol for page-like objects that support common browser automation operations.
This protocol allows both Playwright's Page and ExtensionPage to be used
interchangeably in actions and auth functions.
"""
async def fill(self, selector: str, value: str) -> None:
"""Fill an input element with a value."""
...
async def click(self, selector: str) -> None:
"""Click an element matching the selector."""
...
async def goto(
self,
url: str,
*,
timeout: float | None = None,
wait_until: Literal["commit", "domcontentloaded", "load", "networkidle"] | None = None,
referer: str | None = None,
) -> object | None:
"""Navigate to a URL."""
...
async def evaluate(self, expression: str) -> object:
"""Evaluate JavaScript expression in page context."""
...
def locator(self, selector: str) -> PageLocator:
"""Create a locator for the given selector."""
...
async def wait_for_selector(
self,
selector: str,
*,
timeout: float | None = None,
state: Literal["attached", "detached", "hidden", "visible"] | None = None,
strict: bool | None = None,
) -> object | None:
"""Wait for an element matching selector to appear."""
...
async def wait_for_timeout(self, timeout: int) -> None:
"""Wait for a specified timeout in milliseconds."""
...
async def wait_for_load_state(
self,
state: Literal["domcontentloaded", "load", "networkidle"] | None = None,
*,
timeout: float | None = None,
) -> None:
"""Wait for page load state."""
...
async def content(self) -> str:
"""Get page HTML content."""
...
async def screenshot(self, *, full_page: bool = False) -> bytes:
"""Capture page screenshot."""
...
__all__ = ["PageLike", "PageLocator"]

View File

@@ -6,14 +6,15 @@ and verifying visual stability before proceeding with actions.
import asyncio
from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
from guide.app import errors
from guide.app.browser.types import PageLike
from guide.app.utils.retry import retry_async
async def wait_for_selector(
page: Page,
page: PageLike,
selector: str,
timeout_ms: int = 5000,
) -> None:
@@ -35,7 +36,7 @@ async def wait_for_selector(
async def wait_for_navigation(
page: Page,
page: PageLike,
timeout_ms: int = 5000,
) -> None:
"""Wait for page navigation to complete.
@@ -55,7 +56,7 @@ async def wait_for_navigation(
async def wait_for_network_idle(
page: Page,
page: PageLike,
timeout_ms: int = 5000,
) -> None:
"""Wait for network to become idle (no active requests).
@@ -75,7 +76,7 @@ async def wait_for_network_idle(
async def is_page_stable(
page: Page,
page: PageLike,
stability_check_ms: int = 500,
samples: int = 3,
) -> bool:
@@ -112,7 +113,7 @@ async def is_page_stable(
@retry_async(retries=3, delay_seconds=0.2)
async def wait_for_stable_page(
page: Page,
page: PageLike,
stability_check_ms: int = 500,
samples: int = 3,
) -> None:

View File

@@ -42,9 +42,7 @@ def _load_queries(filename: str) -> dict[str, str]:
for definition in document.definitions:
if isinstance(definition, OperationDefinitionNode) and definition.name:
op_name = definition.name.value
# Extract the source text for this operation from the original content
source_text = _extract_operation_source(content, op_name)
if source_text:
if source_text := _extract_operation_source(content, op_name):
queries[op_name] = source_text
return queries
@@ -69,11 +67,10 @@ def _extract_operation_source(content: str, op_name: str) -> str:
stripped = line.strip()
# Look for operation definition line
if start_idx is None and op_name in stripped:
if stripped.startswith(("query ", "mutation ")):
start_idx = i
brace_count = stripped.count("{") - stripped.count("}")
continue
if start_idx is None and op_name in stripped and stripped.startswith(("query ", "mutation ")):
start_idx = i
brace_count = stripped.count("{") - stripped.count("}")
continue
# Count braces if we found the start
if start_idx is not None: