赞
踩
Go 语言中有一些流行的 Kafka 客户端库。以下是几个常用的库及其优劣与区别:
优点:
librdkafka
,性能非常高。缺点:
librdkafka
,需要额外安装该库。优点:
缺点:
confluent-kafka-go
,性能稍逊一筹。优点:
缺点:
confluent-kafka-go
,性能稍逊一筹。优点:
缺点:
sarama
和 confluent-kafka-go
,社区支持稍弱。confluent-kafka-go
是一个不错的选择。sarama
或 segmentio/kafka-go
。franz-go
。本文我们就以confluent-kafka-go
库为例来编写代码。
不知道如何搭建集群请点击这里 ----》Kafka 集群部署(CentOS 单机模拟版)
如果你懒得启动集群,那么直接跳过。
cluster
目录下运行集群启动脚本 cluster.sh
;cd cluster
./cluster.sh
ll zookeeper-data/
total 4
drwxr-xr-x 3 root root 4096 May 27 10:20 zookeeper
ll broker-data/
total 12
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-1
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-2
drwxr-xr-x 2 root root 4096 May 27 10:21 broker-3
go工作目录
echo $GOPATH
GOPATH
目录下的src
目录下新建 produce
项目mkdir src/produce
cd src/produce
go mod init
命令来初始化一个新的 Go 模块
go mod init produce
confluent-kafka-go
库go get github.com/confluentinc/confluent-kafka-go/kafka
producer.go
touch producer.go
package main
import (
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 创建生产者实例
broker := "localhost:9091" // 集群地址
topic := "test" // 主题名称
producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) // 创建生产者实例
// 检查错误
if err != nil {
log.Fatalf("Failed to create producer: %s", err)
}
defer producer.Close()
fmt.Printf("Created Producer %v\n", producer)
// 生产消息
message := "hello kafka"
for i := 0; i < 10; i++ {
producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, // 任题名称
Value: []byte(message + fmt.Sprintf("%d", i)), // 消息内容
}, nil)
}
if err != nil {
log.Fatalf("Failed to produce message: %v", err)
}
// 等待消息发送完成
e := <-producer.Events() // 阻塞直到消息发送完成
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
log.Printf("Failed to deliver message: %v", ev.TopicPartition)
} else {
fmt.Printf("Delivered message: %s to %v\n", string(ev.Value), ev.TopicPartition)
}
}
// 冲刷缓冲区消息
producer.Flush(15 * 1000)
}
代码说明
集群地址
以及主题信息
,如果没有该主题则自动创建
。异步
地将消息发送到 Kafka,因此你需要处理交付报告以确保消息成功发送。我们需要了解一下Go语言和Kafka之间的关系:Go是一种静态类型、编译型的编程语言,由Google开发并开源。它适用于构建高性能服务器端应用程序和网络服务。而Apache Kafka是一个分布式流处理平台,主要面向大规模数据传输和存储。
在这个例子中,我们有一个生产者程序,它使用Kafka的客户端库来连接到Kafka集群,然后通过创建一个生产者实例来开始发送消息。当生产者准备好要发送的消息时,它就会调用Send()
方法将其添加到缓冲区中。一旦缓冲区满了或者用户主动触发了Flush()
方法,生产者就会把缓冲区里的所有消息一起发送给Kafka集群。
go build producer.go
./producer
Created Producer rdkafka#producer-1
Delivered message: hello kafka0 to test[0]@0
ll cluster/broker-data/broker-1
total 20
-rw-r--r-- 1 root root 0 May 27 10:20 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 May 27 11:36 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 May 27 10:20 meta.properties
-rw-r--r-- 1 root root 13 May 27 11:36 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 14 May 27 11:36 replication-offset-checkpoint
drwxr-xr-x 2 root root 4096 May 27 11:21 test-0 # 我们创建的主题 数字代表分区号
ll cluster/broker-data/broker-1/test-0/
total 12
-rw-r--r-- 1 root root 10485760 May 27 11:21 00000000000000000000.index
-rw-r--r-- 1 root root 251 May 27 11:21 00000000000000000000.log
-rw-r--r-- 1 root root 10485756 May 27 11:21 00000000000000000000.timeindex
-rw-r--r-- 1 root root 8 May 27 11:21 leader-epoch-checkpoint
-rw-r--r-- 1 root root 43 May 27 11:21 partition.metadata
Kafka 的数据文件存储在每个分区的目录中,这些文件包括 .index
、.log
、.timeindex
、leader-epoch-checkpoint
和 partition.metadata
文件。每个文件都有其特定的用途,下面是对这些文件的详细解释:
.log
文件:
.log
文件代表一个日志段(log segment),文件名通常是该段的起始偏移量(offset)。.index
文件:
.timeindex
文件:
.index
文件,但索引的是时间戳而不是偏移量。leader-epoch-checkpoint
文件:
partition.metadata
文件:
这些文件共同作用,确保 Kafka 能够高效、可靠地存储和检索消息数据。
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
~/cluster/broker-1/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log --print-data-log
Dumping ./00000000000000000000.log
Log starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1716780091840 size: 251 magic: 2 compresscodec: none crc: 997822510 isvalid: true
| offset: 0 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka0
| offset: 1 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka1
| offset: 2 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka2
| offset: 3 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka3
| offset: 4 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka4
| offset: 5 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka5
| offset: 6 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka6
| offset: 7 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka7
| offset: 8 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka8
| offset: 9 CreateTime: 1716780091840 keySize: -1 valueSize: 12 sequence: -1 headerKeys: [] payload: hello kafka9
如上我们可以看到消息已经成功的发送。
mkdir src/consume
cd src/consume
go mod init
命令来初始化一个新的 Go 模块
go mod init consume
confluent-kafka-go
库go get github.com/confluentinc/confluent-kafka-go/kafka
touch consumer.go
package main
import (
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// 创建消费者实例
broker := "localhost:9091" // 集群地址
topic := "test" // 主题名称
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker, // 集群地址
"group.id": "my-group", // 消费者组
"auto.offset.reset": "earliest", // 设置偏移量 从头开始消费
})
// 检查错误
if err != nil {
log.Printf("Failed to create consumer: %s\n", err)
}
defer c.Close()
// 描述订阅主题
c.SubscribeTopics([]string{topic}, nil)
fmt.Printf("Consuming topic %s\n", topic)
// 消费消息
for {
msg, err := c.ReadMessage(-1) // 阻塞直到消息到达
if err == nil {
fmt.Printf("Consumed message: %s\n", msg.Value)
} else {
// 消费者错误
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
go build consumer.go
./consumer
Consuming topic test
Consumed message: hello kafka0
Consumed message: hello kafka1
Consumed message: hello kafka2
Consumed message: hello kafka3
Consumed message: hello kafka4
Consumed message: hello kafka5
Consumed message: hello kafka6
Consumed message: hello kafka7
Consumed message: hello kafka8
Consumed message: hello kafka9
可以看到已经成功的消费刚才生产的消息。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。