Resumable WebSockets with Edge Functions


This example shows how to build a reconnect-safe chat stream on Supabase Edge Functions using:

  • WebSocket upgrade + JWT auth
  • Postgres-backed session and event persistence
  • Event replay with lastEventId
  • Idempotent user messages with idempotency_key
  • Graceful client reconnects during worker restarts

Reference implementation: Building Resumable WebSockets with Supabase Edge Functions and Postgres

Architecture

  1. Client connects with a user JWT, plus optional sessionId and lastEventId.
  2. Function verifies auth and either resumes or creates a session.
  3. Every message is written to ws_events with an incrementing id.
  4. On reconnect, server replays events where id > lastEventId.
  5. Client updates local lastEventId and resumes without losing messages.
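
The replay contract in steps 3 to 5 can be sketched without a database. The following is a minimal in-memory model, not the reference implementation: StoredEvent, eventsToReplay, and advanceCursor are hypothetical names, and the array stands in for the ws_events table.

```typescript
// In-memory stand-in for a row of ws_events (hypothetical shape for illustration).
interface StoredEvent {
  id: number
  sessionId: string
  type: string
  payload: unknown
}

// Step 4: events in this session that are newer than the client's cursor, oldest first.
function eventsToReplay(
  log: readonly StoredEvent[],
  sessionId: string,
  lastEventId: number,
): StoredEvent[] {
  return log
    .filter((e) => e.sessionId === sessionId && e.id > lastEventId)
    .sort((a, b) => a.id - b.id)
}

// Step 5: the cursor only moves forward, so a replayed event cannot rewind it.
function advanceCursor(lastEventId: number, eventId: number): number {
  return Math.max(lastEventId, eventId)
}

// Example: the client saw event 2, dropped, and missed events 3 and 5 of session "a".
const log: StoredEvent[] = [
  { id: 1, sessionId: 'a', type: 'user_message', payload: { content: 'hi' } },
  { id: 2, sessionId: 'a', type: 'user_message', payload: { content: 'there' } },
  { id: 3, sessionId: 'a', type: 'user_message', payload: { content: 'still here?' } },
  { id: 4, sessionId: 'b', type: 'user_message', payload: { content: 'other session' } },
  { id: 5, sessionId: 'a', type: 'user_message', payload: { content: 'yes' } },
]

let cursor = 2
for (const missed of eventsToReplay(log, 'a', cursor)) {
  cursor = advanceCursor(cursor, missed.id)
}
console.log(cursor) // 5
```

Because ids in ws_events come from a single identity column, they increase across all sessions, which is why the filter needs both the session and the cursor.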

Database schema

create extension if not exists pgcrypto;

create table ws_sessions (
  id uuid primary key default gen_random_uuid(),
  user_id uuid not null,
  created_at timestamptz default now(),
  updated_at timestamptz default now(),
  last_event_id bigint default 0
);

create table ws_events (
  id bigint generated by default as identity primary key,
  session_id uuid not null references ws_sessions(id) on delete cascade,
  event_type text not null,
  payload jsonb not null,
  created_at timestamptz default now()
);
create index ws_events_session_id_id_idx on ws_events(session_id, id);

create table ws_idempotency_keys (
  session_id uuid not null references ws_sessions(id) on delete cascade,
  idempotency_key uuid not null,
  primary key(session_id, idempotency_key)
);

create unlogged table ws_live_connections (
  session_id uuid primary key,
  connected_at timestamptz default now(),
  last_seen_at timestamptz default now(),
  edge_region text
);

Edge Function (WebSocket proxy)

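A browser cannot set an Authorization header on a WebSocket handshake, so the client sends its JWT in the token query parameter. Gateway JWT verification therefore has to be disabled: run supabase functions serve --no-verify-jwt and validate the JWT inside the function.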

import { createAdminClient, verifyCredentials } from '@supabase/server/core'

// Close the socket ourselves after this long so the client reconnects at a predictable moment.
const PREEMPTIVE_RESTART_MS = 340_000

function send(socket: WebSocket, payload: unknown) {
  if (socket.readyState === WebSocket.OPEN) {
    socket.send(JSON.stringify(payload))
  }
}

Deno.serve(async (req) => {
  const url = new URL(req.url)
  const token = url.searchParams.get('token')
  if (!token) return new Response('Missing token', { status: 401 })

  const { data: auth, error } = await verifyCredentials({ token, apikey: null }, { auth: 'user' })
  if (error || !auth?.userClaims?.id) {
    return new Response('Unauthorized', { status: 401 })
  }
  const userId = auth.userClaims.id

  const admin = createAdminClient()
  const requestedSessionId = url.searchParams.get('sessionId')
  // Fall back to 0 for a missing or malformed cursor instead of passing NaN to the replay query.
  const lastEventId = Number(url.searchParams.get('lastEventId')) || 0

  // Resume the requested session only if it belongs to this user.
  let resumedId: string | undefined
  if (requestedSessionId) {
    const { data: existing } = await admin
      .from('ws_sessions')
      .select('id')
      .eq('id', requestedSessionId)
      .eq('user_id', userId)
      .maybeSingle()
    resumedId = existing?.id
  }

  // Otherwise create the ws_sessions row that ws_events.session_id references.
  let createdId: string | undefined
  if (!resumedId) {
    const { data: created } = await admin
      .from('ws_sessions')
      .insert({ user_id: userId })
      .select('id')
      .single()
    createdId = created?.id
  }

  const sessionId = resumedId ?? createdId
  if (!sessionId) return new Response('Could not create session', { status: 500 })

  const { socket, response } = Deno.upgradeWebSocket(req, { idleTimeout: 0 })

  // Prevent EarlyDrop by keeping a pending promise until socket close.
  let resolveClosed!: () => void
  const closed = new Promise<void>((resolve) => {
    resolveClosed = resolve
  })
  // @ts-ignore EdgeRuntime is a global provided by the Edge Functions runtime
  EdgeRuntime.waitUntil(closed)

  const restartTimer = setTimeout(() => {
    send(socket, { type: 'server_restarting' })
    socket.close(1012, 'Service restart')
  }, PREEMPTIVE_RESTART_MS)

  socket.onclose = () => {
    clearTimeout(restartTimer)
    resolveClosed()
  }

  // The socket only opens once the upgrade response is returned, so greet and replay in onopen.
  socket.onopen = async () => {
    send(socket, { type: 'session_init', session_id: sessionId })

    const { data: replayEvents } = await admin
      .from('ws_events')
      .select('*')
      .eq('session_id', sessionId)
      .gt('id', lastEventId)
      .order('id')

    for (const event of replayEvents ?? []) {
      send(socket, {
        type: event.event_type,
        payload: event.payload,
        event_id: event.id,
        replay: true,
      })
    }
  }

  socket.onmessage = async (event) => {
    let msg
    try {
      msg = JSON.parse(event.data)
    } catch {
      return // ignore frames that are not valid JSON
    }
    if (msg?.type !== 'user_message') return

    // Claim the key with a plain insert; a unique violation (23505) means this is a retry.
    // An upsert with ignoreDuplicates would skip the row without reporting an error.
    const { error: keyError } = await admin.from('ws_idempotency_keys').insert({
      session_id: sessionId,
      idempotency_key: msg.idempotency_key,
    })

    let userEvent
    if (keyError?.code === '23505') {
      // Retry: fetch the event stored earlier. ws_events has no idempotency_key column,
      // so the key is kept inside payload and matched with a JSON path filter.
      const { data: existingEvent } = await admin
        .from('ws_events')
        .select()
        .eq('session_id', sessionId)
        .eq('payload->>idempotency_key', msg.idempotency_key)
        .maybeSingle()

      userEvent = existingEvent
    } else if (!keyError) {
      // New idempotency key - insert the event
      const { data: newEvent } = await admin
        .from('ws_events')
        .insert({
          session_id: sessionId,
          event_type: 'user_message',
          payload: { content: msg.content, idempotency_key: msg.idempotency_key },
        })
        .select()
        .single()

      userEvent = newEvent
    }
    if (!userEvent) return // nothing was stored, so there is nothing to acknowledge

    send(socket, {
      type: 'user_message',
      payload: userEvent.payload,
      event_id: userEvent.id,
    })
  }

  return response
})

Browser client

The client stores sessionId and lastEventId in session storage, then reconnects with exponential backoff.

let sessionId = sessionStorage.getItem('ws_session_id')
let lastEventId = Number(sessionStorage.getItem('last_event_id') || 0)
let attempt = 0

function connect(token: string) {
  const url =
    `wss://YOUR_PROJECT.functions.supabase.co/websocket-proxy` +
    `?token=${encodeURIComponent(token)}` +
    `&lastEventId=${lastEventId}` +
    (sessionId ? `&sessionId=${encodeURIComponent(sessionId)}` : '')

  const ws = new WebSocket(url)

  ws.onopen = () => {
    attempt = 0 // a successful connection resets the backoff
  }

  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data)

    // Advance the replay cursor for every event that carries an id.
    if (msg.event_id) {
      lastEventId = Math.max(lastEventId, msg.event_id)
      sessionStorage.setItem('last_event_id', String(lastEventId))
    }

    if (msg.type === 'session_init') {
      sessionId = msg.session_id
      sessionStorage.setItem('ws_session_id', msg.session_id)
    }
  }

  ws.onclose = () => {
    // Exponential backoff; the 1s base and 30s cap are example values.
    const delay = Math.min(30_000, 1_000 * 2 ** attempt)
    attempt += 1
    setTimeout(() => connect(token), delay)
  }

  return ws
}
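
Sending is the other half of the client. One possible shape, offered as a sketch rather than part of the reference implementation, is a small outbox that keeps each message until it is acknowledged and resends it with the same idempotency_key after a reconnect. createOutbox and its method names are hypothetical, and the sketch assumes the server's user_message echo carries the idempotency key back (for example inside payload) so the client can tell which send was acknowledged.

```typescript
interface UserMessage {
  type: 'user_message'
  idempotency_key: string
  content: string
}

// Hypothetical helper: remembers every message until the server acknowledges it.
function createOutbox(newKey: () => string) {
  const pending = new Map<string, UserMessage>()

  return {
    // Build the frame for a new message and remember it under a fresh key.
    enqueue(content: string): string {
      const msg: UserMessage = { type: 'user_message', idempotency_key: newKey(), content }
      pending.set(msg.idempotency_key, msg)
      return JSON.stringify(msg)
    },
    // Forget a message once the server has echoed its key back.
    acknowledge(idempotencyKey: string): void {
      pending.delete(idempotencyKey)
    },
    // Frames to resend after a reconnect. Keys are reused, so the server can deduplicate.
    unacknowledged(): string[] {
      return Array.from(pending.values(), (msg) => JSON.stringify(msg))
    },
  }
}

// A counter stands in for the key source here. The idempotency_key column is a uuid,
// so a real client would pass something like () => crypto.randomUUID().
let counter = 0
const outbox = createOutbox(() => `key-${++counter}`)
outbox.enqueue('hello')
outbox.enqueue('world')
outbox.acknowledge('key-1')
console.log(outbox.unacknowledged().length) // 1
```

In the client above, each string returned by enqueue would be passed to ws.send, and the frames from unacknowledged would be resent from ws.onopen.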

Why this pattern works

  • If the worker restarts, the client reconnects with the same session.
  • Replay closes delivery gaps caused by reconnect windows.
  • Idempotency keys prevent duplicate inserts when clients retry.
  • EdgeRuntime.waitUntil() prevents unexpected early termination of idle-looking WebSocket workers.
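
The idempotency guarantee can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not the database logic itself: createEventStore is a hypothetical name, and a Map plays the role of the (session_id, idempotency_key) primary key on ws_idempotency_keys.

```typescript
interface AppendResult {
  eventId: number
  duplicate: boolean
}

// Hypothetical in-memory model of "insert once per (session, key)".
function createEventStore() {
  const eventIdByKey = new Map<string, number>()
  let nextEventId = 1

  return {
    append(sessionId: string, idempotencyKey: string): AppendResult {
      const compositeKey = `${sessionId}:${idempotencyKey}`
      const existing = eventIdByKey.get(compositeKey)
      if (existing !== undefined) {
        // Retry of a message that was already stored: return the original event id.
        return { eventId: existing, duplicate: true }
      }
      const eventId = nextEventId++
      eventIdByKey.set(compositeKey, eventId)
      return { eventId, duplicate: false }
    },
  }
}

const store = createEventStore()
const first = store.append('session-a', 'key-1')
const retry = store.append('session-a', 'key-1') // same key resent after a reconnect
const other = store.append('session-a', 'key-2')
console.log(first.eventId, retry.eventId, retry.duplicate, other.eventId) // 1 1 true 2
```

The retry resolves to the event that was stored the first time, so the client receives the same event_id and its replay cursor stays consistent.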

Next steps

  • Add row-level security policies for all ws_* tables.
  • Add a heartbeat and cleanup policy for stale sessions.
  • Add structured event payload types and input validation.
  • Add observability dashboards for disconnect rate and replay lag.
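
As a starting point for the input validation item, incoming frames could be checked before they reach the database. The following is a minimal sketch: parseUserMessage, UUID_PATTERN, and MAX_CONTENT_LENGTH are hypothetical names, and the 4,000 character limit is an assumed value, not one taken from the reference implementation.

```typescript
interface UserMessage {
  type: 'user_message'
  idempotency_key: string
  content: string
}

// The idempotency_key column is a uuid, so reject anything that is not UUID-shaped.
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i
const MAX_CONTENT_LENGTH = 4_000 // assumed limit for illustration

// Returns a typed message, or null when the frame should be rejected.
function parseUserMessage(raw: string): UserMessage | null {
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return null
  }
  if (typeof value !== 'object' || value === null) return null

  const { type, idempotency_key, content } = value as Record<string, unknown>
  if (type !== 'user_message') return null
  if (typeof idempotency_key !== 'string' || !UUID_PATTERN.test(idempotency_key)) return null
  if (typeof content !== 'string' || content.length === 0 || content.length > MAX_CONTENT_LENGTH) {
    return null
  }
  return { type: 'user_message', idempotency_key, content }
}

const accepted = parseUserMessage(
  JSON.stringify({
    type: 'user_message',
    idempotency_key: '3f2b8c1e-9d4a-4e6b-8f1a-2c5d7e9a0b1c',
    content: 'hi',
  }),
)
console.log(accepted?.content) // hi
console.log(parseUserMessage('not json')) // null
```

Returning only the three known fields also drops any extra properties a client sends, so they never end up in the stored payload.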