赞
踩
阿里云物联网python linkkitsdk的说明太不清楚,不能够很好的指导我通过Python连接平台,经过参考官方说明和网上的一些文章已经能够连接到平台,但是还是没能更进一步的应用。
之前一直用arduino IDE官网的aliyunsdk连接aliyun物联网,觉得阿里云对arduino的教程更是少,因此想研究一下python的开发。
个人只是业余爱好,水平有限,解读linkkit只是为了自己更好的连接阿里云,如有错误请请原谅,我用业余时间解读,完成这个这个工程可能需要一段时间。
第二天,看的我有点要放弃了,本来水平就有限,还有好多不懂得地方。另外我觉得既然官方的出的sdk应有的注释或者使用说明应该要配套好啊,还是阿里的java工程师多啊,没有时间研究python啊。
不多说了明天继续,找出几个常用的订阅和发布就够用了,物联网开发也不用发送太多的信息。
import os import logging ## 引入logging 日志模块 import threading #引入threading 多线程模块 import queue #列队,依赖threading线程模块 import urllib.request #引入urllib.request爬虫模块 import urllib.parse #引入urlib.parse模块,作用是将URL按一定的格式进行拆分 import json #引入json库 import hashlib #引入hashlib库,应该是用于计算认证秘钥的,看看后面哪里用到了! import hmac #这个也是哈希加密,不知道和上面的有什么区别,是做什么用的? import random #引入random 生成随机数模块 import ssl #引入ssl模块,应该是访问网站的一种协议 import socket #引入socket 套接字模块 import string #引入string 字符串处理模块 import time #引入time 时间模块 import re #引入re 正则表达式 import sys #引入sys 系统功能模块 from enum import Enum #引入enum 枚举模块 import paho.mqtt.client as mqtt #引入MQTT 模块 from paho.mqtt.client import MQTTMessage #引入MQTT 模块 from linkkit import h2client #引入linkkit 中的h2client模块 #python的版本,主版本/次版本,下面定义的就是python3.5 REQUIRED_MAJOR_VERSION = 3 REQUIRED_MINOR_VERSION = 5 # check Python version/检查python版本 def lk_check_python_version(major_version, minor_version): version = sys.version_info if version[0] < major_version or (version[0] == major_version and version[1] < minor_version): print("WARNING: linkit requires Python %d.%d or higher, and the current version is %s" % (major_version, minor_version, sys.version)) #调用lk_chech_python_version函数 lk_check_python_version(REQUIRED_MAJOR_VERSION, REQUIRED_MINOR_VERSION) #定义类(继承object类) class LinkKit(object): TAG_KEY = "attrKey" TAG_VALUE = "attrValue" class LinkKitState(Enum): INITIALIZED = 1 CONNECTING = 2 CONNECTED = 3 DISCONNECTING = 4 DISCONNECTED = 5 DESTRUCTING = 6 DESTRUCTED = 7 class StateError(Exception): def __init__(self, err): Exception.__init__(self, err) class Shadow(object): def __init__(self): self.__version = None self.__timestamp = None self.__state = None self.__metadata = None self.__latest_shadow_lock = threading.Lock() self.__latest_received_time = None self.__lastest_received_payload = None def get_version(self): with self.__latest_shadow_lock: return self.__version def get_metadata(self): with self.__latest_shadow_lock: return self.__metadata def get_state(self): with self.__latest_shadow_lock: return self.__state def set_state(self, state): with self.__latest_shadow_lock: self.__state = state def set_metadata(self, metadata): with self.__latest_shadow_lock: self.__metadata = metadata def set_version(self, version): with self.__latest_shadow_lock: self.__version = version def set_timestamp(self, timestamp): with self.__latest_shadow_lock: self.__timestamp = timestamp def set_latest_recevied_time(self, timestamp): with self.__latest_shadow_lock: self.__latest_received_time = timestamp def get_latest_recevied_time(self): with self.__latest_shadow_lock: return self.__latest_received_time def set_latest_recevied_payload(self, payload): with self.__latest_shadow_lock: self.__latest_received_payload = payload def get_latest_recevied_payload(self): with self.__latest_shadow_lock: return self.__latest_received_payload def to_dict(self): return { 'state':self.__state, 'metadata':self.__metadata, 'version':self.__version, 'timestamp':self.__timestamp} def to_json_string(self): return json.dumps(self.to_dict()) class __HandlerTask(object): def __init__(self, logger=None): self.__logger = logger if self.__logger is not None: self.__logger.info("HandlerTask init enter") self.__message_queue = queue.Queue(20) self.__cmd_callback = { } self.__started = False self.__exited = False self.__thread = None pass def register_cmd_callback(self, cmd, callback): if self.__started is False: if cmd != "req_exit": self.__cmd_callback[cmd] = callback return 0 else: return 1 pass else: return 2 pass def post_message(self, cmd, value): self.__logger.debug("post_message :%r " % cmd) if self.__started and self.__exited is False: try: self.__message_queue.put((cmd, value), timeout=5) except queue.Full as e: self.__logger.error("queue full: %r" % e) return False self.__logger.debug("post_message success") return True self.__logger.debug("post_message fail started:%r,exited:%r" % (self.__started, self.__exited)) return False pass def start(self): if self.__logger is not None: self.__logger.info("HandlerTask start") if self.__started is False: if self.__logger is not None: self.__logger.info("HandlerTask try start") self.__exited = False self.__started = True self.__message_queue = queue.Queue(20) self.__thread = threading.Thread(target=self.__thread_runnable) self.__thread.daemon = True self.__thread.start() return 0 return 1 def stop(self): if self.__started and self.__exited is False: self.__exited = True self.__message_queue.put(("req_exit", None)) def wait_stop(self): if self.__started is True: self.__thread.join() def __thread_runnable(self): if self.__logger is not None: self.__logger.debug("thread runnable enter") while True: cmd, value = self.__message_queue.get() self.__logger.debug("thread runnable pop cmd:%r" % cmd) if cmd == "req_exit": break if self.__cmd_callback[cmd] is not None: try: self.__cmd_callback[cmd](value) except Exception as e: if self.__logger is not None: self.__logger.error("thread runnable raise exception:%s" % e) self.__started = False if self.__logger is not None: self.__logger.debug("thread runnable exit") pass class LoopThread(object): def __init__(self, logger=None): self.__logger = logger if logger is not None: self.__logger.info("LoopThread init enter") self.__callback = None self.__thread = None self.__started = False self.__req_wait = threading.Event() if logger is not None: self.__logger.info("LoopThread init exit") def start(self, callback): if self.__started is True: self.__logger.info("LoopThread already ") return 1 else: self.__callback = callback self.__thread = threading.Thread(target=self.__thread_main) self.__thread.daemon = True self.__thread.start() return 0 def stop(self): self.__req_wait.wait() self.__req_wait.clear() def __thread_main(self): self.__started = True try: if self.__logger is not None: self.__logger.debug("LoopThread thread enter") if self.__callback is not None: self.__callback() if self.__logger is not None: self.__logger.debug("LoopThread thread exit") except Exception as e: self.__logger.error("LoopThread thread Exception:" + str(e)) self.__started = False self.__req_wait.set() class __H2FileUploadSink(h2client.H2FileUploadSink): def __init__(self, linkkit_instance): self.__lk_instance = linkkit_instance def on_file_upload_start(self, id, upload_file_info, user_data): self.__lk_instance._on_file_upload_start(id, upload_file_info, user_data) def on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data): self.__lk_instance._on_file_upload_end(id, upload_file_info, upload_file_result, user_data) def on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data): self.__lk_instance._on_file_upload_progress(id, upload_file_result, upload_file_info, user_data) def _on_file_upload_start(self, id, upload_file_info, user_data): if self.__on_file_upload_begin != None: self.__on_file_upload_begin(id, upload_file_info, self.__user_data) def _on_file_upload_end(self, id, upload_file_info, upload_file_result, user_data): if self.__on_file_upload_end != None: self.__on_file_upload_end(id, upload_file_info, upload_file_result, self.__user_data) def _on_file_upload_progress(self, id, upload_file_result, upload_file_info, user_data): pass class __LinkKitLog(object): def __init__(self): self.__logger = logging.getLogger("linkkit") self.__enabled = False pass def enable_logger(self): self.__enabled = True def disable_logger(self): self.__enabled = False def is_enabled(self): return self.__enabled def config_logger(self, level): self.__logger.setLevel(level) def debug(self, fmt, *args): if self.__enabled: self.__logger.debug(fmt, *args) def warring(self, fmt, *args): if self.__enabled: self.__logger.warning(fmt, *args) def info(self, fmt, *args): if self.__enabled: self.__logger.info(fmt, *args) def error(self, fmt, *args): if self.__enabled: self.__logger.error(fmt, *args) def critical(self, fmt, *args): if self.__enabled: self.__logger.critical(fmt, *args) __USER_TOPIC_PREFIX = "/%s/%s/%s" __ALIYUN_BROKER_CA_DATA = "\ -----BEGIN CERTIFICATE-----\n\ MIIDdTCCAl2gAwIBAgILBAAAAAABFUtaw5QwDQYJKoZIhvcNAQEFBQAwVzELMAkG\ A1UEBhMCQkUxGTAXBgNVBAoTEEdsb2JhbFNpZ24gbnYtc2ExEDAOBgNVBAsTB1Jv\ b3QgQ0ExGzAZBgNVBAMTEkdsb2JhbFNpZ24gUm9vdCBDQTAeFw05ODA5MDExMjAw\ MDBaFw0yODAxMjgxMjAwMDBaMFcxCzAJBgNVBAYTAkJFMRkwFwYDVQQKExBHbG9i\ YWxTaWduIG52LXNhMRAwDgYDVQQLEwdSb290IENBMRswGQYDVQQDExJHbG9iYWxT\ aWduIFJvb3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaDuaZ\ jc6j40+Kfvvxi4Mla+pIH/EqsLmVEQS98GPR4mdmzxzdzxtIK+6NiY6arymAZavp\ xy0Sy6scTHAHoT0KMM0VjU/43dSMUBUc71DuxC73/OlS8pF94G3VNTCOXkNz8kHp\ 1Wrjsok6Vjk4bwY8iGlbKk3Fp1S4bInMm/k8yuX9ifUSPJJ4ltbcdG6TRGHRjcdG\ snUOhugZitVtbNV4FpWi6cgKOOvyJBNPc1STE4U6G7weNLWLBYy5d4ux2x8gkasJ\ U26Qzns3dLlwR5EiUWMWea6xrkEmCMgZK9FGqkjWZCrXgzT/LCrBbBlDSgeF59N8\ 9iFo7+ryUp9/k5DPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNVHRMBAf8E\ BTADAQH/MB0GA1UdDgQWBBRge2YaRQ2XyolQL30EzTSo//z9SzANBgkqhkiG9w0B\ AQUFAAOCAQEA1nPnfE920I2/7LqivjTFKDK1fPxsnCwrvQmeU79rXqoRSLblCKOz\ yj1hTdNGCbM+w6DjY1Ub8rrvrTnhQ7k4o+YviiY776BQVvnGCv04zcQLcFGUl5gE\ 38NflNUVyRRBnMRddWQVDf9VMOyGj/8N7yy5Y0b2qvzfvGn9LhJIZJrglfCm7ymP\ AbEVtQwdpf5pLGkkeB6zpxxxYu7KyJesF12KwvhHhm4qxFYxldBniYUr+WymXUad\ DKqC5JlR3XC321Y9YeRq4VzW9v493kHMB65jUr9TU/Qr6cf9tveCX4XSQRjbgbME\ HMUfpIBvFSDJ3gyICh3WZlXi/EjJKSZp4A==\n\ -----END CERTIFICATE-----" def __init__(self, host_name, product_key, device_name, device_secret, product_secret=None, user_data=None): # logging configs self.__just_for_pycharm_autocomplete = False def __str_is_empty(value): if value is None or value == "": return True else: return False # param check if __str_is_empty(host_name): raise ValueError("host_name wrong") if __str_is_empty(product_key): raise ValueError("product key wrong") if __str_is_empty(device_name): raise ValueError("device name wrong") if __str_is_empty(device_secret) and __str_is_empty(product_secret): raise ValueError("device secret & product secret are both empty") self.__link_log = LinkKit.__LinkKitLog() self.__PahoLog = logging.getLogger("Paho") self.__PahoLog.setLevel(logging.DEBUG) # config internal property/配置内部属性(初始化了一堆变量,慢慢看) self.__host_name = host_name self.__product_key = product_key self.__device_name = device_name self.__device_secret = device_secret self.__product_secret = product_secret self.__user_data = user_data self.__device_interface_info = "" self.__device_mac = None self.__cellular_IMEI = None self.__cellular_ICCID = None self.__cellular_IMSI = None self.__cellular_MSISDN = None self.__mqtt_client = None self.__sdk_version = "1.2.0" self.__sdk_program_language = "Python" self.__endpoint = None self.__h2_endpoint = None self.__mqtt_port = 1883 self.__mqtt_protocol = "MQTTv311" self.__mqtt_transport = "TCP" self.__mqtt_secure = "TLS" self.__mqtt_keep_alive = 60 self.__mqtt_clean_session = True self.__mqtt_max_inflight_message = 20 self.__mqtt_max_queued_message = 40 self.__mqtt_auto_reconnect_min_sec = 1 self.__mqtt_auto_reconnect_max_sec = 60 self.__mqtt_auto_reconnect_sec = 0 self.__mqtt_request_timeout = 10 self.__linkkit_state = LinkKit.LinkKitState.INITIALIZED self.__aliyun_broker_ca_data = self.__ALIYUN_BROKER_CA_DATA self.__latest_shadow = LinkKit.Shadow() # aliyun IoT callbacks/阿里云IOT回调,初始化变量 self.__on_device_dynamic_register = None # mqtt callbacks/MQTT回调,初始化变量 self.__on_connect = None self.__on_disconnect = None self.__on_publish_topic = None self.__on_subscribe_topic = None self.__on_unsubscribe_topic = None self.__on_topic_message = None self.__on_topic_rrpc_message = None self.__on_subscribe_rrpc_topic = None self.__on_unsubscribe_rrpc_topic = None # thing model callbacks/事件回调,初始化变量 self.__on_thing_create = None self.__on_thing_enable = None self.__on_thing_disable = None self.__on_thing_raw_data_arrived = None self.__on_thing_raw_data_post = None self.__on_thing_call_service = None self.__on_thing_prop_changed = None self.__on_thing_event_post = None self.__on_thing_prop_post = None self.__on_thing_shadow_get = None self.__on_thing_device_info_update = None self.__on_thing_device_info_delete = None self.__on_file_upload_begin = None self.__on_file_upload_end = None self.__user_topics = { } #初始化为空字典 self.__user_topics_subscribe_request = { } self.__user_topics_unsubscribe_request = { } self.__user_topics_request_lock = threading.Lock() self.__user_topics_unsubscribe_request_lock = threading.Lock() self.__user_rrpc_topics = { } self.__user_rrpc_topics_lock = threading.RLock() self.__user_rrpc_topics_subscribe_request = { } self.__user_rrpc_topics_unsubscribe_request = { } self.__user_rrpc_topics_subscribe_request_lock = threading.RLock() self.__user_rrpc_topics_unsubscribe_request_lock = threading.RLock() self.__user_rrpc_request_ids = [] self.__user_rrpc_request_id_index_map = { } self.__user_rrpc_request_ids_lock = threading.RLock() self.__user_rrpc_request_max_len = 100 # things topic - Alink #物模型通信 Topic self.__thing_topic_prop_post = '/sys/%s/%s/thing/event/property/post' % (self.__product_key, self.__device_name) #设备属性上报,发布 self.__thing_topic_prop_post_reply = self.__thing_topic_prop_post + "_reply" #云端响应属性上报,订阅 self.__thing_topic_prop_set = '/sys/%s/%s/thing/service/property/set' % (self.__product_key, self.__device_name) #设备属性设置,订阅 self.__thing_topic_prop_set_reply = self.__thing_topic_prop_set + "_reply" #设备响应replyTopic,官方的说明里并没有这个响应订阅 self.__thing_topic_prop_get = '/sys/%s/%s/thing/service/property/get' % (self.__product_key, self.__device_name) #云端获取设备属性,官方的说明里并没有这个设备订阅 ''' self.__thing_topic_event_post_pattern = '/sys/%s/%s/thing/event/%s/post' ##这个应该是事件上报,没格式化字符串,还是不需要格式化啊? #/sys/a1VWcAX5JPn/${deviceName}/thing/event/${tsl.event.identifer}/post ''' self.__thing_topic_event_post_pattern = '/sys/%s/%s/thing/event/%s/post' %() self.__thing_prop_post_mid = { } self.__thing_prop_post_mid_lock = threading.Lock() self.__thing_prop_set_reply_mid = { } self.__thing_prop_set_reply_mid_lock = threading.Lock() # event:post topic/事件:发布主题 self.__thing_topic_event_post = { } self.__thing_topic_event_post_reply = set() self.__thing_events = set() self.__thing_request_id_max = 1000000 self.__thing_request_value = 0 self.__thing_request_id = { } self.__thing_request_id_lock = threading.Lock() self.__thing_event_post_mid = { } self.__thing_event_post_mid_lock = threading.Lock() self.__thing_topic_shadow_get = '/shadow/get/%s/%s' % \ (self.__product_key, self.__device_name) self.__thing_topic_shadow_update = '/shadow/update/%s/%s' % \ (self.__product_key, self.__device_name) self.__thing_shadow_mid = { } self.__thing_shadow_mid_lock = threading.Lock() # service topic self.__thing_topic_service_pattern = '/sys/%s/%s/thing/service/%s' self.__thing_topic_services = set() self.__thing_topic_services_reply = set() self.__thing_services = set() self.__thing_answer_service_mid = { } self.__thing_answer_service_mid_lock = threading.Lock() # thing topic - raw self.__thing_topic_raw_up = '/sys/%s/%s/thing/model/up_raw' % (self.__product_key, self.__device_name) self.__thing_topic_raw_up_reply = self.__thing_topic_raw_up + "_reply" self.__thing_topic_raw_down = '/sys/%s/%s/thing/model/down_raw' % (self.__product_key, self.__device_name) self.__thing_topic_raw_down_reply = self.__thing_topic_raw_down + "_reply" self.__thing_raw_up_mid = { } self.__thing_raw_up_mid_lock = threading.Lock() self.__thing_raw_down_reply_mid = { } self.__thing_raw_down_reply_mid_lock = threading.Lock() # thing topic - device_info self.__thing_topic_update_device_info_up = '/sys/%s/%s/thing/deviceinfo/update' % (self.__product_key, self.__device_name) self.__thing_topic_update_device_info_reply = self.__thing_topic_update_device_info_up + "_reply" self.__thing_topic_delete_device_info_up = '/sys/%s/%s/thing/deviceinfo/delete' % (self.__product_key, self.__device_name) self.__thing_topic_delete_device_info_reply = self.__thing_topic_delete_device_info_up + "_reply" self.__thing_update_device_info_up_mid = { } self.__thing_update_device_info_up_mid_lock = threading.Lock() self.__thing_delete_device_info_up_mid = { } self.__thing_delete_device_info_up_mid_lock
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。