minor improvements to the kafka-api example

This commit is contained in:
Gerasimos (Makis) Maropoulos 2020-09-18 13:58:31 +03:00
parent d5a179cc45
commit d1f32e723a
No known key found for this signature in database
GPG Key ID: 5DBE766BD26A54E7
5 changed files with 162 additions and 12 deletions

View File

@ -12,6 +12,22 @@ Read the [code](main.go).
$ docker-compose up $ docker-compose up
``` ```
### Troubleshooting
On windows, if you get an error of `An attempt was made to access a socket in a way forbidden by its access permissions`
Solution:
1. Stop Docker
2. Open CMD with Administrator privileges and execute the following commands:
```sh
$ dism.exe /Online /Disable-Feature:Microsoft-Hyper-V
$ netsh int ipv4 add excludedportrange protocol=tcp startport=2181 numberofports=1
$ dism.exe /Online /Enable-Feature:Microsoft-Hyper-V /All
$ docker-compose up --build
```
## Manually ## Manually
Install & run Kafka and Zookeper locally and then: Install & run Kafka and Zookeper locally and then:

View File

@ -10,7 +10,7 @@ services:
ports: ports:
- 9092:9092 - 9092:9092
environment: environment:
KAFKA_ADVERTISED_HOST_NAME: 10.122.1.109 # replace that with your own local ipv4 addr. KAFKA_ADVERTISED_HOST_NAME: 10.122.1.142 # replace that with your own local ipv4 addr.
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# kafka: # kafka:
# image: confluentinc/cp-kafka:5.5.0 # image: confluentinc/cp-kafka:5.5.0

View File

@ -3,6 +3,6 @@ module myapp
go 1.15 go 1.15
require ( require (
github.com/Shopify/sarama v1.26.4 github.com/Shopify/sarama v1.27.0
github.com/kataras/iris/v12 master github.com/kataras/iris/v12 master
) )

View File

@ -3,7 +3,6 @@ package main
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"os" "os"
"strings" "strings"
"time" "time"
@ -86,8 +85,8 @@ func main() {
app.Logger().Infof("Brokers: %s", strings.Join(brokers, ", ")) app.Logger().Infof("Brokers: %s", strings.Join(brokers, ", "))
// GET : http://localhost:8080 // GET : http://localhost:8080
// POST, GET: http://localhost:8080/api/v1/topics // POST, GET: http://localhost:8080/api/v1/topics
// POST : http://localhost:8080/apiv1/topics/{topic}/produce?key=my-key // POST : http://localhost:8080/api/v1/topics/{topic}/produce?key=my-key
// GET : http://localhost:8080/apiv1/topics/{topic}/consume?partition=0&offset=0 // GET : http://localhost:8080/api/v1/topics/{topic}/consume?partition=0&offset=0
app.Listen(":8080") app.Listen(":8080")
} }
@ -138,7 +137,7 @@ func docsHandler(ctx iris.Context) {
type httpError struct { type httpError struct {
Code int `json:"code"` Code int `json:"code"`
Reason string `json:"reason"` Reason string `json:"reason,omitempty"`
} }
func (h httpError) Error() string { func (h httpError) Error() string {
@ -226,10 +225,8 @@ func postTopicsHandler(ctx iris.Context) {
return return
} }
// unnecessary statement but it's here to show you that topic is created, ctx.StatusCode(iris.StatusCreated)
// depending on your API expectations and how you used to work ctx.Writef("Topic %q created", t.Topic)
// you may want to change the status code to something like `iris.StatusCreated`.
ctx.StatusCode(iris.StatusOK)
} }
func getKafkaTopics() ([]string, error) { func getKafkaTopics() ([]string, error) {
@ -279,7 +276,7 @@ func postTopicProduceHandler(ctx iris.Context) {
key := ctx.URLParamDefault("key", "default") key := ctx.URLParamDefault("key", "default")
// read the request data and store them as they are (not recommended in production ofcourse, do your own checks here). // read the request data and store them as they are (not recommended in production ofcourse, do your own checks here).
body, err := ioutil.ReadAll(ctx.Request().Body) body, err := ctx.GetBody()
if err != nil { if err != nil {
fail(ctx, iris.StatusUnprocessableEntity, "unable to read your data: %v", err) fail(ctx, iris.StatusUnprocessableEntity, "unable to read your data: %v", err)
return return
@ -347,7 +344,7 @@ func getTopicConsumeSSEHandler(ctx iris.Context) {
} }
// `OnClose` fires when the request is finally done (all data read and handler exits) or interrupted by the user. // `OnClose` fires when the request is finally done (all data read and handler exits) or interrupted by the user.
ctx.OnClose(func() { ctx.OnClose(func(_ iris.Context) {
ctx.Application().Logger().Warnf("a client left") ctx.Application().Logger().Warnf("a client left")
// Close shuts down the consumer. It must be called after all child // Close shuts down the consumer. It must be called after all child

View File

@ -0,0 +1,137 @@
{
"info": {
"_postman_id": "8b135d95-ea8c-4dd5-a127-4b83cb735504",
"name": "iris-kafka-postman",
"description": "Postman API Requests for Iris + Kafka example",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
{
"name": "Create Topic",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n \"topic\":\"mytopic\",\r\n \"partitions\": 1,\r\n \"replication\":1\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://localhost:8080/api/v1/topics",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"api",
"v1",
"topics"
]
},
"description": "Create a new kafka topic"
},
"response": []
},
{
"name": "List all Topics",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "http://localhost:8080/api/v1/topics",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"api",
"v1",
"topics"
]
},
"description": "List all topics"
},
"response": []
},
{
"name": "Store data to Topic",
"request": {
"method": "POST",
"header": [],
"body": {
"mode": "raw",
"raw": "{\r\n \"username\":\"kataras\",\r\n \"repo\":\"iris\"\r\n}",
"options": {
"raw": {
"language": "json"
}
}
},
"url": {
"raw": "http://localhost:8080/api/v1/topics/mytopic/produce?key=mykey",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"api",
"v1",
"topics",
"mytopic",
"produce"
],
"query": [
{
"key": "key",
"value": "mykey"
}
]
},
"description": "Produce some data to a Topic"
},
"response": []
},
{
"name": "(Open in Browser) Consume data from a Topic",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "http://localhost:8080/api/v1/topics/mytopic/consume?partition=0&offset=0",
"protocol": "http",
"host": [
"localhost"
],
"port": "8080",
"path": [
"api",
"v1",
"topics",
"mytopic",
"consume"
],
"query": [
{
"key": "partition",
"value": "0"
},
{
"key": "offset",
"value": "0"
}
]
},
"description": "Note that, you have to open this one at your browser. Postman does not support SSE testing, see: https://github.com/postmanlabs/postman-app-support/issues/6682"
},
"response": []
}
],
"protocolProfileBehavior": {}
}