赞
踩
代码地址:https://gitee.com/lymgoforIT/golang-trick/tree/master/31-kafka-go
之前已经介绍过一个操作kafka的go库了,28.windows安装kafka,Go操作kafka示例(sarama库) ,但是这个库比较老了,当前比较流行的库是github.com/segmentio/kafka-go
,所以本次我们就使用一下它。
我们在GitHub
直接输入kafka
并带上language
标签为Go
时,可以可以看到当前get github.com/segmentio/kafka-go
库是最流行的。
首先启动kafka的服务器,然后在项目中go get github.com/segmentio/kafka-go
接着我们就可以创建生产者和消费者了,注意:在实际工作中,一般是一个服务为生产者,另一个服务作为消费者,但是本案例中不涉及微服务,就是演示一下生成和消费的示例代码,因此写到了一个服务当中。
代码文件组织如下:
user.go :用于测试发送和消费结构体字符串消息
package model
type User struct {
Id int64 `json:"id"`
UserName string `json:"user_name"`
Age int64 `json:"age"`
}
启动zookeeper
和kafka
,并创建名为test
的topic
,步骤可以参考:28.windows安装kafka,Go操作kafka示例(sarama库)
producer.go
package producer import ( "context" "encoding/json" "fmt" "golang-trick/31-kafka-go/model" "time" "github.com/segmentio/kafka-go" ) var ( topic = "user" Producer *kafka.Writer ) func init() { Producer = &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), //TCP函数参数为不定长参数,可以传多个地址组成集群 Topic: topic, Balancer: &kafka.Hash{}, // 用于对key进行hash,决定消息发送到哪个分区 MaxAttempts: 0, WriteBackoffMin: 0, WriteBackoffMax: 0, BatchSize: 0, BatchBytes: 0, BatchTimeout: 0, ReadTimeout: 0, WriteTimeout: time.Second, // kafka有时候可能负载很高,写不进去,那么超时后可以放弃写入,用于可以丢消息的场景 RequiredAcks: kafka.RequireNone, // 不需要任何节点确认就返回 Async: false, Completion: nil, Compression: 0, Logger: nil, ErrorLogger: nil, Transport: nil, AllowAutoTopicCreation: false, // 第一次发消息的时候,如果topic不存在,就自动创建topic,工作中禁止使用 } } // 生产消息,发送user信息 func SendMessage(ctx context.Context, user *model.User) { msgContent, err := json.Marshal(user) if err != nil { fmt.Println(fmt.Sprintf("json marshal user err,user:%v,err:%v", user, err)) } msg := kafka.Message{ Topic: "", Partition: 0, Offset: 0, HighWaterMark: 0, Key: []byte(fmt.Sprintf("%d", user.Id)), Value: msgContent, Headers: nil, WriterData: nil, Time: time.Time{}, } err = Producer.WriteMessages(ctx, msg) if err != nil { fmt.Println(fmt.Sprintf("写入kafka失败,user:%v,err:%v", user, err)) } }
main.go
: 测试消息发送
package main import ( "context" "fmt" "golang-trick/31-kafka-go/model" "golang-trick/31-kafka-go/producer" ) func main() { ctx := context.Background() for i := 0; i < 5; i++ { user := &model.User{ Id: int64(i + 1), UserName: fmt.Sprintf("lym:%d", i), Age: 18, } producer.SendMessage(ctx, user) } producer.Producer.Close() // 消息发送完毕后,关闭生产者 }
可以看到五条消息都发送成功
consumer.go
package consumer import ( "context" "encoding/json" "fmt" "golang-trick/24-gin-learning/class08/model" "time" "github.com/segmentio/kafka-go" ) var ( topic = "user" Consumer *kafka.Reader ) func init() { Consumer = kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, // broker地址 数组 GroupID: "test", // 消费者组id,每个消费者组可以消费kafka的完整数据,但是同一个消费者组中的消费者根据设置的分区消费策略共同消费kafka中的数据 GroupTopics: nil, Topic: topic, // 消费哪个topic Partition: 0, Dialer: nil, QueueCapacity: 0, MinBytes: 0, MaxBytes: 0, MaxWait: 0, ReadBatchTimeout: 0, ReadLagInterval: 0, GroupBalancers: nil, HeartbeatInterval: 0, CommitInterval: time.Second, // offset 上报间隔 PartitionWatchInterval: 0, WatchPartitionChanges: false, SessionTimeout: 0, RebalanceTimeout: 0, JoinGroupBackoff: 0, RetentionTime: 0, StartOffset: kafka.FirstOffset, // 仅对新创建的消费者组生效,从头开始消费,工作中可能更常用从最新的开始消费kafka.LastOffset ReadBackoffMin: 0, ReadBackoffMax: 0, Logger: nil, ErrorLogger: nil, IsolationLevel: 0, MaxAttempts: 0, OffsetOutOfRangeError: false, }) } // 消费消息 func ReadMessage(ctx context.Context) { // 消费者应该通过协程一直开着,一直消费 for { if msg, err := Consumer.ReadMessage(ctx); err != nil { fmt.Println(fmt.Sprintf("读kafka失败,err:%v", err)) break // 当前消息读取失败时,并不退出for终止所有后续消费,而是跳过该消息即可 } else { user := &model.User{} err := json.Unmarshal(msg.Value, user) if err != nil { fmt.Println(fmt.Sprintf("json unmarshal msg value err,msg:%v,err:%v", user, err)) break // 当前消息处理失败时,并不退出for终止所有后续消费,而是跳过该消息即可 } fmt.Println(fmt.Sprintf("topic=%s,partition=%d,offset=%d,key=%s,user=%v", msg.Topic, msg.Partition, msg.Offset, msg.Key, user)) } } }
main.go: 测试接收消息
package main import ( "context" "fmt" "golang-trick/31-kafka-go/consumer" "os" "os/signal" "syscall" ) // 需要监听信息2和15,在程序退出时,关闭Consumer func listenSignal() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) sig := <-c fmt.Printf("收到信号 %s ", sig.String()) if consumer.Consumer != nil { consumer.Consumer.Close() } os.Exit(0) } func main() { ctx := context.Background() //for i := 0; i < 5; i++ { // user := &model.User{ // Id: int64(i + 1), // UserName: fmt.Sprintf("lym:%d", i), // Age: 18, // } // producer.SendMessage(ctx, user) //} //producer.Producer.Close() go consumer.ReadMessage(ctx) listenSignal() }
启动后,因为我们设置的从头开始消费,所以原有的五条消息消费成功,然后在等待着队列中有消息时继续消费
我们可以通过kafka
客户端发两条消息,看看我们的消费者程序是否能消费到
最后关闭服务停止消费
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。