feat(etl): Add replication lag UI (#38973)

This commit is contained in:
Riccardo Busetti
2025-09-25 12:43:19 +02:00
committed by GitHub
parent 205a8bd14d
commit 32d130436d
7 changed files with 379 additions and 56 deletions

View File

@@ -1,7 +1,7 @@
import { useParams } from 'common'
import { InlineLink } from 'components/ui/InlineLink'
import { TableState } from './ReplicationPipelineStatus.types'
import { isValidRetryPolicy } from './ReplicationPipelineStatus.utils'
import { TableState } from './ReplicationPipelineStatus/ReplicationPipelineStatus.types'
import { isValidRetryPolicy } from './ReplicationPipelineStatus/ReplicationPipelineStatus.utils'
import { RetryCountdown } from './RetryCountdown'
import { RetryOptionsDropdown } from './RetryOptionsDropdown'

View File

@@ -4,10 +4,12 @@ import {
Ban,
ChevronLeft,
ExternalLink,
Info,
Pause,
Play,
RotateCcw,
Search,
WifiOff,
X,
} from 'lucide-react'
import Link from 'next/link'
@@ -31,17 +33,18 @@ import {
import { Badge, Button, cn } from 'ui'
import { GenericSkeletonLoader } from 'ui-patterns'
import { Input } from 'ui-patterns/DataInputs/Input'
import { ErroredTableDetails } from './ErroredTableDetails'
import { ErroredTableDetails } from '../ErroredTableDetails'
import {
PIPELINE_ACTIONABLE_STATES,
PIPELINE_ERROR_MESSAGES,
getStatusName,
} from './Pipeline.utils'
import { PipelineStatus, PipelineStatusName } from './PipelineStatus'
import { STATUS_REFRESH_FREQUENCY_MS } from './Replication.constants'
import { TableState } from './ReplicationPipelineStatus.types'
} from '../Pipeline.utils'
import { PipelineStatus, PipelineStatusName } from '../PipelineStatus'
import { STATUS_REFRESH_FREQUENCY_MS } from '../Replication.constants'
import { UpdateVersionModal } from '../UpdateVersionModal'
import { SlotLagMetrics, TableState } from './ReplicationPipelineStatus.types'
import { getDisabledStateConfig, getStatusConfig } from './ReplicationPipelineStatus.utils'
import { UpdateVersionModal } from './UpdateVersionModal'
import { SlotLagMetricsInline, SlotLagMetricsList } from './SlotLagMetrics'
/**
* Component for displaying replication pipeline status and table replication details.
@@ -82,7 +85,6 @@ export const ReplicationPipelineStatus = () => {
const {
data: replicationStatusData,
error: statusError,
isLoading: isStatusLoading,
isError: isStatusError,
} = useReplicationPipelineReplicationStatusQuery(
@@ -107,12 +109,14 @@ export const ReplicationPipelineStatus = () => {
const config = getDisabledStateConfig({ requestStatus, statusName })
const tableStatuses = replicationStatusData?.table_statuses || []
const applyLagMetrics = replicationStatusData?.apply_lag
const filteredTableStatuses =
filterString.length === 0
? tableStatuses
: tableStatuses.filter((table: TableState) =>
: tableStatuses.filter((table) =>
table.table_name.toLowerCase().includes(filterString.toLowerCase())
)
const tablesWithLag = tableStatuses.filter((table) => Boolean(table.table_sync_lag))
const isPipelineRunning = statusName === 'started'
const hasTableData = tableStatuses.length > 0
@@ -121,6 +125,10 @@ export const ReplicationPipelineStatus = () => {
requestStatus === PipelineStatusRequestStatus.StopRequested ||
requestStatus === PipelineStatusRequestStatus.RestartRequested
const showDisabledState = !isPipelineRunning || isEnablingDisabling
const refreshIntervalLabel =
STATUS_REFRESH_FREQUENCY_MS >= 1000
? `${Math.round(STATUS_REFRESH_FREQUENCY_MS / 1000)}s`
: `${STATUS_REFRESH_FREQUENCY_MS}ms`
const logsUrl = `/project/${projectRef}/logs/etl-replication-logs${
pipelineId ? `?f=${encodeURIComponent(JSON.stringify({ pipeline_id: pipelineId }))}` : ''
@@ -242,46 +250,105 @@ export const ReplicationPipelineStatus = () => {
</Button>
</div>
</div>
{(isPipelineLoading || isStatusLoading) && <GenericSkeletonLoader />}
{isPipelineError && (
<AlertError error={pipelineError} subject={PIPELINE_ERROR_MESSAGES.RETRIEVE_PIPELINE} />
)}
{isStatusError && (
<AlertError
error={statusError}
subject={PIPELINE_ERROR_MESSAGES.RETRIEVE_REPLICATION_STATUS}
/>
<div className="flex items-center gap-2 rounded-lg border border-warning-400 bg-warning-50 px-3 py-2 text-xs text-warning-800">
<WifiOff size={14} />
<span className="font-medium">Live updates paused</span>
<span className="text-warning-700">Retrying automatically</span>
</div>
)}
{showDisabledState && (
<div
className={cn(
'p-4 border border-default rounded-lg flex items-center justify-between',
config.colors.bg
)}
>
<div className="flex items-center gap-x-3">
<div
className={cn(
'w-10 h-10 rounded-full flex items-center justify-center',
config.colors.iconBg
)}
>
<div className={config.colors.icon}>{config.icon}</div>
</div>
<div className="flex-1">
<h4 className={`text-sm font-medium ${config.colors.text}`}>{config.title}</h4>
<p className={`text-sm ${config.colors.subtext}`}>{config.message}</p>
</div>
</div>
</div>
)}
{(isPipelineLoading || isStatusLoading) && (
<div className="space-y-3">
<div className="flex items-center gap-x-3">
<div className="h-6 w-40 rounded bg-surface-200" />
<div className="h-5 w-24 rounded bg-surface-200" />
</div>
<GenericSkeletonLoader />
</div>
)}
{applyLagMetrics && (
<div className="border border-default rounded-lg bg-surface-100 px-4 py-4 space-y-3">
<div className="flex flex-wrap items-baseline justify-between gap-y-1">
<div>
<h4 className="text-sm font-semibold text-foreground">Replication lag</h4>
<p className="text-xs text-foreground-light">
Snapshot of how far this pipeline is trailing behind right now.
</p>
</div>
<p className="text-xs text-foreground-lighter">
Updates every {refreshIntervalLabel}
</p>
</div>
{isStatusError && (
<p className="text-xs text-warning-700">
Unable to refresh data. Showing the last values we received.
</p>
)}
<SlotLagMetricsList metrics={applyLagMetrics} />
{tablesWithLag.length > 0 && (
<>
<div className="border-t border-default/40" />
<div className="space-y-3 text-xs text-foreground">
<div className="flex items-start gap-2 rounded-md border border-default/50 bg-surface-200/60 px-3 py-2 text-foreground-light">
<Info size={14} className="mt-0.5" />
<span>
During initial sync, tables can copy and stream independently before
reconciling with the overall pipeline.
</span>
</div>
<div className="rounded border border-default/50 bg-surface-200/40">
<ul className="divide-y divide-default/40">
{tablesWithLag.map((table) => (
<li key={`${table.table_id}-${table.table_name}`} className="px-3 py-2">
<SlotLagMetricsInline
tableName={table.table_name}
metrics={table.table_sync_lag as SlotLagMetrics}
/>
</li>
))}
</ul>
</div>
</div>
</>
)}
</div>
)}
{hasTableData && (
<div className="flex flex-col gap-y-4">
{showDisabledState && (
<div
className={cn(
'p-4 border border-default rounded-lg flex items-center justify-between',
config.colors.bg
)}
>
<div className="flex items-center gap-x-3">
<div
className={cn(
'w-10 h-10 rounded-full flex items-center justify-center',
config.colors.iconBg
)}
>
<div className={config.colors.icon}>{config.icon}</div>
</div>
<div className="flex-1">
<h4 className={`text-sm font-medium ${config.colors.text}`}>{config.title}</h4>
<p className={`text-sm ${config.colors.subtext}`}>{config.message}</p>
</div>
</div>
</div>
)}
<div className="flex flex-col gap-y-3">
<div className="w-full overflow-hidden overflow-x-auto">
{/* [Joshen] Should update to use new Table components next time */}
<Table
@@ -304,8 +371,8 @@ export const ReplicationPipelineStatus = () => {
</Table.td>
</Table.tr>
)}
{filteredTableStatuses.map((table: TableState, index: number) => {
const statusConfig = getStatusConfig(table.state)
{filteredTableStatuses.map((table, index) => {
const statusConfig = getStatusConfig(table.state as TableState['state'])
return (
<Table.tr key={`${table.table_name}-${index}`} className="border-t">
<Table.td className="align-top">
@@ -342,7 +409,7 @@ export const ReplicationPipelineStatus = () => {
Status unavailable while pipeline is {config.badge.toLowerCase()}
</p>
) : (
<div className="space-y-1">
<div className="space-y-3">
<div className="text-sm text-foreground">
{statusConfig.description}
</div>

View File

@@ -3,6 +3,16 @@ export type RetryPolicy =
| { policy: 'manual_retry' }
| { policy: 'timed_retry'; next_retry: string }
export type SlotLagMetrics = {
restart_lsn_bytes: number
confirmed_flush_lsn_bytes: number
safe_wal_size_bytes: number
write_lag?: number
flush_lag?: number
}
export type SlotLagMetricKey = keyof SlotLagMetrics
export type TableState = {
table_id: number
table_name: string
@@ -12,4 +22,5 @@ export type TableState = {
| { name: 'copied_table' }
| { name: 'following_wal'; lag: number }
| { name: 'error'; reason: string; solution?: string; retry_policy: RetryPolicy }
table_sync_lag?: SlotLagMetrics
}

View File

@@ -1,46 +1,57 @@
import { ReplicationPipelineStatusData } from 'data/replication/pipeline-status-query'
import dayjs from 'dayjs'
import { Activity, Clock, HelpCircle, Loader2, XCircle } from 'lucide-react'
import { ReplicationPipelineStatusData } from 'data/replication/pipeline-status-query'
import { formatBytes } from 'lib/helpers'
import { PipelineStatusRequestStatus } from 'state/replication-pipeline-request-status'
import { Badge } from 'ui'
import { getPipelineStateMessages } from './Pipeline.utils'
import { getPipelineStateMessages } from '../Pipeline.utils'
import { RetryPolicy, TableState } from './ReplicationPipelineStatus.types'
const numberFormatter = new Intl.NumberFormat()
export const getStatusConfig = (state: TableState['state']) => {
switch (state.name) {
case 'queued':
return {
badge: <Badge variant="warning">Queued</Badge>,
description: 'Waiting to start replication',
description: 'Table is waiting for ETL to pick it up for replication.',
tooltip: 'Table is waiting for ETL to pick it up for replication.',
color: 'text-warning',
}
case 'copying_table':
return {
badge: <Badge variant="brand">Copying</Badge>,
description: 'Initial data copy in progress',
description: "Table's existing rows are being copied before live streaming begins.",
tooltip: "Table's existing rows are being copied before live streaming begins.",
color: 'text-brand-600',
}
case 'copied_table':
return {
badge: <Badge variant="success">Copied</Badge>,
description: 'Initial copy completed',
description: "Table copy is complete and it's preparing to follow WAL changes.",
tooltip: "Table copy is complete and it's preparing to follow WAL changes.",
color: 'text-success-600',
}
case 'following_wal':
return {
badge: <Badge variant="success">Live</Badge>,
description: `Replicating live changes`,
description: 'Table is streaming new changes in real time from the WAL.',
tooltip: 'Table is streaming new changes in real time from the WAL.',
color: 'text-success-600',
}
case 'error':
return {
badge: <Badge variant="destructive">Error</Badge>,
description: <pre className="text-xs font-mono">{state.reason}</pre>,
description: 'Replication is paused because the table encountered an error.',
tooltip: 'Replication is paused because the table encountered an error.',
color: 'text-destructive-600',
}
default:
return {
badge: <Badge variant="warning">Unknown</Badge>,
description: 'Unknown status',
description: 'Table status is unavailable.',
tooltip: 'Table status is unavailable.',
color: 'text-warning',
}
}
@@ -122,3 +133,57 @@ export const isValidRetryPolicy = (policy: any): policy is RetryPolicy => {
return false
}
}
const formatLagBytesValue = (value?: number) => {
if (typeof value !== 'number' || Number.isNaN(value)) {
return { display: '—', detail: undefined }
}
const decimals = value < 1024 ? 0 : value < 1024 * 1024 ? 1 : 2
const display = formatBytes(value, decimals)
const detail = `${numberFormatter.format(value)} bytes`
return { display, detail }
}
const formatLagDurationValue = (value?: number) => {
if (typeof value !== 'number' || Number.isNaN(value)) {
return { display: '—', detail: undefined }
}
const sign = value < 0 ? '-' : ''
const absMilliseconds = Math.abs(value)
const duration = dayjs.duration(absMilliseconds, 'milliseconds')
if (absMilliseconds < 1000) {
return { display: `${value} ms`, detail: undefined }
}
const seconds = duration.asSeconds()
if (seconds < 60) {
const decimals = seconds >= 10 ? 1 : 2
return {
display: `${sign}${seconds.toFixed(decimals)} s`,
detail: `${numberFormatter.format(value)} ms`,
}
}
const minutes = duration.asMinutes()
if (minutes < 60) {
const roundedSeconds = Math.round(seconds)
return {
display: `${sign}${minutes.toFixed(minutes >= 10 ? 1 : 2)} min`,
detail: `${numberFormatter.format(roundedSeconds)} s`,
}
}
const hours = duration.asHours()
const roundedMinutes = Math.round(minutes)
return {
display: `${sign}${hours.toFixed(hours >= 10 ? 1 : 2)} h`,
detail: `${numberFormatter.format(roundedMinutes)} min`,
}
}
export const getFormattedLagValue = (type: 'bytes' | 'duration', value?: number) =>
type === 'bytes' ? formatLagBytesValue(value) : formatLagDurationValue(value)

View File

@@ -0,0 +1,126 @@
import { Info } from 'lucide-react'
import { Tooltip, TooltipContent, TooltipTrigger } from 'ui'
import { SlotLagMetricKey, SlotLagMetrics } from './ReplicationPipelineStatus.types'
import { getFormattedLagValue } from './ReplicationPipelineStatus.utils'
const SLOT_LAG_FIELDS: {
key: SlotLagMetricKey
label: string
type: 'bytes' | 'duration'
description: string
}[] = [
{
key: 'confirmed_flush_lsn_bytes',
label: 'WAL Flush lag (size)',
type: 'bytes',
description:
'Bytes between the newest WAL record applied locally and the latest flushed WAL record acknowledged by ETL.',
},
{
key: 'flush_lag',
label: 'WAL Flush lag (time)',
type: 'duration',
description:
'Time between flushing recent WAL locally and receiving notification that ETL has written and flushed it.',
},
{
key: 'safe_wal_size_bytes',
label: 'Remaining WAL size',
type: 'bytes',
description:
'Bytes still available to write to WAL before this slot risks entering the "lost" state.',
},
]
export const SlotLagMetricsInline = ({
tableName,
metrics,
}: {
tableName: string
metrics: SlotLagMetrics
}) => {
return (
<div className="flex flex-wrap items-center gap-x-3 gap-y-1.5 text-xs text-foreground">
<span className="truncate font-medium" title={tableName}>
{tableName}
</span>
<span className="text-foreground-lighter"></span>
<div className="flex flex-wrap items-center gap-x-3 gap-y-1.5 text-[11px] text-foreground-light">
{SLOT_LAG_FIELDS.map(({ key, label, type }) => {
const { display } = getFormattedLagValue(type, metrics[key])
return (
<span key={`${tableName}-${key}`} className="flex items-center gap-1">
<span className="uppercase tracking-wide text-[10px] text-foreground-lighter">
{label}
</span>
<span className="text-foreground">{display}</span>
</span>
)
})}
</div>
</div>
)
}
export const SlotLagMetricsList = ({
metrics,
size = 'default',
showMetricInfo = true,
}: {
metrics: SlotLagMetrics
size?: 'default' | 'compact'
showMetricInfo?: boolean
}) => {
const gridClasses =
size === 'default'
? 'grid-cols-1 sm:grid-cols-2 xl:grid-cols-3 gap-y-4 gap-x-6'
: 'grid-cols-2 gap-y-2 gap-x-4'
const labelClasses =
size === 'default' ? 'text-xs text-foreground-light' : 'text-[11px] text-foreground-lighter'
const valueClasses =
size === 'default'
? 'text-sm font-medium text-foreground'
: 'text-xs font-medium text-foreground'
return (
<dl className={`grid ${gridClasses}`}>
{SLOT_LAG_FIELDS.map(({ key, label, type, description }) => (
<div key={key} className="flex flex-col gap-0.5">
<dt className={labelClasses}>
<span className="inline-flex items-center gap-1">
{label}
{showMetricInfo && (
<Tooltip>
<TooltipTrigger asChild>
<button
type="button"
aria-label={`What is ${label}`}
className="inline-flex h-4 w-4 items-center justify-center rounded-full bg-surface-200 text-foreground-lighter transition-colors hover:bg-surface-300 hover:text-foreground focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-offset-1 focus-visible:ring-foreground-lighter"
>
<Info size={12} />
</button>
</TooltipTrigger>
<TooltipContent side="top" align="start" className="max-w-xs text-xs">
{description}
</TooltipContent>
</Tooltip>
)}
</span>
</dt>
{(() => {
const { display, detail } = getFormattedLagValue(type, metrics[key])
return (
<dd className={`flex flex-col ${valueClasses}`}>
<span>{display}</span>
{detail && <span className="text-[11px] text-foreground-lighter">{detail}</span>}
</dd>
)
})()}
</div>
))}
</dl>
)
}

View File

@@ -2,7 +2,7 @@ import { useRouter } from 'next/router'
import { useContext, useEffect } from 'react'
import { FeatureFlagContext, useFlag, useParams } from 'common'
import { ReplicationPipelineStatus } from 'components/interfaces/Database/Replication/ReplicationPipelineStatus'
import { ReplicationPipelineStatus } from 'components/interfaces/Database/Replication/ReplicationPipelineStatus/ReplicationPipelineStatus'
import DatabaseLayout from 'components/layouts/DatabaseLayout/DatabaseLayout'
import DefaultLayout from 'components/layouts/DefaultLayout'
import { ScaffoldContainer, ScaffoldSection } from 'components/layouts/Scaffold'

View File

@@ -7946,6 +7946,34 @@ export interface components {
}[]
}
ReplicationPipelineReplicationStatusResponse: {
/** @description The apply worker lag */
apply_lag?: {
/**
* @description Bytes between the current WAL location and the confirmed flush LSN.
* @example 2048
*/
confirmed_flush_lsn_bytes: number
/**
* @description Flush lag expressed in milliseconds.
* @example 1200
*/
flush_lag?: number
/**
* @description Bytes between the current WAL location and the slot restart LSN.
* @example 1024
*/
restart_lsn_bytes: number
/**
* @description How many bytes of WAL are still safe to build up before the limit of the slot is reached.
* @example 8192
*/
safe_wal_size_bytes: number
/**
* @description Write lag expressed in milliseconds.
* @example 1500
*/
write_lag?: number
}
/**
* @description Pipeline id
* @example 1012
@@ -7968,7 +7996,6 @@ export interface components {
name: 'copied_table'
}
| {
lag: number
/** @enum {string} */
name: 'following_wal'
}
@@ -8006,6 +8033,34 @@ export interface components {
* @example public.orders
*/
table_name: string
/** @description The table sync worker lag */
table_sync_lag?: {
/**
* @description Bytes between the current WAL location and the confirmed flush LSN.
* @example 2048
*/
confirmed_flush_lsn_bytes: number
/**
* @description Flush lag expressed in milliseconds.
* @example 1200
*/
flush_lag?: number
/**
* @description Bytes between the current WAL location and the slot restart LSN.
* @example 1024
*/
restart_lsn_bytes: number
/**
* @description How many bytes of WAL are still safe to build up before the limit of the slot is reached.
* @example 8192
*/
safe_wal_size_bytes: number
/**
* @description Write lag expressed in milliseconds.
* @example 1500
*/
write_lag?: number
}
}[]
}
/** @description Pipeline */
@@ -8342,7 +8397,6 @@ export interface components {
name: 'copied_table'
}
| {
lag: number
/** @enum {string} */
name: 'following_wal'
}