Consuming Supabase Queue Messages with Edge Functions
Learn how to consume Supabase Queue messages server-side with a Supabase Edge Function
This guide helps you read & process queue messages server-side with a Supabase Edge Function. Read Queues API Reference for more details on our API.
Concepts
Supabase Queues is a pull-based Message Queue consisting of three main components: Queues, Messages, and Queue Types. You should already be familiar with the Queues Quickstart.
Consuming messages in an Edge Function
This is a Supabase Edge Function that reads 5 messages off the queue, processes each of them, and deletes each message when it is done.
1import 'jsr:@supabase/functions-js/edge-runtime.d.ts'2import { createClient } from 'npm:@supabase/supabase-js@2'34const supabaseUrl = 'supabaseURL'5const supabaseKey = 'supabaseKey'67const supabase = createClient(supabaseUrl, supabaseKey)8const queueName = 'your_queue_name'910// Type definition for queue messages11interface QueueMessage {12 msg_id: bigint13 read_ct: number14 vt: string15 enqueued_at: string16 message: any17}1819async function processMessage(message: QueueMessage) {20 //21 // Do whatever logic you need to with the message content22 //23 // Delete the message from the queue24 const { error: deleteError } = await supabase.schema('pgmq_public').rpc('delete', {25 queue_name: queueName,26 msg_id: message.msg_id,27 })2829 if (deleteError) {30 console.error(`Failed to delete message ${message.msg_id}:`, deleteError)31 } else {32 console.log(`Message ${message.msg_id} deleted from queue`)33 }34}3536Deno.serve(async (req) => {37 const { data: messages, error } = await supabase.schema('pgmq_public').rpc('read', {38 queue_name: queueName,39 sleep_seconds: 0, // Don't wait if queue is empty40 n: 5, // Read 5 messages off the queue41 })4243 if (error) {44 console.error(`Error reading from ${queueName} queue:`, error)45 return new Response(JSON.stringify({ error: error.message }), {46 status: 500,47 headers: { 'Content-Type': 'application/json' },48 })49 }5051 if (!messages || messages.length === 0) {52 console.log('No messages in workflow_messages queue')53 return new Response(JSON.stringify({ message: 'No messages in queue' }), {54 status: 200,55 headers: { 'Content-Type': 'application/json' },56 })57 }5859 console.log(`Found ${messages.length} messages to process`)6061 // Process each message that was read off the queue62 for (const message of messages) {63 try {64 await processMessage(message as QueueMessage)65 } catch (error) {66 console.error(`Error processing message ${message.msg_id}:`, error)67 }68 }6970 // Return immediately while background processing continues71 return new Response(72 JSON.stringify({73 message: `Processing ${messages.length} messages in background`,74 count: messages.length,75 }),76 {77 status: 200,78 headers: { 'Content-Type': 'application/json' },79 }80 )81})Every time this Edge Function is run it:
- Read 5 messages off the queue
- Call the
processMessagefunction - At the end of
processMessage, the message is deleted from the queue - If
processMessagethrows an error, the error is logged. In this case, the message is still in the queue, so the next time this Edge Function runs it reads the message again.
You might find this kind of setup handy to run with Supabase Cron. You can set up Cron so that every N number of minutes or seconds, the Edge Function will run and process a number of messages off the queue.
Similarly, you can invoke the Edge Function on command at any given time with supabase.functions.invoke.