当前位置:   article > 正文

KafKa消费(代码实例)_kafka消费者代码

kafka消费者代码

Maven配置

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4

Nacos配置

提示:Nacos配置使用的是Spring Cloud Stream

spring:
  cloud:
    stream:
      kafka:
        binder:
          replication-factor: 1
          brokers: <kafka地址>
          auto-create-topics: true       
          auto-add-partitions: true
          #最大传参:10.48M
          producer-properties:
            max.request.size: 10485760
      bindings:
      	# 任务创建-生产者 - Producer
        createModel-out-0:
          #主题
          destination: test-model-topic
          content-type: application/json
        # 任务创建-消费者 - Consumer
        createModel-in-0:
          destination: test-model-topic
          content-type: application/json
          # 消费组,同一个组内只能被消费一次
          group: mark-model_test
          #消费者实例数:10
          consumer.concurrency: 10
          # 对应服务中接收消息的Bean
    function:
      definition: createModel;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

Kafka生产者

import cn.hutool.json.JSONObject;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Service;

/**
 * @description: 生产者
 * @Author sq.cai
 * @Date 2023/5/4 11:35
 * @Version 1.0
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProvider {
    private final StreamBridge streamBridge;
 	/**
     * 任务(创建任务)
     *
     * @param kafkaModelVO
     */
    public void modelCreate(String kafkaModelVO) {
        streamBridge.send(MQConstant.MODEL_CREATE_OUT, kafkaModelVO);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
/**
 * @description:	常量参数
 * @Author sq.cai
 * @Date 2023/5/4 11:37
 * @Version 1.0
 */
public class MQConstant {
    /**
     * 任务创建
     */
    public static final String MODEL_CREATE_OUT = "createModel-out-0";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

Kafka消费者

提示:优化消费方法,增加消费者实例数,能加快消费

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * @description: 消费者
 * @Author sq.cai
 * @Date 2023/3/17 15:44
 * @Version 1.0
 */
@Slf4j
@Service
public class KafkaConsumer {
	/**
     * 消费
     *
     * @return
     */
    @Bean
    public Consumer<String> createModel() {
        return message -> {
            log.info("Kafka任务记录  >>  {}", message);
            StopWatch s = new StopWatch();
            s.start();
            //将字符串消息体转为对象
            //KafkaModelVO vo = BeanUtil.toBean(JSONUtil.parseObj(message), KafkaModelVO.class);
            //具体消费方法   todo
            s.stop();
            log.info("-----Kafka任务记录耗时  >>  {}", s.getTotalTimeMillis() + "ms");
        };
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

示例

@Resource
private KafkaProvider kafkaProvider;

public static void main(String[] args) {
    String voStr = JSONUtil.toJsonStr(kafkaModelVO);
    log.info("任务添加进Kafka消息队列中  >>  {}", voStr);
    kafkaProvider.modelCreate(voStr);
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/638919
推荐阅读
相关标签
  

闽ICP备14008679号