From 07994adabb0b26b75263687548d3f5ca7e8022a0 Mon Sep 17 00:00:00 2001 From: "Gerasimos (Makis) Maropoulos" Date: Thu, 14 Feb 2019 03:28:41 +0200 Subject: [PATCH] add websocket client stress test, passed and update the vendors (this commit fixes the https://github.com/kataras/iris/issues/1178 and https://github.com/kataras/iris/issues/1173) Former-commit-id: 74ccd8f4bf60a71f1eb0e34149a6f19de95a9148 --- .../go-client-stress-test/client/main.go | 85 +++++++++++++++++ .../go-client-stress-test/client/test.data | 19 ++++ .../go-client-stress-test/server/main.go | 64 +++++++++++++ _examples/websocket/go-client/client/main.go | 1 + websocket/config.go | 44 ++++----- websocket/connection.go | 92 +++++++++++-------- websocket/emitter.go | 2 +- websocket/message.go | 3 +- websocket/server.go | 2 - websocket/websocket.go | 5 - 10 files changed, 247 insertions(+), 70 deletions(-) create mode 100644 _examples/websocket/go-client-stress-test/client/main.go create mode 100644 _examples/websocket/go-client-stress-test/client/test.data create mode 100644 _examples/websocket/go-client-stress-test/server/main.go diff --git a/_examples/websocket/go-client-stress-test/client/main.go b/_examples/websocket/go-client-stress-test/client/main.go new file mode 100644 index 00000000..7d49a1cf --- /dev/null +++ b/_examples/websocket/go-client-stress-test/client/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "bufio" + "fmt" + "math/rand" + "os" + "sync" + "time" + + "github.com/kataras/iris/websocket" +) + +var ( + url = "ws://localhost:8080/socket" + f *os.File +) + +const totalClients = 1200 + +func main() { + var err error + f, err = os.Open("./test.data") + if err != nil { + panic(err) + } + defer f.Close() + + wg := new(sync.WaitGroup) + for i := 0; i < totalClients/2; i++ { + wg.Add(1) + go connect(wg, 5*time.Second) + } + + for i := 0; i < totalClients/2; i++ { + wg.Add(1) + waitTime := time.Duration(rand.Intn(10)) * time.Millisecond + time.Sleep(waitTime) + go connect(wg, 10*time.Second+waitTime) + } + + wg.Wait() + fmt.Println("ALL OK.") + time.Sleep(5 * time.Second) +} + +func connect(wg *sync.WaitGroup, alive time.Duration) { + + c, err := websocket.Dial(url, websocket.ConnectionConfig{}) + if err != nil { + panic(err) + } + + c.OnError(func(err error) { + fmt.Printf("error: %v", err) + }) + + disconnected := false + c.OnDisconnect(func() { + fmt.Printf("I am disconnected after [%s].\n", alive) + disconnected = true + }) + + c.On("chat", func(message string) { + fmt.Printf("\n%s\n", message) + }) + + go func() { + time.Sleep(alive) + if err := c.Disconnect(); err != nil { + panic(err) + } + + wg.Done() + }() + + scanner := bufio.NewScanner(f) + for !disconnected { + if !scanner.Scan() || scanner.Err() != nil { + break + } + + c.Emit("chat", scanner.Text()) + } +} diff --git a/_examples/websocket/go-client-stress-test/client/test.data b/_examples/websocket/go-client-stress-test/client/test.data new file mode 100644 index 00000000..be5eb502 --- /dev/null +++ b/_examples/websocket/go-client-stress-test/client/test.data @@ -0,0 +1,19 @@ +Σκουπίζει τη τι αρματωσιά ευρυδίκης κι αποδεχθεί αν εχτύπεσεν. Οι το ζητούσε δεκτικό αφήσουν μπράτσο βλ απ. Φυγή τι έτσι εκ πλάι αυτή θεός ας αδάμ. Αποβαίνει να τι βλ κατάγεται γεγονότος. Μπουφάν ξάπλωσε σχέσεις βλ ας να να. Υποδηλώσει τα τι κι σιδερένιων εξελικτική ως συγκράτησε παιγνιώδης. Προφανώς μου μία σύγχρονο ιστορίας. + +Νερά ψηλά λύπη αφτί ας ψυχή τι λόγω. Του φίλ διά γεφυρώνει ανίχνευση διεύρυνση. Όλο μήπως τομέα πρώτο στους δις νόημα εάν του. Παιδείας ομορφιάς καλύτερα ας με. Παραγωγή προθέαση σε κουλιζάρ παραπάνω υπ. Πώς δικούς στήθος πόντου πως θέατρο θέληση σίδερο. Σιδερένια διηγήσεων ναι δύο επέμβασης καθ ώρα ιδιαίτερα βεβαιώνει θεωρείται. Βλ νερο τη να ύλης μτφρ τέλη. Ας ρόλων τη χώρων υπ αφορά είδος είπεν. + +Ου πάρκαρε παιδικό μάλιστα ιι. Σκοτωθεί απαγωγής ανάλυσης άνθρωποι ιι τραγικού οι. Αναπνοή επέλεξα πομπούς εφ δράσεις να. Νε υλικό ας ως ευρήκ νόρμα ου. Ιι εμάζεψα δεύτερη αλλαγές ατ τα σύζευξη επίπεδο. Συγγραφέα νεότερους κατέγραψε ζωή διά υφολογική. Που απέσ νου στον άρα είδη σούκ νικά ήρωά. Το κανένα τι ιι γωνίας να δεσμός. + +Ροή ρευστότητα στο έλα παραμυθιού διαδικασία ειδυλλιακή. Ελλάδας σύμβαση δε με πομπούς εμφανής. Ατ ως εποχή τρόπο εβγάλ αυτές πεδίο γωνία. Των άνθρωπος μπανιέρα ροζ υφίστατο φίλ. Εδώ ροζ πήρε τύπο πια μην δική. Έζησαν μάλλον ως με δε τρόπου. Παράλληλη από αδιόρατης επισκίασε άρα rites ναι. Πολιτισμού του ειδολογική νέο συνάντησης στα ταυτότητας δημοσίευση. + +Παραλλαγές τόζλουτζας κι ατ συγγραφέας παρωδώντας συνείδησης να. Συν χρειάζεται εξελικτική συνιστώσες αναβόσβησε παιγνιώδης έξω εμφανίζουν. Περίτεχνο κοινωνίας ρου του ηθελημένα την σύγχρονων ζώγ. Συν στα υποτίθεται εις ανακάλυψης νέο κατασκευές. Τεκμήρια επίλογοι περίοδος σου εξω στα αγγελίες ποικίλες. Γι παραμένει συμβάντος ακολούθως δε κι να υπόστρωμα. Τη θάρρος θεϊκού να μικρές αηδίες σοφίας πρέπει. Γιατί ευρήκ σοφία αίσια και όνομά για επικό την. Έστειλεν οι σύνδρομο αληθινής κι με εξυπνάδα υπέδειξε αδειάσει. + +Πω δε φοβηθώ ας σε μιχάλη ακουσε όμορφα εφόδιο. Ελέγχοντας διαχείριση όλα αναπαράγει συν στα εάν. Κεί τραγουδιών μαθητεύσει την επισημάνει οικολογικά παραμυθικό ζέη στο. Λαϊκού ατότες εξω μια την ακούμε. Δεδομένου ας τα αγαπημένο παρουσίας διαθέσεων αν. Αντίστροφα ρεαλιστικό περιπέτεια διαδικασία άρα ατο ημερολόγια. + +Άντλησης νεόφερτο μοναδικό εκ ιι δυναμική μηνύματα. Συγγραφική προ έξω την περιπέτεια εγχειρίδια μαθητεύσει εκφέρονται. Εάν δεν μαρί άρα ήχοι ατο κόρη. Εν ας επομένως κινήματα άνθρωποι. Ου δάσος τι υπ γιατί πόνος όποια αυτός. Δεύτερη δέχεται το χρονικά αχιλλέα μη. Τα απαγωγή ου ακριβώς θηλάσει. Οι παραγωγή τα παιγνίδι απ παιδικών τρομάξει. + +Ιστορικά ανθρώπου οπλιστεί εκκίνηση στα μερ χάσματος αργότερα. Κοινωνία επιδίωξη κοιμάται πια πειστική διά πιο απ΄. Εκ οι εύκολα γονείς σύζυγο κι πολλοί με φυσερά. Εκ τα μέτωπο το κύματα δηλαδή όμορφα φανερό πράγμα. Νωρίτερα ομορφιάς διαμέσου ζώγ ανέδειξε υπό πρόσμιξη. Επιδιώκει τις όλη μια βεβαιώνει μελετηθεί μία. Παρατηρεί υιοθετούν ροή ανθρώπινη τον επέστρεψε κατασκευή πια. + +Αναπνοή επί νυχτικά εις σηκώνει τράβηξε γερανού χάλκενα. Αν αδειάσει ποικίλες νε δυναμικό. Όπως δύο αυτό ένα δέβα αυτο από νέοι πάλι. Έως υποβάλλουν αποτέλεσμα εξω σην συγχρονική μεσημεριού. Όλα νέο νου εναντίον σκέπαζαν τον διδάσκει σπουδαίο. Ακόμη πι ως έργου σοφοί δε τα. Σώματος απόλυτα εν τέτοιες διάφορα ατ πι τι. Ως ατ κοινού έμαθες πλάκες. Τα τη συνοχή έκρυβε οποίος σταθεί παίκτη. + +Με σύγχρονης βρίσκεται αποτέλεσε πα τα ελληνικής. Ανακοίνωσή τις στο ουσιαστικό πολλαπλούς τις φιλολογική σου. Φιλολογική να κι κι μορφολογία μυθοπλασία πω. Τυχόν βαθιά ου λόγια έχουν να. Μικρούς έχοντας με χαμένης τη μη. Μοντέλα συνήθως επί θεωρίες χρονικά όλα χάλκενα. Διήγημα θεωρίας ατ βαγγέλη βλ νε αρ ευτελής μαγείας. diff --git a/_examples/websocket/go-client-stress-test/server/main.go b/_examples/websocket/go-client-stress-test/server/main.go new file mode 100644 index 00000000..0519323a --- /dev/null +++ b/_examples/websocket/go-client-stress-test/server/main.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/kataras/iris" + "github.com/kataras/iris/websocket" +) + +const totalClients = 1200 + +func main() { + app := iris.New() + + // websocket.Config{PingPeriod: ((60 * time.Second) * 9) / 10} + ws := websocket.New(websocket.Config{}) + ws.OnConnection(handleConnection) + app.Get("/socket", ws.Handler()) + + go func() { + t := time.NewTicker(2 * time.Second) + for { + <-t.C + + conns := ws.GetConnections() + for _, conn := range conns { + // fmt.Println(conn.ID()) + // Do nothing. + _ = conn + } + + if atomic.LoadUint64(&count) == totalClients { + fmt.Println("ALL CLIENTS DISCONNECTED SUCCESSFULLY.") + t.Stop() + os.Exit(0) + return + } + } + }() + + app.Run(iris.Addr(":8080")) +} + +func handleConnection(c websocket.Connection) { + c.OnError(func(err error) { handleErr(c, err) }) + c.OnDisconnect(func() { handleDisconnect(c) }) + c.On("chat", func(message string) { + c.To(websocket.Broadcast).Emit("chat", c.ID()+": "+message) + }) +} + +var count uint64 + +func handleDisconnect(c websocket.Connection) { + atomic.AddUint64(&count, 1) + fmt.Printf("client [%s] disconnected!\n", c.ID()) +} + +func handleErr(c websocket.Connection, err error) { + fmt.Printf("client [%s] errored: %v\n", c.ID(), err) +} diff --git a/_examples/websocket/go-client/client/main.go b/_examples/websocket/go-client/client/main.go index f89f1803..2d1ded6d 100644 --- a/_examples/websocket/go-client/client/main.go +++ b/_examples/websocket/go-client/client/main.go @@ -21,6 +21,7 @@ $ go run main.go >> hi! */ func main() { + // `websocket.DialContext` is also available. c, err := websocket.Dial(url, websocket.ConnectionConfig{}) if err != nil { panic(err) diff --git a/websocket/config.go b/websocket/config.go index 68be4f75..145ea2a6 100644 --- a/websocket/config.go +++ b/websocket/config.go @@ -15,16 +15,15 @@ const ( DefaultWebsocketWriteTimeout = 0 // DefaultWebsocketReadTimeout 0, no timeout DefaultWebsocketReadTimeout = 0 - // DefaultWebsocketPongTimeout 60 * time.Second - DefaultWebsocketPongTimeout = 60 * time.Second - // DefaultWebsocketPingPeriod (DefaultPongTimeout * 9) / 10 - DefaultWebsocketPingPeriod = (DefaultWebsocketPongTimeout * 9) / 10 - // DefaultWebsocketMaxMessageSize 1024 - DefaultWebsocketMaxMessageSize = 1024 - // DefaultWebsocketReadBufferSize 4096 - DefaultWebsocketReadBufferSize = 4096 - // DefaultWebsocketWriterBufferSize 4096 - DefaultWebsocketWriterBufferSize = 4096 + // DefaultWebsocketPingPeriod is 0 but + // could be 10 * time.Second. + DefaultWebsocketPingPeriod = 0 + // DefaultWebsocketMaxMessageSize 0 + DefaultWebsocketMaxMessageSize = 0 + // DefaultWebsocketReadBufferSize 0 + DefaultWebsocketReadBufferSize = 0 + // DefaultWebsocketWriterBufferSize 0 + DefaultWebsocketWriterBufferSize = 0 // DefaultEvtMessageKey is the default prefix of the underline websocket events // that are being established under the hoods. // @@ -76,11 +75,9 @@ type Config struct { // 0 means no timeout. // Default value is 0 ReadTimeout time.Duration - // PongTimeout allowed to read the next pong message from the connection. - // Default value is 60 * time.Second - PongTimeout time.Duration - // PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout. - // Default value is 60 *time.Second + // PingPeriod send ping messages to the connection repeatedly after this period. + // The value should be close to the ReadTimeout to avoid issues. + // Default value is 0. PingPeriod time.Duration // MaxMessageSize max message size allowed from connection. // Default value is 1024 @@ -89,12 +86,13 @@ type Config struct { // compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication. // Default value is false BinaryMessages bool - // ReadBufferSize is the buffer size for the connection reader. - // Default value is 4096 - ReadBufferSize int - // WriteBufferSize is the buffer size for the connection writer. - // Default value is 4096 - WriteBufferSize int + // ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer + // size is zero, then buffers allocated by the HTTP server are used. The + // I/O buffer sizes do not limit the size of the messages that can be sent + // or received. + // + // Default value is 0. + ReadBufferSize, WriteBufferSize int // EnableCompression specify if the server should attempt to negotiate per // message compression (RFC 7692). Setting this value to true does not // guarantee that compression will be supported. Currently only "no context @@ -121,10 +119,6 @@ func (c Config) Validate() Config { c.ReadTimeout = DefaultWebsocketReadTimeout } - if c.PongTimeout < 0 { - c.PongTimeout = DefaultWebsocketPongTimeout - } - if c.PingPeriod <= 0 { c.PingPeriod = DefaultWebsocketPingPeriod } diff --git a/websocket/connection.go b/websocket/connection.go index 821d812b..b2b7afab 100644 --- a/websocket/connection.go +++ b/websocket/connection.go @@ -2,6 +2,7 @@ package websocket import ( "bytes" + stdContext "context" "errors" "io" "net" @@ -278,6 +279,12 @@ type ( var _ Connection = &connection{} +// WrapConnection wraps the underline websocket connection into a new iris websocket connection. +// The caller should call the `connection#Wait` (which blocks) to enable its read and write functionality. +func WrapConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) Connection { + return newConnection(underlineConn, cfg) +} + func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection { cfg = cfg.Validate() c := &connection{ @@ -306,7 +313,6 @@ func newServerConnection(ctx context.Context, s *Server, underlineConn Underline EvtMessagePrefix: s.config.EvtMessagePrefix, WriteTimeout: s.config.WriteTimeout, ReadTimeout: s.config.ReadTimeout, - PongTimeout: s.config.PongTimeout, PingPeriod: s.config.PingPeriod, MaxMessageSize: s.config.MaxMessageSize, BinaryMessages: s.config.BinaryMessages, @@ -383,24 +389,25 @@ func (c *connection) startPinger() { c.underline.SetPingHandler(pingHandler) - go func() { - for { - // using sleep avoids the ticker error that causes a memory leak - time.Sleep(c.config.PingPeriod) - if atomic.LoadUint32(&c.disconnected) > 0 { - // verifies if already disconected - break + if c.config.PingPeriod > 0 { + go func() { + for { + time.Sleep(c.config.PingPeriod) + if c == nil || atomic.LoadUint32(&c.disconnected) > 0 { + // verifies if already disconected. + return + } + //fire all OnPing methods + c.fireOnPing() + // try to ping the client, if failed then it disconnects. + err := c.Write(websocket.PingMessage, []byte{}) + if err != nil { + // must stop to exit the loop and exit from the routine. + return + } } - //fire all OnPing methods - c.fireOnPing() - // try to ping the client, if failed then it disconnects - err := c.Write(websocket.PingMessage, []byte{}) - if err != nil { - // must stop to exit the loop and finish the go routine - break - } - } - }() + }() + } } func (c *connection) fireOnPing() { @@ -444,14 +451,13 @@ func (c *connection) startReader() { _, data, err := conn.ReadMessage() if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) { c.FireOnError(err) } - break - } else { - c.messageReceived(data) + return } + c.messageReceived(data) } } @@ -722,14 +728,12 @@ type ConnectionConfig struct { // 0 means no timeout. // Default value is 0 ReadTimeout time.Duration - // PongTimeout allowed to read the next pong message from the connection. - // Default value is 60 * time.Second - PongTimeout time.Duration - // PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout. - // Default value is 60 *time.Second + // PingPeriod send ping messages to the connection repeatedly after this period. + // The value should be close to the ReadTimeout to avoid issues. + // Default value is 0 PingPeriod time.Duration // MaxMessageSize max message size allowed from connection. - // Default value is 1024 + // Default value is 0. Unlimited but it is recommended to be 1024 for medium to large messages. MaxMessageSize int64 // BinaryMessages set it to true in order to denotes binary data messages instead of utf-8 text // compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication. @@ -765,10 +769,6 @@ func (c ConnectionConfig) Validate() ConnectionConfig { c.ReadTimeout = DefaultWebsocketReadTimeout } - if c.PongTimeout < 0 { - c.PongTimeout = DefaultWebsocketPongTimeout - } - if c.PingPeriod <= 0 { c.PingPeriod = DefaultWebsocketPingPeriod } @@ -788,22 +788,42 @@ func (c ConnectionConfig) Validate() ConnectionConfig { return c } -// Dial opens a new client connection to a WebSocket. +// ErrBadHandshake is returned when the server response to opening handshake is +// invalid. +var ErrBadHandshake = websocket.ErrBadHandshake + +// DialContext creates a new client connection. +// +// The context will be used in the request and in the Dialer. +// +// If the WebSocket handshake fails, `ErrBadHandshake` is returned. +// // The "url" input parameter is the url to connect to the server, it should be // the ws:// (or wss:// if secure) + the host + the endpoint of the // open socket of the server, i.e ws://localhost:8080/my_websocket_endpoint. -func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) { +// +// Custom dialers can be used by wrapping the iris websocket connection via `websocket.WrapConnection`. +func DialContext(ctx stdContext.Context, url string, cfg ConnectionConfig) (ClientConnection, error) { + if ctx == nil { + ctx = stdContext.Background() + } + if !strings.HasPrefix(url, "ws://") { url = "ws://" + url } - conn, _, err := websocket.DefaultDialer.Dial(url, nil) + conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil) if err != nil { return nil, err } - clientConn := newConnection(conn, cfg) + clientConn := WrapConnection(conn, cfg) go clientConn.Wait() return clientConn, nil } + +// Dial creates a new client connection by calling `DialContext` with a background context. +func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) { + return DialContext(stdContext.Background(), url, cfg) +} diff --git a/websocket/emitter.go b/websocket/emitter.go index d8ae087d..84d1fa48 100644 --- a/websocket/emitter.go +++ b/websocket/emitter.go @@ -34,7 +34,7 @@ func (e *emitter) EmitMessage(nativeMessage []byte) error { } func (e *emitter) Emit(event string, data interface{}) error { - message, err := e.conn.server.messageSerializer.serialize(event, data) + message, err := e.conn.serializer.serialize(event, data) if err != nil { return err } diff --git a/websocket/message.go b/websocket/message.go index 804baa3c..6b27fbee 100644 --- a/websocket/message.go +++ b/websocket/message.go @@ -81,7 +81,7 @@ var ( ) // websocketMessageSerialize serializes a custom websocket message from websocketServer to be delivered to the client -// returns the string form of the message +// returns the string form of the message // Supported data types are: string, int, bool, bytes and JSON. func (ms *messageSerializer) serialize(event string, data interface{}) ([]byte, error) { b := ms.buf.Get() @@ -114,6 +114,7 @@ func (ms *messageSerializer) serialize(event string, data interface{}) ([]byte, //we suppose is json res, err := json.Marshal(data) if err != nil { + ms.buf.Put(b) return nil, err } b.WriteString(messageTypeJSON.String()) diff --git a/websocket/server.go b/websocket/server.go index 1de429da..6b9b6130 100644 --- a/websocket/server.go +++ b/websocket/server.go @@ -44,7 +44,6 @@ type ( // Use a route to serve this file on a specific path, i.e // app.Any("/iris-ws.js", func(ctx iris.Context) { ctx.Write(mywebsocketServer.ClientSource) }) ClientSource []byte - messageSerializer *messageSerializer connections map[string]*connection // key = the Connection ID. rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name mu sync.RWMutex // for rooms and connections. @@ -64,7 +63,6 @@ func New(cfg Config) *Server { return &Server{ config: cfg, ClientSource: bytes.Replace(ClientSource, []byte(DefaultEvtMessageKey), cfg.EvtMessagePrefix, -1), - messageSerializer: newMessageSerializer(cfg.EvtMessagePrefix), connections: make(map[string]*connection), rooms: make(map[string][]string), onConnectionListeners: make([]ConnectionFunc, 0), diff --git a/websocket/websocket.go b/websocket/websocket.go index 9cf4fdf0..1792e0dc 100644 --- a/websocket/websocket.go +++ b/websocket/websocket.go @@ -4,11 +4,6 @@ Source code and other details for the project are available at GitHub: https://github.com/kataras/iris/tree/master/websocket -Installation - - $ go get -u github.com/kataras/iris/websocket - - Example code: