赞
踩
需求:获取设备通过mqtt协议发送过来的数据并将数据保存到外部服务中,期间需要使用EMQX代理服务器将消息转发到自有的Web服务中
实现:通过EMQX中的webhook实现消息转发
官方:https://www.emqx.io/docs/zh/v5.0/data-integration/data-bridge-webhook.html
http://localhost:18083/
(localhost
根据实际IP地址修改)bin
目录./emqx/bin/emqx stop
本次Web服务器使用Python Flask进行搭建
app.py
中完成本次将 mqtt客户端 和 接收EMQX转发的消息数据 都写在该Flask项目中
import json from flask import request, jsonify, Flask, Blueprint, render_template, session, current_app from flask_mqtt import Mqtt from werkzeug.local import LocalProxy app = Flask(__name__) # 代理地址(根据实际使用的IP地址进行修改,需要和EMQX处于同一地址) app.config['MQTT_BROKER_URL'] = '127.0.0.1' # 端口 app.config['MQTT_BROKER_PORT'] = 1883 # 当需要验证用户名和密码时,设置该项(根据实际情况设定) # app.config['MQTT_USERNAME'] = 'user' # 当需要验证用户名和密码时,设置该项(根据实际情况设定) # app.config['MQTT_PASSWORD'] = '123456' # 设置心跳时间,单位为秒 app.config['MQTT_KEEPALIVE'] = 60 # 如果服务器支持 TLS,则设置为 True app.config['MQTT_TLS_ENABLED'] = False # 主题(根据实际情况设定) topic = 't/1' # 实例化 mqtt_client = Mqtt(app) @app.route('/') def index(): # 初始路由 return render_template('index.html') @mqtt_client.on_connect() def handle_connect(client, userdata, flags, rc): """连接回调函数""" if rc == 0: print('Connected successfully') # 订阅主题 mqtt_client.subscribe(topic) else: # 连接失败 print('Bad connection. Code:', rc) @mqtt_client.on_message() def handle_mqtt_message(client, userdata, message): """ 消息回调函数 """ # 定义接受到的消息 data = dict( # 主题 topic=message.topic, # 内容 payload=message.payload.decode() ) print(data) # 打印输出接收到的消息 print('Received message on topic: {topic} with payload: {payload}'.format(**data)) @app.route('/publish', methods=['POST']) def publish_message(): """ 消息发布接口(实际应用中,该接口可能需要处理一些复杂业务逻辑) """ # 获取前端页面提交的数据,并格式化 request_data = request.get_json() # print("接收到的数据", request_data) # 发布消息 publish_result = mqtt_client.publish(request_data['topic'], request_data['payload']) # 返回JSON数据 return jsonify({'code': publish_result[0]}) @app.route('/emqx', methods=['POST']) def test_emqx_conn(): """ 测试 搭建简易EMQX HTTP服务(用于接收EMQX转发过来的消息) 在后面的 webhook数据桥接 创建中,URL填写为:http://127.0.0.1:5000/emqx """ # 响应 reply = {"result": "ok", "message": "success"} print("got post request: ", request.get_data()) return json.dumps(reply), 200 if __name__ == '__main__': app.debug = True app.run()
webhook
,点击创建SQL编辑器
根据个人实际业务进行修改,修改完成后直接点击更新
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。