当前位置:   article > 正文

笔记(三)mqtt客户端代码分析(python)_python mqtt消息解析

python mqtt消息解析

一、逻辑分析

逻辑一: 

逻辑二: 

 注:因为在我们这个课设项目中服务器向客户端传输图片时将一个图片分包传输,所以在客户端需要将图片按序重新组装,然后再进行人脸识别 

二、重要模块代码

1.通过影子设备文件获取userid

  1. a = []#获取userId
  2. with open("/home/simon/mqtt/"+"shadow.ini","r") as ud:
  3. mes = ud.read()
  4. mes_parsed = json.loads(mes)
  5. meslist = mes_parsed['payload']['state']['reported']['faceList']
  6. for i in meslist:
  7. #print(str(i))
  8. userid = i['userId']
  9. a.append(int(userid))
  10. #print(userid)
  11. #print(a)

2.对文件夹中的图片进行循环扫描

  1. def file_circle(dirs5,houzhui,i,picture):
  2. global picture2
  3. for dirs4 in dirs5:
  4. #print("dirs4:"+dirs4)
  5. d2 = os.path.basename(dirs4)#图片名加后缀
  6. num2 = os.path.splitext(d2)[0]#图片名
  7. basename2 = os.path.splitext(d2)[1] #后缀名
  8. if(int(num2)==i):
  9. with open("/home/simon/mqtt/picture_recieve/"+str(num2)+basename2,"rb") as p1:
  10. picture2 = picture + str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  11. break
  12. return(picture2)

3.当topic为control且type为imageUpdate时,需先将存有图片的文件夹中对应userid的图片删除

  1. if(msg_type=="imageUpdate"):
  2. rootdir = "/home/simon/mqtt/picture_recieve/"
  3. file_delete = os.listdir(rootdir)
  4. for file in file_delete:
  5. if houzhui in file:
  6. del_file = rootdir + file#当代码和要删除的文件不在同一个文件夹时,必须使用绝对路径
  7. os.remove(del_file)#删除文件

4.判断某个userid的图片分包是否已全部收到

  1. key = len(u)-1
  2. if(key<0):
  3. u.append(useridnum)
  4. v[key]=1
  5. else:
  6. while(key>=0):
  7. if(uesridnum==u[key]):
  8. v[key]+=1
  9. else:
  10. u.append(useridnum)
  11. v[key]=1
  12. if(v[key]==packettotal):
  13. break
  14. key = key-1

5.一张图片接收完以后进行组装

  1. #一张图片全部接收后进行组装
  2. if(v[key]==packettotal):
  3. i=1
  4. #all = r'/home/simon/mqtt/picture_recieve'
  5. bool_flag2 = False#大循环跳出标志
  6. dirs2 = []
  7. dirs0 = []
  8. for dirs1 in glob.glob('/home/simon/mqtt/picture_recieve'+"/*"+houzhui):
  9. dirs2.append(dirs1)
  10. dirs0.append(dirs1)
  11. #print(dirs2)
  12. for dirs3 in dirs2:
  13. #print("dirs3:"+dirs3)
  14. d1 = os.path.basename(dirs3)#图片名加后缀
  15. num1 = os.path.splitext(d1)[0]#图片名
  16. basename1 = os.path.splitext(d1)[1] #后缀名
  17. if(int(num1)==1):
  18. with open("/home/simon/mqtt/picture_recieve/"+str(num1)+basename1,"rb") as p1:
  19. picture1 = str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  20. i=i+1
  21. bool_flag1 = False#跳出小循环标志
  22. while True:
  23. if(i!=packettotal):
  24. picture1 = file_circle(dirs0,houzhui,i,picture1)
  25. i=i+1
  26. if(i==packettotal):
  27. picture1 = file_circle(dirs0,houzhui,i,picture1)
  28. i=1
  29. bool_flag1 = True
  30. break
  31. if(bool_flag1):
  32. bool_flag2 = True
  33. temp = base64.b64decode(picture1)#bytes
  34. with open("/home/simon/mqtt/picture/"+str(recieve_parsed['payload']['user'])+str(recieve_parsed['payload']['userName'])+".jpg","wb") as o:
  35. o.write(temp)
  36. if(bool_flag2):
  37. break

6.如果get到的影子设备文件内desired有内容,即表示客户有需要我做的事情,所以完成desired的事情后,将其放置到reported内发送delta,报告给客户表明我做完了这件事

  1. if(shadow_desired!={}):
  2. if('state' in shadow_desired):
  3. state = shadow_desired['state']
  4. else:state = shadow_reported['state']
  5. if('faceList' in shadow_desired):
  6. s_d_facelist = shadow_desired['faceList']#数组
  7. s_d_f = ",".join('%s'%id for id in s_d_facelist)#字符串
  8. else:
  9. s_r_facelist = shadow_reported['faceList']#数组
  10. s_d_f = ",".join('%s'%id for id in s_r_facelist)#字符串
  11. data_delta = {
  12. "type":"delta",
  13. "state":{
  14. "reported":{
  15. "state":state,
  16. "faceList":[s_d_f]
  17. }
  18. },
  19. "version":shadow_version,
  20. "clientToken":str(clientToken)
  21. }
  22. data_delta_param = json.dumps(data_delta)
  23. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=data_delta_param,qos=2,retain=False)#delta

7.循环扫描存有要发送信息的文件,有信息则发布,然后删除发送的信息,继续循环扫描

  1. while True:
  2. with open('/home/simon/mqtt/'+'FaceAndTime.json','r',encoding="utf-8") as old_f:
  3. #读取所有行,每行会是一个字符串
  4. for j in old_f.readlines():
  5. if(j):
  6. print(j)
  7. face = json.loads(j,object_hook=dict2Data)
  8. data_face = {
  9. "type":"history",
  10. "payload":{
  11. "history":[
  12. {
  13. "timestamp":face.timestamp,
  14. "history":{"name":face.name}
  15. }]
  16. },
  17. "clientToken":str(clientToken)
  18. }
  19. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=str(data_face), qos=0,retain=False)#发送人脸数据
  20. with open('/home/simon/mqtt/'+'FaceAndTime.json','w+',encoding="utf-8") as new_f:
  21. seek_point = old_f.tell()#光标在被删除行的行首,记录该位置
  22. new_f.seek(seek_point,0)#设置光标位置
  23. old_f.readline()#读需要删除的行,光标移动到下一行行首
  24. next_line=old_f.readline()#被删除行的下一行读给next_line
  25. while next_line:#连续覆盖剩余行,后面所有行上移一行
  26. new_f.write(next_line)
  27. next_line=old_f.readline()
  28. new_f.truncate()

三、完整代码

逻辑一: 

  1. import paho.mqtt.client as mqtt
  2. import json
  3. import base64
  4. import uuid
  5. import time
  6. import sys
  7. import multiprocessing
  8. import re
  9. import os
  10. from face_recognition.face_recognition_cli import image_files_in_folder
  11. import glob
  12. HOST = "api.yumik.top"
  13. PORT = 1883
  14. client_id="414ce45bde0a4bc1949fec75a7d2f699"
  15. clientToken=uuid.uuid4()
  16. class Data(object):
  17. def __init__(self,name,timestamp):
  18. self.timestamp = timestamp
  19. self.name = name
  20. def dict2Data(d):
  21. return Data(d['name'],d['timestamp'])
  22. a = []#获取userId
  23. with open("/home/simon/mqtt/"+"shadow.ini","r") as ud:
  24. mes = ud.read()
  25. mes_parsed = json.loads(mes)
  26. meslist = mes_parsed['payload']['state']['reported']['faceList']
  27. for i in meslist:
  28. #print(str(i))
  29. userid = i['userId']
  30. a.append(int(userid))
  31. #print(userid)
  32. #print(a)
  33. #publish event
  34. data_in = {
  35. "type":"face",
  36. "payload":{"userList":a,},
  37. #"payload":{"userList":[2],},
  38. "clientToken":str(clientToken)
  39. }
  40. param_in = json.dumps(data_in)
  41. #publish shadow/operation
  42. data1 = {
  43. "type":"get",
  44. "clientToken":str(clientToken)
  45. }
  46. param1 = json.dumps(data1)
  47. def on_connect(client, userdata, flags, rc):
  48. print("Connected with result code "+str(rc))
  49. def file_circle(dirs5,houzhui,i,picture):
  50. global picture2
  51. for dirs4 in dirs5:
  52. #print("dirs4:"+dirs4)
  53. d2 = os.path.basename(dirs4)#图片名加后缀
  54. num2 = os.path.splitext(d2)[0]#图片名
  55. basename2 = os.path.splitext(d2)[1] #后缀名
  56. if(int(num2)==i):
  57. with open("/home/simon/mqtt/picture_recieve/"+str(num2)+basename2,"rb") as p1:
  58. picture2 = picture + str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  59. break
  60. return(picture2)
  61. picture = ""
  62. houzhui = ""
  63. u = []#存用户的id
  64. v = []#存每个用户的图片数量,递增
  65. def on_message(client, userdata, msg):
  66. global picture
  67. global i
  68. global houzhui
  69. recieve = msg.payload.decode("utf-8")
  70. recieve_parsed = json.loads(recieve)
  71. msg_type = recieve_parsed['type']
  72. if(msg.topic=="414ce45bde0a4bc1949fec75a7d2f699/control" and (msg_type=="face" or msg_type=="imageUpdate")):
  73. print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  74. print("control achieve!")
  75. packetnum = recieve_parsed['payload']['packet']
  76. packettotal = recieve_parsed['payload']['totalPacket']
  77. picture = recieve_parsed['payload']['userImageBase64']
  78. temp = base64.b64decode(picture)#bytes
  79. useridnum = str(recieve_parsed['payload']['user'])
  80. houzhui = "."+str(recieve_parsed['payload']['user'])
  81. if(msg_type=="imageUpdate"):
  82. rootdir = "/home/simon/mqtt/picture_recieve/"
  83. file_delete = os.listdir(rootdir)
  84. for file in file_delete:
  85. if houzhui in file:
  86. del_file = rootdir + file#当代码和要删除的文件不在同一个文件夹时,必须使用绝对路径
  87. os.remove(del_file)#删除文件
  88. with open("/home/simon/mqtt/picture_recieve/"+str(packetnum)+"."+str(recieve_parsed['payload']['user']),"wb") as f:
  89. f.write(temp)#int
  90. #判断一张图片是否全部接收
  91. key = len(u)-1
  92. if(key<0):
  93. u.append(useridnum)
  94. v[key]=1
  95. else:
  96. while(key>=0):
  97. if(uesridnum==u[key]):
  98. v[key]+=1
  99. else:
  100. u.append(useridnum)
  101. v[key]=1
  102. if(v[key]==packettotal):
  103. break
  104. key = key-1
  105. #一张图片全部接收后进行组装
  106. if(v[key]==packettotal):
  107. i=1
  108. #all = r'/home/simon/mqtt/picture_recieve'
  109. bool_flag2 = False#大循环跳出标志
  110. dirs2 = []
  111. dirs0 = []
  112. for dirs1 in glob.glob('/home/simon/mqtt/picture_recieve'+"/*"+houzhui):
  113. dirs2.append(dirs1)
  114. dirs0.append(dirs1)
  115. #print(dirs2)
  116. for dirs3 in dirs2:
  117. #print("dirs3:"+dirs3)
  118. d1 = os.path.basename(dirs3)#图片名加后缀
  119. num1 = os.path.splitext(d1)[0]#图片名
  120. basename1 = os.path.splitext(d1)[1] #后缀名
  121. if(int(num1)==1):
  122. with open("/home/simon/mqtt/picture_recieve/"+str(num1)+basename1,"rb") as p1:
  123. picture1 = str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  124. i=i+1
  125. bool_flag1 = False#跳出小循环标志
  126. while True:
  127. if(i!=packettotal):
  128. picture1 = file_circle(dirs0,houzhui,i,picture1)
  129. i=i+1
  130. if(i==packettotal):
  131. picture1 = file_circle(dirs0,houzhui,i,picture1)
  132. i=1
  133. bool_flag1 = True
  134. break
  135. if(bool_flag1):
  136. bool_flag2 = True
  137. temp = base64.b64decode(picture1)#bytes
  138. with open("/home/simon/mqtt/picture/"+str(recieve_parsed['payload']['user'])+str(recieve_parsed['payload']['userName'])+".jpg","wb") as o:
  139. o.write(temp)
  140. if(bool_flag2):
  141. break
  142. if(msg.topic=="414ce45bde0a4bc1949fec75a7d2f699/control" and msg_type=="history"):
  143. print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  144. if(msg.topic=="$shadow/operation/result/414ce45bde0a4bc1949fec75a7d2f699"):
  145. #print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  146. print("shadow achieve!")
  147. #本地的shadow
  148. with open("/home/simon/mqtt/"+"shadow.ini","r") as sd:
  149. vsn = sd.read()
  150. vsn_parsed = json.loads(vsn)
  151. in_version = vsn_parsed['payload']['version']
  152. #get到的shadow
  153. shadow = msg.payload.decode('utf-8')
  154. shadow_get = json.loads(shadow)
  155. shadow_desired = shadow_get['payload']['state']['desired']
  156. shadow_reported = shadow_get['payload']['state']['reported']
  157. print(shadow_get)
  158. shadow_version = shadow_get['payload']['version']#字符串
  159. if(shadow_desired!={}):
  160. if('state' in shadow_desired):
  161. state = shadow_desired['state']
  162. else:state = shadow_reported['state']
  163. if('faceList' in shadow_desired):
  164. s_d_facelist = shadow_desired['faceList']#数组
  165. s_d_f = ",".join('%s'%id for id in s_d_facelist)#字符串
  166. else:
  167. s_r_facelist = shadow_reported['faceList']#数组
  168. s_d_f = ",".join('%s'%id for id in s_r_facelist)#字符串
  169. data_delta = {
  170. "type":"delta",
  171. "state":{
  172. "reported":{
  173. "state":state,
  174. "faceList":[s_d_f]
  175. }
  176. },
  177. "version":shadow_version,
  178. "clientToken":str(clientToken)
  179. }
  180. data_delta_param = json.dumps(data_delta)
  181. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=data_delta_param,qos=2,retain=False)#delta
  182. elif(shadow_version!=in_version):
  183. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=param1,qos=2,retain=False)#get
  184. with open("/home/simon/mqtt/"+"shadow.ini","w") as sd:
  185. sd.write(msg.payload.decode('utf-8'))
  186. def on_publish(client,userdata,mid):
  187. print("On onPublish: qos = %d" % mid)
  188. def on_subscribe(client, userdata, mid, granted_qos):
  189. print("On Subscribed: qos = %d" % granted_qos)
  190. def on_disconnect(client, userdata, rc):
  191. if rc != 0:
  192. print("Unexpected disconnection %s" % rc)
  193. def client_loop():
  194. while True:
  195. with open('/home/simon/mqtt/'+'FaceAndTime.json','r',encoding="utf-8") as old_f:
  196. #读取所有行,每行会是一个字符串
  197. for j in old_f.readlines():
  198. if(j):
  199. print(j)
  200. face = json.loads(j,object_hook=dict2Data)
  201. data_face = {
  202. "type":"history",
  203. "payload":{
  204. "history":[
  205. {
  206. "timestamp":face.timestamp,
  207. "history":{"name":face.name}
  208. }]
  209. },
  210. "clientToken":str(clientToken)
  211. }
  212. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=str(data_face), qos=0,retain=False)#发送人脸数据
  213. with open('/home/simon/mqtt/'+'FaceAndTime.json','w+',encoding="utf-8") as new_f:
  214. seek_point = old_f.tell()#光标在被删除行的行首,记录该位置
  215. new_f.seek(seek_point,0)#设置光标位置
  216. old_f.readline()#读需要删除的行,光标移动到下一行行首
  217. next_line=old_f.readline()#被删除行的下一行读给next_line
  218. while next_line:#连续覆盖剩余行,后面所有行上移一行
  219. new_f.write(next_line)
  220. next_line=old_f.readline()
  221. new_f.truncate()
  222. if __name__ == '__main__':
  223. client = mqtt.Client(client_id)
  224. client.on_connect = on_connect
  225. client.connect(HOST, PORT, 60)
  226. client.on_subscribe = on_subscribe
  227. client.subscribe("414ce45bde0a4bc1949fec75a7d2f699/control",qos=1)
  228. client.subscribe("$shadow/operation/result/414ce45bde0a4bc1949fec75a7d2f699",qos=1)
  229. client.on_publish = on_publish
  230. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=param_in,qos=2,retain=False)#获得图片
  231. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=param1,qos=2,retain=False)#get
  232. client.on_message = on_message
  233. client.loop_start()
  234. client_loop()

逻辑二: 

  1. import paho.mqtt.client as mqtt
  2. import json
  3. import base64
  4. import uuid
  5. import time
  6. import sys
  7. import multiprocessing
  8. import re
  9. import os
  10. from face_recognition.face_recognition_cli import image_files_in_folder
  11. import glob
  12. HOST = "api.yumik.top"
  13. PORT = 1883
  14. client_id="414ce45bde0a4bc1949fec75a7d2f699"
  15. clientToken=uuid.uuid4()
  16. class Data(object):
  17. def __init__(self,name,timestamp):
  18. self.timestamp = timestamp
  19. self.name = name
  20. def dict2Data(d):
  21. return Data(d['name'],d['timestamp'])
  22. #publish shadow/operation
  23. data1 = {
  24. "type":"get",
  25. "clientToken":str(clientToken)
  26. }
  27. param1 = json.dumps(data1)
  28. def on_connect(client, userdata, flags, rc):
  29. print("Connected with result code "+str(rc))
  30. def file_circle(dirs5,houzhui,i,picture):
  31. global picture2
  32. for dirs4 in dirs5:
  33. d2 = os.path.basename(dirs4)#图片名加后缀
  34. num2 = os.path.splitext(d2)[0]#图片名
  35. basename2 = os.path.splitext(d2)[1] #后缀名
  36. if(int(num2)==i):
  37. with open("/home/simon/mqtt/picture_recieve/"+str(num2)+basename2,"rb") as p1:
  38. picture2 = picture + str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  39. break
  40. return(picture2)
  41. picture = ""
  42. houzhui = ""
  43. u = []#存用户的id
  44. v = []#存每个用户的图片数量,递增
  45. a = []#获取userId
  46. def on_message(client, userdata, msg):
  47. global picture
  48. global i
  49. global houzhui
  50. recieve = msg.payload.decode("utf-8")
  51. recieve_parsed = json.loads(recieve)
  52. msg_type = recieve_parsed['type']
  53. if(msg.topic=="414ce45bde0a4bc1949fec75a7d2f699/control" and msg_type=="history"):
  54. print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  55. if(msg.topic=="$shadow/operation/result/414ce45bde0a4bc1949fec75a7d2f699"):
  56. if(msg_type=="get"):
  57. print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  58. print("shadow achieve!")
  59. #get到的shadow
  60. #shadow = msg.payload.decode('utf-8')
  61. #shadow_get = json.loads(shadow)
  62. shadow_desired = recieve_parsed['payload']['state']['desired']
  63. shadow_reported = recieve_parsed['payload']['state']['reported']
  64. shadow_version = recieve_parsed['payload']['version']#字符串
  65. if(shadow_desired!={}):
  66. if('faceList' in shadow_desired):
  67. s_d_facelist = shadow_desired['faceList']#数组
  68. if('state' in shadow_desired):
  69. state = shadow_desired['state']
  70. data_delta = {
  71. "type":"delta",
  72. "state":{
  73. "reported":{
  74. "state":state,
  75. "faceList":s_d_facelist
  76. }
  77. },
  78. "version":shadow_version,
  79. "clientToken":str(clientToken)
  80. }
  81. else:
  82. data_delta = {
  83. "type":"delta",
  84. "state":{
  85. "reported":{
  86. "faceList":s_d_facelist
  87. }
  88. },
  89. "version":shadow_version,
  90. "clientToken":str(clientToken)
  91. }
  92. else:
  93. state = shadow_desired['state']
  94. data_delta = {
  95. "type":"delta",
  96. "state":{
  97. "reported":{
  98. "state":state
  99. }
  100. },
  101. "version":shadow_version,
  102. "clientToken":str(clientToken)
  103. }
  104. data_delta_param = json.dumps(data_delta)
  105. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=data_delta_param,qos=2,retain=False)#delta
  106. else:#desired内没有内容需要完成时
  107. with open("/home/simon/mqtt/"+"shadow.ini","r") as ud:
  108. mes = ud.read()
  109. mes_parsed = json.loads(mes)
  110. meslist = mes_parsed['payload']['state']['reported']['faceList']
  111. for i in meslist:
  112. userid = i['userId']
  113. a.append(int(userid))
  114. #publish event
  115. data_in = {
  116. "type":"face",
  117. "payload":{"userList":a,},
  118. #"payload":{"userList":[2,3,4,5],},
  119. "clientToken":str(clientToken)
  120. }
  121. param_in = json.dumps(data_in)
  122. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=param_in,qos=2,retain=False)#获得图片
  123. if(msg_type=="delta"):
  124. print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  125. print("shadow achieve!")
  126. with open("/home/simon/mqtt/"+"shadow.ini","w") as sd:
  127. sd.write(msg.payload.decode('utf-8'))
  128. with open("/home/simon/mqtt/"+"shadow.ini","r") as ud:
  129. mes = ud.read()
  130. mes_parsed = json.loads(mes)
  131. meslist = mes_parsed['payload']['state']['reported']['faceList']
  132. for i in meslist:
  133. userid = i['userId']
  134. a.append(int(userid))
  135. #publish event
  136. data_in = {
  137. "type":"face",
  138. "payload":{"userList":a,},
  139. #"payload":{"userList":[2,3,4,5],},
  140. "clientToken":str(clientToken)
  141. }
  142. param_in = json.dumps(data_in)
  143. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=param_in,qos=2,retain=False)#获得图片
  144. if(msg.topic=="414ce45bde0a4bc1949fec75a7d2f699/control" and (msg_type=="face" or msg_type=="imageUpdate")):
  145. #print("topic:"+msg.topic+" message:"+str(msg.payload.decode('utf-8')))
  146. print("control achieve!")
  147. packetnum = recieve_parsed['payload']['packet']
  148. packettotal = recieve_parsed['payload']['totalPacket']
  149. picture = recieve_parsed['payload']['userImageBase64']
  150. temp = base64.b64decode(picture)#bytes
  151. useridnum = str(recieve_parsed['payload']['user'])
  152. houzhui = "."+str(recieve_parsed['payload']['user'])
  153. if(msg_type=="imageUpdate"):
  154. rootdir = "/home/simon/mqtt/picture_recieve/"
  155. file_delete = os.listdir(rootdir)
  156. for file in file_delete:
  157. if houzhui in file:
  158. del_file = rootdir + file#当代码和要删除的文件不在同一个文件夹时,必须使用绝对路径
  159. os.remove(del_file)#删除文件
  160. with open("/home/simon/mqtt/picture_recieve/"+str(packetnum)+"."+str(recieve_parsed['payload']['user']),"wb") as f:
  161. f.write(temp)#int
  162. #判断一张图片是否全部接收
  163. key = len(u)-1
  164. if(key<0):#接收第一张图片
  165. u.append(useridnum)
  166. v.append(1)
  167. else:
  168. while(key>=0):
  169. if(useridnum==u[key]):
  170. v[key]+=1
  171. else:
  172. u.append(useridnum)
  173. v.append(1)
  174. if(v[key]==packettotal):
  175. break
  176. key = key-1
  177. #一张图片全部接收后进行组装
  178. if(v[key]==packettotal):
  179. i=1
  180. bool_flag2 = False#大循环跳出标志
  181. dirs2 = []#大循环的数组
  182. dirs0 = []#小循环的数组
  183. for dirs1 in glob.glob('/home/simon/mqtt/picture_recieve'+"/*"+houzhui):
  184. dirs2.append(dirs1)
  185. dirs0.append(dirs1)
  186. for dirs3 in dirs2:
  187. d1 = os.path.basename(dirs3)#图片名加后缀
  188. num1 = os.path.splitext(d1)[0]#图片名
  189. basename1 = os.path.splitext(d1)[1] #后缀名
  190. if(int(num1)==1):#找到序号为1的照片部分
  191. with open("/home/simon/mqtt/picture_recieve/"+str(num1)+basename1,"rb") as p1:
  192. picture1 = str(base64.b64encode(p1.read()),encoding="utf-8")#bytes
  193. i=i+1
  194. bool_flag1 = False#跳出小循环标志
  195. while True:
  196. if(i!=packettotal):#不是最后一部分就一直组装照片
  197. picture1 = file_circle(dirs0,houzhui,i,picture1)
  198. i=i+1
  199. if(i==packettotal):#找到照片的最后一部分
  200. picture1 = file_circle(dirs0,houzhui,i,picture1)
  201. i=1
  202. bool_flag1 = True
  203. break
  204. if(bool_flag1):
  205. bool_flag2 = True
  206. temp = base64.b64decode(picture1)#bytes
  207. with open("/home/simon/mqtt/picture/"+str(recieve_parsed['payload']['user'])+str(recieve_parsed['payload']['userName'])+".jpg","wb") as o:
  208. o.write(temp)
  209. if(bool_flag2):
  210. break
  211. def on_publish(client,userdata,mid):
  212. print("On onPublish: qos = %d" % mid)
  213. def on_subscribe(client, userdata, mid, granted_qos):
  214. print("On Subscribed: qos = %d" % granted_qos)
  215. def on_disconnect(client, userdata, rc):
  216. if rc != 0:
  217. print("Unexpected disconnection %s" % rc)
  218. def client_loop():
  219. while True:
  220. with open('/home/simon/mqtt/'+'FaceAndTime.json','r',encoding="utf-8") as old_f:
  221. #读取所有行,每行会是一个字符串
  222. for j in old_f.readlines():
  223. if(j):
  224. print(j)
  225. face = json.loads(j,object_hook=dict2Data)
  226. data_face = {
  227. "type":"history",
  228. "payload":{
  229. "history":[
  230. {
  231. "timestamp":face.timestamp,
  232. "history":{"name":face.name}
  233. }]
  234. },
  235. "clientToken":str(clientToken)
  236. }
  237. client.publish("414ce45bde0a4bc1949fec75a7d2f699/event", payload=str(data_face), qos=0,retain=False)#发送人脸数据
  238. with open('/home/simon/mqtt/'+'FaceAndTime.json','w+',encoding="utf-8") as new_f:
  239. seek_point = old_f.tell()#光标在被删除行的行首,记录该位置
  240. new_f.seek(seek_point,0)#设置光标位置
  241. old_f.readline()#读需要删除的行,光标移动到下一行行首
  242. next_line=old_f.readline()#被删除行的下一行读给next_line
  243. while next_line:#连续覆盖剩余行,后面所有行上移一行
  244. new_f.write(next_line)
  245. next_line=old_f.readline()
  246. new_f.truncate()
  247. if __name__ == '__main__':
  248. client = mqtt.Client(client_id)
  249. client.username_pw_set("device", "15f56228e74f2c978b0c376df35d3df8166da22f858f016f6906430d0c4b6070")
  250. client.on_connect = on_connect
  251. client.connect(HOST, PORT, 60)
  252. client.on_subscribe = on_subscribe
  253. client.subscribe("414ce45bde0a4bc1949fec75a7d2f699/control",qos=1)
  254. client.subscribe("$shadow/operation/result/414ce45bde0a4bc1949fec75a7d2f699",qos=1)
  255. client.on_publish = on_publish
  256. client.publish("$shadow/operation/414ce45bde0a4bc1949fec75a7d2f699", payload=param1,qos=2,retain=False)#get
  257. client.on_message = on_message
  258. client.loop_start()
  259. client_loop()

 此篇博客仅作为课设的笔记,如有错误欢迎指正~

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/188558
推荐阅读
相关标签
  

闽ICP备14008679号