赞
踩
本文档详细介绍了如何使用Go语言对Kafka进行基础操作。我们将介绍如何使用Go连接Kafka、生产消息、消费消息。以下是详细操作步骤:
首先,使用以下命令安装Sarama,一个优秀的Kafka Go客户端库:
go get github.com/Shopify/sarama
导入必要的依赖包:
import (
"fmt"
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"strings"
"sync"
"time"
)
创建一个函数,用于连接并返回一个Kafka生产者:
func createProducer(brokers []string) (sarama.AsyncProducer, error) {
config := sarama.NewConfig()
config.Producer.Return.Successes = true
config.Producer.Timeout = 5 * time.Second
return sarama.NewAsyncProducer(brokers, config)
}
创建一个函数,用于发送消息到指定的Kafka主题:
func produceMessage(producer sarama.AsyncProducer, topic, value string) {
message := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(value),
}
producer.Input() <- message
}
创建一个函数,用于连接并返回一个Kafka消费者:
func createConsumer(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
return sarama.NewConsumerGroup(brokers, groupID, config)
}
定义一个消费者组对象:
type KafkaConsumerGroupHandler struct { ready chan bool } func (handler *KafkaConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { close(handler.ready) return nil } func (handler *KafkaConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (handler *KafkaConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { fmt.Printf("消息: 主题=%s 分区=%d 偏移量=%d\n", message.Topic, message.Partition, message.Offset) fmt.Printf("消息内容: %s\n", string(message.Value)) sess.MarkMessage(message, "") } return nil }
创建一个函数,用于消费指定的Kafka主题:
func consumeMessages(consumer sarama.ConsumerGroup, topics []string) { handler := &KafkaConsumerGroupHandler{ ready: make(chan bool), } for { err := consumer.Consume(context.Background(), topics, handler) if err != nil { log.Printf("消费者错误: %v", err) } select { case <-handler.ready: default: return } } }
在 main
函数中调用以上方法展示生产和消费操作:
func main() { brokers := strings.Split("localhost:9092", ",") topic := "my_topic" groupID := "my_group" // 创建生产者 producer, err := createProducer(brokers) if err != nil { log.Fatal("无法创建生产者:", err) } defer func() { if err := producer.Close(); err != nil { log.Fatal("无法关闭生产者:", err) } }() // 发送消息 produceMessage(producer, topic, "hello world") // 创建消费者 consumer, err := createConsumer(brokers, groupID) if err != nil { log.Fatal("无法创建消费者:", err) } defer func() { if err := consumer.Close(); err != nil { log.Fatal("无法关闭消费者:", err) } }() topics := []string{topic} wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() consumeMessages(consumer, topics) }() // 监听退出信号 sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, os.Interrupt) <-sigterm // 优雅关闭消费者 wg.Wait() }
当你运行上述程序时,你将首先连接到Kafka集群并创建一个生产者,然后发送一条"hello world"消息到名为 “my_topic” 的主题。接下来,程序创建一个消费者,用于消费刚刚发送的消息并在终端输出消息内容。程序运行过程中,使用Ctrl+C或发送中断信号,可以优雅终止消费者并退出程序。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。