当前位置:   article > 正文

使用HiveMQ实现Android MQTT

使用HiveMQ实现Android MQTT

MQTT官网:https://mqtt.org/

百度Android MQTT,或者B站上搜索,发现大多使用https://github.com/eclipse/paho.mqtt.android,这是Eclipse的一个Android MQTT客户端实现库,但是我发现这个库在运行到高版本的手机上时报错了,这个库也是N年没有更新的了,而且这个库不支持MQTT5.0的,所以我找了新的库。

在查看MQTT官网的时候,发现关于MQTT的很多介绍是链接到了HiveMQ上面的,不知道它们是什么关系,我发现HiveMQ即有提供MQTT的服务器端,也有提供客户端,而且官方都给他跳转了,那我就用它的库来实现吧!使用了之后才发现,这个库是真的好用啊,封装的非常好,代码写起来特别简洁,响应式编程,支持异步,可以使用Java自带的,也可以使用RxJava或Reactor,HiveMQ的断线自动重连做的也比较好。库地址:https://github.com/hivemq/hivemq-mqtt-client,这个库没有限定Android,所以在普通的Java项目中也是可以使用的。android示例代码如下:

  1. 添加依赖

    implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
    
    • 1
  2. 在编译运行的时候,这个库会报错(如果是用在普通的java项目中的话是不会报错的),在gradle的android节点中添加如下设置:

    android {
        。。。其它代码
    
        packaging {
            resources.excludes.add("META-INF/*")
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
  3. 权限声明

    <uses-permission android:name="android.permission.INTERNET"/>
    
    • 1

    可以看到,相比paho.mqtt.android,HiveMQ的只需要声明一个互联网权限。

  4. 界面UI
    在这里插入图片描述

  5. MQTT实现代码:Mqtt.kt

    object Mqtt {
    
        private const val clientId = "9527"
        private const val host = "192.168.1.188"
        private const val port = 1883
        private const val topic = "message/topic"
    
        private var client: Mqtt3AsyncClient? = null
    
        private fun createMqttClient(): Mqtt3AsyncClient =
            Mqtt3Client.builder()
                .identifier(clientId)
                .serverHost(host)
                .serverPort(port)
    
                // 认证设置
                /*.simpleAuth()
                .username("admin")
                .password("password".toByteArray())
                .applySimpleAuth()*/
    
                // 重连设置
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS) // 断线1秒后开始自动重连,如果重连还失败,则下次会等时间会按指数增长,比如2秒、4秒、8秒,双倍增长等待时间,但是不会超过最大值,由maxDelay函数来指定最大值。
                .maxDelay(32, TimeUnit.SECONDS)    // 断线后最多32秒就会自动重连,第5次连会来到32的位置,前面4次已用掉31秒的等待时间了。
                .applyAutomaticReconnect()
    
                // 连接状态监听器设置
                .addConnectedListener {
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接成功")
                }
                .addDisconnectedListener {
                    // 客户端断开连接,或者连接失败都会回调这里
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接断开:${it.cause.message},连接状态:${it.clientConfig.state.name}")
                    /*when (it.clientConfig.state) {
                        MqttClientState.CONNECTING -> println("手动连接失败")             // 即主动调用connect时没连接成功
                        MqttClientState.CONNECTING_RECONNECT -> println("自动重连失败")   // 即连接成功后异常断开自动重连时连接失败
                        MqttClientState.CONNECTED -> println("连接正常断开或异常断开")
                        else -> println("连接断开:${it.clientConfig.state.name}")
                    }*/
                }
                .buildAsync()
    
                // 消息监听器设置
                .also {
                    // 接收订阅的消息。publishes必须在subscribe之前调用以确保消息不会丢失,可以在connect之前调用它以便接收前一个会话的消息。
                    it.publishes(MqttGlobalPublishFilter.ALL) { publish: Mqtt3Publish ->
                        println("收到${publish.topic}的消息:${String(publish.payloadAsBytes)}")
                    }
                }
    
        fun connect() {
            disconnect()
            // 断开连接后的client没法再复用,复用的client重新再连接时会收不到离线时的消息。所以每次连接时创建一个新的client。
            val client = createMqttClient().also { this.client = it }
            client.connectWith()
                .cleanSession(false) // false为持久会话,这样离线再上线时还能收到离线时别人推送的消息。
                .keepAlive(60)  // 心跳时间间隔,单位为秒
                .send()
                .whenComplete { ack, e ->
                    if (ack != null) {
                        // 连接成功之后订阅主题
                        println("手动连接成功:$ack")
                        client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
                    } else if (e != null) {
                        println("手动连接失败: ${e.message}")
                    }
                }
        }
    
        fun disconnect() {
            client?.let {
                it.disconnect().thenAccept { println("手动断开了连接") }
                client = null
            }
        }
    
        fun subscribe(topic: String = this.topic, qos: MqttQos = MqttQos.EXACTLY_ONCE) {
            val client = this.client ?: return
            client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
        }
    
        fun unsubscribe(topic: String = this.topic) {
            val client = this.client ?: return
            client.unsubscribeWith().topicFilter(topic).send()
        }
    
        fun publish(message: String, topic: String = this.topic,  qos: MqttQos = MqttQos.EXACTLY_ONCE, retain: Boolean = false) {
            val client = this.client ?: return
            client
                .publishWith()
                .topic(topic)
                .qos(qos)
                .retain(retain)
                .payload(message.toByteArray())
                .send()
        }
    
        fun clearRetainMessage(topic: String = this.topic) {
            val client = this.client ?: return
            // 发送一条空的retain消息即可清除retain消息
            client.publishWith().topic(topic).retain(true).send()
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
  6. Activity调用Mqtt相关功能:MainActivity.kt

    class MainActivity : AppCompatActivity() {
    
        private val testTopic = "topic/hello"
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
        }
    
        fun connect(view: View) = Mqtt.connect()
        fun disconnect(view: View) = Mqtt.disconnect()
        fun subscribe(view: View) = Mqtt.subscribe(testTopic)
        fun unsubscribe(view: View) = Mqtt.unsubscribe(testTopic)
        fun publish(view: View) = Mqtt.publish("How are you?", testTopic)
        fun publishRetain(view: View) = Mqtt.publish("I'm a retain message!", testTopic, retain = true)
        fun clearRetain(view: View) = Mqtt.clearRetainMessage(testTopic)
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    示例完整代码:https://gitee.com/daizhufei/HelloMQTT

    这里MQTT的服务器端我使用的是ActivityMQ,它目前的ActiveMQ Classic最新版本是6.0.1,只支持MQTT3.13.1.1ActiveMQ Artemis的最新版本是2.32.0,支持支持MQTT3.13.1.15.0,官方说明如下:

    随着时间的推移,它支持的版本可能会发生变化。这里截图如下:
    在这里插入图片描述
    在这里插入图片描述

MQTT其他知识总结:

  • 关于MQTT的基础知识,可查看:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • clientId (也叫identifier), 连接的时候用到这个参数,此参数应该唯一,可以使用账号做为clientId,比如我们公司的需求是先使用账号密码登录一个Web接口,然后再连接MQTT,但是这个账号是可以在多台设备上同时登录的,所以使用直接使用账号的话会出现重复,可以为每台设备生成一个唯一标识然后持久化存储,然后clientId就可以使用账号和唯一标识组合一起使用。比如:username + uuid,而uuid是持久化保存的。
  • cleanSession , 连接的时候用到这个参数,持久会话设置,如果设置为false,则是持久的,true为非持久,持久的意思是当设备离线后,如果有消息推过来,上线时还能收到。如果是非持久的则收不到离线时的消息。
  • keepAlive,连接的时候用到这个参数,用于设置发送心跳的时间间隔,在客户端和服务器没有交流时,在指定的keepAlive时间到达后会发送心跳给服务器,以证明客户端还活着(也就是说确保连接是正常的)。MQTT是使用TCP的,虽然理论上TCP/IP会在套接字中断时通知您,但在实践中,特别是在移动和卫星链路等情况下,它们经常在空中“伪造”TCP并在每一端放回报头,TCP会话很可能会“黑洞”,即它看起来仍然打开,但只是将您写入的任何内容倾倒到地板上,内容来自:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • qos 订阅主题和推送消息的时候用到这个参数,有0、1、2,2是最好的质量,现在的手机网络已经比以前好太多了,所以我就选最好的质量了,质量越好需要的流量越多,但是现在的流量已经不像从前那样贵了,所以无所谓。
  • retain 推送消息的时候用到这个参数,如果设置为true,则推送的这条消息会在客户端每次连接时都接收到,比如连接了收到了,然后断开连接,然后再连接上,还是能收到这条消息。如果需要清除这样的消息,则往这个主题再发送一条空的retain为true的消息即可,这样客户端在连接成功时就不会再收到这条消息了。
  • lastWill 简称遗嘱,遗嘱是一条普通的消息,设置遗嘱后,当客户端异常断开时,服务器会给遗嘱指定的主题推送这条消息。一开始我在想,客户端都断开了,怎么推送消息的啊,这是在客户端上线的时候把遗嘱先传给服务器了,所以客户端断线,则服务器来推送这条消息。如果是正常的断开连接,则服务器不会推送遗嘱消息。
  • MQTT测试客户端:MQTTX,目前感觉这个测试的客户端是非常好用的,而且这个网站上也有很多关于MQTT的教程,值得花点时间去学习。需要注意的是,新建连接是默认选的MQTT5.0协议,需要选择服务器支持的协议。该客户端的一个非常好用的是可以创建多个连接,比如我可以创建多个连接来连接同一个MQTT服务器,这样一台设置就可以测试MQTT的消息发送和接收了,而不需要找多台电脑或手机,一台电脑就能当成多个MQTT客户端,方便测试。
  • MQTT入门:https://www.hivemq.com/mqtt/

关于ActiveMQ服务器:

  • 服务器默认是启用了MQTT的,也就是说安装好ActiveMQ之后不需要配置就可以使用客户端来进行MQTT的连接了
  • 默认没有启用MQTT认证,也就是MQTT客户端连接时不需要提供用户名和密码就能连接
  • 默认的MQTT端口是1883
  • 目前的最新版本 ActiveMQ Classic 6.0.1支持 MQTT v3.1v3.1.1
  • 目前的最新版本 ActiveMQ Artemis 2.32.0支持 MQTT v3.1v3.1.1v5.0

遇到的问题:

每个客户端设置的identifier(也就是clientId)不能一样,否则当这两个一样的客户端都登录时,就会不断的自动断线,如果我们又设置有自动重连的话,断线后它就会自动重连,且能立马连接成功,但是马上又会断线,然后又重连,一直这样重复。断线时报的错误信息如下:

Server closed connection without DISCONNECT.
  • 1

这个信息说明是服务器端那边关闭的连接,而且服务器没有发送DISCONNECT消息给客户端就直接关掉了连接,因为你有两个identifier一样的客户端,所以我猜想是当第二个客户端登录的时候,服务器就会关闭掉第一个客户端,以便让第二个客户端成功登录,然后被断开的第一个客户端会自动重连,然后服务器又会关闭掉第二个客户端,以让第一个客户端连接成功,然后第二个客户端又自动重连,服务器就又关闭掉第一个客户端,以便让第二个客户端登录成功。。。 之所以会这样,是因为它们的identifier一样,服务器认为这是同一个客户端的重复登录,所以会先断开之前的连接,然后使用新的连接。就这像在两台不同的手机上登录同一个微信账号,一台登上了就会把另一台挤下线。在我的代码中,每个客户端登录使用不一样的identifier,但是依旧出现了这个问题,后来发现原因是我同一个客户端调用了两次connect()函数,原因是我的应用需要先登录一个网页,网页登录成功了我就调用MQTTconnect函数,问题出在网页登录成功后它会调用两次登录成功的回调函数,导致我的connect()函数就跟着被调用了两次,这就像一台手机上进行了微信多开,两个微信都登录同一账号一样。知道了原因之后解决起来也很简单,当MQTT在登录的过程中,我们不允许第二次调用 connect() 即可,示例如下:

object Mqtt {

    。。。
    private var isConnecting = false

	fun connect() {
        if (isConnecting) { // 预防还在连接中又调用连接
            return
        }
        isConnecting = true
        disconnect()
        
        val client = createMqttClient().also { this.client = it }
        client.connectWith()
            .cleanSession(false) // false为持久会话,这样离线再上线时还能收到离线时别人推送的消息。
            .keepAlive(60)  // 心跳时间间隔,单位为秒
            .send()
            .whenComplete { ack, e ->
                isConnecting = false
                if (ack != null) {
                    // 连接成功之后订阅主题
                    Timber.fi("手动连接成功:$ack")
                    client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
                } else if (e != null) {
                    Timber.fi("手动连接失败: ${e.message}")
                }
            }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Cpp五条/article/detail/209384
推荐阅读
相关标签
  

闽ICP备14008679号