当前位置:   article > 正文

物联网项目:充电桩项目实战~

github 充电桩

你好,我是田哥

最近除了忙于面试辅导、模拟面试以外,还在搞一件大事:充电桩项目。

分布式微服务项目实战:充电桩项目

充电桩项目肯定是和物联网相关的,聊到物联网又不得不聊的是MQTT协议

什么是MQTT

MQTT,全称Message Queuing Telemetry Transport,即消息MQTT,即消息队列遥测传输,是一种基于客户端-服务器的消息发布/订阅传输协议。这种协议的设计思想是轻量、开放、简单和规范,因此易于实现。

MQTT协议的这些特点使它在很多情况下都非常适用,特别是在受限的环境中,例如机器与机器(M2M)通信和物联网(IoT)。此外,对于需要通过带宽有限的资源受限网络进行数据传输的设备,如智能传感器、可穿戴设备等物联网(IoT)设备,使用MQTT进行数据传输是非常适合的。这主要是因为MQTT拥有简单紧凑的架构和较小的代码占用空间,适用于低成本、低功耗的IoT微控制设备。

总的来说,MQTT协议是一种轻量级、易于实现且适用范围广泛的通信协议,特别适用于物联网设备的数据传输

说明MQTT只是一种协议,既然是协议那就得有实现。

实现MQTT协议的第三方框架主要包括以下几个:

  1. Paho MQTT C库:这是一个用C语言实现的开源MQTT客户端库,主要用于在Linux环境下进行MQTT协议的实现。

  2. EMQX:这是一个基于Erlang/OTP平台开发的开源物联网MQTT消息服务器,具有出色的软实时、低延时和分布式的特性。

  3. Qt MQTT类库:Qt官方提供了两种开发MQTT程序的方式,一种是Qt官方提供的基于MQTT的封装,另一种是第三方(EMQ)开发的用于Qt调用MQTT的接口。

以上这些框架都可以用来实现MQTT协议,开发者可以根据实际需求选择适合自己的框架,本项目采用的是EMQX。

什么是EMQX

EMQX 是一款开源的大规模分布式 MQTT 消息服务器,功能丰富,专为物联网和实时通信应用而设计。EMQX 5.0 单集群支持 MQTT 并发连接数高达 1 亿条,单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息,同时保证毫秒级的低时延。

EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。

内置基于 SQL 的规则引擎,EMQX 可以实时提取、过滤、丰富和转换物联网数据。此外,EMQX 采用了无主分布式架构,以确保高可用性和水平扩展性,并提供操作友好的用户体验和出色的可观测性。

EMQX 拥有来自 50 多个国家的 20,000 多家企业用户,连接全球超过 1 亿台物联网设备,服务企业数字化、实时化、智能化转型。

acd9eb96fa6625053f7fa8903923fee6.png
architecture_image

MQTT 发布/订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

  • 发布者(Publisher)

    负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。

  • 订阅者(Subscriber)

    订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。

  • 代理(Broker)

    负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。

  • 主题(Topic)

    主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

    MQTT 还支持订阅者使用主题通配符一次订阅多个主题。

EMQX 的优势

超大规模:EMQX 5.0 单集群可支持 MQTT 并发连接数高达 1 亿条。

高性能:单服务器的传输与处理吞吐量可达每秒百万级 MQTT 消息。

低延时:近乎实时的信息传递,保证延迟在亚毫秒级。

全面支持 MQTT 5.0 标准:100% 符合 MQTT 5.0 和 3.x 标准,具有更好的可扩展性、安全性和可靠性。

高可用:通过无主节点分布式架构实现高可用和水平扩展性。

云原生:通过 Kubernetes Operator 和 Terraform,可以轻松地在企业内部和公共云中进行部署。

项目集成

本文的开发环境为:

  • 构建工具:Maven

  • IDE:IntelliJ IDEA

  • Java 版本:JDK 8+

添加以下依赖到项目 pom.xml 文件中。

  1. <dependencies>
  2.    <dependency>
  3.        <groupId>org.eclipse.paho</groupId>
  4.        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
  5.        <version>1.2.5</version>
  6.    </dependency>
  7. </dependencies>

下面是发布和订阅,

本文将使用 EMQX 提供的 免费公共 MQTT 服务器 创建。服务器接入信息如下:

  • Broker: broker.emqx.io(中国用户可以使用 broker-cn.emqx.io

  • TCP Port: 1883

  • SSL/TLS Port: 8883

也可以自己下载一个安装,并使用自己的。

下载地址:https://www.emqx.io/zh/downloads

f5a837989ef1d76ac31e81a9cec61636.png
发布

发布代码实现:

  1. import org.eclipse.paho.client.mqttv3.MqttClient;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.eclipse.paho.client.mqttv3.MqttException;
  4. import org.eclipse.paho.client.mqttv3.MqttMessage;
  5. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  6. /**
  7.  * @author tianwc  公众号:java后端技术全栈、面试专栏
  8.  * @version 1.0.0
  9.  * 2023年11月15日 10:01
  10.  * 在线刷题 1200+题和1000+篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
  11.  * 发布
  12.  */
  13. public class PublishSample {
  14.     public static void main(String[] args) {
  15.         String broker = "tcp://broker.emqx.io:1883";
  16.         String topic = "mqtt/test";
  17.         String username = "emqx";
  18.         String password = "public";
  19.         String clientid = "publish_client";
  20.         String content = "你好,MQTT";
  21.         int qos = 0;
  22.         try {
  23.             MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
  24.             // 连接参数
  25.             MqttConnectOptions options = new MqttConnectOptions();
  26.             // 设置用户名和密码
  27.             options.setUserName(username);
  28.             options.setPassword(password.toCharArray());
  29.             options.setConnectionTimeout(60);
  30.             options.setKeepAliveInterval(60);
  31.             // 连接
  32.             client.connect(options);
  33.             // 创建消息并设置 QoS
  34.             MqttMessage message = new MqttMessage(content.getBytes());
  35.             message.setQos(qos);
  36.             // 发布消息
  37.             client.publish(topic, message);
  38.             System.out.println("send content: " + content);
  39.             System.out.println("topic: " + topic);
  40.             System.out.println("Message published");
  41.             // 关闭连接
  42.             client.disconnect();
  43.             // 关闭客户端
  44.             client.close();
  45.         } catch (MqttException e) {
  46.             throw new RuntimeException(e);
  47.         }
  48.     }
  49. }

接着我们看看订阅实现。

  1. import org.eclipse.paho.client.mqttv3.*;
  2. import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  3. /**
  4.  * @author tianwc  公众号:java后端技术全栈、面试专栏
  5.  * @version 1.0.0
  6.  * 2023年11月15日 10:01
  7.  * 在线刷题 1200+题和1000+篇干货文章:<a href="https://woaijava.cc/">博客地址</a>
  8.  * 订阅
  9.  */
  10. public class SubscribeSample {
  11.     public static void main(String[] args) {
  12.         String broker = "tcp://broker.emqx.io:1883";
  13.         String topic = "mqtt/test";
  14.         String username = "emqx";
  15.         String password = "public";
  16.         String clientid = "subscribe_client";
  17.         int qos = 0;
  18.         try {
  19.             MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
  20.             // 连接参数
  21.             MqttConnectOptions options = new MqttConnectOptions();
  22.             options.setUserName(username);
  23.             options.setPassword(password.toCharArray());
  24.             options.setConnectionTimeout(60);
  25.             options.setKeepAliveInterval(60);
  26.             // 设置回调
  27.             client.setCallback(new MqttCallback() {
  28.                 public void connectionLost(Throwable cause) {
  29.                     System.out.println("connectionLost: " + cause.getMessage());
  30.                 }
  31.                 public void messageArrived(String topic, MqttMessage message) {
  32.                     System.out.println("topic: " + topic);
  33.                     System.out.println("Qos: " + message.getQos());
  34.                     System.out.println("received content: " + new String(message.getPayload()));
  35.                 }
  36.                 public void deliveryComplete(IMqttDeliveryToken token) {
  37.                     System.out.println("deliveryComplete---------" + token.isComplete());
  38.                 }
  39.             });
  40.             client.connect(options);
  41.             client.subscribe(topic, qos);
  42.         } catch (Exception e) {
  43.             e.printStackTrace();
  44.         }
  45.     }
  46. }

我们先启动订阅,让它等待发布,接着启动发布。

简单的图一个图,让大家更好的理解:

e867135c0c21c1280634d1cedba3854e.png

发布控制台输出:

  1. send content: 你好,MQTT
  2. topic: mqtt/test
  3. Message published

最后订阅控制台输出:

627f797c17c339d0cd2b04fec40ecad7.png

.到这里我们的demo案例就搞定了。具体在充电桩中的应用情况充电桩源码:

源码地址:https://gitee.com/trsunmu/charge-station-single

好了,今天就分享到这里。关于充电桩的文档(属于学习圈子内容)已更新到第十三篇了。

2b143ab204d1a7fd946c4791f2710321.png

充电桩项目文档还在持续更新中。

需要加入学习圈子的请加我weixin:tj20120622,非诚勿扰~感谢。

Java面经小程序版

回复77获取面试小抄2.0版

回复电子书获取Java程序员必备电子书籍

推荐

MySQL 开发规范,非常详细,建议收藏!

16k面试中的10个问题

从0开始搭建公司技术栈,yyds

简历写成这样,CTO会主动联系你

全程面试辅导,保驾护航!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/158586
推荐阅读
相关标签
  

闽ICP备14008679号