359 lines
14 KiB
Python
359 lines
14 KiB
Python
"""Dialog screens for confirmations and user interactions."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
from textual.app import ComposeResult
|
|
from textual.binding import Binding
|
|
from textual.containers import Container, Horizontal
|
|
from textual.screen import ModalScreen, Screen
|
|
from textual.widgets import Button, Footer, Header, LoadingIndicator, RichLog, Static
|
|
from typing_extensions import override
|
|
|
|
from ..models import CollectionInfo
|
|
|
|
if TYPE_CHECKING:
|
|
from ..app import CollectionManagementApp
|
|
from .dashboard import CollectionOverviewScreen
|
|
from .documents import DocumentManagementScreen
|
|
|
|
|
|
class ConfirmDeleteScreen(Screen[None]):
|
|
"""Screen for confirming collection deletion."""
|
|
|
|
collection: CollectionInfo
|
|
parent_screen: CollectionOverviewScreen
|
|
|
|
@property
|
|
def app(self) -> CollectionManagementApp: # type: ignore[override]
|
|
"""Return the typed app instance."""
|
|
return super().app # type: ignore[return-value]
|
|
|
|
BINDINGS = [
|
|
Binding("escape", "app.pop_screen", "Cancel"),
|
|
Binding("y", "confirm_delete", "Yes"),
|
|
Binding("n", "app.pop_screen", "No"),
|
|
Binding("enter", "confirm_delete", "Confirm"),
|
|
]
|
|
|
|
def __init__(self, collection: CollectionInfo, parent_screen: CollectionOverviewScreen):
|
|
super().__init__()
|
|
self.collection = collection
|
|
self.parent_screen = parent_screen
|
|
|
|
@override
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Static("⚠️ Confirm Deletion", classes="title warning"),
|
|
Static(f"Are you sure you want to delete collection '{self.collection['name']}'?"),
|
|
Static(f"Backend: {self.collection['backend']}"),
|
|
Static(f"Documents: {self.collection['count']:,}"),
|
|
Static("This action cannot be undone!", classes="warning"),
|
|
Static("Press Y to confirm, N or Escape to cancel", classes="subtitle"),
|
|
Horizontal(
|
|
Button("✅ Yes, Delete (Y)", id="yes_btn", variant="error"),
|
|
Button("❌ Cancel (N)", id="no_btn", variant="default"),
|
|
classes="action_buttons",
|
|
),
|
|
classes="main_container center",
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
"""Initialize the screen with focus on cancel button for safety."""
|
|
self.query_one("#no_btn").focus()
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
"""Handle button presses."""
|
|
if event.button.id == "yes_btn":
|
|
self.action_confirm_delete()
|
|
elif event.button.id == "no_btn":
|
|
self.app.pop_screen()
|
|
|
|
def action_confirm_delete(self) -> None:
|
|
"""Confirm deletion."""
|
|
self.run_worker(self.delete_collection())
|
|
|
|
async def delete_collection(self) -> None:
|
|
"""Delete the collection."""
|
|
try:
|
|
if self.collection["type"] == "weaviate" and self.parent_screen.weaviate:
|
|
# Delete Weaviate collection
|
|
if (
|
|
self.parent_screen.weaviate.client
|
|
and self.parent_screen.weaviate.client.collections
|
|
):
|
|
self.parent_screen.weaviate.client.collections.delete(self.collection["name"])
|
|
self.notify(
|
|
f"Deleted Weaviate collection: {self.collection['name']}",
|
|
severity="information",
|
|
)
|
|
else:
|
|
# Use the dashboard's method to get the appropriate storage backend
|
|
storage_backend = self.parent_screen._get_storage_for_collection(self.collection)
|
|
if not storage_backend:
|
|
self.notify(
|
|
f"❌ No storage backend available for {self.collection['type']} collection: {self.collection['name']}",
|
|
severity="error",
|
|
)
|
|
self.app.pop_screen()
|
|
return
|
|
|
|
# Check if the storage backend supports collection deletion
|
|
if not hasattr(storage_backend, "delete_collection"):
|
|
self.notify(
|
|
f"❌ Collection deletion not supported for {self.collection['type']} backend",
|
|
severity="error",
|
|
)
|
|
self.app.pop_screen()
|
|
return
|
|
|
|
# Delete the collection using the appropriate backend
|
|
# Ensure we use the exact collection name, not any default from storage config
|
|
collection_name = str(self.collection["name"])
|
|
collection_type = str(self.collection["type"])
|
|
|
|
self.notify(
|
|
f"Deleting {collection_type} collection: {collection_name}...",
|
|
severity="information",
|
|
)
|
|
|
|
# Use the standard delete_collection method for all backends
|
|
if hasattr(storage_backend, "delete_collection"):
|
|
success = await storage_backend.delete_collection(collection_name)
|
|
else:
|
|
self.notify("❌ Backend does not support collection deletion", severity="error")
|
|
self.app.pop_screen()
|
|
return
|
|
if success:
|
|
self.notify(
|
|
f"✅ Successfully deleted {self.collection['type']} collection: {self.collection['name']}",
|
|
severity="information",
|
|
timeout=3.0,
|
|
)
|
|
else:
|
|
self.notify(
|
|
f"❌ Failed to delete {self.collection['type']} collection: {self.collection['name']}",
|
|
severity="error",
|
|
)
|
|
# Don't refresh if deletion failed
|
|
self.app.pop_screen()
|
|
return
|
|
|
|
# Refresh parent screen after a short delay to ensure deletion is processed
|
|
self.call_later(self._refresh_parent_collections, 0.5) # 500ms delay
|
|
self.app.pop_screen()
|
|
|
|
except Exception as e:
|
|
self.notify(f"Failed to delete collection: {e}", severity="error", markup=False)
|
|
|
|
def _refresh_parent_collections(self) -> None:
|
|
"""Helper method to refresh parent collections."""
|
|
try:
|
|
self.parent_screen.refresh_collections()
|
|
except Exception as e:
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
logger.exception(f"Failed to refresh parent collections after deletion: {e}")
|
|
# Don't re-raise to prevent TUI crash
|
|
|
|
|
|
class ConfirmDocumentDeleteScreen(Screen[None]):
|
|
"""Screen for confirming document deletion."""
|
|
|
|
doc_ids: list[str]
|
|
collection: CollectionInfo
|
|
parent_screen: DocumentManagementScreen
|
|
|
|
@property
|
|
def app(self) -> CollectionManagementApp: # type: ignore[override]
|
|
"""Return the typed app instance."""
|
|
return super().app # type: ignore[return-value]
|
|
|
|
BINDINGS = [
|
|
Binding("escape", "app.pop_screen", "Cancel"),
|
|
Binding("y", "confirm_delete", "Yes"),
|
|
Binding("n", "app.pop_screen", "No"),
|
|
Binding("enter", "confirm_delete", "Confirm"),
|
|
]
|
|
|
|
def __init__(
|
|
self,
|
|
doc_ids: list[str],
|
|
collection: CollectionInfo,
|
|
parent_screen: DocumentManagementScreen,
|
|
):
|
|
super().__init__()
|
|
self.doc_ids = doc_ids
|
|
self.collection = collection
|
|
self.parent_screen = parent_screen
|
|
|
|
@override
|
|
def compose(self) -> ComposeResult:
|
|
yield Header()
|
|
yield Container(
|
|
Static("⚠️ Confirm Document Deletion", classes="title warning"),
|
|
Static(
|
|
f"Are you sure you want to delete {len(self.doc_ids)} documents from '{self.collection['name']}'?"
|
|
),
|
|
Static("This action cannot be undone!", classes="warning"),
|
|
Static("Press Y to confirm, N or Escape to cancel", classes="subtitle"),
|
|
Horizontal(
|
|
Button("✅ Yes, Delete (Y)", id="yes_btn", variant="error"),
|
|
Button("❌ Cancel (N)", id="no_btn", variant="default"),
|
|
classes="action_buttons",
|
|
),
|
|
LoadingIndicator(id="loading"),
|
|
classes="main_container center",
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
"""Initialize the screen with focus on cancel button for safety."""
|
|
self.query_one("#loading").display = False
|
|
self.query_one("#no_btn").focus()
|
|
|
|
def on_button_pressed(self, event: Button.Pressed) -> None:
|
|
"""Handle button presses."""
|
|
if event.button.id == "yes_btn":
|
|
self.action_confirm_delete()
|
|
elif event.button.id == "no_btn":
|
|
self.app.pop_screen()
|
|
|
|
def action_confirm_delete(self) -> None:
|
|
"""Confirm deletion."""
|
|
self.run_worker(self.delete_documents())
|
|
|
|
async def delete_documents(self) -> None:
|
|
"""Delete the selected documents."""
|
|
loading = self.query_one("#loading")
|
|
loading.display = True
|
|
|
|
try:
|
|
results: dict[str, bool] = {}
|
|
if hasattr(self.parent_screen, "storage") and self.parent_screen.storage:
|
|
# Delete documents via storage
|
|
# The storage should have delete_documents method for weaviate
|
|
storage = self.parent_screen.storage
|
|
if hasattr(storage, "delete_documents"):
|
|
results = await storage.delete_documents(
|
|
self.doc_ids,
|
|
collection_name=self.collection["name"],
|
|
)
|
|
|
|
# Count successful deletions
|
|
successful = sum(bool(success) for success in results.values())
|
|
failed = len(results) - successful
|
|
|
|
if successful > 0:
|
|
self.notify(f"Deleted {successful} documents", severity="information")
|
|
if failed > 0:
|
|
self.notify(f"Failed to delete {failed} documents", severity="error")
|
|
|
|
# Clear selection and refresh parent screen
|
|
self.parent_screen.selected_docs.clear()
|
|
await self.parent_screen.load_documents()
|
|
self.app.pop_screen()
|
|
|
|
except Exception as e:
|
|
self.notify(f"Failed to delete documents: {e}", severity="error", markup=False)
|
|
finally:
|
|
loading.display = False
|
|
|
|
|
|
class LogViewerScreen(ModalScreen[None]):
|
|
"""Display live log output without disrupting the TUI."""
|
|
|
|
_log_widget: RichLog | None
|
|
_log_file: Path | None
|
|
|
|
@property
|
|
def app(self) -> CollectionManagementApp: # type: ignore[override]
|
|
"""Return the typed app instance."""
|
|
return super().app # type: ignore[return-value]
|
|
|
|
BINDINGS = [
|
|
Binding("escape", "close", "Close"),
|
|
Binding("ctrl+l", "close", "Close"),
|
|
Binding("s", "show_path", "Log File"),
|
|
]
|
|
|
|
def __init__(self) -> None:
|
|
super().__init__()
|
|
self._log_widget = None
|
|
self._log_file = None
|
|
|
|
@override
|
|
def compose(self) -> ComposeResult:
|
|
yield Header(show_clock=True)
|
|
yield Container(
|
|
Static("📜 Live Application Logs", classes="title"),
|
|
Static(
|
|
"Logs update in real time. Press S to reveal the log file path.", classes="subtitle"
|
|
),
|
|
RichLog(id="log_stream", classes="log-stream", wrap=True, highlight=False),
|
|
Static("", id="log_file_path", classes="subtitle"),
|
|
classes="main_container log-viewer-container",
|
|
)
|
|
yield Footer()
|
|
|
|
def on_mount(self) -> None:
|
|
"""Attach this viewer to the parent application once mounted."""
|
|
self._log_widget = self.query_one(RichLog)
|
|
|
|
if hasattr(self.app, "attach_log_viewer"):
|
|
self.app.attach_log_viewer(self) # type: ignore[arg-type]
|
|
|
|
def on_unmount(self) -> None:
|
|
"""Detach from the parent application when closed."""
|
|
|
|
if hasattr(self.app, "detach_log_viewer"):
|
|
self.app.detach_log_viewer(self) # type: ignore[arg-type]
|
|
|
|
def _get_log_widget(self) -> RichLog:
|
|
if self._log_widget is None:
|
|
self._log_widget = self.query_one(RichLog)
|
|
if self._log_widget is None:
|
|
raise RuntimeError("RichLog widget not found")
|
|
return self._log_widget
|
|
|
|
def replace_logs(self, lines: list[str]) -> None:
|
|
"""Replace rendered logs with the provided history."""
|
|
log_widget = self._get_log_widget()
|
|
log_widget.clear()
|
|
for line in lines:
|
|
log_widget.write(line)
|
|
log_widget.scroll_end(animate=False)
|
|
|
|
def append_logs(self, lines: list[str]) -> None:
|
|
"""Append new log lines to the viewer."""
|
|
log_widget = self._get_log_widget()
|
|
for line in lines:
|
|
log_widget.write(line)
|
|
log_widget.scroll_end(animate=False)
|
|
|
|
def update_log_file(self, log_file: Path | None) -> None:
|
|
"""Update the displayed log file path."""
|
|
self._log_file = log_file
|
|
label = self.query_one("#log_file_path", Static)
|
|
if log_file is None:
|
|
label.update("Logs are not currently being persisted to disk.")
|
|
else:
|
|
label.update(f"Log file: {log_file}")
|
|
|
|
def action_close(self) -> None:
|
|
"""Close the log viewer."""
|
|
self.app.pop_screen()
|
|
|
|
def action_show_path(self) -> None:
|
|
"""Reveal the log file location in a notification."""
|
|
if self._log_file is None:
|
|
self.notify("File logging is disabled for this session.", severity="warning")
|
|
else:
|
|
self.notify(
|
|
f"Log file available at: {self._log_file}", severity="information", markup=False
|
|
)
|