赞
踩
业务需求,需要使用到mqtt协议(中间件)。
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。 在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
从图上MQTT有三种角色的存在:
MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:
MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:
官方下载地址:ActiveMQ
CSDN下载地址:apache-activemq-5.15.9.rar_mqtt搭建-其它工具类资源-CSDN下载
官方下载地址:Java Downloads | Oracle
CSDN下载地址:jdk-12.0.1_windows-x64_bin.rar_mqtt搭建-其它工具类资源-CSDN下载
下载系统对应的版本,windows x64位
添加java路径到系统Path变量
JAVA_HOME
C:\Program Files\Java\jdk-12.0.1
PATH(最后面加上)
;%JAVA_HOME%\bin\
加粗样式CLASSPATH
.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
使用cmd运行java命名,jdk环境配置成功,如下图:
将Apache Apoll解压到C盘下(自定义),如下图:
\
使用cmd进入该文件夹创建实例:
- cd C:\apache-activemq-5.15.9
- cd bin
- activemq-admin.bat create mybroker
进入mybroker/conf,查看users.properties,可以看到用户名
查看tcp监听端口(可自行修改,笔者不修改)
查看web管理页面端口(可自行修改,笔者不修改)
- cd C:\apache-activemq-5.15.9\bin\mybroker\bin
- mybroker.bat start
至此windows server2008 r2系统上的apache apoll中间件环境以及服务成功搭建完成。
客户端遇到协议违规,因此关闭了连接。
错误
检查服务器报错
原因
ActiveMQ有时会报类似Frame size of 257 MB larger than max allowed 100 MB的错误,意思是单条消息超过了预设的最大值,在配置文件中
- <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?
- maximumConnections=1000&wireFormat.maxFrameSize=1048576000"/>
我们可以配置这个值,但是有时会突然出现很大的单条消息,比如1G。
分析
QtMqtt与服务连接,传过去的属性最大值可能是258MB,所以直接修改服务器配置。
解决方法
重启服务
准备工作
由于搭建Apollo环境变量需要有JAVA_HOME,这个时候需要安装JDK,可以参考这篇文章:《Ubuntu安装JDK1.8.0并配置环境变量》。
下载及解压
首先是下载Apache-Apollo,下载页面:http://www.apache.org/dyn/closer.cgi?path=activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz
或者输入下面命令:
wget http://apache.fayea.com/activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz
1
解压源码包:
tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
1
配置
进入apache-apollo-1.7.1/bin目录
cd apache-apollo-1.7.1/bin/
1
输入./apollo可以查看帮助
pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./apollo
usage: apollo [--log <log_level>] <command> [<args>]
The most commonly used apollo commands are:
create creates a new broker instance
disk-benchmark Benchmarks your disk's speed
help Display help information
version Displays the broker version
See 'apollo help <command>' for more information on a specific command.
创建一个Broker示例:/apollo create mybroker。MQTT服务器都是叫Broker。
pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./apollo create mybroker
Creating apollo instance at: mybroker
Generating ssl keystore...
You can now start the broker by executing:
"/home/***/Downloads/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker" run
Or you can setup the broker as system service and run it in the background:
sudo ln -s "/home/***/Downloads/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker-service" /etc/init.d/
/etc/init.d/apollo-broker-service start
后面会有提示怎么启动服务器,以及创建一个service。
启动Apollo :
pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./mybroker/bin/apollo-broker run
1
之后查看打印信息即可知道MQTT要连接的端口和管理页面端口。
注意里面的ip地址,想要你的broker被外网访问,就改成 0.0.0.0 吧。
进入你创建的需要改动的broker中, vim etc/apollo.xml 将ip的绑定都改一下吧,如下图:
外网访问你这台机器的地址就好了 例如: http://10.10.22.37:61680/
如果端口不能访问的话,注意开放端口。
开放端口的方法在上一篇redis里面介绍到了。 vim /etc/sysconfig/iptables
有很多连接MQTT服务器的工具,尽情的玩耍吧~
此界面表示已经安装成功:该登录的用户名和密码在\apache-apollo-1.7.1\bin\mybroker\etc\users.properties里,打开users.properties文件:
## —————————————————————————
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the “License”); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an “AS IS” BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## —————————————————————————
#
# The list of users that can login. This file supports both plain text or
# encrypted passwords. Here is an example what an encrypted password
# would look like:
#
# admin=ENC(Cf3Jf3tM+UrSOoaKU50od5CuBa8rxjoL)
#
admin=password
经过上面的简单步骤,服务器基本上就已经完成。输入admin,password就可以登录了,如下图:
服务端代码:
- package bsit.mqtt.demo.one_way;
-
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
- import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
- /**
- *
- * Title:Server
- * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
- * @author chenrl
- * 2016年1月6日下午3:29:28
- */
- public class Server {
-
- public static final String HOST = "tcp://192.168.1.3:61613";
- public static final String TOPIC = "toclient/124";
- public static final String TOPIC125 = "toclient/125";
- private static final String clientid = "server";
-
- private MqttClient client;
- private MqttTopic topic;
- private MqttTopic topic125;
- private String userName = "admin";
- private String passWord = "password";
-
- private MqttMessage message;
-
- public Server() throws MqttException {
- // MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- connect();
- }
-
- private void connect() {
- MqttConnectOptions options = new MqttConnectOptions();
- options.setCleanSession(false);
- options.setUserName(userName);
- options.setPassword(passWord.toCharArray());
- // 设置超时时间
- options.setConnectionTimeout(10);
- // 设置会话心跳时间
- options.setKeepAliveInterval(20);
- try {
- client.setCallback(new PushCallback());
- client.connect(options);
- topic = client.getTopic(TOPIC);
- topic125 = client.getTopic(TOPIC125);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
- MqttException {
- MqttDeliveryToken token = topic.publish(message);
- token.waitForCompletion();
- System.out.println("message is published completely! "
- + token.isComplete());
- }
-
- public static void main(String[] args) throws MqttException {
- Server server = new Server();
-
- server.message = new MqttMessage();
- server.message.setQos(2);
- server.message.setRetained(true);
- server.message.setPayload("给客户端124推送的信息".getBytes());
- server.publish(server.topic , server.message);
-
- server.message = new MqttMessage();
- server.message.setQos(2);
- server.message.setRetained(true);
- server.message.setPayload("给客户端125推送的信息".getBytes());
- server.publish(server.topic125 , server.message);
-
- System.out.println(server.message.isRetained() + "------ratained状态");
- }
- }
客户端代码:
- package bsit.mqtt.demo.one_way;
-
- import java.util.concurrent.Executors;
- import java.util.concurrent.ScheduledExecutorService;
- import java.util.concurrent.TimeUnit;
-
- import org.eclipse.paho.client.mqttv3.MqttClient;
- import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
- import org.eclipse.paho.client.mqttv3.MqttException;
- import org.eclipse.paho.client.mqttv3.MqttSecurityException;
- import org.eclipse.paho.client.mqttv3.MqttTopic;
- import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
- public class Client {
-
- public static final String HOST = "tcp://192.168.1.3:61613";
- public static final String TOPIC = "toclient/124";
- private static final String clientid = "client124";
- private MqttClient client;
- private MqttConnectOptions options;
- private String userName = "admin";
- private String passWord = "password";
-
- private ScheduledExecutorService scheduler;
-
- private void start() {
- try {
- // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
- client = new MqttClient(HOST, clientid, new MemoryPersistence());
- // MQTT的连接设置
- options = new MqttConnectOptions();
- // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
- options.setCleanSession(true);
- // 设置连接的用户名
- options.setUserName(userName);
- // 设置连接的密码
- options.setPassword(passWord.toCharArray());
- // 设置超时时间 单位为秒
- options.setConnectionTimeout(10);
- // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
- options.setKeepAliveInterval(20);
- // 设置回调
- client.setCallback(new PushCallback());
- MqttTopic topic = client.getTopic(TOPIC);
- //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
- options.setWill(topic, "close".getBytes(), 2, true);
-
- client.connect(options);
- //订阅消息
- int[] Qos = {1};
- String[] topic1 = {TOPIC};
- client.subscribe(topic1, Qos);
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) throws MqttException {
- Client client = new Client();
- client.start();
- }
- }
MQTT订阅回调类:
- package bsit.mqtt.demo.one_way;
-
- import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
- import org.eclipse.paho.client.mqttv3.MqttCallback;
- import org.eclipse.paho.client.mqttv3.MqttMessage;
-
- /**
- * 发布消息的回调类
- *
- * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
- * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
- * 在回调中,将它用来标识已经启动了该回调的哪个实例。
- * 必须在回调类中实现三个方法:
- *
- * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
- *
- * public void connectionLost(Throwable cause)在断开连接时调用。
- *
- * public void deliveryComplete(MqttDeliveryToken token))
- * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
- * 由 MqttClient.connect 激活此回调。
- *
- */
- public class PushCallback implements MqttCallback {
-
- public void connectionLost(Throwable cause) {
- // 连接丢失后,一般在这里面进行重连
- System.out.println("连接断开,可以做重连");
- }
-
- public void deliveryComplete(IMqttDeliveryToken token) {
- System.out.println("deliveryComplete---------" + token.isComplete());
- }
-
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- // subscribe后得到的消息会执行到这里面
- System.out.println("接收消息主题 : " + topic);
- System.out.println("接收消息Qos : " + message.getQos());
- System.out.println("接收消息内容 : " + new String(message.getPayload()));
- }
- }
运行服务端代码,可看到服务器会给客户端124/125各推送一条消息,
在运行124客户端代码,可看到124客户端接收的信息:
然后把客户端代码的Topic改为TOPIC = “toclient/125”;clientid = “client125”;再运行该段代码,可看到125客户端接收的信息:
多个客户端订阅同一主题,其clientid必不相同。客户端124/125订阅各自主题的内容,但是不同时间启动,都在启动后接收到各自信息,这体现出了服务器的推送功能。同样的,发送的主题信息,可以在服务器的topic可以看到,访问路径是:http://127.0.0.1:61680/
其实,如若服务端和客户端相互通信,即客户端可以订阅可以发布,服务端可以订阅也可以发布,则可不区分服务端客户端,两边代码几乎一样。类似,两个客户端都在订阅同一主题,这时由第三个客户端发布这一主题的请求,前两个客户端同样可以接受该主题的内容,这时三个客户端的代码几乎一样,只是前两个是订阅,后一个是发布。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。