赞
踩
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT 消息服务器。
Erlang/OTP 是出色的软实时 (Soft-Realtime)、低延时 (Low-Latency)、分布式 (Distributed) 的语言平台。
MQTT 是轻量的 (Lightweight)、发布订阅模式 (PubSub) 的物联网消息协议。
EMQ X 设计目标是实现高可靠,并支持承载海量物联网终端的MQTT连接,支持在海量物联网设备间低延时消息路由:
稳定承载大规模的 MQTT 客户端连接,单服务器节点支持50万到100万连接。
分布式节点集群,快速低延时的消息路由,单集群支持1000万规模的路由。
消息服务器内扩展,支持定制多种认证方式、高效存储消息到后端数据库。
完整物联网协议支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有协议支持。
文档地址 https://docs.emqx.cn/cn/broker/latest/
wget https://www.emqx.io/cn/downloads/broker/v4.2.5/emqx-centos7-4.2.5-x86_64.zip
unzip emqx-centos7-4.2.5-x86_64.zip
cd emqx && ./bin/emqx start
https://docs.emqx.cn/cn/broker/latest/development/java.html
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
(2)Consumer
@Slf4j @Configuration @IntegrationComponentScan public class MqttReceiveConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.default.completionTimeout}") private int completionTimeout;//连接超时 //初始化连接 @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(50); return mqttConnectOptions; } //初始化mqtt工厂 @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } //接收通道 @Primary @Bean("mqttInputChannel") public MessageChannel mqttInputChannel() { return new DirectChannel(); } //配置client,监听的topic @Bean public MessageProducer inbound(@Qualifier("mqttInputChannel") MessageChannel messageChannel) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId + "_inbound", mqttClientFactory(),defaultTopic); //这里的defaultTopic是发布者的主题 adapter.setCompletionTimeout(completionTimeout); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(messageChannel); return adapter; } //订阅消费数据,通过通道获取数据 @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println(message); log.info("主题:{},消息接收到的数据:{}", message.getHeaders().get(MqttHeaders.TOPIC), message.getPayload()); } }; } }
(3)Publish
@Slf4j @Configuration @IntegrationComponentScan public class MqttSenderConfig { @Value("${spring.mqtt.username}") private String username; @Value("${spring.mqtt.password}") private String password; @Value("${spring.mqtt.url}") private String hostUrl; @Value("${spring.mqtt.client.id}") private String clientId; @Value("${spring.mqtt.default.topic}") private String defaultTopic; @Value("${spring.mqtt.default.completionTimeout}") private int completionTimeout; @Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setConnectionTimeout(10); mqttConnectOptions.setKeepAliveInterval(90); // mqttConnectOptions.setAutomaticReconnect(true); mqttConnectOptions.setUserName(username); mqttConnectOptions.setPassword(password.toCharArray()); mqttConnectOptions.setServerURIs(new String[]{hostUrl}); mqttConnectOptions.setKeepAliveInterval(2); return mqttConnectOptions; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(defaultTopic); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
/**
 * Messaging gateway whose default request channel is {@code mqttOutboundChannel}.
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends {@code data} through the gateway, with {@code topic} bound to the
     * {@link MqttHeaders#TOPIC} message header.
     *
     * @param data  the message payload
     * @param topic the value placed in the {@code MqttHeaders.TOPIC} header
     */
    void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
/**
 * REST endpoint that forwards a request's message and topic to {@link MqttGateway}.
 */
@RestController
public class MessageController {

    @Autowired
    MqttGateway mqttGateway;

    /**
     * Publishes a message; intended for testing message reception by other clients.
     *
     * @param message the {@code message} request parameter, passed on as the payload
     * @param topic   the {@code topic} request parameter, passed on as the topic
     * @return the literal string {@code "ok"} once the gateway call has returned
     */
    @GetMapping("/sendMqttMessage")
    public String sendMqttMessage(
            @RequestParam("message") String message,
            @RequestParam("topic") String topic) {
        mqttGateway.sendToMqtt(message, topic);
        return "ok";
    }
}
spring.mqtt.username = root
spring.mqtt.password = O77inm79
spring.mqtt.url = tcp://127.0.0.1:1883
spring.mqtt.client.id = 127.0.0.1
spring.mqtt.default.topic = my_mqtt_topic
spring.mqtt.default.completionTimeout = 3000
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。