赞
踩
# ThgingsBoard
https://iothub.org.cn/docs/iot/
https://iothub.org.cn/docs/iot/dev/dev-rpc/
服务端RPC调用可以分为单向和双向:
单向RPC请求直接发送请求,并且不对设备响应做任何处理
双向RPC请求会发送到设备,并且超时期间内接收到来自设备的响应
{
"method": "getValue",
"params": "{\"pin\":88,\"value\":99}",
"additionalInfo": null
}
{
"deviceName": "Test-rpc",
"deviceType": "default",
"expirationTime": "1696921879114",
"oneway": "false",
"originServiceId": "VM-24-10-centos",
"persistent": "false",
"requestUUID": "2e023965-6e0b-4b28-8523-26380de2aa98"
}
{
"method": "setValue",
"params": "{\"pin\":7,\"value\":1}",
"additionalInfo": null
}
{
"deviceName": "Test-rpc",
"deviceType": "default",
"expirationTime": "1696921942198",
"oneway": "true",
"originServiceId": "VM-24-10-centos",
"persistent": "false",
"requestUUID": "a238826d-0f8d-49dc-8240-d3382c14f9a2"
}
{
"method": "getServerValue",
"params": ""
}
{
"deviceName": "Test-rpc",
"deviceType": "default",
"requestId": "1",
"serviceId": "VM-24-10-centos",
"sessionId": "e8c36497-9601-4df5-b8d6-3755bf6c500c"
}
# POST路径 /api/rpc/oneway/{deviceId} # 设备ID Test-rpc eb9cc990-6711-11ee-afb9-c790163a721a # 发送数据 { "method": "setValue", "params": { "pin": 7, "value": 1 }, "persistent": false, "timeout": 5000 } http://localhost:8999/rpc/oneway
package com.iothub.rest; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.http.*; import org.springframework.web.client.RestTemplate; public class OneRPC { public void sendRpc() throws JsonProcessingException { String baseURL = "http://82.157.166.86:8080"; String token = Login.getToken(); // 地址:/api/rpc/oneway/{deviceId} // 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a String apiUrl = "/api/rpc/oneway/{deviceId}"; String deviceId = "eb9cc990-6711-11ee-afb9-c790163a721a"; HttpHeaders headers = new HttpHeaders(); //headers.add("Authorization", "Bearer "+ mainToken); headers.set("X-Authorization", "Bearer " + token); //headers.set("Content-Type", "application/json"); headers.setContentType(MediaType.APPLICATION_JSON); String body = "{\n" + " \"method\": \"setValue\",\n" + " \"params\": {\n" + " \"pin\": 7,\n" + " \"value\": 1\n" + " },\n" + " \"persistent\": false,\n" + " \"timeout\": 5000\n" + "}"; HttpEntity entity = new HttpEntity<>(body, headers); RestTemplate restTemplate = new RestTemplate(); ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId}); System.out.println(response); if (response.getStatusCodeValue() == 200) { String device = (String) response.getBody(); System.out.println(response.getBody()); } } }
# POST路径 /api/rpc/twoway/{deviceId} # 设备ID Test-rpc eb9cc990-6711-11ee-afb9-c790163a721a # 发送数据 { "method": "getValue", "params": { "pin": 88, "value": 99 }, "persistent": false, "timeout": 5000 } http://localhost:8999/rpc/twoway
# 响应
v1/devices/me/rpc/request/$request_id
v1/devices/me/rpc/response/$request_id
{
"pin": 4,
"value": 1,
"changed": true
}
订阅者订阅到了消息,topic=v1/devices/me/rpc/request/10,messageid=1,qos=1,payload={"method":"getValue","params":{"pin":88,"value":99}}
package com.iothub.rest; import com.fasterxml.jackson.core.JsonProcessingException; import org.springframework.http.*; import org.springframework.web.client.RestTemplate; public class TwoRPC { public void sendRpc() throws JsonProcessingException { String baseURL = "http://82.157.166.86:8080"; String token = Login.getToken(); // 地址:/api/rpc/oneway/{deviceId} // 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a String apiUrl = "/api/rpc/twoway/{deviceId}"; String deviceId = "eb9cc990-6711-11ee-afb9-c790163a721a"; HttpHeaders headers = new HttpHeaders(); //headers.add("Authorization", "Bearer "+ mainToken); headers.set("X-Authorization", "Bearer " + token); //headers.set("Content-Type", "application/json"); headers.setContentType(MediaType.APPLICATION_JSON); String body = "{\n" + " \"method\": \"getValue\",\n" + " \"params\": {\n" + " \"pin\": 88,\n" + " \"value\": 99\n" + " },\n" + " \"persistent\": false,\n" + " \"timeout\": 5000\n" + "}"; HttpEntity entity = new HttpEntity<>(body, headers); RestTemplate restTemplate = new RestTemplate(); ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId}); System.out.println(response); if (response.getStatusCodeValue() == 200) { String device = (String) response.getBody(); System.out.println(response.getBody()); } } }
package com.iothub.rest; import com.fasterxml.jackson.core.JsonProcessingException; import org.springframework.http.*; import org.springframework.web.client.RestTemplate; public class TwoRPC { public void sendRpc() throws JsonProcessingException { String baseURL = "http://82.157.166.86:8080"; String token = Login.getToken(); // 地址:/api/rpc/oneway/{deviceId} // 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a String apiUrl = "/api/rpc/twoway/{deviceId}"; String deviceId = "eb9cc990-6711-11ee-afb9-c790163a721a"; HttpHeaders headers = new HttpHeaders(); //headers.add("Authorization", "Bearer "+ mainToken); headers.set("X-Authorization", "Bearer " + token); //headers.set("Content-Type", "application/json"); headers.setContentType(MediaType.APPLICATION_JSON); String body = "{\n" + " \"method\": \"getValue\",\n" + " \"params\": {\n" + " \"pin\": 88,\n" + " \"value\": 99\n" + " },\n" + " \"persistent\": false,\n" + " \"timeout\": 5000\n" + "}"; HttpEntity entity = new HttpEntity<>(body, headers); RestTemplate restTemplate = new RestTemplate(); ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId}); System.out.println(response); if (response.getStatusCodeValue() == 200) { String device = (String) response.getBody(); System.out.println(response.getBody()); } } }
package com.iothub.rpc; import com.iothub.mqtt.EmqClient; import com.iothub.mqtt.MqttProperties; import com.iothub.mqtt.QosEnum; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class ServerRpc { @Autowired private EmqClient emqClient; @Autowired private MqttProperties properties; @PostConstruct public void init(){ //连接服务端 emqClient.connect(properties.getUsername(),properties.getPassword()); //订阅一个主题 emqClient.subscribe("v1/devices/me/rpc/request/+", QosEnum.QoS1); } }
# MessageCallback /** * 应用收到消息后触发的回调 * @param topic * @param message * @throws Exception */ @Override public void messageArrived(String topic, MqttMessage message) throws Exception { log.info("订阅者订阅到了消息,topic={},messageid={},qos={},payload={}", topic, message.getId(), message.getQos(), new String(message.getPayload())); / // 双向RPC // v1/devices/me/rpc/request/$request_id String[] buff = topic.split("/"); String request_id = buff[buff.length-1]; // 客户端PUBLISH下面主题进行响应: // v1/devices/me/rpc/response/$request_id String retData = "{\n" + " \"pin\": 4,\n" + " \"value\": 1,\n" + " \"changed\": true\n" + "}"; emqClient.publish("v1/devices/me/rpc/response/" + request_id, retData, QosEnum.QoS1,false); }
# POST路径 /api/rpc/twoway/{deviceId} # 设备ID Test-rpc eb9cc990-6711-11ee-afb9-c790163a721a # 发送数据 { "method": "getValue", "params": { "pin": 88, "value": 99 }, "persistent": false, "timeout": 5000 } # 返回结果 { "pin": 4, "value": 1, "changed": true } http://localhost:8999/rpc/twoway
2023-10-10 15:28:30.926 INFO 23200 --- [ emq-client-rpc] com.iothub.mqtt.MessageCallback : 订阅者订阅到了消息,topic=v1/devices/me/rpc/request/32,messageid=5,qos=1,payload={"method":"getValue","params":{"pin":88,"value":99}}
2023-10-10 15:28:30.933 INFO 23200 --- [ emq-client-rpc] com.iothub.mqtt.MessageCallback : 消息发布完成,messageid=6,topics=[v1/devices/me/rpc/response/32]
<200,{"pin":4,"value":1,"changed":true},[Vary:"Origin", "Access-Control-Request-Method", "Access-Control-Request-Headers", X-Content-Type-Options:"nosniff", X-XSS-Protection:"1; mode=block", Cache-Control:"no-cache, no-store, max-age=0, must-revalidate", Pragma:"no-cache", Expires:"0", Content-Type:"application/json", Transfer-Encoding:"chunked", Date:"Tue, 10 Oct 2023 07:28:30 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]>
{"pin":4,"value":1,"changed":true}
package com.iothub.rest; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.http.*; import org.springframework.web.client.RestTemplate; public class OneRPC { public void sendRpc() throws JsonProcessingException { String baseURL = "http://82.157.166.86:8080"; String token = Login.getToken(); // 地址:/api/rpc/oneway/{deviceId} // 设备ID: eb9cc990-6711-11ee-afb9-c790163a721a String apiUrl = "/api/rpc/oneway/{deviceId}"; String deviceId = "eb9cc990-6711-11ee-afb9-c790163a721a"; HttpHeaders headers = new HttpHeaders(); //headers.add("Authorization", "Bearer "+ mainToken); headers.set("X-Authorization", "Bearer " + token); //headers.set("Content-Type", "application/json"); headers.setContentType(MediaType.APPLICATION_JSON); String body = "{\n" + " \"method\": \"setValue\",\n" + " \"params\": {\n" + " \"pin\": 7,\n" + " \"value\": 1\n" + " },\n" + " \"persistent\": false,\n" + " \"timeout\": 5000\n" + "}"; HttpEntity entity = new HttpEntity<>(body, headers); RestTemplate restTemplate = new RestTemplate(); ResponseEntity response = restTemplate.exchange(baseURL + apiUrl, HttpMethod.POST, entity, String.class, new Object[]{deviceId}); System.out.println(response); if (response.getStatusCodeValue() == 200) { String device = (String) response.getBody(); System.out.println(response.getBody()); } } }
package com.iothub.rest; import com.fasterxml.jackson.databind.JsonNode; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; import java.util.HashMap; import java.util.Map; public class Login { public static String getToken(){ String username = "tenant@thingsboard.org"; String password = "tenant"; String baseURL = "http://82.157.166.86:8080"; RestTemplate restTemplate = new RestTemplate(); // 登录 long ts = System.currentTimeMillis(); Map<String, String> loginRequest = new HashMap(); loginRequest.put("username", username); loginRequest.put("password", password); ResponseEntity<JsonNode> tokenInfo1 = restTemplate.postForEntity(baseURL + "/api/auth/login", loginRequest, JsonNode.class, new Object[0]); JsonNode tokenInfo = (JsonNode)tokenInfo1.getBody(); //System.out.println(tokenInfo); // 解析数据 String mainToken = tokenInfo.get("token").asText(); String refreshToken = tokenInfo.get("refreshToken").asText(); //System.out.println("token: " + tokenInfo); //System.out.println("refreshToken: " + refreshToken); return mainToken; } }
同 1.2.设备(MQTT)
# POST路径 /api/rpc/oneway/{deviceId} # 设备ID Test-rpc eb9cc990-6711-11ee-afb9-c790163a721a # 发送数据 { "method": "setValue", "params": { "pin": 7, "value": 1 }, "persistent": false, "timeout": 5000 } http://localhost:8999/rpc/oneway
<200,[Vary:"Origin", "Access-Control-Request-Method", "Access-Control-Request-Headers", X-Content-Type-Options:"nosniff", X-XSS-Protection:"1; mode=block", Cache-Control:"no-cache, no-store, max-age=0, must-revalidate", Pragma:"no-cache", Expires:"0", Content-Length:"0", Date:"Tue, 10 Oct 2023 07:33:01 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]>
null
2023-10-10 15:33:01.593 INFO 23200 --- [ emq-client-rpc] com.iothub.mqtt.MessageCallback : 订阅者订阅到了消息,topic=v1/devices/me/rpc/request/33,messageid=6,qos=1,payload={"method":"setValue","params":{"pin":7,"value":1}}
2023-10-10 15:33:01.600 INFO 23200 --- [ emq-client-rpc] com.iothub.mqtt.MessageCallback : 消息发布完成,messageid=7,topics=[v1/devices/me/rpc/response/33]
{
"deviceName": "Test-rpc",
"deviceType": "default",
"requestId": "1",
"serviceId": "VM-24-10-centos",
"sessionId": "e8c36497-9601-4df5-b8d6-3755bf6c500c"
}
# 按设备类型流转命令
if(msg.deviceType === 'test-telemetry') {
return ['test-telemetry'];
} else {
return ['default'];
}
队列:telemetry-queue、telemetry-default-queue
交换机:telemetry-devicetype-exchange
package com.iothub.rpc; import com.iothub.mqtt.EmqClient; import com.iothub.mqtt.MqttProperties; import com.iothub.mqtt.QosEnum; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class ClientRpc { @Autowired private EmqClient emqClient; @Autowired private MqttProperties properties; @PostConstruct public void init(){ //连接服务端 emqClient.connect(properties.getUsername(),properties.getPassword()); //订阅一个主题 emqClient.subscribe("v1/devices/me/rpc/response/+", QosEnum.QoS1); } @Scheduled(fixedRate = 3000) public void publish(){ String data = getData(); emqClient.publish("v1/devices/me/rpc/request/1",data,QosEnum.QoS1,false); } private String getData(){ String data = "{\n" + "\t\"method\": \"getServerValue\",\n" + "\t\"params\": \"\"\n" + "}"; return data; } }
{
"method": "getServerValue",
"params": "",
"deviceType": "default",
"deviceName": "Test-rpc"
}
# ThgingsBoard
https://iothub.org.cn/docs/iot/
https://iothub.org.cn/docs/iot/dev/dev-rpc/
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。