赞
踩
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT最大优点在于,用极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。
MQTT 是一个 发布订阅线路协议(publish/subscription wire protocol),发布订阅系统工作原理类似于消息总线。你将一条消息发布到一个主题(topic)上,那么所有订阅了该主题的客户端都可以获得该消息的一份拷贝。它提供了一对多的消息分发机制,从而实现与应用程序的解耦。这是一种消息传递模式,消息不是直接从发送器发送到接收器(即点对点),而是由MQTT server(或称为 MQTT Broker)分发的。
对于Broker(MQTT服务器)来说,不论我们是发布方,还是订阅方,都是属于客户端。
HTTP 协议是一个无状态的协议,每个 HTTP 请求为 TCP 短连接,每次请求都需要重新创建一个 TCP 连接(可以通过 keep-alive 属性来优化 TCP 连接的使用,多个 HTTP 请求可以共享该 TCP 连接);而 MQTT 协议为长连接协议,每个客户端都会保持一个长连接。与 HTTP 协议相比优势在于:
MQTT 的长连接可以用于实现从设备端到服务器端的消息传送之外,还可以实现从服务器端到设备端的实时控制消息发送,而 HTTP 协议要实现此功能只能通过轮询的方式,效率相对来说比较低;
MQTT 协议在维护连接的时候会发送心跳包,因此协议以最小代价内置支持设备 “探活” 的功能,而 HTTP 协议要实现此功能的话需要单独发出 HTTP 请求,实现的代价会更高;
低带宽、低功耗。MQTT 在传输报文的大小上与 HTTP 相比有巨大的优势,因为 MQTT 协议在连接建立之后,由于避免了建立连接所需要的额外的资源消耗,发送实际数据的时候报文传输所需带宽与 HTTP 相比有很大的优势,参考网上有人做的测评,发送一样大小的数据,MQTT 比 HTTP 少近 50 倍的网络传输数据,而且速度快了将近 20 倍。在网上有人做的另外一个评测显示,接收消息的场景,MQTT 协议的耗电量为 HTTP 协议的百分之一,而发送数据的时候 MQTT 协议的耗电量为 HTTP 协议的十分之一;
MQTT 提供消息质量控制(QoS),消息质量等级越高,消息交付的质量就越有保障,在物联网的应用场景下,用户可以根据不同的使用场景来设定不同的消息质量等级;
介绍之前需要说明,回调函数是这个包的特点之一,可以通过自己定义回调函数,决定返回的信息的内容、格式或者处理方法等。因此使用该包连接、发布、订阅等操作之前,需要定义并引用回调函数。
函数 | 功能 | callback | callback内容 |
---|---|---|---|
client() | 创建实例 | ||
connect() | 客户端连接broker | on_connect() | broker回应连接请求 |
disconnect() | 客户端取消连接broker | on_disconnect() | broker回应不连接请求 |
publish() | 客户端发布:将消息发送至broker,然后从broker发送到订阅匹配主题的任何客户端 | on_publish() | 当要使用 publish()调用发送的消息已完成向broker的传输时 |
publish.single() | 客户端发布单条信息,然后断开连接 | ||
publish.multiple() | 客户端发布多条信息,然后断开连接 | ||
subscribe() | 客户端订阅 | on_subscribe() | 当broker响应订阅请求时 |
subscribe.simple() | 客户端订阅一组topics并返回收到的消息 | ||
on_message() | 默认返回订阅消息 | ||
message_callback_add() | 定义并处理特定订阅筛选器(包括通配符)的传入消息的callback | ||
message_callback_remove() | 移除特定订阅筛选器的callback | ||
unsubscribe() | 客户端取消订阅 | on_unsubscribe() | 当broker响应取消订阅请求时 |
import paho.mqtt.client as mqtt # 连接成功回调 def on_connect(client, userdata, flags, rc): print('Connected with result code '+str(rc)) client.subscribe('testtopic/#') # 消息接收回调 def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) client = mqtt.Client() # 指定回调函数 client.on_connect = on_connect client.on_message = on_message # 建立连接 client.connect('broker.emqx.io', 1883, 60) # 发布消息 client.publish('emqtt',payload='Hello World',qos=0) client.loop_forever()
import paho.mqtt.client as mqtt # 连接成功回调 def on_connect(client, userdata, flags, rc): print("Connected with result code "+str(rc)) # 消息接收回调,msg就是接收到的消息内容 def on_message(client, userdata, msg): print(msg.topic+" "+str(msg.payload)) # 创建实例 client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message # 将客户端连接到broker,60为间隔 client.connect("mqtt.eclipseprojects.io", 1883, 60) client.subscribe('mqtt_topic', qos=0) # 保持连接不中断 client.loop_forever()
以上示例中,我们均是通过1883端口进行连接并订阅/发布主题,但是对于ssl监听端口,如默认的8883端口,连接的代码则需要部分修改:
client = mqtt.Client('broker.emqx.io', 8883)
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.check_hostname = False
client.tls_set_context(context)
client.connect()
kafka-高产出的分布式消息系统(A high-throughput distributed messaging system)。
Kafka是一个高吞吐、分布式、基于发布订阅的消息系统,利用Kafka技术可以在廉价的PC Server上搭建起大规模消息系统。
(和上面的MQTT很类似,只不过发布端变成了生产者, 订阅端变成了消费者,都通过代理进行消息的传输,相应的消息都在对应topic中)
kafka-python中有以下几个重要的对象:
import json from kafka import KafkaConsumer, KafkaProducer class KProducer: def __init__(self, bootstrap_servers, topic): """ kafka 生产者 :param bootstrap_servers: 地址 :param topic: topic """ self.producer = KafkaProducer( bootstrap_servers=bootstrap_servers, value_serializer=lambda m: json.dumps(m).encode('ascii'), ) # json 格式化发送的内容 self.topic = topic def sync_producer(self, data_li: list): """ 同步发送 数据 :param data_li: 发送数据 :return: """ for data in data_li: future = self.producer.send(self.topic, data) record_metadata = future.get(timeout=10) # 同步确认消费 partition = record_metadata.partition # 数据所在的分区 offset = record_metadata.offset # 数据所在分区的位置 print('save success, partition: {}, offset: {}'.format(partition, offset)) def asyn_producer(self, data_li: list): """ 异步发送数据 :param data_li:发送数据 :return: """ for data in data_li: self.producer.send(self.topic, data) self.producer.flush() # 批量提交 def asyn_producer_callback(self, data_li: list): """ 异步发送数据 + 发送状态处理 :param data_li:发送数据 :return: """ for data in data_li: self.producer.send(self.topic, data).add_callback(self.send_success).add_errback(self.send_error) self.producer.flush() # 批量提交 def send_success(self, *args, **kwargs): """异步发送成功回调函数""" print('save success') return def send_error(self, *args, **kwargs): """异步发送错误回调函数""" print('save error') return def close_producer(self): try: self.producer.close() except: pass if __name__ == '__main__': send_data_li = [{"test": 1}, {"test": 2}] kp = KProducer(topic='topic', bootstrap_servers='127.0.0.1:9001,127.0.0.1:9002') # 同步发送(最慢,安全性最高) kp.sync_producer(send_data_li) # 异步发送(最快,安全性最低) # kp.asyn_producer(send_data_li) # 异步+回调(中等) # kp.asyn_producer_callback(send_data_li) kp.close_producer()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。