use the same connection structure for both client and server-side connections interfaces, the 'Connection' interface could be changed to 'ServerConn' but this would produce breaking naming change to the iris users, so keep it as it's.

Former-commit-id: 3440871b368709e33d2d2a5080c66f7ad9338970
This commit is contained in:
Gerasimos (Makis) Maropoulos 2019-02-10 17:16:43 +02:00
parent 280872fd59
commit 946c100f7d
5 changed files with 258 additions and 296 deletions

View File

@ -21,21 +21,21 @@ $ go run main.go
>> hi!
*/
func main() {
conn, err := websocket.Dial(url, websocket.DefaultEvtMessageKey)
c, err := websocket.Dial(url, websocket.ConnectionConfig{})
if err != nil {
panic(err)
}
conn.OnError(func(err error) {
c.OnError(func(err error) {
fmt.Printf("error: %v", err)
})
conn.OnDisconnect(func() {
fmt.Println("Server was force-closed[see ../server/main.go#L19] this connection after 20 seconds, therefore I am disconnected.")
c.OnDisconnect(func() {
fmt.Println("Server was force-closed[see ../server/main.go#L17] this connection after 20 seconds, therefore I am disconnected.")
os.Exit(0)
})
conn.On("chat", func(message string) {
c.On("chat", func(message string) {
fmt.Printf("\n%s\n", message)
})
@ -51,7 +51,7 @@ func main() {
break
}
conn.Emit("chat", msgToSend)
c.Emit("chat", msgToSend)
}
fmt.Println("Terminated.")

View File

@ -11,8 +11,6 @@ import (
func main() {
app := iris.New()
ws := websocket.New(websocket.Config{})
app.Get("/socket", ws.Handler())
ws.OnConnection(func(c websocket.Connection) {
go func() {
<-time.After(20 * time.Second)
@ -28,5 +26,7 @@ func main() {
})
})
app.Get("/socket", ws.Handler())
app.Run(iris.Addr(":8080"))
}

View File

@ -1,214 +0,0 @@
package websocket
import (
"bytes"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/gorilla/websocket"
)
// Dial opens a new client connection to a WebSocket.
func Dial(url, evtMessagePrefix string) (ws *ClientConn, err error) {
if !strings.HasPrefix(url, "ws://") {
url = "ws://" + url
}
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
return NewClientConn(conn, evtMessagePrefix), nil
}
type ClientConn struct {
underline UnderlineConnection // TODO make it using gorilla's one, because the 'startReader' will not know when to stop otherwise, we have a fixed length currently...
messageType int
serializer *messageSerializer
onErrorListeners []ErrorFunc
onDisconnectListeners []DisconnectFunc
onNativeMessageListeners []NativeMessageFunc
onEventListeners map[string][]MessageFunc
writerMu sync.Mutex
disconnected uint32
}
func NewClientConn(conn UnderlineConnection, evtMessagePrefix string) *ClientConn {
if evtMessagePrefix == "" {
evtMessagePrefix = DefaultEvtMessageKey
}
c := &ClientConn{
underline: conn,
serializer: newMessageSerializer([]byte(evtMessagePrefix)),
onErrorListeners: make([]ErrorFunc, 0),
onDisconnectListeners: make([]DisconnectFunc, 0),
onNativeMessageListeners: make([]NativeMessageFunc, 0),
onEventListeners: make(map[string][]MessageFunc, 0),
}
c.SetBinaryMessages(false)
go c.startReader()
return c
}
func (c *ClientConn) SetBinaryMessages(binaryMessages bool) {
if binaryMessages {
c.messageType = websocket.BinaryMessage
} else {
c.messageType = websocket.TextMessage
}
}
func (c *ClientConn) startReader() {
defer c.Disconnect()
for {
_, data, err := c.underline.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
c.FireOnError(err)
}
break
} else {
c.messageReceived(data)
}
}
}
func (c *ClientConn) messageReceived(data []byte) error {
if bytes.HasPrefix(data, c.serializer.prefix) {
// is a custom iris message.
receivedEvt := c.serializer.getWebsocketCustomEvent(data)
listeners, ok := c.onEventListeners[string(receivedEvt)]
if !ok || len(listeners) == 0 {
return nil // if not listeners for this event exit from here
}
customMessage, err := c.serializer.deserialize(receivedEvt, data)
if customMessage == nil || err != nil {
return err
}
for i := range listeners {
if fn, ok := listeners[i].(func()); ok { // its a simple func(){} callback
fn()
} else if fnString, ok := listeners[i].(func(string)); ok {
if msgString, is := customMessage.(string); is {
fnString(msgString)
} else if msgInt, is := customMessage.(int); is {
// here if server side waiting for string but client side sent an int, just convert this int to a string
fnString(strconv.Itoa(msgInt))
}
} else if fnInt, ok := listeners[i].(func(int)); ok {
fnInt(customMessage.(int))
} else if fnBool, ok := listeners[i].(func(bool)); ok {
fnBool(customMessage.(bool))
} else if fnBytes, ok := listeners[i].(func([]byte)); ok {
fnBytes(customMessage.([]byte))
} else {
listeners[i].(func(interface{}))(customMessage)
}
}
} else {
// it's native websocket message
for i := range c.onNativeMessageListeners {
c.onNativeMessageListeners[i](data)
}
}
return nil
}
func (c *ClientConn) OnMessage(cb NativeMessageFunc) {
c.onNativeMessageListeners = append(c.onNativeMessageListeners, cb)
}
func (c *ClientConn) On(event string, cb MessageFunc) {
if c.onEventListeners[event] == nil {
c.onEventListeners[event] = make([]MessageFunc, 0)
}
c.onEventListeners[event] = append(c.onEventListeners[event], cb)
}
func (c *ClientConn) OnError(cb ErrorFunc) {
c.onErrorListeners = append(c.onErrorListeners, cb)
}
func (c *ClientConn) FireOnError(err error) {
for _, cb := range c.onErrorListeners {
cb(err)
}
}
func (c *ClientConn) OnDisconnect(cb DisconnectFunc) {
c.onDisconnectListeners = append(c.onDisconnectListeners, cb)
}
func (c *ClientConn) Disconnect() error {
if c == nil || !atomic.CompareAndSwapUint32(&c.disconnected, 0, 1) {
return ErrAlreadyDisconnected
}
err := c.underline.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
err = c.underline.Close()
}
if err == nil {
for i := range c.onDisconnectListeners {
c.onDisconnectListeners[i]()
}
}
return err
}
func (c *ClientConn) EmitMessage(nativeMessage []byte) error {
return c.writeDefault(nativeMessage)
}
func (c *ClientConn) Emit(event string, data interface{}) error {
b, err := c.serializer.serialize(event, data)
if err != nil {
return err
}
return c.EmitMessage(b)
}
// Write writes a raw websocket message with a specific type to the client
// used by ping messages and any CloseMessage types.
func (c *ClientConn) Write(websocketMessageType int, data []byte) error {
// for any-case the app tries to write from different goroutines,
// we must protect them because they're reporting that as bug...
c.writerMu.Lock()
// .WriteMessage same as NextWriter and close (flush)
err := c.underline.WriteMessage(websocketMessageType, data)
c.writerMu.Unlock()
if err != nil {
// if failed then the connection is off, fire the disconnect
c.Disconnect()
}
return err
}
// writeDefault is the same as write but the message type is the configured by c.messageType
// if BinaryMessages is enabled then it's raw []byte as you expected to work with protobufs
func (c *ClientConn) writeDefault(data []byte) error {
return c.Write(c.messageType, data)
}

View File

@ -6,13 +6,37 @@ import (
"io"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/kataras/iris/context"
)
const (
// TextMessage denotes a text data message. The text message payload is
// interpreted as UTF-8 encoded text data.
TextMessage = websocket.TextMessage
// BinaryMessage denotes a binary data message.
BinaryMessage = websocket.BinaryMessage
// CloseMessage denotes a close control message. The optional message
// payload contains a numeric code and text. Use the FormatCloseMessage
// function to format a close message payload.
CloseMessage = websocket.CloseMessage
// PingMessage denotes a ping control message. The optional message payload
// is UTF-8 encoded text.
PingMessage = websocket.PingMessage
// PongMessage denotes a ping control message. The optional message payload
// is UTF-8 encoded text.
PongMessage = websocket.PongMessage
)
type (
connectionValue struct {
key []byte
@ -136,51 +160,27 @@ type (
PingFunc func()
// PongFunc is the callback which fires on pong message received
PongFunc func()
// Connection is the front-end API that you will use to communicate with the client side
// Connection is the front-end API that you will use to communicate with the client side,
// it is the server-side connection.
Connection interface {
// Emitter implements EmitMessage & Emit
Emitter
ClientConnection
// Err is not nil if the upgrader failed to upgrade http to websocket connection.
Err() error
// ID returns the connection's identifier
ID() string
// Server returns the websocket server instance
// which this connection is listening to.
//
// Its connection-relative operations are safe for use.
Server() *Server
// Write writes a raw websocket message with a specific type to the client
// used by ping messages and any CloseMessage types.
Write(websocketMessageType int, data []byte) error
// Context returns the (upgraded) context.Context of this connection
// avoid using it, you normally don't need it,
// websocket has everything you need to authenticate the user BUT if it's necessary
// then you use it to receive user information, for example: from headers
Context() context.Context
// OnDisconnect registers a callback which is fired when this connection is closed by an error or manual
OnDisconnect(DisconnectFunc)
// OnError registers a callback which fires when this connection occurs an error
OnError(ErrorFunc)
// OnPing registers a callback which fires on each ping
OnPing(PingFunc)
// OnPong registers a callback which fires on pong message received
OnPong(PongFunc)
// FireOnError can be used to send a custom error message to the connection
//
// It does nothing more than firing the OnError listeners. It doesn't send anything to the client.
FireOnError(err error)
// To defines on what "room" (see Join) the server should send a message
// returns an Emmiter(`EmitMessage` & `Emit`) to send messages.
To(string) Emitter
// OnMessage registers a callback which fires when native websocket message received
OnMessage(NativeMessageFunc)
// On registers a callback to a particular event which is fired when a message to this event is received
On(string, MessageFunc)
// Join registers this connection to a room, if it doesn't exist then it creates a new. One room can have one or more connections. One connection can be joined to many rooms. All connections are joined to a room specified by their `ID` automatically.
Join(string)
// IsJoined returns true when this connection is joined to the room, otherwise false.
@ -201,9 +201,6 @@ type (
// after the "On" events IF server's `Upgrade` is used,
// otherise you don't have to call it because the `Handler()` does it automatically.
Wait()
// Disconnect disconnects the client, close the underline websocket conn and removes it from the conn list
// returns the error, if any, from the underline connection
Disconnect() error
// SetValue sets a key-value pair on the connection's mem store.
SetValue(key string, value interface{})
// GetValue gets a value by its key from the connection's mem store.
@ -216,19 +213,50 @@ type (
GetValueInt(key string) int
}
// ClientConnection is the client-side connection interface. Server shares some of its methods but the underline actions differs.
ClientConnection interface {
Emitter
// Write writes a raw websocket message with a specific type to the client
// used by ping messages and any CloseMessage types.
Write(websocketMessageType int, data []byte) error
// OnMessage registers a callback which fires when native websocket message received
OnMessage(NativeMessageFunc)
// On registers a callback to a particular event which is fired when a message to this event is received
On(string, MessageFunc)
// OnError registers a callback which fires when this connection occurs an error
OnError(ErrorFunc)
// OnPing registers a callback which fires on each ping
OnPing(PingFunc)
// OnPong registers a callback which fires on pong message received
OnPong(PongFunc)
// FireOnError can be used to send a custom error message to the connection
//
// It does nothing more than firing the OnError listeners. It doesn't send anything to the client.
FireOnError(err error)
// OnDisconnect registers a callback which is fired when this connection is closed by an error or manual
OnDisconnect(DisconnectFunc)
// Disconnect disconnects the client, close the underline websocket conn and removes it from the conn list
// returns the error, if any, from the underline connection
Disconnect() error
}
connection struct {
err error
underline UnderlineConnection
config ConnectionConfig
defaultMessageType int
serializer *messageSerializer
id string
messageType int
disconnected bool
onDisconnectListeners []DisconnectFunc
onRoomLeaveListeners []LeaveRoomFunc
onErrorListeners []ErrorFunc
onPingListeners []PingFunc
onPongListeners []PongFunc
onNativeMessageListeners []NativeMessageFunc
onEventListeners map[string][]MessageFunc
onRoomLeaveListeners []LeaveRoomFunc
onDisconnectListeners []DisconnectFunc
disconnected uint32
started bool
// these were maden for performance only
self Emitter // pre-defined emitter than sends message to its self client
@ -250,33 +278,49 @@ type (
var _ Connection = &connection{}
// CloseMessage denotes a close control message. The optional message
// payload contains a numeric code and text. Use the FormatCloseMessage
// function to format a close message payload.
//
// Use the `Connection#Disconnect` instead.
const CloseMessage = websocket.CloseMessage
func newConnection(ctx context.Context, s *Server, underlineConn UnderlineConnection, id string) *connection {
func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection {
cfg = cfg.Validate()
c := &connection{
underline: underlineConn,
id: id,
messageType: websocket.TextMessage,
onDisconnectListeners: make([]DisconnectFunc, 0),
onRoomLeaveListeners: make([]LeaveRoomFunc, 0),
config: cfg,
serializer: newMessageSerializer(cfg.EvtMessagePrefix),
defaultMessageType: websocket.TextMessage,
onErrorListeners: make([]ErrorFunc, 0),
onPingListeners: make([]PingFunc, 0),
onPongListeners: make([]PongFunc, 0),
onNativeMessageListeners: make([]NativeMessageFunc, 0),
onEventListeners: make(map[string][]MessageFunc, 0),
onPongListeners: make([]PongFunc, 0),
started: false,
ctx: ctx,
server: s,
onDisconnectListeners: make([]DisconnectFunc, 0),
disconnected: 0,
}
if s.config.BinaryMessages {
c.messageType = websocket.BinaryMessage
if cfg.BinaryMessages {
c.defaultMessageType = websocket.BinaryMessage
}
return c
}
func newServerConnection(ctx context.Context, s *Server, underlineConn UnderlineConnection, id string) *connection {
c := newConnection(underlineConn, ConnectionConfig{
EvtMessagePrefix: s.config.EvtMessagePrefix,
WriteTimeout: s.config.WriteTimeout,
ReadTimeout: s.config.ReadTimeout,
PongTimeout: s.config.PongTimeout,
PingPeriod: s.config.PingPeriod,
MaxMessageSize: s.config.MaxMessageSize,
BinaryMessages: s.config.BinaryMessages,
ReadBufferSize: s.config.ReadBufferSize,
WriteBufferSize: s.config.WriteBufferSize,
EnableCompression: s.config.EnableCompression,
})
c.id = id
c.server = s
c.ctx = ctx
c.onRoomLeaveListeners = make([]LeaveRoomFunc, 0)
c.started = false
c.self = newEmitter(c, c.id)
c.broadcast = newEmitter(c, Broadcast)
c.all = newEmitter(c, All)
@ -295,7 +339,7 @@ func (c *connection) Write(websocketMessageType int, data []byte) error {
// for any-case the app tries to write from different goroutines,
// we must protect them because they're reporting that as bug...
c.writerMu.Lock()
if writeTimeout := c.server.config.WriteTimeout; writeTimeout > 0 {
if writeTimeout := c.config.WriteTimeout; writeTimeout > 0 {
// set the write deadline based on the configuration
c.underline.SetWriteDeadline(time.Now().Add(writeTimeout))
}
@ -312,8 +356,8 @@ func (c *connection) Write(websocketMessageType int, data []byte) error {
// writeDefault is the same as write but the message type is the configured by c.messageType
// if BinaryMessages is enabled then it's raw []byte as you expected to work with protobufs
func (c *connection) writeDefault(data []byte) {
c.Write(c.messageType, data)
func (c *connection) writeDefault(data []byte) error {
return c.Write(c.defaultMessageType, data)
}
const (
@ -342,8 +386,8 @@ func (c *connection) startPinger() {
go func() {
for {
// using sleep avoids the ticker error that causes a memory leak
time.Sleep(c.server.config.PingPeriod)
if c.disconnected {
time.Sleep(c.config.PingPeriod)
if atomic.LoadUint32(&c.disconnected) > 0 {
// verifies if already disconected
break
}
@ -375,12 +419,12 @@ func (c *connection) fireOnPong() {
func (c *connection) startReader() {
conn := c.underline
hasReadTimeout := c.server.config.ReadTimeout > 0
hasReadTimeout := c.config.ReadTimeout > 0
conn.SetReadLimit(c.server.config.MaxMessageSize)
conn.SetReadLimit(c.config.MaxMessageSize)
conn.SetPongHandler(func(s string) error {
if hasReadTimeout {
conn.SetReadDeadline(time.Now().Add(c.server.config.ReadTimeout))
conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
}
//fire all OnPong methods
go c.fireOnPong()
@ -395,7 +439,7 @@ func (c *connection) startReader() {
for {
if hasReadTimeout {
// set the read deadline based on the configuration
conn.SetReadDeadline(time.Now().Add(c.server.config.ReadTimeout))
conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout))
}
_, data, err := conn.ReadMessage()
@ -415,15 +459,15 @@ func (c *connection) startReader() {
// messageReceived checks the incoming message and fire the nativeMessage listeners or the event listeners (ws custom message)
func (c *connection) messageReceived(data []byte) {
if bytes.HasPrefix(data, c.server.config.EvtMessagePrefix) {
if bytes.HasPrefix(data, c.config.EvtMessagePrefix) {
//it's a custom ws message
receivedEvt := c.server.messageSerializer.getWebsocketCustomEvent(data)
receivedEvt := c.serializer.getWebsocketCustomEvent(data)
listeners, ok := c.onEventListeners[string(receivedEvt)]
if !ok || len(listeners) == 0 {
return // if not listeners for this event exit from here
}
customMessage, err := c.server.messageSerializer.deserialize(receivedEvt, data)
customMessage, err := c.serializer.deserialize(receivedEvt, data)
if customMessage == nil || err != nil {
return
}
@ -518,13 +562,25 @@ func (c *connection) To(to string) Emitter {
}
func (c *connection) EmitMessage(nativeMessage []byte) error {
if c.server != nil {
return c.self.EmitMessage(nativeMessage)
}
return c.writeDefault(nativeMessage)
}
func (c *connection) Emit(event string, message interface{}) error {
if c.server != nil {
return c.self.Emit(event, message)
}
b, err := c.serializer.serialize(event, message)
if err != nil {
return err
}
return c.EmitMessage(b)
}
func (c *connection) OnMessage(cb NativeMessageFunc) {
c.onNativeMessageListeners = append(c.onNativeMessageListeners, cb)
}
@ -586,12 +642,26 @@ func (c *connection) Wait() {
var ErrAlreadyDisconnected = errors.New("already disconnected")
func (c *connection) Disconnect() error {
if c == nil || c.disconnected {
if c == nil || !atomic.CompareAndSwapUint32(&c.disconnected, 0, 1) {
return ErrAlreadyDisconnected
}
if c.server != nil {
return c.server.Disconnect(c.ID())
}
err := c.underline.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
err = c.underline.Close()
}
if err == nil {
c.fireDisconnect()
}
return err
}
// mem per-conn store
func (c *connection) SetValue(key string, value interface{}) {
@ -632,3 +702,108 @@ func (c *connection) GetValueInt(key string) int {
}
return 0
}
// ConnectionConfig is the base configuration for both server and client connections.
// Clients must use `ConnectionConfig` in order to `Dial`, server's connection configuration is set by the `Config` structure.
type ConnectionConfig struct {
// EvtMessagePrefix is the prefix of the underline websocket events that are being established under the hoods.
// This prefix is visible only to the javascript side (code) and it has nothing to do
// with the message that the end-user receives.
// Do not change it unless it is absolutely necessary.
//
// If empty then defaults to []byte("iris-websocket-message:").
// Should match with the server's EvtMessagePrefix.
EvtMessagePrefix []byte
// WriteTimeout time allowed to write a message to the connection.
// 0 means no timeout.
// Default value is 0
WriteTimeout time.Duration
// ReadTimeout time allowed to read a message from the connection.
// 0 means no timeout.
// Default value is 0
ReadTimeout time.Duration
// PongTimeout allowed to read the next pong message from the connection.
// Default value is 60 * time.Second
PongTimeout time.Duration
// PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout.
// Default value is 60 *time.Second
PingPeriod time.Duration
// MaxMessageSize max message size allowed from connection.
// Default value is 1024
MaxMessageSize int64
// BinaryMessages set it to true in order to denotes binary data messages instead of utf-8 text
// compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication.
// Default value is false
BinaryMessages bool
// ReadBufferSize is the buffer size for the connection reader.
// Default value is 4096
ReadBufferSize int
// WriteBufferSize is the buffer size for the connection writer.
// Default value is 4096
WriteBufferSize int
// EnableCompression specify if the server should attempt to negotiate per
// message compression (RFC 7692). Setting this value to true does not
// guarantee that compression will be supported. Currently only "no context
// takeover" modes are supported.
//
// Defaults to false and it should be remain as it is, unless special requirements.
EnableCompression bool
}
// Validate validates the connection configuration.
func (c ConnectionConfig) Validate() ConnectionConfig {
if len(c.EvtMessagePrefix) == 0 {
c.EvtMessagePrefix = []byte(DefaultEvtMessageKey)
}
// 0 means no timeout.
if c.WriteTimeout < 0 {
c.WriteTimeout = DefaultWebsocketWriteTimeout
}
if c.ReadTimeout < 0 {
c.ReadTimeout = DefaultWebsocketReadTimeout
}
if c.PongTimeout < 0 {
c.PongTimeout = DefaultWebsocketPongTimeout
}
if c.PingPeriod <= 0 {
c.PingPeriod = DefaultWebsocketPingPeriod
}
if c.MaxMessageSize <= 0 {
c.MaxMessageSize = DefaultWebsocketMaxMessageSize
}
if c.ReadBufferSize <= 0 {
c.ReadBufferSize = DefaultWebsocketReadBufferSize
}
if c.WriteBufferSize <= 0 {
c.WriteBufferSize = DefaultWebsocketWriterBufferSize
}
return c
}
// Dial opens a new client connection to a WebSocket.
// The "url" input parameter is the url to connect to the server, it should be
// the ws:// (or wss:// if secure) + the host + the endpoint of the
// open socket of the server, i.e ws://localhost:8080/my_websocket_endpoint.
func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) {
if !strings.HasPrefix(url, "ws://") {
url = "ws://" + url
}
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
clientConn := newConnection(conn, cfg)
go clientConn.Wait()
return clientConn, nil
}

View File

@ -3,6 +3,7 @@ package websocket
import (
"bytes"
"sync"
"sync/atomic"
"github.com/kataras/iris/context"
@ -149,7 +150,7 @@ func (s *Server) handleConnection(ctx context.Context, websocketConn UnderlineCo
// use the config's id generator (or the default) to create a websocket client/connection id
cid := s.config.IDGenerator(ctx)
// create the new connection
c := newConnection(ctx, s, websocketConn, cid)
c := newServerConnection(ctx, s, websocketConn, cid)
// add the connection to the Server's list
s.addConnection(c)
@ -397,7 +398,7 @@ func (s *Server) Disconnect(connID string) (err error) {
// remove the connection from the list.
if conn, ok := s.getConnection(connID); ok {
conn.disconnected = true
atomic.StoreUint32(&conn.disconnected, 1)
// fire the disconnect callbacks, if any.
conn.fireDisconnect()
// close the underline connection and return its error, if any.