From b8218df789b21005cee3b8ad4ddab3115b47fb60 Mon Sep 17 00:00:00 2001
From: will <>
Date: Sat, 27 Jun 2020 17:33:02 +0800
Subject: [PATCH] add cluster driver

Former-commit-id: 1ca87c78690a3c7a5bb40d8d669de4371bd3932c
 .../sessions/database/redis_cluster/main.go   |  68 ++++++
 sessions/sessiondb/redis/database.go          |   7 +-
 sessions/sessiondb/redis/driver.go            |   6 +
 sessions/sessiondb/redis/driver_radix.go      |  22 +-
 .../sessiondb/redis/driver_radix_cluster.go   | 223 ++++++++++++++++++
 5 files changed, 302 insertions(+), 24 deletions(-)
 create mode 100644 _examples/sessions/database/redis_cluster/main.go
 create mode 100644 sessions/sessiondb/redis/driver_radix_cluster.go

diff --git a/_examples/sessions/database/redis_cluster/main.go b/_examples/sessions/database/redis_cluster/main.go
new file mode 100644
index 00000000..ed074d89
--- /dev/null
+++ b/_examples/sessions/database/redis_cluster/main.go
@@ -0,0 +1,68 @@
+package main
+import (
+	"time"
+	""
+	""
+	""
+	""
+// tested with redis version 3.0.503.
+// for windows see:
+func main() {
+	// These are the default values,
+	// you can replace them based on your running redis' server settings:
+	cluster := []string{
+		"",
+		"",
+		"",
+		"",
+		"",
+		"",
+	}
+	db := redis.New(redis.Config{
+		Network:   "tcp",
+		Addr:      "",
+		Timeout:   time.Duration(30) * time.Second,
+		MaxActive: 10,
+		Password:  "will-share",
+		Database:  "",
+		Prefix:    "",
+		Delim:     "-",
+		Driver:    redis.RadixCluster(),
+		Clusters:  cluster,
+	})
+	// Optionally configure the underline driver:
+	// driver := redis.Redigo()
+	// driver.MaxIdle = ...
+	// driver.IdleTimeout = ...
+	// driver.Wait = ...
+	// redis.Config {Driver: driver}
+	// Close connection when control+C/cmd+C
+	iris.RegisterOnInterrupt(func() {
+		db.Close()
+	})
+	defer db.Close() // close the database connection if application errored.
+	sess := sessions.New(sessions.Config{
+		Cookie:          "_session_id",
+		Expires:         0, // defaults to 0: unlimited life. Another good value is: 45 * time.Minute,
+		AllowReclaim:    true,
+		CookieSecureTLS: true,
+	})
+	//
+	//
+	sess.UseDatabase(db)
+	app := example.NewApp(sess)
+	app.Listen(":8080")
diff --git a/sessions/sessiondb/redis/database.go b/sessions/sessiondb/redis/database.go
index a731b5da..2a150e21 100644
--- a/sessions/sessiondb/redis/database.go
+++ b/sessions/sessiondb/redis/database.go
@@ -31,7 +31,7 @@ type Config struct {
 	Addr string
 	// Clusters a list of network addresses for clusters.
 	// If not empty "Addr" is ignored.
-	// Currently only Radix() Driver supports it.
+	// Currently only RadixCluster() Driver supports it.
 	Clusters []string
 	// Password string .If no password then no 'AUTH'. Defaults to "".
 	Password string
@@ -51,7 +51,7 @@ type Config struct {
 	// See
 	TLSConfig *tls.Config
-	// Driver supports `Redigo()` or `Radix()` go clients for redis.
+	// Driver supports `Redigo()` or `Radix()` or `RadixCluster()` go clients for redis.
 	// Configure each driver by the return value of their constructors.
 	// Defaults to `Redigo()`.
@@ -119,8 +119,7 @@ func New(cfg ...Config) *Database {
 	db := &Database{c: c}
 	_, err := db.c.Driver.PingPong()
 	if err != nil {
-		golog.Debugf("error connecting to redis: %v", err)
-		return nil
+		panic(err)
 	// runtime.SetFinalizer(db, closeDB)
 	return db
diff --git a/sessions/sessiondb/redis/driver.go b/sessions/sessiondb/redis/driver.go
index 3aae3ab9..15135d41 100644
--- a/sessions/sessiondb/redis/driver.go
+++ b/sessions/sessiondb/redis/driver.go
@@ -19,6 +19,7 @@ type Driver interface {
 var (
 	_ Driver = (*RedigoDriver)(nil)
 	_ Driver = (*RadixDriver)(nil)
+	_ Driver = (*RadixClusterDriver)(nil)
 // Redigo returns the driver for the redigo go redis client.
@@ -32,3 +33,8 @@ func Redigo() *RedigoDriver {
 func Radix() *RadixDriver {
 	return &RadixDriver{}
+// RadixCluster returns the driver for the radix go redis client(only support redis cluster).
+func RadixCluster() *RadixClusterDriver {
+	return &RadixClusterDriver{}
diff --git a/sessions/sessiondb/redis/driver_radix.go b/sessions/sessiondb/redis/driver_radix.go
index c06cfd48..e6388caa 100644
--- a/sessions/sessiondb/redis/driver_radix.go
+++ b/sessions/sessiondb/redis/driver_radix.go
@@ -4,7 +4,6 @@ import (
-	"math/rand"
@@ -69,26 +68,9 @@ func (r *RadixDriver) Connect(c Config) error {
 	var connFunc radix.ConnFunc
-	if len(c.Clusters) > 0 {
-		cluster, err := radix.NewCluster(c.Clusters)
-		if err != nil {
-			// maybe an
-			// ERR This instance has cluster support disabled
-			return err
-		}
-		connFunc = func(network, addr string) (radix.Conn, error) {
-			topo := cluster.Topo()
-			node := topo[rand.Intn(len(topo))]
-			return radix.Dial(c.Network, node.Addr, options...)
-		}
-	} else {
-		connFunc = func(network, addr string) (radix.Conn, error) {
-			return radix.Dial(c.Network, c.Addr, options...)
-		}
+	connFunc = func(network, addr string) (radix.Conn, error) {
+		return radix.Dial(c.Network, c.Addr, options...)
 	pool, err := radix.NewPool(c.Network, c.Addr, c.MaxActive, radix.PoolConnFunc(connFunc))
 	if err != nil {
 		return err
diff --git a/sessions/sessiondb/redis/driver_radix_cluster.go b/sessions/sessiondb/redis/driver_radix_cluster.go
new file mode 100644
index 00000000..5afe1229
--- /dev/null
+++ b/sessions/sessiondb/redis/driver_radix_cluster.go
@@ -0,0 +1,223 @@
+package redis
+import (
+	"errors"
+	"fmt"
+	""
+	"strconv"
+	"time"
+// RadixDriver the Redis service based on the radix go client,
+// contains the config and the redis cluster.
+type RadixClusterDriver struct {
+	Connected bool   //Connected is true when the Service has already connected
+	Config    Config //Config the read-only redis database config.
+	cluster   *radix.Cluster
+// Connect connects to the redis, called only once
+func (r *RadixClusterDriver) Connect(c Config) error {
+	if c.Timeout < 0 {
+		c.Timeout = DefaultRedisTimeout
+	}
+	if c.Network == "" {
+		c.Network = DefaultRedisNetwork
+	}
+	if c.Addr == "" {
+		c.Addr = DefaultRedisAddr
+	}
+	if c.MaxActive == 0 {
+		c.MaxActive = 10
+	}
+	if c.Delim == "" {
+		c.Delim = DefaultDelim
+	}
+	if len(c.Clusters) < 1 {
+		return errors.New("cluster empty")
+	}
+	var options []radix.DialOpt
+	if c.Password != "" {
+		options = append(options, radix.DialAuthPass(c.Password))
+	}
+	if c.Timeout > 0 {
+		options = append(options, radix.DialTimeout(c.Timeout))
+	}
+	customConnFunc := func(network, addr string) (radix.Conn, error) {
+		return radix.Dial(network, addr,
+			radix.DialTimeout(1*time.Minute),
+			radix.DialAuthPass(c.Password),
+		)
+	}
+	poolFunc := func(network, addr string) (radix.Client, error) {
+		return radix.NewPool(network, addr, c.MaxActive, radix.PoolConnFunc(customConnFunc))
+	}
+	cluster, err := radix.NewCluster(c.Clusters, radix.ClusterPoolFunc(poolFunc))
+	if err != nil {
+		return err
+	}
+	r.Connected = true
+	r.cluster = cluster
+	r.Config = c
+	return nil
+// PingPong sends a ping and receives a pong, if no pong received then returns false and filled error
+func (r *RadixClusterDriver) PingPong() (bool, error) {
+	var msg string
+	err := r.cluster.Do(radix.Cmd(&msg, "PING"))
+	if err != nil {
+		return false, err
+	}
+	return (msg == "PONG"), nil
+// CloseConnection closes the redis connection.
+func (r *RadixClusterDriver) CloseConnection() error {
+	if r.cluster != nil {
+		return r.cluster.Close()
+	}
+	return ErrRedisClosed
+// Set sets a key-value to the redis store.
+// The expiration is setted by the secondsLifetime.
+func (r *RadixClusterDriver) Set(key string, value interface{}, secondsLifetime int64) error {
+	var cmd radix.CmdAction
+	// if has expiration, then use the "EX" to delete the key automatically.
+	if secondsLifetime > 0 {
+		cmd = radix.FlatCmd(nil, "SETEX", r.Config.Prefix+key, secondsLifetime, value)
+	} else {
+		cmd = radix.FlatCmd(nil, "SET", r.Config.Prefix+key, value) // MSET same performance...
+	}
+	return r.cluster.Do(cmd)
+// Get returns value, err by its key
+// returns nil and a filled error if something bad happened.
+func (r *RadixClusterDriver) Get(key string) (interface{}, error) {
+	var redisVal interface{}
+	mn := radix.MaybeNil{Rcv: &redisVal}
+	err := r.cluster.Do(radix.Cmd(&mn, "GET", r.Config.Prefix+key))
+	if err != nil {
+		return nil, err
+	}
+	if mn.Nil {
+		return nil, fmt.Errorf("%s: %w", key, ErrKeyNotFound)
+	}
+	return redisVal, nil
+// TTL returns the seconds to expire, if the key has expiration and error if action failed.
+// Read more at:
+func (r *RadixClusterDriver) TTL(key string) (seconds int64, hasExpiration bool, found bool) {
+	var redisVal interface{}
+	err := r.cluster.Do(radix.Cmd(&redisVal, "TTL", r.Config.Prefix+key))
+	if err != nil {
+		return -2, false, false
+	}
+	seconds = redisVal.(int64)
+	// if -1 means the key has unlimited life time.
+	hasExpiration = seconds > -1
+	// if -2 means key does not exist.
+	found = seconds != -2
+	return
+func (r *RadixClusterDriver) updateTTLConn(key string, newSecondsLifeTime int64) error {
+	var reply int
+	err := r.cluster.Do(radix.FlatCmd(&reply, "EXPIRE", r.Config.Prefix+key, newSecondsLifeTime))
+	if err != nil {
+		return err
+	}
+	// 1 if the timeout was set.
+	// 0 if key does not exist.
+	if reply == 1 {
+		return nil
+	} else if reply == 0 {
+		return fmt.Errorf("unable to update expiration, the key '%s' was stored without ttl", key)
+	} // do not check for -1.
+	return nil
+func (r RadixClusterDriver) getKeys(cursor, prefix string) ([]string, error) {
+	var res scanResult
+	err := r.cluster.Do(radix.Cmd(&res, "SCAN", cursor, "MATCH", r.Config.Prefix+prefix+"", "COUNT", "300000"))
+	if err != nil {
+		return nil, err
+	}
+	keys := res.keys[0:]
+	if res.cur != "0" {
+		moreKeys, err := r.getKeys(res.cur, prefix)
+		if err != nil {
+			return nil, err
+		}
+		keys = append(keys, moreKeys...)
+	}
+	return keys, nil
+// UpdateTTL will update the ttl of a key.
+// Using the "EXPIRE" command.
+// Read more at:
+func (r *RadixClusterDriver) UpdateTTL(key string, newSecondsLifeTime int64) error {
+	return r.updateTTLConn(key, newSecondsLifeTime)
+// UpdateTTLMany like UpdateTTL but for all keys starting with that "prefix",
+// it is a bit faster operation if you need to update all sessions keys (although it can be even faster if we used hash but this will limit other features),
+// look the sessions/Database#OnUpdateExpiration for example.
+func (r *RadixClusterDriver) UpdateTTLMany(prefix string, newSecondsLifeTime int64) error {
+	keys, err := r.getKeys("0", prefix)
+	if err != nil {
+		return err
+	}
+	for _, key := range keys {
+		if err = r.updateTTLConn(key, newSecondsLifeTime); err != nil { // fail on first error.
+			return err
+		}
+	}
+	return err
+// GetAll returns all redis entries using the "SCAN" command (2.8+).
+func (r *RadixClusterDriver) GetAll() (interface{}, error) {
+	var redisVal []interface{}
+	mn := radix.MaybeNil{Rcv: &redisVal}
+	err := r.cluster.Do(radix.Cmd(&mn, "SCAN", strconv.Itoa(0))) // 0 -> cursor
+	if err != nil {
+		return nil, err
+	}
+	if mn.Nil {
+		return nil, err
+	}
+	return redisVal, nil
+// GetKeys returns all redis keys using the "SCAN" with MATCH command.
+// Read more at:
+func (r *RadixClusterDriver) GetKeys(prefix string) ([]string, error) {
+	return r.getKeys("0", prefix)
+// Delete removes redis entry by specific key
+func (r *RadixClusterDriver) Delete(key string) error {
+	return r.cluster.Do(radix.Cmd(nil, "DEL", r.Config.Prefix+key))