Last updated 2026-05-28
Realtime Engine
The realtime package is the in-memory engine that powers
in-app delivery: a per-user connection registry, a buffered per-conn
channel, and an at-least-once retry tracker. It is generic over the
event payload (Registry[T any]), so it has zero
dependency on the notify proto or any transport — the server
instantiates Registry[*pb.StreamEvent] at boot.
When you'd care
You only need to know this exists if you are:
- Operating the container and turning the live-connections subsystem on or off (NOTIFY_LIVE_CONNECTIONS_ENABLED).
- Building a custom client and wondering why a backed-up consumer doesn't block the producer.
- Implementing a different transport (raw SSE, WebSocket) on top of the library.
- Reading the codebase and trying to understand where state goes.
The three types
// One live client connection. EventCh is buffered (default 64) and
// NEVER closed: closing would race concurrent non-blocking senders;
// once unregistered + handler returns, the channel is unreachable and
// reclaimed by the GC.
type Conn[T any] struct {
	ID         string
	UserID     string
	TenantID   string
	DeviceType string
	EventCh    chan T
	CreatedAt  time.Time
}
// Per-user index of live connections. Reads take the shared read lock
// (RLock); writes take the full lock. Push does non-blocking sends: a
// full buffer is logged and the event dropped, but the connection
// stays registered.
type Registry[T any] struct{ /* … */ }
// At-least-once retry budget. Track(parent, key, []connID, callback)
// runs the callback on a fixed interval per (key, connID) until
// either Ack is called or attempt budget is exhausted.
type RetryTracker struct{ /* … */ }
What hangs off it
The server constructs one Registry shared by every
connection, and one RetryTracker shared by every
in-flight retry. StreamEvents creates a fresh
Conn on each open, registers it, and tears it down on
exit. The in-app provider's Send looks up live
connections for the target user via Registry.Push and
returns StatusDelivered if any accepted the event,
StatusPending if none did.
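The register, push, and tear-down flow above can be sketched in a few lines. This is a minimal illustration, not the package's actual code: the method names Register and Unregister, the NewRegistry constructor, the nested-map layout, the integer return value of Push, and the status helper are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is trimmed to the fields this sketch needs.
type Conn[T any] struct {
	ID      string
	UserID  string
	EventCh chan T
}

// Registry is a hypothetical minimal per-user index of live connections.
type Registry[T any] struct {
	mu     sync.RWMutex
	byUser map[string]map[string]*Conn[T] // userID -> connID -> conn
}

func NewRegistry[T any]() *Registry[T] {
	return &Registry[T]{byUser: make(map[string]map[string]*Conn[T])}
}

// Register adds a connection under its user (write lock).
func (r *Registry[T]) Register(c *Conn[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.byUser[c.UserID] == nil {
		r.byUser[c.UserID] = make(map[string]*Conn[T])
	}
	r.byUser[c.UserID][c.ID] = c
}

// Unregister removes a connection; the channel is deliberately not closed.
func (r *Registry[T]) Unregister(c *Conn[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.byUser[c.UserID], c.ID)
	if len(r.byUser[c.UserID]) == 0 {
		delete(r.byUser, c.UserID)
	}
}

// Push offers ev to every live connection of userID without blocking and
// returns how many accepted it (ASSUMPTION: the real return type may differ).
func (r *Registry[T]) Push(userID string, ev T) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	accepted := 0
	for _, c := range r.byUser[userID] {
		select {
		case c.EventCh <- ev:
			accepted++
		default:
			// Buffer full: the event is dropped, the connection stays registered.
		}
	}
	return accepted
}

// status maps the accepted count to the provider result named in the text.
func status(accepted int) string {
	if accepted > 0 {
		return "StatusDelivered"
	}
	return "StatusPending"
}

func main() {
	reg := NewRegistry[string]()
	fmt.Println(status(reg.Push("u1", "hello"))) // no live connection yet

	c := &Conn[string]{ID: "c1", UserID: "u1", EventCh: make(chan string, 64)}
	reg.Register(c)
	fmt.Println(status(reg.Push("u1", "hello")))

	reg.Unregister(c)
	fmt.Println(status(reg.Push("u1", "hello")))
}
```

Because the type parameter is only ever sent on a channel, the same sketch works for any payload, which is the property that keeps the engine independent of the proto.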
Non-blocking by design
Why this design. A stalled client must never block the producer. Every Push is a non-blocking select { case ch <- ev: ... default: ... }; a connection whose buffer is full has the event dropped (logged via event_queue_full) but stays registered for future events. The alternative, backing up the orchestrator behind one slow client, would let one misbehaving recipient stall delivery for everyone else on the same notifier.
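To make the drop behaviour concrete, here is a small sketch of a producer pushing at a consumer that never reads. The helper names trySend and produce are hypothetical; only the select-with-default shape comes from the text above.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether the event
// was accepted into the buffer.
func trySend[T any](ch chan T, ev T) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

// produce pushes n events at a channel nobody is reading from and
// counts how many were buffered versus dropped. It never blocks.
func produce(ch chan int, n int) (accepted, dropped int) {
	for i := 0; i < n; i++ {
		if trySend(ch, i) {
			accepted++
		} else {
			dropped++ // the real engine would log event_queue_full here
		}
	}
	return accepted, dropped
}

func main() {
	ch := make(chan int, 64) // same size as the documented default buffer
	accepted, dropped := produce(ch, 100)
	fmt.Println("accepted:", accepted, "dropped:", dropped)
}
```

The producer returns after 100 iterations whether or not anyone drains the channel; with a blocking send it would hang on the 65th event.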
EventCh is buffered AND never closed
This is a subtle invariant documented inline in
realtime/conn.go:
- Buffered — so non-blocking sends have somewhere to go and producers don't stall.
- Never closed — because closing a channel that goroutines are racing to send on panics. The lifecycle ends by unregistering the Conn and letting the handler return; the GC reclaims the channel.
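The hazard behind the second point is easy to reproduce in isolation. The sketch below uses a hypothetical helper, sendPanics, to show that even a non-blocking send panics once the channel has been closed, which is why the engine lets the channel become unreachable instead of closing it.

```go
package main

import "fmt"

// sendPanics attempts a non-blocking send on ch and reports whether
// the attempt panicked.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	select {
	case ch <- 1:
	default:
	}
	return false
}

func main() {
	open := make(chan int, 1)
	fmt.Println("open channel panics:", sendPanics(open))

	closed := make(chan int, 1)
	close(closed)
	fmt.Println("closed channel panics:", sendPanics(closed))
}
```

The default case offers no protection here: a send on a closed channel is always ready to proceed, and proceeding means panicking.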
RetryTracker: at-least-once for data-change events
Data-change events (DataChangeEvent) carry best-effort hints
that something the client cares about may have changed upstream.
The client re-fetches its own state via its own API. For these
hints, the platform offers at-least-once delivery via the
RetryTracker:
- Server emits a DataChangeEvent with an idempotency_key.
- RetryTracker.Track(ctx, key, []connID, sendCallback) spawns a goroutine per (key, connID) that re-sends on NOTIFY_LIVE_RETRY_INTERVAL ticks.
- Client acks via AckDataChange{idempotency_key, session_id}.
- The tracker cancels the retry goroutine.
The budget is bounded: NOTIFY_LIVE_RETRY_MAX_ATTEMPTS (default
3) caps retries; a value of 0 disables retries entirely. On shutdown,
CancelAll cancels every in-flight retry so the retry goroutines don't
leak.
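The track, ack, and cancel cycle can be sketched as follows. This is an illustrative stand-in, not the real RetryTracker: the field names, the retryKey and retry helper types, the forget method, and the demo function are assumptions, and the interval and budget are passed as struct fields rather than read from the environment. The sketch also assumes each (key, connID) pair is tracked at most once.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type retryKey struct{ key, connID string }
type retry struct{ cancel context.CancelFunc }

// RetryTracker is a hypothetical, minimal stand-in for the real type.
type RetryTracker struct {
	mu          sync.Mutex
	active      map[retryKey]*retry
	interval    time.Duration // stands in for NOTIFY_LIVE_RETRY_INTERVAL
	maxAttempts int           // stands in for NOTIFY_LIVE_RETRY_MAX_ATTEMPTS
}

// Track starts one retry goroutine per (key, connID); a budget of 0 disables retries.
func (t *RetryTracker) Track(parent context.Context, key string, connIDs []string, send func(connID string)) {
	if t.maxAttempts <= 0 {
		return
	}
	for _, id := range connIDs {
		k := retryKey{key, id}
		ctx, cancel := context.WithCancel(parent)
		r := &retry{cancel: cancel}
		t.mu.Lock()
		t.active[k] = r
		t.mu.Unlock()
		go t.run(ctx, k, r, send)
	}
}

// run re-sends on each tick until cancelled or the budget is spent.
func (t *RetryTracker) run(ctx context.Context, k retryKey, r *retry, send func(string)) {
	ticker := time.NewTicker(t.interval)
	defer ticker.Stop()
	defer t.forget(k, r)
	for attempt := 0; attempt < t.maxAttempts; attempt++ {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			send(k.connID)
		}
	}
}

// forget cancels r and removes it if it is still the tracked entry for k.
func (t *RetryTracker) forget(k retryKey, r *retry) {
	r.cancel()
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.active[k] == r {
		delete(t.active, k)
	}
}

// Ack stops the retry for one (key, connID) pair.
func (t *RetryTracker) Ack(key, connID string) {
	k := retryKey{key, connID}
	t.mu.Lock()
	r := t.active[k]
	t.mu.Unlock()
	if r != nil {
		t.forget(k, r)
	}
}

// CancelAll stops every in-flight retry, for use on shutdown.
func (t *RetryTracker) CancelAll() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for k, r := range t.active {
		r.cancel()
		delete(t.active, k)
	}
}

// demo counts re-sends for an unacked and an immediately acked connection.
func demo() (unacked, acked int64) {
	var a, b atomic.Int64
	t := &RetryTracker{active: map[retryKey]*retry{}, interval: 10 * time.Millisecond, maxAttempts: 3}
	t.Track(context.Background(), "k1", []string{"c1"}, func(string) { a.Add(1) })
	t.Track(context.Background(), "k2", []string{"c2"}, func(string) { b.Add(1) })
	t.Ack("k2", "c2") // the client acknowledged before the first tick
	time.Sleep(200 * time.Millisecond)
	t.CancelAll()
	return a.Load(), b.Load()
}

func main() {
	unacked, acked := demo()
	fmt.Println("re-sends without ack:", unacked, "after ack:", acked)
}
```

With a budget of 3, the unacked connection sees exactly three re-sends and then the goroutine exits on its own. An ack that races a tick can still let one more re-send through, which is consistent with at-least-once semantics and is why the client is expected to deduplicate on idempotency_key.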
Turning the subsystem off
Set NOTIFY_LIVE_CONNECTIONS_ENABLED=false. The container
never constructs a Registry, the in-app provider is not registered,
and StreamEvents returns
CodeUnimplemented. In-app notifications remain
StatusPending; clients catch up via
GetNotifications.
This is a legitimate deployment shape when you only care about the durable side of the platform — e.g. a backend running notify in library mode purely for stored notification fanout, with the live surface served by a separate gateway.
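The gating described in this section might look roughly like the sketch below. Everything except the environment variable name is hypothetical: the server and registry types, the errUnimplemented sentinel (standing in for CodeUnimplemented), and in particular the choice to treat an unset or unparsable value as "enabled", which the text does not specify.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// errUnimplemented stands in for the CodeUnimplemented error named above.
var errUnimplemented = errors.New("unimplemented: live connections are disabled")

type registry struct{} // placeholder for Registry[T]

type server struct {
	registry *registry // nil when the live subsystem is off
}

// liveEnabled parses the flag value.
// ASSUMPTION: unset or unparsable values mean "enabled".
func liveEnabled(raw string) bool {
	on, err := strconv.ParseBool(raw)
	if err != nil {
		return true
	}
	return on
}

// newServer only constructs the registry when the subsystem is enabled.
func newServer(rawFlag string) *server {
	s := &server{}
	if liveEnabled(rawFlag) {
		s.registry = &registry{}
	}
	return s
}

// streamEvents rejects the call outright when there is no registry.
func (s *server) streamEvents() error {
	if s.registry == nil {
		return errUnimplemented
	}
	return nil // a real handler would register a Conn and stream events here
}

func main() {
	s := newServer(os.Getenv("NOTIFY_LIVE_CONNECTIONS_ENABLED"))
	fmt.Println("StreamEvents:", s.streamEvents())
}
```

Checking for a nil registry at the handler boundary keeps the disabled path to a single branch, so nothing downstream has to know whether the live subsystem exists.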
Related
- In-App channel — the wire view
- Subscribe over SSE — full client flow
- Graceful shutdown — retry-tracker drain