当前位置:   article > 正文

go-zero整合Kafka实现消息生产和消费

go-zero整合Kafka实现消息生产和消费

go-zero整合Kafka实现消息生产和消费

本教程基于go-zero微服务入门教程,项目工程结构同上一个教程。
go-zero微服务入门教程(点击进入)

本教程主要实现go-zero框架整合单机版Kafka,并暴露接口实现Kafka消息的生产和消费。
本文源码:https://gitee.com/songfayuan/go-zero-demo (教程源码分支:3.zero整合单机kafka)

准备工作

  • 如不熟悉go-zero项目的,请先查看上一篇go-zero微服务入门教程
  • 请自行安装好单机版Kafka,建议采用docker安装。

common工具

在common目录下创建task/kafkaconf新目录,在kafkaconf目录下创建conf.go文件,内容如下:

package kafkaconf

type Conf struct {
	Host        string
	Brokers     []string
	Group       string
	Topic       string
	Offset      string `json:",options=first|last,default=last"`
	OffsetId    int64  `json:",default=-1"` //-1时表示不使用该配置
	Consumers   int    `json:",default=8"`
	Processors  int    `json:",default=8"`
	MinBytes    int    `json:",default=10240"`    // 10K
	MaxBytes    int    `json:",default=10485760"` // 10M
	Username    string `json:",optional"`
	Password    string `json:",optional"`
	ForceCommit bool   `json:",default=true"`
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

rpc新增Kafka配置

以下操作在rpc模块执行。

sys.yaml

sys.yaml配置文件新增Kafka配置信息,如下:

# Kafka配置
KafkaConf:
  Host: 192.168.2.204:9092
  Brokers:
    - 192.168.2.204:9092
  Group: "consumer-group-id"
  Topic: kafka-test-topic3
  Consumers: 5
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

config.go

config.go文件中新增KafkaConf配置信息,如下:

KafkaConf kafkaconf.Conf
  • 1

servicecontext.go

servicecontext.go文件新增Kafka配置信息,完整代码如下:

package svc

import (
	"github.com/zeromicro/go-zero/core/stores/redis"
	"github.com/zeromicro/go-zero/core/stores/sqlx"
	"go-zero-demo/common/task/kafkaconf"
	"go-zero-demo/rpc/model/sysmodel"
	"go-zero-demo/rpc/sys/internal/config"
)

type ServiceContext struct {
	Config      config.Config
	RedisClient *redis.Redis

	KafkaConf *kafkaconf.Conf

	UserModel sysmodel.SysUserModel
}

func NewServiceContext(c config.Config) *ServiceContext {
	sqlConn := sqlx.NewMysql(c.Mysql.Datasource)

	conf := redis.RedisConf{
		Host: c.RedisConf.Host,
		Type: c.RedisConf.Type,
		Pass: c.RedisConf.Pass,
		Tls:  c.RedisConf.Tls,
	}

	return &ServiceContext{
		Config:      c,
		RedisClient: redis.MustNewRedis(conf),

		KafkaConf: &c.KafkaConf,

		UserModel: sysmodel.NewSysUserModel(sqlConn),
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

编写rpc服务

修改sys.proto文件

新增Kafka操作请求的配置,如下:

message KafkaReq{
  string name = 1;
  string nickName = 2;
  string password = 3;
  string email = 4;
}

message  KafkaResp{
  string name = 1;
  string nickName = 2;
  string password = 3;
  string email = 4;
}

message Empty {
}

service Sys{
  //Kafka生产者演示请求
  rpc KafkaProducer(KafkaReq)returns(KafkaResp);
  //Kafka消费者演示请求
  rpc KafkaConsumer(Empty)returns(KafkaResp);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

用goctl生成rpc代码

生成方法同上篇文章,自行查看。

编写API Gateway代码

编写api文件

kafka.api

在api目录下创建新目录doc/kafka,在kafka目录下创建kafka.api文件。

syntax = "v1"

info(
	title: "Kafka生产消费案例演示"
	desc: "Kafka生产消费案例演示"
	author: "songfayuan"
)

type (
	ApiKafkaReq {
		Name     string `json:"name"`
		NickName string `json:"nickName"`
		Password string `json:"password"`
		Email    string `json:"email"`
	}

	ApiKafkaResp {
		Code    int64       `json:"code"`
		Message string      `json:"message"`
		Data    ApiKafkaReq `json:"data"`
	}
)

@server (
	group : kafka/test
	prefix : /kafka/test
)

service admin-api{
	@doc(
		summary : "Kafka生产者演示"
	)
	@handler KafkaProducer
	post /kafkaProducer(ApiKafkaReq)returns(ApiKafkaResp)

	@doc (
		summary :"Kafka消费者演示"
	)
	@handler KafkaConsumer
	get /kafkaConsumer returns(ApiKafkaResp)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
admin.api

在api/doc/admin.api文件添加配置信息。

import "kafka/kafka.api"
  • 1

用goctl生成API Gateway代码

生成方法同上篇文章,自行查看。但是此处要基于admin.api文件去生成代码,如果基于kafka.api生成,则生成的代码只有kafka.api定义的接口代码,其他api文件定义的接口代码不被生成。

修改API Gateway代码调用rpc服务

kafkaproducerlogic.go

修改api/internal/logic/kafka/test/kafkaproducerlogic.go里的KafkaProducer方法,如下:

func (l *KafkaProducerLogic) KafkaProducer(req *types.ApiKafkaReq) (resp *types.ApiKafkaResp, err error) {
	producer, err := l.svcCtx.Sys.KafkaProducer(l.ctx, &sysclient.KafkaReq{
		Name:     req.Name,
		NickName: req.NickName,
		Password: req.Password,
		Email:    req.Email,
	})

	if err != nil {
		resJson, _ := json.Marshal(producer)
		logx.WithContext(l.ctx).Errorf("Kafka生产者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error())
		return nil, rpcerror.New(err)
	}

	return &types.ApiKafkaResp{
		Code:    200,
		Message: "操作成功",
		Data: types.ApiKafkaReq{
			Name:     producer.Name,
			NickName: producer.NickName,
			Password: producer.Password,
			Email:    producer.Email,
		},
	}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

kafkaconsumerlogic.go

修改api/internal/logic/kafka/test/kafkaconsumerlogic.go里的KafkaConsumer方法,如下:

func (l *KafkaConsumerLogic) KafkaConsumer() (resp *types.ApiKafkaResp, err error) {
	consumer, err := l.svcCtx.Sys.KafkaConsumer(l.ctx, &sysclient.Empty{})

	if err != nil {
		resJson, _ := json.Marshal(consumer)
		logx.WithContext(l.ctx).Errorf("Kafka消费者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error())
		return nil, rpcerror.New(err)
	}

	return &types.ApiKafkaResp{
		Code:    200,
		Message: "操作成功",
		Data:    types.ApiKafkaReq{},
	}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

修改rpc代码完成消息生产和消费

kafkaproducerlogic.go

  • 修改rpc/sys/internal/logic/kafkaproducerlogic.go,如下内容:
// Kafka生产者演示请求
func (l *KafkaProducerLogic) KafkaProducer(in *sysclient.KafkaReq) (*sysclient.KafkaResp, error) {
	if in.Name == "" {
		return nil, errors.New("账号不能为空")
	}
	if in.NickName == "" {
		return nil, errors.New("姓名不能为空")
	}
	if in.Email == "" {
		return nil, errors.New("邮箱不能为空")
	}

	// 创建一个writer,向topic发送消息
	w := &kafka.Writer{
		Addr:         kafka.TCP(l.svcCtx.Config.KafkaConf.Host),
		Topic:        l.svcCtx.Config.KafkaConf.Topic,
		Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
		RequiredAcks: kafka.RequireAll,    // ack模式
		Async:        true,
	}

	// 定义消息内容
	messages := []string{in.Name, in.NickName, in.Password, in.Email}

	// 循环发送消息
	for i, msg := range messages {
		err := w.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键
				Value: []byte(msg),
			},
		)
		if err != nil {
			log.Fatalf("Kafka生产者演示请求,向kafka写入数据失败: %v", err)
		}
	}

	if err := w.Close(); err != nil {
		log.Fatal("Kafka生产者演示请求,failed to close writer:", err)
	}

	return &sysclient.KafkaResp{}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

kafkaconsumerlogic.go

  • 修改rpc/sys/internal/logic/kafkaconsumerlogic.go,如下内容:
// Kafka消费者演示请求
// 这里演示手动请求触发kafka消费,实际项目中要做成项目启动后就一直监听消费。
func (l *KafkaConsumerLogic) KafkaConsumer(in *sysclient.Empty) (*sysclient.KafkaResp, error) {
	// 创建一个reader,指定GroupID,消费消息
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{l.svcCtx.KafkaConf.Host},
		GroupID:  l.svcCtx.KafkaConf.Group, //指定消费者组ID
		Topic:    l.svcCtx.KafkaConf.Topic,
		MaxBytes: 10e6, //10MB
	})

	//接收消息
	for {
		//ReadMessage会自动提交偏移量
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("Kafka消费者演示请求:message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
	}

	//程序退出前关闭Reader
	if err := reader.Close(); err != nil {
		log.Fatal("Kafka消费者演示请求:failed to close reader:", err)
	}

	return &sysclient.KafkaResp{}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

完整调用演示

最后,在根目录go-zero-demo执行下命令。

go mod tidy
  • 1

运行rpc服务

运行方法同上篇文章,自行查看。

运行api

运行方法同上篇文章,自行查看。

api调用

以下调用采用curl进行,你也可以用postman调用。

消息生产
 songfayuan@MacBook-Pro  ~  curl -X POST -H "Content-Type: application/json" -d '{"name":"songfayuan","nickName":"宋发元6666","email":"1414@qq.com","password":"123456"}' localhost:8888/kafka/test/kafkaProducer
 
{"code":200,"message":"操作成功","data":{"name":"","nickName":"","password":"","email":""}}%
  • 1
  • 2
  • 3

此时,查看Kafka相关Topic,即可看到成功生产的数据。

消息消费
 songfayuan@MacBook-Pro  ~  curl "localhost:8888/kafka/test/kafkaConsumer"
  • 1

此时,即可看到运行日志打印出消费成功的信息。

附录

Kafka消息生产和多消费者消费同一个Topic测试案例。

kafka_demo.go

package main

import (
	"context"
	"errors"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"time"
)

// 演示kafka读写
func main() {
	//writeByConn()
	writeByWriter3()
	//writeByWriter2()
	//readByConn()
	//readByReader()
	//readByReaderGroup()
}

// writeByConn基于Conn发送消息
func writeByConn() {
	topic := "kafka-test-topic3"
	partition := 0

	//连接至kafka集群的Leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	//设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	//发送消息
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	//关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

func writeByWriter() {
	//创建一个writer,向topic发送消息
	w := &kafka.Writer{
		Addr:         kafka.TCP("192.168.2.204:9092"),
		Topic:        "kafka-test-topic",
		Balancer:     &kafka.LeastBytes{}, //指定分区的balancer模式为最小字节分布
		RequiredAcks: kafka.RequireAll,    //ack模式
		Async:        true,
	}

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("Two!"),
		},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

func writeByWriter3() {
	// 创建一个writer,向topic发送消息
	w := &kafka.Writer{
		Addr:         kafka.TCP("192.168.2.204:9092"),
		Topic:        "kafka-test-topic3",
		Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
		RequiredAcks: kafka.RequireAll,    // ack模式
		Async:        true,
	}

	// 定义消息内容
	messages := []string{"Hello World!", "One!", "Two!", "song", "fa", "yuan"}

	// 循环发送消息
	for i, msg := range messages {
		err := w.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键
				Value: []byte(msg),
			},
		)
		if err != nil {
			log.Fatalf("failed to write message: %v", err)
		}
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// 创建不存在的topic
// 如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。
func writeByWriter2() {
	writer := kafka.Writer{
		Addr:                   kafka.TCP("192.168.2.204:9092"),
		Topic:                  "kafka-test-topic",
		AllowAutoTopicCreation: true, //自动创建topic
	}

	messages := []kafka.Message{
		{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		{
			Key:   []byte("Key-C"),
			Value: []byte("Tow!"),
		},
	}

	const retries = 3
	//重试3次
	for i := 0; i < retries; i++ {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		err := writer.WriteMessages(ctx, messages...)
		if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
			time.Sleep(time.Millisecond * 250)
			continue
		}

		if err != nil {
			log.Fatal("unexpected error %v", err)
		}
		break
	}

	//关闭Writer
	if err := writer.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// readByConn连接到kafka后接收消息
func readByConn() {
	//指定要连接的topic和partition
	topic := "kafka-test-topic"
	partition := 0

	//连接至kafka的Leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	//设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	//读取一批信息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) //fetch 10KB min, 1MB max

	//遍历读取消息
	//b := make([]byte, 10e3) //10KB max per message
	fmt.Println("******遍历读取消息******")
	for {
		//使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer,如果传入的buffer太小(消息装不下),就会返回io.ErrShortBuffer
		//n, err := batch.Read(b)
		//如果不考虑内存分配的效率问题,可以使用batch.ReadMessage读取消息
		mag, err := batch.ReadMessage()
		if err != nil {
			break
		}
		//fmt.Println(string(b[:n]))
		fmt.Println(string(mag.Value))
	}

	//关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}

	//关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

// readByReader通过Reader接收消息
func readByReader() {
	//创建Reader
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"192.168.2.204:9092"},
		Topic:     "kafka-test-topic",
		Partition: 0,
		MaxBytes:  10e6, //10MB
	})
	//设置Offset
	reader.SetOffset(1)

	//接收消息
	for {
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", message.Offset, string(message.Key), string(message.Value))
	}

	if err := reader.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

// 消费者组
func readByReaderGroup() {
	// 创建一个reader,指定GroupID,消费消息
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"192.168.2.204:9092"},
		GroupID:  "consumer-group-id", //指定消费者组ID
		Topic:    "kafka-test-topic",
		MaxBytes: 10e6, //10MB
	})

	//接收消息
	for {
		//ReadMessage会自动提交偏移量
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
	}

	//程序退出前关闭Reader
	if err := reader.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

// 消费者组,手动提交
func readByReaderGroup2() {
	// 创建一个reader,指定GroupID,消费消息
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"192.168.2.204:9092"},
		GroupID:  "consumer-group-id", //指定消费者组ID
		Topic:    "kafka-test-topic",
		MaxBytes: 10e6, //10MB
	})

	//接收消息
	ctx := context.Background()
	for {
		//获取消息
		message, err := reader.FetchMessage(ctx)
		if err != nil {
			break
		}
		//处理消息
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))

		//显示提交
		if err := reader.CommitMessages(ctx, message); err != nil {
			log.Fatal("failed to commit messages:", err)
		}
	}

	//程序退出前关闭Reader
	if err := reader.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289

kafka_consumer_demo.go

package main

import (
	"context"
	"fmt"
	"github.com/segmentio/kafka-go"
	"log"
	"sync"
	"time"
)

// 多个消费者同时消费同一Topic的数据
// 一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费
func main() {
	// 创建消费者组ID
	consumerGroupID := "consumer-group-id16"

	// 创建两个消费者
	consumer1 := createConsumer(consumerGroupID, "Consumer1")
	consumer2 := createConsumer(consumerGroupID, "Consumer2")
	consumer3 := createConsumer(consumerGroupID, "Consumer3")
	consumer4 := createConsumer(consumerGroupID, "consumer4")
	consumer5 := createConsumer(consumerGroupID, "consumer5")

	// 启动消费者
	var wg sync.WaitGroup
	wg.Add(4)
	go consumeMessages(consumer1, &wg, "Consumer1")
	go consumeMessages(consumer2, &wg, "Consumer2")
	go consumeMessages(consumer3, &wg, "consumer3")
	go consumeMessages(consumer4, &wg, "consumer4")
	go consumeMessages(consumer5, &wg, "consumer5")

	wg.Wait()
}

func createConsumer(groupID, consumerName string) *kafka.Reader {
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"192.168.2.204:9092"},
		GroupID: groupID,
		Topic:   "kafka-test-topic3",
	})
}

func consumeMessages(reader *kafka.Reader, wg *sync.WaitGroup, consumerName string) {
	defer wg.Done()
	for {
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			break
		}
		time.Sleep(1 * time.Second)

		fmt.Printf("[%s] Message at topic/partition/offset %v/%v/%v: %s = %s\n", consumerName, message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
	}
	if err := reader.Close(); err != nil {
		log.Fatalf("[%s] Failed to close reader: %v\n", consumerName, err)
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/484051
推荐阅读
相关标签
  

闽ICP备14008679号