赞
踩
我是一名python后端开发程序员,在一家创业公司中兢兢业业工作快两年了,从软件架构、开发、测试、部署、运维一手经办,到开发文档、API接口、开发周期、设备交付、安装完成全程对接跟踪。
自己曾经也是个小白呀,也是从校招时有老员工带入门的平稳过渡,到如今一个人扛下大部分研发责任,一直秉承着系统能够稳定运行最好就不要大刀阔斧改动,能够使用自己熟悉的技术领域解决问题就不要涉略新兴事物徒添麻烦。这是自己的立场。
但工作中往往是客户优先,一切开发以客户需求为第一立场。满足了客户的要求,客户下单了,那大家皆大欢喜。有订单,创业公司才得以生存,谋发展才有些微可能。
客户的立场就是:你能不能解决这个问题?需要多长时间?效果怎么样?
那么,站在客户的立场去想问题,开发尽可能以客户为中心,才可能开发出好的产品。
今年5月份,有个客户设备需要以MQTT协议对接,我当时并没有接触过,我跟老板说,其实可以用TCP、HTTP来完成设备间数据的交换,这些我是熟悉的,搭建一个简易的TCP服务器,我以前就做过。
原文跳转,其中的服务端代码:
# !/usr/bin/python
# -*- coding: utf-8 -*-
"""
@contact: 微信 1257309054
@file: tornado_server.py
@time: 2021/6/16 17:26
@author: LDC
"""
import asyncio
import re
import threading
import os
from functools import wraps
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.tcpserver import TCPServer
'''
设备监控程序
开启TCP服务,与设备采集器建立连接,
'''
def threadIng_async(f):
"""
使用装饰器和多线程实现异步执行
:param f:
:return:
"""
@wraps(f)
def wrapper(*args, **kwargs):
thr = threading.Thread(target=f, args=args, kwargs=kwargs)
thr.start()
return wrapper
class DeviceServer(TCPServer):
def __init__(self, account, servernum):
super(DeviceServer, self).__init__()
self.account = account
self.servernum = servernum
# 接收客户端连接请求
@gen.coroutine
def handle_stream(self, stream, address):
conn = Connection(self.account, self.servernum, stream, address)
yield conn.send_messages()
class Connection(object):
clients = set() # 客户端对象
def __init__(self, account, servernum, stream, address):
Connection.clients.add(self)
self.account = account
self.servernum = servernum
self._read_future = None
self._stream = stream # TCP对象
self._address = address # TCP地址
self.heart_count = 0 # 设备发送心跳包次数
self.previous_order = 'aa0550\r\n' # 上一条指令默认为回复心跳包
self.machine_code = '' # 机器编号
self.tcp_is_activate = True # tcp通道是否保持连接
self._stream.set_close_callback(self.on_close) # tcp通道关闭后相关变量处理函数
print('第{}个客户端,地址:{}'.format(len(self.__class__.clients),self._address))
@gen.coroutine
def send_messages(self, data='hello\r\n'):
# 使用coroutine实现函数分离的异步编程,即异步接收客户端发来的数据,其中数据以'\r\n'结尾
try:
yield self.send_message(data)
response1 = yield self.read_message()
self.deal_message(response1)
except Exception as e:
print('出错啦',e)
def read_message(self):
# 接收数据,遇到\r\n结束符就取出缓存区的数据
return self._stream.read_until(b'\r\n')
def deal_message(self, data):
# 处理数据
data = data.decode('utf-8').replace('\r\n', '')
print(self._address, data)
self.send_messages(data='I get it {}'.format(data))
def send_message(self, data):
# 发送数据
return self._stream.write(data.encode('utf-8'))
def on_close(self):
# 关闭通道
Connection.clients.remove(self)
self.tcp_is_activate = False # 机器tcp处于断开状态
print("客户端已经关闭",self._address)
def kill_port_process(port):
# 根据端口号杀死进程
ret = os.popen("netstat -nao|findstr " + str(port))
str_list = ret.read()
if not str_list:
print('端口未使用')
return
if 'TCP' in str_list:
# 只关闭处于LISTENING的端口
ret_list = str_list.replace(' ', '')
ret_list = re.split('\n', ret_list)
listening_list = [rl.split('LISTENING') for rl in ret_list]
process_pids = [ll[1] for ll in listening_list if len(ll) >= 2]
process_pid_set = set(process_pids)
for process_pid in process_pid_set:
os.popen('taskkill /pid ' + str(process_pid) + ' /F')
print(port, '端口已被释放')
time.sleep(1)
elif 'UDP' in str_list:
ret_list = re.split(' ', str_list)
process_pid = ret_list[-1].strip()
if process_pid:
os.popen('taskkill /pid ' + str(process_pid) + ' /F')
print('端口已被释放')
else:
print("端口未被使用")
def main():
# 程序入口
port = 5500
ip = '0.0.0.0'
kill_port_process(port)
asyncio.set_event_loop(asyncio.new_event_loop())
server = DeviceServer('123', 'device_server')
print('启动服务器')
# 启动服务器前先杀死端口
server.listen(port=port, address=ip)
IOLoop.instance().start()
if __name__ == '__main__':
main()
当时还想基于这个服务器做一个在线聊天系统,苦于一直没有找到一个好的桌面软件(后来工作中接触了PYQT5,觉得与Tornado结合成聊天室软件简直是天衣无缝)。
但客户已经使用了MQTT协议很多年了,这也是一个封装性、安全性、规范性很高的协议,老板说不能更改,需要按照客户的来。
我只好硬着头皮去CSDN平台找文档学习,去摸索,去看别人是怎么实现的。最后从了解到什么是MQTT,到如何在windows、linux上部署安装服务器,当然也整理成了博客,大家可以点传送门去看看,我摘取其中的部分代码。
代码:
import random
import time
from paho.mqtt import client as mqtt_client
topic = 'python_mqtt' # 发布的主题,订阅时需要使用这个主题才能订阅此消息
# 随机生成一个客户端id
client_id = 'python-mqtt-{}'.format(random.randint(0, 1000))
def connect_mqtt():
#连接mqtt服务器
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code \n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
# broker = 'broker.emqx.io'
# port = 1883
# client.connect(broker, port)
client.connect(host='127.0.0.1', port=1883)
return client
def publish(client):
# 发布消息
msg_count = 0
while True:
time.sleep(1)
msg = '这是客户端发送的第{}条消息'.format(msg_count)
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print('第{}条消息发送成功'.format(msg_count))
else:
print('第{}条消息发送失败'.format(msg_count))
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
import random
from paho.mqtt import client as mqtt_client
topic = "python_mqtt"
client_id = 'python-mqtt-{}'.format(random.randint(0, 100))
def connect_mqtt() -> mqtt_client:
# 连接MQTT服务器
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code \n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
# broker = 'broker.emqx.io'
# port = 1883
# client.connect(broker, port)
client.connect(host='127.0.0.1', port=1883)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
data = msg.payload.decode()
print('订阅【{}】的消息为:{}'.format(msg.topic, data))
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
经过沉淀后,发现网上关于Python的MQTT协议没有一个非常系统的讲解,大部分都是七零八落的片段,于是我也自己整理了一份更加详细的关于MQTT协议讲解,一方面是自己学习研究后的总结,一方面希望可以帮到有需要的人,大家能够互相学习精进。
这是传送门,点我跳转,其中有MQTT简介、安装服务器、使用示例、设置登录验证、后台访问、客户端上线下线通知。
对于小白的我来说,MQTT协议就是技术盲区,一开始很排斥,人在面对未知时往往是恐惧的。如果要成长,要蜕变,那就必须果敢地去学习,去钻研。
完成工作中的任务并不难,只需要把技术学习到七七八八就可以了,但要精通,到能够教导别人,这就需要下一番功夫了,不经一番寒彻骨,怎得梅花扑鼻香。
只有不断刻意练习,技术才能日益精进。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。