mirror of
https://github.com/kataras/iris.git
synced 2025-03-13 21:36:28 +01:00
Add a very simple usage-example for sending server side events
Former-commit-id: 6df287d915a772bcae3f2f98445676aba39a2bc6
This commit is contained in:
parent
989cb3f045
commit
cb69df2ccf
|
@ -337,6 +337,7 @@ You can serve [quicktemplate](https://github.com/valyala/quicktemplate) and [her
|
|||
- [Write Gzip](http_responsewriter/write-gzip/main.go)
|
||||
- [Stream Writer](http_responsewriter/stream-writer/main.go)
|
||||
- [Transactions](http_responsewriter/transactions/main.go)
|
||||
- [SSE (third-party package usage for server-side events)](http_responsewriter/sse-third-party/main.go)
|
||||
|
||||
> The `context/context#ResponseWriter()` returns an enchament version of a http.ResponseWriter, these examples show some places where the Context uses this object. Besides that you can use it as you did before iris.
|
||||
|
||||
|
|
|
@ -336,6 +336,7 @@ You can serve [quicktemplate](https://github.com/valyala/quicktemplate) and [her
|
|||
- [Write Gzip](http_responsewriter/write-gzip/main.go)
|
||||
- [Stream Writer](http_responsewriter/stream-writer/main.go)
|
||||
- [Transactions](http_responsewriter/transactions/main.go)
|
||||
- [SSE (third-party package usage for server-side events)](http_responsewriter/sse-third-party/main.go)
|
||||
|
||||
> The `context/context#ResponseWriter()` returns an enchament version of a http.ResponseWriter, these examples show some places where the Context uses this object. Besides that you can use it as you did before iris.
|
||||
|
||||
|
|
51
_examples/http_responsewriter/sse-third-party/main.go
vendored
Normal file
51
_examples/http_responsewriter/sse-third-party/main.go
vendored
Normal file
|
@ -0,0 +1,51 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/kataras/iris"
|
||||
"github.com/r3labs/sse"
|
||||
)
|
||||
|
||||
// First of all install the sse third-party package (you can use other if you don't like this approach)
|
||||
// $ go get -u github.com/r3labs/sse
|
||||
func main() {
|
||||
app := iris.New()
|
||||
s := sse.New()
|
||||
/*
|
||||
This creates a new stream inside of the scheduler.
|
||||
Seeing as there are no consumers, publishing a message
|
||||
to this channel will do nothing.
|
||||
Clients can connect to this stream once the iris handler is started
|
||||
by specifying stream as a url parameter, like so:
|
||||
http://localhost:8080/events?stream=messages
|
||||
*/
|
||||
s.CreateStream("messages")
|
||||
|
||||
app.Any("/events", iris.FromStd(s.HTTPHandler))
|
||||
|
||||
go func() {
|
||||
// You design when to send messages to the client,
|
||||
// here we just wait 5 seconds to send the first message
|
||||
// in order to give u time to open a browser window...
|
||||
time.Sleep(5 * time.Second)
|
||||
// Publish a payload to the stream.
|
||||
s.Publish("messages", &sse.Event{
|
||||
Data: []byte("ping"),
|
||||
})
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
s.Publish("messages", &sse.Event{
|
||||
Data: []byte("second message"),
|
||||
})
|
||||
time.Sleep(2 * time.Second)
|
||||
s.Publish("messages", &sse.Event{
|
||||
Data: []byte("third message"),
|
||||
})
|
||||
|
||||
}() // ...
|
||||
|
||||
app.Run(iris.Addr(":8080"), iris.WithoutServerError(iris.ErrServerClosed))
|
||||
}
|
||||
|
||||
/* For a golang SSE client you can look at: https://github.com/r3labs/sse#example-client */
|
|
@ -17,7 +17,7 @@ func main() {
|
|||
i := 0
|
||||
// goroutine in order to no block and just wait,
|
||||
// goroutine is OPTIONAL and not a very good option but it depends on the needs
|
||||
// Look the streaming_simple_2 for an alternative code style
|
||||
// Look the /alternative route for an alternative code style
|
||||
// Send the response in chunks and wait for a second between each chunk.
|
||||
go ctx.StreamWriter(func(w io.Writer) bool {
|
||||
i++
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"encoding/gob"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/kataras/iris/core/memstore"
|
||||
)
|
||||
|
@ -96,42 +95,21 @@ type SyncPayload struct {
|
|||
Store RemoteStore
|
||||
}
|
||||
|
||||
var spPool = sync.Pool{New: func() interface{} { return SyncPayload{} }}
|
||||
|
||||
func acquireSyncPayload(session *Session, action Action) SyncPayload {
|
||||
p := spPool.Get().(SyncPayload)
|
||||
p.SessionID = session.sid
|
||||
|
||||
// clone the life time, except the timer.
|
||||
// lifetime := LifeTime{
|
||||
// Time: session.lifetime.Time,
|
||||
// OriginalDuration: session.lifetime.OriginalDuration,
|
||||
// }
|
||||
|
||||
// lifetime := acquireLifetime(session.lifetime.OriginalDuration, nil)
|
||||
|
||||
p.Store = RemoteStore{
|
||||
Values: session.values,
|
||||
Lifetime: session.lifetime,
|
||||
func newSyncPayload(session *Session, action Action) SyncPayload {
|
||||
return SyncPayload{
|
||||
SessionID: session.sid,
|
||||
Action: action,
|
||||
Store: RemoteStore{
|
||||
Values: session.values,
|
||||
Lifetime: session.lifetime,
|
||||
},
|
||||
}
|
||||
|
||||
p.Action = action
|
||||
return p
|
||||
}
|
||||
|
||||
func releaseSyncPayload(p SyncPayload) {
|
||||
p.Value.Key = ""
|
||||
p.Value.ValueRaw = nil
|
||||
|
||||
// releaseLifetime(p.Store.Lifetime)
|
||||
spPool.Put(p)
|
||||
}
|
||||
|
||||
func syncDatabases(databases []Database, payload SyncPayload) {
|
||||
for i, n := 0, len(databases); i < n; i++ {
|
||||
databases[i].Sync(payload)
|
||||
}
|
||||
releaseSyncPayload(payload)
|
||||
}
|
||||
|
||||
// RemoteStore is a helper which is a wrapper
|
||||
|
|
|
@ -97,10 +97,12 @@ func (p *provider) newSession(sid string, expires time.Duration) *Session {
|
|||
}
|
||||
|
||||
func (p *provider) loadSessionFromDB(sid string) (memstore.Store, LifeTime) {
|
||||
var store memstore.Store
|
||||
var lifetime LifeTime
|
||||
var (
|
||||
store memstore.Store
|
||||
lifetime LifeTime
|
||||
firstValidIdx = 1
|
||||
)
|
||||
|
||||
firstValidIdx := 1
|
||||
for i, n := 0, len(p.databases); i < n; i++ {
|
||||
storeDB := p.databases[i].Load(sid)
|
||||
if storeDB.Lifetime.HasExpired() { // if expired then skip this db
|
||||
|
@ -120,11 +122,16 @@ func (p *provider) loadSessionFromDB(sid string) (memstore.Store, LifeTime) {
|
|||
// else append this database's key-value pairs
|
||||
// to the store
|
||||
storeDB.Values.Visit(func(key string, value interface{}) {
|
||||
store.Set(key, value)
|
||||
store.Save(key, value, false)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// default to memstore if no other store given.
|
||||
// if store == nil {
|
||||
// store = &memstore.Store{}
|
||||
// }
|
||||
|
||||
// Note: if one database and it's being expired then the lifetime will be zero(unlimited)
|
||||
// this by itself is wrong but on the `newSession` we make check of this case too and update the lifetime
|
||||
// if the configuration has expiration registered.
|
||||
|
@ -200,5 +207,7 @@ func (p *provider) DestroyAll() {
|
|||
|
||||
func (p *provider) deleteSession(sess *Session) {
|
||||
delete(p.sessions, sess.sid)
|
||||
syncDatabases(p.databases, acquireSyncPayload(sess, ActionDestroy))
|
||||
if len(p.databases) > 0 {
|
||||
syncDatabases(p.databases, newSyncPayload(sess, ActionDestroy))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -366,11 +366,11 @@ func (s *Session) GetBooleanDefault(key string, defaultValue bool) bool {
|
|||
|
||||
// GetAll returns a copy of all session's values.
|
||||
func (s *Session) GetAll() map[string]interface{} {
|
||||
items := make(map[string]interface{}, len(s.values))
|
||||
items := make(map[string]interface{}, s.values.Len())
|
||||
s.mu.RLock()
|
||||
for _, kv := range s.values {
|
||||
items[kv.Key] = kv.Value()
|
||||
}
|
||||
s.values.Visit(func(key string, value interface{}) {
|
||||
items[key] = value
|
||||
})
|
||||
s.mu.RUnlock()
|
||||
return items
|
||||
}
|
||||
|
@ -394,7 +394,6 @@ func (s *Session) VisitAll(cb func(k string, v interface{})) {
|
|||
}
|
||||
|
||||
func (s *Session) set(key string, value interface{}, immutable bool) {
|
||||
action := ActionCreate // defaults to create, means the first insert.
|
||||
|
||||
isFirst := s.values.Len() == 0
|
||||
entry, isNew := s.values.Save(key, value, immutable)
|
||||
|
@ -403,25 +402,25 @@ func (s *Session) set(key string, value interface{}, immutable bool) {
|
|||
s.isNew = false
|
||||
s.mu.Unlock()
|
||||
|
||||
if !isFirst {
|
||||
// we could use s.isNew
|
||||
// which is setted at sessions.go#Start when values are empty
|
||||
// but here we want the specific key-value pair's state.
|
||||
if isNew {
|
||||
action = ActionInsert
|
||||
} else {
|
||||
action = ActionUpdate
|
||||
if len(s.provider.databases) > 0 {
|
||||
action := ActionCreate // defaults to create, means the first insert.
|
||||
if !isFirst {
|
||||
// we could use s.isNew
|
||||
// which is setted at sessions.go#Start when values are empty
|
||||
// but here we want the specific key-value pair's state.
|
||||
if isNew {
|
||||
action = ActionInsert
|
||||
} else {
|
||||
action = ActionUpdate
|
||||
}
|
||||
}
|
||||
|
||||
p := newSyncPayload(s, action)
|
||||
p.Value = entry
|
||||
|
||||
syncDatabases(s.provider.databases, p)
|
||||
}
|
||||
|
||||
/// TODO: remove the expireAt pointer, wtf, we could use zero time instead,
|
||||
// that was not my commit so I will ask for permission first...
|
||||
// rename the expireAt to expiresAt, it seems to make more sense to me
|
||||
|
||||
p := acquireSyncPayload(s, action)
|
||||
p.Value = entry
|
||||
|
||||
syncDatabases(s.provider.databases, p)
|
||||
}
|
||||
|
||||
// Set fills the session with an entry "value", based on its "key".
|
||||
|
@ -475,9 +474,11 @@ func (s *Session) Delete(key string) bool {
|
|||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
p := acquireSyncPayload(s, ActionDelete)
|
||||
p.Value = memstore.Entry{Key: key}
|
||||
syncDatabases(s.provider.databases, p)
|
||||
if len(s.provider.databases) > 0 {
|
||||
p := newSyncPayload(s, ActionDelete)
|
||||
p.Value = memstore.Entry{Key: key}
|
||||
syncDatabases(s.provider.databases, p)
|
||||
}
|
||||
|
||||
return removed
|
||||
}
|
||||
|
@ -496,8 +497,10 @@ func (s *Session) Clear() {
|
|||
s.isNew = false
|
||||
s.mu.Unlock()
|
||||
|
||||
p := acquireSyncPayload(s, ActionClear)
|
||||
syncDatabases(s.provider.databases, p)
|
||||
if len(s.provider.databases) > 0 {
|
||||
p := newSyncPayload(s, ActionClear)
|
||||
syncDatabases(s.provider.databases, p)
|
||||
}
|
||||
}
|
||||
|
||||
// ClearFlashes removes all flash messages.
|
||||
|
|
Loading…
Reference in New Issue
Block a user