Last updated 2026-05-28
Subscribe over SSE
Open a long-lived stream of new notifications, data-change hints,
and heartbeats from a browser or mobile client. The platform speaks
Connect over HTTP/2; StreamEvents is a server-streaming
RPC, so it works equally well over plain SSE-style framing.
When you'd use this
Any client surface that wants real-time updates: a web SPA showing chat messages without a refresh, a mobile app updating a task list while the user has it open, a desktop tray app surfacing system notifications.
Recipe — browser (Connect-Web)
// shared/notify-client.ts
import { createConnectTransport } from "@connectrpc/connect-web";
import { createClient } from "@connectrpc/connect";
import { NotificationClientService } from "./gen/notify/v1/notify_pb";

export function buildNotifyClient(baseUrl: string, getToken: () => Promise<string>) {
  const transport = createConnectTransport({
    baseUrl,
    interceptors: [
      (next) => async (req) => {
        const token = await getToken();
        req.header.set("Authorization", `Bearer ${token}`);
        return next(req);
      },
    ],
  });
  return createClient(NotificationClientService, transport);
}

// features/notifications/stream.ts
import { buildNotifyClient } from "../../shared/notify-client";

type NotifyEvent =
  | { kind: "notification"; id: string; title: string; body: string; subjectRef: string }
  | { kind: "dataChange"; idempotencyKey: string; subjectRef: string; subjectType: string }
  | { kind: "heartbeat"; timestampMs: bigint };

export interface Stream {
  cancel(): void;
  sessionId(): string;
}

export function openStream(opts: {
  baseUrl: string;
  getToken: () => Promise<string>;
  onEvent: (ev: NotifyEvent) => void;
  onError?: (err: unknown) => void;
}): Stream {
  const client = buildNotifyClient(opts.baseUrl, opts.getToken);
  const abort = new AbortController();
  let currentSession = "";

  void (async () => {
    let backoffMs = 500;
    const maxMs = 30_000;

    while (!abort.signal.aborted) {
      try {
        const it = client.streamEvents(
          { deviceType: "DEVICE_TYPE_BROWSER" },
          { signal: abort.signal },
        );
        for await (const ev of it) {
          currentSession = ev.sessionId || currentSession;
          backoffMs = 500; // reset on the first received event

          switch (ev.event.case) {
            case "notification": {
              const n = ev.event.value.notification!;
              opts.onEvent({
                kind: "notification",
                id: n.id,
                title: n.title,
                body: n.body,
                subjectRef: n.subjectRef,
              });
              break;
            }
            case "dataChange": {
              const dc = ev.event.value;
              opts.onEvent({
                kind: "dataChange",
                idempotencyKey: dc.idempotencyKey,
                subjectRef: dc.subjectRef,
                subjectType: dc.subjectType,
              });
              // Ack so the server stops retrying.
              await client.ackDataChange({
                idempotencyKey: dc.idempotencyKey,
                sessionId: currentSession,
              });
              break;
            }
            case "heartbeat":
              opts.onEvent({
                kind: "heartbeat",
                timestampMs: ev.event.value.timestampMs,
              });
              break;
          }
        }
      } catch (err) {
        if (abort.signal.aborted) return;
        opts.onError?.(err);
        // Exponential backoff with jitter and a cap.
        await new Promise((r) => setTimeout(r, backoffMs + Math.random() * 250));
        backoffMs = Math.min(backoffMs * 2, maxMs);
      }
    }
  })();

  return {
    cancel: () => abort.abort(),
    sessionId: () => currentSession,
  };
}

// app/page.tsx (React-style usage)
import { useEffect, useState } from "react";
import { openStream } from "./features/notifications/stream";

export function NotificationBell() {
  const [count, setCount] = useState(0);

  useEffect(() => {
    const stream = openStream({
      baseUrl: "https://notify.example.com",
      getToken: async () => localStorage.getItem("access_token") || "",
      onEvent: (ev) => {
        if (ev.kind === "notification") {
          setCount((c) => c + 1);
        }
      },
      onError: (err) => console.warn("stream error:", err),
    });
    return () => stream.cancel();
  }, []);

  return <div className="bell">🔔 {count}</div>;
}
The handshake heartbeat
The server sends one heartbeat immediately after registering
your connection, before any other traffic. That heartbeat carries the
server-assigned session_id, which you pass back
on AckDataChange so that the ack cancels retries for this
exact stream only. The code above stores it in currentSession.
Reconnect strategy
Streams die for legitimate reasons: laptop sleep, a rolling server deploy, mobile carrier handover, a network blip. The code above reconnects after an error with exponential backoff (500 ms → 1 s → 2 s → ... → 30 s) plus 0–250 ms of random jitter, so that a herd of clients does not synchronise its reconnects.
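To make the schedule concrete, here is a minimal sketch that pulls the delay calculation out of the loop above. The names backoffDelayMs and withJitter are illustrative and not part of any library. The defaults mirror the recipe (500 ms base, 30 s cap, 250 ms jitter).

```typescript
// Delay before reconnect attempt `attempt` (0-based), without jitter.
// Doubles from baseMs on each attempt and never exceeds maxMs.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 30_000): number {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Adds up to jitterMs of random delay. `random` is injectable so the
// function can be tested deterministically.
function withJitter(
  delayMs: number,
  jitterMs = 250,
  random: () => number = Math.random,
): number {
  return delayMs + random() * jitterMs;
}

const schedule = Array.from({ length: 8 }, (_, attempt) => backoffDelayMs(attempt));
console.log(schedule);
// → [500, 1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The seventh attempt would be 32 s uncapped, so it and every later attempt sit at the 30 s ceiling.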
Reconnecting is safe: any in-flight events the client missed are
still in the store and available via
GetNotifications. Data-change events the client did
not ack will be retried by the server's RetryTracker
until either the ack arrives or
NOTIFY_LIVE_RETRY_MAX_ATTEMPTS is exhausted.
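Because unacked data-change events are retried, a client may plausibly see the same idempotencyKey more than once, for example when an ack is lost during a disconnect. The sketch below shows one way to skip duplicate side effects on the client. createDeduper and its capacity parameter are hypothetical helpers, not part of the platform or of Connect.

```typescript
// Returns a function that reports whether a key is being seen for the first
// time. Keeps at most `capacity` keys, evicting the oldest one first.
function createDeduper(capacity = 1000): (key: string) => boolean {
  const seen = new Set<string>(); // a Set iterates in insertion order
  return (key: string): boolean => {
    if (seen.has(key)) return false;
    seen.add(key);
    if (seen.size > capacity) {
      const oldest = seen.values().next().value;
      if (oldest !== undefined) seen.delete(oldest);
    }
    return true;
  };
}

// Example: only the first delivery of a key triggers the UI refresh.
const firstTime = createDeduper(2);
console.log(firstTime("dc-1")); // true
console.log(firstTime("dc-1")); // false
```

A duplicate should still be acked, since the ack is what tells the server to stop retrying. Only the local handling, such as a refetch, is skipped.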
Cold-start catch-up
A typical UI fetches the inbox once on app load to populate the list, then opens the stream for live updates:
// On mount: cold-fetch.
const { notifications, nextCursor, unreadCount } = await client.getNotifications({ limit: 20 });

// Then open the stream and merge incoming.
openStream({ /* ... */ });
Recipe — raw cURL (for debugging only)
Useful for confirming that the stream works at all. Connect over
HTTP/2 uses length-prefixed JSON frames, not text/event-stream, so
cURL output is not human-friendly. Use --no-buffer and
expect binary framing.
curl -N --no-buffer \
  -X POST http://localhost:8080/elloloop.notify.v1.NotificationClientService/StreamEvents \
  -H "Authorization: Bearer dev:user-alice:acme" \
  -H "Content-Type: application/connect+json" \
  -d '{"deviceType":"DEVICE_TYPE_BROWSER"}'
For a readable debug stream, use the Connect-Go client in a small
Go test harness, or the connect-web client in Node.js (fetch is
built in from Node 18; Node 17.5 needs node --experimental-fetch).
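If you do capture the raw bytes, the length-prefixed framing can be decoded by hand. In the Connect streaming protocol each message is wrapped in an envelope: one flags byte (bit 0x01 means compressed, bit 0x02 marks the end-of-stream message), a 4-byte big-endian payload length, then the payload. The sketch below is illustrative only. parseConnectFrames and ConnectFrame are names invented for this example, and it assumes uncompressed JSON payloads.

```typescript
interface ConnectFrame {
  compressed: boolean;
  endStream: boolean;
  payload: Uint8Array;
}

// Splits a buffer into complete envelopes. Bytes belonging to a frame that
// has not fully arrived yet are returned in `rest` for the next call.
function parseConnectFrames(buf: Uint8Array): { frames: ConnectFrame[]; rest: Uint8Array } {
  const frames: ConnectFrame[] = [];
  let offset = 0;
  while (buf.length - offset >= 5) {
    const header = new DataView(buf.buffer, buf.byteOffset + offset, 5);
    const flags = header.getUint8(0);
    const length = header.getUint32(1, false); // big-endian
    const start = offset + 5;
    if (buf.length - start < length) break; // incomplete payload
    frames.push({
      compressed: (flags & 0x01) !== 0,
      endStream: (flags & 0x02) !== 0,
      payload: buf.subarray(start, start + length),
    });
    offset = start + length;
  }
  return { frames, rest: buf.subarray(offset) };
}

// Two complete frames carrying "{}" followed by 3 bytes of a partial header.
const sample = new Uint8Array([
  0, 0, 0, 0, 2, 0x7b, 0x7d,
  2, 0, 0, 0, 2, 0x7b, 0x7d,
  0, 0, 0,
]);
const parsed = parseConnectFrames(sample);
console.log(parsed.frames.length, parsed.rest.length); // 2 3
```

Each payload can then be passed through TextDecoder and JSON.parse to read the event.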
Authorization on streams
The JWT is validated at handshake time and the resulting claims live
on the stream's context for its entire lifetime. The
platform does not re-validate mid-stream. If a token's
exp is reached during a long-lived stream, the existing
stream keeps flowing until disconnect; the client must mint a fresh
token before its next reconnect.
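Since the recipe's interceptor calls getToken on every request, one way to satisfy this is to make getToken itself refresh a token that is close to expiry. The following is a sketch under assumptions: IssuedToken, fetchToken, and the 60-second skew are hypothetical, and how you actually obtain a token and its expiry depends on your identity provider.

```typescript
// Hypothetical shape returned by your own token endpoint.
interface IssuedToken {
  token: string;
  expiresAtMs: number; // absolute expiry, in milliseconds since the epoch
}

// True when the token expires within skewMs of nowMs (or already has).
function needsRefresh(expiresAtMs: number, nowMs: number, skewMs: number): boolean {
  return expiresAtMs - nowMs <= skewMs;
}

// Wraps a token fetcher in a cache. The returned function has the
// () => Promise<string> shape that openStream expects for getToken.
function makeTokenGetter(
  fetchToken: () => Promise<IssuedToken>,
  skewMs = 60_000,
  now: () => number = Date.now,
): () => Promise<string> {
  let cached: IssuedToken | undefined;
  return async () => {
    if (cached === undefined || needsRefresh(cached.expiresAtMs, now(), skewMs)) {
      cached = await fetchToken();
    }
    return cached.token;
  };
}
```

With this in place, a reconnect after the old token has expired picks up a fresh one without any change to the stream loop.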