当前位置:   article > 正文

spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ)ActiveMQ

java 动态连接kafka、pulsar、activemq多种数据源

上文:spring整合中间件(RocketMQ、kafka、RabbitMQ)-RabbitMQ


环境相关先参照:ActiveMQ windows10 安装

activemq java实现简单收发

项目结构

  1. │ pom.xml
  2. └─src
  3.     ├─main
  4.     │ ├─java
  5.     │ │ └─com
  6.     │ │ └─hong
  7.     │ │ └─activemq
  8.     │ │ Consumer.java
  9.     │ │ Producer.java
  10.     │ │
  11.     │ └─resources
  12.     └─test
  13.         └─java

com.hong.activemq.Consumer

  1. package com.hong.activemq;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. /**
  5.  * @author: csh
  6.  * @Date: 2021/4/19 15:06
  7.  * @Description:消费者
  8.  */
  9. public class Consumer {
  10.     public static void main(String[] args) throws JMSException {
  11.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(Producer.URL);
  12.         Connection connection = connectionFactory.createConnection();
  13.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  14.         connection.start();
  15.         Queue destination = session.createQueue(Producer.queueName);
  16.         MessageConsumer consumer = session.createConsumer(destination);
  17.         //创建监听器
  18.         consumer.setMessageListener(new MessageListener() {
  19.             public void onMessage(Message message) {
  20.                 TextMessage textMessage = (TextMessage)message;
  21.             try {
  22.                     System.out.println("收到信息:"+textMessage.getText());
  23.                 } catch (JMSException e) {
  24.                     e.printStackTrace();
  25.                 }
  26.             }
  27.         });
  28.     }
  29. }
  1. package com.hong.activemq;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import javax.jms.*;
  4. /**
  5.  * @author: csh
  6.  * @Date: 2021/4/19 15:05
  7.  * @Description:active生产
  8.  */
  9. public class Producer {
  10.     //地址
  11.     public static final String URL = "tcp://localhost:61616";
  12.     //队列名称
  13.     public static final String queueName = "QUEUE_HONG";
  14.     public static void main(String args[]) throws JMSException {
  15.         //1.创建ConnectionFactory
  16.         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
  17.         //2.创建Connection
  18.         Connection connection = connectionFactory.createConnection();
  19.         //3.启动连接
  20.         connection.start();
  21.         //4.创建会话
  22.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  23.         //5.创建一个目标
  24.         Destination destination = session.createQueue(queueName);
  25.         //6.创建一个生产者
  26.         MessageProducer producer = session.createProducer(destination);
  27.         for (int i = 0 ;i < 10; i++){
  28.             //7.创建消息
  29.             TextMessage message = session.createTextMessage("hello I'm hong:"+i);
  30.             //8.发送消息
  31.             producer.send(message);
  32.             System.out.println("发送消息:"+i+"成功!");
  33.         }
  34.         //9.关闭连接
  35.         connection.close();
  36. }
  37. }

spring 整合 activemq

spring_activemq_producer 生产者 端口:8488

项目结构

  1. ├─src
  2. │ ├─main
  3. │ │ ├─java
  4. │ │ │ └─com
  5. │ │ │ └─hong
  6. │ │ │ └─spring
  7. │ │ │ ├─ao
  8. │ │ │ └─common
  9. │ │ └─resources
  10. │ └─test
  11. │ └─java
  12. ├─target
  13. │ ├─classes
  14. │ │ └─com
  15. │ │ └─hong
  16. │ │ └─spring
  17. │ │ ├─ao
  18. │ │ └─common
  19. │ └─generated-sources
  20. │ └─annotations
  21. └─web
  22.     └─WEB-INF

实现代码

com.hong.spring.common.MqUtils

  1. package com.hong.spring.common;
  2. import lombok.extern.log4j.Log4j2;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.jms.core.JmsTemplate;
  5. import org.springframework.jms.core.MessageCreator;
  6. import org.springframework.stereotype.Service;
  7. import javax.jms.Destination;
  8. import javax.jms.JMSException;
  9. import javax.jms.Message;
  10. import javax.jms.Session;
  11. /**
  12.  * @author: csh
  13.  * @Date: 2021/4/19 16:21
  14.  * @Description:mq工具类
  15.  */
  16. @Service
  17. @Log4j2
  18. public class MqUtils {
  19.     @Autowired
  20.     private JmsTemplate jmsTemplate;
  21.     /**
  22.      * 向指定Destination(队列)发送text消息
  23.      *
  24.      * @param destination
  25.      * @param message
  26.      */
  27.     public void sendTxtMessage(Destination destination, final String message) {
  28.         if (null == destination) {
  29.             destination = jmsTemplate.getDefaultDestination();
  30.         }
  31.         jmsTemplate.send(destination, new MessageCreator() {
  32.             public Message createMessage(Session session) throws JMSException {
  33.                 log.info("发送的消息{}",message);
  34.                return session.createTextMessage(message);
  35.             }
  36.         });
  37.     }
  38. }

com.hong.spring.UserController

  1. package com.hong.spring;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.hong.spring.ao.UserSaveAO;
  4. import com.hong.spring.common.MqUtils;
  5. import com.hong.spring.entity.User;
  6. import com.hong.spring.utils.DataResponse;
  7. import lombok.extern.log4j.Log4j2;
  8. import org.springframework.beans.BeanUtils;
  9. import org.springframework.beans.factory.annotation.Autowired;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import javax.jms.Destination;
  13. /**
  14.  * @Auther: csh
  15.  * @Date: 2020/8/18 16:11
  16.  * @Description:
  17.  */
  18. @RestController
  19. @RequestMapping("/user/")
  20. @Log4j2
  21. public class UserController {
  22.     @Autowired
  23.     private MqUtils mqUtils;
  24.     @Autowired
  25.     private Destination queueDestination;
  26.     @RequestMapping("save")
  27.     public DataResponse<Boolean> save(UserSaveAO ao){
  28.         log.info("添加用户入参{}",JSONObject.toJSONString(ao));
  29.         if(null==ao){
  30.             return DataResponse.BuildFailResponse("参数不能为空!");
  31.         }
  32.        try {
  33.            User user = new User();
  34.            BeanUtils.copyProperties(ao,user);
  35.            //发送
  36.             mqUtils.sendTxtMessage(queueDestination,JSONObject.toJSONString(user));
  37.            return DataResponse.BuildSuccessResponse("添加用户成功!");
  38.        }catch (Exception e){
  39.            log.error("添加出错{}",e);
  40.            return DataResponse.BuildFailResponse("添加出错请重试!");
  41.        }
  42.     }
  43. }

com.hong.spring.ao.UserSaveAO

  1. package com.hong.spring.ao;
  2. import lombok.Data;
  3. import java.io.Serializable;
  4. /**
  5.  * @author: csh
  6.  * @Date: 2021/3/16 11:21
  7.  * @Description:用户入参
  8.  */
  9. @Data
  10. public class UserSaveAO implements Serializable {
  11.     private Integer id;
  12.     private String username;
  13.     private Integer age;
  14. }

logging.properties

  1. org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
  2. org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
  3. handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
  4. ############################################################
  5. # Handler specific properties.
  6. # Describes specific configuration info for Handlers.
  7. ############################################################
  8. org.apache.juli.FileHandler.level = FINE
  9. org.apache.juli.FileHandler.directory = ../logs
  10. org.apache.juli.FileHandler.prefix = error-debug.
  11. java.util.logging.ConsoleHandler.level = FINE
  12. java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

log4j2.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration status="INFO">
  3.     <appenders>
  4.         <Console name="Console" target="SYSTEM_OUT">
  5.             <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
  6.         </Console>
  7.         <RollingFile name="RollingFile" fileName="logs/app.log"
  8.                      filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
  9.             <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
  10.             <SizeBasedTriggeringPolicy size="5 MB"/>
  11.         </RollingFile>
  12.     </appenders>
  13.     <loggers>
  14.         <root level="DEBUG">
  15.             <appender-ref ref="Console"/>
  16.             <appender-ref ref="RollingFile"/>
  17.         </root>
  18.     </loggers>
  19. </configuration>

applicationContext.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.       xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
  5.       xmlns:mvc="http://www.springframework.org/schema/mvc"
  6.       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  7.     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
  8.    <!-- 配置组件扫描 -->
  9.    <context:component-scan base-package="com.hong.spring"></context:component-scan>
  10.    <!--加载配置文件-->
  11.    <context:property-placeholder location="classpath:activemq.properties"/>
  12.    <!-- 开启注解 -->
  13.    <context:annotation-config />
  14.    <mvc:default-servlet-handler />
  15.    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
  16.         id="internalResourceViewResolver">
  17.       <!-- 前缀 -->
  18.       <property name="prefix" value="/WEB-INF/pages/" />
  19.       <!-- 后缀 -->
  20.       <property name="suffix" value=".html" />
  21.       <property name="contentType" value="text/html"/>
  22.    </bean>
  23.    <!--开启mvc注解事务-->
  24.    <!-- 定义注解驱动 -->
  25.    <mvc:annotation-driven>
  26.       <mvc:message-converters>
  27.          <!-- 设置支持中文 -->
  28.          <bean class="org.springframework.http.converter.StringHttpMessageConverter">
  29.             <property name="supportedMediaTypes">
  30.                <list>
  31.                   <value>text/plain;charset=UTF-8</value>
  32.                   <value>text/html;charset=UTF-8</value>
  33.                </list>
  34.             </property>
  35.          </bean>
  36.          <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
  37.       </mvc:message-converters>
  38.    </mvc:annotation-driven>
  39. </beans>

application.properties

  1. logging.level.root=WARN
  2. logging.level.org.springframework.web=DEBUG
  3. logging.level.org.hibernate=ERROR

activemq.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.       xmlns:context="http://www.springframework.org/schema/context"
  5.       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
  6.    <!--创建ActiveMQ为我们提供的连接工厂-->
  7.    <bean id = "targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  8.       <property name="brokerURL" value="${activemq.brokerURL}"></property>
  9.       <property name="userName" value="${activemq.username}" ></property>
  10.       <property name="password" value="${activemq.password}"></property>
  11.    </bean>
  12.    <!--创建spring提供的JMS连接池-->
  13.    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
  14.       <property name="targetConnectionFactory" ref="targetConnectionFactory"></property>
  15.    </bean>
  16.    <!--创建目的地,队列模式-->
  17.    <bean id = "queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  18.         <!--多队列名称以逗号分割开-->
  19.         <constructor-arg name="name" value="${activemq.queueName}"></constructor-arg>
  20.    </bean>
  21.    <!--创建目的地,订阅模式-->
  22.    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
  23.       <constructor-arg value="topic_spring_hong"/>
  24.    </bean>
  25.     <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。-->
  26.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  27.         <property name="connectionFactory" ref="connectionFactory" />
  28.         <property name="defaultDestination" ref="queueDestination" />
  29.         <property name="receiveTimeout" value="10000" />
  30.         <!-- true是topic,false是queue,默认是false,此处显示写出false -->
  31.         <property name="pubSubDomain" value="false" />
  32.     </bean>
  33. </beans>

activemq.properties

  1. #地址
  2. activemq.brokerURL=tcp://127.0.0.1:61616
  3. activemq.username=admin
  4. activemq.password=admin
  5. #多队列名称以逗号分割开
  6. activemq.queueName=queue_spring_user

WEB-INF/web.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
  5.          version="3.1">
  6.     <servlet>
  7.         <servlet-name>spring_activemq_producer</servlet-name>
  8.         <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
  9.         <init-param>
  10.             <param-name>contextConfigLocation</param-name>
  11.             <param-value>classpath:applicationContext.xml,
  12.                 classpath:activemq.xml
  13.             </param-value>
  14.         </init-param>
  15.         <load-on-startup>1</load-on-startup>
  16.     </servlet>
  17.     <filter>
  18.         <filter-name>encodingFilter</filter-name>
  19.         <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
  20.         <init-param>
  21.             <param-name>encoding</param-name>
  22.             <param-value>UTF-8</param-value>
  23.         </init-param>
  24.         <init-param>
  25.             <param-name>forceEncoding</param-name>
  26.             <param-value>true</param-value>
  27.         </init-param>
  28.     </filter>
  29.     <filter-mapping>
  30.         <filter-name>encodingFilter</filter-name>
  31.         <url-pattern>/*</url-pattern>
  32.     </filter-mapping>
  33.     <servlet-mapping>
  34.         <servlet-name>spring_activemq_producer</servlet-name>
  35.         <url-pattern>/</url-pattern>
  36.     </servlet-mapping>
  37. </web-app>

请求接口:http://localhost:8488/user/save?username=spring_activemq&age=101

参数

  1. username:spring_activemq
  2. age:101

spring_activemq_consumer 消费者 端口:8483

项目结构

  1. │ pom.xml
  2. │ spring_activemq_consumer.iml
  3. ├─src
  4. │ ├─main
  5. │ │ ├─java
  6. │ │ │ └─com
  7. │ │ │ └─hong
  8. │ │ │ └─spring
  9. │ │ │ ├─dao
  10. │ │ │ │ UserMapper.java
  11. │ │ │ │
  12. │ │ │ ├─listener
  13. │ │ │ │ UserListener.java
  14. │ │ │ │
  15. │ │ │ ├─mapper
  16. │ │ │ │ UserMapper.xml
  17. │ │ │ │
  18. │ │ │ └─provider
  19. │ │ │ UserServiceImpl.java
  20. │ │ │
  21. │ │ └─resources
  22. │ │ activemq.properties
  23. │ │ activemq.xml
  24. │ │ application.properties
  25. │ │ applicationContext.xml
  26. │ │ jdbc.properties
  27. │ │ log4j2.xml
  28. │ │ logging.properties
  29. │ │ mybatis.xml
  30. │ │
  31. │ └─test
  32. │ └─java
  33. │ │ └─com
  34. │ │ └─hong
  35. │ │ └─spring
  36. │ │ ├─dao
  37. │ │ │ UserMapper.class
  38. │ │ │
  39. │ │ ├─listener
  40. │ │ │ UserListener.class
  41. │ │ │
  42. │ │ ├─mapper
  43. │ │ │ UserMapper.xml
  44. │ │ │
  45. │ │ └─provider
  46. │ │ UserServiceImpl.class
  47. │ │
  48. │ └─generated-sources
  49. │ └─annotations
  50. └─web
  51.     └─WEB-INF
  52.             web.xml

代码实现

com.hong.spring.dao.UserMapper

  1. package com.hong.spring.dao;
  2. import com.hong.spring.entity.User;
  3. import com.hong.spring.entity.ao.UserAO;
  4. import org.apache.ibatis.annotations.Param;
  5. import java.util.List;
  6. /**
  7.  * @Auther: csh
  8.  * @Date: 2020/8/18 15:04
  9.  * @Description:用户dao层
  10.  */
  11. public interface UserMapper {
  12.     /**
  13.      *
  14.      * 功能描述:查询总条数
  15.      *
  16.      * @param:
  17.      * @return:
  18.      * @auther: csh
  19.      * @date: 2020/8/18 15:31
  20.      */
  21.     List<User> findAllUserList();
  22.     /**
  23.      *
  24.      * 功能描述:获取总数
  25.      *
  26.      * @param:
  27.      * @return:
  28.      * @auther: csh
  29.      * @date: 2020/8/18 15:30
  30.      */
  31.     int findAllTotal();
  32.     /**
  33.      *
  34.      * 功能描述:更新
  35.      *
  36.      * @param:
  37.      * @return:
  38.      * @auther: csh
  39.      * @date: 2020/8/18 15:30
  40.      */
  41.     int update(User user);
  42.     /**
  43.      *
  44.      * 功能描述:添加
  45.      *
  46.      * @param:
  47.      * @return:
  48.      * @auther: csh
  49.      * @date: 2020/8/19 18:39
  50.      */
  51.     int save(User user);
  52.     /**
  53.      *
  54.      * 功能描述:批量添加
  55.      *
  56.      * @param:
  57.      * @return:
  58.      * @auther: csh
  59.      * @date: 2020/8/21 15:46
  60.      */
  61.     int insertBatch(@Param("list") List <User> list);
  62.     /**
  63.      *
  64.      * 功能描述:通过id查询
  65.      *
  66.      * @param:
  67.      * @return:
  68.      * @auther: csh
  69.      * @date: 2020/8/19 18:39
  70.      */
  71.     User findById(int id);
  72.     /**
  73.      *
  74.      * 功能描述:通过分页查询
  75.      *
  76.      * @param:
  77.      * @return:
  78.      * @auther: csh
  79.      * @date: 2020/8/21 16:05
  80.      */
  81.     List<User> findByPage(UserAO ao);
  82. }

com.hong.spring.listener.UserListener

  1. package com.hong.spring.listener;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.hong.spring.api.IUserService;
  4. import com.hong.spring.entity.User;
  5. import lombok.extern.log4j.Log4j2;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.jms.annotation.EnableJms;
  8. import org.springframework.jms.annotation.JmsListener;
  9. import org.springframework.stereotype.Component;
  10. import org.springframework.stereotype.Service;
  11. import javax.jms.Message;
  12. import javax.jms.MessageListener;
  13. import javax.jms.TextMessage;
  14. /**
  15.  * @author: csh
  16.  * @Date: 2021/3/16 11:14
  17.  * @Description:用户监听
  18.  */
  19. @Log4j2
  20. @Service("userListener")
  21. public class UserListener implements MessageListener {
  22.     @Autowired
  23.     private IUserService userService;
  24.     @Override
  25.     public void onMessage(Message message) {
  26.         try {
  27.             log.info("传进来的数据为{}",JSONObject.toJSONString(message));
  28.             if(null!=message){
  29.                 TextMessage tm = (TextMessage) message;
  30.                 String text = tm.getText();
  31.                 log.info("获取到的文本内容:"+text);
  32.                 User user = JSONObject.parseObject(text, User.class);
  33.                 userService.save(user);
  34.             }
  35.         }catch (Exception e){
  36.             log.error("处理出错{}",e);
  37.         }
  38.     }
  39. }

src/main/java/com/hong/spring/mapper/UserMapper.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE mapper
  3.         PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
  4.         "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
  5. <mapper namespace="com.hong.spring.dao.UserMapper">
  6.     <resultMap type="com.hong.spring.entity.User" id="user">
  7.         <id column="id" property="id" />
  8.         <result column="user_name" property="username" />
  9.         <result column="age" property="age" />
  10.     </resultMap>
  11.     <select id="findById" resultType="com.hong.spring.entity.User">
  12.       SELECT * FROM user WHERE id = #{id,jdbcType=INTEGER}
  13.     </select>
  14.     <select id="findByPage" resultMap="user" parameterType="com.hong.spring.entity.ao.UserAO">
  15.         select * from user where 1=1 limit #{page},#{pageSize}
  16.     </select>
  17.     <select id="findAllUserList" resultMap="user">
  18.       SELECT * FROM user
  19.     </select>
  20.     <select id="findAllTotal" resultType="int">
  21.       SELECT count(*) FROM user
  22.     </select>
  23.     <insert id="save" >
  24.          INSERT INTO user ( user_name, age)
  25.         VALUES (#{username,jdbcType=VARCHAR},
  26.         #{age,jdbcType=INTEGER})
  27.     </insert>
  28.     <insert id="insertBatch">
  29.         insert into user
  30.         ( user_name, age)
  31.         values
  32.         <foreach collection="list" item="user" index="index"
  33.                  separator=",">
  34.             (#{user.username,jdbcType=VARCHAR},#{user.age,jdbcType=INTEGER})
  35.         </foreach>
  36.     </insert>
  37.     <update id="update" >
  38.         update user
  39.         <set>
  40.             <if test="username !=null">
  41.                 user_name=#{username,jdbcType=VARCHAR},
  42.             </if>
  43.             <if test="age !=null">
  44.                 age =#{age,jdbcType=INTEGER}
  45.             </if>
  46.         </set>
  47.         where id = #{id,jdbcType=INTEGER}
  48.     </update>
  49. </mapper>

com.hong.spring.provider.UserServiceImpl

  1. package com.hong.spring.provider;
  2. import com.hong.spring.api.IUserService;
  3. import com.hong.spring.dao.UserMapper;
  4. import com.hong.spring.entity.User;
  5. import com.hong.spring.entity.ao.UserAO;
  6. import com.hong.spring.utils.DataResponse;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Service;
  9. import org.springframework.transaction.annotation.Transactional;
  10. import java.util.List;
  11. /**
  12.  * @Auther: csh
  13.  * @Date: 2020/8/18 15:16
  14.  * @Description:用户实现
  15.  */
  16. @Service("userService")
  17. public class UserServiceImpl implements IUserService {
  18.     @Autowired
  19.     private UserMapper userDao;
  20.     @Override
  21.     public DataResponse<List<User>> findByAll() {
  22.         List <User> allUserList = userDao.findAllUserList();
  23.         int allTotal = userDao.findAllTotal();
  24.         return DataResponse.BuildSuccessResponse(allUserList,allTotal);
  25.     }
  26.     @Override
  27.     @Transactional
  28.     public DataResponse <Boolean> save(User user) {
  29.         if(null==user){
  30.             return DataResponse.BuildFailResponse("必传参数不能为空!");
  31.         }
  32.         int save = userDao.save(user);
  33.         return DataResponse.BuildSuccessResponse(save>0?true:false);
  34.     }
  35.     @Override
  36.     public DataResponse <Boolean> insertBatch(List <User> list) {
  37.         if(null==list){
  38.             return DataResponse.BuildFailResponse("参数不能为空!");
  39.         }
  40.         int batchSave = userDao.insertBatch(list);
  41.         return DataResponse.BuildSuccessResponse(batchSave>0?true:false);
  42.     }
  43.     @Override
  44.     @Transactional
  45.     public DataResponse <Boolean> update(User user) {
  46.         if(null==user || user.getId()==null){
  47.             return DataResponse.BuildFailResponse("必传参数不能为空!");
  48.         }
  49.         int update = userDao.update(user);
  50.         return DataResponse.BuildSuccessResponse(update>0?true:false);
  51.     }
  52.     @Override
  53.     public DataResponse <User> findById(int i) {
  54.         User byId = userDao.findById(i);
  55.         return DataResponse.BuildSuccessResponse(byId);
  56.     }
  57.     @Override
  58.     public DataResponse <List <User>> findByPage(UserAO ao) {
  59.         if(ao==null){
  60.             ao.setPage(0);
  61.             ao.setPageSize(10);
  62.         }else{
  63.             ao.setPage(ao.getPageSize() * ao.getPage());
  64.         }
  65.         int allTotal = userDao.findAllTotal();
  66.         List <User> byPage = userDao.findByPage(ao);
  67.         return DataResponse.BuildSuccessResponse(byPage,allTotal);
  68.     }
  69. }

src/main/resources/activemq.properties

  1. #地址
  2. activemq.brokerURL=tcp://127.0.0.1:61616
  3. activemq.username=admin
  4. activemq.password=admin
  5. #多队列名称以逗号分割开
  6. activemq.queueName=queue_spring_user

src/main/resources/activemq.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.       xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.alibaba.com/schema/stat"
  5.       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.alibaba.com/schema/stat http://www.alibaba.com/schema/stat.xsd">
  6.    <jms:annotation-driven/>
  7.    <!--创建ActiveMQ为我们提供的连接工厂-->
  8.    <bean id = "consumerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
  9.       <property name="brokerURL" value="${activemq.brokerURL}"></property>
  10.       <property name="userName" value="${activemq.username}" ></property>
  11.       <property name="password" value="${activemq.password}"></property>
  12.    </bean>
  13.    <!-- 配置JMS连接工长 -->
  14.    <bean id="mqConnectionFactory"
  15.         class="org.springframework.jms.connection.CachingConnectionFactory">
  16.       <constructor-arg ref="consumerConnectionFactory" />
  17.       <property name="sessionCacheSize" value="100" />
  18.    </bean>
  19.    <!--创建目的地,队列模式-->
  20.    <bean id = "queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
  21.         <!--多队列名称以逗号分割开-->
  22.         <constructor-arg name="name" value="${activemq.queueName}"></constructor-arg>
  23.    </bean>
  24.    <!--创建目的地,订阅模式-->
  25.    <!--<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">-->
  26.       <!--<constructor-arg value="topic_spring_hong"/>-->
  27.    <!--</bean>-->
  28.     <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。-->
  29.     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
  30.         <property name="connectionFactory" ref="mqConnectionFactory" />
  31.         <property name="defaultDestination" ref="queueDestination" />
  32.         <property name="receiveTimeout" value="10000" />
  33.         <!-- true是topic,false是queue,默认是false,此处显示写出false -->
  34.         <property name="pubSubDomain" value="false" />
  35.     </bean>
  36.    <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
  37.    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
  38.       <property name="connectionFactory" ref="mqConnectionFactory" />
  39.       <property name="destination" ref="queueDestination" />
  40.       <property name="messageListener" ref="userListener" />
  41.    </bean>
  42. </beans>

src/main/resources/application.properties

  1. logging.level.root=WARN
  2. logging.level.org.springframework.web=DEBUG
  3. logging.level.org.hibernate=ERROR

src/main/resources/applicationContext.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.       xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx"
  5.       xmlns:mvc="http://www.springframework.org/schema/mvc"
  6.       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
  7.     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd">
  8.    <!-- 配置组件扫描 -->
  9.    <context:component-scan base-package="com.hong.spring"></context:component-scan>
  10.    <!--加载配置文件-->
  11.    <context:property-placeholder location="classpath:jdbc.properties,classpath:activemq.properties"/>
  12.    <!-- 开启注解 -->
  13.    <context:annotation-config />
  14.    <!--开启注解事务-->
  15.    <tx:annotation-driven transaction-manager="transactionManager" />
  16.    <!--放行静态资源-->
  17.    <mvc:default-servlet-handler />
  18.    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver"
  19.         id="internalResourceViewResolver">
  20.       <!-- 前缀 -->
  21.       <property name="prefix" value="/WEB-INF/pages/" />
  22.       <!-- 后缀 -->
  23.       <property name="suffix" value=".html" />
  24.       <property name="contentType" value="text/html"/>
  25.    </bean>
  26.    <!--开启mvc注解事务-->
  27.    <!-- 定义注解驱动 -->
  28.    <mvc:annotation-driven>
  29.       <mvc:message-converters>
  30.          <!-- 设置支持中文 -->
  31.          <bean class="org.springframework.http.converter.StringHttpMessageConverter">
  32.             <property name="supportedMediaTypes">
  33.                <list>
  34.                   <value>text/plain;charset=UTF-8</value>
  35.                   <value>text/html;charset=UTF-8</value>
  36.                </list>
  37.             </property>
  38.          </bean>
  39.          <bean class="com.alibaba.fastjson.support.spring.FastJsonHttpMessageConverter"/>
  40.       </mvc:message-converters>
  41.    </mvc:annotation-driven>
  42.    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
  43.       <!-- 基础配置 -->
  44.       <property name="url" value="${jdbc.url}"></property>
  45.       <property name="driverClassName" value="${jdbc.driver}"></property>
  46.       <property name="username" value="${jdbc.user}"></property>
  47.       <property name="password" value="${jdbc.password}"></property>
  48.       <!-- 关键配置 -->
  49.       <!-- 初始化时建立物理连接的个数。初始化发生在显示调用init方法,或者第一次getConnection时 -->
  50.       <property name="initialSize" value="3" />
  51.       <!-- 最小连接池数量 -->
  52.       <property name="minIdle" value="2" />
  53.       <!-- 最大连接池数量 -->
  54.       <property name="maxActive" value="15" />
  55.       <!-- 配置获取连接等待超时的时间 -->
  56.       <property name="maxWait" value="10000" />
  57.       <!-- 性能配置 -->
  58.       <!-- 打开PSCache,并且指定每个连接上PSCache的大小 -->
  59.       <property name="poolPreparedStatements" value="true" />
  60.       <property name="maxPoolPreparedStatementPerConnectionSize" value="20" />
  61.       <!-- 其他配置 -->
  62.       <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
  63.       <property name="timeBetweenEvictionRunsMillis" value="60000" />
  64.       <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
  65.       <property name="minEvictableIdleTimeMillis" value="300000" />
  66.       <!-- 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,
  67.                   执行validationQuery检测连接是否有效。-->
  68.       <property name="testWhileIdle" value="true" />
  69.       <!-- 这里建议配置为TRUE,防止取到的连接不可用 ,申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。-->
  70.       <property name="testOnBorrow" value="true" />
  71.       <!-- 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能 -->
  72.       <property name="testOnReturn" value="false" />
  73.    </bean>
  74.    <!--事务管理器-->
  75.    <!-- sqlSessionFactory -->
  76.    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
  77.       <!-- 加载 MyBatis 的配置文件 -->
  78.       <property name="configLocation" value="classpath:mybatis.xml"/>
  79.       <!-- 数据源 -->
  80.       <property name="dataSource" ref="dataSource"/>
  81.       <!-- 所有配置的mapper文件 -->
  82.       <property name="mapperLocations" value="classpath*:com/hong/spring/mapper/*.xml" />
  83.    </bean>
  84.    <!-- Mapper 扫描器 -->
  85.    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
  86.       <!-- 扫描 包下的组件 -->
  87.       <property name="basePackage" value="com.hong.spring.dao" />
  88.       <!-- 关联mapper扫描器 与 sqlsession管理器 -->
  89.       <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
  90.    </bean>
  91.    <!--事务配置-->
  92.    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
  93.       <property name="dataSource" ref="dataSource" />
  94.    </bean>
  95. </beans>

src/main/resources/jdbc.properties

  1. config.properties:
  2. #数据库驱动
  3. jdbc.driver=com.mysql.jdbc.Driver
  4. #数据库连接url
  5. jdbc.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8
  6. #数据库用户名
  7. jdbc.user=root
  8. #数据库密码
  9. jdbc.password=123456

src/main/resources/log4j2.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <configuration status="INFO">
  3.     <appenders>
  4.         <Console name="Console" target="SYSTEM_OUT">
  5.             <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
  6.         </Console>
  7.         <RollingFile name="RollingFile" fileName="logs/app.log"
  8.                      filePattern="logs/$${date:yyyy-MM}/app-%d{MM-dd-yyyy}-%i.log.gz">
  9.             <PatternLayout pattern="%d{yyyy.MM.dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
  10.             <SizeBasedTriggeringPolicy size="5 MB"/>
  11.         </RollingFile>
  12.     </appenders>
  13.     <loggers>
  14.         <root level="DEBUG">
  15.             <appender-ref ref="Console"/>
  16.             <appender-ref ref="RollingFile"/>
  17.         </root>
  18.     </loggers>
  19. </configuration>

src/main/resources/logging.properties

  1. org.apache.catalina.core.ContainerBase.[Catalina].level=INFO
  2. org.apache.catalina.core.ContainerBase.[Catalina].handlers=java.util.logging.ConsoleHandler
  3. org.apache.jasper.servlet.TldScanner.level = FINE
  4. handlers = org.apache.juli.FileHandler, java.util.logging.ConsoleHandler
  5. ############################################################
  6. # Handler specific properties.
  7. # Describes specific configuration info for Handlers.
  8. ############################################################
  9. org.apache.juli.FileHandler.level = FINE
  10. org.apache.juli.FileHandler.directory = ../logs
  11. org.apache.juli.FileHandler.prefix = error-debug.
  12. java.util.logging.ConsoleHandler.level = FINE
  13. java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter

src/main/resources/mybatis.xml

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <!DOCTYPE configuration
  3.         PUBLIC "-//mybatis.org//DTD Config 3.0//EN"
  4.         "http://mybatis.org/dtd/mybatis-3-config.dtd">
  5. <configuration>
  6.     <!-- settings -->
  7.     <settings>
  8.         <!-- 打开延迟加载的开关 -->
  9.         <setting name="lazyLoadingEnabled" value="true"/>
  10.         <!-- 将积极加载改为消极加载(即按需加载) -->
  11.         <setting name="aggressiveLazyLoading" value="false"/>
  12.         <!-- 打开全局缓存开关(二级缓存)默认值就是 true -->
  13.         <setting name="cacheEnabled" value="true"/>
  14.         <!-- 开启驼峰命名转换 Table(create_time) -> Entity(createtime) -->
  15.         <setting name="mapUnderscoreToCamelCase" value="true"/>
  16.         <!-- 使用列别名代替列名 默认:true seslect name as title from table -->
  17.         <setting name="useColumnLabel" value="true"/>
  18.         <!--使用jdbc的getGeneratedKeys获取数据库自增主键值-->
  19.         <setting name="useGeneratedKeys" value="true"/>
  20.     </settings>
  21.     <!-- 别名定义 -->
  22.     <typeAliases>
  23.         <package name="com.hong.spring.entity"/>
  24.     </typeAliases>
  25. </configuration>

web/WEB-INF/web.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <web-app xmlns="http://xmlns.jcp.org/xml/ns/javaee"
  3.          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4.          xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd"
  5.          version="3.1">
  6.     <servlet>
  7.         <servlet-name>spring_activemq_consumer</servlet-name>
  8.         <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
  9.         <init-param>
  10.             <param-name>contextConfigLocation</param-name>
  11.             <param-value>classpath:applicationContext.xml,
  12.                 classpath:activemq.xml
  13.             </param-value>
  14.         </init-param>
  15.         <load-on-startup>1</load-on-startup>
  16.     </servlet>
  17.     <filter>
  18.         <filter-name>encodingFilter</filter-name>
  19.         <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
  20.         <init-param>
  21.             <param-name>encoding</param-name>
  22.             <param-value>UTF-8</param-value>
  23.         </init-param>
  24.         <init-param>
  25.             <param-name>forceEncoding</param-name>
  26.             <param-value>true</param-value>
  27.         </init-param>
  28.     </filter>
  29.     <filter-mapping>
  30.         <filter-name>encodingFilter</filter-name>
  31.         <url-pattern>/*</url-pattern>
  32.     </filter-mapping>
  33.     <servlet-mapping>
  34.         <servlet-name>spring_activemq_consumer</servlet-name>
  35.         <url-pattern>/</url-pattern>
  36.     </servlet-mapping>
  37. </web-app>

tomcat配置及启动消费

  1. [2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Artifact is deployed successfully
  2. [2021-04-19 05:11:09,841] Artifact spring_activemq_consumer:war exploded: Deploy took 10,965 milliseconds
  3. 17:11:09.982 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821878763,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAJ3siYWdlIjoxMCwidXNlcm5hbWUiOiJzcHJpbmdfYWN0aXZlbXEifQ==","length":43,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821878761,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":92,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50761-1618821878478-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent":true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1067,"text":"{\"age\":10,\"username\":\"spring_activemq\"}","timestamp":1618821878761,"wireFormatInfo":false}
  4.  INFO | {dataSource-1} inited
  5. 17:11:11.731 [queueListenerContainer-1] INFO com.hong.spring.listener.UserListener - 传进来的数据为{"advisory":false,"allPropertyNames":["JMSPriority","JMSType","JMSXGroupID","JMSReplyTo","JMSXDeliveryCount","JMSExpiration","JMSRedelivered","JMSTimestamp","JMSXGroupSeq","JMSCorrelationID","JMSDeliveryMode"],"arrival":0,"brokerInTime":1618821970677,"brokerInfo":false,"brokerOutTime":1618823469852,"commandId":5,"compressed":false,"connectionControl":false,"consumerControl":false,"content":{"data":"AAAAKHsiYWdlIjoxMDEsInVzZXJuYW1lIjoic3ByaW5nX2FjdGl2ZW1xIn0=","length":44,"offset":0},"contentMarshalled":true,"dataStructureType":28,"destination":{"composite":false,"dLQ":false,"dataStructureType":100,"destinationType":1,"destinationTypeAsString":"Queue","marshallAware":false,"physicalName":"queue_spring_user","properties":{"physicalName":"queue_spring_user"},"qualifiedName":"queue://queue_spring_user","queue":true,"queueName":"queue_spring_user","reference":{"all":[{"content":"queue_spring_user","type":"physicalName"}],"className":"org.apache.activemq.command.ActiveMQQueue","factoryClassName":"org.apache.activemq.jndi.JNDIReferenceFactory"},"temporary":false,"topic":false},"droppable":false,"dropped":false,"expiration":0,"expired":false,"groupSequence":0,"inTransaction":false,"jMSDeliveryMode":2,"jMSDestination":{"$ref":"$.destination"},"jMSExpiration":0,"jMSMessageID":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1:1:1:1","jMSPriority":4,"jMSRedelivered":false,"jMSTimestamp":1618821970675,"jMSXGroupFirstForConsumer":false,"jMSXMimeType":"jms/text-message","marshallAware":true,"marshalled":true,"message":{"$ref":"@"},"messageAck":false,"messageDispatch":false,"messageDispatchNotification":false,"messageHardRef":{"$ref":"@"},"messageId":{"brokerSequenceId":97,"dataStructureType":110,"marshallAware":false,"producerId":{"connectionId":"ID:DESKTOP-1VMHJGQ-50907-1618821970418-1:1","dataStructureType":123,"marshallAware":false,"sessionId":1,"value":1},"producerSequenceId":1},"persistent":true,"priority":4,"producerId":{"$ref":"$.messageId.producerId"},"properties":{},"propertyNames":[],"readOnlyBody":true,"readOnlyProperties":true,"redelivered":false,"redeliveryCounter":0,"response":false,"responseRequired":true,"shutdownInfo":false,"size":1068,"text":"{\"age\":101,\"username\":\"spring_activemq\"}","timestamp":1618821970675,"wireFormatInfo":false}

相对来说spring 整合activemq非常简单,而且这个activemq相关的接口文档也很easy。

springboot 整合 activemq

启动前先启动zk


实现一个逻辑,通过api请求,然后请求producer如果是get请求则直接查mybatis,如果是post请求则直接发送activemq给consumer消费端,当消费消息的时候i%==0则为队列添加,如果是i%!=0则进行topic添加。

springboot_activemq_api 端口:8386

  1. │ pom.xml
  2. └─src
  3.     └─main
  4.         ├─java
  5.         │ └─com
  6.         │ └─hong
  7.         │ └─springboot
  8.         │ │ Application.java
  9.         │ │
  10.         │ └─controller
  11.         │ IndexController.java
  12.         │ UserController.java
  13.         │
  14.         └─resources
  15.                 application.properties

com.hong.springboot.controller.IndexController

  1. package com.hong.springboot.controller;
  2. import org.springframework.web.bind.annotation.RequestMapping;
  3. import org.springframework.web.bind.annotation.RestController;
  4. /**
  5.  * @author: csh
  6.  * @Date: 2021/1/12 10:16
  7.  * @Description:首页
  8.  */
  9. @RestController
  10. public class IndexController {
  11.     @RequestMapping("/")
  12.     public String index(){
  13.         return "成功!";
  14.     }
  15. }

com.hong.springboot.controller.UserController

  1. package com.hong.springboot.controller;
  2. import com.alibaba.dubbo.common.utils.StringUtils;
  3. import com.alibaba.dubbo.config.annotation.Reference;
  4. import com.hong.springboot.api.IUserService;
  5. import com.hong.springboot.entity.User;
  6. import com.hong.springboot.utils.DataResponse;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.web.bind.annotation.GetMapping;
  9. import org.springframework.web.bind.annotation.PostMapping;
  10. import org.springframework.web.bind.annotation.RequestMapping;
  11. import org.springframework.web.bind.annotation.RestController;
  12. import java.util.List;
  13. /**
  14.  * @Auther: csh
  15.  * @Date: 2020/8/18 16:11
  16.  * @Description:用户
  17.  */
  18. @RestController
  19. @Slf4j
  20. @RequestMapping("/user")
  21. public class UserController {
  22.     @Reference
  23.     private IUserService userService;
  24.     @GetMapping("/findByAll")
  25.     public DataResponse<List<User>> findByAll(){
  26.         try {
  27.             return userService.findByAll();
  28.         } catch (Exception e){
  29.             log.error("查询出错{}",e);
  30.         }
  31.         return DataResponse.BuildFailResponse("查询出错!");
  32.     }
  33.     @PostMapping("/save")
  34.     public DataResponse<Boolean> save(User ao){
  35.         if(null==ao || ao.getAge()==null || StringUtils.isBlank(ao.getUsername())){
  36.             return DataResponse.BuildFailResponse("参数不能为空!");
  37.         }
  38.         DataResponse <Boolean> save = userService.save(ao);
  39.         return save;
  40.     }
  41. }

com.hong.springboot.Application

  1. package com.hong.springboot;
  2. import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. /**
  6.  * @author: csh
  7.  * @Date: 2020/11/21 11:37
  8.  * @Description:springboot dubbo消费端
  9.  */
  10. @SpringBootApplication(scanBasePackages = "com.hong.springboot")
  11. @EnableDubbo
  12. public class Application {
  13.     public static void main(String[] args) {
  14.         SpringApplication.run(Application.class);
  15.     }
  16. }

application.properties

  1. #dubbo configuration
  2. #服务名称
  3. dubbo.application.name=springboot_dubbo_consumer
  4. #注册中心协议
  5. dubbo.registry.protocol=zookeeper
  6. #注册地址
  7. dubbo.registry.address=zookeeper://127.0.0.1:2181
  8. #扫描注解包通过该设置将服务注册到zookeeper
  9. dubbo.protocol.scan=com.hong.springboot.api
  10. #注册端口
  11. dubbo.protocol.port=20880
  12. #协议名称
  13. dubbo.protocol.name=dubbo
  14. #扫包
  15. dubbo.scan.basePackages=com.hong.springboot.controller
  16. #避免端口冲突
  17. server.port=8386

springboot_all/springboot_activemq_api/pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <parent>
  5.         <groupId>com.hong.springboot</groupId>
  6.         <artifactId>springboot_all</artifactId>
  7.         <version>0.0.1-SNAPSHOT</version>
  8.         <relativePath/>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <groupId>com.hong.springboot</groupId>
  12.     <artifactId>springboot_activemq_api</artifactId>
  13.     <properties>
  14.         <java.version>1.8</java.version>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>com.hong.springboot</groupId>
  19.             <artifactId>springboot_mq_api</artifactId>
  20.             <version>1.0.0-SNAPSHOT</version>
  21.         </dependency>
  22.         <dependency>
  23.             <groupId>org.springframework.boot</groupId>
  24.             <artifactId>spring-boot-starter-web</artifactId>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.springframework.boot</groupId>
  28.             <artifactId>spring-boot-starter-test</artifactId>
  29.             <scope>test</scope>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>com.alibaba.boot</groupId>
  33.             <artifactId>dubbo-spring-boot-starter</artifactId>
  34.             <version>0.2.0</version>
  35.         </dependency>
  36.         <dependency>
  37.         <groupId>org.apache.zookeeper</groupId>
  38.         <artifactId>zookeeper</artifactId>
  39.         <version>3.5.4-beta</version>
  40.             <exclusions>
  41.                 <exclusion>
  42.                     <artifactId>slf4j-api</artifactId>
  43.                     <groupId>org.slf4j</groupId>
  44.                 </exclusion>
  45.                 <exclusion>
  46.                     <artifactId>slf4j-log4j12</artifactId>
  47.                     <groupId>org.slf4j</groupId>
  48.                 </exclusion>
  49.             </exclusions>
  50.         </dependency>
  51.     </dependencies>
  52.     <!--静态资源导出问题-->
  53.     <build>
  54.         <plugins>
  55.             <plugin>
  56.                 <groupId>org.springframework.boot</groupId>
  57.                 <artifactId>spring-boot-maven-plugin</artifactId>
  58.             </plugin>
  59.         </plugins>
  60.         <resources>
  61.             <resource>
  62.                 <directory>src/main/java</directory>
  63.                 <includes>
  64.                     <include>**/*.properties</include>
  65.                     <include>**/*.xml</include>
  66.                 </includes>
  67.                 <filtering>false</filtering>
  68.             </resource>
  69.             <resource>
  70.                 <directory>src/main/resources</directory>
  71.                 <includes>
  72.                     <include>**/*.properties</include>
  73.                     <include>**/*.xml</include>
  74.                 </includes>
  75.                 <filtering>false</filtering>
  76.             </resource>
  77.         </resources>
  78.     </build>
  79. </project>

springboot_activemq_producer 端口:8387

com.hong.springboot.config.ActiveMQConfig

  1. package com.hong.springboot.config;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import org.apache.activemq.command.ActiveMQQueue;
  4. import org.apache.activemq.command.ActiveMQTopic;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  9. import org.springframework.jms.config.JmsListenerContainerFactory;
  10. import javax.jms.Queue;
  11. import javax.jms.Topic;
  12. @Configuration
  13. public class ActiveMQConfig {
  14.     @Value("${spring.activemq.queue}")
  15.     private String queueName;
  16.     @Value("${spring.activemq.topic}")
  17.     private String topicName;
  18.     @Value("${spring.activemq.user}")
  19.     private String usrName;
  20.     @Value("${spring.activemq.password}")
  21.     private  String password;
  22.     @Value("${spring.activemq.broker-url}")
  23.     private  String brokerUrl;
  24.     @Bean
  25.     public Queue queue(){
  26.         return new ActiveMQQueue(queueName);
  27.     }
  28.     @Bean
  29.     public Topic topic(){
  30.         return new ActiveMQTopic(topicName);
  31.     }
  32.     @Bean
  33.     public ActiveMQConnectionFactory connectionFactory() {
  34.         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
  35.     }
  36.     @Bean
  37.     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
  38.         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  39.         bean.setConnectionFactory(connectionFactory);
  40.         return bean;
  41.     }
  42.     @Bean
  43.     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
  44.         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  45.         //设置为发布订阅方式, 默认情况下使用的生产消费者方式
  46.         bean.setPubSubDomain(true);
  47.         bean.setConnectionFactory(connectionFactory);
  48.         return bean;
  49.     }
  50. }

com.hong.springboot.config.DruidConfig

  1. package com.hong.springboot.config;
  2. import com.alibaba.druid.pool.DruidDataSource;
  3. import org.springframework.boot.context.properties.ConfigurationProperties;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import javax.sql.DataSource;
  7. /**
  8.  * @author: csh
  9.  * @Date: 2021/1/8 18:08
  10.  * @Description:数据源配置
  11.  */
  12. @Configuration
  13. public class DruidConfig {
  14.     @Bean
  15.     @ConfigurationProperties(prefix = "spring.datasource")
  16.     public DataSource dataSource(){
  17.         return new DruidDataSource();
  18.     }
  19. }

com.hong.springboot.dao.UserMapper

  1. package com.hong.springboot.dao;
  2. import com.hong.springboot.entity.User;
  3. import org.apache.ibatis.annotations.Insert;
  4. import org.apache.ibatis.annotations.Select;
  5. import java.util.List;
  6. /**
  7.  * @Auther: csh
  8.  * @Date: 2020/8/18 15:04
  9.  * @Description:用户dao层
  10.  */
  11. public interface UserMapper {
  12.     @Select("select id,user_name,age from user")
  13.     List<User> findAllUser();
  14.     @Insert("insert into user (user_name,age) values(#{username},#{age})")
  15.     int insert(User user);
  16. }

com.hong.springboot.provider.UserServiceImpl

  1. package com.hong.springboot.provider;
  2. import com.alibaba.druid.util.StringUtils;
  3. import com.alibaba.dubbo.config.annotation.Service;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.hong.springboot.api.IUserService;
  6. import com.hong.springboot.dao.UserMapper;
  7. import com.hong.springboot.entity.User;
  8. import com.hong.springboot.utils.DataResponse;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.beans.factory.annotation.Autowired;
  11. import org.springframework.jms.core.JmsMessagingTemplate;
  12. import org.springframework.transaction.annotation.Transactional;
  13. import javax.jms.Queue;
  14. import javax.jms.Topic;
  15. import java.util.List;
  16. /**
  17.  * @Auther: csh
  18.  * @Date: 2020/8/18 15:16
  19.  * @Description:用户实现
  20.  */
  21. @Service(interfaceClass = IUserService.class,timeout = 6000)
  22. @Slf4j
  23. public class UserServiceImpl implements IUserService {
  24.     @Autowired
  25.     private UserMapper userDao;
  26.     @Autowired
  27.     private JmsMessagingTemplate jmsMessagingTemplate;
  28.     @Autowired
  29.     private Queue queue;
  30.     @Autowired
  31.     private Topic topic;
  32.     @Override
  33.     public DataResponse<List<User>> findByAll() {
  34.         List <User> allUserList = userDao.findAllUser();
  35.         return DataResponse.BuildSuccessResponse(allUserList,allUserList.size());
  36.     }
  37.     //随机用
  38.     private static int i = 0;
  39.     @Override
  40.     public DataResponse <Boolean> save(User userAO) {
  41.         i++;
  42.         log.info("需要activemq添加的用户信息{}",JSONObject.toJSONString(userAO));
  43.         try {
  44.             //当==0时则用队列发送,当不等于的时候用topic发送。
  45.             if(i%2==0){
  46.                 jmsMessagingTemplate.convertAndSend(queue,JSONObject.toJSONString(userAO));
  47.             }else{
  48.                 jmsMessagingTemplate.convertAndSend(topic,JSONObject.toJSONString(userAO));
  49.             }
  50.         }catch (Exception e){
  51.             e.printStackTrace();
  52.             return DataResponse.BuildSuccessResponse(false);
  53.         }
  54.         return DataResponse.BuildSuccessResponse(true);
  55.     }
  56.     @Transactional
  57.     @Override
  58.     public DataResponse <Boolean> reallySave(User user) {
  59.         log.info("要添加的用户信息{}",JSONObject.toJSONString(user));
  60.         if(null==user || user.getAge()==null || StringUtils.isEmpty(user.getUsername())){
  61.             return DataResponse.BuildFailResponse("参数不能为空!");
  62.         }
  63.         int insert = userDao.insert(user);
  64.         return insert>0?DataResponse.BuildSuccessResponse(true):DataResponse.BuildFailResponse("失败",false);
  65.     }
  66. }

com.hong.springboot.Application

  1. package com.hong.springboot;
  2. import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
  3. import org.mybatis.spring.annotation.MapperScan;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. /**
  7.  * @author: csh
  8.  * @Date: 2020/11/21 11:37
  9.  * @Description:
  10.  */
  11. @SpringBootApplication(scanBasePackages = "com.hong.springboot")
  12. @MapperScan("com.hong.springboot.dao")
  13. @EnableDubbo
  14. public class Application  {
  15.     public static void main(String[] args) {
  16.         SpringApplication.run(Application.class);
  17.     }
  18. }

application.properties

  1. rocketmq.name-server=localhost:9876
  2. rocketmq.producer.group=hong_group
  3. rocketmq.producer.sendMessageTimeout=300000
  4. #dubbo configuration
  5. #服务名称
  6. dubbo.application.name=springboot_rabbitmq_producer
  7. dubbo.registry.protocol=zookeeper
  8. #注册地址
  9. dubbo.registry.address=zookeeper://127.0.0.1:2181
  10. #扫描注解包通过该设置将服务注册到zookeeper
  11. dubbo.protocol.scan=com.hong.springboot.api
  12. #注册端口
  13. dubbo.protocol.port=20881
  14. #协议名称
  15. dubbo.protocol.name=dubbo
  16. #避免端口冲突
  17. server.port=8387
  18. spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
  19. spring.datasource.url=jdbc:mysql://localhost:3306/spring?useUnicode=true&characterEncoding=utf-8&serverTimezone=UTC
  20. spring.datasource.username=root
  21. spring.datasource.password=123456
  22. #mybatis配置
  23. mybatis.typeAliasesPackage=com.hong.springboot.entity
  24. spring.activemq.broker-url=tcp://localhost:61616
  25. spring.activemq.user=admin
  26. spring.activemq.password=admin
  27. spring.activemq.pool.enabled=true
  28. spring.activemq.pool.max-connections=50
  29. # 基于内存的ActiveMQ
  30. #spring.activemq.in-memory=true
  31. spring.activemq.queue=queue_springboot_user
  32. spring.activemq.topic=topic_springboot_user

springboot_all/springboot_activemq_producer/pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <parent>
  5.         <groupId>com.hong.springboot</groupId>
  6.         <artifactId>springboot_all</artifactId>
  7.         <version>0.0.1-SNAPSHOT</version>
  8.         <relativePath/>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <groupId>com.hong.springboot</groupId>
  12.     <artifactId>springboot_activemq_producer</artifactId>
  13.     <properties>
  14.         <java.version>1.8</java.version>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>org.springframework.boot</groupId>
  19.             <artifactId>spring-boot-starter-activemq</artifactId>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>com.hong.springboot</groupId>
  23.             <artifactId>springboot_mq_api</artifactId>
  24.             <version>1.0.0-SNAPSHOT</version>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.mybatis.spring.boot</groupId>
  28.             <artifactId>mybatis-spring-boot-starter</artifactId>
  29.             <version>2.1.3</version>
  30.         </dependency>
  31.         <dependency>
  32.             <groupId>com.alibaba</groupId>
  33.             <artifactId>druid-spring-boot-starter</artifactId>
  34.             <version>1.1.10</version>
  35.         </dependency>
  36.         <dependency>
  37.             <groupId>org.springframework.boot</groupId>
  38.             <artifactId>spring-boot-starter-web</artifactId>
  39.         </dependency>
  40.         <dependency>
  41.             <groupId>com.alibaba.boot</groupId>
  42.             <artifactId>dubbo-spring-boot-starter</artifactId>
  43.             <version>0.2.0</version>
  44.         </dependency>
  45.         <dependency>
  46.         <groupId>org.apache.zookeeper</groupId>
  47.         <artifactId>zookeeper</artifactId>
  48.         <version>3.5.4-beta</version>
  49.             <exclusions>
  50.                 <exclusion>
  51.                     <artifactId>slf4j-api</artifactId>
  52.                     <groupId>org.slf4j</groupId>
  53.                 </exclusion>
  54.                 <exclusion>
  55.                     <artifactId>slf4j-log4j12</artifactId>
  56.                     <groupId>org.slf4j</groupId>
  57.                 </exclusion>
  58.             </exclusions>
  59.         </dependency>
  60.         <dependency>
  61.             <groupId>org.springframework.boot</groupId>
  62.             <artifactId>spring-boot-starter-test</artifactId>
  63.             <scope>test</scope>
  64.         </dependency>
  65.     </dependencies>
  66.     <!--<build>-->
  67.         <!--<plugins>-->
  68.             <!--<plugin>-->
  69.                 <!--<groupId>org.springframework.boot</groupId>-->
  70.                 <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
  71.                 <!--<configuration>-->
  72.                     <!--<skip>true</skip>-->
  73.                 <!--</configuration>-->
  74.             <!--</plugin>-->
  75.         <!--</plugins>-->
  76.     <!--</build>-->
  77.     <!--静态资源导出问题-->
  78.     <build>
  79.         <resources>
  80.             <resource>
  81.                 <directory>src/main/java</directory>
  82.                 <includes>
  83.                     <include>**/*.properties</include>
  84.                     <include>**/*.xml</include>
  85.                 </includes>
  86.                 <filtering>false</filtering>
  87.             </resource>
  88.             <resource>
  89.                 <directory>src/main/resources</directory>
  90.                 <includes>
  91.                     <include>**/*.properties</include>
  92.                     <include>**/*.xml</include>
  93.                 </includes>
  94.                 <filtering>false</filtering>
  95.             </resource>
  96.         </resources>
  97.     </build>
  98. </project>

springboot_activemq_consumer 端口:8388

com.hong.springboot.config.ActiveMQConfig

  1. package com.hong.springboot.config;
  2. import org.apache.activemq.ActiveMQConnectionFactory;
  3. import org.apache.activemq.command.ActiveMQQueue;
  4. import org.apache.activemq.command.ActiveMQTopic;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
  9. import org.springframework.jms.config.JmsListenerContainerFactory;
  10. import javax.jms.Queue;
  11. import javax.jms.Topic;
  12. @Configuration
  13. public class ActiveMQConfig {
  14.     @Value("${spring.activemq.user}")
  15.     private String queueName;
  16.     @Value("${spring.activemq.topic}")
  17.     private String topicName;
  18.     @Value("${spring.activemq.user}")
  19.     private String usrName;
  20.     @Value("${spring.activemq.password}")
  21.     private  String password;
  22.     @Value("${spring.activemq.broker-url}")
  23.     private  String brokerUrl;
  24.     @Bean
  25.     public Queue queue(){
  26.         return new ActiveMQQueue(queueName);
  27.     }
  28.     @Bean
  29.     public Topic topic(){
  30.         return new ActiveMQTopic(topicName);
  31.     }
  32.     @Bean
  33.     public ActiveMQConnectionFactory connectionFactory() {
  34.         return new ActiveMQConnectionFactory(usrName, password, brokerUrl);
  35.     }
  36.     @Bean
  37.     public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ActiveMQConnectionFactory connectionFactory){
  38.         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  39.         bean.setConnectionFactory(connectionFactory);
  40.         bean.setPubSubDomain(false);
  41.         return bean;
  42.     }
  43.     @Bean
  44.     public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory connectionFactory){
  45.         DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
  46.         //设置为发布订阅方式, 默认情况下使用的生产消费者方式
  47.         bean.setPubSubDomain(true);
  48.         bean.setConnectionFactory(connectionFactory);
  49.         return bean;
  50.     }
  51. }

com.hong.springboot.listener.UserListener

  1. package com.hong.springboot.listener;
  2. import com.alibaba.dubbo.config.annotation.Reference;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.hong.springboot.api.IUserService;
  5. import com.hong.springboot.entity.User;
  6. import com.hong.springboot.utils.DataResponse;
  7. import lombok.extern.log4j.Log4j2;
  8. import org.springframework.jms.annotation.JmsListener;
  9. import org.springframework.stereotype.Service;
  10. import javax.jms.TextMessage;
  11. /**
  12.  * @author: csh
  13.  * @Date: 2021/3/16 11:14
  14.  * @Description:用户监听
  15.  */
  16. @Service
  17. @Log4j2
  18. public class UserListener {
  19.     @Reference
  20.     private IUserService userService;
  21.     /**
  22.      *
  23.      * 功能描述: 基于队列添加
  24.      *
  25.      * @param:
  26.      * @return:
  27.      * @auther: csh
  28.      * @date: 2021/4/20 10:56
  29.      */
  30.     @JmsListener(destination = "${spring.activemq.queue}",containerFactory = "jmsListenerContainerQueue")
  31.     public void onMessage(TextMessage msg) {
  32.         log.info("进入队列消费");
  33.         if(null==msg){
  34.             return;
  35.         }
  36.         try {
  37.             User user = JSONObject.parseObject(msg.getText(),User.class);
  38.             log.info("最终要添加的值{}",JSONObject.toJSONString(user));
  39.             DataResponse<Boolean> save = userService.reallySave(user);
  40.             if(save==null || !save.getData()){
  41.                 log.info("添加失败,原因{}",JSONObject.toJSONString(save));
  42.             }
  43.         }catch (Exception e){
  44.             log.error("添加出错",e);
  45.         }
  46.     }
  47.     @JmsListener(destination = "${spring.activemq.topic}",containerFactory = "jmsListenerContainerTopic")
  48.     public void onMessage2(TextMessage msg) {
  49.         log.info("进入topic消费");
  50.         if(null==msg){
  51.             return;
  52.         }
  53.         try {
  54.             User user = JSONObject.parseObject(msg.getText(),User.class);
  55.             log.info("最终要添加的值{}",JSONObject.toJSONString(user));
  56.             DataResponse<Boolean> save = userService.reallySave(user);
  57.             if(save==null || !save.getData()){
  58.                 log.info("添加失败,原因{}",JSONObject.toJSONString(save));
  59.             }
  60.         }catch (Exception e){
  61.             log.error("添加出错",e);
  62.         }
  63.     }
  64. }

com.hong.springboot.Application

  1. package com.hong.springboot;
  2. import com.alibaba.dubbo.config.spring.context.annotation.EnableDubbo;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.jms.annotation.EnableJms;
  6. /**
  7.  * @author: csh
  8.  * @Date: 2020/11/21 11:37
  9.  * @Description:
  10.  */
  11. @SpringBootApplication(scanBasePackages = "com.hong.springboot")
  12. @EnableDubbo
  13. @EnableJms
  14. public class Application  {
  15.     public static void main(String[] args) {
  16.         SpringApplication.run(Application.class);
  17.     }
  18. }

application.properties

  1. rocketmq.name-server=localhost:9876
  2. rocketmq.producer.group=hong_group
  3. rocketmq.producer.sendMessageTimeout=300000
  4. #避免端口冲突
  5. server.port=8388
  6. #dubbo configuration
  7. #服务名称
  8. dubbo.application.name=springboot_rocketmq_consumer
  9. dubbo.registry.protocol=zookeeper
  10. #注册地址
  11. dubbo.registry.address=zookeeper://127.0.0.1:2181
  12. #扫描注解包通过该设置将服务注册到zookeeper
  13. dubbo.protocol.scan=com.hong.springboot.api
  14. #注册端口
  15. dubbo.protocol.port=20882
  16. #协议名称
  17. dubbo.protocol.name=dubbo
  18. #扫包
  19. dubbo.scan.basePackages=com.hong.springboot.listener
  20. spring.activemq.broker-url=tcp://localhost:61616
  21. spring.activemq.user=admin
  22. spring.activemq.password=admin
  23. #spring.activemq.pool.enabled=true
  24. #spring.activemq.pool.max-connections=50
  25. # 基于内存的ActiveMQ
  26. #spring.activemq.in-memory=true
  27. spring.activemq.queue=queue_springboot_user
  28. spring.activemq.topic=topic_springboot_user

springboot_all/springboot_activemq_consumer/pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4.     <parent>
  5.         <groupId>com.hong.springboot</groupId>
  6.         <artifactId>springboot_all</artifactId>
  7.         <version>0.0.1-SNAPSHOT</version>
  8.         <relativePath/>
  9.     </parent>
  10.     <modelVersion>4.0.0</modelVersion>
  11.     <groupId>com.hong.springboot</groupId>
  12.     <artifactId>springboot_activemq_consumer</artifactId>
  13.     <properties>
  14.         <java.version>1.8</java.version>
  15.     </properties>
  16.     <dependencies>
  17.         <dependency>
  18.             <groupId>org.springframework.boot</groupId>
  19.             <artifactId>spring-boot-starter-activemq</artifactId>
  20.         </dependency>
  21.         <dependency>
  22.             <groupId>com.hong.springboot</groupId>
  23.             <artifactId>springboot_mq_api</artifactId>
  24.             <version>1.0.0-SNAPSHOT</version>
  25.         </dependency>
  26.         <dependency>
  27.             <groupId>org.springframework.boot</groupId>
  28.             <artifactId>spring-boot-starter-web</artifactId>
  29.         </dependency>
  30.         <dependency>
  31.             <groupId>com.alibaba.boot</groupId>
  32.             <artifactId>dubbo-spring-boot-starter</artifactId>
  33.             <version>0.2.0</version>
  34.         </dependency>
  35.         <dependency>
  36.         <groupId>org.apache.zookeeper</groupId>
  37.         <artifactId>zookeeper</artifactId>
  38.         <version>3.5.4-beta</version>
  39.             <exclusions>
  40.                 <exclusion>
  41.                     <artifactId>slf4j-api</artifactId>
  42.                     <groupId>org.slf4j</groupId>
  43.                 </exclusion>
  44.                 <exclusion>
  45.                     <artifactId>slf4j-log4j12</artifactId>
  46.                     <groupId>org.slf4j</groupId>
  47.                 </exclusion>
  48.             </exclusions>
  49.         </dependency>
  50.         <dependency>
  51.             <groupId>org.springframework.boot</groupId>
  52.             <artifactId>spring-boot-starter-test</artifactId>
  53.             <scope>test</scope>
  54.         </dependency>
  55.     </dependencies>
  56.     <!--<build>-->
  57.         <!--<plugins>-->
  58.             <!--<plugin>-->
  59.                 <!--<groupId>org.springframework.boot</groupId>-->
  60.                 <!--<artifactId>spring-boot-maven-plugin</artifactId>-->
  61.                 <!--<configuration>-->
  62.                     <!--<skip>true</skip>-->
  63.                 <!--</configuration>-->
  64.             <!--</plugin>-->
  65.         <!--</plugins>-->
  66.     <!--</build>-->
  67.     <!--静态资源导出问题-->
  68.     <build>
  69.         <resources>
  70.             <resource>
  71.                 <directory>src/main/java</directory>
  72.                 <includes>
  73.                     <include>**/*.properties</include>
  74.                     <include>**/*.xml</include>
  75.                 </includes>
  76.                 <filtering>false</filtering>
  77.             </resource>
  78.             <resource>
  79.                 <directory>src/main/resources</directory>
  80.                 <includes>
  81.                     <include>**/*.properties</include>
  82.                     <include>**/*.xml</include>
  83.                 </includes>
  84.                 <filtering>false</filtering>
  85.             </resource>
  86.         </resources>
  87.     </build>
  88. </project>

然后请求如下:http://localhost:8386/user/save?username=springboot_activemq&age=1000

  1. username:springboot_activemq
  2. age:1000

结果如下:

  1. 2021-04-20 11:07:42.501  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入topic消费
  2. 2021-04-20 11:07:42.571  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}
  3. 2021-04-20 11:07:45.861  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 进入队列消费
  4. 2021-04-20 11:07:45.861  INFO 20632 --- [enerContainer-1] c.hong.springboot.listener.UserListener : 最终要添加的值{"age":1000,"username":"springboot_activemq"}

这样基本就完成了springboot整合activemq非常easy,并且容易上手,还整合相关的rpc进行查询与添加。

topic与队列的区别?

文章中涉及到队列与topic,但是有什么区别?

 名称

Topic

Queue

概要

Publish  Subscribe messaging 发布订阅消息

Point-to-Point  点对点

重复消费

可以重复消费

点对点模式所以不可重复消费

接收方式

无需主动请求,可由服务器推送

需要主动获取队列中的消息

有无状态

topic数据默认不落地,是无状态的。

Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。

完整性保障

并不保证publisher发布的每条数据,Subscriber都能接受到。

Queue保证每条数据都能被receiver接收。

消息是否会丢失

一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。

Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。

消息发布接收策略

一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器

一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

 

消息类型

是否持久化

是否有Durable订阅者

消费者延迟启动时,消息是否保留

Broker重启时,消息是否保留

Queue

N

-

Y

N

Queue

Y

-

Y

Y

Topic

N

N

N

N

Topic

N

Y

Y

N

Topic

Y

N

N

N

Topic

Y

Y

Y

Y

    参考文章:

    https://blog.csdn.net/admin1973/article/details/60125938

压测

    通过jmeter压测,发现大量堆积,虽然说跟http和mysql有一定关系,但是这个堆积量太大了..还有失败率达97%....

最后

    虽然说activemq属于中间件的第一批鼻主来的,但是国内使用activemq作为商业的场景不是特别多,因为kafka和rocketmq、rabbitmq该有的功能都有,并且性能更高及支持的场景更多,当然没有好坏只是合不合适罢了,本文只是一个基础整合,后续有时间再深入。

    参考文章:

    https://www.cnblogs.com/jaycekon/p/6225058.html

    https://activemq.apache.org/maven/apidocs/

    https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-messaging

    https://www.cnblogs.com/cyfonly/p/6380860.html

     更多学习资料可关注以下公众号,获取作者个人微信沟通或获取。

系列文章:

spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)

spring整合各种RPC框架(netty、dubbo、dubbox、RPC、Motan)-续netty

spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(gRPC)

spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)

spring整合中间件(RocketMQ、kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)

spring整合中间件(kafka、RabbitMQ、ActiveMQ、ZeroMQ、TubeMQ、NSQ)-kafka

spring整合中间件(RocketMQ、kafka、RabbitMQ)-RabbitMQ

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/139137
推荐阅读
相关标签
  

闽ICP备14008679号