Last updated 2026-05-28

In-App channel

In-app delivery is a real-time push to any of the user's currently open client connections via the NotificationClientService.StreamEvents server-stream. No provider configuration is needed — when the live-connections subsystem is on, the in-app provider is auto-registered.

When you'd use this

Anything you want to surface immediately in the active session: a new chat message, an updated task list, a "your colleague joined the document" presence event. The same rows are stored for catch-up via GetNotifications, so a user who comes back online sees the missed traffic.

How it works

The orchestrator's in-app provider holds a reference to the realtime Registry:

  1. A producer calls Notify with DELIVERY_CHANNEL_IN_APP.
  2. The orchestrator stores the row (idempotent on the standard key).
  3. It calls the in-app provider's Send.
  4. The provider pushes the rendered StreamEvent to every live connection for the recipient via Registry.Push.
  5. If 1+ connections accepted the event → StatusDelivered. If 0 connections (user offline) → StatusPending.
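
The status decision in step 5 can be sketched in a few lines. This is a simplified TypeScript model, not the server's code: the Registry class, the LiveConn interface, and sendInApp are hypothetical stand-ins, and events are plain strings to keep the example short.

```typescript
// Hypothetical model of steps 3-5; the real provider and Registry live in
// the server and their exact signatures are not shown in this document.
type DeliveryStatus = "StatusDelivered" | "StatusPending";

interface LiveConn {
  // Returns true if the connection accepted the event.
  offer(event: string): boolean;
}

class Registry {
  private conns = new Map<string, LiveConn[]>();

  register(userId: string, conn: LiveConn): void {
    const list = this.conns.get(userId) ?? [];
    list.push(conn);
    this.conns.set(userId, list);
  }

  // Pushes to every live connection for the user; returns how many accepted.
  push(userId: string, event: string): number {
    let accepted = 0;
    for (const conn of this.conns.get(userId) ?? []) {
      if (conn.offer(event)) accepted++;
    }
    return accepted;
  }
}

// At least one accepting connection means delivered; none means pending.
function sendInApp(registry: Registry, userId: string, event: string): DeliveryStatus {
  return registry.push(userId, event) > 0 ? "StatusDelivered" : "StatusPending";
}

const registry = new Registry();
const received: string[] = [];
registry.register("user-alice", {
  offer: (e) => {
    received.push(e);
    return true;
  },
});

const aliceStatus = sendInApp(registry, "user-alice", "New comment");
const bobStatus = sendInApp(registry, "user-bob", "New comment"); // no live connections
```

Here user-alice has one live connection and user-bob has none, so the two calls end in different statuses even though both rows would be stored.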

Pending vs Delivered for offline users

Why this design. An in-app row to a user with no live connections gets StatusPending, not a lying StatusDelivered. The status field is used downstream (unread filter, read-receipt analytics, dashboards), and it would silently mislead those consumers if "pushed to nobody" looked the same as "successfully delivered". The row remains available via GetNotifications for catch-up; when the user reconnects, they see it via the regular inbox flow.

The wire shape

StreamEvents is a server-streaming RPC that multiplexes three event kinds, heartbeats included, over one connection:

message StreamEvent {
  string session_id = 1;
  oneof event {
    NotificationEvent notification = 2;
    DataChangeEvent data_change = 3;
    HeartbeatEvent heartbeat = 4;
  }
}

  • notification: the platform delivered a new notification row to this user.
  • data_change: an opaque "something upstream that you care about may have changed" hint. The client re-fetches via its own API. Carries an idempotency_key for at-least-once retry.
  • heartbeat: keepalive carrying the server clock. The first heartbeat, sent immediately as the handshake, carries the assigned session_id so the client can target AckDataChange at this exact stream.
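
On the client, a oneof like this is typically handled as a discriminated union. The sketch below uses hand-written stand-in types rather than the generated ones; the field names follow the browser client example on this page (title, body, subjectRef, idempotencyKey, timestampMs), and anything else about the generated shapes is an assumption.

```typescript
// Hand-written stand-ins for the generated message types (assumed shapes).
interface NotificationEvent {
  notification?: { title: string; body: string };
}
interface DataChangeEvent {
  subjectRef: string;
  idempotencyKey: string;
}
interface HeartbeatEvent {
  timestampMs: number;
}

// The oneof becomes a { case, value } pair; an unset oneof has case undefined.
type StreamEvent = {
  sessionId: string;
  event:
    | { case: "notification"; value: NotificationEvent }
    | { case: "dataChange"; value: DataChangeEvent }
    | { case: "heartbeat"; value: HeartbeatEvent }
    | { case: undefined; value?: undefined };
};

// Switching on `case` narrows `value` to the matching message type.
function describeEvent(ev: StreamEvent): string {
  const e = ev.event;
  switch (e.case) {
    case "notification":
      return `notification: ${e.value.notification?.title ?? "(untitled)"}`;
    case "dataChange":
      return `data change: ${e.value.subjectRef}`;
    case "heartbeat":
      return `heartbeat @ ${e.value.timestampMs}`;
    default:
      return "empty event";
  }
}

const heartbeatText = describeEvent({
  sessionId: "sess-1",
  event: { case: "heartbeat", value: { timestampMs: 1700 } },
});
const changeText = describeEvent({
  sessionId: "",
  event: { case: "dataChange", value: { subjectRef: "task:42", idempotencyKey: "k1" } },
});
```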

Lifecycle

  1. Client opens the stream with Authorization: Bearer <JWT> and a DeviceType.
  2. Server authenticates, allocates a Conn, registers it under the user.
  3. Server immediately sends one handshake HeartbeatEvent carrying the assigned session_id.
  4. Server ticks heartbeats on NOTIFY_LIVE_HEARTBEAT_INTERVAL (default 30s); the in-app provider pushes notifications + data-change events as they arrive.
  5. Client cancellation, network drop, or server shutdown ends the loop; the connection is unregistered before the handler returns.
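
Seen from the client, this lifecycle suggests two pieces of bookkeeping: remember the session_id from the handshake heartbeat, and notice when heartbeats stop arriving. The helper below is a hypothetical sketch, not part of the generated client. The rule that two missed intervals mean a dead stream is an assumption made for illustration, not documented server behaviour.

```typescript
// Hypothetical client-side helper that tracks one stream's session state.
class StreamSession {
  sessionId = "";
  private lastSeenMs: number;
  private heartbeatIntervalMs: number;

  constructor(heartbeatIntervalMs: number, nowMs: number) {
    this.heartbeatIntervalMs = heartbeatIntervalMs;
    this.lastSeenMs = nowMs;
  }

  // Call for every received event. Events with an empty session id leave
  // the remembered id untouched.
  onEvent(sessionId: string, nowMs: number): void {
    if (sessionId !== "") this.sessionId = sessionId;
    this.lastSeenMs = nowMs;
  }

  // Assumption: silence longer than two heartbeat intervals means the
  // stream is dead and the client should reconnect.
  isStale(nowMs: number): boolean {
    return nowMs - this.lastSeenMs > 2 * this.heartbeatIntervalMs;
  }
}

const session = new StreamSession(30_000, 0);
session.onEvent("sess-1", 1_000); // handshake heartbeat
session.onEvent("", 31_000); // later event without a session id
const staleSoon = session.isStale(60_000); // 29s of silence
const staleLater = session.isStale(100_000); // 69s of silence
```

Passing the clock in as an argument keeps the helper deterministic; in a real client the caller would supply Date.now().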

Configuration

-e NOTIFY_LIVE_CONNECTIONS_ENABLED=true # default true
-e NOTIFY_LIVE_HEARTBEAT_INTERVAL=30s
-e NOTIFY_LIVE_RETRY_MAX_ATTEMPTS=3 # at-least-once budget for data-change events
-e NOTIFY_LIVE_RETRY_INTERVAL=5s
-e NOTIFY_ALLOWED_ORIGINS=https://app.example.com

Setting NOTIFY_LIVE_CONNECTIONS_ENABLED=false disables the subsystem entirely. StreamEvents returns CodeUnimplemented, the in-app provider is not registered, and in-app rows stay StatusPending for inbox-only consumption.
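
For tooling that needs to read the same variables (a test harness, for example), the settings can be modelled as a typed object. This is a sketch under assumptions: loadLiveConfig and parseDurationMs are hypothetical names, the duration parser accepts only a single number-plus-unit pair, and the fallbacks for the two retry settings are simply the example values above, since only the first two defaults are documented.

```typescript
interface LiveConfig {
  enabled: boolean;
  heartbeatIntervalMs: number;
  retryMaxAttempts: number;
  retryIntervalMs: number;
}

// Simplified parser: one "<number><unit>" pair such as "30s" or "500ms".
function parseDurationMs(raw: string): number {
  const m = /^(\d+)(ms|s|m|h)$/.exec(raw.trim());
  if (m === null) throw new Error(`unsupported duration: ${raw}`);
  const value = Number(m[1]);
  switch (m[2]) {
    case "ms":
      return value;
    case "s":
      return value * 1_000;
    case "m":
      return value * 60_000;
    default:
      return value * 3_600_000; // "h"
  }
}

function loadLiveConfig(env: Record<string, string | undefined>): LiveConfig {
  return {
    // Documented defaults: enabled, 30s heartbeat.
    enabled: (env["NOTIFY_LIVE_CONNECTIONS_ENABLED"] ?? "true") === "true",
    heartbeatIntervalMs: parseDurationMs(env["NOTIFY_LIVE_HEARTBEAT_INTERVAL"] ?? "30s"),
    // Assumed fallbacks, taken from the example values rather than documented defaults.
    retryMaxAttempts: Number(env["NOTIFY_LIVE_RETRY_MAX_ATTEMPTS"] ?? "3"),
    retryIntervalMs: parseDurationMs(env["NOTIFY_LIVE_RETRY_INTERVAL"] ?? "5s"),
  };
}

const defaults = loadLiveConfig({});
const custom = loadLiveConfig({
  NOTIFY_LIVE_CONNECTIONS_ENABLED: "false",
  NOTIFY_LIVE_RETRY_INTERVAL: "500ms",
});
```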

Browser client (Connect-Web)

stream.ts
import { createConnectTransport } from "@connectrpc/connect-web";
import { createClient } from "@connectrpc/connect";
import { NotificationClientService } from "./gen/notify/v1/notify_pb";

const transport = createConnectTransport({
  baseUrl: "https://notify.example.com",
  interceptors: [
    (next) => async (req) => {
      req.header.set("Authorization", `Bearer ${getAccessToken()}`);
      return next(req);
    },
  ],
});
const client = createClient(NotificationClientService, transport);

let sessionId = "";
for await (const ev of client.streamEvents({ deviceType: "DEVICE_TYPE_BROWSER" })) {
  sessionId = ev.sessionId || sessionId;
  switch (ev.event.case) {
    case "notification": {
      const n = ev.event.value.notification!;
      console.log("notification:", n.title, "—", n.body);
      showInUI(n);
      break;
    }
    case "dataChange": {
      const dc = ev.event.value;
      console.log("data change:", dc.subjectRef);
      await refreshUpstream(dc.subjectRef);
      // ACK so the server stops retrying.
      await client.ackDataChange({
        idempotencyKey: dc.idempotencyKey,
        sessionId,
      });
      break;
    }
    case "heartbeat":
      console.log("heartbeat @", ev.event.value.timestampMs);
      break;
  }
}

Reconnect strategy

Streams die for legitimate reasons: laptop sleep, server rolling deploy, mobile carrier handover. The client should reconnect with exponential backoff and a cap:

async function connectWithRetry() {
  let backoffMs = 500;
  const maxMs = 30_000;
  for (;;) {
    try {
      // Iterating streamEvents() throws on any stream-level error.
      for await (const ev of client.streamEvents({ deviceType: "DEVICE_TYPE_BROWSER" })) {
        handle(ev);
        backoffMs = 500; // reset once events are flowing again
      }
      console.warn("stream ended; reconnecting");
    } catch (err) {
      console.warn("stream closed:", err);
    }
    // Wait (with jitter) before reconnecting, whether the stream errored or
    // ended cleanly, so a clean server-side close cannot cause a tight loop.
    await new Promise((r) => setTimeout(r, backoffMs + Math.random() * 250));
    backoffMs = Math.min(backoffMs * 2, maxMs);
  }
}

Reconnecting is safe: any in-flight events the client missed are still in the store and available via GetNotifications. Data-change events the client did not ack will be retried by the server's RetryTracker until either the ack arrives or NOTIFY_LIVE_RETRY_MAX_ATTEMPTS is exhausted.
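
Because data-change delivery is at-least-once, the same idempotency key can reach the client more than once, for example when an ack is lost just before a reconnect. A client may want to skip the repeated re-fetch while still acking. The sketch below is one way to do that; DataChangeDeduper and onDataChange are hypothetical names, and the re-fetch is replaced by an array push so the example stays self-contained.

```typescript
// Hypothetical client-side guard against redelivered data-change events.
class DataChangeDeduper {
  private seen = new Set<string>();

  // Returns true the first time a key is seen, false for redeliveries.
  shouldProcess(idempotencyKey: string): boolean {
    if (this.seen.has(idempotencyKey)) return false;
    this.seen.add(idempotencyKey);
    return true;
  }
}

const deduper = new DataChangeDeduper();
const refreshed: string[] = [];
const acked: string[] = [];

function onDataChange(subjectRef: string, idempotencyKey: string): void {
  if (deduper.shouldProcess(idempotencyKey)) {
    refreshed.push(subjectRef); // stand-in for the real re-fetch
  }
  // Ack every delivery, duplicates included, so the server stops retrying.
  acked.push(idempotencyKey);
}

onDataChange("task:42", "k1");
onDataChange("task:42", "k1"); // redelivery after a lost ack
onDataChange("task:43", "k2");
```

The set in this sketch grows without bound; a long-lived client would want to cap it or expire old keys.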

Sending an in-app notification

No address required — the in-app channel's destination is the user id itself.

curl -sX POST http://notify:8081/elloloop.notify.v1.NotificationInternalService/Notify \
  -H 'Content-Type: application/json' \
  -H "X-Notify-Internal-Token: $NOTIFY_INTERNAL_TOKEN" \
  -d '{
    "tenantId": "acme",
    "notificationId": "task-42-comment-7",
    "userIds": ["user-alice"],
    "channels": ["DELIVERY_CHANNEL_IN_APP"],
    "subjectRef": "task:42",
    "subjectType": "task",
    "title": "New comment",
    "body": "Bob commented on Task 42"
  }'
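
A producer written in TypeScript could issue the same call with fetch. The sketch below only builds the request, so it runs without a server; buildNotifyCall and the NotifyRequest interface are hypothetical, with field names copied from the curl payload above.

```typescript
// Field names copied from the curl example; the full schema is not shown here.
interface NotifyRequest {
  tenantId: string;
  notificationId: string;
  userIds: string[];
  channels: string[];
  subjectRef: string;
  subjectType: string;
  title: string;
  body: string;
}

// Builds the same HTTP call as the curl example without sending it.
function buildNotifyCall(baseUrl: string, internalToken: string, payload: NotifyRequest) {
  return {
    url: `${baseUrl}/elloloop.notify.v1.NotificationInternalService/Notify`,
    init: {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        "X-Notify-Internal-Token": internalToken,
      },
      body: JSON.stringify(payload),
    },
  };
}

const call = buildNotifyCall("http://notify:8081", "example-token", {
  tenantId: "acme",
  notificationId: "task-42-comment-7",
  userIds: ["user-alice"],
  channels: ["DELIVERY_CHANNEL_IN_APP"],
  subjectRef: "task:42",
  subjectType: "task",
  title: "New comment",
  body: "Bob commented on Task 42",
});
// To send it: await fetch(call.url, call.init);
```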

Backpressure

Each Conn has a buffered EventCh (default 64 deep). Registry.Push does a non-blocking send; a connection whose buffer is full has the event dropped and logged (event_queue_full). It stays registered for future events — one slow client never blocks the producer or the rest of the recipients.

The buffer size is currently fixed at DefaultEventBuffer = 64. If a real client genuinely needs deeper queueing, that is deferred to a follow-up; for now, the right move is to keep the client's UI thread from blocking when events arrive.
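
The drop-on-full behaviour can be illustrated with a small bounded queue. This is a TypeScript model of the idea only; the server's Conn and EventCh are not shown in this document, and BufferedConn is a hypothetical name.

```typescript
const DEFAULT_EVENT_BUFFER = 64; // mirrors DefaultEventBuffer from the text

// Hypothetical model of one connection's bounded event buffer.
class BufferedConn {
  dropped = 0;
  private queue: string[] = [];
  private capacity: number;

  constructor(capacity: number = DEFAULT_EVENT_BUFFER) {
    this.capacity = capacity;
  }

  // Non-blocking offer: enqueue if there is room, otherwise drop and count.
  offer(event: string): boolean {
    if (this.queue.length >= this.capacity) {
      this.dropped++; // the point where the server would log event_queue_full
      return false;
    }
    this.queue.push(event);
    return true;
  }

  // Consumer side: removes and returns the oldest event, if any.
  take(): string | undefined {
    return this.queue.shift();
  }
}

const slow = new BufferedConn();
const fast = new BufferedConn();
for (let i = 0; i < 70; i++) {
  slow.offer(`event-${i}`); // never drained
  fast.offer(`event-${i}`);
  fast.take(); // drained immediately
}

// Once the slow consumer drains an event, it accepts new ones again.
slow.take();
const acceptedAfterDrain = slow.offer("late-event");
```

With 70 events offered and a capacity of 64, the undrained connection drops the last six while the drained one drops none, and neither offer call ever waits.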

Related