Merge pull request #2562 from danielaskdd/fix-pg-timetout

Fix: Enhance PostgreSQL Reconnection Tolerance for HA Deployments
Daniel.y committed by GitHub on 2025-12-31 23:42:28 +08:00
4 changed files with 61 additions and 38 deletions

View File

@@ -403,13 +403,22 @@ POSTGRES_VCHORDRQ_PROBES=
 POSTGRES_VCHORDRQ_EPSILON=1.9

 ### PostgreSQL Connection Retry Configuration (Network Robustness)
-### Number of retry attempts (1-10, default: 3)
-### Initial retry backoff in seconds (0.1-5.0, default: 0.5)
-### Maximum retry backoff in seconds (backoff-60.0, default: 5.0)
+### NEW DEFAULTS (v1.4.10+): Optimized for HA deployments with ~30s switchover time
+### These defaults provide out-of-the-box support for PostgreSQL High Availability setups
+###
+### Number of retry attempts (1-100, default: 10)
+###   - Default 10 attempts allows ~225s total retry time (sufficient for most HA scenarios)
+###   - For extreme cases: increase up to 20-50
+### Initial retry backoff in seconds (0.1-300.0, default: 3.0)
+###   - Default 3.0s provides reasonable initial delay for switchover detection
+###   - For faster recovery: decrease to 1.0-2.0
+### Maximum retry backoff in seconds (must be >= backoff, max: 600.0, default: 30.0)
+###   - Default 30.0s matches typical switchover completion time
+###   - For longer switchovers: increase to 60-90
 ### Connection pool close timeout in seconds (1.0-30.0, default: 5.0)
-# POSTGRES_CONNECTION_RETRIES=3
-# POSTGRES_CONNECTION_RETRY_BACKOFF=0.5
-# POSTGRES_CONNECTION_RETRY_BACKOFF_MAX=5.0
+# POSTGRES_CONNECTION_RETRIES=10
+# POSTGRES_CONNECTION_RETRY_BACKOFF=3.0
+# POSTGRES_CONNECTION_RETRY_BACKOFF_MAX=30.0
 # POSTGRES_POOL_CLOSE_TIMEOUT=5.0

 ### PostgreSQL SSL Configuration (Optional)
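
Reviewer note: the "~225s total retry time" figure follows from a delay that starts at 3.0s, doubles each attempt, and is capped at 30.0s. The short sketch below only reproduces that arithmetic; the doubling-with-cap schedule is an assumption stated for illustration, not code taken from the implementation.

# Back-of-the-envelope check of the "~225s" note, assuming the delay doubles from
# POSTGRES_CONNECTION_RETRY_BACKOFF and is capped at POSTGRES_CONNECTION_RETRY_BACKOFF_MAX.
def total_retry_time(attempts=10, backoff=3.0, backoff_max=30.0):
    total, delay = 0.0, backoff
    for _ in range(attempts):
        total += delay
        delay = min(delay * 2, backoff_max)
    return total

print(total_retry_time())  # 3 + 6 + 12 + 24 + 30 * 6 = 225.0 seconds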

View File

@@ -1797,34 +1797,34 @@ class ClientManager:
            ),
            # Connection retry configuration
            "connection_retry_attempts": min(
-                10,
+                100,  # Increased from 10 to 100 for long-running operations
                int(
                    os.environ.get(
                        "POSTGRES_CONNECTION_RETRIES",
-                        config.get("postgres", "connection_retries", fallback=3),
+                        config.get("postgres", "connection_retries", fallback=10),
                    )
                ),
            ),
            "connection_retry_backoff": min(
-                5.0,
+                300.0,  # Increased from 5.0 to 300.0 (5 minutes) for PG switchover scenarios
                float(
                    os.environ.get(
                        "POSTGRES_CONNECTION_RETRY_BACKOFF",
                        config.get(
-                            "postgres", "connection_retry_backoff", fallback=0.5
+                            "postgres", "connection_retry_backoff", fallback=3.0
                        ),
                    )
                ),
            ),
            "connection_retry_backoff_max": min(
-                60.0,
+                600.0,  # Increased from 60.0 to 600.0 (10 minutes) for PG switchover scenarios
                float(
                    os.environ.get(
                        "POSTGRES_CONNECTION_RETRY_BACKOFF_MAX",
                        config.get(
                            "postgres",
                            "connection_retry_backoff_max",
-                            fallback=5.0,
+                            fallback=30.0,
                        ),
                    )
                ),
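
Reviewer note: for orientation, a reconnect loop consistent with these capped settings might look like the sketch below. The helper name, its signature, and the doubling backoff are illustrative assumptions, not the repository's actual retry implementation.

import asyncio

# Illustrative sketch only: retry an async connection factory using values shaped like
# connection_retry_attempts / connection_retry_backoff / connection_retry_backoff_max.
async def connect_with_retry(connect, attempts: int, backoff: float, backoff_max: float):
    delay = backoff
    for attempt in range(1, attempts + 1):
        try:
            return await connect()
        except (ConnectionError, OSError):
            if attempt == attempts:
                raise  # retries exhausted; surface the last error
            await asyncio.sleep(delay)           # wait out a possible HA switchover
            delay = min(delay * 2, backoff_max)  # exponential backoff, capped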

View File

@@ -584,7 +584,7 @@ export default function DocumentManager() {
  // Utility function to create timeout wrapper for API calls
  const withTimeout = useCallback((
    promise: Promise<any>,
-    timeoutMs: number = 30000,
+    timeoutMs: number = 30000, // Default 30s timeout for normal operations
    errorMsg: string = 'Request timeout'
  ): Promise<any> => {
    const timeoutPromise = new Promise((_, reject) => {

@@ -676,7 +676,8 @@ export default function DocumentManager() {
  // Intelligent refresh function: handles all boundary cases
  const handleIntelligentRefresh = useCallback(async (
    targetPage?: number, // Optional target page, defaults to current page
-    resetToFirst?: boolean // Whether to force reset to first page
+    resetToFirst?: boolean, // Whether to force reset to first page
+    customTimeout?: number // Optional custom timeout in milliseconds (uses withTimeout default if not provided)
  ) => {
    try {
      if (!isMountedRef.current) return;

@@ -694,10 +695,10 @@ export default function DocumentManager() {
        sort_direction: sortDirection
      };

-      // Use timeout wrapper for the API call
+      // Use timeout wrapper for the API call (uses customTimeout if provided, otherwise withTimeout default)
      const response = await withTimeout(
        getDocumentsPaginated(request),
-        30000, // 30 second timeout
+        customTimeout, // Pass undefined to use default 30s, or explicit timeout for special cases
        'Document fetch timeout'
      );

@@ -717,7 +718,7 @@ export default function DocumentManager() {
        const lastPageResponse = await withTimeout(
          getDocumentsPaginated(lastPageRequest),
-          30000,
+          customTimeout, // Use same timeout for consistency
          'Document fetch timeout'
        );

@@ -847,7 +848,10 @@ export default function DocumentManager() {
      // Reset health check timer with 1 second delay to avoid race condition
      useBackendState.getState().resetHealthCheckTimerDelayed(1000);

-      // Start fast refresh with 2-second interval immediately after scan
+      // Perform immediate refresh with 90s timeout after scan (tolerates PostgreSQL switchover)
+      await handleIntelligentRefresh(undefined, false, 90000);
+
+      // Start fast refresh with 2-second interval after initial refresh
      startPollingInterval(2000);

      // Set recovery timer to restore normal polling interval after 15 seconds

@@ -865,7 +869,7 @@ export default function DocumentManager() {
        toast.error(t('documentPanel.documentManager.errors.scanFailed', { error: errorMessage(err) }));
      }
    }
-  }, [t, startPollingInterval, currentTab, health, statusCounts])
+  }, [t, startPollingInterval, currentTab, health, statusCounts, handleIntelligentRefresh])

  // Handle page size change - update state and save to store
  const handlePageSizeChange = useCallback((newPageSize: number) => {

@@ -1184,7 +1188,7 @@ export default function DocumentManager() {
          ) : !isSelectionMode ? (
            <ClearDocumentsDialog onDocumentsCleared={handleDocumentsCleared} />
          ) : null}
-          <UploadDocumentsDialog onDocumentsUploaded={fetchDocuments} />
+          <UploadDocumentsDialog onDocumentsUploaded={() => handleIntelligentRefresh(undefined, false, 120000)} />
          <PipelineStatusDialog
            open={showPipelineStatus}
            onOpenChange={setShowPipelineStatus}

View File

@@ -31,7 +31,13 @@ class TestPostgresRetryIntegration:
    @pytest.fixture
    def db_config(self):
-        """Load database configuration from environment variables."""
+        """Load database configuration from environment variables.
+
+        Uses new HA-optimized defaults that match postgres_impl.py ClientManager.get_config():
+        - 10 retry attempts (up from 3)
+        - 3.0s initial backoff (up from 0.5s)
+        - 30.0s max backoff (up from 5.0s)
+        """
        return {
            "host": os.getenv("POSTGRES_HOST", "localhost"),
            "port": int(os.getenv("POSTGRES_PORT", "5432")),

@@ -40,31 +46,31 @@ class TestPostgresRetryIntegration:
            "database": os.getenv("POSTGRES_DATABASE", "postgres"),
            "workspace": os.getenv("POSTGRES_WORKSPACE", "test_retry"),
            "max_connections": int(os.getenv("POSTGRES_MAX_CONNECTIONS", "10")),
-            # Connection retry configuration
+            # Connection retry configuration - mirrors postgres_impl.py ClientManager.get_config()
+            # NEW DEFAULTS optimized for HA deployments
            "connection_retry_attempts": min(
-                10, int(os.getenv("POSTGRES_CONNECTION_RETRIES", "3"))
+                100,
+                int(os.getenv("POSTGRES_CONNECTION_RETRIES", "10")),  # 3 → 10
            ),
            "connection_retry_backoff": min(
-                5.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "0.5"))
+                300.0,
+                float(
+                    os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "3.0")
+                ),  # 0.5 → 3.0
            ),
            "connection_retry_backoff_max": min(
-                60.0, float(os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "5.0"))
+                600.0,
+                float(
+                    os.getenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "30.0")
+                ),  # 5.0 → 30.0
            ),
            "pool_close_timeout": min(
                30.0, float(os.getenv("POSTGRES_POOL_CLOSE_TIMEOUT", "5.0"))
            ),
        }

-    @pytest.fixture
-    def test_env(self, monkeypatch):
-        """Set up test environment variables for retry configuration."""
-        monkeypatch.setenv("POSTGRES_CONNECTION_RETRIES", "3")
-        monkeypatch.setenv("POSTGRES_CONNECTION_RETRY_BACKOFF", "0.5")
-        monkeypatch.setenv("POSTGRES_CONNECTION_RETRY_BACKOFF_MAX", "2.0")
-        monkeypatch.setenv("POSTGRES_POOL_CLOSE_TIMEOUT", "3.0")
-
    @pytest.mark.asyncio
-    async def test_real_connection_success(self, db_config, test_env):
+    async def test_real_connection_success(self, db_config):
        """
        Test successful connection to real PostgreSQL database.

@@ -100,11 +106,12 @@ class TestPostgresRetryIntegration:
            await db.pool.close()

    @pytest.mark.asyncio
-    async def test_simulated_transient_error_with_real_db(self, db_config, test_env):
+    async def test_simulated_transient_error_with_real_db(self, db_config):
        """
        Test retry mechanism with simulated transient errors on real database.

        Simulates connection failures on first 2 attempts, then succeeds.
+        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 2: Simulated Transient Errors")

@@ -155,12 +162,13 @@ class TestPostgresRetryIntegration:
            await db.pool.close()

    @pytest.mark.asyncio
-    async def test_query_retry_with_real_db(self, db_config, test_env):
+    async def test_query_retry_with_real_db(self, db_config):
        """
        Test query-level retry with simulated connection issues.

        Tests that queries retry on transient failures by simulating
        a temporary database unavailability.
+        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 3: Query-Level Retry")

@@ -193,11 +201,12 @@ class TestPostgresRetryIntegration:
            await db.pool.close()

    @pytest.mark.asyncio
-    async def test_concurrent_queries_with_real_db(self, db_config, test_env):
+    async def test_concurrent_queries_with_real_db(self, db_config):
        """
        Test concurrent queries to validate thread safety and connection pooling.

        Runs multiple concurrent queries to ensure no deadlocks or race conditions.
+        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 4: Concurrent Queries")

@@ -243,9 +252,10 @@ class TestPostgresRetryIntegration:
            await db.pool.close()

    @pytest.mark.asyncio
-    async def test_pool_close_timeout_real(self, db_config, test_env):
+    async def test_pool_close_timeout_real(self, db_config):
        """
        Test pool close timeout protection with real database.
+        Uses new HA defaults (10 retries, 3s backoff).
        """
        print("\n" + "=" * 80)
        print("INTEGRATION TEST 5: Pool Close Timeout")