Last updated 2026-05-28

Subscribe over SSE

Open a long-lived stream of new notifications, data-change hints, and heartbeats from a browser or mobile client. The platform speaks Connect over HTTP/2, and StreamEvents is a server-streaming RPC: a client consumes it the way it would an SSE feed, but the wire format is Connect's length-prefixed frames rather than text/event-stream.

When you'd use this

Any client surface that wants real-time updates: a web SPA showing chat messages without a refresh, a mobile app updating a task list while the user has it open, a desktop tray app surfacing system notifications.

Recipe — browser (Connect-Web)

// shared/notify-client.ts
import { createConnectTransport } from "@connectrpc/connect-web";
import { createClient } from "@connectrpc/connect";
import { NotificationClientService } from "./gen/notify/v1/notify_pb";

export function buildNotifyClient(baseUrl: string, getToken: () => Promise<string>) {
  const transport = createConnectTransport({
    baseUrl,
    interceptors: [
      (next) => async (req) => {
        const token = await getToken();
        req.header.set("Authorization", `Bearer ${token}`);
        return next(req);
      },
    ],
  });
  return createClient(NotificationClientService, transport);
}

// features/notifications/stream.ts
import { buildNotifyClient } from "../../shared/notify-client";

type NotifyEvent =
  | { kind: "notification"; id: string; title: string; body: string; subjectRef: string }
  | { kind: "dataChange"; idempotencyKey: string; subjectRef: string; subjectType: string }
  | { kind: "heartbeat"; timestampMs: bigint };

export interface Stream {
  cancel(): void;
  sessionId(): string;
}

export function openStream(opts: {
  baseUrl: string;
  getToken: () => Promise<string>;
  onEvent: (ev: NotifyEvent) => void;
  onError?: (err: unknown) => void;
}): Stream {
  const client = buildNotifyClient(opts.baseUrl, opts.getToken);
  const abort = new AbortController();
  let currentSession = "";

  void (async () => {
    let backoffMs = 500;
    const maxMs = 30_000;
    while (!abort.signal.aborted) {
      try {
        const it = client.streamEvents(
          { deviceType: "DEVICE_TYPE_BROWSER" },
          { signal: abort.signal },
        );
        for await (const ev of it) {
          currentSession = ev.sessionId || currentSession;
          backoffMs = 500; // the stream is delivering events again, so reset the backoff
          switch (ev.event.case) {
            case "notification": {
              const n = ev.event.value.notification!;
              opts.onEvent({
                kind: "notification",
                id: n.id,
                title: n.title,
                body: n.body,
                subjectRef: n.subjectRef,
              });
              break;
            }
            case "dataChange": {
              const dc = ev.event.value;
              opts.onEvent({
                kind: "dataChange",
                idempotencyKey: dc.idempotencyKey,
                subjectRef: dc.subjectRef,
                subjectType: dc.subjectType,
              });
              // Ack so the server stops retrying.
              await client.ackDataChange({
                idempotencyKey: dc.idempotencyKey,
                sessionId: currentSession,
              });
              break;
            }
            case "heartbeat":
              opts.onEvent({
                kind: "heartbeat",
                timestampMs: ev.event.value.timestampMs,
              });
              break;
          }
        }
      } catch (err) {
        if (abort.signal.aborted) return;
        opts.onError?.(err);
      }
      if (abort.signal.aborted) return;
      // Exponential backoff with jitter and a cap. This also runs after a clean
      // end-of-stream, so a server that closes the stream gracefully is not
      // hit with an immediate reconnect loop.
      await new Promise((r) => setTimeout(r, backoffMs + Math.random() * 250));
      backoffMs = Math.min(backoffMs * 2, maxMs);
    }
  })();

  return {
    cancel: () => abort.abort(),
    sessionId: () => currentSession,
  };
}

// app/page.tsx (React-style usage)
import { useEffect, useState } from "react";
// app/ and features/ are siblings (stream.ts reaches shared/ via ../../), so go up one level.
import { openStream } from "../features/notifications/stream";

export function NotificationBell() {
  const [count, setCount] = useState(0);

  useEffect(() => {
    const stream = openStream({
      baseUrl: "https://notify.example.com",
      getToken: async () => localStorage.getItem("access_token") || "",
      onEvent: (ev) => {
        if (ev.kind === "notification") {
          setCount((c) => c + 1);
        }
      },
      onError: (err) => console.warn("stream error:", err),
    });
    // Cancel the stream when the component unmounts.
    return () => stream.cancel();
  }, []);

  return <div className="bell">🔔 {count}</div>;
}

The handshake heartbeat

The server sends one heartbeat immediately after registering your connection, before any other traffic. That heartbeat carries the server-assigned session_id, which you pass back on AckDataChange so that the ack cancels retries for this exact stream only. The code above stores it in currentSession.

Reconnect strategy

Streams die for legitimate reasons: laptop sleep, a server rolling deploy, a mobile carrier handover, a network blip. The code above reconnects with exponential backoff (500ms → 1s → 2s → ... → 30s) plus 0–250ms of jitter, so a herd of clients does not synchronise its reconnects. The delay resets to 500ms as soon as an event arrives on the new stream.
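To make the schedule concrete, here is a minimal sketch that computes the delays on their own. The constants mirror the reconnect loop above; the helper names (baseDelayMs, reconnectDelayMs) are hypothetical and not part of the platform SDK.

```typescript
// Constants copied from the reconnect loop; helper names are illustrative only.
const BASE_MS = 500;
const MAX_MS = 30_000;
const JITTER_MS = 250;

// Base delay before the n-th consecutive reconnect (attempt 0 is the first retry).
function baseDelayMs(attempt: number): number {
  return Math.min(BASE_MS * 2 ** attempt, MAX_MS);
}

// Full delay: base plus uniform jitter. `random` is injectable so tests are deterministic.
function reconnectDelayMs(attempt: number, random: () => number = Math.random): number {
  return baseDelayMs(attempt) + random() * JITTER_MS;
}

// The first eight base delays: the cap takes over once doubling would pass 30s.
const schedule = Array.from({ length: 8 }, (_, i) => baseDelayMs(i));
console.log(schedule);
```

The jump from 16s goes straight to 30s because doubling would give 32s, which the cap truncates.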

Reconnecting is safe: any in-flight events the client missed are still in the store and available via GetNotifications. Data-change events the client did not ack will be retried by the server's RetryTracker until either the ack arrives or NOTIFY_LIVE_RETRY_MAX_ATTEMPTS is exhausted.
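Because unacked data-change events are retried, a client may plausibly receive the same event more than once, for example when an ack is lost during a disconnect. The sketch below shows one way to make handling idempotent by remembering recent idempotency keys. The helper name createDataChangeDeduper and the 1000-key window are assumptions made for illustration; the ack should still be sent for every delivery so the server stops retrying.

```typescript
// Hypothetical helper: tracks recently seen idempotency keys so that a
// redelivered data-change event triggers the expensive refetch only once.
function createDataChangeDeduper(maxKeys = 1000) {
  const seen = new Set<string>();
  return {
    // Returns true the first time a key is seen, false for a redelivery.
    isFirstDelivery(idempotencyKey: string): boolean {
      if (seen.has(idempotencyKey)) return false;
      seen.add(idempotencyKey);
      if (seen.size > maxKeys) {
        // Sets iterate in insertion order, so this evicts the oldest key.
        const oldest = seen.values().next().value as string;
        seen.delete(oldest);
      }
      return true;
    },
  };
}

const deduper = createDataChangeDeduper();
console.log(deduper.isFirstDelivery("key-1")); // first delivery
console.log(deduper.isFirstDelivery("key-1")); // redelivery of the same key
```

In the stream handler, the onEvent callback would call isFirstDelivery(ev.idempotencyKey) and skip its refetch when the result is false.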

Cold-start catch-up

A typical UI fetches the inbox once on app load to populate the list, then opens the stream for live updates:

// On mount: cold-fetch.
const { notifications, nextCursor, unreadCount } =
  await client.getNotifications({ limit: 20 });
// Then open the stream and merge incoming.
openStream({ /* ... */ });
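A notification that arrives while the cold fetch is in flight can show up both in the fetched page and on the stream, so the merge step usually needs to drop duplicates. The following is a minimal sketch of that merge, assuming both lists are ordered newest-first and that items are identified by id. InboxItem and mergeInbox are hypothetical names, and the real generated notification type has more fields.

```typescript
// Minimal shape for illustration; not the generated protobuf type.
interface InboxItem {
  id: string;
  title: string;
}

// Merges live notifications with a cold-fetched page, newest-first,
// keeping the first occurrence of each id.
function mergeInbox(fetched: InboxItem[], live: InboxItem[]): InboxItem[] {
  const byId = new Map<string, InboxItem>();
  // Live items come first because they are newer than anything in the fetched page.
  for (const item of [...live, ...fetched]) {
    if (!byId.has(item.id)) byId.set(item.id, item);
  }
  // Map preserves insertion order, so the result stays newest-first.
  return [...byId.values()];
}

const merged = mergeInbox(
  [{ id: "2", title: "Build passed" }, { id: "1", title: "Welcome" }],
  [{ id: "3", title: "New comment" }, { id: "2", title: "Build passed" }],
);
console.log(merged.map((n) => n.id));
```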

Recipe — raw cURL (for debugging only)

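Useful for confirming that the stream works at all. Connect streaming RPCs use length-prefixed frames (a 1-byte flag, a 4-byte big-endian length, then the JSON message) rather than text/event-stream, so cURL output is not human-friendly. The request body must be framed the same way, which is why the command below builds the 5-byte prefix with printf. Use --no-buffer and expect binary framing.

# \044 is octal for 36, the byte length of the JSON request message.
printf '\000\000\000\000\044%s' '{"deviceType":"DEVICE_TYPE_BROWSER"}' |
  curl --no-buffer \
    -X POST http://localhost:8080/elloloop.notify.v1.NotificationClientService/StreamEvents \
    -H "Authorization: Bearer dev:user-alice:acme" \
    -H "Content-Type: application/connect+json" \
    --data-binary @-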

For a readable debug stream, use the Connect-Go client in a small Go test harness, or run the connect-web client under Node.js (fetch is built in from Node 18; older versions need the --experimental-fetch flag).
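If you do capture the raw bytes, the framing can be decoded by hand. Below is a minimal sketch of the Connect streaming envelope: one flag byte, a 4-byte big-endian length, then the payload, with flag bit 0x02 marking the end-of-stream message. It assumes uncompressed JSON payloads; encodeEnvelope and decodeEnvelopes are hypothetical helper names, not part of any Connect library.

```typescript
const END_STREAM_FLAG = 0b10; // set on the final end-of-stream frame

// Wraps one JSON message in a 5-byte envelope prefix.
function encodeEnvelope(json: string, flags = 0): Uint8Array {
  const payload = new TextEncoder().encode(json);
  const frame = new Uint8Array(5 + payload.length);
  frame[0] = flags;
  new DataView(frame.buffer).setUint32(1, payload.length, false); // big-endian length
  frame.set(payload, 5);
  return frame;
}

interface Frame {
  endStream: boolean;
  json: string;
}

// Decodes every complete frame in `buf`; a trailing partial frame is ignored.
function decodeEnvelopes(buf: Uint8Array): Frame[] {
  const frames: Frame[] = [];
  const view = new DataView(buf.buffer, buf.byteOffset, buf.byteLength);
  let offset = 0;
  while (offset + 5 <= buf.length) {
    const flags = buf[offset];
    const length = view.getUint32(offset + 1, false);
    if (offset + 5 + length > buf.length) break; // incomplete frame
    const payload = buf.subarray(offset + 5, offset + 5 + length);
    frames.push({
      endStream: (flags & END_STREAM_FLAG) !== 0,
      json: new TextDecoder().decode(payload),
    });
    offset += 5 + length;
  }
  return frames;
}

const request = encodeEnvelope('{"deviceType":"DEVICE_TYPE_BROWSER"}');
console.log(request.length, decodeEnvelopes(request));
```

A real client would buffer partial frames across network reads instead of dropping them; this sketch only handles a fully captured byte array.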

Authorization on streams

The JWT is validated at handshake time and the resulting claims live on the stream's context for its entire lifetime. The platform does not re-validate mid-stream. If a token's exp is reached during a long-lived stream, the existing stream keeps flowing until disconnect; the client must mint a fresh token before its next reconnect.
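Since buildNotifyClient calls getToken on every request, including each reconnect, one way to satisfy this is to make getToken itself refresh-aware. The sketch below caches a token and mints a new one only when the cached token is close to expiry. The names jwtExpiryMs and cachedTokenSource, the 30-second skew, and the refresh callback are assumptions for illustration; the exp claim is read without verifying the signature, which is enough for a client-side freshness check but not for authorization decisions.

```typescript
// Reads the `exp` claim (seconds since epoch) from a JWT and returns it in
// milliseconds, or undefined if the token cannot be parsed. No signature check.
function jwtExpiryMs(token: string): number | undefined {
  const payload = token.split(".")[1];
  if (!payload) return undefined;
  try {
    // Convert base64url to padded base64 before decoding.
    const base64 = payload.replace(/-/g, "+").replace(/_/g, "/");
    const padded = base64.padEnd(Math.ceil(base64.length / 4) * 4, "=");
    const claims = JSON.parse(atob(padded)) as { exp?: unknown };
    return typeof claims.exp === "number" ? claims.exp * 1000 : undefined;
  } catch {
    return undefined;
  }
}

// Wraps a (hypothetical) refresh function so callers get a cached token
// until it is within `skewMs` of expiry. Tokens without `exp` are never cached.
function cachedTokenSource(
  refresh: () => Promise<string>,
  skewMs = 30_000,
  now: () => number = Date.now,
): () => Promise<string> {
  let cached = "";
  return async () => {
    const exp = cached ? jwtExpiryMs(cached) : undefined;
    if (exp === undefined || exp - now() <= skewMs) {
      cached = await refresh();
    }
    return cached;
  };
}
```

The returned function has the same shape as the getToken option of openStream, so it can be passed in directly.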

Related