Use Upstash Realtime on the server to emit events, subscribe to channels, and retrieve message history.
Emit Events
Emit events from any server context:
import { realtime } from "@/lib/realtime"
export const POST = async () => {
await realtime.emit("notification.alert", "hello world!")
return new Response("OK")
}
Emit to specific channels:
import { realtime } from "@/lib/realtime"
export const POST = async () => {
const channel = realtime.channel("user-123")
await channel.emit("notification.alert", "hello world!")
return new Response("OK")
}
Subscribe to Events
Subscribe to events on a channel:
import { realtime } from "@/lib/realtime"
const unsubscribe = await realtime.channel("notifications").subscribe({
events: ["notification.alert"],
onData({ event, data, channel }) {
console.log("New notification:", data)
},
})
Subscribe to multiple events:
import { realtime } from "@/lib/realtime"
const unsubscribe = await realtime.channel("room-123").subscribe({
events: ["chat.message", "user.joined", "user.left"],
onData({ event, data, channel }) {
// 👇 data is automatically typed based on the event
if (event === "chat.message") console.log("New message:", data)
if (event === "user.joined") console.log("User joined:", data)
if (event === "user.left") console.log("User left:", data)
},
})
Unsubscribe
Clean up subscriptions when done:
import { realtime } from "@/lib/realtime"
const channel = realtime.channel("room-123")
const unsubscribe = await channel.subscribe({
events: ["chat.message"],
onData({ data }) {
console.log("Message:", data)
},
})
unsubscribe()
// or: channel.unsubscribe()
Retrieve History
Fetch past messages from a channel:
import { realtime } from "@/lib/realtime"
export const GET = async () => {
const messages = await realtime.channel("room-123").history()
return new Response(JSON.stringify(messages))
}
History Options
Maximum number of messages to retrieve (capped at 1000)
Fetch messages after this Unix timestamp (in milliseconds)
Fetch messages before this Unix timestamp (in milliseconds)
const messages = await realtime.channel("room-123").history({
limit: 100,
start: Date.now() - 86400000,
})
History Response
Each history message contains:
type HistoryMessage = {
id: string
event: string
channel: string
data: unknown
}
Subscribe with History
Replay past messages and continue subscribing to new ones:
import { realtime } from "@/lib/realtime"
const channel = realtime.channel("room-123")
await channel.subscribe({
events: ["chat.message"],
history: true,
onData({ event, data, channel }) {
console.log("Message:", data)
},
})
Pass history options:
await channel.subscribe({
events: ["chat.message"],
history: {
limit: 50,
start: Date.now() - 3600000,
},
onData({ data }) {
console.log("Message:", data)
},
})
This pattern:
- Fetches messages matching the history criteria
- Replays them in chronological order
- Continues to listen for new messages
Use Cases
Stream progress updates from long-running tasks:import { realtime } from "@/lib/realtime"
export const POST = async (req: Request) => {
const { jobId } = await req.json()
const channel = realtime.channel(jobId)
await channel.emit("job.started", { progress: 0 })
for (let i = 0; i <= 100; i += 10) {
await processChunk()
await channel.emit("job.progress", { progress: i })
}
await channel.emit("job.completed", { progress: 100 })
return new Response("OK")
}
Process events with server-side logic:import { realtime } from "@/lib/realtime"
import { sendEmail } from "@/lib/email"
await realtime.channel("notifications").subscribe({
events: ["notification.alert"],
onData: async ({ data }) => {
if (data.priority === "high") {
await sendEmail({
to: data.userId,
subject: "Urgent Notification",
body: data.message,
})
}
},
})
Multi-Channel Broadcasting
Emit events to multiple channels:import { realtime } from "@/lib/realtime"
export const POST = async (req: Request) => {
const { teamIds, message } = await req.json()
await Promise.all(
teamIds.map((teamId: string) =>
realtime.channel(`team-${teamId}`).emit("announcement", message)
)
)
return new Response("Broadcast sent")
}
Forward webhook events to realtime channels:import { realtime } from "@/lib/realtime"
export const POST = async (req: Request) => {
const payload = await req.json()
const channel = realtime.channel(`user-${payload.userId}`)
await channel.emit("webhook.received", payload)
return new Response("OK")
}
Next Steps