当前位置:   article > 正文

go如何操作kafka-直接上代码_go-zero kafk 怎么设置requiredacks

go-zero kafk 怎么设置requiredacks

1、kafka的配置项

// KafkaConfig kafka配置
type KafkaConfig struct {
	DisableConsumer bool   `mapstructure:"disableConsumer"`
	Debug           bool   `mapstructure:"debug"`
	Address         string `mapstructure:"address"`
	ReadTimeout     int64  `mapstructure:"read-timeout"`
	WriteTimeout    int64  `mapstructure:"write-timeout"`
	RequiredAck     int    `mapstructure:"required-ack"`
	MaxOpenRequests int    `mapstructure:"max-open-requests"`
	Partition       int    `mapstructure:"partition"`
}
// Conf 上层使用
type Conf struct {
  KafKa            map[string]*KafkaConfig `mapstructure:"kafka"`
}
// ymal
kafka:
  disableConsumer:
    debug: true
    address: 127.0.0.1:9092
    required-ack: -1 # 发送完数据后是否需要拿多少个副本确认 -1 需要全部
    read-timeout: 30 # 默认30s
    write-timeout: 30 # 默认30s
    max-open-requests: 5  # 在发送阻塞之前,允许有多少个未完成的请求,默认为5
    partition: 2 # 分区生成方案 0根据topic进行hash、1随机、2轮询
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

2、kafka初始化配置

使用的是"github.com/IBM/sarama"包

type Kafka struct {
	Key             string
	DisableConsumer bool
	Debug           bool
	Producer        sarama.SyncProducer
	Consumer        sarama.Consumer
	Client          sarama.Client
}

// kafkaClient 做kafka的k-v,因为kafka集群搭建,这用的是单体,通过map获取不同的broker
var kafkaClient = new(sync.Map)

func InitKafka() {
	for k, v := range setting.Conf.KafKa {
		key := k
		val := v
		scfg := buildConfig(val)
		kafka, err := newKafkaClient(key, val, scfg)
		if err != nil {
			zap.L().Error("newKafkaClient(key, val, scfg) failed:", zap.Error(err))
			return
		}
		kafkaClient.Store(key, kafka)
	}
}

// GetClient 根据map的key获取kafka实例
func GetClient(key string) (*Kafka, error) {
	val, ok := kafkaClient.Load(key)
	if !ok {
		return nil, fmt.Errorf("获取kafka client失败,key:%s", key)
	}

	return val.(*Kafka), nil
}

// buildConfig kafka配置
func buildConfig(v *setting.KafkaConfig) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.RequiredAcks(v.RequiredAck)
	cfg.Producer.Return.Successes = true

	if v.Partition == 1 {
		cfg.Producer.Partitioner = sarama.NewRandomPartitioner
	}

	if v.Partition == 2 {
		cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	}

	if v.ReadTimeout != 0 {
		cfg.Net.ReadTimeout = time.Duration(v.ReadTimeout) * time.Second
	}

	if v.WriteTimeout != 0 {
		cfg.Net.WriteTimeout = time.Duration(v.WriteTimeout) * time.Second
	}

	if v.MaxOpenRequests != 0 {
		cfg.Net.MaxOpenRequests = v.MaxOpenRequests
	}

	return cfg
}

func newKafkaClient(key string, cfg *setting.KafkaConfig, scfg *sarama.Config) (*Kafka, error) {
	client, err := sarama.NewClient(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	syncProducer, err := sarama.NewSyncProducer(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	consumer, err := sarama.NewConsumer(strings.Split(cfg.Address, ","), scfg)
	if err != nil {
		return nil, err
	}

	return &Kafka{
		Key:             key,
		DisableConsumer: cfg.DisableConsumer,
		Debug:           cfg.Debug,
		Producer:        syncProducer,
		Consumer:        consumer,
		Client:          client,
	}, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90

3、生产者模型

// SendMessage 发送消息,默认分区
func SendMessage(key, topic, value string) error {
	return SendMessagePartitionPar(key, topic, value, "")
}

// SendMessagePartitionPar 发送消息,指定分区
func SendMessagePartitionPar(key, topic, value, partitionKey string) error {
	kafka, err := GetClient(key)
	if err != nil {
		return err
	}

	msg := &sarama.ProducerMessage{
		Topic:     topic,
		Value:     sarama.StringEncoder(value),
		Timestamp: time.Now(),
	}

	if partitionKey != "" {
		msg.Key = sarama.StringEncoder(partitionKey)
	}
	partition, offset, err := kafka.Producer.SendMessage(msg)
	if err != nil {
		return err
	}
	if kafka.Debug {
		zap.L().Info("发送kafka消息成功",
			zap.Int32("partition", partition),
			zap.Int64("offset", offset))
	}

	return err
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4、消费者模型

单个消费者消费topic

// Consumer 消费者函数
func Consumer(ctx context.Context, key, topic string, fn func(message *sarama.ConsumerMessage) error) (err error) {
	kafka, err := GetClient(key)
	if err != nil {
		return
	}
	partitions, err := kafka.Consumer.Partitions(topic)
	if err != nil {
		return
	}
	for _, partition := range partitions {
		// 针对每个分区创建一个对应的分区消费者
		offset, errx := kafka.Client.GetOffset(topic, partition, sarama.OffsetNewest)
		if errx != nil {
			zap.L().Info("获取Offset失败:", zap.Error(errx))
			continue
		}
		pc, errx := kafka.Consumer.ConsumePartition(topic, partition, offset-1)
		if errx != nil {
			zap.L().Info("获取Offset失败:", zap.Error(errx))
			return nil
		}
		// 从每个分区都消费消息
		go func(consume sarama.PartitionConsumer) {
			defer func() {
				if err := recover(); err != nil {
					zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err))
				}
			}()

			defer func() {
				err := pc.Close()
				if err != nil {
					zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err))
				}
			}()

			for {
				select {
				case msg := <-pc.Messages():
					err := MiddlewareConsumerHandler(fn)(msg)
					if err != nil {
						return
					}
				case <-ctx.Done():
					return
				}
			}

		}(pc)
	}
	return nil
}


func MiddlewareConsumerHandler(fn func(message *sarama.ConsumerMessage) error) func(message *sarama.ConsumerMessage) error {
	return func(msg *sarama.ConsumerMessage) error {
		return fn(msg)
	}
}

type ConsumerGroupHandler func(message *sarama.ConsumerMessage) error

func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		if err := h(msg); err != nil {
			zap.L().Info("消息处理失败",
				zap.String("topic", msg.Topic),
				zap.String("value", string(msg.Value)))
			continue
		}
		sess.MarkMessage(msg, "")
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84

消费者组消费topic

// ConsumerGroup 消费者组消费消息
func ConsumerGroup(ctx context.Context, key, groupId, topics string, msgHandler ConsumerGroupHandler) (err error) {
	kafka, err := GetClient(key)
	if err != nil {
		return
	}

	if isConsumerDisabled(kafka) {
		return
	}

	consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, kafka.Client)
	if err != nil {
		return
	}

	go func() {
		defer func() {
			if err := recover(); err != nil {
				zap.L().Error("消费kafka发生panic", zap.Any("panic", err))
			}
		}()

		defer func() {
			err := consumerGroup.Close()
			if err != nil {
				zap.L().Error("close err", zap.Any("panic", err))
			}
		}()

		for {
			select {
			case <-ctx.Done():
				return
			default:
				err := consumerGroup.Consume(ctx, strings.Split(topics, ","), ConsumerGroupHandler(func(msg *sarama.ConsumerMessage) error {
					return MiddlewareConsumerHandler(msgHandler)(msg)
				}))
				if err != nil {
					zap.L().Error("消费kafka失败 err", zap.Any("panic", err))

				}
			}
		}
	}()

	return
}

func isConsumerDisabled(in *Kafka) bool {
	if in.DisableConsumer {
		zap.L().Info("kafka consumer disabled,key:%s", zap.String("key", in.Key))
	}

	return in.DisableConsumer
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/600202
推荐阅读
相关标签
  

闽ICP备14008679号