diff --git a/_examples/README.md b/_examples/README.md index e7356dfc..c281b5ff 100644 --- a/_examples/README.md +++ b/_examples/README.md @@ -71,6 +71,7 @@ $ go run main.go - [Tutorial: DropzoneJS Uploader](tutorial/dropzonejs) - [Tutorial: Caddy](tutorial/caddy) - [Tutorial:Iris Go Framework + MongoDB](https://medium.com/go-language/iris-go-framework-mongodb-552e349eab9c) +- [Tutorial: API for Apache Kafka](tutorial/api-for-apache-kafka) **NEW** ### Structuring diff --git a/_examples/README_ZH.md b/_examples/README_ZH.md index e4df3ed6..59c8dc74 100644 --- a/_examples/README_ZH.md +++ b/_examples/README_ZH.md @@ -21,6 +21,7 @@ - [教程: DropzoneJS 上传](tutorial/dropzonejs) - [教程: Caddy 服务器使用](tutorial/caddy) - [教程: Iris + MongoDB](https://medium.com/go-language/iris-go-framework-mongodb-552e349eab9c) +- [教程: API for Apache Kafka](tutorial/api-for-apache-kafka) **NEW** ### 目录结构 diff --git a/_examples/tutorial/api-for-apache-kafka/1_create_topic.png b/_examples/tutorial/api-for-apache-kafka/1_create_topic.png new file mode 100644 index 00000000..15f172d6 Binary files /dev/null and b/_examples/tutorial/api-for-apache-kafka/1_create_topic.png differ diff --git a/_examples/tutorial/api-for-apache-kafka/2_list_topics.png b/_examples/tutorial/api-for-apache-kafka/2_list_topics.png new file mode 100644 index 00000000..8ff6df54 Binary files /dev/null and b/_examples/tutorial/api-for-apache-kafka/2_list_topics.png differ diff --git a/_examples/tutorial/api-for-apache-kafka/3_store_to_topic.png b/_examples/tutorial/api-for-apache-kafka/3_store_to_topic.png new file mode 100644 index 00000000..c0fc8114 Binary files /dev/null and b/_examples/tutorial/api-for-apache-kafka/3_store_to_topic.png differ diff --git a/_examples/tutorial/api-for-apache-kafka/4_retrieve_from_topic_real_time.png b/_examples/tutorial/api-for-apache-kafka/4_retrieve_from_topic_real_time.png new file mode 100644 index 00000000..b336205b Binary files /dev/null and b/_examples/tutorial/api-for-apache-kafka/4_retrieve_from_topic_real_time.png differ diff --git a/_examples/tutorial/api-for-apache-kafka/README.md b/_examples/tutorial/api-for-apache-kafka/README.md new file mode 100644 index 00000000..b478b229 --- /dev/null +++ b/_examples/tutorial/api-for-apache-kafka/README.md @@ -0,0 +1,19 @@ +# Writing an API for Apache Kafka with Iris + +Article is coming soon, follow and stay tuned + +- +- + +Read [the fully functional example](src/main.go). + +## Screens + +![](1_create_topic.png) + +![](2_list_topics.png) + +![](3_store_to_topic.png) + +![](4_retrieve_from_topic_real_time.png) + diff --git a/_examples/tutorial/api-for-apache-kafka/src/main.go b/_examples/tutorial/api-for-apache-kafka/src/main.go new file mode 100644 index 00000000..4448a349 --- /dev/null +++ b/_examples/tutorial/api-for-apache-kafka/src/main.go @@ -0,0 +1,325 @@ +package main + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "time" + + "github.com/Shopify/sarama" + "github.com/kataras/iris" +) + +/* +First of all, read about Apache Kafka, install and run it, if you didn't already: https://kafka.apache.org/quickstart + +Secondly, install your favourite Go library for Apache Kafka communication. +I have chosen the shopify's one although I really loved the `segmentio/kafka-go` as well but it needs more to be done there +and you will be bored to read all the necessary code required to get started with it, so: + $ go get -u github.com/Shopify/sarama + +The minimum Apache Kafka broker(s) version required is 0.10.0.0 but 0.11.x+ is recommended (tested with 2.0). + +Resources: + - https://github.com/apache/kafka + - https://github.com/Shopify/sarama/blob/master/examples/http_server/http_server.go + - DIY +*/ + +// package-level variables for the shake of the example +// but you can define them inside your main func +// and pass around this config whenever you need to create a client or a producer or a consumer or use a cluster. +var ( + // The Kafka brokers to connect to, as a comma separated list. + brokers = []string{"localhost:9092"} + // The config which makes our live easier when passing around, it pre-mades a lot of things for us. + config *sarama.Config +) + +func init() { + config = sarama.NewConfig() + config.ClientID = "iris-example-client" + config.Version = sarama.V0_11_0_2 + // config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message. + config.Producer.Compression = sarama.CompressionSnappy + config.Producer.Flush.Frequency = 500 * time.Millisecond + config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message. + config.Producer.Return.Successes = true + + config.Consumer.Return.Errors = true +} + +func main() { + app := iris.New() + app.OnAnyErrorCode(handleErrors) + + v1 := app.Party("/api/v1") + { + topicsAPI := v1.Party("/topics") + { + topicsAPI.Post("/", postTopicsHandler) // create a topic. + topicsAPI.Get("/", getTopicsHandler) // list all topics. + + topicsAPI.Post("/{topic:string}/produce", postTopicProduceHandler) // store to a topic. + topicsAPI.Get("/{topic:string}/consume", getTopicConsumeSSEHandler) // retrieve all messages from a topic. + } + } + + // POST, GET: http://localhost:8080/api/v1/topics + // POST : http://localhost:8080/apiv1/topics/{topic}/produce?key=my-key + // GET : http://localhost:8080/apiv1/topics/{topic}/consume?partition=0&offset=0 (these url query parameters are optional) + app.Run(iris.Addr(":8080"), iris.WithoutServerError(iris.ErrServerClosed)) +} + +type httpError struct { + Code int `json:"code"` + Reason string `json:"reason"` +} + +func (h httpError) Error() string { + return fmt.Sprintf("Status Code: %d\nReason: %s", h.Code, h.Reason) +} + +const reasonKey = "reason" + +func fail(ctx iris.Context, statusCode int, format string, a ...interface{}) { + ctx.StatusCode(statusCode) + if format != "" { + ctx.Values().Set(reasonKey, fmt.Sprintf(format, a...)) + } + + // no next handlers will run, you can comment the below if you want, + // error code will still be written. + ctx.StopExecution() +} + +func handleErrors(ctx iris.Context) { + err := httpError{ + Code: ctx.GetStatusCode(), + Reason: ctx.Values().GetStringDefault(reasonKey, "unknown"), + } + + ctx.JSON(err) +} + +// Topic the payload for a kafka topic creation. +type Topic struct { + Topic string `json:"topic"` + Partitions int32 `json:"partitions"` + ReplicationFactor int16 `json:"replication"` + Configs []kv `json:"configs,omitempty"` +} + +type kv struct { + Key string `json:"key"` + Value string `json:"value"` +} + +func createKafkaTopic(t Topic) error { + cluster, err := sarama.NewClusterAdmin(brokers, config) + if err != nil { + return err + } + defer cluster.Close() + + topicName := t.Topic + topicDetail := sarama.TopicDetail{ + NumPartitions: t.Partitions, + ReplicationFactor: t.ReplicationFactor, + } + + if len(t.Configs) > 0 { + topicDetail.ConfigEntries = make(map[string]*string, len(t.Configs)) + for _, c := range t.Configs { + topicDetail.ConfigEntries[c.Key] = &c.Value // generate a ptr, or fill a new(string) with it and use that. + } + } + + return cluster.CreateTopic(topicName, &topicDetail, false) +} + +func postTopicsHandler(ctx iris.Context) { + var t Topic + err := ctx.ReadJSON(&t) + if err != nil { + fail(ctx, iris.StatusBadRequest, + "received invalid topic payload: %v", err) + return + } + + // try to create the topic inside kafka. + err = createKafkaTopic(t) + if err != nil { + fail(ctx, iris.StatusInternalServerError, + "unable to create topic: %v", err) + return + } + + // unnecessary statement but it's here to show you that topic is created, + // depending on your API expectations and how you used to work + // you may want to change the status code to something like `iris.StatusCreated`. + ctx.StatusCode(iris.StatusOK) +} + +func getKafkaTopics() ([]string, error) { + client, err := sarama.NewClient(brokers, config) + if err != nil { + return nil, err + } + defer client.Close() + + return client.Topics() +} + +func getTopicsHandler(ctx iris.Context) { + topics, err := getKafkaTopics() + if err != nil { + fail(ctx, iris.StatusInternalServerError, + "unable to retrieve topics: %v", err) + return + } + + ctx.JSON(topics) +} + +func produceKafkaMessage(toTopic string, key string, value []byte) (partition int32, offset int64, err error) { + // On the broker side, you may want to change the following settings to get + // stronger consistency guarantees: + // - For your broker, set `unclean.leader.election.enable` to false + // - For the topic, you could increase `min.insync.replicas`. + + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + return -1, -1, err + } + defer producer.Close() + + // We are not setting a message key, which means that all messages will + // be distributed randomly over the different partitions. + return producer.SendMessage(&sarama.ProducerMessage{ + Topic: toTopic, + Key: sarama.StringEncoder(key), + Value: sarama.ByteEncoder(value), + }) +} + +func postTopicProduceHandler(ctx iris.Context) { + topicName := ctx.Params().Get("topic") + key := ctx.URLParamDefault("key", "default") + + // read the request data and store them as they are (not recommended in production ofcourse, do your own checks here). + body, err := ioutil.ReadAll(ctx.Request().Body) + if err != nil { + fail(ctx, iris.StatusUnprocessableEntity, "unable to read your data: %v", err) + return + } + + partition, offset, err := produceKafkaMessage(topicName, key, body) + if err != nil { + fail(ctx, iris.StatusInternalServerError, "failed to store your data: %v", err) + return + } + + // The tuple (topic, partition, offset) can be used as a unique identifier + // for a message in a Kafka cluster. + ctx.Writef("Your data is stored with unique identifier: %s/%d/%d", topicName, partition, offset) +} + +type message struct { + Time time.Time `json:"time"` + Key string `json:"key"` + // Value []byte/json.RawMessage(if you are sure that you are sending only JSON) `json:"value"` + // or: + Value string `json:"value"` // for simple key-value storage. +} + +func getTopicConsumeSSEHandler(ctx iris.Context) { + flusher, ok := ctx.ResponseWriter().Flusher() + if !ok { + ctx.StatusCode(iris.StatusHTTPVersionNotSupported) + ctx.WriteString("streaming unsupported") + return + } + + ctx.ContentType("application/json, text/event-stream") + ctx.Header("Cache-Control", "no-cache") + ctx.Header("Connection", "keep-alive") + + master, err := sarama.NewConsumer(brokers, config) + if err != nil { + fail(ctx, iris.StatusInternalServerError, "unable to start master consumer: %v", err) + return + } + + fromTopic := ctx.Params().Get("topic") + // take the partition, defaults to the first found if not url query parameter "partition" passed. + var partition int32 + partitions, err := master.Partitions(fromTopic) + if err != nil { + master.Close() + fail(ctx, iris.StatusInternalServerError, "unable to get partitions for topic: '%s': %v", fromTopic, err) + return + } + + if len(partitions) > 0 { + partition = partitions[0] + } + + partition = ctx.URLParamInt32Default("partition", partition) + offset := ctx.URLParamInt64Default("offset", sarama.OffsetOldest) + + consumer, err := master.ConsumePartition(fromTopic, partition, offset) + if err != nil { + ctx.Application().Logger().Error(err) + master.Close() // close the master here to avoid any leaks, we will exit. + fail(ctx, iris.StatusInternalServerError, "unable to start partition consumer: %v", err) + return + } + + // `OnClose` fires when the request is finally done (all data read and handler exits) or interrupted by the user. + ctx.OnClose(func() { + ctx.Application().Logger().Warnf("a client left") + + // Close shuts down the consumer. It must be called after all child + // PartitionConsumers have already been closed. <-- That is what + // godocs says but it doesn't work like this. + // if err = consumer.Close(); err != nil { + // ctx.Application().Logger().Errorf("[%s] unable to close partition consumer: %v", ctx.RemoteAddr(), err) + // } + // so close the master only and omit the first ^ consumer.Close: + if err = master.Close(); err != nil { + ctx.Application().Logger().Errorf("[%s] unable to close master consumer: %v", ctx.RemoteAddr(), err) + } + }) + + for { + select { + case consumerErr, ok := <-consumer.Errors(): + if !ok { + return + } + ctx.Writef("data: error: {\"reason\": \"%s\"}\n\n", consumerErr.Error()) + flusher.Flush() + case incoming, ok := <-consumer.Messages(): + if !ok { + return + } + + msg := message{ + Time: incoming.Timestamp, + Key: string(incoming.Key), + Value: string(incoming.Value), + } + + b, err := json.Marshal(msg) + if err != nil { + ctx.Application().Logger().Error(err) + continue + } + + ctx.Writef("data: %s\n\n", b) + flusher.Flush() + } + } + +} diff --git a/context/context.go b/context/context.go index c842bbc7..c6923c37 100644 --- a/context/context.go +++ b/context/context.go @@ -446,7 +446,7 @@ type Context interface { // Keep note that this checks the "User-Agent" request header. IsMobile() bool // +------------------------------------------------------------+ - // | Response Headers helpers | + // | Headers helpers | // +------------------------------------------------------------+ // Header adds a header to the response writer. @@ -457,16 +457,18 @@ type Context interface { // GetContentType returns the response writer's header value of "Content-Type" // which may, setted before with the 'ContentType'. GetContentType() string + // GetContentType returns the request's header value of "Content-Type". + GetContentTypeRequested() string // GetContentLength returns the request's header value of "Content-Length". // Returns 0 if header was unable to be found or its value was not a valid number. GetContentLength() int64 // StatusCode sets the status code header to the response. - // Look .GetStatusCode too. + // Look .`GetStatusCode` too. StatusCode(statusCode int) // GetStatusCode returns the current status code of the response. - // Look StatusCode too. + // Look `StatusCode` too. GetStatusCode() int // Redirect sends a redirect response to the client @@ -501,6 +503,9 @@ type Context interface { // URLParamIntDefault returns the url query parameter as int value from a request, // if not found or parse failed then "def" is returned. URLParamIntDefault(name string, def int) int + // URLParamInt32Default returns the url query parameter as int32 value from a request, + // if not found or parse failed then "def" is returned. + URLParamInt32Default(name string, def int32) int32 // URLParamInt64 returns the url query parameter as int64 value from a request, // returns -1 and an error if parse failed. URLParamInt64(name string) (int64, error) @@ -648,6 +653,10 @@ type Context interface { // Examples of usage: context.ReadJSON, context.ReadXML. // // Example: https://github.com/kataras/iris/blob/master/_examples/http_request/read-custom-via-unmarshaler/main.go + // + // UnmarshalBody does not check about gzipped data. + // Do not rely on compressed data incoming to your server. The main reason is: https://en.wikipedia.org/wiki/Zip_bomb + // However you are still free to read the `ctx.Request().Body io.Reader` manually. UnmarshalBody(outPtr interface{}, unmarshaler Unmarshaler) error // ReadJSON reads JSON from request's body and binds it to a pointer of a value of any json-valid type. // @@ -1720,6 +1729,11 @@ func (ctx *context) GetContentType() string { return ctx.writer.Header().Get(ContentTypeHeaderKey) } +// GetContentType returns the request's header value of "Content-Type". +func (ctx *context) GetContentTypeRequested() string { + return ctx.GetHeader(ContentTypeHeaderKey) +} + // GetContentLength returns the request's header value of "Content-Length". // Returns 0 if header was unable to be found or its value was not a valid number. func (ctx *context) GetContentLength() int64 { @@ -1819,6 +1833,21 @@ func (ctx *context) URLParamIntDefault(name string, def int) int { return v } +// URLParamInt32Default returns the url query parameter as int32 value from a request, +// if not found or parse failed then "def" is returned. +func (ctx *context) URLParamInt32Default(name string, def int32) int32 { + if v := ctx.URLParam(name); v != "" { + n, err := strconv.ParseInt(v, 10, 32) + if err != nil { + return def + } + + return int32(n) + } + + return def +} + // URLParamInt64 returns the url query parameter as int64 value from a request, // returns -1 and an error if parse failed or not found. func (ctx *context) URLParamInt64(name string) (int64, error) { @@ -2224,6 +2253,10 @@ func (ctx *context) SetMaxRequestBodySize(limitOverBytes int64) { // Examples of usage: context.ReadJSON, context.ReadXML. // // Example: https://github.com/kataras/iris/blob/master/_examples/http_request/read-custom-via-unmarshaler/main.go +// +// UnmarshalBody does not check about gzipped data. +// Do not rely on compressed data incoming to your server. The main reason is: https://en.wikipedia.org/wiki/Zip_bomb +// However you are still free to read the `ctx.Request().Body io.Reader` manually. func (ctx *context) UnmarshalBody(outPtr interface{}, unmarshaler Unmarshaler) error { if ctx.request.Body == nil { return errors.New("unmarshal: empty body")