当前位置:   article > 正文

物联网数据库 IoTDB —— 从协议到数据

iot 接口怎么从协议上拉取数据

首先,先允许我,祝各位读者小可爱们,节日快乐。

在这个系列之前的文章里,我们介绍了Iotdb的LSM,以及Iot中的最佳实践,这次我们看看如何将mqtt和Iotdb整合起来。下面我们开始:

iotdb in docker

首先,做一个测试环境,我现在越发喜欢docker 和 WSL 了,除了吃点硬盘,内存和CPU资源以外,没有什么缺点了......

run in docker

直接把该开的端口都打开,只是测试环境,我就没再挂目录。

docker run -d -p 6667:6667 -p 31999:31999 -p 8181:8181 -p 5555:5555 -p 1883:1883 apache/iotdb

等待一会,执行 docker ps 查看是否成功了

  1. ➜ ~ docker ps
  2. CONTAINER ID   IMAGE         COMMAND                 CREATED       STATUS       PORTS                                                                                                                                                                                                                       NAMES
  3. ad9b18f8bff3   apache/iotdb   "/iotdb/sbin/start-s…"   2 hours ago   Up 2 hours    0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 0.0.0.0:6667->6667/tcp, :::6667->6667/tcp, 0.0.0.0:8181->8181/tcp, :::

初步的iotdb in docker 环境,我们就搞好了。接下来,开启mqtt服务。

开启 Mqtt 服务

进入iotdb的docker docker exec -it ad9b18f8bff3 /bin/bash

编辑配置文件vi iotdb/conf/iotdb-engine.properties

开启服务,根据自己的需要,配置ip和端口等。

  1. ####################
  2. ### MQTT Broker Configuration
  3. ####################
  4. # whether to enable the mqtt service.
  5. enable_mqtt_service=false   # 修改成 true , 代表开启 mqtt服务
  6. # the mqtt service binding host.
  7. mqtt_host=0.0.0.0 # ip
  8. # the mqtt service binding port.
  9. mqtt_port=1883  # 端口
  10. # the handler pool size for handing the mqtt messages.
  11. mqtt_handler_pool_size=1
  12. # the mqtt message payload formatter.
  13. mqtt_payload_formatter=json  # 数据格式
  14. # max length of mqtt message in byte
  15. mqtt_max_message_size=1048576

重启服务,如果不会,就重启docker镜像。

iotdb 基础操作

  • 启动服务: sbin/start-client.sh

    1. root@ad9b18f8bff3:/iotdb/sbin# ./start-cli.sh
    2. ---------------------
    3. Starting IoTDB Cli
    4. ---------------------
    5. _____       _________ ______   ______
    6. |_   _|     | _   _ ||_   _ `.|_   _ \
    7. | |   .--.|_/ | | \_| | | `. \ | |_) |
    8. | | / .'`\ \ | |     | | | | | __'.
    9. _| |_| \__. | _| |_   _| |_.' /_| |__) |
    10. |_____|'.__.' |_____| |______.'|_______/ version 0.11.1
    11. IoTDB> login successfully
  • 退出CLI: quitexit

  • 停止服务:$sbin/stop-server.sh

  • 设置一个存储组到IOTDB,名为root  :  IoTDB> SET STORAGE GROUP TO root

  • 查看当前IOTDB的存储组 : IoTDB> SHOW STORAGE GROUP

    1. IoTDB> SHOW STORAGE GROUP
    2. +-------------+
    3. |storage group|
    4. +-------------+
    5. |   root.test|
    6. +-------------+
    7. Total line number = 1
    8. It costs 0.127s
  • 查看系统中存在的所有时间序列 :IoTDB> SHOW TIMESERIES

    1. IoTDB> show timeseries
    2. +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    3. |                     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
    4. +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    5. |root.test.wf01.wt01.temperature| null|   root.test|   FLOAT| GORILLA|     SNAPPY|null|     null|
    6. |     root.test.wf01.wt01.status| null|   root.test| BOOLEAN|     RLE|     SNAPPY|null|     null|
    7. |   root.test.wf01.wt01.hardware| null|   root.test|   TEXT|   PLAIN|     SNAPPY|null|     null|
    8. +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    9. Total line number = 3
    10. It costs 0.009s
  • 查看系统中存在的特定时间序列: SHOW TIMESERIES root.test.wf01.wt01.status

    1. IoTDB> SHOW TIMESERIES root.test.wf01.wt01.status
    2. +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    3. |               timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
    4. +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    5. |root.test.wf01.wt01.status| null|   root.test| BOOLEAN|     RLE|     SNAPPY|null|     null|
    6. +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    7. Total line number = 1
    8. It costs 0.003s
  • 插入数据 INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)

    1. IoTDB> INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)
    2. Msg: The statement is executed successfully.
  • 查看数据: select * from root.test;

    1. IoTDB> select * from root.test;
    2. +------------------------+-------------------------------+--------------------------+----------------------------+
    3. |                   Time|root.test.wf01.wt01.temperature|root.test.wf01.wt01.status|root.test.wf01.wt01.hardware|
    4. +------------------------+-------------------------------+--------------------------+----------------------------+
    5. |2021-01-20T02:00:00.000Z|                           21.2|                     true|                       hello|
    6. +------------------------+-------------------------------+--------------------------+----------------------------+
    7. Total line number = 1
    8. It costs 0.077s
  • 查看设备:show devices

    1. IoTDB> show devices
    2. +-------------------+
    3. |           devices|
    4. +-------------------+
    5. |root.test.wf01.wt01|
    6. +-------------------+
    7. Total line number = 1
    8. It costs 0.002s

mqtt to iotdb

代码

构建一个实体对象,用于存储

  1. package wang.datahub.iotdb;
  2. import com.google.gson.Gson;
  3. import java.util.List;
  4. public class IotdbVO {
  5.    private String device;
  6.    private long timestamp = System.currentTimeMillis();
  7.    private List<String> measurements;
  8.    private List<Object> values;
  9.    public String getDevice() {
  10.        return device;
  11.   }
  12.    public void setDevice(String device) {
  13.        this.device = device;
  14.   }
  15.    public long getTimestamp() {
  16.        return timestamp;
  17.   }
  18.    public void setTimestamp(long timestamp) {
  19.        this.timestamp = timestamp;
  20.   }
  21.    public List<String> getMeasurements() {
  22.        return measurements;
  23.   }
  24.    public void setMeasurements(List<String> measurements) {
  25.        this.measurements = measurements;
  26.   }
  27.    public List<Object> getValues() {
  28.        return values;
  29.   }
  30.    public void setValues(List<Object> values) {
  31.        this.values = values;
  32.   }
  33.    public String toJson(){
  34.        Gson g = new Gson();
  35.        String jsonData = g.toJson(this);
  36.        return jsonData;
  37.   }
  38.    @Override
  39.    public String toString() {
  40.        return "IotdbVO{" +
  41.                "device='" + device + '\'' +
  42.                ", timestamp=" + timestamp +
  43.                ", measurements=" + measurements +
  44.                ", values=" + values +
  45.                '}';
  46.   }
  47. }

使用祖传的代码来模拟数据发射到iotdb,这里直接将mqtt的主机和端口,配置到前文所修改的iotdb的mqtt服务上,就大功告成了。

  1. package wang.datahub.iotdb;
  2. import org.fusesource.mqtt.client.BlockingConnection;
  3. import org.fusesource.mqtt.client.MQTT;
  4. import org.fusesource.mqtt.client.QoS;
  5. import java.util.ArrayList;
  6. import java.util.List;
  7. import java.util.Random;
  8. public class EmmitToIotdb {
  9.    public static void main(String[] args) {
  10.        String[] hardwares = new String[]{
  11.                "a1",
  12.                "b1",
  13.                "b2",
  14.                "c3",
  15.                "d1",
  16.                "f5"
  17.       };
  18.        int count = 1000;
  19.              
  20.        for(int i = 0; i < count ;i++){
  21.            IotdbVO iotdbVO = new IotdbVO();
  22.            iotdbVO.setDevice("root.test.wf01.wt01");
  23.            List<String> measurements = new ArrayList<>();
  24.            List<Object> values = new ArrayList<>();
  25.            measurements.add("temperature");
  26.            measurements.add("status");
  27.            measurements.add("hardware");
  28.            Random r = new Random();
  29.            values.add(r.nextInt(40));
  30.            values.add(r.nextBoolean());
  31.            values.add(hardwares[r.nextInt(hardwares.length)]);
  32.            iotdbVO.setMeasurements(measurements);
  33.            iotdbVO.setValues(values);
  34.            emmitToIotdb(iotdbVO);
  35.       }
  36.   }
  37.    public static void emmitToIotdb(IotdbVO content){
  38.        try {
  39.            MQTT mqtt = new MQTT();
  40.            mqtt.setHost("127.0.0.1", 1883);
  41.            mqtt.setUserName("root");
  42.            mqtt.setPassword("root");
  43.            BlockingConnection connection = mqtt.blockingConnection();
  44.            connection.connect();
  45.            String payload = content.toJson();
  46.            connection.publish(content.getDevice(),payload.getBytes(), QoS.AT_LEAST_ONCE,false);
  47.            connection.disconnect();
  48.       } catch (Exception e){
  49.            e.printStackTrace();
  50.       }
  51.   }
  52. }

执行结果

iotdb,功能还是相当强大的,也非常有意思,希望本篇文章对你有所帮助,也非常欢迎您来与我交流。

关注 【 麒思妙想】解锁更多硬核。

历史文章导读

如果文章对您有那么一点点帮助,我将倍感荣幸

欢迎  关注、在看、点赞、转发 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/943045
推荐阅读
相关标签
  

闽ICP备14008679号