# Resumable WebSockets with Edge Functions

Build reconnect-safe WebSockets with event replay, idempotency keys, and graceful restarts.

This example shows how to build a reconnect-safe chat stream on Supabase Edge Functions using:

- WebSocket upgrade + JWT auth
- Postgres-backed session and event persistence
- Event replay with `lastEventId`
- Idempotent user messages with `idempotency_key`
- Graceful client reconnects during worker restarts

Reference implementation: [Building Resumable WebSockets with Supabase Edge Functions and Postgres](https://blog.mansueli.com/building-resumable-websockets-with-supabase-edge-functions-and-postgres)

## Architecture

1. Client connects with a user JWT, plus optional `sessionId` and `lastEventId`.
2. Function verifies auth and either resumes or creates a session.
3. Every message is written to `ws_events` with an incrementing `id`.
4. On reconnect, server replays events where `id > lastEventId`.
5. Client updates local `lastEventId` and resumes without losing messages.

## Database schema

```sql
create extension if not exists pgcrypto;

create table ws_sessions (
  id uuid primary key default gen_random_uuid(),
  user_id uuid not null,
  created_at timestamptz default now(),
  updated_at timestamptz default now(),
  last_event_id bigint default 0
);

create table ws_events (
  id bigint generated by default as identity primary key,
  session_id uuid not null references ws_sessions(id) on delete cascade,
  event_type text not null,
  payload jsonb not null,
  created_at timestamptz default now()
);
create index ws_events_session_id_id_idx on ws_events(session_id, id);

create table ws_idempotency_keys (
  session_id uuid not null references ws_sessions(id) on delete cascade,
  idempotency_key uuid not null,
  primary key(session_id, idempotency_key)
);

create unlogged table ws_live_connections (
  session_id uuid primary key,
  connected_at timestamptz default now(),
  last_seen_at timestamptz default now(),
  edge_region text
);
```

## Edge Function (WebSocket proxy)

Use `supabase functions serve --no-verify-jwt` and validate JWT inside the function.

```ts
import { createAdminClient, createContextClient, verifyCredentials } from '@supabase/server/core'

const PREEMPTIVE_RESTART_MS = 340_000

function send(socket: WebSocket, payload: unknown) {
  if (socket.readyState === WebSocket.OPEN) {
    socket.send(JSON.stringify(payload))
  }
}

Deno.serve(async (req) => {
  const url = new URL(req.url)
  const token = url.searchParams.get('token')
  if (!token) return new Response('Missing token', { status: 401 })

  const { data: auth, error } = await verifyCredentials({ token, apikey: null }, { auth: 'user' })
  if (error || !auth?.userClaims?.id) {
    return new Response('Unauthorized', { status: 401 })
  }

  const admin = createAdminClient()
  const { socket, response } = Deno.upgradeWebSocket(req, { idleTimeout: 0 })

  // Prevent EarlyDrop by keeping a pending promise until socket close.
  let resolveClosed!: () => void
  const closed = new Promise<void>((resolve) => {
    resolveClosed = resolve
  })
  // @ts-ignore
  EdgeRuntime.waitUntil(closed)

  const requestedSessionId = url.searchParams.get('sessionId')
  const lastEventId = Number(url.searchParams.get('lastEventId') || 0)
  const sessionId = requestedSessionId ?? crypto.randomUUID()

  socket.onclose = () => {
    resolveClosed()
  }

  socket.onmessage = async (event) => {
    const msg = JSON.parse(event.data)

    if (msg.type === 'user_message') {
      const { error: idempotencyError } = await admin.from('ws_idempotency_keys').upsert(
        {
          session_id: sessionId,
          idempotency_key: msg.idempotency_key,
        },
        { onConflict: 'session_id,idempotency_key', ignoreDuplicates: true }
      )

      let userEvent

      if (idempotencyError) {
        // Conflict detected - this is a retry, fetch the existing event
        const { data: existingEvent } = await admin
          .from('ws_events')
          .select()
          .eq('session_id', sessionId)
          .eq('idempotency_key', msg.idempotency_key)
          .single()

        userEvent = existingEvent
      } else {
        // New idempotency key - insert the event
        const { data: newEvent } = await admin
          .from('ws_events')
          .insert({
            session_id: sessionId,
            event_type: 'user_message',
            payload: { content: msg.content },
          })
          .select()
          .single()

        userEvent = newEvent
      }

      send(socket, {
        type: 'user_message',
        payload: userEvent?.payload,
        event_id: userEvent?.id,
      })
    }
  }

  send(socket, { type: 'session_init', session_id: sessionId })

  queueMicrotask(async () => {
    const { data: replayEvents } = await admin
      .from('ws_events')
      .select('*')
      .eq('session_id', sessionId)
      .gt('id', lastEventId)
      .order('id')

    for (const event of replayEvents ?? []) {
      send(socket, {
        type: event.event_type,
        payload: event.payload,
        event_id: event.id,
        replay: true,
      })
    }
  })

  setTimeout(() => {
    send(socket, { type: 'server_restarting' })
    socket.close(1012, 'Service restart')
  }, PREEMPTIVE_RESTART_MS)

  return response
})
```

## Browser client

The client stores `sessionId` and `lastEventId` in session storage, then reconnects with exponential backoff.

```ts
let sessionId = sessionStorage.getItem('ws_session_id')
let lastEventId = Number(sessionStorage.getItem('last_event_id') || 0)

function connect(token: string) {
  const url =
    `wss://YOUR_PROJECT.functions.supabase.co/websocket-proxy` +
    `?token=${encodeURIComponent(token)}` +
    `&lastEventId=${lastEventId}` +
    (sessionId ? `&sessionId=${sessionId}` : '')

  const ws = new WebSocket(url)

  ws.onmessage = (e) => {
    const msg = JSON.parse(e.data)

    if (msg.event_id) {
      lastEventId = Math.max(lastEventId, msg.event_id)
      sessionStorage.setItem('last_event_id', String(lastEventId))
    }

    if (msg.type === 'session_init') {
      sessionId = msg.session_id
      sessionStorage.setItem('ws_session_id', sessionId)
    }
  }
}
```

## Why this pattern works

- If the worker restarts, the client reconnects with the same session.
- Replay closes delivery gaps caused by reconnect windows.
- Idempotency keys prevent duplicate inserts when clients retry.
- `EdgeRuntime.waitUntil()` prevents unexpected early termination of idle-looking WebSocket workers.

## Next steps

- Add row-level security policies for all `ws_*` tables.
- Add a heartbeat and cleanup policy for stale sessions.
- Add structured event payload types and input validation.
- Add observability dashboards for disconnect rate and replay lag.