diff --git a/_examples/sessions/database/redis_cluster/main.go b/_examples/sessions/database/redis_cluster/main.go new file mode 100644 index 00000000..ed074d89 --- /dev/null +++ b/_examples/sessions/database/redis_cluster/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "time" + + "github.com/kataras/iris/v12" + + "github.com/kataras/iris/v12/sessions" + "github.com/kataras/iris/v12/sessions/sessiondb/redis" + + "github.com/kataras/iris/v12/_examples/sessions/overview/example" +) + +// tested with redis version 3.0.503. +// for windows see: https://github.com/ServiceStack/redis-windows +func main() { + // These are the default values, + // you can replace them based on your running redis' server settings: + cluster := []string{ + "192.168.1.101:7000", + "192.168.1.101:7001", + "192.168.1.101:7002", + "192.168.1.102:7003", + "192.168.1.102:7004", + "192.168.1.102:7005", + } + db := redis.New(redis.Config{ + Network: "tcp", + Addr: "", + Timeout: time.Duration(30) * time.Second, + MaxActive: 10, + Password: "will-share", + Database: "", + Prefix: "", + Delim: "-", + Driver: redis.RadixCluster(), + Clusters: cluster, + }) + + // Optionally configure the underline driver: + // driver := redis.Redigo() + // driver.MaxIdle = ... + // driver.IdleTimeout = ... + // driver.Wait = ... + // redis.Config {Driver: driver} + + // Close connection when control+C/cmd+C + iris.RegisterOnInterrupt(func() { + db.Close() + }) + + defer db.Close() // close the database connection if application errored. + + sess := sessions.New(sessions.Config{ + Cookie: "_session_id", + Expires: 0, // defaults to 0: unlimited life. Another good value is: 45 * time.Minute, + AllowReclaim: true, + CookieSecureTLS: true, + }) + + // + // IMPORTANT: + // + sess.UseDatabase(db) + + app := example.NewApp(sess) + app.Listen(":8080") +} diff --git a/sessions/sessiondb/redis/database.go b/sessions/sessiondb/redis/database.go index a731b5da..2a150e21 100644 --- a/sessions/sessiondb/redis/database.go +++ b/sessions/sessiondb/redis/database.go @@ -31,7 +31,7 @@ type Config struct { Addr string // Clusters a list of network addresses for clusters. // If not empty "Addr" is ignored. - // Currently only Radix() Driver supports it. + // Currently only RadixCluster() Driver supports it. Clusters []string // Password string .If no password then no 'AUTH'. Defaults to "". Password string @@ -51,7 +51,7 @@ type Config struct { // See https://golang.org/pkg/crypto/tls/#Config TLSConfig *tls.Config - // Driver supports `Redigo()` or `Radix()` go clients for redis. + // Driver supports `Redigo()` or `Radix()` or `RadixCluster()` go clients for redis. // Configure each driver by the return value of their constructors. // // Defaults to `Redigo()`. @@ -119,8 +119,7 @@ func New(cfg ...Config) *Database { db := &Database{c: c} _, err := db.c.Driver.PingPong() if err != nil { - golog.Debugf("error connecting to redis: %v", err) - return nil + panic(err) } // runtime.SetFinalizer(db, closeDB) return db diff --git a/sessions/sessiondb/redis/driver.go b/sessions/sessiondb/redis/driver.go index 3aae3ab9..15135d41 100644 --- a/sessions/sessiondb/redis/driver.go +++ b/sessions/sessiondb/redis/driver.go @@ -19,6 +19,7 @@ type Driver interface { var ( _ Driver = (*RedigoDriver)(nil) _ Driver = (*RadixDriver)(nil) + _ Driver = (*RadixClusterDriver)(nil) ) // Redigo returns the driver for the redigo go redis client. @@ -32,3 +33,8 @@ func Redigo() *RedigoDriver { func Radix() *RadixDriver { return &RadixDriver{} } + +// RadixCluster returns the driver for the radix go redis client(only support redis cluster). +func RadixCluster() *RadixClusterDriver { + return &RadixClusterDriver{} +} diff --git a/sessions/sessiondb/redis/driver_radix.go b/sessions/sessiondb/redis/driver_radix.go index c06cfd48..e6388caa 100644 --- a/sessions/sessiondb/redis/driver_radix.go +++ b/sessions/sessiondb/redis/driver_radix.go @@ -4,7 +4,6 @@ import ( "bufio" "errors" "fmt" - "math/rand" "strconv" "github.com/mediocregopher/radix/v3" @@ -69,26 +68,9 @@ func (r *RadixDriver) Connect(c Config) error { } var connFunc radix.ConnFunc - - if len(c.Clusters) > 0 { - cluster, err := radix.NewCluster(c.Clusters) - if err != nil { - // maybe an - // ERR This instance has cluster support disabled - return err - } - - connFunc = func(network, addr string) (radix.Conn, error) { - topo := cluster.Topo() - node := topo[rand.Intn(len(topo))] - return radix.Dial(c.Network, node.Addr, options...) - } - } else { - connFunc = func(network, addr string) (radix.Conn, error) { - return radix.Dial(c.Network, c.Addr, options...) - } + connFunc = func(network, addr string) (radix.Conn, error) { + return radix.Dial(c.Network, c.Addr, options...) } - pool, err := radix.NewPool(c.Network, c.Addr, c.MaxActive, radix.PoolConnFunc(connFunc)) if err != nil { return err diff --git a/sessions/sessiondb/redis/driver_radix_cluster.go b/sessions/sessiondb/redis/driver_radix_cluster.go new file mode 100644 index 00000000..5afe1229 --- /dev/null +++ b/sessions/sessiondb/redis/driver_radix_cluster.go @@ -0,0 +1,223 @@ +package redis + +import ( + "errors" + "fmt" + "github.com/mediocregopher/radix/v3" + "strconv" + "time" +) + +// RadixDriver the Redis service based on the radix go client, +// contains the config and the redis cluster. +type RadixClusterDriver struct { + Connected bool //Connected is true when the Service has already connected + Config Config //Config the read-only redis database config. + cluster *radix.Cluster +} + +// Connect connects to the redis, called only once +func (r *RadixClusterDriver) Connect(c Config) error { + if c.Timeout < 0 { + c.Timeout = DefaultRedisTimeout + } + if c.Network == "" { + c.Network = DefaultRedisNetwork + } + if c.Addr == "" { + c.Addr = DefaultRedisAddr + } + if c.MaxActive == 0 { + c.MaxActive = 10 + } + if c.Delim == "" { + c.Delim = DefaultDelim + } + if len(c.Clusters) < 1 { + return errors.New("cluster empty") + } + + var options []radix.DialOpt + if c.Password != "" { + options = append(options, radix.DialAuthPass(c.Password)) + } + if c.Timeout > 0 { + options = append(options, radix.DialTimeout(c.Timeout)) + } + + customConnFunc := func(network, addr string) (radix.Conn, error) { + return radix.Dial(network, addr, + radix.DialTimeout(1*time.Minute), + radix.DialAuthPass(c.Password), + ) + } + poolFunc := func(network, addr string) (radix.Client, error) { + return radix.NewPool(network, addr, c.MaxActive, radix.PoolConnFunc(customConnFunc)) + } + cluster, err := radix.NewCluster(c.Clusters, radix.ClusterPoolFunc(poolFunc)) + if err != nil { + return err + } + + r.Connected = true + r.cluster = cluster + r.Config = c + return nil +} + +// PingPong sends a ping and receives a pong, if no pong received then returns false and filled error +func (r *RadixClusterDriver) PingPong() (bool, error) { + var msg string + err := r.cluster.Do(radix.Cmd(&msg, "PING")) + if err != nil { + return false, err + } + + return (msg == "PONG"), nil +} + +// CloseConnection closes the redis connection. +func (r *RadixClusterDriver) CloseConnection() error { + if r.cluster != nil { + return r.cluster.Close() + } + + return ErrRedisClosed +} + +// Set sets a key-value to the redis store. +// The expiration is setted by the secondsLifetime. +func (r *RadixClusterDriver) Set(key string, value interface{}, secondsLifetime int64) error { + var cmd radix.CmdAction + // if has expiration, then use the "EX" to delete the key automatically. + if secondsLifetime > 0 { + cmd = radix.FlatCmd(nil, "SETEX", r.Config.Prefix+key, secondsLifetime, value) + } else { + cmd = radix.FlatCmd(nil, "SET", r.Config.Prefix+key, value) // MSET same performance... + } + + return r.cluster.Do(cmd) +} + +// Get returns value, err by its key +// returns nil and a filled error if something bad happened. +func (r *RadixClusterDriver) Get(key string) (interface{}, error) { + var redisVal interface{} + mn := radix.MaybeNil{Rcv: &redisVal} + + err := r.cluster.Do(radix.Cmd(&mn, "GET", r.Config.Prefix+key)) + if err != nil { + return nil, err + } + if mn.Nil { + return nil, fmt.Errorf("%s: %w", key, ErrKeyNotFound) + } + + return redisVal, nil +} + +// TTL returns the seconds to expire, if the key has expiration and error if action failed. +// Read more at: https://redis.io/commands/ttl +func (r *RadixClusterDriver) TTL(key string) (seconds int64, hasExpiration bool, found bool) { + var redisVal interface{} + err := r.cluster.Do(radix.Cmd(&redisVal, "TTL", r.Config.Prefix+key)) + if err != nil { + return -2, false, false + } + seconds = redisVal.(int64) + // if -1 means the key has unlimited life time. + hasExpiration = seconds > -1 + // if -2 means key does not exist. + found = seconds != -2 + return +} + +func (r *RadixClusterDriver) updateTTLConn(key string, newSecondsLifeTime int64) error { + var reply int + err := r.cluster.Do(radix.FlatCmd(&reply, "EXPIRE", r.Config.Prefix+key, newSecondsLifeTime)) + if err != nil { + return err + } + + // 1 if the timeout was set. + // 0 if key does not exist. + if reply == 1 { + return nil + } else if reply == 0 { + return fmt.Errorf("unable to update expiration, the key '%s' was stored without ttl", key) + } // do not check for -1. + + return nil +} + +func (r RadixClusterDriver) getKeys(cursor, prefix string) ([]string, error) { + var res scanResult + err := r.cluster.Do(radix.Cmd(&res, "SCAN", cursor, "MATCH", r.Config.Prefix+prefix+"", "COUNT", "300000")) + if err != nil { + return nil, err + } + + keys := res.keys[0:] + if res.cur != "0" { + moreKeys, err := r.getKeys(res.cur, prefix) + if err != nil { + return nil, err + } + + keys = append(keys, moreKeys...) + } + + return keys, nil +} + +// UpdateTTL will update the ttl of a key. +// Using the "EXPIRE" command. +// Read more at: https://redis.io/commands/expire#refreshing-expires +func (r *RadixClusterDriver) UpdateTTL(key string, newSecondsLifeTime int64) error { + return r.updateTTLConn(key, newSecondsLifeTime) +} + +// UpdateTTLMany like UpdateTTL but for all keys starting with that "prefix", +// it is a bit faster operation if you need to update all sessions keys (although it can be even faster if we used hash but this will limit other features), +// look the sessions/Database#OnUpdateExpiration for example. +func (r *RadixClusterDriver) UpdateTTLMany(prefix string, newSecondsLifeTime int64) error { + keys, err := r.getKeys("0", prefix) + if err != nil { + return err + } + + for _, key := range keys { + if err = r.updateTTLConn(key, newSecondsLifeTime); err != nil { // fail on first error. + return err + } + } + + return err +} + +// GetAll returns all redis entries using the "SCAN" command (2.8+). +func (r *RadixClusterDriver) GetAll() (interface{}, error) { + var redisVal []interface{} + mn := radix.MaybeNil{Rcv: &redisVal} + err := r.cluster.Do(radix.Cmd(&mn, "SCAN", strconv.Itoa(0))) // 0 -> cursor + if err != nil { + return nil, err + } + + if mn.Nil { + return nil, err + } + + return redisVal, nil +} + +// GetKeys returns all redis keys using the "SCAN" with MATCH command. +// Read more at: https://redis.io/commands/scan#the-match-option. +func (r *RadixClusterDriver) GetKeys(prefix string) ([]string, error) { + return r.getKeys("0", prefix) +} + +// Delete removes redis entry by specific key +func (r *RadixClusterDriver) Delete(key string) error { + return r.cluster.Do(radix.Cmd(nil, "DEL", r.Config.Prefix+key)) +}