Last updated 2026-05-28
In-App channel
In-app delivery is a real-time push to any of the user's currently
open client connections via the
NotificationClientService.StreamEvents server-stream.
No provider configuration is needed — when the live-connections
subsystem is on, the in-app provider is auto-registered.
When you'd use this
Anything you want to surface immediately in the active session: a new
chat message, an updated task list, a "your colleague joined the
document" presence event. The same rows are stored for catch-up via
GetNotifications, so a user who comes back online sees
the missed traffic.
How it works
The orchestrator's in-app provider holds a reference to the
realtime Registry:
- A producer calls Notify with DELIVERY_CHANNEL_IN_APP.
- The orchestrator stores the row (idempotent on the standard key).
- It calls the in-app provider's Send.
- The provider pushes the rendered StreamEvent to every live connection for the recipient via Registry.Push.
- If 1+ connections accepted the event → StatusDelivered. If 0 connections (user offline) → StatusPending.
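The status decision in the last step can be sketched as follows. This is a minimal model, not the orchestrator's actual code: the `LiveConn` interface, the `tryPush` method, and the `send` helper are hypothetical names introduced for illustration, and the real Registry is not shown in this document.

```typescript
// Hypothetical model of the in-app provider's status decision.
type Status = "StatusDelivered" | "StatusPending";

interface LiveConn {
  // Returns true if the connection accepted the event (assumed signature).
  tryPush(event: string): boolean;
}

class Registry {
  private conns = new Map<string, LiveConn[]>();

  register(userId: string, conn: LiveConn): void {
    const list = this.conns.get(userId) ?? [];
    list.push(conn);
    this.conns.set(userId, list);
  }

  // Pushes to every live connection and reports how many accepted the event.
  push(userId: string, event: string): number {
    let accepted = 0;
    for (const conn of this.conns.get(userId) ?? []) {
      if (conn.tryPush(event)) accepted++;
    }
    return accepted;
  }
}

// Delivered only if at least one connection took the event; otherwise Pending.
function send(registry: Registry, userId: string, event: string): Status {
  return registry.push(userId, event) > 0 ? "StatusDelivered" : "StatusPending";
}
```

In this model a user with no registered connections and a user whose connections all refused the event both end up as StatusPending, which matches the rule that only an accepted push counts as delivered.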
Pending vs Delivered for offline users
Why this design. An in-app row to a user with no live connections gets StatusPending, not a lying StatusDelivered. The status field is used downstream (unread filter, read-receipt analytics, dashboards), and it would silently mislead consumers if "pushed to nobody" looked the same as "successfully delivered". The row remains available via GetNotifications for catch-up; when the user reconnects, they see it via the regular inbox flow.
The wire shape
StreamEvents is a server-streaming RPC that multiplexes
two event kinds plus heartbeats over one connection:

message StreamEvent {
  string session_id = 1;
  oneof event {
    NotificationEvent notification = 2;
    DataChangeEvent data_change = 3;
    HeartbeatEvent heartbeat = 4;
  }
}

- notification: the platform delivered a new notification row to this user.
- data_change: an opaque "something upstream that you care about may have changed" hint. The client re-fetches via its own API. Carries an idempotency_key for at-least-once retry.
- heartbeat: keepalive carrying the server clock. The first heartbeat after the handshake carries the assigned session_id so the client can target AckDataChange at this exact stream.
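To make the oneof concrete, here is a small sketch of how a client might model the three cases and remember the session id from the handshake heartbeat. The types are simplified, hand-written stand-ins (not the generated code), the field sets are trimmed to what the client example on this page reads, and `SessionTracker` is a hypothetical helper.

```typescript
// Simplified stand-ins for the generated message types (assumed shapes).
type StreamEvent = {
  sessionId: string;
  event:
    | { case: "notification"; value: { title: string; body: string } }
    | { case: "dataChange"; value: { subjectRef: string; idempotencyKey: string } }
    | { case: "heartbeat"; value: { timestampMs: number } };
};

// Remembers the session id assigned by the handshake heartbeat.
class SessionTracker {
  sessionId = "";

  // Records a non-empty session id and returns a one-line summary of the event.
  observe(ev: StreamEvent): string {
    if (ev.sessionId !== "") this.sessionId = ev.sessionId;
    const e = ev.event;
    switch (e.case) {
      case "notification":
        return `notification: ${e.value.title}`;
      case "dataChange":
        return `data change: ${e.value.subjectRef}`;
      case "heartbeat":
        return `heartbeat @ ${e.value.timestampMs}`;
    }
  }
}
```

Keeping the last non-empty session id means later events that omit it do not wipe the value needed for AckDataChange.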
Lifecycle
- Client opens the stream with Authorization: Bearer <JWT> and a DeviceType.
- Server authenticates, allocates a Conn, registers it under the user.
- Server immediately sends one handshake HeartbeatEvent carrying the assigned session_id.
- Server ticks heartbeats on NOTIFY_LIVE_HEARTBEAT_INTERVAL (default 30s); the in-app provider pushes notifications + data-change events as they arrive.
- Client cancellation, network drop, or server shutdown ends the loop; the connection is unregistered before the handler returns.
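Because the server ticks heartbeats at a fixed interval, a client can use silence as a signal that the stream has died without an explicit error (for example after a laptop sleep). The sketch below is one possible client-side watchdog, not part of the platform: `HeartbeatWatchdog` and its parameters are hypothetical, and it assumes the client is configured with the server's heartbeat interval (30s by default).

```typescript
// Tracks when the stream last produced any event and reports staleness.
class HeartbeatWatchdog {
  private lastSeenMs: number;

  constructor(
    private readonly intervalMs: number,    // expected heartbeat interval
    private readonly missedAllowed: number, // silent intervals tolerated
    nowMs: number,
  ) {
    this.lastSeenMs = nowMs;
  }

  // Call for every event received on the stream, heartbeats included.
  touch(nowMs: number): void {
    this.lastSeenMs = nowMs;
  }

  // True once more than `missedAllowed` intervals have passed with no event.
  isStale(nowMs: number): boolean {
    return nowMs - this.lastSeenMs > this.intervalMs * this.missedAllowed;
  }
}
```

A client could check `isStale(Date.now())` on a timer and cancel the stream when it returns true, letting its normal reconnect logic take over. Passing the clock in as an argument keeps the class easy to test.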
Configuration
-e NOTIFY_LIVE_CONNECTIONS_ENABLED=true   # default true
-e NOTIFY_LIVE_HEARTBEAT_INTERVAL=30s
-e NOTIFY_LIVE_RETRY_MAX_ATTEMPTS=3       # at-least-once budget for data-change events
-e NOTIFY_LIVE_RETRY_INTERVAL=5s
-e NOTIFY_ALLOWED_ORIGINS=https://app.example.com
Setting NOTIFY_LIVE_CONNECTIONS_ENABLED=false disables
the subsystem entirely. StreamEvents returns
CodeUnimplemented, the in-app provider is not
registered, and in-app rows stay StatusPending for
inbox-only consumption.
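On the client side, this suggests treating CodeUnimplemented differently from transient failures: reconnecting cannot succeed while the subsystem is off, so falling back to the inbox is the more sensible reaction. A minimal sketch, using hypothetical code names and step names that a real client would map from its RPC library's error codes:

```typescript
// Hypothetical, simplified error classification for the stream.
type StreamErrorCode = "unimplemented" | "other";
type NextStep = "poll-inbox" | "reconnect";

function nextStep(code: StreamErrorCode): NextStep {
  // Unimplemented means live connections are disabled on the server,
  // so retrying the stream is pointless; read GetNotifications instead.
  return code === "unimplemented" ? "poll-inbox" : "reconnect";
}
```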
Browser client (Connect-Web)
import { createConnectTransport } from "@connectrpc/connect-web";
import { createClient } from "@connectrpc/connect";
import { NotificationClientService } from "./gen/notify/v1/notify_pb";

const transport = createConnectTransport({
  baseUrl: "https://notify.example.com",
  interceptors: [
    (next) => async (req) => {
      req.header.set("Authorization", `Bearer ${getAccessToken()}`);
      return next(req);
    },
  ],
});
const client = createClient(NotificationClientService, transport);

let sessionId = "";

for await (const ev of client.streamEvents({ deviceType: "DEVICE_TYPE_BROWSER" })) {
  sessionId = ev.sessionId || sessionId;

  switch (ev.event.case) {
    case "notification": {
      const n = ev.event.value.notification!;
      console.log("notification:", n.title, "—", n.body);
      showInUI(n);
      break;
    }
    case "dataChange": {
      const dc = ev.event.value;
      console.log("data change:", dc.subjectRef);
      await refreshUpstream(dc.subjectRef);
      // ACK so the server stops retrying.
      await client.ackDataChange({
        idempotencyKey: dc.idempotencyKey,
        sessionId,
      });
      break;
    }
    case "heartbeat":
      console.log("heartbeat @", ev.event.value.timestampMs);
      break;
  }
}

Reconnect strategy
Streams die for legitimate reasons: laptop sleep, server rolling deploy, mobile carrier handover. The client should reconnect with exponential backoff and a cap:
async function connectWithRetry() {
  let backoffMs = 500;
  const maxMs = 30_000;

  for (;;) {
    try {
      // streamEvents() throws on any stream-level error.
      for await (const ev of client.streamEvents({ deviceType: "DEVICE_TYPE_BROWSER" })) {
        handle(ev);
        backoffMs = 500; // reset once the stream is delivering events again
      }
    } catch (err) {
      console.warn("stream closed:", err);
    }
    // Wait (with jitter) before reconnecting, whether the stream errored
    // or ended cleanly, so a graceful server close cannot cause a hot loop.
    await new Promise((r) => setTimeout(r, backoffMs + Math.random() * 250));
    backoffMs = Math.min(backoffMs * 2, maxMs);
  }
}
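To see what the doubling-with-cap rule produces, the delay sequence can be computed in isolation. The helper below is illustrative only (`backoffSchedule` is a hypothetical name) and leaves out the random jitter so the output is deterministic; the base and cap match the values used in the loop above.

```typescript
// Returns the base delays (without jitter) for the first `attempts` reconnects.
function backoffSchedule(attempts: number, baseMs = 500, maxMs = 30_000): number[] {
  const delays: number[] = [];
  let backoffMs = baseMs;
  for (let i = 0; i < attempts; i++) {
    delays.push(backoffMs);
    backoffMs = Math.min(backoffMs * 2, maxMs); // double, but never exceed the cap
  }
  return delays;
}

console.log(backoffSchedule(8));
// [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

With these values the cap is reached on the seventh consecutive failure, after which the client retries roughly every 30 seconds.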
Reconnecting is safe: any in-flight events the client missed are
still in the store and available via
GetNotifications. Data-change events the client did not
ack will be retried by the server's RetryTracker until
either the ack arrives or
NOTIFY_LIVE_RETRY_MAX_ATTEMPTS is exhausted.
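Since data-change events are retried until acked, a client can receive the same event more than once, for instance when its ack is lost or arrives late. One way to avoid redundant re-fetches is to remember recently seen idempotency keys. The sketch below is an assumption about how a client might do this, not something the platform provides; `SeenKeys` and its capacity parameter are hypothetical.

```typescript
// Bounded memory of recently seen idempotency keys (oldest evicted first).
class SeenKeys {
  private readonly keys = new Set<string>();

  constructor(private readonly capacity: number) {}

  // Returns true the first time a key is seen, false for a duplicate.
  firstSeen(key: string): boolean {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    if (this.keys.size > this.capacity) {
      // A Set iterates in insertion order, so the first value is the oldest key.
      const oldest = this.keys.values().next().value as string;
      this.keys.delete(oldest);
    }
    return true;
  }
}
```

A client using this would probably skip the upstream refresh when `firstSeen` returns false but still send the ack, so the server stops retrying that key.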
Sending an in-app notification
No address required — the in-app channel's destination is the user id itself.
curl -sX POST http://notify:8081/elloloop.notify.v1.NotificationInternalService/Notify \
  -H 'Content-Type: application/json' \
  -H "X-Notify-Internal-Token: $NOTIFY_INTERNAL_TOKEN" \
  -d '{
    "tenantId": "acme",
    "notificationId": "task-42-comment-7",
    "userIds": ["user-alice"],
    "channels": ["DELIVERY_CHANNEL_IN_APP"],
    "subjectRef": "task:42",
    "subjectType": "task",
    "title": "New comment",
    "body": "Bob commented on Task 42"
  }'

Backpressure
Each Conn has a buffered EventCh (default
64 deep). Registry.Push does a non-blocking send; a
connection whose buffer is full has the event dropped and logged
(event_queue_full). It stays registered for future
events — one slow client never blocks the producer or the rest of
the recipients.
The buffer size is currently fixed at
DefaultEventBuffer = 64. If a real client genuinely
needs deeper queueing, that is left for follow-up work; for now the
right move is to keep the client UI thread non-blocking on event arrival.
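The drop-on-full behavior can be modeled with a bounded queue whose offer never blocks. This is a simplified TypeScript illustration of the idea, not the server's implementation: `BoundedQueue`, `offer`, and `poll` are hypothetical names, and only the default capacity of 64 comes from the text above.

```typescript
// Bounded buffer with a non-blocking offer: a full buffer drops the event.
class BoundedQueue<T> {
  private readonly items: T[] = [];
  dropped = 0; // count of events rejected because the buffer was full

  constructor(private readonly capacity = 64) {}

  // Returns false (and counts a drop) instead of waiting when the buffer is full.
  offer(item: T): boolean {
    if (this.items.length >= this.capacity) {
      this.dropped++;
      return false;
    }
    this.items.push(item);
    return true;
  }

  // Removes and returns the oldest buffered item, or undefined if empty.
  poll(): T | undefined {
    return this.items.shift();
  }
}
```

Because a rejected offer returns immediately, the producer's cost per recipient stays constant no matter how slow any single consumer is, and the queue accepts events again as soon as the consumer drains it.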
Related
- Realtime Engine — Registry / RetryTracker internals
- Subscribe over SSE — full client example with reconnect
- Ack a notification — marking the row read