当前位置:   article > 正文

Go语言操作Kafka_go kafka

go kafka

Go语言操作Kafka

本文档详细介绍了如何使用Go语言对Kafka进行基础操作。我们将介绍如何使用Go连接Kafka、生产消息、消费消息。以下是详细操作步骤:

1. 安装驱动

首先,使用以下命令安装Sarama,一个优秀的Kafka Go客户端库:

go get github.com/Shopify/sarama
  • 1

2. 导入依赖

导入必要的依赖包:

import (
 "fmt"
 "github.com/Shopify/sarama"
 "log"
 "os"
 "os/signal"
 "strings"
 "sync"
 "time"
)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3. 生产者(Producer)

创建一个函数,用于连接并返回一个Kafka生产者:

func createProducer(brokers []string) (sarama.AsyncProducer, error) {
 config := sarama.NewConfig()
 config.Producer.Return.Successes = true
 config.Producer.Timeout = 5 * time.Second
 return sarama.NewAsyncProducer(brokers, config)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

创建一个函数,用于发送消息到指定的Kafka主题:

func produceMessage(producer sarama.AsyncProducer, topic, value string) {
 message := &sarama.ProducerMessage{
  Topic: topic,
  Value: sarama.StringEncoder(value),
 }

 producer.Input() <- message
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4. 消费者(Consumer)

创建一个函数,用于连接并返回一个Kafka消费者:

func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
 config := sarama.NewConfig()
 config.Consumer.Offsets.Initial = sarama.OffsetOldest
 return sarama.NewConsumerGroup(brokers, groupID, config)
}
  • 1
  • 2
  • 3
  • 4
  • 5

定义一个消费者组对象:

type KafkaConsumerGroupHandler struct {
 ready chan bool
}

func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
 close(handler.ready)
 return nil
}

func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
 return nil
}

func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 for message := range claim.Messages() {
  fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset)
  fmt.Printf("消息内容: %s\n", string(message.Value))
  sess.MarkMessage(message, "")
 }
 return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

创建一个函数,用于消费指定的Kafka主题:

func consumeMessages(consumer sarama.ConsumerGroup, topics []string) {
 handler := &KafkaConsumerGroupHandler{
  ready: make(chan bool),
 }

 for {
  err := consumer.Consume(context.Background(), topics, handler)
  if err != nil {
   log.Printf("消费者错误: %v", err)
  }

  select {
  case <-handler.ready:
  default:
   return
  }
 }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

参考代码

main 函数中调用以上方法展示生产和消费操作:

func main() {
 brokers := strings.Split("localhost:9092", ",")
 topic := "my_topic"
 groupID := "my_group"

 // 创建生产者
 producer, err := createProducer(brokers)
 if err != nil {
  log.Fatal("无法创建生产者:", err)
 }
 defer func() {
  if err := producer.Close(); err != nil {
   log.Fatal("无法关闭生产者:", err)
  }
 }()

 // 发送消息
 produceMessage(producer, topic, "hello world")

 // 创建消费者
 consumer, err := createConsumer(brokers, groupID)
 if err != nil {
  log.Fatal("无法创建消费者:", err)
 }
 defer func() {
  if err := consumer.Close(); err != nil {
   log.Fatal("无法关闭消费者:", err)
  }
 }()

 topics := []string{topic}
 wg := &sync.WaitGroup{}
 wg.Add(1)

 go func() {
  defer wg.Done()
  consumeMessages(consumer, topics)
 }()

 // 监听退出信号
 sigterm := make(chan os.Signal, 1)
 signal.Notify(sigterm, os.Interrupt)
 <-sigterm

 // 优雅关闭消费者
 wg.Wait()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

当你运行上述程序时,你将首先连接到Kafka集群并创建一个生产者,然后发送一条"hello world"消息到名为 “my_topic” 的主题。接下来,程序创建一个消费者,用于消费刚刚发送的消息并在终端输出消息内容。程序运行过程中,使用Ctrl+C或发送中断信号,可以优雅终止消费者并退出程序。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/529524
推荐阅读
相关标签
  

闽ICP备14008679号