Last updated 2026-05-28

Postgres store

The Postgres driver is the typical production shape: two tables with their supporting indexes, a pgx v5 connection pool, and embedded migrations applied under pg_advisory_lock. Composite uniqueness is enforced by Postgres, not the application, which is what makes the conformance suite's same-key concurrency tests pass cleanly.

When you'd use this

Default choice for production unless you already run EntDB. Works with any managed Postgres 14+ (RDS, Azure Database for PostgreSQL, Cloud SQL).

Configuration

-e NOTIFY_STORE_DRIVER=postgres \
-e NOTIFY_POSTGRES_DSN='postgres://notify:notify@db.internal:5432/notify?sslmode=require' \
-e NOTIFY_POSTGRES_AUTOMIGRATE=true

See Store Setup → Postgres for the DSN options and the provisioning SQL.

Schema

CREATE TABLE notify_notifications (
    id uuid PRIMARY KEY,
    tenant_id text NOT NULL,
    user_id text NOT NULL,
    notification_id text NOT NULL,
    subject_ref text NOT NULL DEFAULT '',
    subject_type text NOT NULL DEFAULT '',
    title text NOT NULL DEFAULT '',
    body text NOT NULL DEFAULT '',
    channel text NOT NULL DEFAULT '',
    status text NOT NULL,
    created_at_ms bigint NOT NULL,
    delivered_at_ms bigint NOT NULL DEFAULT 0,
    ack_at_ms bigint NOT NULL DEFAULT 0,
    read_at_ms bigint NOT NULL DEFAULT 0,
    UNIQUE (tenant_id, user_id, notification_id)
);

CREATE INDEX notify_notifications_user_created
    ON notify_notifications (tenant_id, user_id, created_at_ms DESC, id DESC);

CREATE INDEX notify_notifications_user_unread
    ON notify_notifications (tenant_id, user_id)
    WHERE status <> 'read';

CREATE TABLE notify_devices (
    id uuid PRIMARY KEY,
    tenant_id text NOT NULL,
    user_id text NOT NULL,
    device_type text NOT NULL,
    token text NOT NULL DEFAULT '',
    created_at_ms bigint NOT NULL DEFAULT 0,
    last_active_ms bigint NOT NULL DEFAULT 0,
    UNIQUE (tenant_id, user_id, device_type)
);

CREATE INDEX notify_devices_user
    ON notify_devices (tenant_id, user_id, device_type);

Three things matter about this schema:

  1. Composite uniqueness is enforced by Postgres. The UNIQUE(tenant_id, user_id, notification_id) constraint is what makes ON CONFLICT DO NOTHING safe for the idempotent CreateNotification and what makes the same-key concurrency test pass without an application-level mutex.
  2. The paging index is the tuple the query sorts by. (tenant_id, user_id, created_at_ms DESC, id DESC) matches ORDER BY created_at_ms DESC, id DESC, so paging is an index scan, not a sort node.
  3. The unread partial index keeps unreadCount cheap. Even with millions of read rows, the unread count is a scan over the partial index.

Idempotency / upsert pattern

CreateNotification:

INSERT INTO notify_notifications (
    id, tenant_id, user_id, notification_id,
    subject_ref, subject_type, title, body, channel,
    status, created_at_ms
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (tenant_id, user_id, notification_id) DO NOTHING
RETURNING id;

If RETURNING produces a row, the writer won the insert and gets created=true. If pgx.ErrNoRows comes back (ON CONFLICT absorbed the dup), a follow-up SELECT recovers the canonical id and returns created=false. This satisfies both the basic Idempotency test and the same-key concurrency test: exactly one goroutine gets the RETURNING row, the rest take the recovery path.
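
The win-or-recover flow can be sketched without a database. This is a minimal illustration, not the driver's code: fakeTable, insertDoNothing, selectID, createNotification and raceCreate are hypothetical names, errNoRows stands in for pgx.ErrNoRows, and the mutex inside fakeTable only plays the role of Postgres enforcing the UNIQUE constraint.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNoRows stands in for pgx.ErrNoRows in this dependency-free sketch.
var errNoRows = errors.New("no rows in result set")

// fakeTable models only the UNIQUE (tenant_id, user_id, notification_id)
// constraint. Its mutex is part of the fake database, not application locking.
type fakeTable struct {
	mu   sync.Mutex
	rows map[string]string // composite key -> canonical id
}

// insertDoNothing mimics INSERT ... ON CONFLICT DO NOTHING RETURNING id.
func (t *fakeTable) insertDoNothing(key, id string) (string, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if _, exists := t.rows[key]; exists {
		return "", errNoRows // conflict absorbed, RETURNING yields no row
	}
	t.rows[key] = id
	return id, nil
}

// selectID mimics the follow-up SELECT that recovers the canonical id.
func (t *fakeTable) selectID(key string) (string, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	id, ok := t.rows[key]
	if !ok {
		return "", errNoRows
	}
	return id, nil
}

// createNotification returns the canonical id and whether this caller created it.
func createNotification(t *fakeTable, key, candidateID string) (id string, created bool, err error) {
	id, err = t.insertDoNothing(key, candidateID)
	if err == nil {
		return id, true, nil // RETURNING produced a row: this writer won
	}
	if !errors.Is(err, errNoRows) {
		return "", false, err
	}
	id, err = t.selectID(key) // recovery path taken by every loser
	return id, false, err
}

// raceCreate fires n concurrent creates for one key and counts the winners.
func raceCreate(n int) (winners int, ids map[string]bool) {
	table := &fakeTable{rows: map[string]string{}}
	ids = map[string]bool{}
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id, created, err := createNotification(table, "t1/u1/n1", fmt.Sprintf("id-%d", i))
			if err != nil {
				return
			}
			mu.Lock()
			defer mu.Unlock()
			ids[id] = true
			if created {
				winners++
			}
		}(i)
	}
	wg.Wait()
	return winners, ids
}

func main() {
	winners, ids := raceCreate(16)
	fmt.Println("winners:", winners, "distinct ids:", len(ids))
}
```

Every caller ends up holding the same id, and only one of them sees created=true, which is the property the same-key concurrency test checks.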

UpsertDevice:

INSERT INTO notify_devices (
    id, tenant_id, user_id, device_type, token,
    created_at_ms, last_active_ms
) VALUES ($1, $2, $3, $4, $5, $6, $7)
ON CONFLICT (tenant_id, user_id, device_type) DO UPDATE SET
    token = EXCLUDED.token,
    last_active_ms = EXCLUDED.last_active_ms
RETURNING *;

created_at_ms is intentionally not touched on conflict — token rotation preserves the original create time.
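
To make the conflict behaviour concrete, here is a small in-memory model of the statement. It is only a sketch: the device struct and upsertDevice function are hypothetical, and a Go map keyed by the composite key stands in for the table.

```go
package main

import "fmt"

// device mirrors the notify_devices columns that the upsert reads or writes.
type device struct {
	ID           string
	Token        string
	CreatedAtMS  int64
	LastActiveMS int64
}

// upsertDevice models ON CONFLICT (...) DO UPDATE on a map keyed by the
// composite (tenant_id, user_id, device_type) key.
func upsertDevice(table map[string]device, key string, in device) device {
	existing, conflict := table[key]
	if !conflict {
		table[key] = in // plain insert: every column comes from the input
		return in
	}
	// Conflict: only the columns in the DO UPDATE SET list are overwritten.
	// id and created_at_ms keep the values from the original insert.
	existing.Token = in.Token
	existing.LastActiveMS = in.LastActiveMS
	table[key] = existing
	return existing
}

func main() {
	table := map[string]device{}
	upsertDevice(table, "t1/u1/ios", device{ID: "a", Token: "tok-1", CreatedAtMS: 1000, LastActiveMS: 1000})
	got := upsertDevice(table, "t1/u1/ios", device{ID: "b", Token: "tok-2", CreatedAtMS: 2000, LastActiveMS: 2000})
	fmt.Printf("%+v\n", got)
}
```

After the second call the row carries the rotated token and the new last-active time, while the id and create time are still those of the first insert.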

UpdateStatus stamps the matching *_at_ms in one statement via a CASE:

UPDATE notify_notifications
SET status = $2,
    delivered_at_ms = CASE WHEN $2 = 'delivered' THEN $3 ELSE delivered_at_ms END,
    ack_at_ms = CASE WHEN $2 = 'acked' THEN $3 ELSE ack_at_ms END,
    read_at_ms = CASE WHEN $2 = 'read' THEN $3 ELSE read_at_ms END
WHERE tenant_id = $1 AND id = $4;

A RowsAffected() == 0 result maps to notify.ErrNotFound.
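
The effect of the CASE arms and the not-found mapping can be modelled in a few lines of Go. This is a sketch under assumptions: the notification struct, updateStatus function and the "pending" starting status are hypothetical, errNotFound stands in for notify.ErrNotFound, and a map keyed by "tenant_id/id" replaces the table.

```go
package main

import (
	"errors"
	"fmt"
)

// errNotFound stands in for notify.ErrNotFound in this sketch.
var errNotFound = errors.New("notification not found")

// notification holds only the columns the UPDATE statement writes.
type notification struct {
	Status        string
	DeliveredAtMS int64
	AckAtMS       int64
	ReadAtMS      int64
}

// updateStatus models the statement against a map keyed by "tenant_id/id".
// A missing key corresponds to RowsAffected() == 0.
func updateStatus(table map[string]*notification, key, status string, nowMS int64) error {
	n, ok := table[key]
	if !ok {
		return errNotFound
	}
	n.Status = status
	// Each CASE arm stamps one column; the other two keep their old values.
	switch status {
	case "delivered":
		n.DeliveredAtMS = nowMS
	case "acked":
		n.AckAtMS = nowMS
	case "read":
		n.ReadAtMS = nowMS
	}
	return nil
}

func main() {
	table := map[string]*notification{"t1/n1": {Status: "pending"}}
	_ = updateStatus(table, "t1/n1", "delivered", 1000)
	_ = updateStatus(table, "t1/n1", "read", 2000)
	fmt.Printf("%+v\n", *table["t1/n1"])
	fmt.Println(updateStatus(table, "t1/missing", "read", 3000))
}
```

Moving a row from delivered to read leaves delivered_at_ms in place, because the ELSE branch of each CASE writes the column back to itself.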

Pagination

  • Order: ORDER BY created_at_ms DESC, id DESC.
  • Cursor: strconv.FormatInt(lastRow.CreatedAtMS, 10). This is the same wire format as the memory driver, so cursors are interchangeable in cross-driver tests.
  • Next page: WHERE created_at_ms < $cursor (strict <, per the contract).
  • Detect next page: LIMIT n+1. If you get back n+1 rows, slice to n and emit nextCursor = items[n-1].CreatedAtMS. Saves a second round-trip and a count(*).
  • unreadCount is a separate SELECT count(*) FROM ... WHERE status <> 'read' — independent of the page window and any UnreadOnly filter.
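
The cursor format and the LIMIT n+1 trick above can be sketched as three small functions. The names row, encodeCursor, decodeCursor and trimPage are hypothetical; only the strconv.FormatInt encoding and the slice-to-n rule come from the list above.

```go
package main

import (
	"fmt"
	"strconv"
)

// row carries only the fields the paging logic needs.
type row struct {
	ID          string
	CreatedAtMS int64
}

// encodeCursor renders the last row's timestamp in the base-10 wire format.
func encodeCursor(last row) string {
	return strconv.FormatInt(last.CreatedAtMS, 10)
}

// decodeCursor parses a cursor back into the value bound to $cursor.
func decodeCursor(cursor string) (int64, error) {
	return strconv.ParseInt(cursor, 10, 64)
}

// trimPage takes the rows returned by a LIMIT n+1 query. If the extra row is
// present there is a next page: the result is sliced to n and the cursor is
// derived from the last row that is kept.
func trimPage(rows []row, n int) (items []row, nextCursor string) {
	if n <= 0 || len(rows) <= n {
		return rows, "" // no lookahead row, so this is the final page
	}
	items = rows[:n]
	return items, encodeCursor(items[n-1])
}

func main() {
	fetched := []row{{"c", 300}, {"b", 200}, {"a", 100}} // result of LIMIT 2+1
	items, next := trimPage(fetched, 2)
	fmt.Println(len(items), next)
}
```

An empty nextCursor signals the last page, so the caller never needs a second query or a count(*) to know whether to offer "load more".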

Migrations

Embedded as a Go slice in store/postgres/migrations.go and applied under pg_advisory_lock(0x6E6F74696679_00), so concurrent process boots serialise on the lock instead of racing the DDL. The bookkeeping table notify_schema_migrations makes reruns idempotent: boots after the first find it and skip the versions already applied.

Migrations are append-only. A new schema change becomes a new migration{} entry at the end; shipped versions never get edited. The advisory lock key is unlikely to collide ("notify\0" in hex), but if another driver in the same database picks the same key its migrations would block on ours.
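
Two details here are easy to check in code: that the lock key really is the bytes of "notify\0", and how applied versions get skipped. The sketch below assumes a migration struct with Version and SQL fields and a pending helper; both are hypothetical shapes, not the contents of migrations.go.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// advisoryLockKey is the key from the text, written as a Go literal.
const advisoryLockKey int64 = 0x6E6F74696679_00

// keyFromTag packs a tag of up to 8 bytes into an int64, big-endian and
// right-aligned, so a 7-byte tag leaves the top byte zero.
func keyFromTag(tag string) int64 {
	if len(tag) > 8 {
		tag = tag[:8] // longer tags are truncated in this sketch
	}
	var buf [8]byte
	copy(buf[8-len(tag):], tag)
	return int64(binary.BigEndian.Uint64(buf[:]))
}

// migration is a hypothetical shape for one entry in the embedded slice.
type migration struct {
	Version int
	SQL     string
}

// pending returns the migrations whose version is not yet recorded in the
// bookkeeping table, preserving slice order.
func pending(all []migration, applied map[int]bool) []migration {
	var out []migration
	for _, m := range all {
		if !applied[m.Version] {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	fmt.Println(keyFromTag("notify\x00") == advisoryLockKey)

	all := []migration{{1, "-- v1"}, {2, "-- v2"}, {3, "-- v3"}}
	for _, m := range pending(all, map[int]bool{1: true, 2: true}) {
		fmt.Println("would apply version", m.Version)
	}
}
```

Because entries are only ever appended, a boot that finds versions 1 and 2 recorded has nothing to re-run and applies just the tail of the slice.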

Connection pool

  • MaxConns = 25 by default — matches identity's DefaultMaxConns.
  • MaxConnLifetime = 1h, MaxConnIdleTime = 30m — cooperate with pgbouncer / Azure idle reapers.

Conformance results

24/24 pass under -race. Total wall time after the testcontainers Postgres warm-up is ~3.3s on Apple Silicon. See store/postgres/CONFORMANCE.md for the per-subtest table and design notes.

Running the suite locally

# Requires Docker (the suite uses testcontainers-go).
go test ./store/postgres/... -race -count=1 -v

The first run pulls postgres:16.13-alpine3.23; subsequent runs reuse the layer cache.

Related