当前位置:   article > 正文

消息中间件——RabbitMQ(十一)RabbitMQ整合Spring Cloud Stream实战!(全)_spring cloud stream集成rabbitmq实现消息处理

spring cloud stream集成rabbitmq实现消息处理

2、定义Barista接口

/**

  • 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。

  • 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

*/

public interface Barista {

String OUTPUT_CHANNEL = “output_channel”;

//注解@Output声明了它是一个输出类型的通道,名字是output_channel。这一名字与app1中通道名一致,表明注入了一个名字为output_channel的通道,类型是output,发布的主题名为mydest。

@Output(Barista.OUTPUT_CHANNEL)

MessageChannel logoutput();

}

3、定义RabbitmqSender类

//启动这个绑定

@EnableBinding(Barista.class)

@Service //注入到spring容器

public class RabbitmqSender {

//注入Barista

@Autowired

private Barista barista;

// 发送消息

public String sendMessage(Object message, Map<String, Object> properties) throws Exception {

try{

MessageHeaders mhs = new MessageHeaders(properties);

Message msg = MessageBuilder.createMessage(message, mhs);

boolean sendStatus = barista.logoutput().send(msg);

System.err.println(“--------------sending -------------------”);

System.out.println(“发送数据:” + message + ",sendStatus: " + sendStatus);

}catch (Exception e){

System.err.println(“-------------error-------------”);

e.printStackTrace();

throw new RuntimeException(e.getMessage());

}

return null;

}

}

4、application.properties

server.port=8001

server.servlet.context-path=/producer

spring.application.name=producer

spring.cloud.stream.bindings.output_channel.destination=exchange-3

group相当于RabbitMQ中Queue的名称

spring.cloud.stream.bindings.output_channel.group=queue-3

##以下为集群环境配置,rabbit_cluster与下面的spring.cloud.stream.binders.rabbit_cluster是对应的。

spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster

spring.cloud.stream.binders.rabbit_cluster.type=rabbit

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=user_cp

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_cp

2.1 消费端

1、pom.xml引入依赖

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns=“http://maven.apache.org/POM/4.0.0” xmlns:xsi=“http://www.w3.org/2001/XMLSchema-instance”

xsi:schemaLocation=“http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd”>

4.0.0

com.cp

rabbitmq-springcloudstream-consumer

0.0.1-SNAPSHOT

jar

rabbitmq-springcloudstream-consumer

rabbitmq-spring

org.springframework.boot

spring-boot-starter-parent

1.5.8.RELEASE

<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<java.version>1.8</java.version>

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter

org.springframework.boot

spring-boot-starter-test

test

org.springframework.cloud

spring-cloud-starter-stream-rabbit

1.3.4.RELEASE

org.springframework.boot

spring-boot-starter-actuator

org.springframework.boot

spring-boot-maven-plugin

2、定义Barista接口

/**

  • 这里的Barista接口是定义来作为后面类的参数,这一接口定义来通道类型和通道名称。

  • 通道名称是作为配置用,通道类型则决定了app会使用这一通道进行发送消息还是从中接收消息。

*/

public interface Barista {

String INPUT_CHANNEL = “input_channel”;

//注解@Input声明了它是一个输入类型的通道,名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。这一名字与上述配置app2的配置文件中position1应该一致,表明注入了一个名字叫做input_channel的通道,它的类型是input,订阅的主题是position2处声明的mydest这个主题

@Input(Barista.INPUT_CHANNEL)

SubscribableChannel loginput();

}

3、定义RabbitmqReceiver类

//启动binding

@EnableBinding(Barista.class)

@Service

public class RabbitmqReceiver {

@StreamListener(Barista.INPUT_CHANNEL)

public void receiver(Message message) throws Exception {

//手工签收必须要有channel与deliveryTag

Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);

Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);

System.out.println(“Input Stream 1 接受数据:” + message);

System.out.println(“消费完毕------------”);

//批量签收设置为false

channel.basicAck(deliveryTag, false);

}

}

4、application.properties

server.port=8002

server.context-path=/consumer

spring.application.name=consumer

spring.cloud.stream.bindings.input_channel.destination=exchange-3

spring.cloud.stream.bindings.input_channel.group=queue-3

spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster

##默认监听数

spring.cloud.stream.bindings.input_channel.consumer.concurrency=1

##针对消费端channel进行设置,是否支持requeue,重回队列

spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false

##是否支持签收,签收模式:手工签收

spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL

##服务重连

spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000

##是否持久化订阅

spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true

##最大监听数

spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5

##采用rabbitmq方式,也可以采用kafka

spring.cloud.stream.binders.rabbit_cluster.type=rabbit

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=localhost:5672

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=user_cp

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=123456

spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/vhost_cp

3. 运行测试


3.1 运行消费端

启动项目后,查看管控台是否生成了Exchange与Queue

Exchange

Queue

启动项目后,SpringCloudStream生成了Exchange与Queue。

3.2 运行生产端测试代码

@RunWith(SpringRunner.class)

@SpringBootTest

public class ApplicationTests {

@Autowired

private RabbitmqSender rabbitmqSender;

@Test

public void sendMessageTest1() throws InterruptedException {

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

最后的内容

在开头跟大家分享的时候我就说,面试我是没有做好准备的,全靠平时的积累,确实有点临时抱佛脚了,以至于我自己还是挺懊恼的。(准备好了或许可以拿个40k,没做准备只有30k+,你们懂那种感觉吗)

如何准备面试?

1、前期铺垫(技术沉积)

程序员面试其实是对于技术的一次摸底考试,你的技术牛逼,那你就是大爷。大厂对于技术的要求主要体现在:基础,原理,深入研究源码,广度,实战五个方面,也只有将原理理论结合实战才能把技术点吃透。

下面是我会看的一些资料笔记,希望能帮助大家由浅入深,由点到面的学习Java,应对大厂面试官的灵魂追问

这部分内容过多,小编只贴出部分内容展示给大家了,见谅见谅!

  • Java程序员必看《Java开发核心笔记(华山版)》

  • Redis学习笔记

  • Java并发编程学习笔记

四部分,详细拆分并发编程——并发编程+模式篇+应用篇+原理篇

  • Java程序员必看书籍《深入理解 ava虚拟机第3版》(pdf版)

  • 大厂面试必问——数据结构与算法汇集笔记

其他像Spring,SpringBoot,SpringCloud,SpringCloudAlibaba,Dubbo,Zookeeper,Kafka,RocketMQ,RabbitMQ,Netty,MySQL,Docker,K8s等等我都整理好,这里就不一一展示了。

2、狂刷面试题

技术主要是体现在平时的积累实用,面试前准备两个月的时间再好好复习一遍,紧接着就可以刷面试题了,下面这些面试题都是小编精心整理的,贴给大家看看。

①大厂高频45道笔试题(智商题)

②BAT大厂面试总结(部分内容截图)

③面试总结

3、结合实际,修改简历

程序员的简历一定要多下一些功夫,尤其是对一些字眼要再三斟酌,如“精通、熟悉、了解”这三者的区别一定要区分清楚,否则就是在给自己挖坑了。当然不会包装,我可以将我的简历给你参考参考,如果还不够,那下面这些简历模板任你挑选:

以上分享,希望大家可以在金三银四跳槽季找到一份好工作,但千万也记住,技术一定是平时工作种累计或者自学(或报班跟着老师学)通过实战累计的,千万不要临时抱佛脚。

另外,面试中遇到不会的问题不妨尝试讲讲自己的思路,因为有些问题不是考察我们的编程能力,而是逻辑思维表达能力;最后平时要进行自我分析与评价,做好职业规划,不断摸索,提高自己的编程能力和抽象思维能力。
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!
982785)]

[外链图片转存中…(img-FM3HsOnK-1712483982786)]

③面试总结

[外链图片转存中…(img-7XZLjshl-1712483982786)]

[外链图片转存中…(img-wtdIi5qp-1712483982786)]

3、结合实际,修改简历

程序员的简历一定要多下一些功夫,尤其是对一些字眼要再三斟酌,如“精通、熟悉、了解”这三者的区别一定要区分清楚,否则就是在给自己挖坑了。当然不会包装,我可以将我的简历给你参考参考,如果还不够,那下面这些简历模板任你挑选:

[外链图片转存中…(img-QHLrFA2A-1712483982787)]

以上分享,希望大家可以在金三银四跳槽季找到一份好工作,但千万也记住,技术一定是平时工作种累计或者自学(或报班跟着老师学)通过实战累计的,千万不要临时抱佛脚。

另外,面试中遇到不会的问题不妨尝试讲讲自己的思路,因为有些问题不是考察我们的编程能力,而是逻辑思维表达能力;最后平时要进行自我分析与评价,做好职业规划,不断摸索,提高自己的编程能力和抽象思维能力。
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》点击传送门即可获取!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/781786
推荐阅读
相关标签
  

闽ICP备14008679号