diff --git a/_examples/websocket/go-client/client/main.go b/_examples/websocket/go-client/client/main.go index 6a36b6a2..f89f1803 100644 --- a/_examples/websocket/go-client/client/main.go +++ b/_examples/websocket/go-client/client/main.go @@ -21,21 +21,21 @@ $ go run main.go >> hi! */ func main() { - conn, err := websocket.Dial(url, websocket.DefaultEvtMessageKey) + c, err := websocket.Dial(url, websocket.ConnectionConfig{}) if err != nil { panic(err) } - conn.OnError(func(err error) { + c.OnError(func(err error) { fmt.Printf("error: %v", err) }) - conn.OnDisconnect(func() { - fmt.Println("Server was force-closed[see ../server/main.go#L19] this connection after 20 seconds, therefore I am disconnected.") + c.OnDisconnect(func() { + fmt.Println("Server was force-closed[see ../server/main.go#L17] this connection after 20 seconds, therefore I am disconnected.") os.Exit(0) }) - conn.On("chat", func(message string) { + c.On("chat", func(message string) { fmt.Printf("\n%s\n", message) }) @@ -51,7 +51,7 @@ func main() { break } - conn.Emit("chat", msgToSend) + c.Emit("chat", msgToSend) } fmt.Println("Terminated.") diff --git a/_examples/websocket/go-client/server/main.go b/_examples/websocket/go-client/server/main.go index 34e880d9..0c6bed81 100644 --- a/_examples/websocket/go-client/server/main.go +++ b/_examples/websocket/go-client/server/main.go @@ -11,8 +11,6 @@ import ( func main() { app := iris.New() ws := websocket.New(websocket.Config{}) - app.Get("/socket", ws.Handler()) - ws.OnConnection(func(c websocket.Connection) { go func() { <-time.After(20 * time.Second) @@ -28,5 +26,7 @@ func main() { }) }) + app.Get("/socket", ws.Handler()) + app.Run(iris.Addr(":8080")) } diff --git a/websocket/client.go b/websocket/client.go deleted file mode 100644 index 2220e07d..00000000 --- a/websocket/client.go +++ /dev/null @@ -1,214 +0,0 @@ -package websocket - -import ( - "bytes" - "strconv" - "strings" - "sync" - "sync/atomic" - - "github.com/gorilla/websocket" -) - -// Dial opens a new client connection to a WebSocket. -func Dial(url, evtMessagePrefix string) (ws *ClientConn, err error) { - if !strings.HasPrefix(url, "ws://") { - url = "ws://" + url - } - - conn, _, err := websocket.DefaultDialer.Dial(url, nil) - if err != nil { - return nil, err - } - - return NewClientConn(conn, evtMessagePrefix), nil -} - -type ClientConn struct { - underline UnderlineConnection // TODO make it using gorilla's one, because the 'startReader' will not know when to stop otherwise, we have a fixed length currently... - messageType int - serializer *messageSerializer - - onErrorListeners []ErrorFunc - onDisconnectListeners []DisconnectFunc - onNativeMessageListeners []NativeMessageFunc - onEventListeners map[string][]MessageFunc - - writerMu sync.Mutex - - disconnected uint32 -} - -func NewClientConn(conn UnderlineConnection, evtMessagePrefix string) *ClientConn { - if evtMessagePrefix == "" { - evtMessagePrefix = DefaultEvtMessageKey - } - - c := &ClientConn{ - underline: conn, - serializer: newMessageSerializer([]byte(evtMessagePrefix)), - - onErrorListeners: make([]ErrorFunc, 0), - onDisconnectListeners: make([]DisconnectFunc, 0), - onNativeMessageListeners: make([]NativeMessageFunc, 0), - onEventListeners: make(map[string][]MessageFunc, 0), - } - - c.SetBinaryMessages(false) - - go c.startReader() - - return c -} - -func (c *ClientConn) SetBinaryMessages(binaryMessages bool) { - if binaryMessages { - c.messageType = websocket.BinaryMessage - } else { - c.messageType = websocket.TextMessage - } -} - -func (c *ClientConn) startReader() { - defer c.Disconnect() - - for { - _, data, err := c.underline.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - c.FireOnError(err) - } - - break - } else { - c.messageReceived(data) - } - } -} - -func (c *ClientConn) messageReceived(data []byte) error { - if bytes.HasPrefix(data, c.serializer.prefix) { - // is a custom iris message. - receivedEvt := c.serializer.getWebsocketCustomEvent(data) - listeners, ok := c.onEventListeners[string(receivedEvt)] - if !ok || len(listeners) == 0 { - return nil // if not listeners for this event exit from here - } - - customMessage, err := c.serializer.deserialize(receivedEvt, data) - if customMessage == nil || err != nil { - return err - } - - for i := range listeners { - if fn, ok := listeners[i].(func()); ok { // its a simple func(){} callback - fn() - } else if fnString, ok := listeners[i].(func(string)); ok { - - if msgString, is := customMessage.(string); is { - fnString(msgString) - } else if msgInt, is := customMessage.(int); is { - // here if server side waiting for string but client side sent an int, just convert this int to a string - fnString(strconv.Itoa(msgInt)) - } - - } else if fnInt, ok := listeners[i].(func(int)); ok { - fnInt(customMessage.(int)) - } else if fnBool, ok := listeners[i].(func(bool)); ok { - fnBool(customMessage.(bool)) - } else if fnBytes, ok := listeners[i].(func([]byte)); ok { - fnBytes(customMessage.([]byte)) - } else { - listeners[i].(func(interface{}))(customMessage) - } - - } - } else { - // it's native websocket message - for i := range c.onNativeMessageListeners { - c.onNativeMessageListeners[i](data) - } - } - - return nil -} - -func (c *ClientConn) OnMessage(cb NativeMessageFunc) { - c.onNativeMessageListeners = append(c.onNativeMessageListeners, cb) -} - -func (c *ClientConn) On(event string, cb MessageFunc) { - if c.onEventListeners[event] == nil { - c.onEventListeners[event] = make([]MessageFunc, 0) - } - - c.onEventListeners[event] = append(c.onEventListeners[event], cb) -} - -func (c *ClientConn) OnError(cb ErrorFunc) { - c.onErrorListeners = append(c.onErrorListeners, cb) -} - -func (c *ClientConn) FireOnError(err error) { - for _, cb := range c.onErrorListeners { - cb(err) - } -} - -func (c *ClientConn) OnDisconnect(cb DisconnectFunc) { - c.onDisconnectListeners = append(c.onDisconnectListeners, cb) -} - -func (c *ClientConn) Disconnect() error { - if c == nil || !atomic.CompareAndSwapUint32(&c.disconnected, 0, 1) { - return ErrAlreadyDisconnected - } - - err := c.underline.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - if err != nil { - err = c.underline.Close() - } - - if err == nil { - for i := range c.onDisconnectListeners { - c.onDisconnectListeners[i]() - } - } - - return err -} - -func (c *ClientConn) EmitMessage(nativeMessage []byte) error { - return c.writeDefault(nativeMessage) -} - -func (c *ClientConn) Emit(event string, data interface{}) error { - b, err := c.serializer.serialize(event, data) - if err != nil { - return err - } - - return c.EmitMessage(b) -} - -// Write writes a raw websocket message with a specific type to the client -// used by ping messages and any CloseMessage types. -func (c *ClientConn) Write(websocketMessageType int, data []byte) error { - // for any-case the app tries to write from different goroutines, - // we must protect them because they're reporting that as bug... - c.writerMu.Lock() - // .WriteMessage same as NextWriter and close (flush) - err := c.underline.WriteMessage(websocketMessageType, data) - c.writerMu.Unlock() - if err != nil { - // if failed then the connection is off, fire the disconnect - c.Disconnect() - } - return err -} - -// writeDefault is the same as write but the message type is the configured by c.messageType -// if BinaryMessages is enabled then it's raw []byte as you expected to work with protobufs -func (c *ClientConn) writeDefault(data []byte) error { - return c.Write(c.messageType, data) -} diff --git a/websocket/connection.go b/websocket/connection.go index 66acb0df..821d812b 100644 --- a/websocket/connection.go +++ b/websocket/connection.go @@ -6,13 +6,37 @@ import ( "io" "net" "strconv" + "strings" "sync" + "sync/atomic" "time" "github.com/gorilla/websocket" "github.com/kataras/iris/context" ) +const ( + // TextMessage denotes a text data message. The text message payload is + // interpreted as UTF-8 encoded text data. + TextMessage = websocket.TextMessage + + // BinaryMessage denotes a binary data message. + BinaryMessage = websocket.BinaryMessage + + // CloseMessage denotes a close control message. The optional message + // payload contains a numeric code and text. Use the FormatCloseMessage + // function to format a close message payload. + CloseMessage = websocket.CloseMessage + + // PingMessage denotes a ping control message. The optional message payload + // is UTF-8 encoded text. + PingMessage = websocket.PingMessage + + // PongMessage denotes a ping control message. The optional message payload + // is UTF-8 encoded text. + PongMessage = websocket.PongMessage +) + type ( connectionValue struct { key []byte @@ -136,51 +160,27 @@ type ( PingFunc func() // PongFunc is the callback which fires on pong message received PongFunc func() - // Connection is the front-end API that you will use to communicate with the client side + // Connection is the front-end API that you will use to communicate with the client side, + // it is the server-side connection. Connection interface { - // Emitter implements EmitMessage & Emit - Emitter + ClientConnection // Err is not nil if the upgrader failed to upgrade http to websocket connection. Err() error - // ID returns the connection's identifier ID() string - // Server returns the websocket server instance // which this connection is listening to. // // Its connection-relative operations are safe for use. Server() *Server - - // Write writes a raw websocket message with a specific type to the client - // used by ping messages and any CloseMessage types. - Write(websocketMessageType int, data []byte) error - // Context returns the (upgraded) context.Context of this connection // avoid using it, you normally don't need it, // websocket has everything you need to authenticate the user BUT if it's necessary // then you use it to receive user information, for example: from headers Context() context.Context - - // OnDisconnect registers a callback which is fired when this connection is closed by an error or manual - OnDisconnect(DisconnectFunc) - // OnError registers a callback which fires when this connection occurs an error - OnError(ErrorFunc) - // OnPing registers a callback which fires on each ping - OnPing(PingFunc) - // OnPong registers a callback which fires on pong message received - OnPong(PongFunc) - // FireOnError can be used to send a custom error message to the connection - // - // It does nothing more than firing the OnError listeners. It doesn't send anything to the client. - FireOnError(err error) // To defines on what "room" (see Join) the server should send a message // returns an Emmiter(`EmitMessage` & `Emit`) to send messages. To(string) Emitter - // OnMessage registers a callback which fires when native websocket message received - OnMessage(NativeMessageFunc) - // On registers a callback to a particular event which is fired when a message to this event is received - On(string, MessageFunc) // Join registers this connection to a room, if it doesn't exist then it creates a new. One room can have one or more connections. One connection can be joined to many rooms. All connections are joined to a room specified by their `ID` automatically. Join(string) // IsJoined returns true when this connection is joined to the room, otherwise false. @@ -201,9 +201,6 @@ type ( // after the "On" events IF server's `Upgrade` is used, // otherise you don't have to call it because the `Handler()` does it automatically. Wait() - // Disconnect disconnects the client, close the underline websocket conn and removes it from the conn list - // returns the error, if any, from the underline connection - Disconnect() error // SetValue sets a key-value pair on the connection's mem store. SetValue(key string, value interface{}) // GetValue gets a value by its key from the connection's mem store. @@ -216,20 +213,51 @@ type ( GetValueInt(key string) int } + // ClientConnection is the client-side connection interface. Server shares some of its methods but the underline actions differs. + ClientConnection interface { + Emitter + // Write writes a raw websocket message with a specific type to the client + // used by ping messages and any CloseMessage types. + Write(websocketMessageType int, data []byte) error + // OnMessage registers a callback which fires when native websocket message received + OnMessage(NativeMessageFunc) + // On registers a callback to a particular event which is fired when a message to this event is received + On(string, MessageFunc) + // OnError registers a callback which fires when this connection occurs an error + OnError(ErrorFunc) + // OnPing registers a callback which fires on each ping + OnPing(PingFunc) + // OnPong registers a callback which fires on pong message received + OnPong(PongFunc) + // FireOnError can be used to send a custom error message to the connection + // + // It does nothing more than firing the OnError listeners. It doesn't send anything to the client. + FireOnError(err error) + // OnDisconnect registers a callback which is fired when this connection is closed by an error or manual + OnDisconnect(DisconnectFunc) + // Disconnect disconnects the client, close the underline websocket conn and removes it from the conn list + // returns the error, if any, from the underline connection + Disconnect() error + } + connection struct { - err error - underline UnderlineConnection - id string - messageType int - disconnected bool - onDisconnectListeners []DisconnectFunc - onRoomLeaveListeners []LeaveRoomFunc + err error + underline UnderlineConnection + config ConnectionConfig + defaultMessageType int + serializer *messageSerializer + id string + onErrorListeners []ErrorFunc onPingListeners []PingFunc onPongListeners []PongFunc onNativeMessageListeners []NativeMessageFunc onEventListeners map[string][]MessageFunc - started bool + onRoomLeaveListeners []LeaveRoomFunc + onDisconnectListeners []DisconnectFunc + disconnected uint32 + + started bool // these were maden for performance only self Emitter // pre-defined emitter than sends message to its self client broadcast Emitter // pre-defined emitter that sends message to all except this @@ -250,33 +278,49 @@ type ( var _ Connection = &connection{} -// CloseMessage denotes a close control message. The optional message -// payload contains a numeric code and text. Use the FormatCloseMessage -// function to format a close message payload. -// -// Use the `Connection#Disconnect` instead. -const CloseMessage = websocket.CloseMessage - -func newConnection(ctx context.Context, s *Server, underlineConn UnderlineConnection, id string) *connection { +func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection { + cfg = cfg.Validate() c := &connection{ underline: underlineConn, - id: id, - messageType: websocket.TextMessage, - onDisconnectListeners: make([]DisconnectFunc, 0), - onRoomLeaveListeners: make([]LeaveRoomFunc, 0), + config: cfg, + serializer: newMessageSerializer(cfg.EvtMessagePrefix), + defaultMessageType: websocket.TextMessage, onErrorListeners: make([]ErrorFunc, 0), + onPingListeners: make([]PingFunc, 0), + onPongListeners: make([]PongFunc, 0), onNativeMessageListeners: make([]NativeMessageFunc, 0), onEventListeners: make(map[string][]MessageFunc, 0), - onPongListeners: make([]PongFunc, 0), - started: false, - ctx: ctx, - server: s, + onDisconnectListeners: make([]DisconnectFunc, 0), + disconnected: 0, } - if s.config.BinaryMessages { - c.messageType = websocket.BinaryMessage + if cfg.BinaryMessages { + c.defaultMessageType = websocket.BinaryMessage } + return c +} + +func newServerConnection(ctx context.Context, s *Server, underlineConn UnderlineConnection, id string) *connection { + c := newConnection(underlineConn, ConnectionConfig{ + EvtMessagePrefix: s.config.EvtMessagePrefix, + WriteTimeout: s.config.WriteTimeout, + ReadTimeout: s.config.ReadTimeout, + PongTimeout: s.config.PongTimeout, + PingPeriod: s.config.PingPeriod, + MaxMessageSize: s.config.MaxMessageSize, + BinaryMessages: s.config.BinaryMessages, + ReadBufferSize: s.config.ReadBufferSize, + WriteBufferSize: s.config.WriteBufferSize, + EnableCompression: s.config.EnableCompression, + }) + + c.id = id + c.server = s + c.ctx = ctx + c.onRoomLeaveListeners = make([]LeaveRoomFunc, 0) + c.started = false + c.self = newEmitter(c, c.id) c.broadcast = newEmitter(c, Broadcast) c.all = newEmitter(c, All) @@ -295,7 +339,7 @@ func (c *connection) Write(websocketMessageType int, data []byte) error { // for any-case the app tries to write from different goroutines, // we must protect them because they're reporting that as bug... c.writerMu.Lock() - if writeTimeout := c.server.config.WriteTimeout; writeTimeout > 0 { + if writeTimeout := c.config.WriteTimeout; writeTimeout > 0 { // set the write deadline based on the configuration c.underline.SetWriteDeadline(time.Now().Add(writeTimeout)) } @@ -312,8 +356,8 @@ func (c *connection) Write(websocketMessageType int, data []byte) error { // writeDefault is the same as write but the message type is the configured by c.messageType // if BinaryMessages is enabled then it's raw []byte as you expected to work with protobufs -func (c *connection) writeDefault(data []byte) { - c.Write(c.messageType, data) +func (c *connection) writeDefault(data []byte) error { + return c.Write(c.defaultMessageType, data) } const ( @@ -342,8 +386,8 @@ func (c *connection) startPinger() { go func() { for { // using sleep avoids the ticker error that causes a memory leak - time.Sleep(c.server.config.PingPeriod) - if c.disconnected { + time.Sleep(c.config.PingPeriod) + if atomic.LoadUint32(&c.disconnected) > 0 { // verifies if already disconected break } @@ -375,12 +419,12 @@ func (c *connection) fireOnPong() { func (c *connection) startReader() { conn := c.underline - hasReadTimeout := c.server.config.ReadTimeout > 0 + hasReadTimeout := c.config.ReadTimeout > 0 - conn.SetReadLimit(c.server.config.MaxMessageSize) + conn.SetReadLimit(c.config.MaxMessageSize) conn.SetPongHandler(func(s string) error { if hasReadTimeout { - conn.SetReadDeadline(time.Now().Add(c.server.config.ReadTimeout)) + conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout)) } //fire all OnPong methods go c.fireOnPong() @@ -395,7 +439,7 @@ func (c *connection) startReader() { for { if hasReadTimeout { // set the read deadline based on the configuration - conn.SetReadDeadline(time.Now().Add(c.server.config.ReadTimeout)) + conn.SetReadDeadline(time.Now().Add(c.config.ReadTimeout)) } _, data, err := conn.ReadMessage() @@ -415,15 +459,15 @@ func (c *connection) startReader() { // messageReceived checks the incoming message and fire the nativeMessage listeners or the event listeners (ws custom message) func (c *connection) messageReceived(data []byte) { - if bytes.HasPrefix(data, c.server.config.EvtMessagePrefix) { + if bytes.HasPrefix(data, c.config.EvtMessagePrefix) { //it's a custom ws message - receivedEvt := c.server.messageSerializer.getWebsocketCustomEvent(data) + receivedEvt := c.serializer.getWebsocketCustomEvent(data) listeners, ok := c.onEventListeners[string(receivedEvt)] if !ok || len(listeners) == 0 { return // if not listeners for this event exit from here } - customMessage, err := c.server.messageSerializer.deserialize(receivedEvt, data) + customMessage, err := c.serializer.deserialize(receivedEvt, data) if customMessage == nil || err != nil { return } @@ -518,11 +562,23 @@ func (c *connection) To(to string) Emitter { } func (c *connection) EmitMessage(nativeMessage []byte) error { - return c.self.EmitMessage(nativeMessage) + if c.server != nil { + return c.self.EmitMessage(nativeMessage) + } + return c.writeDefault(nativeMessage) } func (c *connection) Emit(event string, message interface{}) error { - return c.self.Emit(event, message) + if c.server != nil { + return c.self.Emit(event, message) + } + + b, err := c.serializer.serialize(event, message) + if err != nil { + return err + } + + return c.EmitMessage(b) } func (c *connection) OnMessage(cb NativeMessageFunc) { @@ -586,10 +642,24 @@ func (c *connection) Wait() { var ErrAlreadyDisconnected = errors.New("already disconnected") func (c *connection) Disconnect() error { - if c == nil || c.disconnected { + if c == nil || !atomic.CompareAndSwapUint32(&c.disconnected, 0, 1) { return ErrAlreadyDisconnected } - return c.server.Disconnect(c.ID()) + + if c.server != nil { + return c.server.Disconnect(c.ID()) + } + + err := c.underline.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + err = c.underline.Close() + } + + if err == nil { + c.fireDisconnect() + } + + return err } // mem per-conn store @@ -632,3 +702,108 @@ func (c *connection) GetValueInt(key string) int { } return 0 } + +// ConnectionConfig is the base configuration for both server and client connections. +// Clients must use `ConnectionConfig` in order to `Dial`, server's connection configuration is set by the `Config` structure. +type ConnectionConfig struct { + // EvtMessagePrefix is the prefix of the underline websocket events that are being established under the hoods. + // This prefix is visible only to the javascript side (code) and it has nothing to do + // with the message that the end-user receives. + // Do not change it unless it is absolutely necessary. + // + // If empty then defaults to []byte("iris-websocket-message:"). + // Should match with the server's EvtMessagePrefix. + EvtMessagePrefix []byte + // WriteTimeout time allowed to write a message to the connection. + // 0 means no timeout. + // Default value is 0 + WriteTimeout time.Duration + // ReadTimeout time allowed to read a message from the connection. + // 0 means no timeout. + // Default value is 0 + ReadTimeout time.Duration + // PongTimeout allowed to read the next pong message from the connection. + // Default value is 60 * time.Second + PongTimeout time.Duration + // PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout. + // Default value is 60 *time.Second + PingPeriod time.Duration + // MaxMessageSize max message size allowed from connection. + // Default value is 1024 + MaxMessageSize int64 + // BinaryMessages set it to true in order to denotes binary data messages instead of utf-8 text + // compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication. + // Default value is false + BinaryMessages bool + // ReadBufferSize is the buffer size for the connection reader. + // Default value is 4096 + ReadBufferSize int + // WriteBufferSize is the buffer size for the connection writer. + // Default value is 4096 + WriteBufferSize int + // EnableCompression specify if the server should attempt to negotiate per + // message compression (RFC 7692). Setting this value to true does not + // guarantee that compression will be supported. Currently only "no context + // takeover" modes are supported. + // + // Defaults to false and it should be remain as it is, unless special requirements. + EnableCompression bool +} + +// Validate validates the connection configuration. +func (c ConnectionConfig) Validate() ConnectionConfig { + if len(c.EvtMessagePrefix) == 0 { + c.EvtMessagePrefix = []byte(DefaultEvtMessageKey) + } + + // 0 means no timeout. + if c.WriteTimeout < 0 { + c.WriteTimeout = DefaultWebsocketWriteTimeout + } + + if c.ReadTimeout < 0 { + c.ReadTimeout = DefaultWebsocketReadTimeout + } + + if c.PongTimeout < 0 { + c.PongTimeout = DefaultWebsocketPongTimeout + } + + if c.PingPeriod <= 0 { + c.PingPeriod = DefaultWebsocketPingPeriod + } + + if c.MaxMessageSize <= 0 { + c.MaxMessageSize = DefaultWebsocketMaxMessageSize + } + + if c.ReadBufferSize <= 0 { + c.ReadBufferSize = DefaultWebsocketReadBufferSize + } + + if c.WriteBufferSize <= 0 { + c.WriteBufferSize = DefaultWebsocketWriterBufferSize + } + + return c +} + +// Dial opens a new client connection to a WebSocket. +// The "url" input parameter is the url to connect to the server, it should be +// the ws:// (or wss:// if secure) + the host + the endpoint of the +// open socket of the server, i.e ws://localhost:8080/my_websocket_endpoint. +func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) { + if !strings.HasPrefix(url, "ws://") { + url = "ws://" + url + } + + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + clientConn := newConnection(conn, cfg) + go clientConn.Wait() + + return clientConn, nil +} diff --git a/websocket/server.go b/websocket/server.go index 9bf323ad..1de429da 100644 --- a/websocket/server.go +++ b/websocket/server.go @@ -3,6 +3,7 @@ package websocket import ( "bytes" "sync" + "sync/atomic" "github.com/kataras/iris/context" @@ -149,7 +150,7 @@ func (s *Server) handleConnection(ctx context.Context, websocketConn UnderlineCo // use the config's id generator (or the default) to create a websocket client/connection id cid := s.config.IDGenerator(ctx) // create the new connection - c := newConnection(ctx, s, websocketConn, cid) + c := newServerConnection(ctx, s, websocketConn, cid) // add the connection to the Server's list s.addConnection(c) @@ -397,7 +398,7 @@ func (s *Server) Disconnect(connID string) (err error) { // remove the connection from the list. if conn, ok := s.getConnection(connID); ok { - conn.disconnected = true + atomic.StoreUint32(&conn.disconnected, 1) // fire the disconnect callbacks, if any. conn.fireDisconnect() // close the underline connection and return its error, if any.