Realtime

Real-time pub/sub messaging with SSE delivery, durable message history, and automatic reconnection. Zero message loss guaranteed.

SSE Delivery

Serverless-compatible streaming via Server-Sent Events. Works on Vercel, Cloudflare, and any edge runtime.

Message History

Durable message storage with stream persistence. Replay missed messages on reconnect with zero loss.

Auto-Reconnect

Exponential backoff reconnection with automatic state recovery and message deduplication.

Multi-Channel

Subscribe to multiple channels simultaneously with a single hook. Per-channel status tracking.

Overview

The Realtime service provides real-time pub/sub messaging powered by durable streams and Server-Sent Events. Every message is durably stored via XADD and instantly pushed via PUBLISH, giving you both persistent history and sub-millisecond delivery. SSE transport means it works everywhere serverless runs — no WebSocket limitations, no sticky sessions, no special infrastructure.

Subscribe-First, Replay-Second

The connection establishes a Pub/Sub subscription before replaying history. Messages arriving during replay are buffered and deduplicated, guaranteeing zero message loss across the transition from history to live mode.
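
The hand-off from history to live mode can be pictured with a minimal sketch. This is not the SDK's implementation: `Msg` and `mergeReplayWithBuffer` are hypothetical names, and the sketch assumes every message carries a stream entry ID that is unique within its channel.

```typescript
// Hypothetical minimal message shape: only the stream entry ID matters here.
interface Msg {
  id: string
  data: unknown
}

// Deliver the replayed history first, then the live messages that were
// buffered while the replay was running, skipping any ID already delivered.
function mergeReplayWithBuffer(replayed: Msg[], buffered: Msg[]): Msg[] {
  const seen = new Set<string>()
  const out: Msg[] = []
  for (const msg of replayed.concat(buffered)) {
    if (seen.has(msg.id)) continue // duplicate: arrived via both paths
    seen.add(msg.id)
    out.push(msg)
  }
  return out
}
```

Because the subscription is opened before the replay starts, a message published mid-replay can show up in both lists; deduplicating by ID is what keeps it from being delivered twice.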

Heartbeat and Duration

Connections send a heartbeat every 60 seconds and have a maximum duration of 300 seconds. When the connection expires, a reconnect signal is sent and the client automatically re-establishes the stream with no message loss.
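
The hook handles these timings for you. For a custom client, the sketch below shows what the numbers imply. It assumes heartbeats arrive from the server, and the two-interval staleness threshold and both function names are hypothetical, not SDK behavior.

```typescript
// Timings taken from the text above.
const HEARTBEAT_INTERVAL_MS = 60_000
const MAX_CONNECTION_MS = 300_000

// Assumption: treat the stream as stale after this many silent intervals.
const MISSED_HEARTBEATS_ALLOWED = 2

// True when nothing (data or heartbeat) has arrived for longer than allowed.
function isStreamStale(lastActivityMs: number, nowMs: number): boolean {
  return nowMs - lastActivityMs > HEARTBEAT_INTERVAL_MS * MISSED_HEARTBEATS_ALLOWED
}

// Latest time at which the connection is expected to hit its maximum duration.
function expectedReconnectAt(connectedAtMs: number): number {
  return connectedAtMs + MAX_CONNECTION_MS
}
```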

Quick Start

Emit and subscribe to real-time events in your application:

import { createStreams } from '@sylphx/sdk/server'

const streams = createStreams()

// Emit an event to a channel
const id = await streams.emit('chat:room-1', 'message', {
  text: 'Hello!',
  userId: '123',
})

// Get message history
const messages = await streams.history('chat:room-1', {
  limit: 50,
})

console.log(`Published message ${id}`)
console.log(`History: ${messages.length} messages`)

Emitting Events

Events are published to channels with a type and arbitrary JSON data. Each emit performs two operations: XADD for durable storage and PUBLISH for instant delivery to all subscribers.

import { createStreams } from '@sylphx/sdk/server'

const streams = createStreams()

// Emit returns the stream entry ID (timestamp-based)
const id = await streams.emit('orders:updates', 'status_changed', {
  orderId: 'ord_123',
  status: 'shipped',
  trackingNumber: 'TRK-456',
})

// Emit from a server action
async function sendMessage(roomId: string, text: string) {
  'use server'
  const streams = createStreams()
  return streams.emit(`chat:${roomId}`, 'message', {
    text,
    userId: getCurrentUserId(), // your own auth helper, not shown here
    timestamp: Date.now(),
  })
}
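
To make the append-then-publish sequence described in this section concrete, here is an illustrative in-memory model. It is a sketch only: `InMemoryChannel`, `LogEntry`, and `EntryListener` are hypothetical names, and the real service persists and publishes remotely rather than in process memory.

```typescript
// Illustrative in-memory model of a single channel.
interface LogEntry {
  id: string
  event: string
  data: unknown
}

type EntryListener = (entry: LogEntry) => void

class InMemoryChannel {
  private log: LogEntry[] = []
  private listeners: EntryListener[] = []
  private lastMs = -1
  private seq = 0

  subscribe(listener: EntryListener): void {
    this.listeners.push(listener)
  }

  emit(event: string, data: unknown, nowMs: number = Date.now()): string {
    // Same millisecond (or a clock that went backwards): bump the sequence.
    if (nowMs <= this.lastMs) {
      this.seq += 1
    } else {
      this.lastMs = nowMs
      this.seq = 0
    }
    const entry: LogEntry = { id: `${this.lastMs}-${this.seq}`, event, data }
    this.log.push(entry) // step 1: durable append (stands in for XADD)
    for (const listener of this.listeners) {
      listener(entry) // step 2: fan-out to subscribers (stands in for PUBLISH)
    }
    return entry.id
  }

  // Return the most recent `limit` entries, oldest first.
  history(limit: number): LogEntry[] {
    return limit > 0 ? this.log.slice(-limit) : []
  }
}
```

Appending before publishing means a subscriber that misses the live push can still recover the entry from history.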

Automatic ID Generation

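Stream entry IDs are generated automatically in the format timestamp-sequence (e.g., 1706886400000-0), where the timestamp is in milliseconds and the sequence distinguishes entries created in the same millisecond. IDs increase monotonically, which guarantees chronological message ordering within a channel.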
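
Because the ID encodes a millisecond timestamp and a sequence number, it can be parsed and compared on the client. The helpers below are a sketch with hypothetical names (`parseStreamId`, `compareStreamIds`); they are not part of the SDK.

```typescript
interface ParsedStreamId {
  timestamp: number // Unix time in milliseconds
  sequence: number  // counter within the same millisecond
}

// Split "1706886400000-0" into its numeric parts.
function parseStreamId(id: string): ParsedStreamId {
  const match = /^(\d+)-(\d+)$/.exec(id)
  if (!match) throw new Error(`Invalid stream entry ID: ${id}`)
  return { timestamp: Number(match[1]), sequence: Number(match[2]) }
}

// Comparator for Array.prototype.sort: negative when `a` is older than `b`.
function compareStreamIds(a: string, b: string): number {
  const pa = parseStreamId(a)
  const pb = parseStreamId(b)
  return pa.timestamp - pb.timestamp || pa.sequence - pb.sequence
}
```

Comparing the IDs as plain strings is not safe in general, since the two numeric parts can differ in length; comparing the parsed numbers avoids that.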

Message History

All emitted messages are durably stored in persistent streams. Retrieve history by channel with optional range and limit parameters:

import { createStreams } from '@sylphx/sdk/server'

const streams = createStreams()

// Get the last 50 messages
const recent = await streams.history('chat:room-1', {
  limit: 50,
})

// Get messages starting from a specific ID (for pagination)
const page = await streams.history('chat:room-1', {
  start: '1706886400000-0',
  limit: 100,
})

// Get messages in a time range
const range = await streams.history('chat:room-1', {
  start: '1706886400000-0',
  end: '1706890000000-0',
  limit: 200,
})

// Each message has the StreamMessage shape
for (const msg of recent) {
  console.log(msg.id)        // "1706886400123-0"
  console.log(msg.event)     // "message"
  console.log(msg.channel)   // "chat:room-1"
  console.log(msg.data)      // { text: "Hello!", userId: "123" }
  console.log(msg.timestamp) // 1706886400123
}

Client-Side History

The useRealtime hook automatically fetches history on connect when you pass the history option. You do not need to call the history API manually in React components.

History parameters:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| channel | string | required | Channel name to fetch history for |
| start | string | "-" | Start ID for range query. Use "-" for oldest. |
| end | string | "+" | End ID for range query. Use "+" for newest. |
| limit | number | 100 | Maximum messages to return (max 1000) |
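
The defaults and the limit cap listed for the history parameters can be expressed as a small normalization step. This is a sketch under assumptions: `resolveHistoryOptions` is a hypothetical helper, and clamping out-of-range limits (rather than rejecting them) is a choice made here, not documented SDK behavior.

```typescript
interface HistoryOptions {
  start?: string
  end?: string
  limit?: number
}

const DEFAULT_LIMIT = 100 // documented default
const MAX_LIMIT = 1000    // documented maximum

// Fill in the documented defaults and keep the limit within 1..1000.
function resolveHistoryOptions(opts: HistoryOptions = {}): Required<HistoryOptions> {
  const requested = opts.limit ?? DEFAULT_LIMIT
  return {
    start: opts.start ?? '-', // "-" means the oldest entry
    end: opts.end ?? '+',     // "+" means the newest entry
    limit: Math.min(Math.max(1, Math.floor(requested)), MAX_LIMIT),
  }
}
```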

Connection Management

The useRealtime hook provides full control over connection lifecycle and state:

'use client'

import { useRealtime } from '@sylphx/sdk/react'
import type { RealtimeStatus } from '@sylphx/sdk/react'

function LiveDashboard() {
  const {
    messages,
    status,    // 'connecting' | 'connected' | 'reconnecting' | 'disconnected'
    connect,   // Manually connect
    disconnect,// Manually disconnect
    clear,     // Clear local message buffer
    emit,      // Send events
  } = useRealtime('dashboard:metrics', {
    enabled: true,  // Auto-connect on mount (default: true)
    onConnect: (channel) => {
      console.log(`Connected to ${channel}`)
    },
    onReconnect: () => {
      console.log('Reconnecting...')
    },
    onError: (error) => {
      console.error('Connection error:', error)
    },
  })

  // Status indicator
  const statusColor: Record<RealtimeStatus, string> = {
    connected: 'bg-green-500',
    connecting: 'bg-yellow-500',
    reconnecting: 'bg-yellow-500',
    disconnected: 'bg-red-500',
  }

  return (
    <div>
      <div className="flex items-center gap-2">
        <div className={`w-2 h-2 rounded-full ${statusColor[status]}`} />
        <span>{status}</span>
      </div>

      <button onClick={disconnect}>Pause</button>
      <button onClick={connect}>Resume</button>
      <button onClick={clear}>Clear Messages</button>

      <div>{messages.length} messages</div>
    </div>
  )
}

Connection status values:

| Status | Description |
| --- | --- |
| connecting | Initial connection being established |
| connected | Active and receiving messages |
| reconnecting | Reconnecting after error or server-initiated reconnect signal |
| disconnected | Not connected. Call connect() to re-establish |

Channel Helper

The server-side SDK provides a channel helper for scoping operations to a specific channel. This is convenient when you emit multiple events to the same channel:

import { createStreams } from '@sylphx/sdk/server'

const streams = createStreams()

// Create a channel helper — scopes emit() and history()
const room = streams.channel('chat:room-1')

// Emit without repeating the channel name
await room.emit('message', { text: 'Hello!', userId: '123' })
await room.emit('typing', { userId: '123', active: true })
await room.emit('reaction', { messageId: 'msg_1', emoji: '👍' })

// Get history for this channel
const history = await room.history({ limit: 100 })

// Compare with the top-level API
// These two calls are equivalent:
await streams.emit('chat:room-1', 'message', { text: 'Hi' })
await room.emit('message', { text: 'Hi' })

Singleton Pattern

Use getStreams() instead of createStreams() if you want a singleton client that reuses the same instance across your server code. Both read from environment variables by default.

Multi-Channel Subscriptions

Subscribe to multiple channels at once with useRealtimeChannels. Messages from all channels are combined in chronological order. The overall status reflects the aggregate state of all channel connections.

'use client'

import { useRealtimeChannels } from '@sylphx/sdk/react'

interface ChatMessage {
  text: string
  userId: string
}

function TeamChat({ teamRooms }: { teamRooms: string[] }) {
  const { messages, emit, status, clear } = useRealtimeChannels<ChatMessage>(
    teamRooms.map((room) => `chat:${room}`),
    {
      events: ['message'],
      history: 20,
      onMessage: (msg) => {
        // msg.channel tells you which channel it came from
        console.log(`[${msg.channel}] ${msg.data.text}`)
      },
    }
  )

  // Emit to the first channel by default
  const sendToGeneral = () => emit('message', { text: 'Hello team!', userId: 'me' })

  // Group messages by channel for display
  const byChannel = messages.reduce<Record<string, typeof messages>>(
    (acc, msg) => {
      const ch = msg.channel
      if (!acc[ch]) acc[ch] = []
      acc[ch].push(msg)
      return acc
    },
    {}
  )

  return (
    <div>
      <span>Status: {status}</span>
      {Object.entries(byChannel).map(([channel, msgs]) => (
        <div key={channel}>
          <h3>{channel}</h3>
          {msgs.map((msg) => (
            <p key={msg.id}>{msg.data.text}</p>
          ))}
        </div>
      ))}
    </div>
  )
}

Status Aggregation

The combined status follows priority rules. If any channel is connecting, the overall status is connecting. If any channel is reconnecting, the overall status is reconnecting. The overall status is connected only when every channel is connected.
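
One way to read these rules is as a small reducer over the per-channel statuses. The sketch below is an interpretation, not the hook's source: `aggregateStatus` is a hypothetical name, and three points are assumptions because the text does not state them: connecting outranks reconnecting, an empty channel list reports disconnected, and any remaining mix reports disconnected.

```typescript
type RealtimeStatus = 'connecting' | 'connected' | 'reconnecting' | 'disconnected'

function aggregateStatus(statuses: RealtimeStatus[]): RealtimeStatus {
  if (statuses.length === 0) return 'disconnected' // assumption: nothing to connect to
  if (statuses.some((s) => s === 'connecting')) return 'connecting'
  if (statuses.some((s) => s === 'reconnecting')) return 'reconnecting'
  if (statuses.every((s) => s === 'connected')) return 'connected'
  return 'disconnected' // assumption: at least one channel is disconnected
}
```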

API Reference

Server-side API:

| Method | Description |
| --- | --- |
| streams.emit(channel, event, data) | Emit an event to a channel. Returns the stream entry ID. |
| streams.history(channel, opts?) | Get message history with optional range and limit. |
| streams.channel(name) | Get a channel helper with scoped emit() and history(). |
| createStreams(opts?) | Create a new streams client. Reads SYLPHX_SECRET_KEY and SYLPHX_PLATFORM_URL from env. |
| getStreams() | Get the singleton streams client. Creates one on first call. |

React hooks:

| Hook | Description |
| --- | --- |
| useRealtime(channel, opts?) | Subscribe to a single channel. Returns messages, status, emit, connect, disconnect, clear. |
| useRealtimeChannels(channels, opts?) | Subscribe to multiple channels. Combined messages with per-channel tracking. |

useRealtime options:

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| events | string[] | | Filter to specific event types. If empty, receives all events. |
| history | number \| { start?: string; limit?: number } | | Number of historical messages to load on connect, or options object. |
| enabled | boolean | true | Whether to auto-connect on mount. |
| onConnect | (channel: string) => void | | Callback when connected to the channel. |
| onMessage | (message: StreamMessage<T>) => void | | Callback for each incoming message. |
| onReconnect | () => void | | Callback when the connection is re-establishing. |
| onError | (error: Error) => void | | Callback on connection error. |
| platformUrl | string | | Override the platform URL (uses provider config by default). |

StreamMessage<T> shape:

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| id | string | yes | Stream entry ID (e.g., "1706886400000-0") |
| event | string | yes | Event type (e.g., "message", "typing") |
| channel | string | yes | Channel the message was sent to |
| data | T | yes | Event payload (your custom data) |
| timestamp | number | no | Unix timestamp in milliseconds |

Best Practices

Use Hierarchical Channel Names

Name channels with colon-separated namespaces: chat:room-1, orders:user-123, dashboard:metrics. This makes filtering and debugging straightforward.
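
A small helper can keep channel names consistent across a codebase. The functions below are hypothetical conveniences, not SDK exports; rejecting segments that contain a colon is a choice made for this sketch.

```typescript
// Build "chat:room-1" from its parts, rejecting empty or colon-containing segments.
function channelName(...segments: string[]): string {
  if (segments.length === 0) throw new Error('At least one segment is required')
  for (const segment of segments) {
    if (segment.length === 0 || segment.indexOf(':') !== -1) {
      throw new Error(`Invalid channel segment: "${segment}"`)
    }
  }
  return segments.join(':')
}

// Return the top-level namespace, e.g. "orders" for "orders:user-123".
function channelNamespace(channel: string): string {
  const idx = channel.indexOf(':')
  return idx === -1 ? channel : channel.slice(0, idx)
}
```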

Handle Reconnection Gracefully

The hook reconnects automatically with exponential backoff. Use the onReconnect callback to show a UI indicator, and trust that messages will not be lost during the transition.
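
For readers unfamiliar with exponential backoff, the sketch below shows the general shape of such a schedule. The base delay, the cap, and the name `backoffDelayMs` are assumptions chosen for illustration; the hook's actual schedule is not documented here.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped.
function backoffDelayMs(attempt: number, baseMs: number = 1000, maxMs: number = 30000): number {
  const exponent = Math.max(0, Math.floor(attempt))
  return Math.min(baseMs * Math.pow(2, exponent), maxMs)
}
```

With these illustrative values the delays run 1 s, 2 s, 4 s, 8 s, 16 s, and then stay at the 30 s cap. Production schedules often add random jitter so that many clients do not reconnect at the same instant.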

Keep Messages Small

Send references (IDs, URLs) instead of large payloads. For example, emit a document ID and fetch the full document separately. This keeps your streams fast and your history compact.

Use Event Filtering

Pass the events option to receive only the event types your component needs. This reduces unnecessary renders and keeps your message buffer focused.
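
The filtering rule itself is simple, as this sketch shows. `matchesEvents` is a hypothetical stand-in for whatever the hook does internally; the only documented behavior it relies on is that an empty events list means all events are received.

```typescript
// True when the message should be delivered for the given events filter.
function matchesEvents(msg: { event: string }, events: string[] = []): boolean {
  return events.length === 0 || events.indexOf(msg.event) !== -1
}
```

A component that only renders chat text could pass ['message'] and never see 'typing' or 'reaction' events in its buffer.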

Clear Stale Messages

Call clear() when navigating away from a view or when messages become irrelevant. This prevents memory buildup in long-running sessions.

Leverage the Channel Helper

On the server, use streams.channel(name) when emitting multiple events to the same channel. It reduces repetition and makes your intent clearer.