Files
rag-manager/ingest_pipeline/cli/tui/screens/dialogs.py
2025-09-19 08:31:36 +00:00

324 lines
12 KiB
Python

"""Dialog screens for confirmations and user interactions."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar
from textual.app import ComposeResult
from textual.binding import Binding
from textual.containers import Container, Horizontal
from textual.screen import ModalScreen, Screen
from textual.widgets import Button, Footer, Header, LoadingIndicator, RichLog, Static
from typing_extensions import override
from ..models import CollectionInfo
if TYPE_CHECKING:
from .dashboard import CollectionOverviewScreen
from .documents import DocumentManagementScreen
class ConfirmDeleteScreen(Screen[None]):
    """Confirmation screen shown before a collection is deleted.

    The screen shows the collection's name, backend and document count.
    Pressing Y, Enter or the "Yes" button starts the deletion in a worker;
    N, Escape or the "Cancel" button pop the screen without deleting.
    """

    # Collection the user is being asked to delete.
    collection: CollectionInfo
    # Dashboard screen that opened this dialog; it supplies the storage
    # backends and is refreshed after a successful deletion.
    parent_screen: CollectionOverviewScreen

    BINDINGS: ClassVar[list[Binding]] = [
        Binding("escape", "app.pop_screen", "Cancel"),
        Binding("y", "confirm_delete", "Yes"),
        Binding("n", "app.pop_screen", "No"),
        Binding("enter", "confirm_delete", "Confirm"),
    ]

    def __init__(self, collection: CollectionInfo, parent_screen: CollectionOverviewScreen) -> None:
        """Store the collection to delete and the screen that requested it."""
        super().__init__()
        self.collection = collection
        self.parent_screen = parent_screen

    @override
    def compose(self) -> ComposeResult:
        """Build the warning text and the Yes/Cancel button row."""
        yield Header()
        yield Container(
            Static("⚠️ Confirm Deletion", classes="title warning"),
            Static(f"Are you sure you want to delete collection '{self.collection['name']}'?"),
            Static(f"Backend: {self.collection['backend']}"),
            Static(f"Documents: {self.collection['count']:,}"),
            Static("This action cannot be undone!", classes="warning"),
            Static("Press Y to confirm, N or Escape to cancel", classes="subtitle"),
            Horizontal(
                Button("✅ Yes, Delete (Y)", id="yes_btn", variant="error"),
                Button("❌ Cancel (N)", id="no_btn", variant="default"),
                classes="action_buttons",
            ),
            classes="main_container center",
        )
        yield Footer()

    def on_mount(self) -> None:
        """Initialize the screen with focus on cancel button for safety."""
        self.query_one("#no_btn").focus()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Route the Yes button to deletion and the Cancel button to dismissal."""
        if event.button.id == "yes_btn":
            self.action_confirm_delete()
        elif event.button.id == "no_btn":
            self.app.pop_screen()

    def action_confirm_delete(self) -> None:
        """Start the deletion in a worker."""
        self.run_worker(self.delete_collection())

    async def delete_collection(self) -> None:
        """Delete the collection, notify the user, and refresh the dashboard.

        Weaviate collections are deleted through the parent screen's Weaviate
        client; every other collection type goes through the storage backend
        returned by the parent screen. On any failure an error notification is
        shown and the parent screen is not refreshed. Unexpected exceptions
        are reported as a notification and leave this screen open.
        """
        try:
            if self.collection["type"] == "weaviate" and self.parent_screen.weaviate:
                client = self.parent_screen.weaviate.client
                if not (client and client.collections):
                    # Nothing can be deleted without a usable client; say so
                    # instead of refreshing as if the deletion had happened.
                    self.notify(
                        f"❌ Weaviate client is not available; cannot delete collection: {self.collection['name']}",
                        severity="error",
                    )
                    self.app.pop_screen()
                    return

                client.collections.delete(self.collection["name"])
                self.notify(
                    f"Deleted Weaviate collection: {self.collection['name']}",
                    severity="information",
                )
            else:
                # Use the dashboard's method to get the appropriate storage backend
                storage_backend = self.parent_screen._get_storage_for_collection(self.collection)
                if not storage_backend:
                    self.notify(
                        f"❌ No storage backend available for {self.collection['type']} collection: {self.collection['name']}",
                        severity="error",
                    )
                    self.app.pop_screen()
                    return

                # Check if the storage backend supports collection deletion
                if not hasattr(storage_backend, "delete_collection"):
                    self.notify(
                        f"❌ Collection deletion not supported for {self.collection['type']} backend",
                        severity="error",
                    )
                    self.app.pop_screen()
                    return

                # Ensure we use the exact collection name, not any default from storage config
                collection_name = str(self.collection["name"])
                collection_type = str(self.collection["type"])
                self.notify(f"Deleting {collection_type} collection: {collection_name}...", severity="information")

                success = await storage_backend.delete_collection(collection_name)
                if not success:
                    self.notify(
                        f"❌ Failed to delete {self.collection['type']} collection: {self.collection['name']}",
                        severity="error",
                    )
                    # Don't refresh if deletion failed
                    self.app.pop_screen()
                    return

                self.notify(
                    f"✅ Successfully deleted {self.collection['type']} collection: {self.collection['name']}",
                    severity="information",
                    timeout=3.0,
                )

            # Refresh the dashboard after a 500ms delay so the backend has time
            # to process the deletion. The timer is owned by the parent screen
            # because this screen is popped immediately below.
            self.parent_screen.set_timer(0.5, lambda: self.parent_screen.refresh_collections())
            self.app.pop_screen()
        except Exception as e:
            self.notify(f"Failed to delete collection: {e}", severity="error", markup=False)
class ConfirmDocumentDeleteScreen(Screen[None]):
    """Confirmation screen shown before selected documents are deleted.

    Pressing Y, Enter or the "Yes" button starts the deletion in a worker;
    N, Escape or the "Cancel" button pop the screen without deleting.
    """

    # IDs of the documents the user selected for deletion.
    doc_ids: list[str]
    # Collection the documents belong to.
    collection: CollectionInfo
    # Document management screen that opened this dialog; it supplies the
    # storage backend and is reloaded after the deletion.
    parent_screen: DocumentManagementScreen

    BINDINGS: ClassVar[list[Binding]] = [
        Binding("escape", "app.pop_screen", "Cancel"),
        Binding("y", "confirm_delete", "Yes"),
        Binding("n", "app.pop_screen", "No"),
        Binding("enter", "confirm_delete", "Confirm"),
    ]

    def __init__(
        self,
        doc_ids: list[str],
        collection: CollectionInfo,
        parent_screen: DocumentManagementScreen,
    ) -> None:
        """Store the document IDs, their collection, and the requesting screen."""
        super().__init__()
        self.doc_ids = doc_ids
        self.collection = collection
        self.parent_screen = parent_screen

    @override
    def compose(self) -> ComposeResult:
        """Build the warning text, the Yes/Cancel button row and a loading indicator."""
        yield Header()
        yield Container(
            Static("⚠️ Confirm Document Deletion", classes="title warning"),
            Static(
                f"Are you sure you want to delete {len(self.doc_ids)} documents from '{self.collection['name']}'?"
            ),
            Static("This action cannot be undone!", classes="warning"),
            Static("Press Y to confirm, N or Escape to cancel", classes="subtitle"),
            Horizontal(
                Button("✅ Yes, Delete (Y)", id="yes_btn", variant="error"),
                Button("❌ Cancel (N)", id="no_btn", variant="default"),
                classes="action_buttons",
            ),
            LoadingIndicator(id="loading"),
            classes="main_container center",
        )
        yield Footer()

    def on_mount(self) -> None:
        """Initialize the screen with focus on cancel button for safety."""
        # The loading indicator is only shown while a deletion is running.
        self.query_one("#loading").display = False
        self.query_one("#no_btn").focus()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Route the Yes button to deletion and the Cancel button to dismissal."""
        if event.button.id == "yes_btn":
            self.action_confirm_delete()
        elif event.button.id == "no_btn":
            self.app.pop_screen()

    def action_confirm_delete(self) -> None:
        """Start the deletion in a worker."""
        self.run_worker(self.delete_documents())

    async def delete_documents(self) -> None:
        """Delete the selected documents and reload the parent screen.

        The deletion goes through the parent screen's ``storage`` object. If
        the parent has no storage, or the storage has no ``delete_documents``
        method, an error is shown and the selection is left untouched.
        Otherwise the per-document results are summarised in notifications,
        the parent's selection is cleared and its document list is reloaded.
        Unexpected exceptions are reported as a notification and leave this
        screen open. The loading indicator is hidden again in every case.
        """
        loading = self.query_one("#loading")
        loading.display = True
        try:
            storage = getattr(self.parent_screen, "storage", None)
            if not storage or not hasattr(storage, "delete_documents"):
                # Nothing was deleted, so keep the user's selection and report
                # the problem rather than closing as if the request succeeded.
                self.notify(
                    "❌ Document deletion is not supported by the current storage backend",
                    severity="error",
                )
                self.app.pop_screen()
                return

            results: dict[str, bool] = await storage.delete_documents(
                self.doc_ids,
                collection_name=self.collection["name"],
            )

            # Count successful deletions
            successful = sum(bool(success) for success in results.values())
            failed = len(results) - successful
            if successful > 0:
                self.notify(f"Deleted {successful} documents", severity="information")
            if failed > 0:
                self.notify(f"Failed to delete {failed} documents", severity="error")

            # Clear selection and refresh parent screen
            self.parent_screen.selected_docs.clear()
            await self.parent_screen.load_documents()
            self.app.pop_screen()
        except Exception as e:
            self.notify(f"Failed to delete documents: {e}", severity="error", markup=False)
        finally:
            loading.display = False
class LogViewerScreen(ModalScreen[None]):
    """Display live log output without disrupting the TUI.

    On mount the screen registers itself with the application through
    ``attach_log_viewer`` (when the app provides it) and unregisters through
    ``detach_log_viewer`` on unmount. Log lines and the log file path are
    supplied from outside via ``replace_logs``, ``append_logs`` and
    ``update_log_file``.
    """

    # Cached reference to the RichLog widget; looked up on first use.
    _log_widget: RichLog | None
    # Path last passed to ``update_log_file``; None when no file is set.
    _log_file: Path | None

    BINDINGS: ClassVar[list[Binding]] = [
        Binding("escape", "close", "Close"),
        Binding("ctrl+l", "close", "Close"),
        Binding("s", "show_path", "Log File"),
    ]

    def __init__(self) -> None:
        """Create the screen with no cached widget and no known log file."""
        super().__init__()
        self._log_widget = None
        self._log_file = None

    @override
    def compose(self) -> ComposeResult:
        """Build the title, the log stream widget and the log file path label."""
        yield Header(show_clock=True)
        yield Container(
            Static("📜 Live Application Logs", classes="title"),
            Static("Logs update in real time. Press S to reveal the log file path.", classes="subtitle"),
            RichLog(id="log_stream", classes="log-stream", wrap=True, highlight=False),
            Static("", id="log_file_path", classes="subtitle"),
            classes="main_container log-viewer-container",
        )
        yield Footer()

    def on_mount(self) -> None:
        """Attach this viewer to the parent application once mounted."""
        self._log_widget = self.query_one(RichLog)
        if hasattr(self.app, "attach_log_viewer"):
            self.app.attach_log_viewer(self)

    def on_unmount(self) -> None:
        """Detach from the parent application when closed."""
        if hasattr(self.app, "detach_log_viewer"):
            self.app.detach_log_viewer(self)

    def _get_log_widget(self) -> RichLog:
        """Return the RichLog widget, looking it up on first use.

        ``query_one`` raises when no widget matches instead of returning
        None, so no further None check is needed after the lookup.
        """
        if self._log_widget is None:
            self._log_widget = self.query_one(RichLog)
        return self._log_widget

    def replace_logs(self, lines: list[str]) -> None:
        """Replace rendered logs with the provided history."""
        self._get_log_widget().clear()
        self.append_logs(lines)

    def append_logs(self, lines: list[str]) -> None:
        """Append new log lines to the viewer and scroll to the end."""
        log_widget = self._get_log_widget()
        for line in lines:
            log_widget.write(line)
        log_widget.scroll_end(animate=False)

    def update_log_file(self, log_file: Path | None) -> None:
        """Update the displayed log file path.

        Passing None shows a message that logs are not persisted to disk.
        """
        self._log_file = log_file
        label = self.query_one("#log_file_path", Static)
        if log_file is None:
            label.update("Logs are not currently being persisted to disk.")
        else:
            label.update(f"Log file: {log_file}")

    def action_close(self) -> None:
        """Close the log viewer."""
        self.app.pop_screen()

    def action_show_path(self) -> None:
        """Reveal the log file location in a notification."""
        if self._log_file is None:
            self.notify("File logging is disabled for this session.", severity="warning")
        else:
            self.notify(f"Log file available at: {self._log_file}", severity="information", markup=False)