赞
踩
// KafkaConfig holds the settings for one Kafka broker (or broker list) entry.
//
// Field names in the configuration file are given by the mapstructure tags.
type KafkaConfig struct {
	// DisableConsumer is copied onto the Kafka instance built from this entry.
	DisableConsumer bool `mapstructure:"disableConsumer"`
	// Debug enables a log line for every successfully produced message.
	Debug bool `mapstructure:"debug"`
	// Address is a comma-separated list of broker addresses (host:port).
	Address string `mapstructure:"address"`
	// ReadTimeout is the network read timeout in seconds; 0 keeps the sarama default.
	ReadTimeout int64 `mapstructure:"read-timeout"`
	// WriteTimeout is the network write timeout in seconds; 0 keeps the sarama default.
	WriteTimeout int64 `mapstructure:"write-timeout"`
	// RequiredAck is the producer acknowledgement level; -1 waits for all replicas.
	RequiredAck int `mapstructure:"required-ack"`
	// MaxOpenRequests is the number of outstanding requests allowed before
	// sending blocks; 0 keeps the sarama default.
	MaxOpenRequests int `mapstructure:"max-open-requests"`
	// Partition selects the partitioning scheme: 1 = random, 2 = round-robin,
	// anything else keeps the sarama default.
	Partition int `mapstructure:"partition"`
}

// Conf is the top-level configuration consumed by the rest of the application.
type Conf struct {
	// KafKa maps an instance key to the configuration of that Kafka instance.
	KafKa map[string]*KafkaConfig `mapstructure:"kafka"`
}

// Example YAML.
//
// NOTE(review): the listing this was recovered from had lost its line breaks
// and indentation. There, "disableConsumer:" directly follows "kafka:" with no
// value; because Conf.KafKa is a map keyed by instance name, it is laid out
// below as the instance key. Confirm against the real configuration file.
//
//	kafka:
//	  disableConsumer:
//	    debug: true
//	    address: 127.0.0.1:9092
//	    required-ack: -1      # how many replica acks to wait for after sending; -1 = all
//	    read-timeout: 30      # default 30s
//	    write-timeout: 30     # default 30s
//	    max-open-requests: 5  # outstanding requests allowed before sending blocks; default 5
//	    partition: 2          # partitioning scheme: 0 hash, 1 random, 2 round-robin
以下代码使用 "github.com/IBM/sarama" 包作为 Kafka 客户端。
type Kafka struct { Key string DisableConsumer bool Debug bool Producer sarama.SyncProducer Consumer sarama.Consumer Client sarama.Client } // kafkaClient 做kafka的k-v,因为kafka集群搭建,这用的是单体,通过map获取不同的broker var kafkaClient = new(sync.Map) func InitKafka() { for k, v := range setting.Conf.KafKa { key := k val := v scfg := buildConfig(val) kafka, err := newKafkaClient(key, val, scfg) if err != nil { zap.L().Error("newKafkaClient(key, val, scfg) failed:", zap.Error(err)) return } kafkaClient.Store(key, kafka) } } // GetClient 根据map的key获取kafka实例 func GetClient(key string) (*Kafka, error) { val, ok := kafkaClient.Load(key) if !ok { return nil, fmt.Errorf("获取kafka client失败,key:%s", key) } return val.(*Kafka), nil } // buildConfig kafka配置 func buildConfig(v *setting.KafkaConfig) *sarama.Config { cfg := sarama.NewConfig() cfg.Producer.RequiredAcks = sarama.RequiredAcks(v.RequiredAck) cfg.Producer.Return.Successes = true if v.Partition == 1 { cfg.Producer.Partitioner = sarama.NewRandomPartitioner } if v.Partition == 2 { cfg.Producer.Partitioner = sarama.NewRoundRobinPartitioner } if v.ReadTimeout != 0 { cfg.Net.ReadTimeout = time.Duration(v.ReadTimeout) * time.Second } if v.WriteTimeout != 0 { cfg.Net.WriteTimeout = time.Duration(v.WriteTimeout) * time.Second } if v.MaxOpenRequests != 0 { cfg.Net.MaxOpenRequests = v.MaxOpenRequests } return cfg } func newKafkaClient(key string, cfg *setting.KafkaConfig, scfg *sarama.Config) (*Kafka, error) { client, err := sarama.NewClient(strings.Split(cfg.Address, ","), scfg) if err != nil { return nil, err } syncProducer, err := sarama.NewSyncProducer(strings.Split(cfg.Address, ","), scfg) if err != nil { return nil, err } consumer, err := sarama.NewConsumer(strings.Split(cfg.Address, ","), scfg) if err != nil { return nil, err } return &Kafka{ Key: key, DisableConsumer: cfg.DisableConsumer, Debug: cfg.Debug, Producer: syncProducer, Consumer: consumer, Client: client, }, nil }
// SendMessage 发送消息,默认分区 func SendMessage(key, topic, value string) error { return SendMessagePartitionPar(key, topic, value, "") } // SendMessagePartitionPar 发送消息,指定分区 func SendMessagePartitionPar(key, topic, value, partitionKey string) error { kafka, err := GetClient(key) if err != nil { return err } msg := &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(value), Timestamp: time.Now(), } if partitionKey != "" { msg.Key = sarama.StringEncoder(partitionKey) } partition, offset, err := kafka.Producer.SendMessage(msg) if err != nil { return err } if kafka.Debug { zap.L().Info("发送kafka消息成功", zap.Int32("partition", partition), zap.Int64("offset", offset)) } return err }
单个消费者消费topic
// Consumer 消费者函数 func Consumer(ctx context.Context, key, topic string, fn func(message *sarama.ConsumerMessage) error) (err error) { kafka, err := GetClient(key) if err != nil { return } partitions, err := kafka.Consumer.Partitions(topic) if err != nil { return } for _, partition := range partitions { // 针对每个分区创建一个对应的分区消费者 offset, errx := kafka.Client.GetOffset(topic, partition, sarama.OffsetNewest) if errx != nil { zap.L().Info("获取Offset失败:", zap.Error(errx)) continue } pc, errx := kafka.Consumer.ConsumePartition(topic, partition, offset-1) if errx != nil { zap.L().Info("获取Offset失败:", zap.Error(errx)) return nil } // 从每个分区都消费消息 go func(consume sarama.PartitionConsumer) { defer func() { if err := recover(); err != nil { zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err)) } }() defer func() { err := pc.Close() if err != nil { zap.L().Error("消费kafka信息发生panic,err:%s", zap.Any("err:", err)) } }() for { select { case msg := <-pc.Messages(): err := MiddlewareConsumerHandler(fn)(msg) if err != nil { return } case <-ctx.Done(): return } } }(pc) } return nil } func MiddlewareConsumerHandler(fn func(message *sarama.ConsumerMessage) error) func(message *sarama.ConsumerMessage) error { return func(msg *sarama.ConsumerMessage) error { return fn(msg) } } type ConsumerGroupHandler func(message *sarama.ConsumerMessage) error func (ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { if err := h(msg); err != nil { zap.L().Info("消息处理失败", zap.String("topic", msg.Topic), zap.String("value", string(msg.Value))) continue } sess.MarkMessage(msg, "") } return nil }
消费者组消费topic
// ConsumerGroup 消费者组消费消息 func ConsumerGroup(ctx context.Context, key, groupId, topics string, msgHandler ConsumerGroupHandler) (err error) { kafka, err := GetClient(key) if err != nil { return } if isConsumerDisabled(kafka) { return } consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, kafka.Client) if err != nil { return } go func() { defer func() { if err := recover(); err != nil { zap.L().Error("消费kafka发生panic", zap.Any("panic", err)) } }() defer func() { err := consumerGroup.Close() if err != nil { zap.L().Error("close err", zap.Any("panic", err)) } }() for { select { case <-ctx.Done(): return default: err := consumerGroup.Consume(ctx, strings.Split(topics, ","), ConsumerGroupHandler(func(msg *sarama.ConsumerMessage) error { return MiddlewareConsumerHandler(msgHandler)(msg) })) if err != nil { zap.L().Error("消费kafka失败 err", zap.Any("panic", err)) } } } }() return } func isConsumerDisabled(in *Kafka) bool { if in.DisableConsumer { zap.L().Info("kafka consumer disabled,key:%s", zap.String("key", in.Key)) } return in.DisableConsumer }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。