赞
踩
<!-- Spring Cloud Stream starter for the Kafka binder. No <version> is declared here, so it is presumably managed by a parent POM / BOM (confirm). -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
提示:以下配置存放在 Nacos 中,使用的是 Spring Cloud Stream(Kafka binder)的配置项。
# Spring Cloud Stream configuration for the Kafka binder and the createModel bindings.
spring:
  cloud:
    stream:
      kafka:
        binder:
          replication-factor: 1
          brokers: <kafka地址>
          auto-create-topics: true
          auto-add-partitions: true
          # Maximum producer request size: 10485760 bytes (10 MiB)
          producer-properties:
            max.request.size: 10485760
      bindings:
        # Task creation - producer binding
        createModel-out-0:
          # Topic
          destination: test-model-topic
          content-type: application/json
        # Task creation - consumer binding
        createModel-in-0:
          destination: test-model-topic
          content-type: application/json
          # Consumer group: a message is consumed only once within the same group
          group: mark-model_test
          # Number of consumer instances: 10
          consumer.concurrency: 10
    # Name of the bean in the service that receives the messages.
    # NOTE(review): placed at spring.cloud.function.definition; the flattened original does not
    # show the nesting level - confirm against the Spring Cloud Stream version in use.
    function:
      definition: createModel;
import cn.hutool.json.JSONObject; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Service; /** * @description: 生产者 * @Author sq.cai * @Date 2023/5/4 11:35 * @Version 1.0 */ @Slf4j @Service @RequiredArgsConstructor public class KafkaProvider { private final StreamBridge streamBridge; /** * 任务(创建任务) * * @param kafkaModelVO */ public void modelCreate(String kafkaModelVO) { streamBridge.send(MQConstant.MODEL_CREATE_OUT, kafkaModelVO); } }
/**
 * Constant parameters for message-queue bindings.
 *
 * <p>Created 2023/5/4 11:37.
 *
 * @author sq.cai
 * @version 1.0
 */
public final class MQConstant {

    /**
     * Task creation: name of the output binding.
     */
    public static final String MODEL_CREATE_OUT = "createModel-out-0";

    /** Not instantiable: this class only holds constants. */
    private MQConstant() {
    }
}
提示:优化消费方法的处理逻辑,或增加消费者实例数(consumer.concurrency),都能加快消费速度。
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.convert.Convert; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.StopWatch; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.math3.stat.descriptive.moment.Mean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Lazy; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.stream.Collectors; /** * @description: 消费者 * @Author sq.cai * @Date 2023/3/17 15:44 * @Version 1.0 */ @Slf4j @Service public class KafkaConsumer { /** * 消费 * * @return */ @Bean public Consumer<String> createModel() { return message -> { log.info("Kafka任务记录 >> {}", message); StopWatch s = new StopWatch(); s.start(); //将字符串消息体转为对象 //KafkaModelVO vo = BeanUtil.toBean(JSONUtil.parseObj(message), KafkaModelVO.class); //具体消费方法 todo s.stop(); log.info("-----Kafka任务记录耗时 >> {}", s.getTotalTimeMillis() + "ms"); }; } }
// Producer used to publish messages; annotated with @Resource, so presumably injected by the container (confirm).
@Resource
private KafkaProvider kafkaProvider;
/**
 * Usage example: serializes the task object to a JSON string, logs it, and hands it to
 * the producer.
 *
 * <p>NOTE(review): this method is static yet reads the instance field {@code kafkaProvider},
 * and {@code kafkaModelVO} / {@code log} are not declared in the visible snippet. Confirm
 * the enclosing class before reusing this code.
 *
 * @param args command-line arguments (not used)
 */
public static void main(String[] args) {
    final String payload = JSONUtil.toJsonStr(kafkaModelVO);
    log.info("任务添加进Kafka消息队列中 >> {}", payload);
    kafkaProvider.modelCreate(payload);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。