From 98636112642d379f5cb071b1a17c9178881d12bf Mon Sep 17 00:00:00 2001 From: "Gerasimos (Makis) Maropoulos" Date: Sat, 27 Jun 2020 13:53:16 +0300 Subject: [PATCH] fix PR: https://github.com/kataras/iris/pull/1546 Former-commit-id: be901767a8008556e3499da60f6ca65f80219346 --- sessions/sessiondb/redis/driver.go | 6 - sessions/sessiondb/redis/driver_radix.go | 54 ++++- .../sessiondb/redis/driver_radix_cluster.go | 223 ------------------ 3 files changed, 50 insertions(+), 233 deletions(-) delete mode 100644 sessions/sessiondb/redis/driver_radix_cluster.go diff --git a/sessions/sessiondb/redis/driver.go b/sessions/sessiondb/redis/driver.go index 15135d41..3aae3ab9 100644 --- a/sessions/sessiondb/redis/driver.go +++ b/sessions/sessiondb/redis/driver.go @@ -19,7 +19,6 @@ type Driver interface { var ( _ Driver = (*RedigoDriver)(nil) _ Driver = (*RadixDriver)(nil) - _ Driver = (*RadixClusterDriver)(nil) ) // Redigo returns the driver for the redigo go redis client. @@ -33,8 +32,3 @@ func Redigo() *RedigoDriver { func Radix() *RadixDriver { return &RadixDriver{} } - -// RadixCluster returns the driver for the radix go redis client(only support redis cluster). -func RadixCluster() *RadixClusterDriver { - return &RadixClusterDriver{} -} diff --git a/sessions/sessiondb/redis/driver_radix.go b/sessions/sessiondb/redis/driver_radix.go index e6388caa..87bb2597 100644 --- a/sessions/sessiondb/redis/driver_radix.go +++ b/sessions/sessiondb/redis/driver_radix.go @@ -10,6 +10,12 @@ import ( "github.com/mediocregopher/radix/v3/resp/resp2" ) +// radixPool an interface to complete both *radix.Pool and *radix.Cluster. +type radixPool interface { + Do(a radix.Action) error + Close() error +} + // RadixDriver the Redis service based on the radix go client, // contains the config and the redis pool. type RadixDriver struct { @@ -17,7 +23,7 @@ type RadixDriver struct { Connected bool // Config the read-only redis database config. Config Config - pool *radix.Pool + pool radixPool } // Connect connects to the redis, called only once @@ -68,12 +74,52 @@ func (r *RadixDriver) Connect(c Config) error { } var connFunc radix.ConnFunc + + /* Note(@kataras): according to #1545 the below does NOT work, and we should + use the Cluster instance itself to fire requests. + We need a separate `radix.Cluster` instance to do the calls, + fortunally both Pool and Cluster implement the same Do and Close methods we need, + so a new `radixPool` interface to remove any dupl code is used instead. + + if len(c.Clusters) > 0 { + cluster, err := radix.NewCluster(c.Clusters) + if err != nil { + // maybe an + // ERR This instance has cluster support disabled + return err + } + + connFunc = func(network, addr string) (radix.Conn, error) { + topo := cluster.Topo() + node := topo[rand.Intn(len(topo))] + return radix.Dial(c.Network, node.Addr, options...) + } + } else { + */ connFunc = func(network, addr string) (radix.Conn, error) { return radix.Dial(c.Network, c.Addr, options...) } - pool, err := radix.NewPool(c.Network, c.Addr, c.MaxActive, radix.PoolConnFunc(connFunc)) - if err != nil { - return err + + var pool radixPool + + if len(c.Clusters) > 0 { + poolFunc := func(network, addr string) (radix.Client, error) { + return radix.NewPool(network, addr, c.MaxActive, radix.PoolConnFunc(connFunc)) + } + + cluster, err := radix.NewCluster(c.Clusters, radix.ClusterPoolFunc(poolFunc)) + if err != nil { + return err + } + + pool = cluster + } else { + p, err := radix.NewPool(c.Network, c.Addr, c.MaxActive, radix.PoolConnFunc(connFunc)) + if err != nil { + return err + } + + pool = p } r.Connected = true diff --git a/sessions/sessiondb/redis/driver_radix_cluster.go b/sessions/sessiondb/redis/driver_radix_cluster.go deleted file mode 100644 index 5afe1229..00000000 --- a/sessions/sessiondb/redis/driver_radix_cluster.go +++ /dev/null @@ -1,223 +0,0 @@ -package redis - -import ( - "errors" - "fmt" - "github.com/mediocregopher/radix/v3" - "strconv" - "time" -) - -// RadixDriver the Redis service based on the radix go client, -// contains the config and the redis cluster. -type RadixClusterDriver struct { - Connected bool //Connected is true when the Service has already connected - Config Config //Config the read-only redis database config. - cluster *radix.Cluster -} - -// Connect connects to the redis, called only once -func (r *RadixClusterDriver) Connect(c Config) error { - if c.Timeout < 0 { - c.Timeout = DefaultRedisTimeout - } - if c.Network == "" { - c.Network = DefaultRedisNetwork - } - if c.Addr == "" { - c.Addr = DefaultRedisAddr - } - if c.MaxActive == 0 { - c.MaxActive = 10 - } - if c.Delim == "" { - c.Delim = DefaultDelim - } - if len(c.Clusters) < 1 { - return errors.New("cluster empty") - } - - var options []radix.DialOpt - if c.Password != "" { - options = append(options, radix.DialAuthPass(c.Password)) - } - if c.Timeout > 0 { - options = append(options, radix.DialTimeout(c.Timeout)) - } - - customConnFunc := func(network, addr string) (radix.Conn, error) { - return radix.Dial(network, addr, - radix.DialTimeout(1*time.Minute), - radix.DialAuthPass(c.Password), - ) - } - poolFunc := func(network, addr string) (radix.Client, error) { - return radix.NewPool(network, addr, c.MaxActive, radix.PoolConnFunc(customConnFunc)) - } - cluster, err := radix.NewCluster(c.Clusters, radix.ClusterPoolFunc(poolFunc)) - if err != nil { - return err - } - - r.Connected = true - r.cluster = cluster - r.Config = c - return nil -} - -// PingPong sends a ping and receives a pong, if no pong received then returns false and filled error -func (r *RadixClusterDriver) PingPong() (bool, error) { - var msg string - err := r.cluster.Do(radix.Cmd(&msg, "PING")) - if err != nil { - return false, err - } - - return (msg == "PONG"), nil -} - -// CloseConnection closes the redis connection. -func (r *RadixClusterDriver) CloseConnection() error { - if r.cluster != nil { - return r.cluster.Close() - } - - return ErrRedisClosed -} - -// Set sets a key-value to the redis store. -// The expiration is setted by the secondsLifetime. -func (r *RadixClusterDriver) Set(key string, value interface{}, secondsLifetime int64) error { - var cmd radix.CmdAction - // if has expiration, then use the "EX" to delete the key automatically. - if secondsLifetime > 0 { - cmd = radix.FlatCmd(nil, "SETEX", r.Config.Prefix+key, secondsLifetime, value) - } else { - cmd = radix.FlatCmd(nil, "SET", r.Config.Prefix+key, value) // MSET same performance... - } - - return r.cluster.Do(cmd) -} - -// Get returns value, err by its key -// returns nil and a filled error if something bad happened. -func (r *RadixClusterDriver) Get(key string) (interface{}, error) { - var redisVal interface{} - mn := radix.MaybeNil{Rcv: &redisVal} - - err := r.cluster.Do(radix.Cmd(&mn, "GET", r.Config.Prefix+key)) - if err != nil { - return nil, err - } - if mn.Nil { - return nil, fmt.Errorf("%s: %w", key, ErrKeyNotFound) - } - - return redisVal, nil -} - -// TTL returns the seconds to expire, if the key has expiration and error if action failed. -// Read more at: https://redis.io/commands/ttl -func (r *RadixClusterDriver) TTL(key string) (seconds int64, hasExpiration bool, found bool) { - var redisVal interface{} - err := r.cluster.Do(radix.Cmd(&redisVal, "TTL", r.Config.Prefix+key)) - if err != nil { - return -2, false, false - } - seconds = redisVal.(int64) - // if -1 means the key has unlimited life time. - hasExpiration = seconds > -1 - // if -2 means key does not exist. - found = seconds != -2 - return -} - -func (r *RadixClusterDriver) updateTTLConn(key string, newSecondsLifeTime int64) error { - var reply int - err := r.cluster.Do(radix.FlatCmd(&reply, "EXPIRE", r.Config.Prefix+key, newSecondsLifeTime)) - if err != nil { - return err - } - - // 1 if the timeout was set. - // 0 if key does not exist. - if reply == 1 { - return nil - } else if reply == 0 { - return fmt.Errorf("unable to update expiration, the key '%s' was stored without ttl", key) - } // do not check for -1. - - return nil -} - -func (r RadixClusterDriver) getKeys(cursor, prefix string) ([]string, error) { - var res scanResult - err := r.cluster.Do(radix.Cmd(&res, "SCAN", cursor, "MATCH", r.Config.Prefix+prefix+"", "COUNT", "300000")) - if err != nil { - return nil, err - } - - keys := res.keys[0:] - if res.cur != "0" { - moreKeys, err := r.getKeys(res.cur, prefix) - if err != nil { - return nil, err - } - - keys = append(keys, moreKeys...) - } - - return keys, nil -} - -// UpdateTTL will update the ttl of a key. -// Using the "EXPIRE" command. -// Read more at: https://redis.io/commands/expire#refreshing-expires -func (r *RadixClusterDriver) UpdateTTL(key string, newSecondsLifeTime int64) error { - return r.updateTTLConn(key, newSecondsLifeTime) -} - -// UpdateTTLMany like UpdateTTL but for all keys starting with that "prefix", -// it is a bit faster operation if you need to update all sessions keys (although it can be even faster if we used hash but this will limit other features), -// look the sessions/Database#OnUpdateExpiration for example. -func (r *RadixClusterDriver) UpdateTTLMany(prefix string, newSecondsLifeTime int64) error { - keys, err := r.getKeys("0", prefix) - if err != nil { - return err - } - - for _, key := range keys { - if err = r.updateTTLConn(key, newSecondsLifeTime); err != nil { // fail on first error. - return err - } - } - - return err -} - -// GetAll returns all redis entries using the "SCAN" command (2.8+). -func (r *RadixClusterDriver) GetAll() (interface{}, error) { - var redisVal []interface{} - mn := radix.MaybeNil{Rcv: &redisVal} - err := r.cluster.Do(radix.Cmd(&mn, "SCAN", strconv.Itoa(0))) // 0 -> cursor - if err != nil { - return nil, err - } - - if mn.Nil { - return nil, err - } - - return redisVal, nil -} - -// GetKeys returns all redis keys using the "SCAN" with MATCH command. -// Read more at: https://redis.io/commands/scan#the-match-option. -func (r *RadixClusterDriver) GetKeys(prefix string) ([]string, error) { - return r.getKeys("0", prefix) -} - -// Delete removes redis entry by specific key -func (r *RadixClusterDriver) Delete(key string) error { - return r.cluster.Do(radix.Cmd(nil, "DEL", r.Config.Prefix+key)) -}