Server-Sent Events (SSE)
Okapi provides built-in support for Server-Sent Events (SSE), so you can stream real-time data from the server to clients. SSE is a one-way channel: the server pushes updates to connected clients, and clients do not need to poll.
Table of Contents
Quick Start
Simple Time Stream
package main
import (
"fmt"
"github.com/jkaninda/okapi"
"time"
)
// main runs an Okapi server whose /time endpoint pushes the current time
// (formatted as RFC 3339) to the client once per second. The handler returns
// nil when the request context is done and returns the send error otherwise.
func main() {
	o := okapi.Default()

	o.Get("/time", func(c *okapi.Context) error {
		tick := time.NewTicker(time.Second)
		defer tick.Stop()

		done := c.Request().Context().Done()
		for {
			select {
			case <-done:
				return nil
			case now := <-tick.C:
				if err := c.SSESendText(now.Format(time.RFC3339)); err != nil {
					return err
				}
			}
		}
	})

	if err := o.Start(); err != nil {
		panic(err)
	}
}
Basic Examples
1. Simple Data Stream
o.Get("/counter", func(c *okapi.Context) error {
for i := 0; i <= 10; i++ {
err := c.SSESendData(i)
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
})
2. JSON Data Stream
// StockUpdate is the payload sent as JSON by the /stocks stream.
type StockUpdate struct {
	Name  string  `json:"name"`  // stock identifier, e.g. "STOCK-XYZ"
	Price float64 `json:"price"` // price value
	Time  string  `json:"time"`  // timestamp string; the /stocks handler uses RFC 3339
}
o.Get("/stocks", func(c *okapi.Context) error {
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
update := StockUpdate{
Name: "STOCK-XYZ",
Price: 150.25 + float64(time.Now().Second()),
Time: time.Now().Format(time.RFC3339),
}
err := c.SSESendJSON(update)
if err != nil {
return err
}
case <-c.Request().Context().Done():
return nil
}
}
})
3. Custom Event Types
o.Get("/notifications", func(c *okapi.Context) error {
notifications := []struct {
Type string
Data okapi.M
}{
{"info", okapi.M{"message": "System started"}},
{"warning", okapi.M{"message": "High CPU usage"}},
{"error", okapi.M{"message": "Database connection failed"}},
}
for _, notif := range notifications {
err := c.SSEvent(notif.Type, notif.Data)
if err != nil {
return err
}
time.Sleep(2 * time.Second)
}
return nil
})
4. Events with IDs
o.Get("/messages", func(c *okapi.Context) error {
for i := 0; i < 5; i++ {
id := fmt.Sprintf("msg-%d", i)
data := okapi.M{
"text": fmt.Sprintf("Message %d", i),
"timestamp": time.Now().Unix(),
}
err := c.SSESendEvent(id, "message", data)
if err != nil {
return err
}
time.Sleep(1 * time.Second)
}
return nil
})
Advanced Features
1. Channel-Based Streaming
// Stream up to ten "update" events, one per second, by feeding a channel
// that SSEStream reads from.
o.Get("/events", func(c *okapi.Context) error {
	ctx := c.Request().Context()

	// Capacity 10 matches the ten messages produced below, so a send never
	// has to wait for the consumer.
	updates := make(chan okapi.Message, 10)

	// Producer: closes the channel when it finishes or the context is done.
	produce := func() {
		defer close(updates)

		tick := time.NewTicker(time.Second)
		defer tick.Stop()

		for sent := 0; sent < 10; sent++ {
			select {
			case <-tick.C:
				updates <- okapi.Message{
					Event: "update",
					Data: okapi.M{
						"count": sent,
						"time":  time.Now().Format(time.RFC3339),
					},
				}
			case <-ctx.Done():
				return
			}
		}
	}
	go produce()

	return c.SSEStream(ctx, updates)
})
2. Stream with Options (Ping & Error Handling)
// Stream simulated sensor readings every two seconds, with a keep-alive ping
// interval, an explicit serializer and an error callback configured through
// StreamOptions.
o.Get("/live-data", func(c *okapi.Context) error {
	messageChan := make(chan okapi.Message, 10)
	ctx := c.Request().Context()

	opts := &okapi.StreamOptions{
		PingInterval: 30 * time.Second, // Keep connection alive
		Serializer:   okapi.JSONSerializer{},
		OnError: func(err error) {
			fmt.Printf("Stream error: %v\n", err)
		},
	}

	// Producer goroutine: runs until the request context is done, then closes
	// the channel.
	go func() {
		defer close(messageChan)

		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				msg := okapi.Message{
					Event: "data",
					Data: okapi.M{
						"temperature": 20 + time.Now().Second()%10,
						"humidity":    60 + time.Now().Second()%20,
					},
				}
				// Never block on the send alone. This loop is unbounded, so
				// the buffer can fill up behind a slow client; if that client
				// then disconnects, nothing drains the channel and an
				// unguarded send would leak this goroutine.
				select {
				case messageChan <- msg:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return c.SSEStreamWithOptions(ctx, messageChan, opts)
})
3. Binary Data Streaming
// Send a single binary event and return whatever SSESendBinary reports.
o.Get("/images", func(c *okapi.Context) error {
	// Simulated binary payload: PNG header example (0x89 followed by "PNG").
	pngHeader := []byte{0x89, 'P', 'N', 'G'}
	return c.SSESendBinary(pngHeader)
})
Custom Serializers
Create a Custom Serializer
// XMLSerializer serializes event data as XML using encoding/xml.
type XMLSerializer struct{}

// Serialize marshals data to XML and returns the result as a string.
// If marshalling fails it returns an empty string and the wrapped error.
func (XMLSerializer) Serialize(data any) (string, error) {
	encoded, marshalErr := xml.Marshal(data)
	if marshalErr != nil {
		return "", fmt.Errorf("failed to encode data as XML: %w", marshalErr)
	}
	return string(encoded), nil
}
// Usage
o.Get("/xml-events", func(c *okapi.Context) error {
data := struct {
XMLName xml.Name `xml:"update"`
Message string `xml:"message"`
Time string `xml:"time"`
}{
Message: "Hello World",
Time: time.Now().Format(time.RFC3339),
}
return c.SendSSECustom(data, XMLSerializer{})
})
Complete Example with Client
package main
import (
"fmt"
"github.com/jkaninda/okapi"
"net/http"
"time"
)
// template is the demo page served at "/". It is rendered with the data map
// passed to HTMLView, so the {{.title}} and {{.eventURL}} actions must match
// that map's "title" and "eventURL" keys. Without them the page has no title
// and EventSource('') subscribes to the page's own URL, not the SSE endpoint.
// NOTE(review): assumes HTMLView renders Go html/template syntax - confirm.
const template = `
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{.title}}</title>
<style>
body { font-family: Arial, sans-serif; max-width: 800px; margin: 50px auto; }
#events { border: 1px solid #ccc; padding: 20px; height: 400px; overflow-y: auto; }
.event { margin: 10px 0; padding: 10px; background: #f5f5f5; border-radius: 4px; }
.event-type { font-weight: bold; color: #007bff; }
</style>
</head>
<body>
<h1>{{.title}}</h1>
<div id="status">Connecting...</div>
<div id="events"></div>
<script>
const eventsDiv = document.getElementById('events');
const statusDiv = document.getElementById('status');
const eventSource = new EventSource('{{.eventURL}}');
eventSource.onopen = function() {
statusDiv.textContent = 'Connected ✓';
statusDiv.style.color = 'green';
};
eventSource.onmessage = function(event) {
addEvent('message', event.data);
};
// Listen for custom event types
eventSource.addEventListener('update', function(event) {
addEvent('update', event.data);
});
eventSource.addEventListener('notification', function(event) {
addEvent('notification', event.data);
});
eventSource.onerror = function(error) {
statusDiv.textContent = 'Error - Reconnecting...';
statusDiv.style.color = 'red';
};
function addEvent(type, data) {
const eventElement = document.createElement('div');
eventElement.className = 'event';
eventElement.innerHTML = '<span class="event-type">' + type + ':</span> ' + data;
eventsDiv.insertBefore(eventElement, eventsDiv.firstChild);
}
</script>
</body>
</html>`
// main serves the demo page at "/" and an SSE stream at "/events". The stream
// emits an "update" event every two seconds, replaced by a "notification"
// event on every fifth tick.
func main() {
	o := okapi.Default()

	// Serve HTML page
	o.Get("/", func(c *okapi.Context) error {
		return c.HTMLView(http.StatusOK, template, okapi.M{
			"title":    "Okapi SSE Demo",
			"eventURL": "/events",
		})
	})

	// SSE endpoint
	o.Get("/events", func(c *okapi.Context) error {
		messageChan := make(chan okapi.Message, 10)
		ctx := c.Request().Context()

		// Producer goroutine: runs until the request context is done, then
		// closes the channel.
		go func() {
			defer close(messageChan)

			ticker := time.NewTicker(2 * time.Second)
			defer ticker.Stop()

			counter := 0
			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					counter++

					// Send different event types
					eventType := "update"
					if counter%5 == 0 {
						eventType = "notification"
					}

					msg := okapi.Message{
						Event: eventType,
						Data: okapi.M{
							"count":     counter,
							"timestamp": time.Now().Format(time.RFC3339),
							"message":   fmt.Sprintf("Event #%d", counter),
						},
					}
					// Never block on the send alone. This loop is unbounded,
					// so the buffer can fill up behind a slow client; if that
					// client then disconnects, nothing drains the channel and
					// an unguarded send would leak this goroutine.
					select {
					case messageChan <- msg:
					case <-ctx.Done():
						return
					}
				}
			}
		}()

		opts := &okapi.StreamOptions{
			PingInterval: 30 * time.Second,
			OnError: func(err error) {
				fmt.Printf("Stream error: %v\n", err)
			},
		}
		return c.SSEStreamWithOptions(ctx, messageChan, opts)
	})

	if err := o.Start(); err != nil {
		panic(err)
	}
}
Best Practices
1. Always Handle Context Cancellation
// Stream events until the client disconnects, checking the request context
// on every iteration.
o.Get("/events", func(c *okapi.Context) error {
	ctx := c.Request().Context()

	// Pace the stream with a ticker. A select with a default branch never
	// blocks, so the loop would spin and send events as fast as it can.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected
			return nil
		case <-ticker.C:
			// Send event
			if err := c.SSESendData("data"); err != nil {
				return err
			}
		}
	}
})
2. Use Buffered Channels
// With a capacity of 100, the producer can queue up to 100 messages before a
// send blocks.
messageChan := make(chan okapi.Message, 100) // Buffered channel
3. Set Appropriate Ping Intervals
// PingInterval is the keep-alive interval for the stream.
opts := &okapi.StreamOptions{
	PingInterval: 30 * time.Second, // Recommended: 15-60 seconds
}
4. Clean Up Resources
// Producer goroutine: the deferred calls run when it returns, stopping the
// ticker and closing the channel.
go func() {
	defer close(messageChan)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop() // Always clean up tickers

	// ... your code
}()
5. Handle Errors Gracefully
// OnError is the stream's error handler; this one only logs the error.
opts := &okapi.StreamOptions{
	OnError: func(err error) {
		log.Printf("SSE stream error: %v", err)
		// Implement your error handling logic
	},
}
API Reference
Methods
- SSESendData(data any) - Send data with automatic serialization
- SSESendJSON(data any) - Send JSON data explicitly
- SSESendText(text string) - Send plain text
- SSESendBinary(data []byte) - Send binary data as base64
- SSEvent(eventType string, data any) - Send event with custom type
- SSESendEvent(id, eventType string, data any) - Send event with ID and type
- SSEStream(ctx, messageChan) - Stream from channel
- SSEStreamWithOptions(ctx, messageChan, opts) - Stream with advanced options
- SendSSECustom(data, serializer) - Send with custom serializer
StreamOptions
// StreamOptions holds the optional settings passed to SSEStreamWithOptions.
type StreamOptions struct {
	Serializer   Serializer    // Custom serializer
	PingInterval time.Duration // Keep-alive interval
	OnError      func(error)   // Error handler
}
More Examples
See the complete examples repository: okapi-example