Last updated 2026-05-28
Send a Notification
Producer-side recipes for the Notify RPC. Same wire
contract from every language; only the SDK and the auth-header
plumbing differ.
When you'd use this
Any backend service that wants to push a notification to a user via notify. Webhook receivers, business-logic services, scheduled job runners — all of them call this RPC.
Recipe 1 — cURL
The simplest possible producer. Useful for one-off debugging, smoke tests against a fresh deployment, or generating fixtures.
    curl -sX POST http://notify:8081/elloloop.notify.v1.NotificationInternalService/Notify \
      -H 'Content-Type: application/json' \
      -H "X-Notify-Internal-Token: $NOTIFY_INTERNAL_TOKEN" \
      -d '{
        "tenantId": "acme",
        "notificationId": "task-42-comment-7",
        "userIds": ["user-alice"],
        "channels": ["DELIVERY_CHANNEL_IN_APP", "DELIVERY_CHANNEL_EMAIL"],
        "subjectRef": "task:42",
        "subjectType": "task",
        "title": "New comment",
        "body": "Bob commented on Task 42",
        "addresses": {
          "user-alice": { "byChannel": { "email": "alice@example.com" } }
        }
      }'

Response:

    {"delivered": 1, "pending": 1, "failed": 0}

- delivered=1: the email send succeeded.
- pending=1: the in-app row was stored, but no live connection is open for user-alice; it will surface on their next StreamEvents open via GetNotifications.
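The same wire contract can be exercised from a script without any SDK. The sketch below is a minimal, standard-library-only illustration that mirrors the cURL call above; `build_notify_request` and `send` are hypothetical helper names, and it assumes the service accepts the same JSON body and header shown in the cURL recipe.

```python
import json
import urllib.request

# Path taken from the cURL recipe above.
NOTIFY_PATH = "/elloloop.notify.v1.NotificationInternalService/Notify"


def build_notify_request(base_url: str, token: str, payload: dict) -> urllib.request.Request:
    """Build (but do not send) a POST request equivalent to the cURL call."""
    return urllib.request.Request(
        url=base_url.rstrip("/") + NOTIFY_PATH,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "X-Notify-Internal-Token": token,
        },
        method="POST",
    )


def send(req: urllib.request.Request, timeout: float = 10.0) -> dict:
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read())
```

Building the request separately from sending it makes the payload easy to inspect or save as a fixture before anything touches the network.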
Recipe 2 — Go (Connect-Go)
Production-shape Go producer using the generated Connect client. This is what an api-gateway-style consumer looks like.
    package mainapp

    import (
        "context"
        "errors"
        "fmt"
        "net/http"
        "os"
        "time"

        "connectrpc.com/connect"

        notifyv1 "github.com/elloloop/notify/gen/go/notify/v1"
        "github.com/elloloop/notify/gen/go/notify/v1/notifyv1connect"
    )

    // NotifyClient wraps the generated Connect client and attaches the
    // internal auth token to every call.
    type NotifyClient struct {
        inner notifyv1connect.NotificationInternalServiceClient
        token string
    }

    func NewNotifyClient(addr, token string) *NotifyClient {
        return &NotifyClient{
            inner: notifyv1connect.NewNotificationInternalServiceClient(
                http.DefaultClient,
                addr, // e.g. "http://notify:8081"
            ),
            token: token,
        }
    }

    func (c *NotifyClient) Notify(ctx context.Context, in *notifyv1.NotifyRequest) (*notifyv1.NotifyResponse, error) {
        req := connect.NewRequest(in)
        req.Header().Set("X-Notify-Internal-Token", c.token)

        resp, err := c.inner.Notify(ctx, req)
        if err != nil {
            // Surface the Connect error code when one is available.
            var ce *connect.Error
            if errors.As(err, &ce) {
                return nil, fmt.Errorf("notify: %s: %w", ce.Code(), ce)
            }
            return nil, fmt.Errorf("notify: %w", err)
        }
        return resp.Msg, nil
    }

    // Usage from a handler.
    // lookupEmail is the caller's own address resolver (not shown here).
    func sendTaskComment(ctx context.Context, client *NotifyClient, tenantID, recipient, taskID, body string) error {
        _, err := client.Notify(ctx, &notifyv1.NotifyRequest{
            TenantId: tenantID,
            // Wall-clock id kept for brevity; prefer a deterministic id for
            // retryable events (see "Idempotency in practice").
            NotificationId: fmt.Sprintf("task-%s-comment-%d", taskID, time.Now().UnixMilli()),
            UserIds:        []string{recipient},
            Channels: []notifyv1.DeliveryChannel{
                notifyv1.DeliveryChannel_DELIVERY_CHANNEL_IN_APP,
                notifyv1.DeliveryChannel_DELIVERY_CHANNEL_EMAIL,
            },
            SubjectRef:  "task:" + taskID,
            SubjectType: "task",
            Title:       "New comment",
            Body:        body,
            Addresses: map[string]*notifyv1.ChannelAddresses{
                recipient: {
                    ByChannel: map[string]string{
                        "email": lookupEmail(ctx, recipient),
                    },
                },
            },
        })
        return err
    }

    // Construct the client once at boot.
    var notifyClient = NewNotifyClient(
        os.Getenv("NOTIFY_INTERNAL_ADDR"),
        os.Getenv("NOTIFY_INTERNAL_TOKEN"),
    )

Recipe 3 — Python (Connect-Python)
For services using Python. The same pattern works with the gRPC stubs — Connect's JSON-over-HTTP is just easier to debug.
    # requires:
    #   pip install httpx
    #   pip install connectrpc  # or your generated Connect stubs
    import os
    import time

    import httpx

    from notify.gen.notify.v1.notify_pb2 import (
        NotifyRequest,
        ChannelAddresses,
        DELIVERY_CHANNEL_IN_APP,
        DELIVERY_CHANNEL_EMAIL,
    )
    from notify.gen.notify.v1.notify_pb2_connect import NotificationInternalServiceClient


    class NotifyClient:
        def __init__(self, addr: str, token: str):
            self._http = httpx.Client(base_url=addr, timeout=10.0)
            self._token = token
            self._inner = NotificationInternalServiceClient(self._http)

        def notify(self, *, tenant_id: str, notification_id: str,
                   user_ids: list[str], channels: list[int],
                   title: str, body: str,
                   subject_ref: str = "", subject_type: str = "",
                   addresses: dict[str, dict[str, str]] | None = None):
            req = NotifyRequest(
                tenant_id=tenant_id,
                notification_id=notification_id,
                user_ids=user_ids,
                channels=channels,
                subject_ref=subject_ref,
                subject_type=subject_type,
                title=title,
                body=body,
                addresses={
                    uid: ChannelAddresses(by_channel=mapping)
                    for uid, mapping in (addresses or {}).items()
                },
            )
            return self._inner.Notify(
                req,
                headers={"X-Notify-Internal-Token": self._token},
            )


    client = NotifyClient(
        addr=os.environ["NOTIFY_INTERNAL_ADDR"],
        token=os.environ["NOTIFY_INTERNAL_TOKEN"],
    )

    # Usage.
    resp = client.notify(
        tenant_id="acme",
        notification_id=f"task-42-comment-{int(time.time()*1000)}",
        user_ids=["user-alice"],
        channels=[DELIVERY_CHANNEL_IN_APP, DELIVERY_CHANNEL_EMAIL],
        title="New comment",
        body="Bob commented on Task 42",
        subject_ref="task:42",
        subject_type="task",
        addresses={
            "user-alice": {"email": "alice@example.com"},
        },
    )
    print(resp.delivered, resp.pending, resp.failed)

Idempotency in practice
Pick a notification_id that is deterministic from the
triggering event. Two examples:
- Comment created event: "task-{taskID}-comment-{commentID}". Webhook retries of the same comment produce the same id; notify de-dupes.
- Daily summary cron: "summary-{userID}-{YYYY-MM-DD}". A cron re-run within the same day produces the same id; notify de-dupes.
Avoid wall-clock-derived ids for retryable events: a slow retry that crosses a millisecond boundary would produce a different id and therefore a duplicate notification. Wall-clock values are fine only in ids that come from the event source and include their own monotonic offset (Kafka offsets, etc.).
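The two id patterns above can be captured in small helpers so that every producer derives the id the same way. This is only a sketch: the function names are hypothetical, and the formats are copied from the two examples in this section.

```python
from datetime import date


def comment_notification_id(task_id: str, comment_id: str) -> str:
    """Id for a 'comment created' event; stable across webhook retries."""
    return f"task-{task_id}-comment-{comment_id}"


def daily_summary_notification_id(user_id: str, day: date) -> str:
    """Id for a daily summary; stable across cron re-runs on the same day."""
    return f"summary-{user_id}-{day.isoformat()}"
```

Because neither helper reads the clock, calling it again during a retry yields the same string, which is what lets notify de-dupe.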
Per-recipient overrides
The Addresses map is per-user. Mix-and-match
destinations across recipients in one call:
    req := &notifyv1.NotifyRequest{
        TenantId:       "acme",
        NotificationId: "release-2026-05-27-001",
        UserIds:        []string{"alice", "bob", "carol"},
        Channels:       []notifyv1.DeliveryChannel{notifyv1.DeliveryChannel_DELIVERY_CHANNEL_EMAIL},
        Title:          "Release 2026.05.27 is live",
        Body:           "Changelog: ...",
        Addresses: map[string]*notifyv1.ChannelAddresses{
            "alice": {ByChannel: map[string]string{"email": "alice@example.com"}},
            "bob":   {ByChannel: map[string]string{"email": "bob@example.com"}},
            // carol has no address → her row is stored as Pending; no email sent.
        },
    }

Empty channels list
Omit channels entirely (or pass an empty list) to fan
out to every channel that has both an active provider AND a
destination for the user. The orchestrator computes the effective
set at request time.
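As a rough mental model of that rule, the sketch below computes an effective channel set for one user. It is not the orchestrator's code: the function name, the lowercase channel keys such as "sms", and the idea that an explicit channels list is passed through unchanged are all assumptions made for illustration.

```python
def effective_channels(requested: list[str],
                       active_providers: set[str],
                       user_destinations: dict[str, str]) -> list[str]:
    """Illustrative only: resolve which channels a request fans out to."""
    if requested:
        # Assumption: an explicit list is used as given.
        return list(requested)
    # Empty list: keep channels with an active provider AND a non-empty
    # destination for this user.
    return sorted(ch for ch in active_providers if user_destinations.get(ch))
```

For example, with providers for "email" and "sms" active and a user who only has an email address on file, an empty request resolves to just the email channel.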