Queues

Consuming Supabase Queue Messages with Edge Functions

Learn how to consume Supabase Queue messages server-side with a Supabase Edge Function


This guide helps you read & process queue messages server-side with a Supabase Edge Function. Read Queues API Reference for more details on our API.

Concepts

Supabase Queues is a pull-based Message Queue consisting of three main components: Queues, Messages, and Queue Types. You should already be familiar with the Queues Quickstart.

Consuming messages in an Edge Function

This is a Supabase Edge Function that reads 5 messages off the queue, processes each of them, and deletes each message when it is done.

1
import 'jsr:@supabase/functions-js/edge-runtime.d.ts'
2
import { createClient } from 'npm:@supabase/supabase-js@2'
3
4
const supabaseUrl = 'supabaseURL'
5
const supabaseKey = 'supabaseKey'
6
7
const supabase = createClient(supabaseUrl, supabaseKey)
8
const queueName = 'your_queue_name'
9
10
// Type definition for queue messages
11
interface QueueMessage {
12
msg_id: bigint
13
read_ct: number
14
vt: string
15
enqueued_at: string
16
message: any
17
}
18
19
async function processMessage(message: QueueMessage) {
20
//
21
// Do whatever logic you need to with the message content
22
//
23
// Delete the message from the queue
24
const { error: deleteError } = await supabase.schema('pgmq_public').rpc('delete', {
25
queue_name: queueName,
26
msg_id: message.msg_id,
27
})
28
29
if (deleteError) {
30
console.error(`Failed to delete message ${message.msg_id}:`, deleteError)
31
} else {
32
console.log(`Message ${message.msg_id} deleted from queue`)
33
}
34
}
35
36
Deno.serve(async (req) => {
37
const { data: messages, error } = await supabase.schema('pgmq_public').rpc('read', {
38
queue_name: queueName,
39
sleep_seconds: 0, // Don't wait if queue is empty
40
n: 5, // Read 5 messages off the queue
41
})
42
43
if (error) {
44
console.error(`Error reading from ${queueName} queue:`, error)
45
return new Response(JSON.stringify({ error: error.message }), {
46
status: 500,
47
headers: { 'Content-Type': 'application/json' },
48
})
49
}
50
51
if (!messages || messages.length === 0) {
52
console.log('No messages in workflow_messages queue')
53
return new Response(JSON.stringify({ message: 'No messages in queue' }), {
54
status: 200,
55
headers: { 'Content-Type': 'application/json' },
56
})
57
}
58
59
console.log(`Found ${messages.length} messages to process`)
60
61
// Process each message that was read off the queue
62
for (const message of messages) {
63
try {
64
await processMessage(message as QueueMessage)
65
} catch (error) {
66
console.error(`Error processing message ${message.msg_id}:`, error)
67
}
68
}
69
70
// Return immediately while background processing continues
71
return new Response(
72
JSON.stringify({
73
message: `Processing ${messages.length} messages in background`,
74
count: messages.length,
75
}),
76
{
77
status: 200,
78
headers: { 'Content-Type': 'application/json' },
79
}
80
)
81
})

Every time this Edge Function is run it:

  1. Read 5 messages off the queue
  2. Call the processMessage function
  3. At the end of processMessage, the message is deleted from the queue
  4. If processMessage throws an error, the error is logged. In this case, the message is still in the queue, so the next time this Edge Function runs it reads the message again.

You might find this kind of setup handy to run with Supabase Cron. You can set up Cron so that every N number of minutes or seconds, the Edge Function will run and process a number of messages off the queue.

Similarly, you can invoke the Edge Function on command at any given time with supabase.functions.invoke.