SSE Delivery
Serverless-compatible streaming via Server-Sent Events. Works on Vercel, Cloudflare, and any edge runtime.
Message History
Durable message storage with stream persistence. Replay missed messages on reconnect with zero loss.
Auto-Reconnect
Exponential backoff reconnection with automatic state recovery and message deduplication.
Multi-Channel
Subscribe to multiple channels simultaneously with a single hook. Per-channel status tracking.
Overview
The Realtime service provides real-time pub/sub messaging powered by durable streams and Server-Sent Events. Every message is durably stored via XADD and instantly pushed via PUBLISH, giving you both persistent history and sub-millisecond delivery. SSE transport means it works everywhere serverless runs — no WebSocket limitations, no sticky sessions, no special infrastructure.
Subscribe-First, Replay-Second
The connection establishes a Pub/Sub subscription before replaying history. Messages arriving during replay are buffered and deduplicated, guaranteeing zero message loss across the transition from history to live mode.
Heartbeat and Duration
Connections send a heartbeat every 60 seconds and have a maximum duration of 300 seconds. When the connection expires, a reconnect signal is sent and the client automatically re-establishes the stream with no message loss.
Quick Start
Emit and subscribe to real-time events in your application:
import { createStreams } from '@sylphx/sdk/server'
const streams = createStreams()
// Emit an event to a channel
const id = await streams.emit('chat:room-1', 'message', {
text: 'Hello!',
userId: '123',
})
// Get message history
const messages = await streams.history('chat:room-1', {
limit: 50,
})
console.log(`Published message ${id}`)
console.log(`History: ${messages.length} messages`)Emitting Events
Events are published to channels with a type and arbitrary JSON data. Each emit performs two operations: XADD for durable storage and PUBLISH for instant delivery to all subscribers.
import { createStreams } from '@sylphx/sdk/server'
const streams = createStreams()
// Emit returns the stream entry ID (timestamp-based)
const id = await streams.emit('orders:updates', 'status_changed', {
orderId: 'ord_123',
status: 'shipped',
trackingNumber: 'TRK-456',
})
// Emit from a server action
async function sendMessage(roomId: string, text: string) {
'use server'
const streams = createStreams()
return streams.emit(`chat:${roomId}`, 'message', {
text,
userId: getCurrentUserId(),
timestamp: Date.now(),
})
}Automatic ID Generation
timestamp-sequence (e.g., 1706886400000-0). IDs are globally ordered and guarantee chronological message ordering within a channel.Message History
All emitted messages are durably stored in persistent streams. Retrieve history by channel with optional range and limit parameters:
import { createStreams } from '@sylphx/sdk/server'
const streams = createStreams()
// Get the last 50 messages
const recent = await streams.history('chat:room-1', {
limit: 50,
})
// Get messages after a specific ID (for pagination)
const older = await streams.history('chat:room-1', {
start: '1706886400000-0',
limit: 100,
})
// Get messages in a time range
const range = await streams.history('chat:room-1', {
start: '1706886400000-0',
end: '1706890000000-0',
limit: 200,
})
// Each message has the StreamMessage shape
for (const msg of recent) {
console.log(msg.id) // "1706886400123-0"
console.log(msg.event) // "message"
console.log(msg.channel) // "chat:room-1"
console.log(msg.data) // { text: "Hello!", userId: "123" }
console.log(msg.timestamp) // 1706886400123
}Client-Side History
useRealtime hook automatically fetches history on connect when you pass the history option. You do not need to call the history API manually in React components.| Property | Type | Description |
|---|---|---|
channelrequired | string | Channel name to fetch history for |
start | string= "-" | Start ID for range query. Use "-" for oldest. |
end | string= "+" | End ID for range query. Use "+" for newest. |
limit | number= 100 | Maximum messages to return (max 1000) |
Connection Management
The useRealtime hook provides full control over connection lifecycle and state:
'use client'
import { useRealtime } from '@sylphx/sdk/react'
import type { RealtimeStatus } from '@sylphx/sdk/react'
function LiveDashboard() {
const {
messages,
status, // 'connecting' | 'connected' | 'reconnecting' | 'disconnected'
connect, // Manually connect
disconnect,// Manually disconnect
clear, // Clear local message buffer
emit, // Send events
} = useRealtime('dashboard:metrics', {
enabled: true, // Auto-connect on mount (default: true)
onConnect: (channel) => {
console.log(`Connected to ${channel}`)
},
onReconnect: () => {
console.log('Reconnecting...')
},
onError: (error) => {
console.error('Connection error:', error)
},
})
// Status indicator
const statusColor: Record<RealtimeStatus, string> = {
connected: 'bg-green-500',
connecting: 'bg-yellow-500',
reconnecting: 'bg-yellow-500',
disconnected: 'bg-red-500',
}
return (
<div>
<div className="flex items-center gap-2">
<div className={`w-2 h-2 rounded-full ${statusColor[status]}`} />
<span>{status}</span>
</div>
<button onClick={disconnect}>Pause</button>
<button onClick={connect}>Resume</button>
<button onClick={clear}>Clear Messages</button>
<div>{messages.length} messages</div>
</div>
)
}| Status | Description |
|---|---|
| connecting | Initial connection being established |
| connected | Active and receiving messages |
| reconnecting | Reconnecting after error or server-initiated reconnect signal |
| disconnected | Not connected. Call connect() to re-establish |
Channel Helper
The server-side SDK provides a channel helper for scoping operations to a specific channel. This is convenient when you emit multiple events to the same channel:
import { createStreams } from '@sylphx/sdk/server'
const streams = createStreams()
// Create a channel helper — scopes emit() and history()
const room = streams.channel('chat:room-1')
// Emit without repeating the channel name
await room.emit('message', { text: 'Hello!', userId: '123' })
await room.emit('typing', { userId: '123', active: true })
await room.emit('reaction', { messageId: 'msg_1', emoji: '👍' })
// Get history for this channel
const history = await room.history({ limit: 100 })
// Compare with the top-level API
// These two calls are equivalent:
await streams.emit('chat:room-1', 'message', { text: 'Hi' })
await room.emit('message', { text: 'Hi' })Singleton Pattern
getStreams() instead of createStreams() if you want a singleton client that reuses the same instance across your server code. Both read from environment variables by default.Multi-Channel Subscriptions
Subscribe to multiple channels at once with useRealtimeChannels. Messages from all channels are combined in chronological order. The overall status reflects the aggregate state of all channel connections.
'use client'
import { useRealtimeChannels } from '@sylphx/sdk/react'
interface ChatMessage {
text: string
userId: string
}
function TeamChat({ teamRooms }: { teamRooms: string[] }) {
const { messages, emit, status, clear } = useRealtimeChannels<ChatMessage>(
teamRooms.map((room) => `chat:${room}`),
{
events: ['message'],
history: 20,
onMessage: (msg) => {
// msg.channel tells you which channel it came from
console.log(`[${msg.channel}] ${msg.data.text}`)
},
}
)
// Emit to the first channel by default
const sendToGeneral = () => emit('message', { text: 'Hello team!', userId: 'me' })
// Group messages by channel for display
const byChannel = messages.reduce<Record<string, typeof messages>>(
(acc, msg) => {
const ch = msg.channel
if (!acc[ch]) acc[ch] = []
acc[ch].push(msg)
return acc
},
{}
)
return (
<div>
<span>Status: {status}</span>
{Object.entries(byChannel).map(([channel, msgs]) => (
<div key={channel}>
<h3>{channel}</h3>
{msgs.map((msg) => (
<p key={msg.id}>{msg.data.text}</p>
))}
</div>
))}
</div>
)
}Status Aggregation
status follows priority rules: if any channel is connecting, the overall status is connecting. If any is reconnecting, the overall is reconnecting. It only shows connected when all channels are connected.API Reference
Server-side API:
| Method | Description |
|---|---|
| streams.emit(channel, event, data) | Emit an event to a channel. Returns the stream entry ID. |
| streams.history(channel, opts?) | Get message history with optional range and limit. |
| streams.channel(name) | Get a channel helper with scoped emit() and history(). |
| createStreams(opts?) | Create a new streams client. Reads SYLPHX_SECRET_KEY and SYLPHX_PLATFORM_URL from env. |
| getStreams() | Get the singleton streams client. Creates one on first call. |
React hooks:
| Hook | Description |
|---|---|
| useRealtime(channel, opts?) | Subscribe to a single channel. Returns messages, status, emit, connect, disconnect, clear. |
| useRealtimeChannels(channels, opts?) | Subscribe to multiple channels. Combined messages with per-channel tracking. |
useRealtime options:
| Property | Type | Description |
|---|---|---|
events | string[] | Filter to specific event types. If empty, receives all events. |
history | number | { start?: string; limit?: number } | Number of historical messages to load on connect, or options object. |
enabled | boolean= true | Whether to auto-connect on mount. |
onConnect | (channel: string) => void | Callback when connected to the channel. |
onMessage | (message: StreamMessage<T>) => void | Callback for each incoming message. |
onReconnect | () => void | Callback when the connection is re-establishing. |
onError | (error: Error) => void | Callback on connection error. |
platformUrl | string | Override the platform URL (uses provider config by default). |
StreamMessage<T> shape:
| Property | Type | Description |
|---|---|---|
idrequired | string | Stream entry ID (e.g., "1706886400000-0") |
eventrequired | string | Event type (e.g., "message", "typing") |
channelrequired | string | Channel the message was sent to |
datarequired | T | Event payload (your custom data) |
timestamp | number | Unix timestamp in milliseconds |
Best Practices
Use Hierarchical Channel Names
Name channels with colon-separated namespaces: chat:room-1, orders:user-123, dashboard:metrics. This makes filtering and debugging straightforward.
Handle Reconnection Gracefully
The hook reconnects automatically with exponential backoff. Use the onReconnect callback to show a UI indicator, and trust that messages will not be lost during the transition.
Keep Messages Small
Send references (IDs, URLs) instead of large payloads. For example, emit a document ID and fetch the full document separately. This keeps your streams fast and your history compact.
Use Event Filtering
Pass the events option to receive only the event types your component needs. This reduces unnecessary renders and keeps your message buffer focused.
Clear Stale Messages
Call clear() when navigating away from a view or when messages become irrelevant. This prevents memory buildup in long-running sessions.
Leverage the Channel Helper
On the server, use streams.channel(name) when emitting multiple events to the same channel. It reduces repetition and makes your intent clearer.