赞
踩
本教程基于go-zero微服务入门教程
,项目工程结构同上一个教程。
go-zero微服务入门教程(点击进入)
本教程主要实现go-zero框架整合单机版Kafka,并暴露接口实现Kafka消息的生产和消费。
本文源码:https://gitee.com/songfayuan/go-zero-demo (教程源码分支:3.zero整合单机kafka)
go-zero微服务入门教程
。在common目录下创建task/kafkaconf新目录,在kafkaconf目录下创建conf.go文件,内容如下:
package kafkaconf type Conf struct { Host string Brokers []string Group string Topic string Offset string `json:",options=first|last,default=last"` OffsetId int64 `json:",default=-1"` //-1时表示不使用该配置 Consumers int `json:",default=8"` Processors int `json:",default=8"` MinBytes int `json:",default=10240"` // 10K MaxBytes int `json:",default=10485760"` // 10M Username string `json:",optional"` Password string `json:",optional"` ForceCommit bool `json:",default=true"` }
以下操作在rpc模块执行。
sys.yaml配置文件新增Kafka配置信息,如下:
# Kafka配置
KafkaConf:
Host: 192.168.2.204:9092
Brokers:
- 192.168.2.204:9092
Group: "consumer-group-id"
Topic: kafka-test-topic3
Consumers: 5
config.go文件中新增KafkaConf配置信息,如下:
KafkaConf kafkaconf.Conf
servicecontext.go文件新增Kafka配置信息,完整代码如下:
package svc import ( "github.com/zeromicro/go-zero/core/stores/redis" "github.com/zeromicro/go-zero/core/stores/sqlx" "go-zero-demo/common/task/kafkaconf" "go-zero-demo/rpc/model/sysmodel" "go-zero-demo/rpc/sys/internal/config" ) type ServiceContext struct { Config config.Config RedisClient *redis.Redis KafkaConf *kafkaconf.Conf UserModel sysmodel.SysUserModel } func NewServiceContext(c config.Config) *ServiceContext { sqlConn := sqlx.NewMysql(c.Mysql.Datasource) conf := redis.RedisConf{ Host: c.RedisConf.Host, Type: c.RedisConf.Type, Pass: c.RedisConf.Pass, Tls: c.RedisConf.Tls, } return &ServiceContext{ Config: c, RedisClient: redis.MustNewRedis(conf), KafkaConf: &c.KafkaConf, UserModel: sysmodel.NewSysUserModel(sqlConn), } }
新增Kafka操作请求的配置,如下:
message KafkaReq{ string name = 1; string nickName = 2; string password = 3; string email = 4; } message KafkaResp{ string name = 1; string nickName = 2; string password = 3; string email = 4; } message Empty { } service Sys{ //Kafka生产者演示请求 rpc KafkaProducer(KafkaReq)returns(KafkaResp); //Kafka消费者演示请求 rpc KafkaConsumer(Empty)returns(KafkaResp); }
生成方法同上篇文章,自行查看。
在api目录下创建新目录doc/kafka,在kafka目录下创建kafka.api文件。
syntax = "v1" info( title: "Kafka生产消费案例演示" desc: "Kafka生产消费案例演示" author: "songfayuan" ) type ( ApiKafkaReq { Name string `json:"name"` NickName string `json:"nickName"` Password string `json:"password"` Email string `json:"email"` } ApiKafkaResp { Code int64 `json:"code"` Message string `json:"message"` Data ApiKafkaReq `json:"data"` } ) @server ( group : kafka/test prefix : /kafka/test ) service admin-api{ @doc( summary : "Kafka生产者演示" ) @handler KafkaProducer post /kafkaProducer(ApiKafkaReq)returns(ApiKafkaResp) @doc ( summary :"Kafka消费者演示" ) @handler KafkaConsumer get /kafkaConsumer returns(ApiKafkaResp) }
在api/doc/admin.api文件添加配置信息。
import "kafka/kafka.api"
生成方法同上篇文章,自行查看。但是此处要基于admin.api文件去生成代码,如果基于kafka.api生成,则生成的代码只有kafka.api定义的接口代码,其他api文件定义的接口代码不被生成。
修改api/internal/logic/kafka/test/kafkaproducerlogic.go
里的KafkaProducer
方法,如下:
func (l *KafkaProducerLogic) KafkaProducer(req *types.ApiKafkaReq) (resp *types.ApiKafkaResp, err error) { producer, err := l.svcCtx.Sys.KafkaProducer(l.ctx, &sysclient.KafkaReq{ Name: req.Name, NickName: req.NickName, Password: req.Password, Email: req.Email, }) if err != nil { resJson, _ := json.Marshal(producer) logx.WithContext(l.ctx).Errorf("Kafka生产者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error()) return nil, rpcerror.New(err) } return &types.ApiKafkaResp{ Code: 200, Message: "操作成功", Data: types.ApiKafkaReq{ Name: producer.Name, NickName: producer.NickName, Password: producer.Password, Email: producer.Email, }, }, nil }
修改api/internal/logic/kafka/test/kafkaconsumerlogic.go
里的KafkaConsumer
方法,如下:
func (l *KafkaConsumerLogic) KafkaConsumer() (resp *types.ApiKafkaResp, err error) {
consumer, err := l.svcCtx.Sys.KafkaConsumer(l.ctx, &sysclient.Empty{})
if err != nil {
resJson, _ := json.Marshal(consumer)
logx.WithContext(l.ctx).Errorf("Kafka消费者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error())
return nil, rpcerror.New(err)
}
return &types.ApiKafkaResp{
Code: 200,
Message: "操作成功",
Data: types.ApiKafkaReq{},
}, nil
}
rpc/sys/internal/logic/kafkaproducerlogic.go
,如下内容:// Kafka生产者演示请求 func (l *KafkaProducerLogic) KafkaProducer(in *sysclient.KafkaReq) (*sysclient.KafkaResp, error) { if in.Name == "" { return nil, errors.New("账号不能为空") } if in.NickName == "" { return nil, errors.New("姓名不能为空") } if in.Email == "" { return nil, errors.New("邮箱不能为空") } // 创建一个writer,向topic发送消息 w := &kafka.Writer{ Addr: kafka.TCP(l.svcCtx.Config.KafkaConf.Host), Topic: l.svcCtx.Config.KafkaConf.Topic, Balancer: &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布 RequiredAcks: kafka.RequireAll, // ack模式 Async: true, } // 定义消息内容 messages := []string{in.Name, in.NickName, in.Password, in.Email} // 循环发送消息 for i, msg := range messages { err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键 Value: []byte(msg), }, ) if err != nil { log.Fatalf("Kafka生产者演示请求,向kafka写入数据失败: %v", err) } } if err := w.Close(); err != nil { log.Fatal("Kafka生产者演示请求,failed to close writer:", err) } return &sysclient.KafkaResp{}, nil }
rpc/sys/internal/logic/kafkaconsumerlogic.go
,如下内容:// Kafka消费者演示请求 // 这里演示手动请求触发kafka消费,实际项目中要做成项目启动后就一直监听消费。 func (l *KafkaConsumerLogic) KafkaConsumer(in *sysclient.Empty) (*sysclient.KafkaResp, error) { // 创建一个reader,指定GroupID,消费消息 reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{l.svcCtx.KafkaConf.Host}, GroupID: l.svcCtx.KafkaConf.Group, //指定消费者组ID Topic: l.svcCtx.KafkaConf.Topic, MaxBytes: 10e6, //10MB }) //接收消息 for { //ReadMessage会自动提交偏移量 message, err := reader.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("Kafka消费者演示请求:message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value)) } //程序退出前关闭Reader if err := reader.Close(); err != nil { log.Fatal("Kafka消费者演示请求:failed to close reader:", err) } return &sysclient.KafkaResp{}, nil }
最后,在根目录go-zero-demo执行下命令。
go mod tidy
运行方法同上篇文章,自行查看。
运行方法同上篇文章,自行查看。
以下调用采用curl进行,你也可以用postman调用。
songfayuan@MacBook-Pro ~ curl -X POST -H "Content-Type: application/json" -d '{"name":"songfayuan","nickName":"宋发元6666","email":"1414@qq.com","password":"123456"}' localhost:8888/kafka/test/kafkaProducer
{"code":200,"message":"操作成功","data":{"name":"","nickName":"","password":"","email":""}}%
此时,查看Kafka相关Topic,即可看到成功生产的数据。
songfayuan@MacBook-Pro ~ curl "localhost:8888/kafka/test/kafkaConsumer"
此时,即可看到运行日志打印出消费成功的信息。
Kafka消息生产和多消费者消费同一个Topic测试案例。
package main import ( "context" "errors" "fmt" "github.com/segmentio/kafka-go" "log" "time" ) // 演示kafka读写 func main() { //writeByConn() writeByWriter3() //writeByWriter2() //readByConn() //readByReader() //readByReaderGroup() } // writeByConn基于Conn发送消息 func writeByConn() { topic := "kafka-test-topic3" partition := 0 //连接至kafka集群的Leader节点 conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition) if err != nil { log.Fatal("failed to dial leader:", err) } //设置发送消息的超时时间 conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) //发送消息 _, err = conn.WriteMessages( kafka.Message{Value: []byte("one!")}, kafka.Message{Value: []byte("two!")}, kafka.Message{Value: []byte("three!")}, ) if err != nil { log.Fatal("failed to write messages:", err) } //关闭连接 if err := conn.Close(); err != nil { log.Fatal("failed to close writer:", err) } } func writeByWriter() { //创建一个writer,向topic发送消息 w := &kafka.Writer{ Addr: kafka.TCP("192.168.2.204:9092"), Topic: "kafka-test-topic", Balancer: &kafka.LeastBytes{}, //指定分区的balancer模式为最小字节分布 RequiredAcks: kafka.RequireAll, //ack模式 Async: true, } err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("failed to write messages:", err) } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) } } func writeByWriter3() { // 创建一个writer,向topic发送消息 w := &kafka.Writer{ Addr: kafka.TCP("192.168.2.204:9092"), Topic: "kafka-test-topic3", Balancer: &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布 RequiredAcks: kafka.RequireAll, // ack模式 Async: true, } // 定义消息内容 messages := []string{"Hello World!", "One!", "Two!", "song", "fa", "yuan"} // 循环发送消息 for i, msg := range messages { err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键 Value: []byte(msg), }, ) if err != nil { log.Fatalf("failed to write message: %v", err) } } if err := w.Close(); err != nil { log.Fatal("failed to close writer:", err) } } // 创建不存在的topic // 如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。 func writeByWriter2() { writer := kafka.Writer{ Addr: kafka.TCP("192.168.2.204:9092"), Topic: "kafka-test-topic", AllowAutoTopicCreation: true, //自动创建topic } messages := []kafka.Message{ { Key: []byte("Key-A"), Value: []byte("Hello World!"), }, { Key: []byte("Key-B"), Value: []byte("One!"), }, { Key: []byte("Key-C"), Value: []byte("Tow!"), }, } const retries = 3 //重试3次 for i := 0; i < retries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err := writer.WriteMessages(ctx, messages...) if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } if err != nil { log.Fatal("unexpected error %v", err) } break } //关闭Writer if err := writer.Close(); err != nil { log.Fatal("failed to close writer:", err) } } // readByConn连接到kafka后接收消息 func readByConn() { //指定要连接的topic和partition topic := "kafka-test-topic" partition := 0 //连接至kafka的Leader节点 conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition) if err != nil { log.Fatal("failed to dial leader:", err) } //设置读取超时时间 conn.SetReadDeadline(time.Now().Add(10 * time.Second)) //读取一批信息,得到的batch是一系列消息的迭代器 batch := conn.ReadBatch(10e3, 1e6) //fetch 10KB min, 1MB max //遍历读取消息 //b := make([]byte, 10e3) //10KB max per message fmt.Println("******遍历读取消息******") for { //使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer,如果传入的buffer太小(消息装不下),就会返回io.ErrShortBuffer //n, err := batch.Read(b) //如果不考虑内存分配的效率问题,可以使用batch.ReadMessage读取消息 mag, err := batch.ReadMessage() if err != nil { break } //fmt.Println(string(b[:n])) fmt.Println(string(mag.Value)) } //关闭batch if err := batch.Close(); err != nil { log.Fatal("failed to close batch:", err) } //关闭连接 if err := conn.Close(); err != nil { log.Fatal("failed to close connection:", err) } } // readByReader通过Reader接收消息 func readByReader() { //创建Reader reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"192.168.2.204:9092"}, Topic: "kafka-test-topic", Partition: 0, MaxBytes: 10e6, //10MB }) //设置Offset reader.SetOffset(1) //接收消息 for { message, err := reader.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at offset %d: %s = %s\n", message.Offset, string(message.Key), string(message.Value)) } if err := reader.Close(); err != nil { log.Fatal("failed to close reader:", err) } } // 消费者组 func readByReaderGroup() { // 创建一个reader,指定GroupID,消费消息 reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"192.168.2.204:9092"}, GroupID: "consumer-group-id", //指定消费者组ID Topic: "kafka-test-topic", MaxBytes: 10e6, //10MB }) //接收消息 for { //ReadMessage会自动提交偏移量 message, err := reader.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value)) } //程序退出前关闭Reader if err := reader.Close(); err != nil { log.Fatal("failed to close reader:", err) } } // 消费者组,手动提交 func readByReaderGroup2() { // 创建一个reader,指定GroupID,消费消息 reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"192.168.2.204:9092"}, GroupID: "consumer-group-id", //指定消费者组ID Topic: "kafka-test-topic", MaxBytes: 10e6, //10MB }) //接收消息 ctx := context.Background() for { //获取消息 message, err := reader.FetchMessage(ctx) if err != nil { break } //处理消息 fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value)) //显示提交 if err := reader.CommitMessages(ctx, message); err != nil { log.Fatal("failed to commit messages:", err) } } //程序退出前关闭Reader if err := reader.Close(); err != nil { log.Fatal("failed to close reader:", err) } }
package main import ( "context" "fmt" "github.com/segmentio/kafka-go" "log" "sync" "time" ) // 多个消费者同时消费同一Topic的数据 // 一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费 func main() { // 创建消费者组ID consumerGroupID := "consumer-group-id16" // 创建两个消费者 consumer1 := createConsumer(consumerGroupID, "Consumer1") consumer2 := createConsumer(consumerGroupID, "Consumer2") consumer3 := createConsumer(consumerGroupID, "Consumer3") consumer4 := createConsumer(consumerGroupID, "consumer4") consumer5 := createConsumer(consumerGroupID, "consumer5") // 启动消费者 var wg sync.WaitGroup wg.Add(4) go consumeMessages(consumer1, &wg, "Consumer1") go consumeMessages(consumer2, &wg, "Consumer2") go consumeMessages(consumer3, &wg, "consumer3") go consumeMessages(consumer4, &wg, "consumer4") go consumeMessages(consumer5, &wg, "consumer5") wg.Wait() } func createConsumer(groupID, consumerName string) *kafka.Reader { return kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"192.168.2.204:9092"}, GroupID: groupID, Topic: "kafka-test-topic3", }) } func consumeMessages(reader *kafka.Reader, wg *sync.WaitGroup, consumerName string) { defer wg.Done() for { message, err := reader.ReadMessage(context.Background()) if err != nil { break } time.Sleep(1 * time.Second) fmt.Printf("[%s] Message at topic/partition/offset %v/%v/%v: %s = %s\n", consumerName, message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value)) } if err := reader.Close(); err != nil { log.Fatalf("[%s] Failed to close reader: %v\n", consumerName, err) } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。