Last updated 2026-05-28

Realtime Engine

The realtime package is the in-memory engine that powers in-app delivery: a per-user connection registry, a buffered per-conn channel, and an at-least-once retry tracker. It is generic over the event payload (Registry[T any]), so it has zero dependency on the notify proto or any transport — the server instantiates Registry[*pb.StreamEvent] at boot.

When you'd care

You only need to know this exists if you are:

  • Operating the container and turning the live-connections subsystem on or off (NOTIFY_LIVE_CONNECTIONS_ENABLED).
  • Building a custom client and wondering why a backed-up consumer doesn't block the producer.
  • Implementing a different transport (raw SSE, WebSocket) on top of the library.
  • Reading the codebase and trying to understand where state goes.

The three types

// One live client connection. EventCh is buffered (default 64) and
// NEVER closed: closing would race with concurrent non-blocking senders.
// Once the Conn is unregistered and the handler returns, the channel is
// unreachable and reclaimed by the GC.
type Conn[T any] struct {
    ID         string
    UserID     string
    TenantID   string
    DeviceType string
    EventCh    chan T
    CreatedAt  time.Time
}

// Per-user index of live connections. Reads take the shared read lock
// (RLock); writes take the exclusive lock. Push does non-blocking sends:
// a full buffer is logged and the event dropped, but the connection
// stays registered.
type Registry[T any] struct{ /* … */ }

// At-least-once retry budget. Track(parent, key, []connID, callback)
// runs the callback on a fixed interval per (key, connID) until
// either Ack is called or the attempt budget is exhausted.
type RetryTracker struct{ /* … */ }

What hangs off it

The server constructs one Registry shared by every connection, and one RetryTracker shared by every in-flight retry. StreamEvents creates a fresh Conn each time a stream opens, registers it, and unregisters it when the handler exits. The in-app provider's Send hands the event to Registry.Push, which offers it to every live connection of the target user. Send returns StatusDelivered if at least one connection accepted the event and StatusPending if none did.
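The wiring described above can be sketched as follows. This is a minimal illustration, not the package's actual code: the field layout, the NewRegistry constructor, the int return value of Push, and the send helper that returns status names as strings are all assumptions made for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a trimmed-down stand-in for the package's Conn[T].
type Conn[T any] struct {
	ID      string
	UserID  string
	EventCh chan T
}

// Registry is a hypothetical per-user index of live connections.
type Registry[T any] struct {
	mu     sync.RWMutex
	byUser map[string]map[string]*Conn[T] // userID -> connID -> conn
}

func NewRegistry[T any]() *Registry[T] {
	return &Registry[T]{byUser: make(map[string]map[string]*Conn[T])}
}

// Register adds a connection under its user; called when a stream opens.
func (r *Registry[T]) Register(c *Conn[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.byUser[c.UserID] == nil {
		r.byUser[c.UserID] = make(map[string]*Conn[T])
	}
	r.byUser[c.UserID][c.ID] = c
}

// Unregister removes a connection; called when the stream handler exits.
func (r *Registry[T]) Unregister(c *Conn[T]) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.byUser[c.UserID], c.ID)
	if len(r.byUser[c.UserID]) == 0 {
		delete(r.byUser, c.UserID)
	}
}

// Push offers ev to every live connection of userID without blocking and
// reports how many connections accepted it.
func (r *Registry[T]) Push(userID string, ev T) int {
	r.mu.RLock()
	defer r.mu.RUnlock()
	accepted := 0
	for _, c := range r.byUser[userID] {
		select {
		case c.EventCh <- ev:
			accepted++
		default: // buffer full: drop the event for this connection only
		}
	}
	return accepted
}

// send mirrors the provider rule: delivered if any connection accepted.
func send(r *Registry[string], userID, ev string) string {
	if r.Push(userID, ev) > 0 {
		return "StatusDelivered"
	}
	return "StatusPending"
}

func main() {
	reg := NewRegistry[string]()
	c := &Conn[string]{ID: "c1", UserID: "u1", EventCh: make(chan string, 64)}
	reg.Register(c)
	fmt.Println(send(reg, "u1", "hello")) // one live connection accepts
	reg.Unregister(c)
	fmt.Println(send(reg, "u1", "again")) // no live connection is left
}
```

Because the sketch is generic over the payload, the same Registry could hold any event type; the string payload here only keeps the example free of proto dependencies.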

Non-blocking by design

A stalled client must never block the producer. Every Push is a non-blocking select { case ch <- ev: ... default: ... }. When a connection's buffer is full, the event is dropped for that connection (and logged as event_queue_full), but the connection stays registered for future events. The alternative, backing up the orchestrator behind one slow client, would let a single misbehaving recipient stall delivery for everyone else on the same notifier.
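The non-blocking send is a standard Go idiom and can be shown in isolation. The helper name trySend and the tiny buffer size are assumptions chosen to make the overflow visible; the real engine logs the drop instead of counting it.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether ev was queued.
func trySend[T any](ch chan T, ev T) bool {
	select {
	case ch <- ev:
		return true
	default:
		// The buffer is full and nobody is receiving: give up immediately.
		return false
	}
}

func main() {
	ch := make(chan int, 2) // deliberately small buffer; nothing drains it
	dropped := 0
	for i := 0; i < 5; i++ {
		if !trySend(ch, i) {
			dropped++ // the engine would log event_queue_full here
		}
	}
	fmt.Println("queued:", len(ch), "dropped:", dropped)
}
```

The producer loop finishes even though nothing ever reads from the channel, which is the property the design is after.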

EventCh is buffered AND never closed

This is a subtle invariant documented inline in realtime/conn.go:

  • Buffered: non-blocking sends need somewhere to put the event, so producers never stall.
  • Never closed: a send on a closed channel panics, and producers may still be attempting sends at the moment a connection goes away. The lifecycle ends by unregistering the Conn and letting the handler return; the GC then reclaims the channel.
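A small sketch of the two invariants together: the handler stops on context cancellation rather than on channel close, so late senders find an open channel and either fill the buffer or drop. The function names serve and shutdownThenSend are hypothetical and do not come from realtime/conn.go.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// serve drains ch until ctx is cancelled. It deliberately never closes ch,
// because producers may still be attempting sends.
func serve(ctx context.Context, ch <-chan string, deliver func(string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-ch:
			deliver(ev)
		}
	}
}

// shutdownThenSend stops the handler, then lets `senders` goroutines each
// try one non-blocking send. It returns how many events sit in the buffer.
func shutdownThenSend(buffer, senders int) int {
	ch := make(chan string, buffer)
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done) // private signal channel: nobody ever sends on it
		serve(ctx, ch, func(string) {})
	}()
	cancel()
	<-done // the handler has returned; ch itself is still open

	var wg sync.WaitGroup
	for i := 0; i < senders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case ch <- "late event": // lands in the buffer, cannot panic
			default: // buffer full: dropped
			}
		}()
	}
	wg.Wait()
	return len(ch)
}

func main() {
	fmt.Println("events buffered after shutdown:", shutdownThenSend(4, 8))
}
```

Had serve closed ch on exit, any of the late senders could have panicked; leaving it open turns the same race into a harmless buffered or dropped event.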

RetryTracker: at-least-once for data-change events

Data-change events (DataChangeEvent) are best-effort hints that something the client cares about may have changed upstream; the client responds by re-fetching its own state via its own API. For these hints, the platform offers at-least-once delivery via the RetryTracker:

  1. Server emits a DataChangeEvent with an idempotency_key.
  2. RetryTracker.Track(ctx, key, []connID, sendCallback) spawns a goroutine per (key, connID) that re-sends on NOTIFY_LIVE_RETRY_INTERVAL ticks.
  3. Client acks via AckDataChange{idempotency_key, session_id}.
  4. The tracker cancels the retry goroutine.

The budget is bounded: NOTIFY_LIVE_RETRY_MAX_ATTEMPTS (default 3) caps the number of retries, and a value of 0 disables retries entirely. On shutdown, CancelAll cancels every in-flight retry so that no retry goroutines leak.
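The retry flow above can be sketched with one goroutine per (key, connID), a ticker, and a cancellable context. Everything below is an assumption-laden stand-in rather than the package's RetryTracker: the field names, the entry type, the Ack(key, connID) signature, and the runDemo helper are invented for illustration, and the interval is passed in directly instead of being read from NOTIFY_LIVE_RETRY_INTERVAL.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

type retryKey struct{ key, connID string }
type entry struct{ cancel context.CancelFunc }

// RetryTracker is a simplified stand-in for the package's type.
type RetryTracker struct {
	mu          sync.Mutex
	wg          sync.WaitGroup
	interval    time.Duration
	maxAttempts int
	entries     map[retryKey]*entry
}

// Track starts one retry goroutine per (key, connID).
func (t *RetryTracker) Track(parent context.Context, key string, connIDs []string, resend func(connID string)) {
	for _, id := range connIDs {
		k := retryKey{key, id}
		ctx, cancel := context.WithCancel(parent)
		e := &entry{cancel: cancel}
		t.mu.Lock()
		t.entries[k] = e
		t.mu.Unlock()
		t.wg.Add(1)
		go t.loop(ctx, k, e, resend)
	}
}

func (t *RetryTracker) loop(ctx context.Context, k retryKey, e *entry, resend func(string)) {
	defer t.wg.Done()
	defer t.forget(k, e)
	ticker := time.NewTicker(t.interval)
	defer ticker.Stop()
	// With maxAttempts == 0 the body never runs, so retries are disabled.
	for attempt := 0; attempt < t.maxAttempts; attempt++ {
		select {
		case <-ctx.Done():
			return // acked, cancelled, or parent context ended
		case <-ticker.C:
			if ctx.Err() != nil {
				return // cancelled while the tick was pending
			}
			resend(k.connID)
		}
	}
}

// forget removes the entry only if it still belongs to this loop.
func (t *RetryTracker) forget(k retryKey, e *entry) {
	t.mu.Lock()
	if t.entries[k] == e {
		delete(t.entries, k)
	}
	t.mu.Unlock()
	e.cancel()
}

// Ack stops the retry loop for one (key, connID) pair.
func (t *RetryTracker) Ack(key, connID string) {
	t.mu.Lock()
	e := t.entries[retryKey{key, connID}]
	t.mu.Unlock()
	if e != nil {
		e.cancel() // the loop's deferred forget removes the entry
	}
}

// CancelAll cancels every in-flight retry, for example on shutdown.
func (t *RetryTracker) CancelAll() {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, e := range t.entries {
		e.cancel()
	}
}

// runDemo tracks one connection and returns how many resends happened.
func runDemo(intervalMs, maxAttempts int, ackFirst bool) int32 {
	var resends atomic.Int32
	t := &RetryTracker{interval: time.Duration(intervalMs) * time.Millisecond,
		maxAttempts: maxAttempts, entries: map[retryKey]*entry{}}
	t.Track(context.Background(), "key-1", []string{"conn-1"}, func(string) { resends.Add(1) })
	if ackFirst {
		t.Ack("key-1", "conn-1")
	}
	t.wg.Wait() // every retry loop has exited
	return resends.Load()
}

func main() {
	fmt.Println("resends with budget 3:", runDemo(5, 3, false))
	fmt.Println("resends when acked first:", runDemo(50, 3, true))
}
```

Duplicate resends remain possible in any at-least-once scheme, which is why the event carries an idempotency_key for the client to deduplicate on.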

Turning the subsystem off

Set NOTIFY_LIVE_CONNECTIONS_ENABLED=false. The container never constructs a Registry, the in-app provider is not registered, and StreamEvents returns CodeUnimplemented. In-app notifications remain StatusPending; clients catch up via GetNotifications.
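As a rough sketch of the gate, assuming the flag is parsed as a Go boolean: the server type, the errUnimplemented value, and the choice to treat an unset or unparsable variable as enabled are all assumptions of this example, not documented behavior of the container.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
)

// errUnimplemented stands in for the CodeUnimplemented status.
var errUnimplemented = errors.New("unimplemented: live connections are disabled")

// liveEnabled interprets the raw value of NOTIFY_LIVE_CONNECTIONS_ENABLED.
// Falling back to true for an unset or unparsable value is an assumption.
func liveEnabled(raw string) bool {
	on, err := strconv.ParseBool(raw)
	if err != nil {
		return true
	}
	return on
}

// server is a hypothetical stand-in; registry stays nil when the
// subsystem is off, so nothing live is ever constructed.
type server struct {
	registry map[string]chan string
}

func newServer(raw string) *server {
	s := &server{}
	if liveEnabled(raw) {
		s.registry = make(map[string]chan string)
	}
	return s
}

// streamEvents rejects the call when no registry was constructed.
func (s *server) streamEvents() error {
	if s.registry == nil {
		return errUnimplemented
	}
	return nil // a real handler would register a Conn and stream here
}

func main() {
	s := newServer(os.Getenv("NOTIFY_LIVE_CONNECTIONS_ENABLED"))
	fmt.Println("StreamEvents:", s.streamEvents())
}
```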

This is a legitimate deployment shape when you only care about the durable side of the platform — e.g. a backend running notify in library mode purely for stored notification fanout, with the live surface served by a separate gateway.
