赞
踩
需要先安装好docker
docker pull confluentinc/cp-kafka
docker pull confluentinc/cp-zookeeper
因为只是一个基础的测试, 关于broker其他的参数就省略不设置了, 后续后机会可以再讲讲
--- version: '2' services: cp-zookeeper-1: user: root restart: always container_name: cp-zookeeper-1 image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 22181:2181 cp-zookeeper-2: user: root restart: always container_name: cp-zookeeper-2 image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 32181:2181 cp-zookeeper-3: user: root restart: always container_name: cp-zookeeper-3 image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - 42181:2181 kafka-1: user: root restart: always container_name: kafka-1 image: confluentinc/cp-kafka:latest depends_on: - cp-zookeeper-1 - cp-zookeeper-2 - cp-zookeeper-3 ports: - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper-1:2181,cp-zookeeper-2:2181,cp-zookeeper-3:2181/kafkas-a KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-2: user: root restart: always container_name: kafka-2 image: confluentinc/cp-kafka:latest depends_on: - cp-zookeeper-1 - cp-zookeeper-2 - cp-zookeeper-3 ports: - 39092:39092 environment: KAFKA_BROKER_ID: 2 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper-1:2181,cp-zookeeper-2:2181,cp-zookeeper-3:2181/kafkas-a KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 kafka-3: user: root restart: always container_name: kafka-3 image: confluentinc/cp-kafka:latest depends_on: - cp-zookeeper-1 - cp-zookeeper-2 - cp-zookeeper-3 ports: - 49092:39092 environment: KAFKA_BROKER_ID: 3 KAFKA_ZOOKEEPER_CONNECT: cp-zookeeper-1:2181,cp-zookeeper-2:2181,cp-zookeeper-3:2181/kafkas-a KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:9092,PLAINTEXT_HOST://localhost:39092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
$ docker-compose -f docker_compose_cp-kafka-cluster.yaml up -d
[+] Running 6/6
⠿ Container cp-zookeeper-2 Started 0.4s
⠿ Container cp-zookeeper-1 Started 0.4s
⠿ Container cp-zookeeper-3 Started 0.4s
⠿ Container kafka-2 Started 0.5s
⠿ Container kafka-1 Started 0.7s
⠿ Container kafka-3 Started
使用到的库链接:go-utils
config in yaml
Kafka: localhost:29092,localhost:39092,localhost:49092 #for cp/kafka
code main body
const ( TopicCoinOption = "coinOption" // charge, add, decrease TopicFlowFlow = "followFlow" // follow, unfollow ) func TestKafkaConsumerGroup(t *testing.T) { // Producer go func() { var syncProducer sarama.SyncProducer syncProducer = gkafka.InitSyncProducer(gkafka.Hosts(strings.Split(GetKafkaConfig().Kafka, ","))) logrus.Infof("init kafka host:%v", GetKafkaConfig().Kafka) tick := time.Tick(1 * time.Second) ctx, cancel := context.WithCancel(context.Background()) gexit.Close(cancel) Loop: for { select { case <-tick: uu, _ := uuid.GenerateUUID() sid := algorithms.Snowflake.NextOptStreamID() var msg *UserCoinOption = &UserCoinOption{ Header: gkafka.Header{ SeqID: gcast.ToString(sid), TraceID: fmt.Sprintf("%v.%v", uu, ginfos.FuncName()), CorrelationID: uu, Topic: TopicCoinOption, ContentType: gkafka.ContentTypeJSON, Key: gkafka.KeyInfo{ KeyType: gkafka.KeyTypeProductID, KeyContent: "my_app", }, Source: ginfos.Runtime.IP(), Timestamp: time.Now().Unix(), }, SID: sid, UID: 123456, Type: 100, Amount: int64(1) + rand.Int63()%99, Balance: 100, } if err := gkafka.PublishSyncMessage(syncProducer, TopicCoinOption, msg); err != nil { logrus.Errorf("publish error|msg=%v, err:%v", msg, err) } else { logrus.Infof("[producer]push ok|msg=%v", msg) } case <-ctx.Done(): logrus.Errorf("publish error") break Loop } } }() // Consumer go func() { gkafka.InitKafkaConsumerGroup(gkafka.Hosts(strings.Split(GetKafkaConfig().Kafka, ",")), gkafka.Retry(true), gkafka.Topics( TopicCoinOption, TopicFlowFlow, ), gkafka.Handler(kafkaHander)) }() gexit.Wait() } func kafkaHander(ctx context.Context, consumerMsg *sarama.ConsumerMessage) (err error) { switch consumerMsg.Topic { case TopicCoinOption: err = handleCoinOption(ctx, consumerMsg.Value) case TopicFlowFlow: default: logrus.Warningf("unknown topic:%v,msg:%v", consumerMsg.Topic, consumerMsg.Value) } return nil } type UserCoinOption struct { gkafka.Header SID int64 `json:"sid"` UID int64 `json:"uid"` Type int64 `json:"type"` Amount int64 `json:"amount"` Balance int64 `json:"balance"` } func (msg *UserCoinOption) RandMarshal() (err error) { msg = &UserCoinOption{} uu, _ := uuid.GenerateUUID() msg.Header = gkafka.Header{ SeqID: gcast.ToString(algorithms.Snowflake.NextOptStreamID()), TraceID: fmt.Sprintf("%v.%v", uu, ginfos.FuncName()), CorrelationID: uu, Topic: TopicCoinOption, ContentType: gkafka.ContentTypeJSON, Key: gkafka.KeyInfo{}, Source: ginfos.Runtime.IP(), Timestamp: time.Now().Unix(), } return nil } func (msg *UserCoinOption) Unmarshal(b []byte) (err error) { err = json.Unmarshal(b, msg) return err } func handleCoinOption(ctx context.Context, b []byte) (err error) { var msg *UserCoinOption = &UserCoinOption{} err = msg.Unmarshal(b) if err != nil { logrus.Errorf("unmarshal err:%v", b) return err } logrus.Infof("[consumer]marshal ok:%v", msg) return nil }
15039-03-15 10:36:11.291 [/Users/xxx/mod/gomod/pkg/mod/github.com/aipave/go-utils@v0.0.58/gkafka/producer_with_sync.go:67 gkafka.PublishSyncMessage] [INFO] send message success, topic:coinOption partition:0 offset:688 value:{"id":"1635832233163300864","traceID":"3340cb86-464b-790f-92bc-f53a3057b6b1.func1","correlationID":"3340cb86-464b-790f-92bc-f53a3057b6b1","topic":"coinOption","contentType":"json","keyInfo":{"keyType":"key/product","keyContent":"my_app"},"version":"","source":"172.17.17.153","timestamp":1678847771,"sid":1635832233163300864,"uid":123456,"type":100,"amount":33,"balance":100}
15039-03-15 10:36:11.292 [/Users/xxx/mod/go-utils/test-example/gkafka_test.go:184 command-line-arguments.TestKafkaConsumerGroup.func1] [INFO] [producer]push ok|msg=&{{1635832233163300864 3340cb86-464b-790f-92bc-f53a3057b6b1.func1 3340cb86-464b-790f-92bc-f53a3057b6b1 coinOption json {key/product my_app} 172.17.17.153 1678847771} 1635832233163300864 123456 100 33 100}
15039-03-15 10:36:11.292 [/Users/xxx/mod/go-utils/test-example/gkafka_test.go:259 command-line-arguments.handleCoinOption] [INFO] [consumer]marshal ok:&{{1635832233163300864 3340cb86-464b-790f-92bc-f53a3057b6b1.func1 3340cb86-464b-790f-92bc-f53a3057b6b1 coinOption json {key/product my_app} 172.17.17.153 1678847771} 1635832233163300864 123456 100 33 100}
可以看到我们的kafka集群就搭建完毕了, 测试生产者消费者, 等等也可以通过了!
version: '3' networks: app-tier: driver: bridge external: true services: # When stopping the kafka cluster, be sure to wait until all node processes of kafka are stopped before stopping the zookeeper cluster. Because the kafka cluster related information is recorded in the zookeeper cluster, once the zookeeper cluster is stopped first, # the kafka cluster has no way to obtain the information of the stopped process, and can only manually kill the kafka process zookeeper-01: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-01 restart: always hostname: zookeeper-01 ports: - "2181:2181" networks: - app-tier environment: - ZOO_MY_ID=1 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181 - ZOOKEEPER_CLIENT_PORT=zookeeper-01:2181 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 zookeeper-02: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-02 restart: always hostname: zookeeper-02 ports: - "2182:2181" networks: - app-tier environment: - ZOO_MY_ID=2 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-02:2182 - ZOOKEEPER_CLIENT_PORT=zookeeper-02:2182 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 zookeeper-03: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-03 restart: always hostname: zookeeper-03 ports: - "2183:2181" networks: - app-tier environment: - ZOO_MY_ID=3 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-03:2183 - ZOOKEEPER_CLIENT_PORT=zookeeper-03:2183 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 kafka-01: user: root restart: always container_name: kafka-01 hostname: kafka-01 image: 'bitnami/kafka:latest' ports: - '9092:9092' networks: - app-tier environment: - KAFKA_BROKER_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03 kafka-02: user: root restart: always container_name: kafka-02 hostname: kafka-02 image: 'bitnami/kafka:latest' ports: - '9093:9092' networks: - app-tier environment: - KAFKA_BROKER_ID=2 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03 kafka-03: user: root restart: always container_name: kafka-03 hostname: kafka-03 image: 'bitnami/kafka:latest' ports: - '9094:9092' networks: - app-tier environment: - KAFKA_BROKER_ID=3 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9094 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03
config:
[zk: localhost:2181(CONNECTED) 0] get /kafkas01/brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9092"],"jmx_port":-1,"features":{},"host":"localhost","timestamp":"1678864730406","port":9092,"version":5}
[zk: localhost:2181(CONNECTED) 1] get /kafkas01/brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9093"],"jmx_port":-1,"features":{},"host":"localhost","timestamp":"1678864730485","port":9093,"version":5}
[zk: localhost:2181(CONNECTED) 2] get /kafkas01/brokers/ids/3
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://localhost:9094"],"jmx_port":-1,"features":{},"host":"localhost","timestamp":"1678864881743","port":9094,"version":5}
output:
15039-03-15 15:24:46.773 [/Users/xxxx/mod/gomod/pkg/mod/github.com/aipave/go-utils@v0.0.58/gkafka/producer_with_sync.go:67 gkafka.PublishSyncMessage] [INFO] send message success, topic:coinOption partition:0 offset:297 value:{"id":"1635904859353194496","traceID":"e39c914b-a423-b0c1-29bc-48857ef636b9.func1","correlationID":"e39c914b-a423-b0c1-29bc-48857ef636b9","topic":"coinOption","contentType":"json","keyInfo":{"keyType":"key/product","keyContent":"my_app"},"version":"","source":"172.17.17.153","timestamp":1678865086,"sid":1635904859353194496,"uid":123456,"type":100,"amount":14,"balance":100}
15039-03-15 15:24:46.773 [/Users/xxxx/mod/go-utils/test-example/gkafka_test.go:184 command-line-arguments.TestKafkaConsumerGroup.func1] [INFO] [producer]push ok|msg=&{{1635904859353194496 e39c914b-a423-b0c1-29bc-48857ef636b9.func1 e39c914b-a423-b0c1-29bc-48857ef636b9 coinOption json {key/product my_app} 172.17.17.153 1678865086} 1635904859353194496 123456 100 14 100}
15039-03-15 15:24:46.774 [/Users/xxxx/mod/go-utils/test-example/gkafka_test.go:259 command-line-arguments.handleCoinOption] [INFO] [consumer]marshal ok:&{{1635904859353194496 e39c914b-a423-b0c1-29bc-48857ef636b9.func1 e39c914b-a423-b0c1-29bc-48857ef636b9 coinOption json {key/product my_app} 172.17.17.153 1678865086} 1635904859353194496 123456 100 14 100}
启动后的zookeeper配置:
[zk: localhost:2181(CONNECTED) 8] get /kafkas01/brokers/ids/1
{"listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},"endpoints":["INTERNAL://kafka-01:9092","EXTERNAL://localhost:19092"],"jmx_port":-1,"features":{},"host":"kafka-01","timestamp":"1678866631100","port":9092,"version":5}
[zk: localhost:2181(CONNECTED) 9] get /kafkas01/brokers/ids/2
{"listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},"endpoints":["INTERNAL://kafka-02:9093","EXTERNAL://localhost:19093"],"jmx_port":-1,"features":{},"host":"kafka-02","timestamp":"1678866631946","port":9093,"version":5}
[zk: localhost:2181(CONNECTED) 10] get /kafkas01/brokers/ids/3
{"listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},"endpoints":["INTERNAL://kafka-03:9094","EXTERNAL://localhost:19094"],"jmx_port":-1,"features":{},"host":"kafka-03","timestamp":"1678866632012","port":9094,"version":5}
version: '3' networks: app-tier: driver: bridge external: true services: zookeeper-01: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-01 restart: always hostname: zookeeper-01 ports: - "2181:2181" networks: - app-tier environment: - ZOO_MY_ID=1 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181 - ZOOKEEPER_CLIENT_PORT=zookeeper-01:2181 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 zookeeper-02: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-02 restart: always hostname: zookeeper-02 ports: - "2182:2181" networks: - app-tier environment: - ZOO_MY_ID=2 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-02:2182 - ZOOKEEPER_CLIENT_PORT=zookeeper-02:2182 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 zookeeper-03: user: root image: 'bitnami/zookeeper:latest' container_name: zookeeper-03 restart: always hostname: zookeeper-03 ports: - "2183:2181" networks: - app-tier environment: - ZOO_MY_ID=3 - ALLOW_ANONYMOUS_LOGIN=yes - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-03:2183 - ZOOKEEPER_CLIENT_PORT=zookeeper-03:2183 - ZOO_SERVER=server.1=zookeeper-01:2888:3888;2181 server.2=zookeeper-02:2888:3888;2181 server.3=zookeeper-03:2888:3888;2181 kafka-01: user: root restart: always container_name: kafka-01 hostname: kafka-01 image: 'bitnami/kafka:latest' ports: - '9092:9092' - '19092:19092' networks: - app-tier environment: - KAFKA_BROKER_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:9092,EXTERNAL://:19092 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-01:9092,EXTERNAL://localhost:19092 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03 kafka-02: user: root restart: always container_name: kafka-02 hostname: kafka-02 image: 'bitnami/kafka:latest' ports: - '9093:9092' - '19093:19303' networks: - app-tier environment: - KAFKA_BROKER_ID=2 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:9093,EXTERNAL://:19093 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-02:9093,EXTERNAL://localhost:19093 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03 kafka-03: user: root restart: always container_name: kafka-03 hostname: kafka-03 image: 'bitnami/kafka:latest' ports: - '9094:9092' - '19094:19094' networks: - app-tier environment: - KAFKA_BROKER_ID=3 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-01:2181,zookeeper-02:2182,zookeeper-03:2183/kafkas01 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT - KAFKA_CFG_LISTENERS=INTERNAL://:9094,EXTERNAL://:19094 - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://kafka-03:9094,EXTERNAL://localhost:19094 depends_on: - zookeeper-01 - zookeeper-02 - zookeeper-03
yaml config for 内网
Kafka: localhost:9092,localhost:9093,localhost:9094
yaml config for 外网
Kafka: localhost:19092,localhost:19093,localhost:19094
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。