赞
踩
上文:spring整合中间件(RocketMQ、kafka、RabbitMQ)-RabbitMQ
环境相关先参照:ActiveMQ windows10 安装
activemq java实现简单收发
项目结构
- │ pom.xml
- │
- └─src
- ├─main
- │ ├─java
- │ │ └─com
- │ │ └─hong
- │ │ └─activemq
- │ │ Consumer.java
- │ │ Producer.java
- │ │
- │ └─resources
- └─test
- └─java
com.hong.activemq.Consumer
- package com.hong.activemq;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- import javax.jms.*;
-
- /**
-  * Minimal ActiveMQ queue consumer demo.
-  * <p>
-  * Connects to the broker at {@link Producer#URL}, subscribes to the queue named
-  * {@link Producer#queueName} and prints the body of every text message received.
-  *
-  * @author: csh
-  * @Date: 2021/4/19 15:06
-  */
- public class Consumer {
-     /**
-      * Creates the connection, session and consumer, registers an asynchronous
-      * listener and then starts delivery. The connection is not closed by this
-      * method, so the listener is expected to keep receiving while the process runs.
-      *
-      * @param args unused
-      * @throws JMSException if the connection, session or consumer cannot be created
-      */
-     public static void main(String[] args) throws JMSException {
-         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.URL);
-         Connection connection = connectionFactory.createConnection();
-         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-         Queue destination = session.createQueue(Producer.queueName);
-         MessageConsumer consumer = session.createConsumer(destination);
-         // Register the listener before starting the connection so that no message
-         // is dispatched while the consumer is still being set up.
-         consumer.setMessageListener(new MessageListener() {
-             @Override
-             public void onMessage(Message message) {
-                 // The queue may carry non-text messages; a blind cast would throw
-                 // ClassCastException inside the provider's delivery thread.
-                 if (!(message instanceof TextMessage)) {
-                     System.err.println("忽略非文本消息:" + message);
-                     return;
-                 }
-                 TextMessage textMessage = (TextMessage) message;
-                 try {
-                     System.out.println("收到信息:" + textMessage.getText());
-                 } catch (JMSException e) {
-                     // Demo class without a logger: report the failure and keep listening.
-                     e.printStackTrace();
-                 }
-             }
-         });
-         // Start delivery only once the listener is in place.
-         connection.start();
-     }
- }
- package com.hong.activemq;
-
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- import javax.jms.*;
-
- /**
-  * Minimal ActiveMQ queue producer demo: sends ten text messages to
-  * {@link #queueName} on the broker at {@link #URL}.
-  *
-  * @author: csh
-  * @Date: 2021/4/19 15:05
-  */
- public class Producer {
-     /** Broker address (also read by {@code Consumer}). */
-     public static final String URL = "tcp://localhost:61616";
-     /** Queue name (also read by {@code Consumer}). */
-     public static final String queueName = "QUEUE_HONG";
-
-     /**
-      * Sends ten numbered text messages and closes the connection.
-      *
-      * @param args unused
-      * @throws JMSException if connecting, creating the session or sending fails
-      */
-     public static void main(String[] args) throws JMSException {
-         // 1. Create the ConnectionFactory
-         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
-         // 2. Create the Connection
-         Connection connection = connectionFactory.createConnection();
-         try {
-             // 3. Start the connection
-             connection.start();
-             // 4. Create a non-transacted, auto-acknowledge session
-             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-             // 5. Create the destination
-             Destination destination = session.createQueue(queueName);
-             // 6. Create the producer
-             MessageProducer producer = session.createProducer(destination);
-             for (int i = 0; i < 10; i++) {
-                 // 7. Create the message
-                 TextMessage message = session.createTextMessage("hello I'm hong:" + i);
-                 // 8. Send it
-                 producer.send(message);
-                 System.out.println("发送消息:" + i + "成功!");
-             }
-         } finally {
-             // 9. Always close the connection, even when a send fails, so the
-             //    broker connection is not leaked.
-             connection.close();
-         }
-     }
- }
spring 整合 activemq
spring_activemq_producer 生产者 端口:8488
项目结构
├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hong │ │ │ └─spring │ │ │ ├─ao │ │ │ └─common │ │ └─resources │ └─test │ └─java ├─target │ ├─classes │ │ └─com │ │ └─hong │ │ └─spring │ │ ├─ao │ │ └─common │ └─generated-sources │ └─annotations └─web └─WEB-INF
实现代码
com.hong.spring.common.MqUtils
- package com.hong.spring.common;
-
- import lombok.extern.log4j.Log4j2;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsTemplate;
- import org.springframework.jms.core.MessageCreator;
- import org.springframework.stereotype.Service;
-
- import javax.jms.Destination;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.Session;
-
-
- /**
- * @author: csh
- * @Date: 2021/4/19 16:21
- * @Description:mq工具类
- */
- @Service
- @Log4j2
- public class MqUtils {
- @Autowired
- private JmsTemplate jmsTemplate;
- /**
- * 向指定Destination(队列)发送text消息
- *
- * @param destination
- * @param message
- */
- public void sendTxtMessage(Destination destination, final String message) {
- if (null == destination) {
- destination = jmsTemplate.getDefaultDestination();
- }
- jmsTemplate.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- log.info("发送的消息{}",message);
- return session.createTextMessage(message);
- }
- });
-
- }
- }
com.hong.spring.UserController
- package com.hong.spring;
-
- import com.alibaba.fastjson.JSONObject;
- import com.hong.spring.ao.UserSaveAO;
- import com.hong.spring.common.MqUtils;
- import com.hong.spring.entity.User;
- import com.hong.spring.utils.DataResponse;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.beans.BeanUtils;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import javax.jms.Destination;
-
- /**
-  * REST endpoint of the producer application: turns a save request into a
-  * JSON text message and publishes it to the injected queue destination.
-  *
-  * @Auther: csh
-  * @Date: 2020/8/18 16:11
-  */
- @RestController
- @RequestMapping("/user/")
- @Log4j2
- public class UserController {
-
-     @Autowired
-     private MqUtils mqUtils;
-
-     @Autowired
-     private Destination queueDestination;
-
-     /**
-      * Copies the request parameters into a {@code User}, serializes it to JSON
-      * and sends it to {@code queueDestination}.
-      *
-      * @param ao request parameters bound by Spring MVC
-      * @return a success response once the message has been handed to the
-      *         template, or a failure response when {@code ao} is null or sending throws
-      */
-     @RequestMapping("save")
-     public DataResponse<Boolean> save(UserSaveAO ao) {
-         log.info("添加用户入参{}", JSONObject.toJSONString(ao));
-         if (null == ao) {
-             return DataResponse.BuildFailResponse("参数不能为空!");
-         }
-         try {
-             User user = new User();
-             BeanUtils.copyProperties(ao, user);
-             // Publish the user as JSON text
-             mqUtils.sendTxtMessage(queueDestination, JSONObject.toJSONString(user));
-             return DataResponse.BuildSuccessResponse("添加用户成功!");
-         } catch (Exception e) {
-             // Pass the exception as the Throwable argument without a placeholder:
-             // with "{}" the (String, Throwable) overload prints the braces literally.
-             log.error("添加出错", e);
-             return DataResponse.BuildFailResponse("添加出错请重试!");
-         }
-     }
- }
com.hong.spring.ao.UserSaveAO
- package com.hong.spring.ao;
-
- import lombok.Data;
-
- import java.io.Serializable;
-
- /**
- * Request parameters for saving a user.
- *
- * @author: csh
- * @Date: 2021/3/16 11:21
- */
- @Data
- public class UserSaveAO implements Serializable {
- // User id
- private Integer id;
- // User name
- private String username;
- // User age
- private Integer age;
- }
logging.properties
- org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
- org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
-
- handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
-
- ############################################################
- # Handler specific properties.
- # Describes specific configuration info for Handlers.
- ############################################################
-
- org.apache.juli.FileHandler.level = FINE
- org.apache.juli.FileHandler.directory = ../logs
- org.apache.juli.FileHandler.prefix = error-debug.
-
- java.util.logging.ConsoleHandler.level = FINE
- java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
log4j2.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration status="INFO">
- <appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
- </Console>
- <RollingFile name="RollingFile" fileName="logs/app.log"
- filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
- <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
- <SizeBasedTriggeringPolicy size="5 MB"/>
- </RollingFile>
- </appenders>
- <loggers>
- <root level="DEBUG">
- <appender-ref ref="Console"/>
- <appender-ref ref="RollingFile"/>
- </root>
- </loggers>
- </configuration>
applicationContext.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
-
-
- <!-- 配置组件扫描 -->
- <context:component-scan base-package="com.hong.spring"></context:component-scan>
- <!--加载配置文件-->
- <context:property-placeholder location="classpath:activemq.properties"/>
-
- <!-- 开启注解 -->
- <context:annotation-config />
-
- <mvc:default-servlet-handler />
-
-
- <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
- id="internalResourceViewResolver">
- <!-- 前缀 -->
- <property name="prefix" value="/WEB-INF/pages/" />
- <!-- 后缀 -->
- <property name="suffix" value=".html" />
- <property name="contentType" value="text/html"/>
-
- </bean>
-
- <!--开启mvc注解驱动-->
- <!-- 定义注解驱动 -->
- <mvc:annotation-driven>
- <mvc:message-converters>
- <!-- 设置支持中文 -->
- <bean class="org.springframework.http.converter.StringHttpMessageConverter">
- <property name="supportedMediaTypes">
- <list>
- <value>text/plain;charset=UTF-8</value>
- <value>text/html;charset=UTF-8</value>
- </list>
- </property>
- </bean>
- <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
- </mvc:message-converters>
- </mvc:annotation-driven>
- </beans>
application.properties
- logging.level.root=WARN
- logging.level.org.springframework.web=DEBUG
- logging.level.org.hibernate=ERROR
activemq.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
- <!--创建ActiveMQ为我们提供的连接工厂-->
- <bean id = "targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="${activemq.brokerURL}"></property>
- <property name="userName" value="${activemq.username}" ></property>
- <property name="password" value="${activemq.password}"></property>
- </bean>
- <!--创建spring提供的JMS连接池-->
- <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
- <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
- </bean>
- <!--创建目的地,队列模式-->
- <bean id = "queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
- <!--多队列名称以逗号分割开-->
- <constructor-arg name="name" value="${activemq.queueName}"></constructor-arg>
- </bean>
- <!--创建目的地,订阅模式-->
- <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
- <constructor-arg value="topic_spring_hong"/>
- </bean>
-
- <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。-->
- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="defaultDestination" ref="queueDestination" />
- <property name="receiveTimeout" value="10000" />
- <!-- true是topic,false是queue,默认是false,此处显式写出false -->
- <property name="pubSubDomain" value="false" />
- </bean>
- </beans>
activemq.properties
- #地址
- activemq.brokerURL=tcp://127.0.0.1:61616
- activemq.username=admin
- activemq.password=admin
- #多队列名称以逗号分割开
- activemq.queueName=queue_spring_user
WEB-INF/web.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
- version="3.1">
- <servlet>
- <servlet-name>spring_activemq_producer</servlet-name>
- <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
- <init-param>
- <param-name>contextConfigLocation</param-name>
- <param-value>classpath:applicationContext.xml,
- classpath:activemq.xml
- </param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
-
- <filter>
- <filter-name>encodingFilter</filter-name>
- <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
- <init-param>
- <param-name>encoding</param-name>
- <param-value>UTF-8</param-value>
- </init-param>
- <init-param>
- <param-name>forceEncoding</param-name>
- <param-value>true</param-value>
- </init-param>
- </filter>
- <filter-mapping>
- <filter-name>encodingFilter</filter-name>
- <url-pattern>/*</url-pattern>
- </filter-mapping>
- <servlet-mapping>
- <servlet-name>spring_activemq_producer</servlet-name>
- <url-pattern>/</url-pattern>
- </servlet-mapping>
- </web-app>
请求接口:http://localhost:8488/user/save?username=spring_activemq&age=101
参数
- username:spring_activemq
- age:101
spring_activemq_consumer 消费者 端口:8483
项目结构
│ pom.xml │ spring_activemq_consumer.iml │ ├─src │ ├─main │ │ ├─java │ │ │ └─com │ │ │ └─hong │ │ │ └─spring │ │ │ ├─dao │ │ │ │ UserMapper.java │ │ │ │ │ │ │ ├─listener │ │ │ │ UserListener.java │ │ │ │ │ │ │ ├─mapper │ │ │ │ UserMapper.xml │ │ │ │ │ │ │ └─provider │ │ │ UserServiceImpl.java │ │ │ │ │ └─resources │ │ activemq.properties │ │ activemq.xml │ │ application.properties │ │ applicationContext.xml │ │ jdbc.properties │ │ log4j2.xml │ │ logging.properties │ │ mybatis.xml │ │ │ └─test │ └─java │ │ └─com │ │ └─hong │ │ └─spring │ │ ├─dao │ │ │ UserMapper.class │ │ │ │ │ ├─listener │ │ │ UserListener.class │ │ │ │ │ ├─mapper │ │ │ UserMapper.xml │ │ │ │ │ └─provider │ │ UserServiceImpl.class │ │ │ └─generated-sources │ └─annotations └─web └─WEB-INF web.xml
代码实现
com.hong.spring.dao.UserMapper
- package com.hong.spring.dao;
-
- import com.hong.spring.entity.User;
- import com.hong.spring.entity.ao.UserAO;
- import org.apache.ibatis.annotations.Param;
-
- import java.util.List;
-
- /**
- * MyBatis mapper (DAO layer) for the user table.
- *
- * @Auther: csh
- * @Date: 2020/8/18 15:04
- */
-
- public interface UserMapper {
-
- /**
- *
- * Returns all users.
- *
- * @return every row of the user table
- * @auther: csh
- * @date: 2020/8/18 15:31
- */
- List<User> findAllUserList();
- /**
- *
- * Returns the total number of users.
- *
- * @return the row count of the user table
- * @auther: csh
- * @date: 2020/8/18 15:30
- */
- int findAllTotal();
- /**
- *
- * Updates a user identified by its id.
- *
- * @param user the user to update
- * @return presumably the number of affected rows (MyBatis convention) - TODO confirm
- * @auther: csh
- * @date: 2020/8/18 15:30
- */
- int update(User user);
- /**
- *
- * Inserts a single user.
- *
- * @param user the user to insert
- * @return presumably the number of affected rows (MyBatis convention) - TODO confirm
- * @auther: csh
- * @date: 2020/8/19 18:39
- */
- int save(User user);
- /**
- *
- * Inserts several users in one statement.
- *
- * @param list the users to insert, bound to the statement under the name "list"
- * @return presumably the number of affected rows (MyBatis convention) - TODO confirm
- * @auther: csh
- * @date: 2020/8/21 15:46
- */
- int insertBatch(@Param("list") List <User> list);
- /**
- *
- * Looks a user up by id.
- *
- * @param id the user id
- * @return the matching user
- * @auther: csh
- * @date: 2020/8/19 18:39
- */
- User findById(int id);
- /**
- *
- * Returns one page of users.
- *
- * @param ao paging parameters
- * @return the users of the requested page
- * @auther: csh
- * @date: 2020/8/21 16:05
- */
- List<User> findByPage(UserAO ao);
- }
com.hong.spring.listener.UserListener
- package com.hong.spring.listener;
-
- import com.alibaba.fastjson.JSONObject;
- import com.hong.spring.api.IUserService;
- import com.hong.spring.entity.User;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.annotation.EnableJms;
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Component;
- import org.springframework.stereotype.Service;
-
- import javax.jms.Message;
- import javax.jms.MessageListener;
- import javax.jms.TextMessage;
-
- /**
-  * JMS listener for user messages: parses the JSON text body of each message
-  * into a {@code User} and saves it through {@code IUserService}.
-  *
-  * @author: csh
-  * @Date: 2021/3/16 11:14
-  */
- @Log4j2
- @Service("userListener")
- public class UserListener implements MessageListener {
-
-     @Autowired
-     private IUserService userService;
-
-     /**
-      * Handles one delivered message. Messages that are not text messages are
-      * logged and ignored. Any processing failure is logged and not rethrown.
-      *
-      * @param message the delivered JMS message
-      */
-     @Override
-     public void onMessage(Message message) {
-         try {
-             log.info("传进来的数据为{}", JSONObject.toJSONString(message));
-             // instanceof also rejects null, and avoids a ClassCastException
-             // for non-text messages.
-             if (!(message instanceof TextMessage)) {
-                 log.warn("忽略非文本消息{}", message);
-                 return;
-             }
-             String text = ((TextMessage) message).getText();
-             log.info("获取到的文本内容:{}", text);
-             User user = JSONObject.parseObject(text, User.class);
-             userService.save(user);
-         } catch (Exception e) {
-             // NOTE(review): the exception is swallowed, so a failed message is
-             // presumably treated as consumed rather than redelivered - confirm
-             // this best-effort behaviour is intended.
-             // No "{}" here: the (String, Throwable) overload prints it literally.
-             log.error("处理出错", e);
-         }
-     }
- }
src/main/java/com/hong/spring/mapper/UserMapper.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <!DOCTYPE mapper
- PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
- <mapper namespace="com.hong.spring.dao.UserMapper">
- <resultMap type="com.hong.spring.entity.User" id="user">
- <id column="id" property="id" />
- <result column="user_name" property="username" />
- <result column="age" property="age" />
- </resultMap>
-
- <select id="findById" resultType="com.hong.spring.entity.User">
- SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
- </select>
-
- <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
- select * from user where 1=1 limit #{page},#{pageSize}
- </select>
-
- <select id="findAllUserList" resultMap="user">
- SELECT * FROM user
- </select>
-
- <select id="findAllTotal" resultType="int">
- SELECT count(*) FROM user
- </select>
-
- <insert id="save" >
- INSERT INTO user ( user_name, age)
- VALUES (#{username,jdbcType=VARCHAR},
- #{age,jdbcType=INTEGER})
- </insert>
-
- <insert id="insertBatch">
- insert into user
- ( user_name, age)
- values
- <foreach collection="list" item="user" index="index"
- separator=",">
- (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
- </foreach>
- </insert>
-
- <update id="update" >
- update user
- <set>
- <if test="username !=null">
- user_name=#{username,jdbcType=VARCHAR},
- </if>
- <if test="age !=null">
- age =#{age,jdbcType=INTEGER}
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
- </mapper>
com.hong.spring.provider.UserServiceImpl
- package com.hong.spring.provider;
-
- import com.hong.spring.api.IUserService;
- import com.hong.spring.dao.UserMapper;
- import com.hong.spring.entity.User;
- import com.hong.spring.entity.ao.UserAO;
- import com.hong.spring.utils.DataResponse;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.List;
-
-
- /**
-  * User service implementation backed by {@link UserMapper}.
-  *
-  * @Auther: csh
-  * @Date: 2020/8/18 15:16
-  */
- @Service("userService")
- public class UserServiceImpl implements IUserService {
-     @Autowired
-     private UserMapper userDao;
-
-     /**
-      * Returns all users together with the total count.
-      */
-     @Override
-     public DataResponse<List<User>> findByAll() {
-         List<User> allUserList = userDao.findAllUserList();
-         int allTotal = userDao.findAllTotal();
-         return DataResponse.BuildSuccessResponse(allUserList, allTotal);
-     }
-
-     /**
-      * Inserts one user.
-      *
-      * @param user the user to insert; a null user yields a failure response
-      * @return success response carrying whether a row was inserted
-      */
-     @Override
-     @Transactional
-     public DataResponse<Boolean> save(User user) {
-         if (null == user) {
-             return DataResponse.BuildFailResponse("必传参数不能为空!");
-         }
-         int save = userDao.save(user);
-         return DataResponse.BuildSuccessResponse(save > 0);
-     }
-
-     /**
-      * Inserts several users with one statement.
-      *
-      * @param list users to insert; a null or empty list yields a failure response
-      * @return success response carrying whether any row was inserted
-      */
-     @Override
-     public DataResponse<Boolean> insertBatch(List<User> list) {
-         // An empty list would make the mapper emit an INSERT with no VALUES
-         // tuples, which is invalid SQL, so reject it like a null list.
-         if (null == list || list.isEmpty()) {
-             return DataResponse.BuildFailResponse("参数不能为空!");
-         }
-         int batchSave = userDao.insertBatch(list);
-         return DataResponse.BuildSuccessResponse(batchSave > 0);
-     }
-
-     /**
-      * Updates a user identified by its id.
-      *
-      * @param user the user to update; null user or null id yields a failure response
-      * @return success response carrying whether a row was updated
-      */
-     @Override
-     @Transactional
-     public DataResponse<Boolean> update(User user) {
-         if (null == user || user.getId() == null) {
-             return DataResponse.BuildFailResponse("必传参数不能为空!");
-         }
-         int update = userDao.update(user);
-         return DataResponse.BuildSuccessResponse(update > 0);
-     }
-
-     /**
-      * Looks a user up by id.
-      */
-     @Override
-     public DataResponse<User> findById(int i) {
-         User byId = userDao.findById(i);
-         return DataResponse.BuildSuccessResponse(byId);
-     }
-
-     /**
-      * Returns one page of users together with the total count.
-      *
-      * @param ao paging parameters; when null, the first page with 10 rows is used.
-      *           Otherwise its page field is overwritten in place with the row
-      *           offset (pageSize * page) before it is passed to the mapper.
-      */
-     @Override
-     public DataResponse<List<User>> findByPage(UserAO ao) {
-         if (ao == null) {
-             // The original called setters on the null reference (guaranteed NPE).
-             // Assumes UserAO has a no-arg constructor - TODO confirm.
-             ao = new UserAO();
-             ao.setPage(0);
-             ao.setPageSize(10);
-         } else {
-             ao.setPage(ao.getPageSize() * ao.getPage());
-         }
-         int allTotal = userDao.findAllTotal();
-         List<User> byPage = userDao.findByPage(ao);
-         return DataResponse.BuildSuccessResponse(byPage, allTotal);
-     }
- }
src/main/resources/activemq.properties
- #地址
- activemq.brokerURL=tcp://127.0.0.1:61616
- activemq.username=admin
- activemq.password=admin
- #多队列名称以逗号分割开
- activemq.queueName=queue_spring_user
src/main/resources/activemq.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <!-- Consumer-side JMS wiring: ActiveMQ connection factory, queue destination,
-      JmsTemplate and the listener container that feeds userListener. -->
- <beans xmlns="http://www.springframework.org/schema/beans"
-        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-        xmlns:context="http://www.springframework.org/schema/context"
-        xmlns:jms="http://www.springframework.org/schema/jms"
-        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
-            http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd">
-
-     <!-- The jms prefix must be bound to Spring's JMS namespace; it was previously
-          bound to the Druid "stat" namespace, so this element was not Spring's
-          JMS annotation support. -->
-     <jms:annotation-driven/>
-
-     <!-- Connection factory provided by ActiveMQ -->
-     <bean id="consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
-         <property name="brokerURL" value="${activemq.brokerURL}"></property>
-         <property name="userName" value="${activemq.username}"></property>
-         <property name="password" value="${activemq.password}"></property>
-     </bean>
-     <!-- Spring JMS connection factory wrapping the ActiveMQ one -->
-     <bean id="mqConnectionFactory"
-           class="org.springframework.jms.connection.CachingConnectionFactory">
-         <constructor-arg ref="consumerConnectionFactory" />
-         <property name="sessionCacheSize" value="100" />
-     </bean>
-
-     <!-- Destination, queue mode -->
-     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
-         <!-- Separate multiple queue names with commas -->
-         <constructor-arg name="name" value="${activemq.queueName}"></constructor-arg>
-     </bean>
-     <!-- Destination, topic (publish/subscribe) mode -->
-     <!--<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">-->
-     <!--<constructor-arg value="topic_spring_hong"/>-->
-     <!--</bean>-->
-
-     <!-- JMS template (queue): Spring's helper for sending and receiving messages -->
-     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
-         <property name="connectionFactory" ref="mqConnectionFactory" />
-         <property name="defaultDestination" ref="queueDestination" />
-         <property name="receiveTimeout" value="10000" />
-         <!-- true means topic, false means queue; false is the default and is spelled out explicitly here -->
-         <property name="pubSubDomain" value="false" />
-     </bean>
-
-
-     <!-- Explicit message listener container (queue): uses mqConnectionFactory,
-          listens on queueDestination and dispatches to the userListener bean -->
-     <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
-         <property name="connectionFactory" ref="mqConnectionFactory" />
-         <property name="destination" ref="queueDestination" />
-         <property name="messageListener" ref="userListener" />
-     </bean>
- </beans>
src/main/resources/application.properties
- logging.level.root=WARN
- logging.level.org.springframework.web=DEBUG
- logging.level.org.hibernate=ERROR
src/main/resources/applicationContext.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
- xmlns:mvc="http://www.springframework.org/schema/mvc"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
-
-
- <!-- 配置组件扫描 -->
- <context:component-scan base-package="com.hong.spring"></context:component-scan>
- <!--加载配置文件-->
- <context:property-placeholder location="classpath:jdbc.properties,classpath:activemq.properties"/>
-
- <!-- 开启注解 -->
- <context:annotation-config />
- <!--开启注解事务-->
- <tx:annotation-driven transaction-manager="transactionManager" />
- <!--放行静态资源-->
- <mvc:default-servlet-handler />
-
-
- <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
- id="internalResourceViewResolver">
- <!-- 前缀 -->
- <property name="prefix" value="/WEB-INF/pages/" />
- <!-- 后缀 -->
- <property name="suffix" value=".html" />
- <property name="contentType" value="text/html"/>
-
- </bean>
-
- <!--开启mvc注解驱动-->
- <!-- 定义注解驱动 -->
- <mvc:annotation-driven>
- <mvc:message-converters>
- <!-- 设置支持中文 -->
- <bean class="org.springframework.http.converter.StringHttpMessageConverter">
- <property name="supportedMediaTypes">
- <list>
- <value>text/plain;charset=UTF-8</value>
- <value>text/html;charset=UTF-8</value>
- </list>
- </property>
- </bean>
- <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
- </mvc:message-converters>
- </mvc:annotation-driven>
-
-
- <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
- <!-- 基础配置 -->
- <property name="url" value="${jdbc.url}"></property>
- <property name="driverClassName" value="${jdbc.driver}"></property>
- <property name="username" value="${jdbc.user}"></property>
- <property name="password" value="${jdbc.password}"></property>
-
- <!-- 关键配置 -->
- <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
- <property name="initialSize" value="3" />
- <!-- 最小连接池数量 -->
- <property name="minIdle" value="2" />
- <!-- 最大连接池数量 -->
- <property name="maxActive" value="15" />
- <!-- 配置获取连接等待超时的时间 -->
- <property name="maxWait" value="10000" />
-
- <!-- 性能配置 -->
- <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
- <property name="poolPreparedStatements" value="true" />
- <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
-
- <!-- 其他配置 -->
- <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
- <property name="timeBetweenEvictionRunsMillis" value="60000" />
- <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
- <property name="minEvictableIdleTimeMillis" value="300000" />
- <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
- 执行validationQuery检测连接是否有效。-->
- <property name="testWhileIdle" value="true" />
- <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
- <property name="testOnBorrow" value="true" />
- <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
- <property name="testOnReturn" value="false" />
- </bean>
-
- <!--事务管理器-->
- <!-- sqlSessionFactory -->
- <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
- <!-- 加载 MyBatis 的配置文件 -->
- <property name="configLocation" value="classpath:mybatis.xml"/>
- <!-- 数据源 -->
- <property name="dataSource" ref="dataSource"/>
- <!-- 所有配置的mapper文件 -->
- <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
- </bean>
-
- <!-- Mapper 扫描器 -->
- <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
- <!-- 扫描 包下的组件 -->
- <property name="basePackage" value="com.hong.spring.dao" />
- <!-- 关联mapper扫描器 与 sqlsession管理器 -->
- <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
- </bean>
- <!--事务配置-->
- <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
- <property name="dataSource" ref="dataSource" />
- </bean>
- </beans>
src/main/resources/jdbc.properties
- # config.properties:
- #数据库驱动
- jdbc.driver=com.mysql.jdbc.Driver
- #数据库连接url
- jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
- #数据库用户名
- jdbc.user=root
- #数据库密码
- jdbc.password=123456
src/main/resources/log4j2.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <configuration status="INFO">
- <appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
- </Console>
- <RollingFile name="RollingFile" fileName="logs/app.log"
- filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
- <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
- <SizeBasedTriggeringPolicy size="5 MB"/>
- </RollingFile>
- </appenders>
- <loggers>
- <root level="DEBUG">
- <appender-ref ref="Console"/>
- <appender-ref ref="RollingFile"/>
- </root>
- </loggers>
- </configuration>
src/main/resources/logging.properties
- org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
- org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
- org.apache.jasper.servlet.TldScanner.level = FINE
-
- handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
-
- ############################################################
- # Handler specific properties.
- # Describes specific configuration info for Handlers.
- ############################################################
-
- org.apache.juli.FileHandler.level = FINE
- org.apache.juli.FileHandler.directory = ../logs
- org.apache.juli.FileHandler.prefix = error-debug.
-
- java.util.logging.ConsoleHandler.level = FINE
- java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
src/main/resources/mybatis.xml
- <?xml version="1.0" encoding="UTF-8" ?>
- <!DOCTYPE configuration
- PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-config.dtd">
- <configuration>
-
- <!-- settings -->
- <settings>
- <!-- 打开延迟加载的开关 -->
- <setting name="lazyLoadingEnabled" value="true"/>
- <!-- 将积极加载改为消极加载(即按需加载) -->
- <setting name="aggressiveLazyLoading" value="false"/>
- <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
- <setting name="cacheEnabled" value="true"/>
- <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createTime) -->
- <setting name="mapUnderscoreToCamelCase" value="true"/>
- <!-- 使用列别名代替列名 默认:true select name as title from table -->
- <setting name="useColumnLabel" value="true"/>
- <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
- <setting name="useGeneratedKeys" value="true"/>
- </settings>
-
- <!-- 别名定义 -->
- <typeAliases>
- <package name="com.hong.spring.entity"/>
- </typeAliases>
-
- </configuration>
web/WEB-INF/web.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
- version="3.1">
- <servlet>
- <servlet-name>spring_activemq_consumer</servlet-name>
- <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
- <init-param>
- <param-name>contextConfigLocation</param-name>
- <param-value>classpath:applicationContext.xml,
- classpath:activemq.xml
- </param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
-
- <filter>
- <filter-name>encodingFilter</filter-name>
- <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
- <init-param>
- <param-name>encoding</param-name>
- <param-value>UTF-8</param-value>
- </init-param>
- <init-param>
- <param-name>forceEncoding</param-name>
- <param-value>true</param-value>
- </init-param>
- </filter>
- <filter-mapping>
- <filter-name>encodingFilter</filter-name>
- <url-pattern>/*</url-pattern>
- </filter-mapping>
- <servlet-mapping>
- <servlet-name>spring_activemq_consumer</servlet-name>
- <url-pattern>/</url-pattern>
- </servlet-mapping>
- </web-app>
tomcat配置及启动消费
- [2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Artifact is deployed successfully
- [2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Deploy took 10,965 milliseconds
- 17:11:09.982 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821878763,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAJ3siYWdlIjoxMCwidXNlcm5hbWUiOiJzcHJpbmdfYWN0aXZlbXEifQ==","length":43,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821878761,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":92,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent
":true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1067,"text":"{\"age\":10,\"username\":\"spring_activemq\"}","timestamp":1618821878761,"wireFormatInfo":false}
- INFO | {dataSource-1} inited
- 17:11:11.731 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821970677,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAKHsiYWdlIjoxMDEsInVzZXJuYW1lIjoic3ByaW5nX2FjdGl2ZW1xIn0=","length":44,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821970675,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":97,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent
":true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1068,"text":"{\"age\":101,\"username\":\"spring_activemq\"}","timestamp":1618821970675,"wireFormatInfo":false}
相对来说spring 整合activemq非常简单,而且这个activemq相关的接口文档也很easy。
springboot 整合 activemq
启动前先启动zk
实现一个逻辑:通过api发起请求,api再通过rpc调用producer。如果是get请求,则producer直接通过mybatis查询;如果是post请求,则producer发送activemq消息给consumer消费端。发送消息时,若i%2==0则发送到队列(queue),若i%2!=0则发送到topic;consumer消费消息后再回调producer完成入库。
springboot_activemq_api 端口:8386
- │ pom.xml
- │
- └─src
- └─main
- ├─java
- │ └─com
- │ └─hong
- │ └─springboot
- │ │ Application.java
- │ │
- │ └─controller
- │ IndexController.java
- │ UserController.java
- │
- └─resources
- application.properties
com.hong.springboot.controller.IndexController
- package com.hong.springboot.controller;
-
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- /**
-  * Landing-page controller that answers requests to the application root.
-  *
-  * <p>Created 2021/1/12.
-  *
-  * @author csh
-  */
- @RestController
- public class IndexController {
-
-     /** Fixed body returned by the root endpoint. */
-     private static final String ROOT_RESPONSE = "成功!";
-
-     /**
-      * Handles requests mapped to {@code "/"}.
-      *
-      * @return the fixed success text
-      */
-     @RequestMapping("/")
-     public String index() {
-         return ROOT_RESPONSE;
-     }
- }
com.hong.springboot.controller.UserController
- package com.hong.springboot.controller;
-
-
- import com.alibaba.dubbo.common.utils.StringUtils;
- import com.alibaba.dubbo.config.annotation.Reference;
- import com.hong.springboot.api.IUserService;
- import com.hong.springboot.entity.User;
- import com.hong.springboot.utils.DataResponse;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.PostMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.util.List;
-
- /**
-  * REST endpoints under {@code /user} that delegate to the remote {@link IUserService}
-  * injected through a Dubbo {@code @Reference}.
-  *
-  * <p>Created 2020/8/18.
-  *
-  * @author csh
-  */
- @RestController
- @Slf4j
- @RequestMapping("/user")
- public class UserController {
-     @Reference
-     private IUserService userService;
-
-     /**
-      * Returns every user reported by the remote service.
-      *
-      * @return the service's response, or a failure response if the call throws
-      */
-     @GetMapping("/findByAll")
-     public DataResponse<List<User>> findByAll() {
-         try {
-             return userService.findByAll();
-         } catch (Exception e) {
-             // The throwable is passed as the trailing argument without a placeholder;
-             // with "{}" in the pattern the message was logged with a dangling "{}".
-             log.error("查询出错", e);
-         }
-         return DataResponse.BuildFailResponse("查询出错!");
-     }
-
-     /**
-      * Validates the submitted user and hands it to the remote service.
-      *
-      * @param ao user bound from the request parameters; age and username are required
-      * @return the service's response, or a failure response when validation fails
-      *         or the remote call throws
-      */
-     @PostMapping("/save")
-     public DataResponse<Boolean> save(User ao) {
-         if (null == ao || ao.getAge() == null || StringUtils.isBlank(ao.getUsername())) {
-             return DataResponse.BuildFailResponse("参数不能为空!");
-         }
-         try {
-             return userService.save(ao);
-         } catch (Exception e) {
-             // Mirror findByAll(): a failed remote call becomes a failure response
-             // instead of an unhandled exception.
-             log.error("保存出错", e);
-         }
-         return DataResponse.BuildFailResponse("保存出错!");
-     }
- }
com.hong.springboot.Application
- package com.hong.springboot;
-
-
- import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- /**
-  * Spring Boot entry point of the API module, which acts as a Dubbo consumer.
-  *
-  * <p>Created 2020/11/21.
-  *
-  * @author csh
-  */
- @SpringBootApplication(scanBasePackages = "com.hong.springboot")
- @EnableDubbo
- public class Application {
-     /**
-      * Boots the application.
-      *
-      * @param args command-line arguments; forwarded to Spring Boot so that overrides
-      *             such as {@code --server.port=...} are honoured
-      */
-     public static void main(String[] args) {
-         SpringApplication.run(Application.class, args);
-     }
- }
application.properties
- #Dubbo configuration
- #Application (service) name
- dubbo.application.name=springboot_dubbo_consumer
- #Registry protocol
- dubbo.registry.protocol=zookeeper
- #Registry address
- dubbo.registry.address=zookeeper://127.0.0.1:2181
- #Annotation package to scan; per the original note this setting registers services with ZooKeeper
- dubbo.protocol.scan=com.hong.springboot.api
- #Port used for the Dubbo protocol
- dubbo.protocol.port=20880
- #Protocol name
- dubbo.protocol.name=dubbo
- #Base packages scanned for Dubbo annotations
- dubbo.scan.basePackages=com.hong.springboot.controller
-
- #Avoid port conflicts with the other modules
- server.port=8386
springboot_all/springboot_activemq_api/pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_all</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <relativePath/>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_activemq_api</artifactId>
-
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_mq_api</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.boot</groupId>
- <artifactId>dubbo-spring-boot-starter</artifactId>
- <version>0.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.4-beta</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- </dependencies>
-
- <!--静态资源导出问题-->
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- </resources>
- </build>
-
- </project>
springboot_activemq_producer 端口:8387
com.hong.springboot.config.ActiveMQConfig
- package com.hong.springboot.config;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.activemq.command.ActiveMQTopic;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
- import org.springframework.jms.config.JmsListenerContainerFactory;
-
- import javax.jms.Queue;
- import javax.jms.Topic;
-
-
- /**
-  * JMS wiring for the producer module: the queue and topic destinations, the broker
-  * connection factory, and one listener-container factory per messaging domain.
-  */
- @Configuration
- public class ActiveMQConfig {
-     @Value("${spring.activemq.broker-url}")
-     private String brokerAddress;
-
-     @Value("${spring.activemq.user}")
-     private String brokerUser;
-
-     @Value("${spring.activemq.password}")
-     private String brokerPassword;
-
-     @Value("${spring.activemq.queue}")
-     private String queueDestination;
-
-     @Value("${spring.activemq.topic}")
-     private String topicDestination;
-
-     /** @return the queue destination named by {@code spring.activemq.queue} */
-     @Bean
-     public Queue queue() {
-         return new ActiveMQQueue(queueDestination);
-     }
-
-     /** @return the topic destination named by {@code spring.activemq.topic} */
-     @Bean
-     public Topic topic() {
-         return new ActiveMQTopic(topicDestination);
-     }
-
-     /** @return a connection factory built from the configured user, password and broker URL */
-     @Bean
-     public ActiveMQConnectionFactory connectionFactory() {
-         return new ActiveMQConnectionFactory(brokerUser, brokerPassword, brokerAddress);
-     }
-
-     /**
-      * Listener-container factory for queue listeners; the pub/sub flag is left at its default.
-      *
-      * @param connectionFactory broker connection factory
-      * @return the configured container factory
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
-         DefaultJmsListenerContainerFactory queueFactory = new DefaultJmsListenerContainerFactory();
-         queueFactory.setConnectionFactory(connectionFactory);
-         return queueFactory;
-     }
-
-     /**
-      * Listener-container factory for topic listeners.
-      *
-      * @param connectionFactory broker connection factory
-      * @return the configured container factory
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
-         DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
-         // Switch to publish/subscribe; the default is the producer/consumer (queue) model.
-         topicFactory.setPubSubDomain(true);
-         topicFactory.setConnectionFactory(connectionFactory);
-         return topicFactory;
-     }
- }
com.hong.springboot.config.DruidConfig
- package com.hong.springboot.config;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import javax.sql.DataSource;
-
- /**
- * @author: csh
- * @Date: 2021/1/8 18:08
- * @Description:数据源配置
- */
- @Configuration
- public class DruidConfig {
- @Bean
- @ConfigurationProperties(prefix = "spring.datasource")
- public DataSource dataSource(){
- return new DruidDataSource();
- }
- }
com.hong.springboot.dao.UserMapper
- package com.hong.springboot.dao;
-
- import com.hong.springboot.entity.User;
- import org.apache.ibatis.annotations.Insert;
- import org.apache.ibatis.annotations.Select;
-
- import java.util.List;
-
- /**
-  * MyBatis mapper for the {@code user} table.
-  *
-  * <p>Created 2020/8/18.
-  *
-  * @author csh
-  */
- public interface UserMapper {
-     /**
-      * Loads every row of the {@code user} table.
-      *
-      * <p>The {@code user_name} column is aliased to {@code username} so it lines up with
-      * the property name used by {@link #insert(User)}.
-      * NOTE(review): assumes no underscore-to-camel-case mapping is configured elsewhere;
-      * the alias is harmless either way.
-      *
-      * @return all users
-      */
-     @Select("select id,user_name as username,age from user")
-     List<User> findAllUser();
-
-     /**
-      * Inserts one user from its {@code username} and {@code age} properties.
-      *
-      * @param user the user to insert
-      * @return the value reported by MyBatis for the insert statement
-      */
-     @Insert("insert into user (user_name,age) values(#{username},#{age})")
-     int insert(User user);
- }
com.hong.springboot.provider.UserServiceImpl
- package com.hong.springboot.provider;
-
-
- import com.alibaba.druid.util.StringUtils;
- import com.alibaba.dubbo.config.annotation.Service;
- import com.alibaba.fastjson.JSONObject;
- import com.hong.springboot.api.IUserService;
- import com.hong.springboot.dao.UserMapper;
- import com.hong.springboot.entity.User;
- import com.hong.springboot.utils.DataResponse;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jms.core.JmsMessagingTemplate;
- import org.springframework.transaction.annotation.Transactional;
-
- import javax.jms.Queue;
- import javax.jms.Topic;
- import java.util.List;
-
-
- /**
- * @Auther: csh
- * @Date: 2020/8/18 15:16
- * @Description:用户实现
- */
- @Service(interfaceClass = IUserService.class,timeout = 6000)
- @Slf4j
- public class UserServiceImpl implements IUserService {
- @Autowired
- private UserMapper userDao;
- @Autowired
- private JmsMessagingTemplate jmsMessagingTemplate;
-
- @Autowired
- private Queue queue;
-
- @Autowired
- private Topic topic;
-
- @Override
- public DataResponse<List<User>> findByAll() {
- List <User> allUserList = userDao.findAllUser();
- return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
- }
- //随机用
- private static int i = 0;
-
- @Override
- public DataResponse <Boolean> save(User userAO) {
- i++;
- log.info("需要activemq添加的用户信息{}",JSONObject.toJSONString(userAO));
- try {
- //当==0时则用队列发送,当不等于的时候用topic发送。
- if(i%2==0){
- jmsMessagingTemplate.convertAndSend(queue,JSONObject.toJSONString(userAO));
- }else{
- jmsMessagingTemplate.convertAndSend(topic,JSONObject.toJSONString(userAO));
- }
- }catch (Exception e){
- e.printStackTrace();
- return DataResponse.BuildSuccessResponse(false);
- }
- return DataResponse.BuildSuccessResponse(true);
- }
-
- @Transactional
- @Override
- public DataResponse <Boolean> reallySave(User user) {
- log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
- if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
- return DataResponse.BuildFailResponse("参数不能为空!");
- }
- int insert = userDao.insert(user);
- return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
- }
- }
com.hong.springboot.Application
- package com.hong.springboot;
-
- import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
- import org.mybatis.spring.annotation.MapperScan;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- /**
-  * Spring Boot entry point of the producer module; enables Dubbo and scans
-  * {@code com.hong.springboot.dao} for MyBatis mappers.
-  *
-  * <p>Created 2020/11/21.
-  *
-  * @author csh
-  */
- @SpringBootApplication(scanBasePackages = "com.hong.springboot")
- @MapperScan("com.hong.springboot.dao")
- @EnableDubbo
- public class Application {
-
-     /**
-      * Boots the application.
-      *
-      * @param args command-line arguments; forwarded to Spring Boot so that overrides
-      *             such as {@code --server.port=...} are honoured
-      */
-     public static void main(String[] args) {
-         SpringApplication.run(Application.class, args);
-     }
-
- }
application.properties
- # NOTE(review): the rocketmq.* entries look like leftovers from the RocketMQ example;
- # confirm nothing on the classpath reads them before removing.
- rocketmq.name-server=localhost:9876
- rocketmq.producer.group=hong_group
- rocketmq.producer.sendMessageTimeout=300000
-
- #Dubbo configuration
- #Application (service) name; renamed from the copy-pasted "springboot_rabbitmq_producer"
- dubbo.application.name=springboot_activemq_producer
-
- dubbo.registry.protocol=zookeeper
- #Registry address
- dubbo.registry.address=zookeeper://127.0.0.1:2181
- #Annotation package to scan; per the original note this setting registers services with ZooKeeper
- dubbo.protocol.scan=com.hong.springboot.api
- #Port used for the Dubbo protocol
- dubbo.protocol.port=20881
- #Protocol name
- dubbo.protocol.name=dubbo
-
- #Avoid port conflicts with the other modules
- server.port=8387
- spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
- spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
- spring.datasource.username=root
- spring.datasource.password=123456
-
-
- #MyBatis configuration
- mybatis.typeAliasesPackage=com.hong.springboot.entity
-
- spring.activemq.broker-url=tcp://localhost:61616
- spring.activemq.user=admin
- spring.activemq.password=admin
- spring.activemq.pool.enabled=true
- spring.activemq.pool.max-connections=50
- # In-memory ActiveMQ
- #spring.activemq.in-memory=true
- spring.activemq.queue=queue_springboot_user
- spring.activemq.topic=topic_springboot_user
springboot_all/springboot_activemq_producer/pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_all</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <relativePath/>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_activemq_producer</artifactId>
-
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
-
-
- <dependency>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_mq_api</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.mybatis.spring.boot</groupId>
- <artifactId>mybatis-spring-boot-starter</artifactId>
- <version>2.1.3</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba</groupId>
- <artifactId>druid-spring-boot-starter</artifactId>
- <version>1.1.10</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.boot</groupId>
- <artifactId>dubbo-spring-boot-starter</artifactId>
- <version>0.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.4-beta</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <!--<build>-->
- <!--<plugins>-->
- <!--<plugin>-->
- <!--<groupId>org.springframework.boot</groupId>-->
- <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
- <!--<configuration>-->
- <!--<skip>true</skip>-->
- <!--</configuration>-->
- <!--</plugin>-->
- <!--</plugins>-->
- <!--</build>-->
- <!--静态资源导出问题-->
- <build>
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- </resources>
- </build>
-
- </project>
springboot_activemq_consumer 端口:8388
com.hong.springboot.config.ActiveMQConfig
- package com.hong.springboot.config;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.command.ActiveMQQueue;
- import org.apache.activemq.command.ActiveMQTopic;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
- import org.springframework.jms.config.JmsListenerContainerFactory;
-
- import javax.jms.Queue;
- import javax.jms.Topic;
-
-
- /**
-  * JMS wiring for the consumer module: the queue and topic destinations, the broker
-  * connection factory, and one listener-container factory per messaging domain.
-  */
- @Configuration
- public class ActiveMQConfig {
-     // Previously bound to ${spring.activemq.user}, which named the queue bean after the
-     // broker login instead of the configured queue.
-     @Value("${spring.activemq.queue}")
-     private String queueName;
-
-     @Value("${spring.activemq.topic}")
-     private String topicName;
-
-     @Value("${spring.activemq.user}")
-     private String usrName;
-
-     @Value("${spring.activemq.password}")
-     private String password;
-
-     @Value("${spring.activemq.broker-url}")
-     private String brokerUrl;
-
-     /** @return the queue destination named by {@code spring.activemq.queue} */
-     @Bean
-     public Queue queue() {
-         return new ActiveMQQueue(queueName);
-     }
-
-     /** @return the topic destination named by {@code spring.activemq.topic} */
-     @Bean
-     public Topic topic() {
-         return new ActiveMQTopic(topicName);
-     }
-
-     /** @return a connection factory built from the configured user, password and broker URL */
-     @Bean
-     public ActiveMQConnectionFactory connectionFactory() {
-         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
-     }
-
-     /**
-      * Listener-container factory for queue (point-to-point) listeners.
-      *
-      * @param connectionFactory broker connection factory
-      * @return the configured container factory
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory) {
-         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
-         bean.setConnectionFactory(connectionFactory);
-         bean.setPubSubDomain(false);
-         return bean;
-     }
-
-     /**
-      * Listener-container factory for topic (publish/subscribe) listeners.
-      *
-      * @param connectionFactory broker connection factory
-      * @return the configured container factory
-      */
-     @Bean
-     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory) {
-         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
-         // Switch to publish/subscribe; the default is the producer/consumer (queue) model.
-         bean.setPubSubDomain(true);
-         bean.setConnectionFactory(connectionFactory);
-         return bean;
-     }
- }
com.hong.springboot.listener.UserListener
- package com.hong.springboot.listener;
-
- import com.alibaba.dubbo.config.annotation.Reference;
- import com.alibaba.fastjson.JSONObject;
- import com.hong.springboot.api.IUserService;
- import com.hong.springboot.entity.User;
- import com.hong.springboot.utils.DataResponse;
- import lombok.extern.log4j.Log4j2;
- import org.springframework.jms.annotation.JmsListener;
- import org.springframework.stereotype.Service;
-
- import javax.jms.TextMessage;
-
- /**
- * @author: csh
- * @Date: 2021/3/16 11:14
- * @Description:用户监听
- */
- @Service
- @Log4j2
- public class UserListener {
-
- @Reference
- private IUserService userService;
-
- /**
- *
- * 功能描述: 基于队列添加
- *
- * @param:
- * @return:
- * @auther: csh
- * @date: 2021/4/20 10:56
- */
- @JmsListener(destination = "${spring.activemq.queue}",containerFactory = "jmsListenerContainerQueue")
- public void onMessage(TextMessage msg) {
- log.info("进入队列消费");
- if(null==msg){
- return;
- }
- try {
- User user = JSONObject.parseObject(msg.getText(),User.class);
- log.info("最终要添加的值{}",JSONObject.toJSONString(user));
- DataResponse<Boolean> save = userService.reallySave(user);
- if(save==null || !save.getData()){
- log.info("添加失败,原因{}",JSONObject.toJSONString(save));
- }
- }catch (Exception e){
- log.error("添加出错",e);
- }
- }
-
- @JmsListener(destination = "${spring.activemq.topic}",containerFactory = "jmsListenerContainerTopic")
- public void onMessage2(TextMessage msg) {
- log.info("进入topic消费");
- if(null==msg){
- return;
- }
- try {
- User user = JSONObject.parseObject(msg.getText(),User.class);
- log.info("最终要添加的值{}",JSONObject.toJSONString(user));
- DataResponse<Boolean> save = userService.reallySave(user);
- if(save==null || !save.getData()){
- log.info("添加失败,原因{}",JSONObject.toJSONString(save));
- }
- }catch (Exception e){
- log.error("添加出错",e);
- }
- }
-
- }
com.hong.springboot.Application
- package com.hong.springboot;
-
- import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.jms.annotation.EnableJms;
-
- /**
-  * Spring Boot entry point of the consumer module; enables Dubbo and JMS listener support.
-  *
-  * <p>Created 2020/11/21.
-  *
-  * @author csh
-  */
- @SpringBootApplication(scanBasePackages = "com.hong.springboot")
- @EnableDubbo
- @EnableJms
- public class Application {
-
-     /**
-      * Boots the application.
-      *
-      * @param args command-line arguments; forwarded to Spring Boot so that overrides
-      *             such as {@code --server.port=...} are honoured
-      */
-     public static void main(String[] args) {
-         SpringApplication.run(Application.class, args);
-     }
-
- }
application.properties
- # NOTE(review): the rocketmq.* entries look like leftovers from the RocketMQ example;
- # confirm nothing on the classpath reads them before removing.
- rocketmq.name-server=localhost:9876
- rocketmq.producer.group=hong_group
- rocketmq.producer.sendMessageTimeout=300000
-
- #Avoid port conflicts with the other modules
- server.port=8388
-
- #Dubbo configuration
- #Application (service) name; renamed from the copy-pasted "springboot_rocketmq_consumer"
- dubbo.application.name=springboot_activemq_consumer
-
- dubbo.registry.protocol=zookeeper
- #Registry address
- dubbo.registry.address=zookeeper://127.0.0.1:2181
- #Annotation package to scan; per the original note this setting registers services with ZooKeeper
- dubbo.protocol.scan=com.hong.springboot.api
- #Port used for the Dubbo protocol
- dubbo.protocol.port=20882
- #Protocol name
- dubbo.protocol.name=dubbo
- #Base packages scanned for Dubbo annotations
- dubbo.scan.basePackages=com.hong.springboot.listener
-
- spring.activemq.broker-url=tcp://localhost:61616
- spring.activemq.user=admin
- spring.activemq.password=admin
- #spring.activemq.pool.enabled=true
- #spring.activemq.pool.max-connections=50
- # In-memory ActiveMQ
- #spring.activemq.in-memory=true
- spring.activemq.queue=queue_springboot_user
- spring.activemq.topic=topic_springboot_user
springboot_all/springboot_activemq_consumer/pom.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_all</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <relativePath/>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_activemq_consumer</artifactId>
-
-
- <properties>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-activemq</artifactId>
- </dependency>
-
-
-
- <dependency>
- <groupId>com.hong.springboot</groupId>
- <artifactId>springboot_mq_api</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.boot</groupId>
- <artifactId>dubbo-spring-boot-starter</artifactId>
- <version>0.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>3.5.4-beta</version>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <!--<build>-->
- <!--<plugins>-->
- <!--<plugin>-->
- <!--<groupId>org.springframework.boot</groupId>-->
- <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
- <!--<configuration>-->
- <!--<skip>true</skip>-->
- <!--</configuration>-->
- <!--</plugin>-->
- <!--</plugins>-->
- <!--</build>-->
- <!--静态资源导出问题-->
- <build>
- <resources>
- <resource>
- <directory>src/main/java</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- <resource>
- <directory>src/main/resources</directory>
- <includes>
- <include>**/*.properties</include>
- <include>**/*.xml</include>
- </includes>
- <filtering>false</filtering>
- </resource>
- </resources>
- </build>
-
- </project>
然后请求如下:http://localhost:8386/user/save?username=springboot_activemq&age=1000
- username:springboot_activemq
- age:1000
结果如下:
- 2021-04-20 11:07:42.501 INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入topic消费
- 2021-04-20 11:07:42.571 INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}
- 2021-04-20 11:07:45.861 INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入队列消费
- 2021-04-20 11:07:45.861 INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}
这样基本就完成了springboot整合activemq,非常easy,并且容易上手,同时还整合了相关的rpc进行查询与添加。
topic与队列的区别?
文章中涉及到队列与topic,但是有什么区别?
名称 | Topic | Queue |
概要 | Publish Subscribe messaging 发布订阅消息 | Point-to-Point 点对点 |
重复消费 | 可以重复消费 | 点对点模式所以不可重复消费 |
接收方式 | 无需主动请求,可由服务器推送 | 需要主动获取队列中的消息 |
有无状态 | topic数据默认不落地,是无状态的。 | Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。 |
完整性保障 | 并不保证publisher发布的每条数据,Subscriber都能接受到。 | Queue保证每条数据都能被receiver接收。 |
消息是否会丢失 | 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 | Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。 |
消息发布接收策略 | 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 | 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。 |
消息类型 | 是否持久化 | 是否有Durable订阅者 | 消费者延迟启动时,消息是否保留 | Broker重启时,消息是否保留 |
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |
参考文章:
https://blog.csdn.net/admin1973/article/details/60125938
压测
通过jmeter压测,发现大量堆积,虽然说跟http和mysql有一定关系,但是这个堆积量太大了..还有失败率达97%....
最后
虽然说activemq属于消息中间件的第一批鼻祖,但是国内把activemq用于商业场景的并不是特别多,因为kafka、rocketmq、rabbitmq该有的功能都有,并且性能更高、支持的场景更多。当然没有好坏之分,只是合不合适罢了。本文只是一个基础整合,后续有时间再深入。
参考文章:
https://www.cnblogs.com/jaycekon/p/6225058.html
https://activemq.apache.org/maven/apidocs/
https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-messaging
https://www.cnblogs.com/cyfonly/p/6380860.html
更多学习资料可关注以下公众号,获取作者个人微信沟通或获取。
系列文章:
spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)
spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)-续netty
spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(gRPC)
spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)
spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)
spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。