2018-08-03 00:59:19 +02:00
// Package main shows how to send continuous event messages to the clients through SSE via a broker.
// Read details at: https://www.w3schools.com/htmL/html5_serversentevents.asp and
2018-07-31 02:12:16 +02:00
// https://robots.thoughtbot.com/writing-a-server-sent-events-server-in-go
package main
import (
"encoding/json"
"fmt"
"time"
"github.com/kataras/golog"
"github.com/kataras/iris"
2018-08-02 00:58:46 +02:00
// Note:
// For some reason the latest vscode-go language extension does not provide enough intelligence (parameters documentation and go to definition features)
// for the `iris.Context` alias, therefore if you use VS Code, import the original import path of the `Context`, that will do it:
"github.com/kataras/iris/context"
2018-07-31 02:12:16 +02:00
)
// A Broker holds open client connections,
// listens for incoming events on its Notifier channel
// and broadcast event data to all registered connections.
type Broker struct {
// Events are pushed to this channel by the main events-gathering routine.
Notifier chan [ ] byte
// New client connections.
newClients chan chan [ ] byte
// Closed client connections.
closingClients chan chan [ ] byte
// Client connections registry.
clients map [ chan [ ] byte ] bool
}
// NewBroker returns a new broker factory.
func NewBroker ( ) * Broker {
b := & Broker {
Notifier : make ( chan [ ] byte , 1 ) ,
newClients : make ( chan chan [ ] byte ) ,
closingClients : make ( chan chan [ ] byte ) ,
clients : make ( map [ chan [ ] byte ] bool ) ,
}
// Set it running - listening and broadcasting events.
go b . listen ( )
return b
}
// Listen on different channels and act accordingly.
func ( b * Broker ) listen ( ) {
for {
select {
case s := <- b . newClients :
// A new client has connected.
// Register their message channel.
b . clients [ s ] = true
golog . Infof ( "Client added. %d registered clients" , len ( b . clients ) )
case s := <- b . closingClients :
// A client has dettached and we want to
// stop sending them messages.
delete ( b . clients , s )
golog . Warnf ( "Removed client. %d registered clients" , len ( b . clients ) )
case event := <- b . Notifier :
// We got a new event from the outside!
// Send event to all connected clients.
for clientMessageChan := range b . clients {
clientMessageChan <- event
}
2018-08-03 00:59:19 +02:00
2018-07-31 02:12:16 +02:00
}
}
}
2018-08-02 00:58:46 +02:00
func ( b * Broker ) ServeHTTP ( ctx context . Context ) {
2018-07-31 02:12:16 +02:00
// Make sure that the writer supports flushing.
2018-08-02 16:46:35 +02:00
flusher , ok := ctx . ResponseWriter ( ) . Flusher ( )
2018-07-31 02:12:16 +02:00
if ! ok {
2018-08-02 16:46:35 +02:00
ctx . StatusCode ( iris . StatusHTTPVersionNotSupported )
2018-07-31 02:12:16 +02:00
ctx . WriteString ( "Streaming unsupported!" )
return
}
// Set the headers related to event streaming, you can omit the "application/json" if you send plain text.
2018-08-02 00:58:46 +02:00
// If you develop a go client, you must have: "Accept" : "application/json, text/event-stream" header as well.
2018-07-31 02:12:16 +02:00
ctx . ContentType ( "application/json, text/event-stream" )
ctx . Header ( "Cache-Control" , "no-cache" )
ctx . Header ( "Connection" , "keep-alive" )
// We also add a Cross-origin Resource Sharing header so browsers on different domains can still connect.
ctx . Header ( "Access-Control-Allow-Origin" , "*" )
// Each connection registers its own message channel with the Broker's connections registry.
messageChan := make ( chan [ ] byte )
// Signal the broker that we have a new connection.
b . newClients <- messageChan
2018-08-03 00:59:19 +02:00
// Listen to connection close and when the entire request handler chain exits(this handler here) and un-register messageChan.
2018-08-02 16:46:35 +02:00
ctx . OnClose ( func ( ) {
// Remove this client from the map of connected clients
// when this handler exits.
2018-07-31 02:12:16 +02:00
b . closingClients <- messageChan
2018-08-02 16:46:35 +02:00
} )
2018-07-31 02:12:16 +02:00
2018-08-03 00:59:19 +02:00
// Block waiting for messages broadcast on this connection's messageChan.
2018-07-31 02:12:16 +02:00
for {
// Write to the ResponseWriter.
// Server Sent Events compatible.
2018-08-03 00:59:19 +02:00
ctx . Writef ( "data: %s\n\n" , <- messageChan )
2018-07-31 02:12:16 +02:00
// or json: data:{obj}.
2018-08-05 12:51:05 +02:00
// Flush the data immediately instead of buffering it for later.
2018-07-31 02:12:16 +02:00
flusher . Flush ( )
}
}
type event struct {
Timestamp int64 ` json:"timestamp" `
Message string ` json:"message" `
}
2018-08-03 00:59:19 +02:00
const script = ` < script type = "text/javascript" >
if ( typeof ( EventSource ) != = "undefined" ) {
console . log ( "server-sent events supported" ) ;
var client = new EventSource ( "http://localhost:8080/events" ) ;
var index = 1 ;
client . onmessage = function ( evt ) {
console . log ( evt ) ;
// it's not required that you send and receive JSON, you can just output the "evt.data" as well.
dataJSON = JSON . parse ( evt . data )
var table = document . getElementById ( "messagesTable" ) ;
var row = table . insertRow ( index ) ;
var cellTimestamp = row . insertCell ( 0 ) ;
var cellMessage = row . insertCell ( 1 ) ;
cellTimestamp . innerHTML = dataJSON . timestamp ;
cellMessage . innerHTML = dataJSON . message ;
index ++ ;
window . scrollTo ( 0 , document . body . scrollHeight ) ;
} ;
} else {
document . getElementById ( "header" ) . innerHTML = "<h2>SSE not supported by this client-protocol</h2>" ;
}
< / script > `
2018-07-31 02:12:16 +02:00
func main ( ) {
broker := NewBroker ( )
go func ( ) {
for {
time . Sleep ( 2 * time . Second )
now := time . Now ( )
evt := event {
Timestamp : now . Unix ( ) ,
2018-08-03 00:59:19 +02:00
Message : fmt . Sprintf ( "Hello at %s" , now . Format ( time . RFC1123 ) ) ,
2018-07-31 02:12:16 +02:00
}
2018-08-03 00:59:19 +02:00
2018-07-31 02:12:16 +02:00
evtBytes , err := json . Marshal ( evt )
if err != nil {
2018-08-03 00:59:19 +02:00
golog . Error ( err )
2018-07-31 02:12:16 +02:00
continue
}
broker . Notifier <- evtBytes
}
} ( )
app := iris . New ( )
2018-08-03 12:45:05 +02:00
app . Get ( "/" , func ( ctx context . Context ) {
2018-08-03 00:59:19 +02:00
ctx . HTML (
` <html><head><title>SSE</title> ` + script + ` < / head >
< body >
< h1 id = "header" > Waiting for messages ... < / h1 >
< table id = "messagesTable" border = "1" >
< tr >
< th > Timestamp ( server ) < / th >
< th > Message < / th >
< / tr >
< / table >
< / body >
< / html > ` )
} )
app . Get ( "/events" , broker . ServeHTTP )
2018-07-31 02:12:16 +02:00
// http://localhost:8080
2018-08-03 00:59:19 +02:00
// http://localhost:8080/events
2018-08-02 16:46:35 +02:00
app . Run ( iris . Addr ( ":8080" ) , iris . WithoutServerError ( iris . ErrServerClosed ) )
2018-07-31 02:12:16 +02:00
}