package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/IBM/sarama"

	"github.com/kataras/iris/v12"
)

/*
	First of all, read about Apache Kafka, install and run it, if you didn't already:
	https://kafka.apache.org/quickstart

	Secondly, install your favourite Go library for Apache Kafka communication.
	I have chosen sarama (originally by Shopify, nowadays maintained under the IBM organization).
	I really loved `segmentio/kafka-go` as well, but it needs more setup code and you would be
	bored reading everything required to get started with it, so:
	$ go get -u github.com/IBM/sarama

	The minimum Apache Kafka broker(s) version required is 0.10.0.0 but 0.11.x+ is recommended (tested with 2.5.0).

	Resources:
	- https://github.com/apache/kafka
	- https://github.com/IBM/sarama/blob/master/examples/http_server/http_server.go
	- DIY
*/

// package-level variables for the sake of the example,
// but you can define them inside your main func
// and pass this config around whenever you need to create a client, a producer, a consumer or use a cluster.
var (
	// The Kafka brokers to connect to.
	brokers = []string{getenv("KAFKA_1", "localhost:9092")}
	// The config which makes our lives easier when passing it around; it pre-configures a lot of things for us.
	config *sarama.Config
)

func getenv(key string, def string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}

	return def
}

func init() {
	config = sarama.NewConfig()
	config.ClientID = "iris-example-client"
	config.Version = sarama.V0_11_0_2

	// config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message.
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Flush.Frequency = 500 * time.Millisecond
	config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message.
	config.Producer.Return.Successes = true

	// for SASL/basic plain text authentication: config.Net.SASL.
	// config.Net.SASL.Enable = true
	// config.Net.SASL.Handshake = false
	// config.Net.SASL.User = "myuser"
	// config.Net.SASL.Password = "mypass"

	config.Consumer.Return.Errors = true
}

func main() {
	app := iris.New()
	app.OnErrorCode(iris.StatusNotFound, handleNotFound)

	v1 := app.Party("/api/v1")
	{
		topicsAPI := v1.Party("/topics")
		{
			topicsAPI.Post("/", postTopicsHandler) // create a topic.
			topicsAPI.Get("/", getTopicsHandler)   // list all topics.

			topicsAPI.Post("/{topic}/produce", postTopicProduceHandler)  // store to a topic.
			topicsAPI.Get("/{topic}/consume", getTopicConsumeSSEHandler) // retrieve all messages from a topic.
		}
	}

	app.Get("/", docsHandler)

	app.Logger().Infof("Brokers: %s", strings.Join(brokers, ", "))
	// GET      : http://localhost:8080
	// POST, GET: http://localhost:8080/api/v1/topics
	// POST     : http://localhost:8080/api/v1/topics/{topic}/produce?key=my-key
	// GET      : http://localhost:8080/api/v1/topics/{topic}/consume?partition=0&offset=0
	app.Listen(":8080")
}
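/*
	A minimal sketch (not part of this example itself) of driving the two write
	endpoints above from a separate Go program. It assumes the server in this file
	is already running on localhost:8080; the topic name "test" is only an illustration.

		package main

		import (
			"bytes"
			"fmt"
			"net/http"
		)

		func main() {
			// Create the topic (replication must not exceed the number of brokers).
			resp, err := http.Post("http://localhost:8080/api/v1/topics", "application/json",
				bytes.NewBufferString(`{"topic": "test", "partitions": 1, "replication": 1}`))
			if err != nil {
				panic(err)
			}
			resp.Body.Close()

			// Produce a message to it; the request body is stored as-is.
			resp, err = http.Post("http://localhost:8080/api/v1/topics/test/produce?key=my-key",
				"text/plain", bytes.NewBufferString("hello kafka"))
			if err != nil {
				panic(err)
			}
			fmt.Println(resp.Status)
			resp.Body.Close()
		}
*/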
// simple use-case, you can use templates and views obviously, see the "_examples/views" examples.
func docsHandler(ctx iris.Context) {
	ctx.ContentType("text/html") // or ctx.HTML(fmt.Sprintf(...))

	ctx.Writef(`<!DOCTYPE html>
	<html>
		<head>
			<style>
				th, td {
					border: 1px solid black;
					padding: 15px;
					text-align: left;
				}
			</style>
		</head>`)
	defer ctx.Writef("</html>")

	ctx.Writef("<body>")
	defer ctx.Writef("</body>")

	ctx.Writef(`
	<table style="width:100%%">
		<tr>
			<th>Method</th>
			<th>Path</th>
			<th>Handler</th>
		</tr>
	`)
	defer ctx.Writef(`</table>`)

	registeredRoutes := ctx.Application().GetRoutesReadOnly()
	for _, r := range registeredRoutes {
		if r.Path() == "/" { // don't list the root, current one.
			continue
		}

		ctx.Writef(`
		<tr>
			<td>%s</td>
			<td>%s%s</td>
			<td>%s</td>
		</tr>
		`, r.Method(), ctx.Host(), r.Path(), r.MainHandlerName())
	}
}
type httpError struct {
	Code   int    `json:"code"`
	Reason string `json:"reason,omitempty"`
}

func (h httpError) Error() string {
	return fmt.Sprintf("Status Code: %d\nReason: %s", h.Code, h.Reason)
}

func fail(ctx iris.Context, statusCode int, format string, a ...interface{}) {
	reason := "unspecified"
	if format != "" {
		reason = fmt.Sprintf(format, a...)
	}

	err := httpError{
		Code:   statusCode,
		Reason: reason,
	}

	ctx.StopWithJSON(statusCode, err)
}

func handleNotFound(ctx iris.Context) {
	suggestPaths := ctx.FindClosest(3)
	if len(suggestPaths) == 0 {
		ctx.WriteString("not found")
		return
	}

	ctx.HTML("Did you mean?<ul>")
	for _, s := range suggestPaths {
		ctx.HTML(`<li><a href="%s">%s</a></li>`, s, s)
	}
	ctx.HTML("</ul>")
}

// Topic is the payload for a kafka topic creation.
type Topic struct {
	Topic             string `json:"topic"`
	Partitions        int32  `json:"partitions"`
	ReplicationFactor int16  `json:"replication"`
	Configs           []kv   `json:"configs,omitempty"`
}

type kv struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}

func createKafkaTopic(t Topic) error {
	cluster, err := sarama.NewClusterAdmin(brokers, config)
	if err != nil {
		return err
	}
	defer cluster.Close()

	topicName := t.Topic
	topicDetail := sarama.TopicDetail{
		NumPartitions:     t.Partitions,
		ReplicationFactor: t.ReplicationFactor,
	}

	if len(t.Configs) > 0 {
		topicDetail.ConfigEntries = make(map[string]*string, len(t.Configs))
		for _, c := range t.Configs {
			// copy the value so each entry gets its own pointer; taking &c.Value
			// directly would alias the loop variable (a bug before Go 1.22).
			value := c.Value
			topicDetail.ConfigEntries[c.Key] = &value
		}
	}

	return cluster.CreateTopic(topicName, &topicDetail, false)
}

func postTopicsHandler(ctx iris.Context) {
	var t Topic
	err := ctx.ReadJSON(&t)
	if err != nil {
		fail(ctx, iris.StatusBadRequest, "received invalid topic payload: %v", err)
		return
	}

	// try to create the topic inside kafka.
	err = createKafkaTopic(t)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "unable to create topic: %v", err)
		return
	}

	ctx.StatusCode(iris.StatusCreated)
	ctx.Writef("Topic %q created", t.Topic)
}

func getKafkaTopics() ([]string, error) {
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	return client.Topics()
}

func getTopicsHandler(ctx iris.Context) {
	topics, err := getKafkaTopics()
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "unable to retrieve topics: %v", err)
		return
	}

	ctx.JSON(topics)
}

func produceKafkaMessage(toTopic string, key string, value []byte) (partition int32, offset int64, err error) {
	// On the broker side, you may want to change the following settings to get
	// stronger consistency guarantees:
	// - For your broker, set `unclean.leader.election.enable` to false
	// - For the topic, you could increase `min.insync.replicas`.
	producer, err := sarama.NewSyncProducer(brokers, config)
	if err != nil {
		return -1, -1, err
	}
	defer producer.Close()

	// We set a message key here, so all messages with the same key end up on the
	// same partition (messages without a key would be distributed randomly over
	// the different partitions).
	return producer.SendMessage(&sarama.ProducerMessage{
		Topic: toTopic,
		Key:   sarama.StringEncoder(key),
		Value: sarama.ByteEncoder(value),
	})
}
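/*
	The broker-side settings mentioned in produceKafkaMessage can also be applied
	per topic through this example's own POST /api/v1/topics endpoint, via the
	"configs" field of the Topic payload above. A sketch of such a request body
	(it assumes a 3-broker cluster; the topic name and numbers are illustrative only):

		{
			"topic": "important-events",
			"partitions": 3,
			"replication": 3,
			"configs": [
				{"key": "min.insync.replicas", "value": "2"}
			]
		}
*/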
func postTopicProduceHandler(ctx iris.Context) {
	topicName := ctx.Params().Get("topic")
	key := ctx.URLParamDefault("key", "default")

	// read the request data and store it as-is (not recommended in production of course, do your own checks here).
	body, err := ctx.GetBody()
	if err != nil {
		fail(ctx, iris.StatusUnprocessableEntity, "unable to read your data: %v", err)
		return
	}

	partition, offset, err := produceKafkaMessage(topicName, key, body)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "failed to store your data: %v", err)
		return
	}

	// The tuple (topic, partition, offset) can be used as a unique identifier
	// for a message in a Kafka cluster.
	ctx.Writef("Your data is stored with unique identifier: %s/%d/%d", topicName, partition, offset)
}

type message struct {
	Time time.Time `json:"time"`
	Key  string    `json:"key"`
	// Value []byte/json.RawMessage (if you are sure that you are sending only JSON) `json:"value"`
	// or:
	Value string `json:"value"` // for simple key-value storage.
}

func getTopicConsumeSSEHandler(ctx iris.Context) {
	flusher, ok := ctx.ResponseWriter().Flusher()
	if !ok {
		ctx.StopWithText(iris.StatusHTTPVersionNotSupported, "streaming unsupported")
		return
	}

	ctx.ContentType("text/event-stream") // Server-Sent Events require exactly this content type.
	ctx.Header("Cache-Control", "no-cache")
	ctx.Header("Connection", "keep-alive")

	master, err := sarama.NewConsumer(brokers, config)
	if err != nil {
		fail(ctx, iris.StatusInternalServerError, "unable to start master consumer: %v", err)
		return
	}

	fromTopic := ctx.Params().Get("topic")
	// take the partition; defaults to the first one found if no "partition" url query parameter was passed.
	var partition int32
	partitions, err := master.Partitions(fromTopic)
	if err != nil {
		master.Close()
		fail(ctx, iris.StatusInternalServerError, "unable to get partitions for topic: '%s': %v", fromTopic, err)
		return
	}

	if len(partitions) > 0 {
		partition = partitions[0]
	}

	partition = ctx.URLParamInt32Default("partition", partition)
	offset := ctx.URLParamInt64Default("offset", sarama.OffsetOldest)

	consumer, err := master.ConsumePartition(fromTopic, partition, offset)
	if err != nil {
		ctx.Application().Logger().Error(err)
		master.Close() // close the master here to avoid any leaks, we will exit.
		fail(ctx, iris.StatusInternalServerError, "unable to start partition consumer: %v", err)
		return
	}

	// `OnClose` fires when the request is finally done (all data read and handler exits) or interrupted by the user.
	ctx.OnClose(func(_ iris.Context) {
		ctx.Application().Logger().Warnf("a client left")

		// Close shuts down the consumer. It must be called after all child
		// PartitionConsumers have already been closed. <-- That is what the
		// godoc says, but it doesn't work like that here.
		// if err = consumer.Close(); err != nil {
		// 	ctx.Application().Logger().Errorf("[%s] unable to close partition consumer: %v", ctx.RemoteAddr(), err)
		// }
		// so close the master only and omit the first ^ consumer.Close:
		if err = master.Close(); err != nil {
			ctx.Application().Logger().Errorf("[%s] unable to close master consumer: %v", ctx.RemoteAddr(), err)
		}
	})

	for {
		select {
		case consumerErr, ok := <-consumer.Errors():
			if !ok {
				return
			}

			ctx.Writef("data: error: {\"reason\": \"%s\"}\n\n", consumerErr.Error())
			flusher.Flush()
		case incoming, ok := <-consumer.Messages():
			if !ok {
				return
			}

			msg := message{
				Time:  incoming.Timestamp,
				Key:   string(incoming.Key),
				Value: string(incoming.Value),
			}

			b, err := json.Marshal(msg)
			if err != nil {
				ctx.Application().Logger().Error(err)
				continue
			}

			ctx.Writef("data: %s\n\n", b)
			flusher.Flush()
		}
	}
}
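/*
	And a minimal sketch (again, not part of this example itself) of consuming the
	SSE endpoint from Go instead of a browser's EventSource. It assumes the server
	above is running on localhost:8080 and that the topic "test" exists:

		package main

		import (
			"bufio"
			"fmt"
			"net/http"
			"strings"
		)

		func main() {
			resp, err := http.Get("http://localhost:8080/api/v1/topics/test/consume?partition=0&offset=0")
			if err != nil {
				panic(err)
			}
			defer resp.Body.Close()

			// Each SSE event in this example is a single "data: {json}" line
			// followed by a blank line; print just the JSON payloads.
			scanner := bufio.NewScanner(resp.Body)
			for scanner.Scan() {
				if line := scanner.Text(); strings.HasPrefix(line, "data: ") {
					fmt.Println(strings.TrimPrefix(line, "data: "))
				}
			}
		}
*/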