diff --git a/apps/studio/components/interfaces/Database/Replication/ErroredTableDetails.tsx b/apps/studio/components/interfaces/Database/Replication/ErroredTableDetails.tsx index c38c6c272c..ba0bdad5e4 100644 --- a/apps/studio/components/interfaces/Database/Replication/ErroredTableDetails.tsx +++ b/apps/studio/components/interfaces/Database/Replication/ErroredTableDetails.tsx @@ -1,7 +1,7 @@ import { useParams } from 'common' import { InlineLink } from 'components/ui/InlineLink' -import { TableState } from './ReplicationPipelineStatus.types' -import { isValidRetryPolicy } from './ReplicationPipelineStatus.utils' +import { TableState } from './ReplicationPipelineStatus/ReplicationPipelineStatus.types' +import { isValidRetryPolicy } from './ReplicationPipelineStatus/ReplicationPipelineStatus.utils' import { RetryCountdown } from './RetryCountdown' import { RetryOptionsDropdown } from './RetryOptionsDropdown' diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.tsx similarity index 75% rename from apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx rename to apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.tsx index 61197040cd..ec36254a3e 100644 --- a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.tsx +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.tsx @@ -4,10 +4,12 @@ import { Ban, ChevronLeft, ExternalLink, + Info, Pause, Play, RotateCcw, Search, + WifiOff, X, } from 'lucide-react' import Link from 'next/link' @@ -31,17 +33,18 @@ import { import { Badge, Button, cn } from 'ui' import { GenericSkeletonLoader } from 'ui-patterns' import { Input } from 'ui-patterns/DataInputs/Input' -import { ErroredTableDetails } from './ErroredTableDetails' +import { ErroredTableDetails } from '../ErroredTableDetails' import { PIPELINE_ACTIONABLE_STATES, PIPELINE_ERROR_MESSAGES, getStatusName, -} from './Pipeline.utils' -import { PipelineStatus, PipelineStatusName } from './PipelineStatus' -import { STATUS_REFRESH_FREQUENCY_MS } from './Replication.constants' -import { TableState } from './ReplicationPipelineStatus.types' +} from '../Pipeline.utils' +import { PipelineStatus, PipelineStatusName } from '../PipelineStatus' +import { STATUS_REFRESH_FREQUENCY_MS } from '../Replication.constants' +import { UpdateVersionModal } from '../UpdateVersionModal' +import { SlotLagMetrics, TableState } from './ReplicationPipelineStatus.types' import { getDisabledStateConfig, getStatusConfig } from './ReplicationPipelineStatus.utils' -import { UpdateVersionModal } from './UpdateVersionModal' +import { SlotLagMetricsInline, SlotLagMetricsList } from './SlotLagMetrics' /** * Component for displaying replication pipeline status and table replication details. @@ -82,7 +85,6 @@ export const ReplicationPipelineStatus = () => { const { data: replicationStatusData, - error: statusError, isLoading: isStatusLoading, isError: isStatusError, } = useReplicationPipelineReplicationStatusQuery( @@ -107,12 +109,14 @@ export const ReplicationPipelineStatus = () => { const config = getDisabledStateConfig({ requestStatus, statusName }) const tableStatuses = replicationStatusData?.table_statuses || [] + const applyLagMetrics = replicationStatusData?.apply_lag const filteredTableStatuses = filterString.length === 0 ? tableStatuses - : tableStatuses.filter((table: TableState) => + : tableStatuses.filter((table) => table.table_name.toLowerCase().includes(filterString.toLowerCase()) ) + const tablesWithLag = tableStatuses.filter((table) => Boolean(table.table_sync_lag)) const isPipelineRunning = statusName === 'started' const hasTableData = tableStatuses.length > 0 @@ -121,6 +125,10 @@ export const ReplicationPipelineStatus = () => { requestStatus === PipelineStatusRequestStatus.StopRequested || requestStatus === PipelineStatusRequestStatus.RestartRequested const showDisabledState = !isPipelineRunning || isEnablingDisabling + const refreshIntervalLabel = + STATUS_REFRESH_FREQUENCY_MS >= 1000 + ? `${Math.round(STATUS_REFRESH_FREQUENCY_MS / 1000)}s` + : `${STATUS_REFRESH_FREQUENCY_MS}ms` const logsUrl = `/project/${projectRef}/logs/etl-replication-logs${ pipelineId ? `?f=${encodeURIComponent(JSON.stringify({ pipeline_id: pipelineId }))}` : '' @@ -242,46 +250,105 @@ export const ReplicationPipelineStatus = () => { - - {(isPipelineLoading || isStatusLoading) && } - {isPipelineError && ( )} {isStatusError && ( - +
+ + Live updates paused + Retrying automatically +
+ )} + + {showDisabledState && ( +
+
+
+
{config.icon}
+
+
+

{config.title}

+

{config.message}

+
+
+
+ )} + + {(isPipelineLoading || isStatusLoading) && ( +
+
+
+
+
+ +
+ )} + + {applyLagMetrics && ( +
+
+
+

Replication lag

+

+ Snapshot of how far this pipeline is trailing behind right now. +

+
+

+ Updates every {refreshIntervalLabel} +

+
+ + {isStatusError && ( +

+ Unable to refresh data. Showing the last values we received. +

+ )} + + + + {tablesWithLag.length > 0 && ( + <> +
+
+
+ + + During initial sync, tables can copy and stream independently before + reconciling with the overall pipeline. + +
+
+
    + {tablesWithLag.map((table) => ( +
  • + +
  • + ))} +
+
+
+ + )} +
)} {hasTableData && ( -
- {showDisabledState && ( -
-
-
-
{config.icon}
-
-
-

{config.title}

-

{config.message}

-
-
-
- )} - +
{/* [Joshen] Should update to use new Table components next time */} { )} - {filteredTableStatuses.map((table: TableState, index: number) => { - const statusConfig = getStatusConfig(table.state) + {filteredTableStatuses.map((table, index) => { + const statusConfig = getStatusConfig(table.state as TableState['state']) return ( @@ -342,7 +409,7 @@ export const ReplicationPipelineStatus = () => { Status unavailable while pipeline is {config.badge.toLowerCase()}

) : ( -
+
{statusConfig.description}
diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.types.ts b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.types.ts similarity index 62% rename from apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.types.ts rename to apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.types.ts index f3c3cf2b8e..02aa0c694a 100644 --- a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.types.ts +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.types.ts @@ -3,6 +3,16 @@ export type RetryPolicy = | { policy: 'manual_retry' } | { policy: 'timed_retry'; next_retry: string } +export type SlotLagMetrics = { + restart_lsn_bytes: number + confirmed_flush_lsn_bytes: number + safe_wal_size_bytes: number + write_lag?: number + flush_lag?: number +} + +export type SlotLagMetricKey = keyof SlotLagMetrics + export type TableState = { table_id: number table_name: string @@ -12,4 +22,5 @@ export type TableState = { | { name: 'copied_table' } | { name: 'following_wal'; lag: number } | { name: 'error'; reason: string; solution?: string; retry_policy: RetryPolicy } + table_sync_lag?: SlotLagMetrics } diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.utils.tsx similarity index 57% rename from apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx rename to apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.utils.tsx index a4d3504431..76552b4d1d 100644 --- a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus.utils.tsx +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus.utils.tsx @@ -1,46 +1,57 @@ -import { ReplicationPipelineStatusData } from 'data/replication/pipeline-status-query' +import dayjs from 'dayjs' import { Activity, Clock, HelpCircle, Loader2, XCircle } from 'lucide-react' + +import { ReplicationPipelineStatusData } from 'data/replication/pipeline-status-query' +import { formatBytes } from 'lib/helpers' import { PipelineStatusRequestStatus } from 'state/replication-pipeline-request-status' import { Badge } from 'ui' -import { getPipelineStateMessages } from './Pipeline.utils' +import { getPipelineStateMessages } from '../Pipeline.utils' import { RetryPolicy, TableState } from './ReplicationPipelineStatus.types' +const numberFormatter = new Intl.NumberFormat() + export const getStatusConfig = (state: TableState['state']) => { switch (state.name) { case 'queued': return { badge: Queued, - description: 'Waiting to start replication', + description: 'Table is waiting for ETL to pick it up for replication.', + tooltip: 'Table is waiting for ETL to pick it up for replication.', color: 'text-warning', } case 'copying_table': return { badge: Copying, - description: 'Initial data copy in progress', + description: "Table's existing rows are being copied before live streaming begins.", + tooltip: "Table's existing rows are being copied before live streaming begins.", color: 'text-brand-600', } case 'copied_table': return { badge: Copied, - description: 'Initial copy completed', + description: "Table copy is complete and it's preparing to follow WAL changes.", + tooltip: "Table copy is complete and it's preparing to follow WAL changes.", color: 'text-success-600', } case 'following_wal': return { badge: Live, - description: `Replicating live changes`, + description: 'Table is streaming new changes in real time from the WAL.', + tooltip: 'Table is streaming new changes in real time from the WAL.', color: 'text-success-600', } case 'error': return { badge: Error, - description:
{state.reason}
, + description: 'Replication is paused because the table encountered an error.', + tooltip: 'Replication is paused because the table encountered an error.', color: 'text-destructive-600', } default: return { badge: Unknown, - description: 'Unknown status', + description: 'Table status is unavailable.', + tooltip: 'Table status is unavailable.', color: 'text-warning', } } @@ -122,3 +133,57 @@ export const isValidRetryPolicy = (policy: any): policy is RetryPolicy => { return false } } + +const formatLagBytesValue = (value?: number) => { + if (typeof value !== 'number' || Number.isNaN(value)) { + return { display: '—', detail: undefined } + } + + const decimals = value < 1024 ? 0 : value < 1024 * 1024 ? 1 : 2 + const display = formatBytes(value, decimals) + const detail = `${numberFormatter.format(value)} bytes` + + return { display, detail } +} + +const formatLagDurationValue = (value?: number) => { + if (typeof value !== 'number' || Number.isNaN(value)) { + return { display: '—', detail: undefined } + } + + const sign = value < 0 ? '-' : '' + const absMilliseconds = Math.abs(value) + const duration = dayjs.duration(absMilliseconds, 'milliseconds') + + if (absMilliseconds < 1000) { + return { display: `${value} ms`, detail: undefined } + } + + const seconds = duration.asSeconds() + if (seconds < 60) { + const decimals = seconds >= 10 ? 1 : 2 + return { + display: `${sign}${seconds.toFixed(decimals)} s`, + detail: `${numberFormatter.format(value)} ms`, + } + } + + const minutes = duration.asMinutes() + if (minutes < 60) { + const roundedSeconds = Math.round(seconds) + return { + display: `${sign}${minutes.toFixed(minutes >= 10 ? 1 : 2)} min`, + detail: `${numberFormatter.format(roundedSeconds)} s`, + } + } + + const hours = duration.asHours() + const roundedMinutes = Math.round(minutes) + return { + display: `${sign}${hours.toFixed(hours >= 10 ? 1 : 2)} h`, + detail: `${numberFormatter.format(roundedMinutes)} min`, + } +} + +export const getFormattedLagValue = (type: 'bytes' | 'duration', value?: number) => + type === 'bytes' ? formatLagBytesValue(value) : formatLagDurationValue(value) diff --git a/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/SlotLagMetrics.tsx b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/SlotLagMetrics.tsx new file mode 100644 index 0000000000..1e11718fbe --- /dev/null +++ b/apps/studio/components/interfaces/Database/Replication/ReplicationPipelineStatus/SlotLagMetrics.tsx @@ -0,0 +1,126 @@ +import { Info } from 'lucide-react' + +import { Tooltip, TooltipContent, TooltipTrigger } from 'ui' +import { SlotLagMetricKey, SlotLagMetrics } from './ReplicationPipelineStatus.types' +import { getFormattedLagValue } from './ReplicationPipelineStatus.utils' + +const SLOT_LAG_FIELDS: { + key: SlotLagMetricKey + label: string + type: 'bytes' | 'duration' + description: string +}[] = [ + { + key: 'confirmed_flush_lsn_bytes', + label: 'WAL Flush lag (size)', + type: 'bytes', + description: + 'Bytes between the newest WAL record applied locally and the latest flushed WAL record acknowledged by ETL.', + }, + { + key: 'flush_lag', + label: 'WAL Flush lag (time)', + type: 'duration', + description: + 'Time between flushing recent WAL locally and receiving notification that ETL has written and flushed it.', + }, + { + key: 'safe_wal_size_bytes', + label: 'Remaining WAL size', + type: 'bytes', + description: + 'Bytes still available to write to WAL before this slot risks entering the "lost" state.', + }, +] + +export const SlotLagMetricsInline = ({ + tableName, + metrics, +}: { + tableName: string + metrics: SlotLagMetrics +}) => { + return ( +
+ + {tableName} + + +
+ {SLOT_LAG_FIELDS.map(({ key, label, type }) => { + const { display } = getFormattedLagValue(type, metrics[key]) + return ( + + + {label} + + {display} + + ) + })} +
+
+ ) +} + +export const SlotLagMetricsList = ({ + metrics, + size = 'default', + showMetricInfo = true, +}: { + metrics: SlotLagMetrics + size?: 'default' | 'compact' + showMetricInfo?: boolean +}) => { + const gridClasses = + size === 'default' + ? 'grid-cols-1 sm:grid-cols-2 xl:grid-cols-3 gap-y-4 gap-x-6' + : 'grid-cols-2 gap-y-2 gap-x-4' + + const labelClasses = + size === 'default' ? 'text-xs text-foreground-light' : 'text-[11px] text-foreground-lighter' + + const valueClasses = + size === 'default' + ? 'text-sm font-medium text-foreground' + : 'text-xs font-medium text-foreground' + + return ( +
+ {SLOT_LAG_FIELDS.map(({ key, label, type, description }) => ( +
+
+ + {label} + {showMetricInfo && ( + + + + + + {description} + + + )} + +
+ {(() => { + const { display, detail } = getFormattedLagValue(type, metrics[key]) + return ( +
+ {display} + {detail && {detail}} +
+ ) + })()} +
+ ))} +
+ ) +} diff --git a/apps/studio/pages/project/[ref]/database/replication/[pipelineId].tsx b/apps/studio/pages/project/[ref]/database/replication/[pipelineId].tsx index 916e508530..03ffa0ba6f 100644 --- a/apps/studio/pages/project/[ref]/database/replication/[pipelineId].tsx +++ b/apps/studio/pages/project/[ref]/database/replication/[pipelineId].tsx @@ -2,7 +2,7 @@ import { useRouter } from 'next/router' import { useContext, useEffect } from 'react' import { FeatureFlagContext, useFlag, useParams } from 'common' -import { ReplicationPipelineStatus } from 'components/interfaces/Database/Replication/ReplicationPipelineStatus' +import { ReplicationPipelineStatus } from 'components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus' import DatabaseLayout from 'components/layouts/DatabaseLayout/DatabaseLayout' import DefaultLayout from 'components/layouts/DefaultLayout' import { ScaffoldContainer, ScaffoldSection } from 'components/layouts/Scaffold' diff --git a/packages/api-types/types/platform.d.ts b/packages/api-types/types/platform.d.ts index b5ed0ecbf2..5c0a5cc0d8 100644 --- a/packages/api-types/types/platform.d.ts +++ b/packages/api-types/types/platform.d.ts @@ -7946,6 +7946,34 @@ export interface components { }[] } ReplicationPipelineReplicationStatusResponse: { + /** @description The apply worker lag */ + apply_lag?: { + /** + * @description Bytes between the current WAL location and the confirmed flush LSN. + * @example 2048 + */ + confirmed_flush_lsn_bytes: number + /** + * @description Flush lag expressed in milliseconds. + * @example 1200 + */ + flush_lag?: number + /** + * @description Bytes between the current WAL location and the slot restart LSN. + * @example 1024 + */ + restart_lsn_bytes: number + /** + * @description How many bytes of WAL are still safe to build up before the limit of the slot is reached. + * @example 8192 + */ + safe_wal_size_bytes: number + /** + * @description Write lag expressed in milliseconds. + * @example 1500 + */ + write_lag?: number + } /** * @description Pipeline id * @example 1012 @@ -7968,7 +7996,6 @@ export interface components { name: 'copied_table' } | { - lag: number /** @enum {string} */ name: 'following_wal' } @@ -8006,6 +8033,34 @@ export interface components { * @example public.orders */ table_name: string + /** @description The table sync worker lag */ + table_sync_lag?: { + /** + * @description Bytes between the current WAL location and the confirmed flush LSN. + * @example 2048 + */ + confirmed_flush_lsn_bytes: number + /** + * @description Flush lag expressed in milliseconds. + * @example 1200 + */ + flush_lag?: number + /** + * @description Bytes between the current WAL location and the slot restart LSN. + * @example 1024 + */ + restart_lsn_bytes: number + /** + * @description How many bytes of WAL are still safe to build up before the limit of the slot is reached. + * @example 8192 + */ + safe_wal_size_bytes: number + /** + * @description Write lag expressed in milliseconds. + * @example 1500 + */ + write_lag?: number + } }[] } /** @description Pipeline */ @@ -8342,7 +8397,6 @@ export interface components { name: 'copied_table' } | { - lag: number /** @enum {string} */ name: 'following_wal' }