赞
踩
在购物付款的时候,订单会经历不同的状态,并产生对应的状态事件,比如:待支付、支付成功、支付失败等等。我们会将这些事件消息推送到消息队列,后续的服务会根据订单状态进行不同的业务处理,这就要求同一订单的状态消息在推送和消费时都必须保证顺序。
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>

<properties>
    <java.version>17</java.version>
</properties>

<dependencies>
    <!-- Web layer, used by the REST endpoint that produces test messages -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Kafka producer/consumer support -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- JSON (de)serialization of the order payload -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>
/**
 * Order message as handled inside the consumer: the fields of {@code OrderDto}
 * plus the partition the message belongs to.
 */
@Data
public class InterOrderDto extends OrderDto implements OrderMessage {

    /** Partition this message belongs to. */
    private String partition;

    /**
     * Uses the order number as the routing key.
     *
     * @return the value of {@code getOrderNo()}
     */
    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

/**
 * A message that can be routed to a worker of the thread pool.
 */
public interface OrderMessage {

    /**
     * Routing key for the thread pool.
     *
     * @return the key used to pick a worker
     */
    String getUniqueNo();
}
这里是 3个分区,2个副本
/**
 * Declares the order topic as a {@code NewTopic} bean.
 */
@Configuration
public class KafkaConfiguration {

    /** Number of partitions of the order topic. */
    private static final int ORDER_PARTITIONS = 3;

    /** Replication factor of the order topic. */
    private static final short ORDER_REPLICAS = 2;

    /**
     * @return the order topic definition: 3 partitions, 2 replicas
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(Constants.TOPIC_ORDER, ORDER_PARTITIONS, ORDER_REPLICAS);
    }
}
/**
 * Shared constants of the demo application.
 */
public interface Constants {

    /** Name of the Kafka topic that carries order events. */
    String TOPIC_ORDER = "order";

}
消费者:OrderListener
@Component @Slf4j public class OrderListener { @Autowired private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool; @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3") public void logListener(ConsumerRecord<String, String> record) { log.debug("> receive log event: {}-{}", record.partition(), record.value()); try { OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class); InterOrderDto interOrderDto = new InterOrderDto(); BeanUtils.copyProperties(orderDto, interOrderDto); interOrderDto.setPartition(record.partition() + ""); orderThreadPool.dispatch(interOrderDto); } catch (Exception e) { log.error("# kafka log listener error: {}", record.value(), e); } } }
线程池: OrderThreadPool
/**
 * Fixed-size pool of single-threaded workers. Every message is routed by the
 * hash of its unique number, so messages with the same unique number always
 * land on the same worker and are consumed serially.
 *
 * @param <W> worker type
 * @param <D> message type
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {

    private final List<W> workers;

    private final int size;

    /**
     * Creates {@code size} workers from {@code provider} and starts one thread per worker.
     *
     * @param size     number of workers
     * @param provider factory invoked once per worker
     * @throws RuntimeException if no worker was created (size is 0)
     */
    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * Routes a message to the single worker responsible for its unique number.
     *
     * @param data the message to hand over
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    /**
     * Picks the worker for a unique number.
     *
     * <p>Workers are addressed by their position in the list rather than by
     * {@code getQueueNo()}, so routing does not depend on how worker ids were
     * assigned. {@code Math.floorMod} keeps the index in {@code [0, size)} even
     * when {@code hashCode()} is negative, where plain {@code %} would yield a
     * negative value that matches no worker.
     */
    private W getUniqueQueue(String uniqueNo) {
        return workers.get(Math.floorMod(uniqueNo.hashCode(), size));
    }

    /**
     * Starts one thread per worker; called once from the constructor.
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * Shuts down every worker by calling its {@code shutdown()} in turn.
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }
}
工作线程:SingleThreadWorker
, 内部使用阻塞队列使其串行化
/** * @Date: 2024/1/24 10:58 * single thread with a blocking-queue */ @Slf4j public abstract class SingleThreadWorker<T> implements Runnable { private static AtomicInteger cnt = new AtomicInteger(0); private BlockingQueue<T> queue; private boolean started = true; /** * worker 唯一id */ @Getter private int queueNo; public SingleThreadWorker(int size) { this.queue = new LinkedBlockingQueue<>(size); this.queueNo = cnt.getAndIncrement(); log.info("init worker {}", this.queueNo); } /** * 提交消息 * * @param data */ public void offer(T data) { try { queue.put(data); } catch (InterruptedException e) { log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e); } } @Override public void run() { log.info("{} worker start take ", Thread.currentThread().getName()); while (started) { try { T data = queue.take(); doConsumer(data); } catch (InterruptedException e) { log.error("queue take error", e); } } } /** * do real consume message * * @param data */ protected abstract void doConsumer(T data); /** * consume rest of message in the queue when thread-pool shutdown */ public void shutdown() { this.started = false; ArrayList<T> rest = new ArrayList<>(); int i = queue.drainTo(rest); if (i > 0) { log.info("{} has rest in queue {}", Thread.currentThread().getName(), i); for (T t : rest) { doConsumer(t); } } } }
工作线程实现:OrderWorker
, 这里就单独处理订单事件
/**
 * Concrete worker for order events; it logs each message it consumes.
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto> {

    /**
     * @param size capacity passed to the parent worker's queue
     */
    public OrderWorker(int size) {
        super(size);
    }

    /**
     * Logs the current thread name together with the message as JSON.
     *
     * @param data the order message to consume
     */
    @Override
    protected void doConsumer(InterOrderDto data) {
        final String threadName = Thread.currentThread().getName();
        final String json = JSON.toJSONString(data);
        log.info("{} consume msg: {}", threadName, json);
    }
}
生产者:OrderController
, 模拟发送不同的事件类型的订单
@RestController public class OrderController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @GetMapping("/send") public String send() throws InterruptedException { int size = 1000; for (int i = 0; i < size; i++) { OrderDto orderDto = new InterOrderDto(); orderDto.setOrderNo(i + ""); orderDto.setPayStatus(getStatus(0)); orderDto.setTimestamp(System.currentTimeMillis()); //相同的key发送到相同的分区 kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto)); TimeUnit.MILLISECONDS.sleep(10); orderDto.setPayStatus(getStatus(1)); orderDto.setTimestamp(System.currentTimeMillis()); kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto)); TimeUnit.MILLISECONDS.sleep(10); orderDto.setPayStatus(getStatus(2)); orderDto.setTimestamp(System.currentTimeMillis()); kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto)); } return "success"; } private String getStatus(int status){ return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败"; } }
# Kafka broker address (the x.x host part is a masked placeholder; replace it with the real broker host)
spring.kafka.bootstrap-servers=192.168.x.x:9092
# Producer record keys are serialized as strings
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Producer record values are serialized as strings
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
@Slf4j @SpringBootApplication public class BootKafkaApplication { public static void main(String[] args) { SpringApplication.run(BootKafkaApplication.class, args); } /** * 配置线程池 * @return */ @Bean public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){ OrderThreadPool<OrderWorker, InterOrderDto> threadPool = new OrderThreadPool<>(3, () -> new OrderWorker(100)); Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("shutdown orderThreadPool"); //容器关闭时让工作线程中的任务都被消费完 threadPool.shutdown(); })); return threadPool; } }
访问: http://localhost:8080/send
, 结果:
OWorder-0 worker start take
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}
可以发现,同一订单号的事件始终被路由到同一个工作线程(上例中订单 0、3、6 都由 OWorder-0 处理),因此在工作线程中,同一订单的事件是按发送顺序被消费的
good luck!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。