赞
踩
目录
3.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
读写分离,纵向扩展,所有的写操作一般在master上完成,slave只提供一个热备
分布式的一种存储,水平的扩展,消息的分布式共享
- <persistenceAdapter>
- <kahaDB directory="/data/kahadb"/> # 自定义的地址
- </persistenceAdapter>
主要是通过共享目录存储目录来实现master和slave的热备,谁先启动,谁就可以最终取得共享目录的控制权成为master,其它的应用就只能作为slave
- <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destory-method="close">
- <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
- <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
- <property name="username" value="root"/>
- <property name="password" value="root" />
- <property name="poolPreparedStatements" value="true"/>
- </bean>
- <persistenceAdapter>
- <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
- </persistenceAdapter>
与shared filesystem方式类似,知识共享的存储介质由文件系统改成数据库而已
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
我这里三台主机名分别设置了 master node1 node2
三台服务器均下载安装activemq
- [root@master /]# mkdir myactivemq
- [root@master /]# cd myactivemq
- [root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
- [root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
2.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
zkAddress 为zookeeper的地址
- <!--
- <persistenceAdapter>
- <kahaDB directory="${activemq.data}/kahadb"/>
- </persistenceAdapter>
- -->
- <persistenceAdapter>
- <replicatedLevelDB
- directory="${activemq.data}/leveldb"
- replicas="3"
- bind="tcp://0.0.0.0:0"
- zkAddress="192.168.190.200:2181"
- hostname="192.168.190.201" <!-- 集群中任意一台服务器ip或通过host文件配置的主机名 -->
- sync="local_disk"
- zkPath="/activemq/leveldb-stores"
- />
- </persistenceAdapter>
如果是单机需要修改下边配置,61616端口修改,其它全部注掉,防止端口冲突,我们这里是多服务器配置,可以不用更改,本配置忽略下边配置
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <!--
- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- -->
- </transportConnectors>
先启动zookeeper,然后分别启动三台服务器的activemq
- [root@master bin]# pwd
- /myactivemq/apache-activemq-5.15.9/bin
- [root@master bin]# ./activemq start
我这里先启动了241的节点,所以241为主master节点
因此我们只能访问241的控制界面
选择master节点, ./activemq stop
我们发现其中一台从节点被选举为主节点
重新启动原来master,其会作为slave服务器继续提供服务
需要在节点的配置文件中,显示的配置其他节点的IP地址和服务端口号,例如:
- <networkConnectors>
- <networkConnector uri="static:(tcp://192.168.190.200:61616,tcp://192.168.190.201:61616,tcp://192.168.190.202:61616)" />
- </networkConnectors>
通过广播的方式,动态的发现其它节点。例如:
- <networkConnectors>
- <networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
- </networkConnectors>
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
- />
- <!--
- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- -->
- </transportConnectors>
在每一个节点中activemq.xml配置文件中配置上述代码。其中defalut是我们自定义的名称,在discoveryUri属性中进行引用即可。有利于集群节点的动态变更。
注意,如单机部署多个activemq示例,jetty.xml文件中的8161端口也需要更改
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
三台服务器均下载安装activemq
- [root@master /]# mkdir myactivemq
- [root@master /]# cd myactivemq
- [root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
- [root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
- <networkConnectors>
- <networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
- </networkConnectors>
- <transportConnectors>
- <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
- <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
- />
- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
- </transportConnectors>
每台brokerName不能重名,名字随意
我这里启动2台的能成功加入集群,启动三台失败,...当启动第三台的时候,查看日志报错如下
- 2024-04-15 12:06:18,843 | WARN | Failed to add Connection id=localhost->localhost-36779-1713153927901-42:2, clientId=NC_localhost_outbound due to {} | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///192.168.190.202:50616@61616
- javax.jms.InvalidClientIDException: Broker: localhost - Client: NC_localhost_outbound already connected from tcp://192.168.190.200:47904
- at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
- at org.apache.activemq.transport.MutexTransport.onComm
我们打开2台集群中的其中一台可以看到,连接的名字都是NC:localhost:outbound,
其中localhost是brokerName,所以这里需要修改每一台的brokerName不能重名
启动第一台服务器activemq
这里查看的都是一台服务器的日志
加入第二台服务器的日志
加入第三台服务器的日志
随便查看一台服务器的地址的可视页面,可以看到集群成功
生产者连接其中一个activemq实例生产消息
- package com.dolphin;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.springframework.jms.core.MessagePostProcessor;
-
- import javax.jms.*;
-
- public class JmsProduce {
- public static final String ACTIVEMQ_URL = "tcp://192.168.190.200:61616";
- public static final String QUEUE_NAME = "queue01";
- public static void main(String[] args) throws JMSException {
- //1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
- //2 通过连接工厂,获得链接connection并启动访问
- Connection connection = activeMQConnectionFactory.createConnection();
- connection.start();
-
- //3、创建会话session
- //两个参数,第一个叫事务/第二个叫签收
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //4、创建目的地(具体是队列还是主题topic)
- Queue queue = session.createQueue(QUEUE_NAME);
- //5、创建消息的生产者
- MessageProducer messageProducer = session.createProducer(queue);
- //6、 通过使用messageProducer生产3条消息发送到MQ的队列里面
- for (int i = 1;i<=6;i++) {
- //7 创建消息
- TextMessage textMessage = session.createTextMessage("message---" + i);//理解为一个字符串
- textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
- //8 通过messageProducer发送给mq
- messageProducer.send(textMessage);
- }
- //9 关闭资源
- messageProducer.close();
- session.close();
- connection.close();
- System.out.println("*****消息发布完成");
- }
- }
消费者连接另一台服务实例
- package com.dolphin;
-
- import org.apache.activemq.ActiveMQConnectionFactory;
-
- import javax.jms.*;
- import java.io.IOException;
-
- public class JmsConsumer1 {
- public static final String ACTIVEMQ_URL = "tcp://192.168.190.201:61616";
- public static final String QUEUE_NAME = "queue01";
- public static void main(String[] args) throws JMSException, IOException {
- System.out.println("1号消费者");
- //1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
- //2 通过连接工厂,获得链接connection并启动访问
- Connection connection = activeMQConnectionFactory.createConnection();
- connection.start();
-
- //3、创建会话session
- //两个参数,第一个叫事务/第二个叫签收
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- //4、创建目的地(具体是队列还是主题topic)
- Queue queue = session.createQueue(QUEUE_NAME);
- //5、创建消息的消费者
- MessageConsumer messageConsumer = session.createConsumer(queue);
- //6、 通过使用messageProducer生产3条消息发送到MQ的队列里面
-
- messageConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- if (null != message && message instanceof TextMessage) {
- TextMessage textMessage = (TextMessage) message;
- try {
- System.out.println("*****消费者接受到消息"+textMessage.getText());
- } catch (JMSException e) {
- e.printStackTrace();
- }
-
- }
- }
- });
- System.in.read(); //进程处于运行状态
- messageConsumer.close();
- session.close();
- connection.close();
- System.out.println("*****消息消费完成");
- }
- }
运行生产者生产了6条消息,消息发送到指定的服务实例上,其它集群实例中看不到消息
运行连接了另一台服务的消费者
运行结果,这里消费了6条上述生产者的消息
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。