import { useParams } from 'common' import { AnalyticsBucket, BigQuery, Database } from 'icons' import { Minus, Snowflake } from 'lucide-react' import Link from 'next/link' import { useEffect, useState } from 'react' import { toast } from 'sonner' import { Button, TableCell, TableRow, Tooltip, TooltipContent, TooltipTrigger, WarningIcon, } from 'ui' import { ShimmeringLoader } from 'ui-patterns/ShimmeringLoader' import { DeleteDestination } from './DeleteDestination' import { PipelineStatus } from './PipelineStatus' import { PipelineStatusName, STATUS_REFRESH_FREQUENCY_MS } from './Replication.constants' import { getFormattedLagValue } from './ReplicationPipelineStatus/ReplicationPipelineStatus.utils' import { RowMenu } from './RowMenu' import { UpdateVersionModal } from './UpdateVersionModal' import { useDestinationInformation } from './useDestinationInformation' import { AlertError } from '@/components/ui/AlertError' import { useDeleteDestinationPipelineMutation } from '@/data/replication/delete-destination-pipeline-mutation' import { useReplicationPipelineReplicationStatusQuery } from '@/data/replication/pipeline-replication-status-query' import { useReplicationPipelineStatusQuery } from '@/data/replication/pipeline-status-query' import { useReplicationPipelineVersionQuery } from '@/data/replication/pipeline-version-query' import { useStopPipelineMutation } from '@/data/replication/stop-pipeline-mutation' import { PipelineStatusRequestStatus, usePipelineRequestStatus, } from '@/state/replication-pipeline-request-status' import { type ResponseError } from '@/types' /** Props for DestinationRow. destinationId is the numeric id handed to useDestinationInformation to look up the destination and its pipeline. */ interface DestinationRowProps { destinationId: number } /** Component for a single replication destination. It resolves the destination and pipeline through useDestinationInformation, re-fetches the pipeline status and the table-level replication status every STATUS_REFRESH_FREQUENCY_MS, checks whether a newer pipeline version exists, and owns the delete and update-version dialog state. NOTE(review): in this copy of the file the JSX element tags of the render section are missing (only the embedded expressions and text remain) and the line breaks are collapsed, so existing line comments swallow the code that follows them on the same physical line. Restore the original markup and formatting from version control before changing the render section. */ export const DestinationRow = ({ destinationId }: DestinationRowProps) => { const { ref: projectRef } = useParams() const [showDeleteDestinationForm, setShowDeleteDestinationForm] = useState(false) const [isDeleting, setIsDeleting] = useState(false) const [showUpdateVersionModal, setShowUpdateVersionModal] = useState(false) const { type, 
statusName, destination, pipeline, pipelineStatus, pipelineFetcher } = useDestinationInformation({ id: destinationId, }) const { error: pipelineError, isPending: isPipelineLoading, isError: isPipelineError, isSuccess: isPipelineSuccess, } = pipelineFetcher /* Falls back to an empty string while the destination is not available. */ const destinationName = destination?.name ?? '' /* Pipeline status query, re-fetched every STATUS_REFRESH_FREQUENCY_MS. pipelineId is undefined until the pipeline has been resolved. */ const { error: pipelineStatusError, isPending: isPipelineStatusLoading, isError: isPipelineStatusError, isSuccess: isPipelineStatusSuccess, } = useReplicationPipelineStatusQuery( { projectRef, pipelineId: pipeline?.id, }, { refetchInterval: STATUS_REFRESH_FREQUENCY_MS } ) const { getRequestStatus, updatePipelineStatus } = usePipelineRequestStatus() /* Without a pipeline id the request status defaults to PipelineStatusRequestStatus.None. */ const requestStatus = pipeline?.id ? getRequestStatus(pipeline.id) : PipelineStatusRequestStatus.None const { mutateAsync: stopPipeline } = useStopPipelineMutation() const { mutateAsync: deleteDestinationPipeline } = useDeleteDestinationPipelineMutation({}) // Fetch table-level replication status to surface errors in list view const { data: replicationStatusData, isPending: isReplicationStatusLoading, isError: isReplicationStatusError, } = useReplicationPipelineReplicationStatusQuery( { projectRef, pipelineId: pipeline?.id }, { refetchInterval: STATUS_REFRESH_FREQUENCY_MS } ) const tableStatuses = replicationStatusData?.table_statuses ?? [] const errorCount = tableStatuses.filter((t) => t.state?.name === 'error').length const applyLag = replicationStatusData?.apply_lag // Show the byte-based slot lag (WAL the destination hasn't confirmed flushing yet). The // time-based flush_lag from pg_stat_replication is routinely NULL for logical slots that are // idle or don't report timed feedback, whereas confirmed_flush_lsn_bytes is always populated. 
const lagBytes = applyLag?.confirmed_flush_lsn_bytes const lag = getFormattedLagValue('bytes', lagBytes) /* Strict comparison: an undefined lagBytes does not count as caught up, only an exact 0 does. */ const isCaughtUp = lagBytes === 0 // Only show errors when pipeline is running (not when stopped or restarting) const isPipelineStopped = statusName === PipelineStatusName.STOPPED const isRestarting = requestStatus === PipelineStatusRequestStatus.RestartRequested const hasTableErrors = errorCount > 0 && !isPipelineStopped && !isRestarting // Check if a newer pipeline version is available (one-time check cached for session) const { data: versionData } = useReplicationPipelineVersionQuery({ projectRef, pipelineId: pipeline?.id, }) const hasUpdate = Boolean(versionData?.new_version) /* Delete flow: bails out early when projectRef or pipeline is missing, then stops the pipeline before deleting the destination together with its pipeline. The dialog is closed and the success toast shown only when both calls resolve. Any rejection is reported through an error toast, and isDeleting is reset in the finally branch on every path. */ const onDeleteClick = async () => { if (!projectRef) { return console.error('Project ref is required') } if (!pipeline) { return toast.error('No pipeline found') } try { setIsDeleting(true) await stopPipeline({ projectRef, pipelineId: pipeline.id }) await deleteDestinationPipeline({ projectRef, destinationId: destinationId, pipelineId: pipeline.id, }) // Close dialog after successful deletion setShowDeleteDestinationForm(false) toast.success(`Deleted destination "${destinationName}"`) } catch (error) { toast.error(`Failed to delete destination: ${(error as ResponseError).message}`) } finally { setIsDeleting(false) } } /* Passes the current statusName to updatePipelineStatus whenever the pipeline id or the status changes. Skipped while there is no pipeline id. */ useEffect(() => { if (pipeline?.id) { updatePipelineStatus(pipeline.id, statusName) } }, [pipeline?.id, statusName, updatePipelineStatus]) return ( <> {isPipelineError && ( )} {isPipelineSuccess && ( {type === 'BigQuery' ? ( ) : type === 'Analytics Bucket' ? ( ) : type === 'DuckLake' ? ( ) : type === 'Snowflake' ? ( ) : ( )} {isPipelineLoading ? ( ) : (

{type} (ID: {pipeline?.id})

{destinationName}

)}
{isPipelineLoading || !pipeline ? ( ) : ( )} {!pipeline ? ( ) : isReplicationStatusLoading ? ( ) : isReplicationStatusError || !applyLag ? ( ) : isCaughtUp ? ( Caught up ) : ( {lag.display} )} {isPipelineLoading || !pipeline ? ( ) : ( pipeline.config.publication_name )}
{hasTableErrors && ( {errorCount} table{errorCount === 1 ? '' : 's'} encountered replication errors. )} setShowDeleteDestinationForm(true)} hasUpdate={hasUpdate} onUpdateClick={() => setShowUpdateVersionModal(true)} />
)} setShowUpdateVersionModal(false)} confirmLabel={ statusName === PipelineStatusName.STARTED || statusName === PipelineStatusName.FAILED ? 'Update and restart' : 'Update version' } /> ) }