当前位置:   article > 正文

Mqtt服务器搭建_java mqtt服务器搭建

java mqtt服务器搭建

前话

  业务需求,需要使用到mqtt协议(中间件)。



MQTT协议



简介

  MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。



设计原则

  • 精简,不添加可有可无的功能;
  • 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;
  • 允许用户动态创建主题,零运维成本;
  • 把传输量降到最低以提高传输效率;
  • 把低带宽、高延迟、不稳定的网络等因素考虑在内;
  • 支持连续的会话控制;
  • 理解客户端计算能力可能很低;
  • 提供服务质量管理;
  • 假设数据不可知,不强求传输数据的类型与格式,保持灵活性。



特点

  1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
      这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。
  2. 对负载内容屏蔽的消息传输
  3. 使用TCP/IP提供网络连接
      主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
  4. 有三种消息发布服务质量
    "至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
    "至少一次",确保消息到达,但消息重复可能会发生。
    "只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
  5. 小型传输,开销小
      (固定长度的头部是2字节),协议交换最小化,以降低网络流量。非常适合"在物联网领域,传感器与服务器的通信,信息的收集",嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
  6. 客户端异常中断的机制。
    Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。
    Testament:遗嘱机制,功能类似于Last Will。



发布/订阅者模式

  MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。 在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
  在这里插入图片描述
  从图上MQTT有三种角色的存在:

  • Broker代理:很多人理解为中间件,当然可以这样子认为。他就是一个中间件。用于处理信息并发送到相应的订阅者。
  • 发布者:用于发布信息到代理上面。注意:发布者也可以是订阅者。
  • 订阅者:就是用于接受信息的客户端。



MQTT服务器

  MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

  • 接受来自客户的网络连接;
  • 接受客户发布的应用信息;
  • 处理来自客户端的订阅和退订请求;
  • 向订阅的客户转发应用程序消息。



MQTT协议中的方法

  MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

  • Connect:等待与服务器建立连接
  • Disconnect:等待MQTT客户端完成所作的工作,并于服务器断开TCP/IP会话
  • Subscribe:等待完成订阅
  • UnSubscribe:等待服务器取消客户端的一个活多个和topics订阅
  • Publish:MQTT客户端发送消息请求,发送完成后返回应用程序线程



Windows上Apache Apoll环境搭建(mqtt)



下载Apache Apoll

  官方下载地址:ActiveMQ
  CSDN下载地址:apache-activemq-5.15.9.rar_mqtt搭建-其它工具类资源-CSDN下载



安装jdk

  官方下载地址:Java Downloads | Oracle
  CSDN下载地址:jdk-12.0.1_windows-x64_bin.rar_mqtt搭建-其它工具类资源-CSDN下载
  下载系统对应的版本,windows x64位
   在这里插入图片描述
  添加java路径到系统Path变量
  JAVA_HOME

C:\Program Files\Java\jdk-12.0.1

  PATH(最后面加上)

;%JAVA_HOME%\bin\

  加粗样式CLASSPATH

.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar

  使用cmd运行java命名,jdk环境配置成功,如下图:
  在这里插入图片描述



安装Apache Apoll



解压

  将Apache Apoll解压到C盘下(自定义),如下图:
  在这里插入图片描述\



创建实例

  使用cmd进入该文件夹创建实例:

  1. cd C:\apache-activemq-5.15.9
  2. cd bin
  3. activemq-admin.bat create mybroker

  在这里插入图片描述
   在这里插入图片描述



查看登录用户名和密码

  进入mybroker/conf,查看users.properties,可以看到用户名
  在这里插入图片描述
  在这里插入图片描述
  查看tcp监听端口(可自行修改,笔者不修改)
  
  在这里插入图片描述
  查看web管理页面端口(可自行修改,笔者不修改)
  在这里插入图片描述
  在这里插入图片描述



运行apache apoll

  1. cd C:\apache-activemq-5.15.9\bin\mybroker\bin
  2. mybroker.bat start

  在这里插入图片描述
  在这里插入图片描述
   在这里插入图片描述



测试安装



步骤一:打开ie(浏览器)



步骤二:输入网址http://localhost:8161/admin

  在这里插入图片描述



步骤三:输入admin,admin

  在这里插入图片描述



步骤四:创建队列测试

  在这里插入图片描述
  在这里插入图片描述
  在这里插入图片描述
  在这里插入图片描述



步骤五(补充):远程管理

  在这里插入图片描述
  至此windows server2008 r2系统上的apache apoll中间件环境以及服务成功搭建完成。



常见错误处理



连接时Qt返回错误码257

  客户端遇到协议违规,因此关闭了连接。
错误
  检查服务器报错
   在这里插入图片描述
原因
  ActiveMQ有时会报类似Frame size of 257 MB larger than max allowed 100 MB的错误,意思是单条消息超过了预设的最大值,在配置文件中

  1. <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?
  2. maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576000"/>

  我们可以配置这个值,但是有时会突然出现很大的单条消息,比如1G。
分析
  QtMqtt与服务连接,传过去的属性最大值可能是258MB,所以直接修改服务器配置。
解决方法
  重启服务

Linux上安装搭建MQTT服务器

准备工作
由于搭建Apollo环境变量需要有JAVA_HOME,这个时候需要安装JDK,可以参考这篇文章:《Ubuntu安装JDK1.8.0并配置环境变量》。

下载及解压
首先是下载Apache-Apollo,下载页面:http://www.apache.org/dyn/closer.cgi?path=activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz

或者输入下面命令:

wget http://apache.fayea.com/activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-unix-distro.tar.gz
1
解压源码包:

tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz
1
配置
进入apache-apollo-1.7.1/bin目录

cd apache-apollo-1.7.1/bin/
1
输入./apollo可以查看帮助

pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./apollo
usage: apollo [--log <log_level>] <command> [<args>]

The most commonly used apollo commands are:
    create           creates a new broker instance
    disk-benchmark   Benchmarks your disk's speed
    help             Display help information
    version          Displays the broker version

See 'apollo help <command>' for more information on a specific command.

创建一个Broker示例:/apollo create mybroker。MQTT服务器都是叫Broker。

pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./apollo create mybroker
Creating apollo instance at: mybroker
Generating ssl keystore...

You can now start the broker by executing:  

   "/home/***/Downloads/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker" run

Or you can setup the broker as system service and run it in the background:

   sudo ln -s "/home/***/Downloads/apache-apollo-1.7.1/bin/mybroker/bin/apollo-broker-service" /etc/init.d/
   /etc/init.d/apollo-broker-service start

后面会有提示怎么启动服务器,以及创建一个service。

启动Apollo :

pi@raspberry-pi:~/Downloads/apache-apollo-1.7.1/bin$ ./mybroker/bin/apollo-broker run
1
之后查看打印信息即可知道MQTT要连接的端口和管理页面端口。
 

 注意里面的ip地址,想要你的broker被外网访问,就改成 0.0.0.0 吧。

        进入你创建的需要改动的broker中, vim etc/apollo.xml  将ip的绑定都改一下吧,如下图:

        

        外网访问你这台机器的地址就好了 例如: http://10.10.22.37:61680/  

        如果端口不能访问的话,注意开放端口。

         开放端口的方法在上一篇redis里面介绍到了。 vim /etc/sysconfig/iptables 

         有很多连接MQTT服务器的工具,尽情的玩耍吧~

此界面表示已经安装成功:该登录的用户名和密码在\apache-apollo-1.7.1\bin\mybroker\etc\users.properties里,打开users.properties文件:

  ## —————————————————————————
  ## Licensed to the Apache Software Foundation (ASF) under one or more
  ## contributor license agreements. See the NOTICE file distributed with
  ## this work for additional information regarding copyright ownership.
  ## The ASF licenses this file to You under the Apache License, Version 2.0
  ## (the “License”); you may not use this file except in compliance with
  ## the License. You may obtain a copy of the License at
  ##
  ## http://www.apache.org/licenses/LICENSE-2.0
  ##
  ## Unless required by applicable law or agreed to in writing, software
  ## distributed under the License is distributed on an “AS IS” BASIS,
  ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ## See the License for the specific language governing permissions and
  ## limitations under the License.
  ## —————————————————————————

  #
  # The list of users that can login. This file supports both plain text or
  # encrypted passwords. Here is an example what an encrypted password
  # would look like:
  #
  # admin=ENC(Cf3Jf3tM+UrSOoaKU50od5CuBa8rxjoL)
  #

  admin=password

经过上面的简单步骤,服务器基本上就已经完成。输入admin,password就可以登录了,如下图:

服务端代码:

  1. package bsit.mqtt.demo.one_way;
  2. import org.eclipse.paho.client.mqttv3.MqttClient;
  3. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  4. import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  5. import org.eclipse.paho.client.mqttv3.MqttException;
  6. import org.eclipse.paho.client.mqttv3.MqttMessage;
  7. import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
  8. import org.eclipse.paho.client.mqttv3.MqttTopic;
  9. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  10. /**
  11. *
  12. * Title:Server
  13. * Description: 服务器向多个客户端推送主题,即不同客户端可向服务器订阅相同主题
  14. * @author chenrl
  15. * 2016年1月6日下午3:29:28
  16. */
  17. public class Server {
  18. public static final String HOST = "tcp://192.168.1.3:61613";
  19. public static final String TOPIC = "toclient/124";
  20. public static final String TOPIC125 = "toclient/125";
  21. private static final String clientid = "server";
  22. private MqttClient client;
  23. private MqttTopic topic;
  24. private MqttTopic topic125;
  25. private String userName = "admin";
  26. private String passWord = "password";
  27. private MqttMessage message;
  28. public Server() throws MqttException {
  29. // MemoryPersistence设置clientid的保存形式,默认为以内存保存
  30. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  31. connect();
  32. }
  33. private void connect() {
  34. MqttConnectOptions options = new MqttConnectOptions();
  35. options.setCleanSession(false);
  36. options.setUserName(userName);
  37. options.setPassword(passWord.toCharArray());
  38. // 设置超时时间
  39. options.setConnectionTimeout(10);
  40. // 设置会话心跳时间
  41. options.setKeepAliveInterval(20);
  42. try {
  43. client.setCallback(new PushCallback());
  44. client.connect(options);
  45. topic = client.getTopic(TOPIC);
  46. topic125 = client.getTopic(TOPIC125);
  47. } catch (Exception e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
  52. MqttException {
  53. MqttDeliveryToken token = topic.publish(message);
  54. token.waitForCompletion();
  55. System.out.println("message is published completely! "
  56. + token.isComplete());
  57. }
  58. public static void main(String[] args) throws MqttException {
  59. Server server = new Server();
  60. server.message = new MqttMessage();
  61. server.message.setQos(2);
  62. server.message.setRetained(true);
  63. server.message.setPayload("给客户端124推送的信息".getBytes());
  64. server.publish(server.topic , server.message);
  65. server.message = new MqttMessage();
  66. server.message.setQos(2);
  67. server.message.setRetained(true);
  68. server.message.setPayload("给客户端125推送的信息".getBytes());
  69. server.publish(server.topic125 , server.message);
  70. System.out.println(server.message.isRetained() + "------ratained状态");
  71. }
  72. }

客户端代码:

  1. package bsit.mqtt.demo.one_way;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.ScheduledExecutorService;
  4. import java.util.concurrent.TimeUnit;
  5. import org.eclipse.paho.client.mqttv3.MqttClient;
  6. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  7. import org.eclipse.paho.client.mqttv3.MqttException;
  8. import org.eclipse.paho.client.mqttv3.MqttSecurityException;
  9. import org.eclipse.paho.client.mqttv3.MqttTopic;
  10. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  11. public class Client {
  12. public static final String HOST = "tcp://192.168.1.3:61613";
  13. public static final String TOPIC = "toclient/124";
  14. private static final String clientid = "client124";
  15. private MqttClient client;
  16. private MqttConnectOptions options;
  17. private String userName = "admin";
  18. private String passWord = "password";
  19. private ScheduledExecutorService scheduler;
  20. private void start() {
  21. try {
  22. // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
  23. client = new MqttClient(HOST, clientid, new MemoryPersistence());
  24. // MQTT的连接设置
  25. options = new MqttConnectOptions();
  26. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  27. options.setCleanSession(true);
  28. // 设置连接的用户名
  29. options.setUserName(userName);
  30. // 设置连接的密码
  31. options.setPassword(passWord.toCharArray());
  32. // 设置超时时间 单位为秒
  33. options.setConnectionTimeout(10);
  34. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
  35. options.setKeepAliveInterval(20);
  36. // 设置回调
  37. client.setCallback(new PushCallback());
  38. MqttTopic topic = client.getTopic(TOPIC);
  39. //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
  40. options.setWill(topic, "close".getBytes(), 2, true);
  41. client.connect(options);
  42. //订阅消息
  43. int[] Qos = {1};
  44. String[] topic1 = {TOPIC};
  45. client.subscribe(topic1, Qos);
  46. } catch (Exception e) {
  47. e.printStackTrace();
  48. }
  49. }
  50. public static void main(String[] args) throws MqttException {
  51. Client client = new Client();
  52. client.start();
  53. }
  54. }

MQTT订阅回调类:

  1. package bsit.mqtt.demo.one_way;
  2. import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  3. import org.eclipse.paho.client.mqttv3.MqttCallback;
  4. import org.eclipse.paho.client.mqttv3.MqttMessage;
  5. /**
  6. * 发布消息的回调类
  7. *
  8. * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
  9. * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。
  10. * 在回调中,将它用来标识已经启动了该回调的哪个实例。
  11. * 必须在回调类中实现三个方法:
  12. *
  13. * public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
  14. *
  15. * public void connectionLost(Throwable cause)在断开连接时调用。
  16. *
  17. * public void deliveryComplete(MqttDeliveryToken token))
  18. * 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
  19. * 由 MqttClient.connect 激活此回调。
  20. *
  21. */
  22. public class PushCallback implements MqttCallback {
  23. public void connectionLost(Throwable cause) {
  24. // 连接丢失后,一般在这里面进行重连
  25. System.out.println("连接断开,可以做重连");
  26. }
  27. public void deliveryComplete(IMqttDeliveryToken token) {
  28. System.out.println("deliveryComplete---------" + token.isComplete());
  29. }
  30. public void messageArrived(String topic, MqttMessage message) throws Exception {
  31. // subscribe后得到的消息会执行到这里面
  32. System.out.println("接收消息主题 : " + topic);
  33. System.out.println("接收消息Qos : " + message.getQos());
  34. System.out.println("接收消息内容 : " + new String(message.getPayload()));
  35. }
  36. }

运行服务端代码,可看到服务器会给客户端124/125各推送一条消息,
这里写图片描述
在运行124客户端代码,可看到124客户端接收的信息:
这里写图片描述
然后把客户端代码的Topic改为TOPIC = “toclient/125”;clientid = “client125”;再运行该段代码,可看到125客户端接收的信息:
这里写图片描述
多个客户端订阅同一主题,其clientid必不相同。客户端124/125订阅各自主题的内容,但是不同时间启动,都在启动后接收到各自信息,这体现出了服务器的推送功能。同样的,发送的主题信息,可以在服务器的topic可以看到,访问路径是:http://127.0.0.1:61680/

其实,如若服务端和客户端相互通信,即客户端可以订阅可以发布,服务端可以订阅也可以发布,则可不区分服务端客户端,两边代码几乎一样。类似,两个客户端都在订阅同一主题,这时由第三个客户端发布这一主题的请求,前两个客户端同样可以接受该主题的内容,这时三个客户端的代码几乎一样,只是前两个是订阅,后一个是发布。

 

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号