feat: Add endpoint and UI to retry failed documents
Add a new `/documents/reprocess_failed` API endpoint and corresponding UI button to retry processing of failed and pending documents. This addresses a common recovery scenario when document processing fails due to server crashes, network errors, or LLM service outages. Backend changes: - Add ReprocessResponse model with status, message, and track_id fields - Add POST /documents/reprocess_failed endpoint that triggers background reprocessing of FAILED, PENDING, and interrupted PROCESSING documents - Reuses existing apipeline_process_enqueue_documents for consistency - Includes comprehensive docstring and logging for observability Frontend changes: - Add TypeScript types and API function for the new endpoint - Add retry handler with intelligent polling (fast refresh → normal) - Add "Retry Failed" button in Documents page toolbar - Button disabled when pipeline is busy to prevent duplicate operations - Complete i18n support (English and Chinese translations) This feature provides a convenient way to recover from processing failures without requiring a full filesystem rescan.
This commit is contained in:
@@ -134,6 +134,33 @@ class ScanResponse(BaseModel):
|
||||
}
|
||||
|
||||
|
||||
class ReprocessResponse(BaseModel):
|
||||
"""Response model for reprocessing failed documents operation
|
||||
|
||||
Attributes:
|
||||
status: Status of the reprocessing operation
|
||||
message: Message describing the operation result
|
||||
track_id: Tracking ID for monitoring reprocessing progress
|
||||
"""
|
||||
|
||||
status: Literal["reprocessing_started"] = Field(
|
||||
description="Status of the reprocessing operation"
|
||||
)
|
||||
message: str = Field(description="Human-readable message describing the operation")
|
||||
track_id: str = Field(
|
||||
description="Tracking ID for monitoring reprocessing progress"
|
||||
)
|
||||
|
||||
class Config:
|
||||
json_schema_extra = {
|
||||
"example": {
|
||||
"status": "reprocessing_started",
|
||||
"message": "Reprocessing of failed documents has been initiated in background",
|
||||
"track_id": "retry_20250729_170612_def456",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class InsertTextRequest(BaseModel):
|
||||
"""Request model for inserting a single text document
|
||||
|
||||
@@ -2657,4 +2684,52 @@ def create_document_routes(
|
||||
logger.error(traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
@router.post(
|
||||
"/reprocess_failed",
|
||||
response_model=ReprocessResponse,
|
||||
dependencies=[Depends(combined_auth)],
|
||||
)
|
||||
async def reprocess_failed_documents(background_tasks: BackgroundTasks):
|
||||
"""
|
||||
Reprocess failed and pending documents.
|
||||
|
||||
This endpoint triggers the document processing pipeline which automatically
|
||||
picks up and reprocesses documents in the following statuses:
|
||||
- FAILED: Documents that failed during previous processing attempts
|
||||
- PENDING: Documents waiting to be processed
|
||||
- PROCESSING: Documents with abnormally terminated processing (e.g., server crashes)
|
||||
|
||||
This is useful for recovering from server crashes, network errors, LLM service
|
||||
outages, or other temporary failures that caused document processing to fail.
|
||||
|
||||
The processing happens in the background and can be monitored using the
|
||||
returned track_id or by checking the pipeline status.
|
||||
|
||||
Returns:
|
||||
ReprocessResponse: Response with status, message, and track_id
|
||||
|
||||
Raises:
|
||||
HTTPException: If an error occurs while initiating reprocessing (500).
|
||||
"""
|
||||
try:
|
||||
# Generate track_id with "retry" prefix for retry operation
|
||||
track_id = generate_track_id("retry")
|
||||
|
||||
# Start the reprocessing in the background
|
||||
background_tasks.add_task(rag.apipeline_process_enqueue_documents)
|
||||
logger.info(
|
||||
f"Reprocessing of failed documents initiated with track_id: {track_id}"
|
||||
)
|
||||
|
||||
return ReprocessResponse(
|
||||
status="reprocessing_started",
|
||||
message="Reprocessing of failed documents has been initiated in background",
|
||||
track_id=track_id,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error initiating reprocessing of failed documents: {str(e)}")
|
||||
logger.error(traceback.format_exc())
|
||||
raise HTTPException(status_code=500, detail=str(e))
|
||||
|
||||
return router
|
||||
|
||||
@@ -155,6 +155,12 @@ export type ScanResponse = {
|
||||
track_id: string
|
||||
}
|
||||
|
||||
export type ReprocessFailedResponse = {
|
||||
status: 'reprocessing_started'
|
||||
message: string
|
||||
track_id: string
|
||||
}
|
||||
|
||||
export type DeleteDocResponse = {
|
||||
status: 'deletion_started' | 'busy' | 'not_allowed'
|
||||
message: string
|
||||
@@ -353,6 +359,11 @@ export const scanNewDocuments = async (): Promise<ScanResponse> => {
|
||||
return response.data
|
||||
}
|
||||
|
||||
export const reprocessFailedDocuments = async (): Promise<ReprocessFailedResponse> => {
|
||||
const response = await axiosInstance.post('/documents/reprocess_failed')
|
||||
return response.data
|
||||
}
|
||||
|
||||
export const getDocumentsScanProgress = async (): Promise<LightragDocumentsScanProgress> => {
|
||||
const response = await axiosInstance.get('/documents/scan-progress')
|
||||
return response.data
|
||||
|
||||
@@ -21,6 +21,7 @@ import PaginationControls from '@/components/ui/PaginationControls'
|
||||
|
||||
import {
|
||||
scanNewDocuments,
|
||||
reprocessFailedDocuments,
|
||||
getDocumentsPaginated,
|
||||
DocsStatusesResponse,
|
||||
DocStatus,
|
||||
@@ -833,6 +834,42 @@ export default function DocumentManager() {
|
||||
}
|
||||
}, [t, startPollingInterval, currentTab, health, statusCounts])
|
||||
|
||||
const retryFailedDocuments = useCallback(async () => {
|
||||
try {
|
||||
// Check if component is still mounted before starting the request
|
||||
if (!isMountedRef.current) return;
|
||||
|
||||
const { status, message, track_id: _track_id } = await reprocessFailedDocuments(); // eslint-disable-line @typescript-eslint/no-unused-vars
|
||||
|
||||
// Check again if component is still mounted after the request completes
|
||||
if (!isMountedRef.current) return;
|
||||
|
||||
// Note: _track_id is available for future use (e.g., progress tracking)
|
||||
toast.message(message || status);
|
||||
|
||||
// Reset health check timer with 1 second delay to avoid race condition
|
||||
useBackendState.getState().resetHealthCheckTimerDelayed(1000);
|
||||
|
||||
// Start fast refresh with 2-second interval immediately after retry
|
||||
startPollingInterval(2000);
|
||||
|
||||
// Set recovery timer to restore normal polling interval after 15 seconds
|
||||
setTimeout(() => {
|
||||
if (isMountedRef.current && currentTab === 'documents' && health) {
|
||||
// Restore intelligent polling interval based on document status
|
||||
const hasActiveDocuments = (statusCounts.processing || 0) > 0 || (statusCounts.pending || 0) > 0;
|
||||
const normalInterval = hasActiveDocuments ? 5000 : 30000;
|
||||
startPollingInterval(normalInterval);
|
||||
}
|
||||
}, 15000); // Restore after 15 seconds
|
||||
} catch (err) {
|
||||
// Only show error if component is still mounted
|
||||
if (isMountedRef.current) {
|
||||
toast.error(errorMessage(err));
|
||||
}
|
||||
}
|
||||
}, [startPollingInterval, currentTab, health, statusCounts])
|
||||
|
||||
// Handle page size change - update state and save to store
|
||||
const handlePageSizeChange = useCallback((newPageSize: number) => {
|
||||
if (newPageSize === pagination.page_size) return;
|
||||
@@ -1085,6 +1122,16 @@ export default function DocumentManager() {
|
||||
>
|
||||
<RefreshCwIcon /> {t('documentPanel.documentManager.scanButton')}
|
||||
</Button>
|
||||
<Button
|
||||
variant="outline"
|
||||
onClick={retryFailedDocuments}
|
||||
side="bottom"
|
||||
tooltip={t('documentPanel.documentManager.retryFailedTooltip')}
|
||||
size="sm"
|
||||
disabled={pipelineBusy}
|
||||
>
|
||||
<RotateCcwIcon /> {t('documentPanel.documentManager.retryFailedButton')}
|
||||
</Button>
|
||||
<Button
|
||||
variant="outline"
|
||||
onClick={() => setShowPipelineStatus(true)}
|
||||
|
||||
@@ -115,6 +115,8 @@
|
||||
"title": "Document Management",
|
||||
"scanButton": "Scan",
|
||||
"scanTooltip": "Scan documents in input folder",
|
||||
"retryFailedButton": "Retry Failed",
|
||||
"retryFailedTooltip": "Retry processing all failed documents",
|
||||
"refreshTooltip": "Reset document list",
|
||||
"pipelineStatusButton": "Pipeline Status",
|
||||
"pipelineStatusTooltip": "View pipeline status",
|
||||
|
||||
@@ -115,6 +115,8 @@
|
||||
"title": "文档管理",
|
||||
"scanButton": "扫描",
|
||||
"scanTooltip": "扫描输入目录中的文档",
|
||||
"retryFailedButton": "重试失败",
|
||||
"retryFailedTooltip": "重新处理所有失败的文档",
|
||||
"refreshTooltip": "复位文档清单",
|
||||
"pipelineStatusButton": "流水线状态",
|
||||
"pipelineStatusTooltip": "查看流水线状态",
|
||||
|
||||
Reference in New Issue
Block a user