add websocket client stress test, passed and update the vendors (this commit fixes the https://github.com/kataras/iris/issues/1178 and https://github.com/kataras/iris/issues/1173)

Former-commit-id: 74ccd8f4bf60a71f1eb0e34149a6f19de95a9148
This commit is contained in:
Gerasimos (Makis) Maropoulos 2019-02-14 03:28:41 +02:00
parent 946c100f7d
commit 07994adabb
10 changed files with 247 additions and 70 deletions

View File

@ -0,0 +1,85 @@
package main
import (
"bufio"
"fmt"
"math/rand"
"os"
"sync"
"time"
"github.com/kataras/iris/websocket"
)
var (
url = "ws://localhost:8080/socket"
f *os.File
)
const totalClients = 1200
func main() {
var err error
f, err = os.Open("./test.data")
if err != nil {
panic(err)
}
defer f.Close()
wg := new(sync.WaitGroup)
for i := 0; i < totalClients/2; i++ {
wg.Add(1)
go connect(wg, 5*time.Second)
}
for i := 0; i < totalClients/2; i++ {
wg.Add(1)
waitTime := time.Duration(rand.Intn(10)) * time.Millisecond
time.Sleep(waitTime)
go connect(wg, 10*time.Second+waitTime)
}
wg.Wait()
fmt.Println("ALL OK.")
time.Sleep(5 * time.Second)
}
func connect(wg *sync.WaitGroup, alive time.Duration) {
c, err := websocket.Dial(url, websocket.ConnectionConfig{})
if err != nil {
panic(err)
}
c.OnError(func(err error) {
fmt.Printf("error: %v", err)
})
disconnected := false
c.OnDisconnect(func() {
fmt.Printf("I am disconnected after [%s].\n", alive)
disconnected = true
})
c.On("chat", func(message string) {
fmt.Printf("\n%s\n", message)
})
go func() {
time.Sleep(alive)
if err := c.Disconnect(); err != nil {
panic(err)
}
wg.Done()
}()
scanner := bufio.NewScanner(f)
for !disconnected {
if !scanner.Scan() || scanner.Err() != nil {
break
}
c.Emit("chat", scanner.Text())
}
}

View File

@ -0,0 +1,19 @@
Σκουπίζει τη τι αρματωσιά ευρυδίκης κι αποδεχθεί αν εχτύπεσεν. Οι το ζητούσε δεκτικό αφήσουν μπράτσο βλ απ. Φυγή τι έτσι εκ πλάι αυτή θεός ας αδάμ. Αποβαίνει να τι βλ κατάγεται γεγονότος. Μπουφάν ξάπλωσε σχέσεις βλ ας να να. Υποδηλώσει τα τι κι σιδερένιων εξελικτική ως συγκράτησε παιγνιώδης. Προφανώς μου μία σύγχρονο ιστορίας.
Νερά ψηλά λύπη αφτί ας ψυχή τι λόγω. Του φίλ διά γεφυρώνει ανίχνευση διεύρυνση. Όλο μήπως τομέα πρώτο στους δις νόημα εάν του. Παιδείας ομορφιάς καλύτερα ας με. Παραγωγή προθέαση σε κουλιζάρ παραπάνω υπ. Πώς δικούς στήθος πόντου πως θέατρο θέληση σίδερο. Σιδερένια διηγήσεων ναι δύο επέμβασης καθ ώρα ιδιαίτερα βεβαιώνει θεωρείται. Βλ νερο τη να ύλης μτφρ τέλη. Ας ρόλων τη χώρων υπ αφορά είδος είπεν.
Ου πάρκαρε παιδικό μάλιστα ιι. Σκοτωθεί απαγωγής ανάλυσης άνθρωποι ιι τραγικού οι. Αναπνοή επέλεξα πομπούς εφ δράσεις να. Νε υλικό ας ως ευρήκ νόρμα ου. Ιι εμάζεψα δεύτερη αλλαγές ατ τα σύζευξη επίπεδο. Συγγραφέα νεότερους κατέγραψε ζωή διά υφολογική. Που απέσ νου στον άρα είδη σούκ νικά ήρωά. Το κανένα τι ιι γωνίας να δεσμός.
Ροή ρευστότητα στο έλα παραμυθιού διαδικασία ειδυλλιακή. Ελλάδας σύμβαση δε με πομπούς εμφανής. Ατ ως εποχή τρόπο εβγάλ αυτές πεδίο γωνία. Των άνθρωπος μπανιέρα ροζ υφίστατο φίλ. Εδώ ροζ πήρε τύπο πια μην δική. Έζησαν μάλλον ως με δε τρόπου. Παράλληλη από αδιόρατης επισκίασε άρα rites ναι. Πολιτισμού του ειδολογική νέο συνάντησης στα ταυτότητας δημοσίευση.
Παραλλαγές τόζλουτζας κι ατ συγγραφέας παρωδώντας συνείδησης να. Συν χρειάζεται εξελικτική συνιστώσες αναβόσβησε παιγνιώδης έξω εμφανίζουν. Περίτεχνο κοινωνίας ρου του ηθελημένα την σύγχρονων ζώγ. Συν στα υποτίθεται εις ανακάλυψης νέο κατασκευές. Τεκμήρια επίλογοι περίοδος σου εξω στα αγγελίες ποικίλες. Γι παραμένει συμβάντος ακολούθως δε κι να υπόστρωμα. Τη θάρρος θεϊκού να μικρές αηδίες σοφίας πρέπει. Γιατί ευρήκ σοφία αίσια και όνομά για επικό την. Έστειλεν οι σύνδρομο αληθινής κι με εξυπνάδα υπέδειξε αδειάσει.
Πω δε φοβηθώ ας σε μιχάλη ακουσε όμορφα εφόδιο. Ελέγχοντας διαχείριση όλα αναπαράγει συν στα εάν. Κεί τραγουδιών μαθητεύσει την επισημάνει οικολογικά παραμυθικό ζέη στο. Λαϊκού ατότες εξω μια την ακούμε. Δεδομένου ας τα αγαπημένο παρουσίας διαθέσεων αν. Αντίστροφα ρεαλιστικό περιπέτεια διαδικασία άρα ατο ημερολόγια.
Άντλησης νεόφερτο μοναδικό εκ ιι δυναμική μηνύματα. Συγγραφική προ έξω την περιπέτεια εγχειρίδια μαθητεύσει εκφέρονται. Εάν δεν μαρί άρα ήχοι ατο κόρη. Εν ας επομένως κινήματα άνθρωποι. Ου δάσος τι υπ γιατί πόνος όποια αυτός. Δεύτερη δέχεται το χρονικά αχιλλέα μη. Τα απαγωγή ου ακριβώς θηλάσει. Οι παραγωγή τα παιγνίδι απ παιδικών τρομάξει.
Ιστορικά ανθρώπου οπλιστεί εκκίνηση στα μερ χάσματος αργότερα. Κοινωνία επιδίωξη κοιμάται πια πειστική διά πιο απ΄. Εκ οι εύκολα γονείς σύζυγο κι πολλοί με φυσερά. Εκ τα μέτωπο το κύματα δηλαδή όμορφα φανερό πράγμα. Νωρίτερα ομορφιάς διαμέσου ζώγ ανέδειξε υπό πρόσμιξη. Επιδιώκει τις όλη μια βεβαιώνει μελετηθεί μία. Παρατηρεί υιοθετούν ροή ανθρώπινη τον επέστρεψε κατασκευή πια.
Αναπνοή επί νυχτικά εις σηκώνει τράβηξε γερανού χάλκενα. Αν αδειάσει ποικίλες νε δυναμικό. Όπως δύο αυτό ένα δέβα αυτο από νέοι πάλι. Έως υποβάλλουν αποτέλεσμα εξω σην συγχρονική μεσημεριού. Όλα νέο νου εναντίον σκέπαζαν τον διδάσκει σπουδαίο. Ακόμη πι ως έργου σοφοί δε τα. Σώματος απόλυτα εν τέτοιες διάφορα ατ πι τι. Ως ατ κοινού έμαθες πλάκες. Τα τη συνοχή έκρυβε οποίος σταθεί παίκτη.
Με σύγχρονης βρίσκεται αποτέλεσε πα τα ελληνικής. Ανακοίνωσή τις στο ουσιαστικό πολλαπλούς τις φιλολογική σου. Φιλολογική να κι κι μορφολογία μυθοπλασία πω. Τυχόν βαθιά ου λόγια έχουν να. Μικρούς έχοντας με χαμένης τη μη. Μοντέλα συνήθως επί θεωρίες χρονικά όλα χάλκενα. Διήγημα θεωρίας ατ βαγγέλη βλ νε αρ ευτελής μαγείας.

View File

@ -0,0 +1,64 @@
package main
import (
"fmt"
"os"
"sync/atomic"
"time"
"github.com/kataras/iris"
"github.com/kataras/iris/websocket"
)
const totalClients = 1200
func main() {
app := iris.New()
// websocket.Config{PingPeriod: ((60 * time.Second) * 9) / 10}
ws := websocket.New(websocket.Config{})
ws.OnConnection(handleConnection)
app.Get("/socket", ws.Handler())
go func() {
t := time.NewTicker(2 * time.Second)
for {
<-t.C
conns := ws.GetConnections()
for _, conn := range conns {
// fmt.Println(conn.ID())
// Do nothing.
_ = conn
}
if atomic.LoadUint64(&count) == totalClients {
fmt.Println("ALL CLIENTS DISCONNECTED SUCCESSFULLY.")
t.Stop()
os.Exit(0)
return
}
}
}()
app.Run(iris.Addr(":8080"))
}
func handleConnection(c websocket.Connection) {
c.OnError(func(err error) { handleErr(c, err) })
c.OnDisconnect(func() { handleDisconnect(c) })
c.On("chat", func(message string) {
c.To(websocket.Broadcast).Emit("chat", c.ID()+": "+message)
})
}
var count uint64
func handleDisconnect(c websocket.Connection) {
atomic.AddUint64(&count, 1)
fmt.Printf("client [%s] disconnected!\n", c.ID())
}
func handleErr(c websocket.Connection, err error) {
fmt.Printf("client [%s] errored: %v\n", c.ID(), err)
}

View File

@ -21,6 +21,7 @@ $ go run main.go
>> hi! >> hi!
*/ */
func main() { func main() {
// `websocket.DialContext` is also available.
c, err := websocket.Dial(url, websocket.ConnectionConfig{}) c, err := websocket.Dial(url, websocket.ConnectionConfig{})
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -15,16 +15,15 @@ const (
DefaultWebsocketWriteTimeout = 0 DefaultWebsocketWriteTimeout = 0
// DefaultWebsocketReadTimeout 0, no timeout // DefaultWebsocketReadTimeout 0, no timeout
DefaultWebsocketReadTimeout = 0 DefaultWebsocketReadTimeout = 0
// DefaultWebsocketPongTimeout 60 * time.Second // DefaultWebsocketPingPeriod is 0 but
DefaultWebsocketPongTimeout = 60 * time.Second // could be 10 * time.Second.
// DefaultWebsocketPingPeriod (DefaultPongTimeout * 9) / 10 DefaultWebsocketPingPeriod = 0
DefaultWebsocketPingPeriod = (DefaultWebsocketPongTimeout * 9) / 10 // DefaultWebsocketMaxMessageSize 0
// DefaultWebsocketMaxMessageSize 1024 DefaultWebsocketMaxMessageSize = 0
DefaultWebsocketMaxMessageSize = 1024 // DefaultWebsocketReadBufferSize 0
// DefaultWebsocketReadBufferSize 4096 DefaultWebsocketReadBufferSize = 0
DefaultWebsocketReadBufferSize = 4096 // DefaultWebsocketWriterBufferSize 0
// DefaultWebsocketWriterBufferSize 4096 DefaultWebsocketWriterBufferSize = 0
DefaultWebsocketWriterBufferSize = 4096
// DefaultEvtMessageKey is the default prefix of the underline websocket events // DefaultEvtMessageKey is the default prefix of the underline websocket events
// that are being established under the hoods. // that are being established under the hoods.
// //
@ -76,11 +75,9 @@ type Config struct {
// 0 means no timeout. // 0 means no timeout.
// Default value is 0 // Default value is 0
ReadTimeout time.Duration ReadTimeout time.Duration
// PongTimeout allowed to read the next pong message from the connection. // PingPeriod send ping messages to the connection repeatedly after this period.
// Default value is 60 * time.Second // The value should be close to the ReadTimeout to avoid issues.
PongTimeout time.Duration // Default value is 0.
// PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout.
// Default value is 60 *time.Second
PingPeriod time.Duration PingPeriod time.Duration
// MaxMessageSize max message size allowed from connection. // MaxMessageSize max message size allowed from connection.
// Default value is 1024 // Default value is 1024
@ -89,12 +86,13 @@ type Config struct {
// compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication. // compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication.
// Default value is false // Default value is false
BinaryMessages bool BinaryMessages bool
// ReadBufferSize is the buffer size for the connection reader. // ReadBufferSize and WriteBufferSize specify I/O buffer sizes. If a buffer
// Default value is 4096 // size is zero, then buffers allocated by the HTTP server are used. The
ReadBufferSize int // I/O buffer sizes do not limit the size of the messages that can be sent
// WriteBufferSize is the buffer size for the connection writer. // or received.
// Default value is 4096 //
WriteBufferSize int // Default value is 0.
ReadBufferSize, WriteBufferSize int
// EnableCompression specify if the server should attempt to negotiate per // EnableCompression specify if the server should attempt to negotiate per
// message compression (RFC 7692). Setting this value to true does not // message compression (RFC 7692). Setting this value to true does not
// guarantee that compression will be supported. Currently only "no context // guarantee that compression will be supported. Currently only "no context
@ -121,10 +119,6 @@ func (c Config) Validate() Config {
c.ReadTimeout = DefaultWebsocketReadTimeout c.ReadTimeout = DefaultWebsocketReadTimeout
} }
if c.PongTimeout < 0 {
c.PongTimeout = DefaultWebsocketPongTimeout
}
if c.PingPeriod <= 0 { if c.PingPeriod <= 0 {
c.PingPeriod = DefaultWebsocketPingPeriod c.PingPeriod = DefaultWebsocketPingPeriod
} }

View File

@ -2,6 +2,7 @@ package websocket
import ( import (
"bytes" "bytes"
stdContext "context"
"errors" "errors"
"io" "io"
"net" "net"
@ -278,6 +279,12 @@ type (
var _ Connection = &connection{} var _ Connection = &connection{}
// WrapConnection wraps the underline websocket connection into a new iris websocket connection.
// The caller should call the `connection#Wait` (which blocks) to enable its read and write functionality.
func WrapConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) Connection {
return newConnection(underlineConn, cfg)
}
func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection { func newConnection(underlineConn UnderlineConnection, cfg ConnectionConfig) *connection {
cfg = cfg.Validate() cfg = cfg.Validate()
c := &connection{ c := &connection{
@ -306,7 +313,6 @@ func newServerConnection(ctx context.Context, s *Server, underlineConn Underline
EvtMessagePrefix: s.config.EvtMessagePrefix, EvtMessagePrefix: s.config.EvtMessagePrefix,
WriteTimeout: s.config.WriteTimeout, WriteTimeout: s.config.WriteTimeout,
ReadTimeout: s.config.ReadTimeout, ReadTimeout: s.config.ReadTimeout,
PongTimeout: s.config.PongTimeout,
PingPeriod: s.config.PingPeriod, PingPeriod: s.config.PingPeriod,
MaxMessageSize: s.config.MaxMessageSize, MaxMessageSize: s.config.MaxMessageSize,
BinaryMessages: s.config.BinaryMessages, BinaryMessages: s.config.BinaryMessages,
@ -383,25 +389,26 @@ func (c *connection) startPinger() {
c.underline.SetPingHandler(pingHandler) c.underline.SetPingHandler(pingHandler)
if c.config.PingPeriod > 0 {
go func() { go func() {
for { for {
// using sleep avoids the ticker error that causes a memory leak
time.Sleep(c.config.PingPeriod) time.Sleep(c.config.PingPeriod)
if atomic.LoadUint32(&c.disconnected) > 0 { if c == nil || atomic.LoadUint32(&c.disconnected) > 0 {
// verifies if already disconected // verifies if already disconected.
break return
} }
//fire all OnPing methods //fire all OnPing methods
c.fireOnPing() c.fireOnPing()
// try to ping the client, if failed then it disconnects // try to ping the client, if failed then it disconnects.
err := c.Write(websocket.PingMessage, []byte{}) err := c.Write(websocket.PingMessage, []byte{})
if err != nil { if err != nil {
// must stop to exit the loop and finish the go routine // must stop to exit the loop and exit from the routine.
break return
} }
} }
}() }()
} }
}
func (c *connection) fireOnPing() { func (c *connection) fireOnPing() {
// fire the onPingListeners // fire the onPingListeners
@ -444,14 +451,13 @@ func (c *connection) startReader() {
_, data, err := conn.ReadMessage() _, data, err := conn.ReadMessage()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
c.FireOnError(err) c.FireOnError(err)
} }
break return
} else {
c.messageReceived(data)
} }
c.messageReceived(data)
} }
} }
@ -722,14 +728,12 @@ type ConnectionConfig struct {
// 0 means no timeout. // 0 means no timeout.
// Default value is 0 // Default value is 0
ReadTimeout time.Duration ReadTimeout time.Duration
// PongTimeout allowed to read the next pong message from the connection. // PingPeriod send ping messages to the connection repeatedly after this period.
// Default value is 60 * time.Second // The value should be close to the ReadTimeout to avoid issues.
PongTimeout time.Duration // Default value is 0
// PingPeriod send ping messages to the connection within this period. Must be less than PongTimeout.
// Default value is 60 *time.Second
PingPeriod time.Duration PingPeriod time.Duration
// MaxMessageSize max message size allowed from connection. // MaxMessageSize max message size allowed from connection.
// Default value is 1024 // Default value is 0. Unlimited but it is recommended to be 1024 for medium to large messages.
MaxMessageSize int64 MaxMessageSize int64
// BinaryMessages set it to true in order to denotes binary data messages instead of utf-8 text // BinaryMessages set it to true in order to denotes binary data messages instead of utf-8 text
// compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication. // compatible if you wanna use the Connection's EmitMessage to send a custom binary data to the client, like a native server-client communication.
@ -765,10 +769,6 @@ func (c ConnectionConfig) Validate() ConnectionConfig {
c.ReadTimeout = DefaultWebsocketReadTimeout c.ReadTimeout = DefaultWebsocketReadTimeout
} }
if c.PongTimeout < 0 {
c.PongTimeout = DefaultWebsocketPongTimeout
}
if c.PingPeriod <= 0 { if c.PingPeriod <= 0 {
c.PingPeriod = DefaultWebsocketPingPeriod c.PingPeriod = DefaultWebsocketPingPeriod
} }
@ -788,22 +788,42 @@ func (c ConnectionConfig) Validate() ConnectionConfig {
return c return c
} }
// Dial opens a new client connection to a WebSocket. // ErrBadHandshake is returned when the server response to opening handshake is
// invalid.
var ErrBadHandshake = websocket.ErrBadHandshake
// DialContext creates a new client connection.
//
// The context will be used in the request and in the Dialer.
//
// If the WebSocket handshake fails, `ErrBadHandshake` is returned.
//
// The "url" input parameter is the url to connect to the server, it should be // The "url" input parameter is the url to connect to the server, it should be
// the ws:// (or wss:// if secure) + the host + the endpoint of the // the ws:// (or wss:// if secure) + the host + the endpoint of the
// open socket of the server, i.e ws://localhost:8080/my_websocket_endpoint. // open socket of the server, i.e ws://localhost:8080/my_websocket_endpoint.
func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) { //
// Custom dialers can be used by wrapping the iris websocket connection via `websocket.WrapConnection`.
func DialContext(ctx stdContext.Context, url string, cfg ConnectionConfig) (ClientConnection, error) {
if ctx == nil {
ctx = stdContext.Background()
}
if !strings.HasPrefix(url, "ws://") { if !strings.HasPrefix(url, "ws://") {
url = "ws://" + url url = "ws://" + url
} }
conn, _, err := websocket.DefaultDialer.Dial(url, nil) conn, _, err := websocket.DefaultDialer.DialContext(ctx, url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
clientConn := newConnection(conn, cfg) clientConn := WrapConnection(conn, cfg)
go clientConn.Wait() go clientConn.Wait()
return clientConn, nil return clientConn, nil
} }
// Dial creates a new client connection by calling `DialContext` with a background context.
func Dial(url string, cfg ConnectionConfig) (ClientConnection, error) {
return DialContext(stdContext.Background(), url, cfg)
}

View File

@ -34,7 +34,7 @@ func (e *emitter) EmitMessage(nativeMessage []byte) error {
} }
func (e *emitter) Emit(event string, data interface{}) error { func (e *emitter) Emit(event string, data interface{}) error {
message, err := e.conn.server.messageSerializer.serialize(event, data) message, err := e.conn.serializer.serialize(event, data)
if err != nil { if err != nil {
return err return err
} }

View File

@ -114,6 +114,7 @@ func (ms *messageSerializer) serialize(event string, data interface{}) ([]byte,
//we suppose is json //we suppose is json
res, err := json.Marshal(data) res, err := json.Marshal(data)
if err != nil { if err != nil {
ms.buf.Put(b)
return nil, err return nil, err
} }
b.WriteString(messageTypeJSON.String()) b.WriteString(messageTypeJSON.String())

View File

@ -44,7 +44,6 @@ type (
// Use a route to serve this file on a specific path, i.e // Use a route to serve this file on a specific path, i.e
// app.Any("/iris-ws.js", func(ctx iris.Context) { ctx.Write(mywebsocketServer.ClientSource) }) // app.Any("/iris-ws.js", func(ctx iris.Context) { ctx.Write(mywebsocketServer.ClientSource) })
ClientSource []byte ClientSource []byte
messageSerializer *messageSerializer
connections map[string]*connection // key = the Connection ID. connections map[string]*connection // key = the Connection ID.
rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name rooms map[string][]string // by default a connection is joined to a room which has the connection id as its name
mu sync.RWMutex // for rooms and connections. mu sync.RWMutex // for rooms and connections.
@ -64,7 +63,6 @@ func New(cfg Config) *Server {
return &Server{ return &Server{
config: cfg, config: cfg,
ClientSource: bytes.Replace(ClientSource, []byte(DefaultEvtMessageKey), cfg.EvtMessagePrefix, -1), ClientSource: bytes.Replace(ClientSource, []byte(DefaultEvtMessageKey), cfg.EvtMessagePrefix, -1),
messageSerializer: newMessageSerializer(cfg.EvtMessagePrefix),
connections: make(map[string]*connection), connections: make(map[string]*connection),
rooms: make(map[string][]string), rooms: make(map[string][]string),
onConnectionListeners: make([]ConnectionFunc, 0), onConnectionListeners: make([]ConnectionFunc, 0),

View File

@ -4,11 +4,6 @@ Source code and other details for the project are available at GitHub:
https://github.com/kataras/iris/tree/master/websocket https://github.com/kataras/iris/tree/master/websocket
Installation
$ go get -u github.com/kataras/iris/websocket
Example code: Example code: