2016-05-30 16:08:09 +02:00
package websocket
import (
"time"
"bytes"
2016-05-31 12:50:53 +02:00
"strconv"
2016-05-30 16:08:09 +02:00
"github.com/iris-contrib/websocket"
"github.com/kataras/iris/utils"
)
type (
// DisconnectFunc is the callback which fires when a client/connection closed
DisconnectFunc func ( )
2016-06-18 01:57:18 +02:00
// ErrorFunc is the callback which fires when an error happens
ErrorFunc ( func ( string ) )
2016-05-30 16:08:09 +02:00
// NativeMessageFunc is the callback for native websocket messages, receives one []byte parameter which is the raw client's message
NativeMessageFunc func ( [ ] byte )
// MessageFunc is the second argument to the Emitter's Emit functions.
// A callback which should receives one parameter of type string, int, bool or any valid JSON/Go struct
MessageFunc interface { }
// Connection is the client
Connection interface {
2016-06-21 13:18:22 +02:00
// Emitter implements EmitMessage & Emit
Emitter
2016-05-30 16:08:09 +02:00
// ID returns the connection's identifier
ID ( ) string
// OnDisconnect registers a callback which fires when this connection is closed by an error or manual
OnDisconnect ( DisconnectFunc )
2016-06-18 01:57:18 +02:00
// OnError registers a callback which fires when this connection occurs an error
OnError ( ErrorFunc )
// EmitError can be used to send a custom error message to the connection
//
// It does nothing more than firing the OnError listeners. It doesn't sends anything to the client.
EmitError ( errorMessage string )
2016-05-30 16:08:09 +02:00
// To defines where server should send a message
// returns an emmiter to send messages
2016-06-21 13:18:22 +02:00
To ( string ) Emitter
2016-05-30 16:08:09 +02:00
// OnMessage registers a callback which fires when native websocket message received
OnMessage ( NativeMessageFunc )
// On registers a callback to a particular event which fires when a message to this event received
On ( string , MessageFunc )
// Join join a connection to a room, it doesn't check if connection is already there, so care
Join ( string )
// Leave removes a connection from a room
Leave ( string )
}
connection struct {
underline * websocket . Conn
id string
send chan [ ] byte
onDisconnectListeners [ ] DisconnectFunc
2016-06-18 01:57:18 +02:00
onErrorListeners [ ] ErrorFunc
2016-05-30 16:08:09 +02:00
onNativeMessageListeners [ ] NativeMessageFunc
onEventListeners map [ string ] [ ] MessageFunc
// these were maden for performance only
2016-06-21 13:18:22 +02:00
self Emitter // pre-defined emmiter than sends message to its self client
broadcast Emitter // pre-defined emmiter that sends message to all except this
all Emitter // pre-defined emmiter which sends message to all clients
2016-05-30 16:08:09 +02:00
server * server
}
)
// Compile-time check that *connection satisfies the Connection interface.
var _ Connection = (*connection)(nil)
// connection implementation
func newConnection ( websocketConn * websocket . Conn , s * server ) * connection {
c := & connection {
id : utils . RandomString ( 64 ) ,
underline : websocketConn ,
send : make ( chan [ ] byte , 256 ) ,
onDisconnectListeners : make ( [ ] DisconnectFunc , 0 ) ,
2016-06-18 01:57:18 +02:00
onErrorListeners : make ( [ ] ErrorFunc , 0 ) ,
2016-05-30 16:08:09 +02:00
onNativeMessageListeners : make ( [ ] NativeMessageFunc , 0 ) ,
onEventListeners : make ( map [ string ] [ ] MessageFunc , 0 ) ,
server : s ,
}
2016-06-21 13:18:22 +02:00
c . self = newEmitter ( c , c . id )
c . broadcast = newEmitter ( c , NotMe )
c . all = newEmitter ( c , All )
2016-05-30 16:08:09 +02:00
return c
}
func ( c * connection ) write ( messageType int , data [ ] byte ) error {
2016-06-15 23:13:56 +02:00
c . underline . SetWriteDeadline ( time . Now ( ) . Add ( c . server . config . WriteTimeout ) )
2016-05-30 16:08:09 +02:00
return c . underline . WriteMessage ( messageType , data )
}
func ( c * connection ) writer ( ) {
2016-06-15 23:13:56 +02:00
ticker := time . NewTicker ( c . server . config . PingPeriod )
2016-05-30 16:08:09 +02:00
defer func ( ) {
ticker . Stop ( )
c . underline . Close ( )
} ( )
for {
select {
case msg , ok := <- c . send :
if ! ok {
2016-06-17 18:51:17 +02:00
defer func ( ) {
// FIX FOR: https://github.com/kataras/iris/issues/175
// AS I TESTED ON TRIDENT ENGINE (INTERNET EXPLORER/SAFARI):
// NAVIGATE TO SITE, CLOSE THE TAB, NOTHING HAPPENS
// CLOSE THE WHOLE BROWSER, THEN THE c.conn is NOT NILL BUT ALL ITS FUNCTIONS PANICS, MEANS THAT IS THE STRUCT IS NOT NIL BUT THE WRITER/READER ARE NIL
// THE ONLY SOLUTION IS TO RECOVER HERE AT ANY PANIC
// THE FRAMETYPE = 8, c.closeSend = true
// NOTE THAT THE CLIENT IS NOT DISCONNECTED UNTIL THE WHOLE WINDOW BROWSER CLOSED, this is engine's bug.
//
if err := recover ( ) ; err != nil {
ticker . Stop ( )
c . server . free <- c
c . underline . Close ( )
}
} ( )
2016-05-30 16:08:09 +02:00
c . write ( websocket . CloseMessage , [ ] byte { } )
return
}
2016-06-17 00:44:16 +02:00
c . underline . SetWriteDeadline ( time . Now ( ) . Add ( c . server . config . WriteTimeout ) )
res , err := c . underline . NextWriter ( websocket . TextMessage )
if err != nil {
2016-05-30 16:08:09 +02:00
return
}
2016-06-17 00:44:16 +02:00
res . Write ( msg )
n := len ( c . send )
for i := 0 ; i < n ; i ++ {
res . Write ( <- c . send )
}
if err := res . Close ( ) ; err != nil {
return
}
// if err := c.write(websocket.TextMessage, msg); err != nil {
// return
// }
2016-05-30 16:08:09 +02:00
case <- ticker . C :
if err := c . write ( websocket . PingMessage , [ ] byte { } ) ; err != nil {
return
}
}
}
}
func ( c * connection ) reader ( ) {
defer func ( ) {
c . server . free <- c
c . underline . Close ( )
} ( )
conn := c . underline
2016-06-15 23:13:56 +02:00
conn . SetReadLimit ( c . server . config . MaxMessageSize )
conn . SetReadDeadline ( time . Now ( ) . Add ( c . server . config . PongTimeout ) )
2016-05-30 16:08:09 +02:00
conn . SetPongHandler ( func ( s string ) error {
2016-06-15 23:13:56 +02:00
conn . SetReadDeadline ( time . Now ( ) . Add ( c . server . config . PongTimeout ) )
2016-05-30 16:08:09 +02:00
return nil
} )
for {
if _ , data , err := conn . ReadMessage ( ) ; err != nil {
if websocket . IsUnexpectedCloseError ( err , websocket . CloseGoingAway ) {
2016-06-18 01:57:18 +02:00
c . EmitError ( err . Error ( ) )
2016-05-30 16:08:09 +02:00
}
break
} else {
c . messageReceived ( data )
}
}
}
// messageReceived checks the incoming message and fire the nativeMessage listeners or the event listeners (iris-ws custom message)
func ( c * connection ) messageReceived ( data [ ] byte ) {
if bytes . HasPrefix ( data , prefixBytes ) {
customData := string ( data )
//it's a custom iris-ws message
receivedEvt := getCustomEvent ( customData )
listeners := c . onEventListeners [ receivedEvt ]
if listeners == nil { // if not listeners for this event exit from here
return
}
customMessage , err := deserialize ( receivedEvt , customData )
if customMessage == nil || err != nil {
return
}
for i := range listeners {
if fn , ok := listeners [ i ] . ( func ( ) ) ; ok { // its a simple func(){} callback
fn ( )
} else if fnString , ok := listeners [ i ] . ( func ( string ) ) ; ok {
2016-05-31 12:50:53 +02:00
if msgString , is := customMessage . ( string ) ; is {
fnString ( msgString )
} else if msgInt , is := customMessage . ( int ) ; is {
// here if server side waiting for string but client side sent an int, just convert this int to a string
fnString ( strconv . Itoa ( msgInt ) )
}
2016-05-30 16:08:09 +02:00
} else if fnInt , ok := listeners [ i ] . ( func ( int ) ) ; ok {
fnInt ( customMessage . ( int ) )
} else if fnBool , ok := listeners [ i ] . ( func ( bool ) ) ; ok {
fnBool ( customMessage . ( bool ) )
} else if fnBytes , ok := listeners [ i ] . ( func ( [ ] byte ) ) ; ok {
fnBytes ( customMessage . ( [ ] byte ) )
} else {
listeners [ i ] . ( func ( interface { } ) ) ( customMessage )
}
}
} else {
// it's native websocket message
for i := range c . onNativeMessageListeners {
c . onNativeMessageListeners [ i ] ( data )
}
}
}
// ID returns the connection's identifier, the id assigned in newConnection.
func (c *connection) ID() string {
	id := c.id
	return id
}
// fireDisconnect calls every callback registered through OnDisconnect,
// in registration order.
func (c *connection) fireDisconnect() {
	for _, onDisconnect := range c.onDisconnectListeners {
		onDisconnect()
	}
}
// OnDisconnect adds cb to the callbacks that fireDisconnect calls.
func (c *connection) OnDisconnect(cb DisconnectFunc) {
	registered := c.onDisconnectListeners
	c.onDisconnectListeners = append(registered, cb)
}
2016-06-18 01:57:18 +02:00
// OnError adds cb to the callbacks that EmitError calls.
func (c *connection) OnError(cb ErrorFunc) {
	registered := c.onErrorListeners
	c.onErrorListeners = append(registered, cb)
}
// EmitError passes errorMessage to every callback registered through OnError,
// in registration order. Nothing is sent to the client.
func (c *connection) EmitError(errorMessage string) {
	for i := range c.onErrorListeners {
		c.onErrorListeners[i](errorMessage)
	}
}
2016-06-21 13:18:22 +02:00
func ( c * connection ) To ( to string ) Emitter {
2016-05-30 16:08:09 +02:00
if to == NotMe { // if send to all except me, then return the pre-defined emmiter, and so on
return c . broadcast
} else if to == All {
return c . all
} else if to == c . id {
return c . self
}
// is an emmiter to another client/connection
2016-06-21 13:18:22 +02:00
return newEmitter ( c , to )
2016-05-30 16:08:09 +02:00
}
// EmitMessage forwards nativeMessage to the pre-defined self emitter and
// returns its error.
func (c *connection) EmitMessage(nativeMessage []byte) error {
	err := c.self.EmitMessage(nativeMessage)
	return err
}
// Emit forwards the event and its message to the pre-defined self emitter and
// returns its error.
func (c *connection) Emit(event string, message interface{}) error {
	err := c.self.Emit(event, message)
	return err
}
// OnMessage adds cb to the callbacks that messageReceived calls for native
// (non-prefixed) websocket messages.
func (c *connection) OnMessage(cb NativeMessageFunc) {
	registered := c.onNativeMessageListeners
	c.onNativeMessageListeners = append(registered, cb)
}
// On adds cb to the callbacks registered for event.
//
// An event seen for the first time needs no explicit initialization:
// looking up a missing key yields a nil slice, and append allocates on it.
func (c *connection) On(event string, cb MessageFunc) {
	c.onEventListeners[event] = append(c.onEventListeners[event], cb)
}
// Join sends a roomPayload built from roomName and this connection's id on
// the server's join channel.
func (c *connection) Join(roomName string) {
	c.server.join <- roomPayload{roomName, c.id}
}
// Leave sends a roomPayload built from roomName and this connection's id on
// the server's leave channel.
func (c *connection) Leave(roomName string) {
	c.server.leave <- roomPayload{roomName, c.id}
}
//