From cb69df2ccfcb6b124c642756fa05674a8888f556 Mon Sep 17 00:00:00 2001 From: Gerasimos Maropoulos Date: Tue, 10 Apr 2018 21:01:24 +0300 Subject: [PATCH] Add a very simple usage-example for sending server side events Former-commit-id: 6df287d915a772bcae3f2f98445676aba39a2bc6 --- _examples/README.md | 1 + _examples/README_ZH.md | 1 + .../sse-third-party/main.go | 51 +++++++++++++++++ .../http_responsewriter/stream-writer/main.go | 2 +- sessions/database.go | 38 +++---------- sessions/provider.go | 19 +++++-- sessions/session.go | 55 ++++++++++--------- 7 files changed, 105 insertions(+), 62 deletions(-) create mode 100644 _examples/http_responsewriter/sse-third-party/main.go diff --git a/_examples/README.md b/_examples/README.md index 9b7a3b53..082b851a 100644 --- a/_examples/README.md +++ b/_examples/README.md @@ -337,6 +337,7 @@ You can serve [quicktemplate](https://github.com/valyala/quicktemplate) and [her - [Write Gzip](http_responsewriter/write-gzip/main.go) - [Stream Writer](http_responsewriter/stream-writer/main.go) - [Transactions](http_responsewriter/transactions/main.go) +- [SSE (third-party package usage for server-side events)](http_responsewriter/sse-third-party/main.go) > The `context/context#ResponseWriter()` returns an enchament version of a http.ResponseWriter, these examples show some places where the Context uses this object. Besides that you can use it as you did before iris. diff --git a/_examples/README_ZH.md b/_examples/README_ZH.md index f9400bcf..ea377833 100644 --- a/_examples/README_ZH.md +++ b/_examples/README_ZH.md @@ -336,6 +336,7 @@ You can serve [quicktemplate](https://github.com/valyala/quicktemplate) and [her - [Write Gzip](http_responsewriter/write-gzip/main.go) - [Stream Writer](http_responsewriter/stream-writer/main.go) - [Transactions](http_responsewriter/transactions/main.go) +- [SSE (third-party package usage for server-side events)](http_responsewriter/sse-third-party/main.go) > The `context/context#ResponseWriter()` returns an enchament version of a http.ResponseWriter, these examples show some places where the Context uses this object. Besides that you can use it as you did before iris. diff --git a/_examples/http_responsewriter/sse-third-party/main.go b/_examples/http_responsewriter/sse-third-party/main.go new file mode 100644 index 00000000..4c5be9b1 --- /dev/null +++ b/_examples/http_responsewriter/sse-third-party/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "time" + + "github.com/kataras/iris" + "github.com/r3labs/sse" +) + +// First of all install the sse third-party package (you can use other if you don't like this approach) +// $ go get -u github.com/r3labs/sse +func main() { + app := iris.New() + s := sse.New() + /* + This creates a new stream inside of the scheduler. + Seeing as there are no consumers, publishing a message + to this channel will do nothing. + Clients can connect to this stream once the iris handler is started + by specifying stream as a url parameter, like so: + http://localhost:8080/events?stream=messages + */ + s.CreateStream("messages") + + app.Any("/events", iris.FromStd(s.HTTPHandler)) + + go func() { + // You design when to send messages to the client, + // here we just wait 5 seconds to send the first message + // in order to give u time to open a browser window... + time.Sleep(5 * time.Second) + // Publish a payload to the stream. + s.Publish("messages", &sse.Event{ + Data: []byte("ping"), + }) + + time.Sleep(3 * time.Second) + s.Publish("messages", &sse.Event{ + Data: []byte("second message"), + }) + time.Sleep(2 * time.Second) + s.Publish("messages", &sse.Event{ + Data: []byte("third message"), + }) + + }() // ... + + app.Run(iris.Addr(":8080"), iris.WithoutServerError(iris.ErrServerClosed)) +} + +/* For a golang SSE client you can look at: https://github.com/r3labs/sse#example-client */ diff --git a/_examples/http_responsewriter/stream-writer/main.go b/_examples/http_responsewriter/stream-writer/main.go index ac6f178d..175b7a84 100644 --- a/_examples/http_responsewriter/stream-writer/main.go +++ b/_examples/http_responsewriter/stream-writer/main.go @@ -17,7 +17,7 @@ func main() { i := 0 // goroutine in order to no block and just wait, // goroutine is OPTIONAL and not a very good option but it depends on the needs - // Look the streaming_simple_2 for an alternative code style + // Look the /alternative route for an alternative code style // Send the response in chunks and wait for a second between each chunk. go ctx.StreamWriter(func(w io.Writer) bool { i++ diff --git a/sessions/database.go b/sessions/database.go index 02a8042e..73eb2ea1 100644 --- a/sessions/database.go +++ b/sessions/database.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/gob" "io" - "sync" "github.com/kataras/iris/core/memstore" ) @@ -96,42 +95,21 @@ type SyncPayload struct { Store RemoteStore } -var spPool = sync.Pool{New: func() interface{} { return SyncPayload{} }} - -func acquireSyncPayload(session *Session, action Action) SyncPayload { - p := spPool.Get().(SyncPayload) - p.SessionID = session.sid - - // clone the life time, except the timer. - // lifetime := LifeTime{ - // Time: session.lifetime.Time, - // OriginalDuration: session.lifetime.OriginalDuration, - // } - - // lifetime := acquireLifetime(session.lifetime.OriginalDuration, nil) - - p.Store = RemoteStore{ - Values: session.values, - Lifetime: session.lifetime, +func newSyncPayload(session *Session, action Action) SyncPayload { + return SyncPayload{ + SessionID: session.sid, + Action: action, + Store: RemoteStore{ + Values: session.values, + Lifetime: session.lifetime, + }, } - - p.Action = action - return p -} - -func releaseSyncPayload(p SyncPayload) { - p.Value.Key = "" - p.Value.ValueRaw = nil - - // releaseLifetime(p.Store.Lifetime) - spPool.Put(p) } func syncDatabases(databases []Database, payload SyncPayload) { for i, n := 0, len(databases); i < n; i++ { databases[i].Sync(payload) } - releaseSyncPayload(payload) } // RemoteStore is a helper which is a wrapper diff --git a/sessions/provider.go b/sessions/provider.go index bf539a8d..460a6983 100644 --- a/sessions/provider.go +++ b/sessions/provider.go @@ -97,10 +97,12 @@ func (p *provider) newSession(sid string, expires time.Duration) *Session { } func (p *provider) loadSessionFromDB(sid string) (memstore.Store, LifeTime) { - var store memstore.Store - var lifetime LifeTime + var ( + store memstore.Store + lifetime LifeTime + firstValidIdx = 1 + ) - firstValidIdx := 1 for i, n := 0, len(p.databases); i < n; i++ { storeDB := p.databases[i].Load(sid) if storeDB.Lifetime.HasExpired() { // if expired then skip this db @@ -120,11 +122,16 @@ func (p *provider) loadSessionFromDB(sid string) (memstore.Store, LifeTime) { // else append this database's key-value pairs // to the store storeDB.Values.Visit(func(key string, value interface{}) { - store.Set(key, value) + store.Save(key, value, false) }) } } + // default to memstore if no other store given. + // if store == nil { + // store = &memstore.Store{} + // } + // Note: if one database and it's being expired then the lifetime will be zero(unlimited) // this by itself is wrong but on the `newSession` we make check of this case too and update the lifetime // if the configuration has expiration registered. @@ -200,5 +207,7 @@ func (p *provider) DestroyAll() { func (p *provider) deleteSession(sess *Session) { delete(p.sessions, sess.sid) - syncDatabases(p.databases, acquireSyncPayload(sess, ActionDestroy)) + if len(p.databases) > 0 { + syncDatabases(p.databases, newSyncPayload(sess, ActionDestroy)) + } } diff --git a/sessions/session.go b/sessions/session.go index 84dfa0d2..4b19ee2b 100644 --- a/sessions/session.go +++ b/sessions/session.go @@ -366,11 +366,11 @@ func (s *Session) GetBooleanDefault(key string, defaultValue bool) bool { // GetAll returns a copy of all session's values. func (s *Session) GetAll() map[string]interface{} { - items := make(map[string]interface{}, len(s.values)) + items := make(map[string]interface{}, s.values.Len()) s.mu.RLock() - for _, kv := range s.values { - items[kv.Key] = kv.Value() - } + s.values.Visit(func(key string, value interface{}) { + items[key] = value + }) s.mu.RUnlock() return items } @@ -394,7 +394,6 @@ func (s *Session) VisitAll(cb func(k string, v interface{})) { } func (s *Session) set(key string, value interface{}, immutable bool) { - action := ActionCreate // defaults to create, means the first insert. isFirst := s.values.Len() == 0 entry, isNew := s.values.Save(key, value, immutable) @@ -403,25 +402,25 @@ func (s *Session) set(key string, value interface{}, immutable bool) { s.isNew = false s.mu.Unlock() - if !isFirst { - // we could use s.isNew - // which is setted at sessions.go#Start when values are empty - // but here we want the specific key-value pair's state. - if isNew { - action = ActionInsert - } else { - action = ActionUpdate + if len(s.provider.databases) > 0 { + action := ActionCreate // defaults to create, means the first insert. + if !isFirst { + // we could use s.isNew + // which is setted at sessions.go#Start when values are empty + // but here we want the specific key-value pair's state. + if isNew { + action = ActionInsert + } else { + action = ActionUpdate + } } + + p := newSyncPayload(s, action) + p.Value = entry + + syncDatabases(s.provider.databases, p) } - /// TODO: remove the expireAt pointer, wtf, we could use zero time instead, - // that was not my commit so I will ask for permission first... - // rename the expireAt to expiresAt, it seems to make more sense to me - - p := acquireSyncPayload(s, action) - p.Value = entry - - syncDatabases(s.provider.databases, p) } // Set fills the session with an entry "value", based on its "key". @@ -475,9 +474,11 @@ func (s *Session) Delete(key string) bool { } s.mu.Unlock() - p := acquireSyncPayload(s, ActionDelete) - p.Value = memstore.Entry{Key: key} - syncDatabases(s.provider.databases, p) + if len(s.provider.databases) > 0 { + p := newSyncPayload(s, ActionDelete) + p.Value = memstore.Entry{Key: key} + syncDatabases(s.provider.databases, p) + } return removed } @@ -496,8 +497,10 @@ func (s *Session) Clear() { s.isNew = false s.mu.Unlock() - p := acquireSyncPayload(s, ActionClear) - syncDatabases(s.provider.databases, p) + if len(s.provider.databases) > 0 { + p := newSyncPayload(s, ActionClear) + syncDatabases(s.provider.databases, p) + } } // ClearFlashes removes all flash messages.