Blog post

Postgres Auditing in 150 lines of SQL

2022-03-08

21 minute read

Data auditing is a system that tracks changes to tables' contents over time. PostgreSQL has a robust set of features which we can leverage to create a generic auditing solution in 150 lines of SQL.

Auditing is particularly useful for historical analysis. To demonstrate, imagine you have a users table that tracks when a user is online. You might add a status column which can have one of two values: online and offline. How would you track how long a user is online for throughout an entire month? An auditing system would track every change with timestamps, and so you can measure the difference between each timestamp and sum them up for the entire month.

The goals of our auditing solution are:

  • low maintenance
  • easy to use
  • fast to query

To demonstrate what we're working towards, the following example shows what we'll have at the end of the blog post:

-- create a table
create table public.members(
    id int primary key,
    name text not null
);

-- Enable auditing on the new table
select audit.enable_tracking('public.members');

Produce some records to audit

-- create a new record
insert into public.members(id, name) values (1, 'foo');

-- edit the record
update public.members set name = 'bar' where id = 1;

-- delete the record
delete from public.members;

Review the audit log

select * from audit.record_history
id | record_id | old_record_id | op | ts | table_oid | table_schema | table_name | record | old_record  
----+--------------------------------------+--------------------------------------+--------+-------------------------------------+-----------+--------------+------------+--------------------------+--------------------------
2 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | | INSERT | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | {"id": 1, "name": "foo"} |
3 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | UPDATE | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | {"id": 1, "name": "bar"} | {"id": 1, "name": "foo"}
4 | | 1ecd5ff0-1b6b-5bc2-ad80-1cb19769c081 | DELETE | Mon Feb 28 18:13:52.698511 2022 PST | 16452 | public | members | | {"id": 1, "name": "bar"}
(3 rows)

Notice that our record_id and old_record_id stayed constant as we updated the row so we can easily query for a single row's history over time!

Lets get building

Namespace

To quote a tenet from the zen of python:

Namespaces are one honking great idea -- let's do more of those!

So first things first, we'll create a separate schema named audit to house our auditing entities.

create schema if not exists audit;

Storage

Next, we need a table to track inserts, updates and deletes.

Classically, an audit table's schema mirrors the table being audited and appends some metadata columns like the commit's timestamp. That solution has a few maintenance challenges:

  • enabling auditing on a table requires a database migration
  • when the source table's schema changes, the audit table's schema must also change

So instead, we'll lean on PostgreSQL's schema-less JSONB data type to store each record's data in a single column. That approach has the added benefit of allowing us to store multiple tables' audit history in a single audit table.

create table audit.record_version(
  id             bigserial primary key,
  -- auditing metadata
  record_id      uuid, -- identifies a new record by it's table + primary key
  old_record_id  uuid, -- ^
  op             varchar(8) not null, -- INSERT/UPDATE/DELETE/TRUNCATE
  ts             timestamptz not null default now(),
  -- table identifiers
  table_oid      oid not null,  -- pg internal id for a table
  table_schema   name not null, -- audited table's schema name e.g. 'public'
  table_name     name not null, -- audited table's table name e.g. 'account'
  -- record data
  record         jsonb, -- contents of the new record
  old_record     jsonb  -- previous record contents (for UPDATE/DELETE)
);
Postgres version compatibility

The table above uses PostgreSQL's built-in uuid functionality, which is available from version 14. For backwards compatibility you can use the uuid-ossp extension.

create extension if not exists "uuid-ossp";

Query Patterns

An audit log doesn't do us much good if its too slow to query! There are 2 query patterns we think are table stakes (😉) for an audit system:

Changes to a Table in a Time Range

For time slices, we need an index on the ts column. Since the table is append-only and the ts column is populated by insertion date, our values for ts are naturally in ascending order.

PostgreSQL's builtin BRIN index can leverage that correlation between value and physical location to produce an index that, at scale, is many hundreds of times smaller than the default (BTREE index) with faster lookup times.

-- index ts for time range filtering
create index record_version_ts
  on audit.record_version
  using brin(ts);

For table filtering, we've included a table_oid column which tracks PostgreSQL's internal numeric table identifier. We can add an index to this column instead of the table_schema and table_name columns, minimizing the index size and offering better performance.

-- index table_oid for table filtering
create index record_version_table_oid
  on audit.record_version
  using btree(table_oid);

Changes to a Record Over Time

One of the downsides to storing each row's data as jsonb is that filtering based on a column's value becomes very inefficient. If we want to look up a row's history quickly, we need to extract and index a unique identifier for each row.

For the globally unique identifier, we'll use the following structure

[table_oid, primary_key_value_1, primary_key_value_2, ...]

and hash that array as a UUID v5 to get an efficiently indexable UUID type to identify the row that is robust to data changes.

We'll use one utility function to lookup a record's primary key column names:

create or replace function audit.primary_key_columns(entity_oid oid)
    returns text[]
    stable
    security definer
    language sql
as $$
    -- Looks up the names of a table's primary key columns
    select
        coalesce(
            array_agg(pa.attname::text order by pa.attnum),
            array[]::text[]
        ) column_names
    from
        pg_index pi
        join pg_attribute pa
            on pi.indrelid = pa.attrelid
            and pa.attnum = any(pi.indkey)
    where
        indrelid = $1
        and indisprimary
$$;

and another to consume the table_oid and primary key, converting the result into the record's UUID.

create or replace function audit.to_record_id(
        entity_oid oid,
        pkey_cols text[],
        rec jsonb
)
    returns uuid
    stable
    language sql
as $$
    select
        case
            when rec is null then null
                        -- if no primary key exists, use a random uuid
            when pkey_cols = array[]::text[] then uuid_generate_v4()
            else (
                select
                    uuid_generate_v5(
                        'fd62bc3d-8d6e-43c2-919c-802ba3762271',
                        (
                                                    jsonb_build_array(to_jsonb($1))
                                                    || jsonb_agg($3 ->> key_)
                                                )::text
                    )
                from
                    unnest($2) x(key_)
            )
        end
$$;

Finally, we index the record_id and old_record_id columns that contain these unique identifiers for fast querying.

-- index record_id for fast searching
create index record_version_record_id
    on audit.record_version(record_id)
    where record_id is not null;

-- index old_record_id for fast searching
create index record_version_old_record_id
    on audit.record_version(record_id)
  where old_record_id is not null;

Enrollment

Okay, so we have a home for our audit data that we're confident it can be queried efficiently. Now how do we populate it?

We need the audit table to populate without end-users making any changes to their transactions. So we'll set up a trigger to fire when the data changes. In this case, we'll fire the trigger once for every inserted/updated/deleted row.

create or replace function audit.insert_update_delete_trigger()
    returns trigger
    security definer
    language plpgsql
as $$
declare
    pkey_cols text[] = audit.primary_key_columns(TG_RELID);
    record_jsonb jsonb = to_jsonb(new);
    record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);
    old_record_jsonb jsonb = to_jsonb(old);
    old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);
begin

    insert into audit.record_version(
        record_id,
        old_record_id,
        op,
        table_oid,
        table_schema,
        table_name,
        record,
        old_record
    )
    select
        record_id,
        old_record_id,
        TG_OP,
        TG_RELID,
        TG_TABLE_SCHEMA,
        TG_TABLE_NAME,
        record_jsonb,
        old_record_jsonb;

    return coalesce(new, old);
end;
$$;

Public API

Finally, we'll wrap up the trigger creation and removal process behind a clean, idempotent, user facing API.

The API we'll expose for enabling auditing on a table is

select audit.enable_tracking('<schema>.<table>'::regclass);

and for disabling tracking

select audit.disable_tracking('<schema>.<table>'::regclass);

Under the hood, those functions register our auditing trigger against the requested table.

create or replace function audit.enable_tracking(regclass)
    returns void
    volatile
    security definer
    language plpgsql
as $$
declare
    statement_row text = format('
        create trigger audit_i_u_d
            before insert or update or delete
            on %I
            for each row
            execute procedure audit.insert_update_delete_trigger();',
        $1
    );

    pkey_cols text[] = audit.primary_key_columns($1);
begin
    if pkey_cols = array[]::text[] then
        raise exception 'Table % can not be audited because it has no primary key', $1;
    end if;

    if not exists(select 1 from pg_trigger where tgrelid = $1 and tgname = 'audit_i_u_d') then
        execute statement_row;
    end if;
end;
$$;

create or replace function audit.disable_tracking(regclass)
    returns void
    volatile
    security definer
    language plpgsql
as $$
declare
    statement_row text = format(
        'drop trigger if exists audit_i_u_d on %I;',
        $1
    );
begin
    execute statement_row;
end;
$$;

And we're done with 2 lines of code to spare!

Performance

Auditing tables always reduces throughput of inserts, updates, and deletes. In cases where throughput is less than 1000 writes per second the overhead is typically negligible. For tables with a higher write frequency, consider logging changes outside of SQL with a tool like pgAudit.

Do I really expect you to copy/paste all that?

Nope, for a turnkey solution to auditing in PostgreSQL, we've packaged this script into an extension with some extra goodies like TRUNCATE support. Check it out at https://github.com/supabase/supa_audit.

Share this article

Last post

Should I Open Source my Company?

25 March 2022

Next post

Supabase Beta January 2022

22 February 2022

Related articles

Postgres WASM by Snaplet and Supabase

Choosing a Postgres Primary Key

Implementing "seen by" functionality with Postgres

Postgres Auditing in 150 lines of SQL

Protecting reserved roles with PostgreSQL Hooks

Build in a weekend, scale to millions