当前位置:   article > 正文

【EMQX】通过EMQX webhook实现转发消息到Python web服务器_emqx 数据转发

emqx 数据转发

一、前言

需求:获取设备通过mqtt协议发送过来的数据并将数据保存到外部服务中,期间需要使用EMQX代理服务器将消息转发到自有的Web服务中


实现:通过EMQX中的webhook实现消息转发

官方:https://www.emqx.io/docs/zh/v5.0/data-integration/data-bridge-webhook.html
在这里插入图片描述

二、实现

1、EMQX服务器搭建

EMQX下载、安装、启动
  • 到EMQX官网进行下载:https://www.emqx.io/zh/downloads?os=Windows
    EMQX下载安装启动
  • 安装运行完成后,可直接访问 EMQX Dashboard 管理控制台
    • http://localhost:18083/localhost根据实际IP地址修改)
      EMQX服务器控制台
  • 默认用户名及密码
    • admin / public
  • 停止EMQX服务
    • 打开cmd,进入到emqx所在文件夹中的bin目录
    • 输入一下指令
    ./emqx/bin/emqx stop
    
    • 1

2、本地Web服务搭建

本次Web服务器使用Python Flask进行搭建

创建Flask项目

项目结构

  • 相关功能的代码实现直接在app.py中完成
代码

本次将 mqtt客户端接收EMQX转发的消息数据 都写在该Flask项目中

import json

from flask import request, jsonify, Flask, Blueprint, render_template, session, current_app
from flask_mqtt import Mqtt
from werkzeug.local import LocalProxy

app = Flask(__name__)

# 代理地址(根据实际使用的IP地址进行修改,需要和EMQX处于同一地址)
app.config['MQTT_BROKER_URL'] = '127.0.0.1'
# 端口
app.config['MQTT_BROKER_PORT'] = 1883
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_USERNAME'] = 'user'
# 当需要验证用户名和密码时,设置该项(根据实际情况设定)
# app.config['MQTT_PASSWORD'] = '123456'
# 设置心跳时间,单位为秒
app.config['MQTT_KEEPALIVE'] = 60
# 如果服务器支持 TLS,则设置为 True
app.config['MQTT_TLS_ENABLED'] = False
# 主题(根据实际情况设定)
topic = 't/1'
# 实例化
mqtt_client = Mqtt(app)


@app.route('/')
def index():
    # 初始路由
    return render_template('index.html')


@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
    """连接回调函数"""
    if rc == 0:
        print('Connected successfully')
        # 订阅主题
        mqtt_client.subscribe(topic)
    else:
        # 连接失败
        print('Bad connection. Code:', rc)


@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
    """ 消息回调函数 """
    # 定义接受到的消息
    data = dict(
        # 主题
        topic=message.topic,
        # 内容
        payload=message.payload.decode()
    )
    print(data)
    # 打印输出接收到的消息
    print('Received message on topic: {topic} with payload: {payload}'.format(**data))


@app.route('/publish', methods=['POST'])
def publish_message():
    """
    消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑)
    """
    # 获取前端页面提交的数据,并格式化
    request_data = request.get_json()
    # print("接收到的数据", request_data)
    # 发布消息
    publish_result = mqtt_client.publish(request_data['topic'], request_data['payload'])
    # 返回JSON数据
    return jsonify({'code': publish_result[0]})


@app.route('/emqx', methods=['POST'])
def test_emqx_conn():
    """
    测试 搭建简易EMQX HTTP服务(用于接收EMQX转发过来的消息)
    在后面的 webhook数据桥接 创建中,URL填写为:http://127.0.0.1:5000/emqx
    """
    # 响应
    reply = {"result": "ok", "message": "success"}
    print("got post request: ", request.get_data())
    return json.dumps(reply), 200


if __name__ == '__main__':
    app.debug = True
    app.run()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

3、EMQX中创建webhook数据桥接

  • EMQX控制台中找到webhook,点击创建
    在这里插入图片描述
  • 输入数据桥接名称(要求是大小写英文字母和数字的组合)
  • 请求方法选择 POST,
  • URL 为 http://127.0.0.1:5000/emqx(根据实际使用填写)
  • 其他使用默认值
  • 点击最下方保存按钮完成规则创建。
    在这里插入图片描述

4、EMQX中创建数据转发规则

  • 创建好webhook后,会自动根据创建的webhook桥接生成一个规则
  • 直接点击生成的规则中的 设置
    在这里插入图片描述
  • SQL编辑器根据个人实际业务进行修改,修改完成后直接点击更新
    在这里插入图片描述

三、效果

在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Gausst松鼠会/article/detail/188557
推荐阅读
相关标签
  

闽ICP备14008679号