Automatic embeddings
Vector embeddings enable powerful semantic search capabilities in Postgres, but managing them alongside your content has traditionally been complex. This guide demonstrates how to automate embedding generation and updates using Supabase Edge Functions, pgmq, pg_net, and pg_cron.
Understanding the challenge
When implementing semantic search with pgvector, developers typically need to:
- Generate embeddings via an external API (like OpenAI)
- Store these embeddings alongside the content
- Keep embeddings in sync when content changes
- Handle failures and retries in the embedding generation process
While Postgres full-text search can handle this internally through synchronous calls to `to_tsvector` and triggers, semantic search requires asynchronous API calls to a provider like OpenAI to generate vector embeddings. This guide demonstrates how to use triggers, queues, and Supabase Edge Functions to bridge this gap.
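For comparison, here is a minimal sketch of the synchronous full-text approach (using a hypothetical `articles` table, for illustration only) - the `tsvector` can be maintained inside the same transaction, which is exactly what we can't do for externally generated embeddings:

```sql
-- Full-text search can stay in sync synchronously because to_tsvector()
-- runs locally inside Postgres (hypothetical table for illustration only)
create table articles (
  id bigint primary key generated always as identity,
  body text not null,
  fts tsvector generated always as (to_tsvector('english', body)) stored
);

create index on articles using gin (fts);
```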
Understanding the architecture
We'll leverage the following Postgres and Supabase features to create the automated embedding system:
- pgvector: Stores and queries vector embeddings
- pgmq: Queues embedding generation requests for processing and retries
- pg_net: Handles asynchronous HTTP requests to Edge Functions directly from Postgres
- pg_cron: Automatically processes and retries embedding generations
- Triggers: Detects content changes and enqueues embedding generation requests
- Edge Functions: Generates embeddings via an API like OpenAI (customizable)
We'll design the system to:
- Be generic, so that it can be used with any table and content. This allows you to configure embeddings in multiple places, each with the ability to customize the input used for embedding generation. These will all use the same queue infrastructure and Edge Function to generate the embeddings.
- Handle failures gracefully, by retrying failed jobs and providing detailed information about the status of each job.
Implementation
We'll start by setting up the infrastructure needed to queue and process embedding generation requests. Then we'll create an example table with triggers to enqueue these embedding requests whenever content is inserted or updated.
Step 1: Enable extensions
First, let's enable the required extensions:
```sql
-- For vector operations
create extension if not exists vector
with
  schema extensions;

-- For queueing and processing jobs
-- (pgmq will create its own schema)
create extension if not exists pgmq;

-- For async HTTP requests
create extension if not exists pg_net
with
  schema extensions;

-- For scheduled processing and retries
-- (pg_cron will create its own schema)
create extension if not exists pg_cron;

-- For clearing embeddings during updates
create extension if not exists hstore
with
  schema extensions;
```
Even though the SQL code is `create extension`, this is the equivalent of "enabling the extension". To disable an extension, call `drop extension`.
Step 2: Create utility functions
Before we set up our embedding logic, we need to create some utility functions:
```sql
-- Schema for utility functions
create schema util;

-- Utility function to get the Supabase project URL (required for Edge Functions)
create function util.project_url()
returns text
language plpgsql
security definer
as $$
declare
  secret_value text;
begin
  -- Retrieve the project URL from Vault
  select decrypted_secret into secret_value from vault.decrypted_secrets where name = 'project_url';
  return secret_value;
end;
$$;

-- Generic function to invoke any Edge Function
create or replace function util.invoke_edge_function(
  name text,
  body jsonb,
  timeout_milliseconds int = 5 * 60 * 1000 -- default 5 minute timeout
)
returns void
language plpgsql
as $$
declare
  headers_raw text;
  auth_header text;
begin
  -- If we're in a PostgREST session, reuse the request headers for authorization
  headers_raw := current_setting('request.headers', true);

  -- Only try to parse if headers are present
  auth_header := case
    when headers_raw is not null then
      (headers_raw::json->>'authorization')
    else
      null
  end;

  -- Perform async HTTP request to the edge function
  perform net.http_post(
    url => util.project_url() || '/functions/v1/' || name,
    headers => jsonb_build_object(
      'Content-Type', 'application/json',
      'Authorization', auth_header
    ),
    body => body,
    timeout_milliseconds => timeout_milliseconds
  );
end;
$$;

-- Generic trigger function to clear a column on update
create or replace function util.clear_column()
returns trigger
language plpgsql as $$
declare
  clear_column text := TG_ARGV[0];
begin
  NEW := NEW #= hstore(clear_column, NULL);
  return NEW;
end;
$$;
```
Here we create:
- A schema `util` to store utility functions.
- A function to retrieve the Supabase project URL from Vault. We'll add this secret next.
- A generic function to invoke any Edge Function with a given name and request body.
- A generic trigger function to clear a column on update. This function accepts the column name as an argument and sets it to `NULL` in the `NEW` record. We'll explain how to use this function later.
Every project has a unique API URL that is required to invoke Edge Functions. Let's go ahead and add the project URL secret to Vault depending on your environment.
When working with a local Supabase stack, add the following to your `supabase/seed.sql` file:

```sql
select
  vault.create_secret('http://api.supabase.internal:8000', 'project_url');
```
When deploying to the cloud platform, open the SQL editor and run the following, replacing `<project-url>` with your project's API URL:

```sql
select
  vault.create_secret('<project-url>', 'project_url');
```
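If you'd like a quick sanity check, you can confirm the secret is readable through the utility function we created earlier (a minimal check, assuming you run it as a privileged role such as `postgres`):

```sql
-- Should return the project URL you just stored in Vault
select util.project_url();
```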
Step 3: Create queue and triggers
Our goal is to automatically generate embeddings whenever content is inserted or updated within a table. We can use triggers and queues to achieve this. Our approach is to automatically queue embedding jobs whenever records are inserted or updated in a table, then process them asynchronously using a cron job. If a job fails, it will remain in the queue and be retried in the next scheduled task.
First we create a `pgmq` queue for processing embedding requests:

```sql
-- Queue for processing embedding jobs
select pgmq.create('embedding_jobs');
```
Next we create a trigger function to queue embedding jobs. We'll use this function to handle both insert and update events:
```sql
-- Generic trigger function to queue embedding jobs
create or replace function util.queue_embeddings()
returns trigger
language plpgsql
as $$
declare
  content_function text = TG_ARGV[0];
  embedding_column text = TG_ARGV[1];
begin
  perform pgmq.send(
    queue_name => 'embedding_jobs',
    msg => jsonb_build_object(
      'id', NEW.id,
      'schema', TG_TABLE_SCHEMA,
      'table', TG_TABLE_NAME,
      'contentFunction', content_function,
      'embeddingColumn', embedding_column
    )
  );
  return NEW;
end;
$$;
```
Our `util.queue_embeddings` trigger function is generic and can be used with any table and content function. It accepts two arguments:

- `content_function`: The name of a function that returns the text content to be embedded. The function should accept a single row as input and return text (see the `embedding_input` example). This allows you to customize the text input passed to the embedding model - for example, you could concatenate multiple columns together like `title` and `content` and use the result as input.
- `embedding_column`: The name of the destination column where the embedding will be stored.

Note that the `util.queue_embeddings` trigger function requires a `for each row` clause to work correctly. See Usage for an example of how to use this trigger function with your table.
Next we'll create a function to process the embedding jobs. This function will read jobs from the queue, group them into batches, and invoke the Edge Function to generate embeddings. We'll use `pg_cron` to schedule this function to run every 10 seconds.

```sql
-- Function to process embedding jobs from the queue
create or replace function util.process_embeddings(
  batch_size int = 10,
  max_requests int = 10,
  timeout_milliseconds int = 5 * 60 * 1000 -- default 5 minute timeout
)
returns void
language plpgsql
as $$
declare
  job_batches jsonb[];
  batch jsonb;
begin
  with
    -- First get jobs and assign batch numbers
    numbered_jobs as (
      select
        message || jsonb_build_object('jobId', msg_id) as job_info,
        (row_number() over (order by 1) - 1) / batch_size as batch_num
      from pgmq.read(
        queue_name => 'embedding_jobs',
        vt => timeout_milliseconds / 1000,
        qty => max_requests * batch_size
      )
    ),
    -- Then group jobs into batches
    batched_jobs as (
      select
        jsonb_agg(job_info) as batch_array,
        batch_num
      from numbered_jobs
      group by batch_num
    )
  -- Finally aggregate all batches into array
  select array_agg(batch_array)
  from batched_jobs
  into job_batches;

  -- Exit early if there are no jobs to process
  if job_batches is null then
    return;
  end if;

  -- Invoke the embed edge function for each batch
  foreach batch in array job_batches loop
    perform util.invoke_edge_function(
      name => 'embed',
      body => batch,
      timeout_milliseconds => timeout_milliseconds
    );
  end loop;
end;
$$;

-- Schedule the embedding processing
select
  cron.schedule(
    'process-embeddings',
    '10 seconds',
    $$
    select util.process_embeddings();
    $$
  );
```
Let's discuss some common questions about this approach:
Why not generate all embeddings in a single Edge Function request?
While this is possible, it can lead to long processing times and potential timeouts. Batching allows us to process multiple embeddings concurrently and handle failures more effectively.
Why not one request per row?
This approach can lead to API rate limiting and performance issues. Batching provides a balance between efficiency and reliability.
Why queue requests instead of processing them immediately?
Queuing allows us to handle failures gracefully, retry requests, and manage concurrency more effectively. Specifically we are using `pgmq`'s visibility timeouts to ensure that failed requests are retried.
How do visibility timeouts work?
Every time we read a message from the queue, we set a visibility timeout which tells `pgmq` to hide the message from other readers for a certain period. If the Edge Function fails to process the message within this period, the message becomes visible again and will be retried by the next scheduled task.
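If you want to see this behavior for yourself, here is a minimal sketch you can run manually against the queue (assuming it contains at least one message):

```sql
-- Read one message and hide it from other readers for 30 seconds
select msg_id, read_ct, vt, message
from pgmq.read(
  queue_name => 'embedding_jobs',
  vt => 30, -- visibility timeout in seconds
  qty => 1
);

-- If the message is never deleted (e.g. the Edge Function failed), it becomes
-- visible again after 30 seconds and read_ct increments on the next read
```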
How do we handle retries?
We use `pg_cron` to schedule a task that reads messages from the queue and processes them. If the Edge Function fails to process a message, it becomes visible again after a timeout and can be retried by the next scheduled task.
Is 10 seconds a good interval for processing?
This interval is a good starting point, but you may need to adjust it based on your workload and the time it takes to generate embeddings. You can adjust the `batch_size`, `max_requests`, and `timeout_milliseconds` parameters to optimize performance.
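As a sketch of what tuning might look like (assuming pg_cron 1.5+ for sub-minute schedules), re-running `cron.schedule` with the same job name replaces the existing job:

```sql
-- Example: process larger batches every 30 seconds instead of every 10
select
  cron.schedule(
    'process-embeddings',
    '30 seconds',
    $$
    select util.process_embeddings(
      batch_size => 20,
      max_requests => 5,
      timeout_milliseconds => 10 * 60 * 1000
    );
    $$
  );
```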
Step 4: Create the Edge Function
Finally we'll create the Edge Function to generate embeddings. We'll use OpenAI's API in this example, but you can replace it with any other embedding generation service.
Use the Supabase CLI to create a new Edge Function:
```bash
supabase functions new embed
```
This will create a new directory `supabase/functions/embed` with an `index.ts` file. Replace the contents of this file with the following:
supabase/functions/embed/index.ts:
```typescript
// Setup type definitions for built-in Supabase Runtime APIs
import 'jsr:@supabase/functions-js/edge-runtime.d.ts'

// We'll use the OpenAI API to generate embeddings
import OpenAI from 'jsr:@openai/openai'

import { z } from 'npm:zod'

// We'll make a direct Postgres connection to update the document
import postgres from 'https://deno.land/x/postgresjs@v3.4.5/mod.js'

// Initialize OpenAI client
const openai = new OpenAI({
  // We'll need to manually set the `OPENAI_API_KEY` environment variable
  apiKey: Deno.env.get('OPENAI_API_KEY'),
})

// Initialize Postgres client
const sql = postgres(
  // `SUPABASE_DB_URL` is a built-in environment variable
  Deno.env.get('SUPABASE_DB_URL')!
)

const jobSchema = z.object({
  jobId: z.number(),
  id: z.number(),
  schema: z.string(),
  table: z.string(),
  contentFunction: z.string(),
  embeddingColumn: z.string(),
})

const failedJobSchema = jobSchema.extend({
  error: z.string(),
})

type Job = z.infer<typeof jobSchema>
type FailedJob = z.infer<typeof failedJobSchema>

type Row = {
  id: string
  content: unknown
}

const QUEUE_NAME = 'embedding_jobs'

// Listen for HTTP requests
Deno.serve(async (req) => {
  if (req.method !== 'POST') {
    return new Response('expected POST request', { status: 405 })
  }

  if (req.headers.get('content-type') !== 'application/json') {
    return new Response('expected json body', { status: 400 })
  }

  // Use Zod to parse and validate the request body
  const parseResult = z.array(jobSchema).safeParse(await req.json())

  if (parseResult.error) {
    return new Response(`invalid request body: ${parseResult.error.message}`, {
      status: 400,
    })
  }

  const pendingJobs = parseResult.data

  // Track jobs that completed successfully
  const completedJobs: Job[] = []

  // Track jobs that failed due to an error
  const failedJobs: FailedJob[] = []

  async function processJobs() {
    let currentJob: Job | undefined

    while ((currentJob = pendingJobs.shift()) !== undefined) {
      try {
        await processJob(currentJob)
        completedJobs.push(currentJob)
      } catch (error) {
        failedJobs.push({
          ...currentJob,
          error: error instanceof Error ? error.message : JSON.stringify(error),
        })
      }
    }
  }

  try {
    // Process jobs while listening for worker termination
    await Promise.race([processJobs(), catchUnload()])
  } catch (error) {
    // If the worker is terminating (e.g. wall clock limit reached),
    // add pending jobs to fail list with termination reason
    failedJobs.push(
      ...pendingJobs.map((job) => ({
        ...job,
        error: error instanceof Error ? error.message : JSON.stringify(error),
      }))
    )
  }

  // Log completed and failed jobs for traceability
  console.log('finished processing jobs:', {
    completedJobs: completedJobs.length,
    failedJobs: failedJobs.length,
  })

  return new Response(
    JSON.stringify({
      completedJobs,
      failedJobs,
    }),
    {
      // 200 OK response
      status: 200,

      // Custom headers to report job status
      headers: {
        'Content-Type': 'application/json',
        'X-Completed-Jobs': completedJobs.length.toString(),
        'X-Failed-Jobs': failedJobs.length.toString(),
      },
    }
  )
})

/**
 * Generates an embedding for the given text.
 */
async function generateEmbedding(text: string) {
  const response = await openai.embeddings.create({
    model: 'text-embedding-3-small',
    input: text,
  })
  const [data] = response.data

  if (!data) {
    throw new Error('failed to generate embedding')
  }

  return data.embedding
}

/**
 * Processes an embedding job.
 */
async function processJob(job: Job) {
  const { jobId, id, schema, table, contentFunction, embeddingColumn } = job

  // Fetch content for the schema/table/row combination
  const [row]: [Row] = await sql`
    select
      id,
      ${sql(contentFunction)}(t) as content
    from
      ${sql(schema)}.${sql(table)} t
    where
      id = ${id}
  `

  if (!row) {
    throw new Error(`row not found: ${schema}.${table}/${id}`)
  }

  if (typeof row.content !== 'string') {
    throw new Error(`invalid content - expected string: ${schema}.${table}/${id}`)
  }

  const embedding = await generateEmbedding(row.content)

  await sql`
    update
      ${sql(schema)}.${sql(table)}
    set
      ${sql(embeddingColumn)} = ${JSON.stringify(embedding)}
    where
      id = ${id}
  `

  await sql`
    select pgmq.delete(${QUEUE_NAME}, ${jobId}::bigint)
  `
}

/**
 * Returns a promise that rejects if the worker is terminating.
 */
function catchUnload() {
  return new Promise((_resolve, reject) => {
    addEventListener('beforeunload', (ev: any) => {
      reject(new Error(ev.detail?.reason))
    })
  })
}
```
The Edge Function listens for incoming HTTP requests from `pg_net` and processes each embedding job. It is a generic worker that can handle embedding jobs for any table and column. It uses OpenAI's API to generate embeddings and updates the corresponding row in the database. It also deletes the job from the queue once it has been processed.

The function is designed to process multiple jobs independently. If one job fails, it will not affect the processing of other jobs. The function returns a `200 OK` response with a list of completed and failed jobs. We can use this information to diagnose failed jobs. See Troubleshooting for more details.
You will need to set the `OPENAI_API_KEY` environment variable to authenticate with OpenAI. When running the Edge Function locally, you can add it to a `.env` file:
.env:
```
OPENAI_API_KEY=your-api-key
```
When you're ready to deploy the Edge Function, you can set the environment variable using the Supabase CLI:

```bash
supabase secrets set --env-file .env
```

or

```bash
supabase secrets set OPENAI_API_KEY=<your-api-key>
```
Alternatively, you can replace the `generateEmbedding` function with your own embedding generation logic.
See Deploy to Production for more information on how to deploy the Edge Function.
Usage
Now that the infrastructure is in place, let's go through an example of how to use this system to automatically generate embeddings for a table of documents. You can use this approach with multiple tables and customize the input for each embedding generation as needed.
1. Create table to store documents with embeddings
We'll set up a new `documents` table that will store our content and embeddings:

```sql
-- Table to store documents with embeddings
create table documents (
  id integer primary key generated always as identity,
  title text not null,
  content text not null,
  embedding halfvec(1536), -- dimensions must match your embedding model (1536 for text-embedding-3-small)
  created_at timestamp with time zone default now()
);

-- Index for vector search over document embeddings
create index on documents using hnsw (embedding halfvec_cosine_ops);
```
Our `documents` table stores the title and content of each document along with its vector embedding. We use a `halfvec` column to store the embeddings, which is a `pgvector` data type that stores float values in half precision (16 bits) to save space.

We use an HNSW index on the vector column. Note that we are choosing `halfvec_cosine_ops` as the operator class, which means our future queries will need to use cosine distance (`<=>`) to find similar embeddings. Also note that HNSW indexes support a maximum of 4000 dimensions for `halfvec` vectors, so keep this in mind when choosing an embedding model. If your model generates embeddings with more than 4000 dimensions, you will need to reduce the dimensionality before indexing them. See Matryoshka embeddings for a potential solution to shortening dimensions.
Also note that the table must have a primary key column named `id` for our triggers to work correctly with the `util.queue_embeddings` function and for our Edge Function to update the correct row.
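Once embeddings are populated, a search query might look like the following minimal sketch. It assumes you generate the query embedding on the client (or in another Edge Function) using the same model, and pass it in as a parameter:

```sql
-- Order by cosine distance (<=>) to match the halfvec_cosine_ops index
select id, title
from documents
where embedding is not null -- skip rows still waiting on an embedding
order by embedding <=> $1::halfvec
limit 5;
```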
2. Create triggers to enqueue embedding jobs
Now we'll set up the triggers to enqueue embedding jobs whenever content is inserted or updated:
```sql
-- Customize the input for embedding generation
-- e.g. Concatenate title and content with a markdown header
create or replace function embedding_input(doc documents)
returns text
language plpgsql
immutable
as $$
begin
  return '# ' || doc.title || E'\n\n' || doc.content;
end;
$$;

-- Trigger for insert events
create trigger embed_documents_on_insert
  after insert
  on documents
  for each row
  execute function util.queue_embeddings('embedding_input', 'embedding');

-- Trigger for update events
create trigger embed_documents_on_update
  after update of title, content -- must match the columns in embedding_input()
  on documents
  for each row
  execute function util.queue_embeddings('embedding_input', 'embedding');
```
We create 2 triggers:

- `embed_documents_on_insert`: Enqueues embedding jobs whenever new rows are inserted into the `documents` table.
- `embed_documents_on_update`: Enqueues embedding jobs whenever the `title` or `content` columns are updated in the `documents` table.
Both of these triggers use the same `util.queue_embeddings` function that will queue the embedding jobs for processing. They accept 2 arguments:

- `embedding_input`: The name of the function that generates the input for embedding generation. This function allows you to customize the text input passed to the embedding model (e.g. concatenating the title and content). The function should accept a single row as input and return text.
- `embedding`: The name of the destination column where the embedding will be stored.
Note that the update trigger only fires when the `title` or `content` columns are updated. This is to avoid unnecessary updates to the embedding column when other columns are updated. Make sure that these columns match the columns used in the `embedding_input` function.
(Optional) Clearing embeddings on update
Note that our trigger will enqueue new embedding jobs when content is updated, but it will not clear any existing embeddings. This means that an embedding can be temporarily out of sync with the content until the new embedding is generated and updated.
If having an up-to-date embedding matters more to you than having any embedding at all, you can add another trigger to clear the existing embedding until the new one is generated:
```sql
-- Trigger to clear the embedding column on update
create trigger clear_document_embedding_on_update
  before update of title, content -- must match the columns in embedding_input()
  on documents
  for each row
  execute function util.clear_column('embedding');
```
`util.clear_column` is a generic trigger function we created earlier that can be used to clear any column in a table.

- It accepts the column name as an argument. This column must be nullable.
- It requires a `before` trigger with a `for each row` clause.
- It requires the `hstore` extension we created earlier.
This example will clear the `embedding` column whenever the `title` or `content` columns are updated (note the `of title, content` clause). This ensures that the embedding is always in sync with the title and content, but it will result in temporary gaps in search results until the new embedding is generated.
We intentionally use a `before` trigger because it allows us to modify the record before it's written to disk, avoiding an extra `update` statement that would be needed with an `after` trigger.
3. Insert and update documents
Let's insert a new document and update its content to see the embedding generation in action:
```sql
-- Insert a new document
insert into documents (title, content)
values
  ('Understanding Vector Databases', 'Vector databases are specialized...');

-- Immediately check the embedding column
select id, embedding
from documents
where title = 'Understanding Vector Databases';
```
You should observe that the `embedding` column is initially `null` after inserting the document. This is because the embedding generation is asynchronous and will be processed by the Edge Function in the next scheduled task.
Wait up to 10 seconds for the next task to run, then check the `embedding` column again:

```sql
select id, embedding
from documents
where title = 'Understanding Vector Databases';
```
You should see the generated embedding for the document.
Next let's update the content of the document:
```sql
-- Update the content of the document
update documents
set content = 'Vector databases allow you to query...'
where title = 'Understanding Vector Databases';

-- Immediately check the embedding column
select id, embedding
from documents
where title = 'Understanding Vector Databases';
```
You should observe that the `embedding` column is reset to `null` after updating the content. This is because of the trigger we added to clear existing embeddings whenever the content is updated. The embedding will be regenerated by the Edge Function in the next scheduled task.
Wait up to 10 seconds for the next task to run, then check the `embedding` column again:

```sql
select id, embedding
from documents
where title = 'Understanding Vector Databases';
```
You should see the updated embedding for the document.
Finally we'll update the title of the document:
```sql
-- Update the title of the document
update documents
set title = 'Understanding Vector Databases with Supabase'
where title = 'Understanding Vector Databases';
```
You should observe that the `embedding` column is once again reset to `null` after updating the title. This is because the trigger we added to clear existing embeddings fires when either the `content` or `title` columns are updated. The embedding will be regenerated by the Edge Function in the next scheduled task.
Wait up to 10 seconds for the next task to run, then check the `embedding` column again:

```sql
select id, embedding
from documents
where title = 'Understanding Vector Databases with Supabase';
```
You should see the updated embedding for the document.
Troubleshooting
The `embed` Edge Function processes a batch of embedding jobs and returns a `200 OK` response with a list of completed and failed jobs in the body. For example:

```json
{
  "completedJobs": [
    {
      "jobId": "1",
      "id": "1",
      "schema": "public",
      "table": "documents",
      "contentFunction": "embedding_input",
      "embeddingColumn": "embedding"
    }
  ],
  "failedJobs": [
    {
      "jobId": "2",
      "id": "2",
      "schema": "public",
      "table": "documents",
      "contentFunction": "embedding_input",
      "embeddingColumn": "embedding",
      "error": "error connecting to openai api"
    }
  ]
}
```
It also returns the number of completed and failed jobs in the response headers. For example:
```
X-Completed-Jobs: 1
X-Failed-Jobs: 1
```
You can also use the `X-Deno-Execution-Id` header to trace the execution of the Edge Function within the dashboard logs.
Each failed job includes an `error` field with a description of the failure. Reasons for a job failing could include:
- An error generating the embedding via external API
- An error connecting to the database
- The edge function being terminated (e.g. due to a wall clock limit)
- Any other error thrown during processing
`pg_net` stores HTTP responses in the `net._http_response` table, which can be queried to diagnose issues with the embedding generation process.

```sql
select
  *
from
  net._http_response
where
  (headers->>'X-Failed-Jobs')::int > 0;
```
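You can also inspect the queue itself to see jobs that are still pending or have been returned after a failed attempt. A minimal sketch, assuming pgmq's default `q_<queue_name>` table naming:

```sql
-- High-level queue metrics (length, message age, totals)
select * from pgmq.metrics('embedding_jobs');

-- Peek at individual messages without consuming them;
-- read_ct > 1 indicates a job that has already been retried
select msg_id, read_ct, enqueued_at, vt, message
from pgmq.q_embedding_jobs;
```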
Conclusion
Automating embedding generation and updates in Postgres allows you to build powerful semantic search capabilities without the complexity of managing embeddings manually.
By combining Postgres features like triggers, queues, and other extensions with Supabase Edge Functions, we can create a robust system that handles embedding generation asynchronously and retries failed jobs automatically.
This system can be customized to work with any content and embedding generation service, providing a flexible and scalable solution for semantic search in Postgres.