mirror of
https://github.com/supabase/supabase.git
synced 2026-07-02 01:54:22 +08:00
## I have read the [CONTRIBUTING.md](https://github.com/supabase/supabase/blob/master/CONTRIBUTING.md) file. YES ## What kind of change does this PR introduce? Docs update (new guides + follow-up documentation fix from review feedback). ## What is the current behavior? There was no consolidated docs example for resumable WebSockets with Edge Functions, and no dedicated troubleshooting guide for worker timeouts / WebSocket drops. ## What is the new behavior? - Adds a resumable WebSockets guide for Edge Functions, including: - session persistence - event replay - idempotency pattern and schema examples - client/server example flow - Adds an Edge Functions troubleshooting guide for worker timeouts and WebSocket drops. - Updates docs navigation to surface the new guides. - Follow-up fix from review feedback: the browser client example now stores `sessionId` and `lastEventId` in `sessionStorage` (instead of `localStorage`). ## Additional context - Branch has been updated with latest `origin/master`. - This PR remains documentation-focused; no production runtime code changes were introduced. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Documentation** * Added a guide on resumable WebSockets covering session persistence, event replay, idempotency patterns, SQL schema examples, and client/server usage. * Added a troubleshooting guide on Edge Functions worker timeouts and WebSocket drops with scenarios, symptoms, and practical workarounds. * Enhanced WebSocket docs with a production note on worker lifecycle and keeping runtime promises open to avoid premature shutdown. * Navigation updated to surface the new guides. 
<!-- review_stack_entry_start --> [](https://app.coderabbit.ai/change-stack/supabase/supabase/pull/46178?utm_source=github_walkthrough&utm_medium=github&utm_campaign=change_stack) <!-- review_stack_entry_end --> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Lakshan Perera <lakshan@supabase.io> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> Co-authored-by: CodeRabbit <noreply@coderabbit.ai> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
226 lines
6.7 KiB
Plaintext
226 lines
6.7 KiB
Plaintext
---
|
|
title: 'Resumable WebSockets with Edge Functions'
|
|
description: 'Build reconnect-safe WebSockets with event replay, idempotency keys, and graceful restarts.'
|
|
---
|
|
|
|
This example shows how to build a reconnect-safe chat stream on Supabase Edge Functions using:
|
|
|
|
- WebSocket upgrade + JWT auth
|
|
- Postgres-backed session and event persistence
|
|
- Event replay with `lastEventId`
|
|
- Idempotent user messages with `idempotency_key`
|
|
- Graceful client reconnects during worker restarts
|
|
|
|
Reference implementation: [Building Resumable WebSockets with Supabase Edge Functions and Postgres](https://blog.mansueli.com/building-resumable-websockets-with-supabase-edge-functions-and-postgres)
|
|
|
|
## Architecture
|
|
|
|
1. Client connects with a user JWT, plus optional `sessionId` and `lastEventId`.
|
|
2. Function verifies auth and either resumes or creates a session.
|
|
3. Every message is written to `ws_events` with an incrementing `id`.
|
|
4. On reconnect, server replays events where `id > lastEventId`.
|
|
5. Client updates local `lastEventId` and resumes without losing messages.
|
|
|
|
## Database schema
|
|
|
|
```sql
|
|
create extension if not exists pgcrypto;

-- One row per resumable WebSocket session. user_id records which
-- authenticated user owns the session.
create table ws_sessions (
  id uuid primary key default gen_random_uuid(),
  user_id uuid not null,
  created_at timestamptz default now(),
  updated_at timestamptz default now(),
  last_event_id bigint default 0
);

-- Event log used for replay. The identity column gives every event an
-- increasing id that clients use as their replay cursor.
create table ws_events (
  id bigint generated by default as identity primary key,
  session_id uuid not null references ws_sessions(id) on delete cascade,
  event_type text not null,
  payload jsonb not null,
  created_at timestamptz default now()
);

-- Replay reads "events of one session with id greater than N, ordered by id".
create index ws_events_session_id_id_idx on ws_events(session_id, id);

-- One row per (session, idempotency key) already accepted. The primary key
-- is what rejects a second insert of the same key.
create table ws_idempotency_keys (
  session_id uuid not null references ws_sessions(id) on delete cascade,
  idempotency_key uuid not null,
  primary key(session_id, idempotency_key)
);

-- Presence bookkeeping. Unlogged tables skip the write-ahead log and are
-- emptied after a crash, which is acceptable for data that only describes
-- currently open connections.
create unlogged table ws_live_connections (
  session_id uuid primary key,
  connected_at timestamptz default now(),
  last_seen_at timestamptz default now(),
  edge_region text
);

-- Tables in an API-exposed schema are reachable with the public API key
-- unless row-level security is enabled. The Edge Function uses the
-- service-role (admin) client, which bypasses RLS, so enabling RLS without
-- policies blocks direct client access while the function keeps working.
-- Add policies only if clients need to query these tables directly.
alter table ws_sessions enable row level security;
alter table ws_events enable row level security;
alter table ws_idempotency_keys enable row level security;
alter table ws_live_connections enable row level security;
|
|
```
|
|
|
|
## Edge Function (WebSocket proxy)
|
|
|
|
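Browsers cannot set an `Authorization` header on a WebSocket handshake, so the client passes the user JWT as a `token` query parameter. Serve the function with `supabase functions serve --no-verify-jwt` (and deploy it with the same `--no-verify-jwt` flag) so the platform does not reject the request, then validate the JWT inside the function.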
|
|
|
|
```ts
|
|
import { createAdminClient, createContextClient, verifyCredentials } from '@supabase/server/core'
|
|
|
|
// Delay in milliseconds after which the handler tells the client the server is
// restarting and closes the socket with code 1012, prompting a clean reconnect.
// NOTE(review): 340 s is presumably chosen to stay below the platform's worker
// lifetime limit — confirm against the current Edge Functions limits.
const PREEMPTIVE_RESTART_MS = 340_000
|
|
|
|
/**
 * Serializes `payload` to JSON and writes it to `socket`.
 *
 * The frame is dropped without error when the socket is not in the OPEN
 * state (still connecting, closing, or closed).
 */
function send(socket: WebSocket, payload: unknown) {
  const isOpen = socket.readyState === WebSocket.OPEN
  if (!isOpen) return

  const frame = JSON.stringify(payload)
  socket.send(frame)
}
|
|
|
|
type AdminClient = ReturnType<typeof createAdminClient>

/** Postgres SQLSTATE reported when an insert violates a unique/primary key. */
const UNIQUE_VIOLATION = '23505'

/**
 * Returns the session to use for this connection.
 *
 * A requested session is resumed only when it exists AND belongs to `userId`;
 * otherwise a fresh `ws_sessions` row is created. Creating the row up front is
 * required because `ws_events.session_id` references `ws_sessions(id)`.
 *
 * @returns the session id, or `null` when a new session could not be created.
 */
async function resolveSessionId(
  admin: AdminClient,
  userId: string,
  requestedSessionId: string | null
): Promise<string | null> {
  if (requestedSessionId) {
    // A malformed or foreign id yields no row (or a query error); both cases
    // intentionally fall through to creating a new session.
    const { data: owned } = await admin
      .from('ws_sessions')
      .select('id')
      .eq('id', requestedSessionId)
      .eq('user_id', userId)
      .maybeSingle()

    if (owned?.id) return owned.id
  }

  const { data: created, error: createError } = await admin
    .from('ws_sessions')
    .insert({ user_id: userId })
    .select('id')
    .single()

  return createError || !created ? null : created.id
}

/**
 * Stores a user message exactly once per `(sessionId, idempotencyKey)`.
 *
 * The key is claimed with a plain insert: a unique violation means the
 * message was already accepted, so the previously stored event is returned
 * instead of inserting a duplicate. The key is also kept inside the event
 * payload so the stored event can be found again on retry.
 *
 * For strict atomicity, move the two writes into a single Postgres function
 * and call it through `rpc()`.
 *
 * @returns the stored event row, or `null` when nothing could be stored/found.
 */
async function recordUserMessage(
  admin: AdminClient,
  sessionId: string,
  idempotencyKey: string,
  content: unknown
) {
  const { error: keyError } = await admin
    .from('ws_idempotency_keys')
    .insert({ session_id: sessionId, idempotency_key: idempotencyKey })

  if (keyError && keyError.code !== UNIQUE_VIOLATION) return null

  if (keyError) {
    // Retry of a message that was already accepted: return the existing event.
    const { data: existingEvent } = await admin
      .from('ws_events')
      .select()
      .eq('session_id', sessionId)
      .eq('event_type', 'user_message')
      .eq('payload->>idempotency_key', idempotencyKey)
      .maybeSingle()

    return existingEvent ?? null
  }

  const { data: newEvent, error: insertError } = await admin
    .from('ws_events')
    .insert({
      session_id: sessionId,
      event_type: 'user_message',
      payload: { content, idempotency_key: idempotencyKey },
    })
    .select()
    .single()

  if (insertError || !newEvent) {
    // Release the key so a client retry can succeed instead of being treated
    // as a duplicate of a message that was never stored.
    await admin
      .from('ws_idempotency_keys')
      .delete()
      .eq('session_id', sessionId)
      .eq('idempotency_key', idempotencyKey)
    return null
  }

  return newEvent
}

/**
 * WebSocket entry point.
 *
 * 1. Authenticates the `token` query parameter.
 * 2. Resumes the caller's session or creates a new one.
 * 3. Upgrades the request, then on open announces the session and replays
 *    events newer than `lastEventId`.
 * 4. Persists incoming `user_message` frames idempotently and echoes them
 *    with their `event_id`.
 * 5. Closes the socket with code 1012 after `PREEMPTIVE_RESTART_MS` so the
 *    client reconnects on its own schedule.
 */
Deno.serve(async (req) => {
  const url = new URL(req.url)
  const token = url.searchParams.get('token')
  if (!token) return new Response('Missing token', { status: 401 })

  const { data: auth, error } = await verifyCredentials({ token, apikey: null }, { auth: 'user' })
  const userId = auth?.userClaims?.id
  if (error || !userId) {
    return new Response('Unauthorized', { status: 401 })
  }

  // A missing, non-numeric, or negative cursor means "replay from the start".
  const requestedCursor = Number(url.searchParams.get('lastEventId') ?? 0)
  const lastEventId =
    Number.isSafeInteger(requestedCursor) && requestedCursor > 0 ? requestedCursor : 0

  const admin = createAdminClient()

  const sessionId = await resolveSessionId(admin, userId, url.searchParams.get('sessionId'))
  if (!sessionId) {
    return new Response('Could not create session', { status: 500 })
  }

  const { socket, response } = Deno.upgradeWebSocket(req, { idleTimeout: 0 })

  // Prevent EarlyDrop by keeping a pending promise until socket close.
  let resolveClosed!: () => void
  const closed = new Promise<void>((resolve) => {
    resolveClosed = resolve
  })
  // @ts-ignore EdgeRuntime is a global provided by the Edge Functions runtime.
  EdgeRuntime.waitUntil(closed)

  const restartTimer = setTimeout(() => {
    send(socket, { type: 'server_restarting' })
    socket.close(1012, 'Service restart')
  }, PREEMPTIVE_RESTART_MS)

  socket.onclose = () => {
    // Without this the timer would outlive the connection it belongs to.
    clearTimeout(restartTimer)
    resolveClosed()
  }

  // The socket is still CONNECTING until the upgrade response has been
  // returned, and `send` drops frames on a non-open socket. Everything that
  // must reach the client on connect therefore runs from `onopen`.
  socket.onopen = async () => {
    send(socket, { type: 'session_init', session_id: sessionId })

    const { data: replayEvents, error: replayError } = await admin
      .from('ws_events')
      .select('*')
      .eq('session_id', sessionId)
      .gt('id', lastEventId)
      .order('id')

    if (replayError) {
      send(socket, { type: 'error', message: 'Replay failed' })
      return
    }

    for (const event of replayEvents ?? []) {
      send(socket, {
        type: event.event_type,
        payload: event.payload,
        event_id: event.id,
        replay: true,
      })
    }
  }

  socket.onmessage = async (event) => {
    let msg: { type?: unknown; content?: unknown; idempotency_key?: unknown } | null
    try {
      msg = JSON.parse(event.data)
    } catch {
      send(socket, { type: 'error', message: 'Invalid JSON' })
      return
    }

    if (msg?.type !== 'user_message') return

    const idempotencyKey = msg.idempotency_key
    if (typeof idempotencyKey !== 'string') {
      send(socket, { type: 'error', message: 'Missing idempotency_key' })
      return
    }

    const userEvent = await recordUserMessage(admin, sessionId, idempotencyKey, msg.content)
    if (!userEvent) {
      // Tell the client which message failed so it can retry with the same key.
      send(socket, {
        type: 'error',
        message: 'Message could not be stored',
        idempotency_key: idempotencyKey,
      })
      return
    }

    send(socket, {
      type: 'user_message',
      payload: userEvent.payload,
      event_id: userEvent.id,
    })
  }

  return response
})
|
|
```
|
|
|
|
## Browser client
|
|
|
|
The client stores `sessionId` and `lastEventId` in session storage, then reconnects with exponential backoff.
|
|
|
|
```ts
|
|
// Resume state. sessionStorage keeps it across reloads of the same tab.
let sessionId = sessionStorage.getItem('ws_session_id')
// `|| 0` also covers a missing or corrupt stored value (NaN).
let lastEventId = Number(sessionStorage.getItem('last_event_id')) || 0

/**
 * Opens the WebSocket and keeps it alive across drops.
 *
 * Each connection sends the stored `sessionId` and `lastEventId` so the server
 * can resume the session and replay missed events. When the socket closes for
 * any reason other than a normal closure (code 1000), a reconnect is scheduled
 * with exponential backoff.
 *
 * The same `token` is reused on reconnect; if it can expire during the
 * session, fetch a fresh one before reconnecting.
 *
 * @param token   User JWT passed to the function as a query parameter.
 * @param attempt Number of consecutive failed attempts so far; callers
 *                normally omit it.
 */
function connect(token: string, attempt = 0) {
  const BASE_DELAY_MS = 500
  const MAX_DELAY_MS = 30_000

  // URLSearchParams percent-encodes every value, including sessionId.
  const query = new URLSearchParams({ token, lastEventId: String(lastEventId) })
  if (sessionId) query.set('sessionId', sessionId)

  const ws = new WebSocket(
    `wss://YOUR_PROJECT.functions.supabase.co/websocket-proxy?${query.toString()}`
  )

  let failedAttempts = attempt

  ws.onopen = () => {
    // A successful connection resets the backoff.
    failedAttempts = 0
  }

  ws.onmessage = (e) => {
    let msg
    try {
      msg = JSON.parse(e.data)
    } catch {
      return // Ignore frames that are not valid JSON.
    }
    if (!msg) return

    // Advance the replay cursor; Math.max keeps it monotonic even if replayed
    // and live events arrive interleaved.
    const eventId = Number(msg.event_id)
    if (Number.isFinite(eventId) && eventId > 0) {
      lastEventId = Math.max(lastEventId, eventId)
      sessionStorage.setItem('last_event_id', String(lastEventId))
    }

    if (msg.type === 'session_init' && typeof msg.session_id === 'string') {
      sessionId = msg.session_id
      sessionStorage.setItem('ws_session_id', msg.session_id)
    }
  }

  ws.onclose = (e) => {
    if (e.code === 1000) return // Normal closure: do not reconnect.

    const delay = Math.min(MAX_DELAY_MS, BASE_DELAY_MS * 2 ** failedAttempts)
    setTimeout(() => connect(token, failedAttempts + 1), delay)
  }
}
|
|
```
|
|
|
|
## Why this pattern works
|
|
|
|
- If the worker restarts, the client reconnects with the same session.
|
|
- Replay closes delivery gaps caused by reconnect windows.
|
|
- Idempotency keys prevent duplicate inserts when clients retry.
|
|
- `EdgeRuntime.waitUntil()` prevents unexpected early termination of idle-looking WebSocket workers.
|
|
|
|
## Next steps
|
|
|
|
- Add row-level security policies for all `ws_*` tables.
|
|
- Add a heartbeat and cleanup policy for stale sessions.
|
|
- Add structured event payload types and input validation.
|
|
- Add observability dashboards for disconnect rate and replay lag.
|