赞
踩
市面上绝大部分智能门禁系统都有一个问题:对于开关门方面的确认尤为疏漏。绝大部分门锁都是通过手动拉动锁的开关实现门锁的开合,也存在部分门锁通过双锁设计,在开门锁的时候只打开一个电子锁,同时提供手拉的锁实现门锁打开、关闭与锁定操作,而这些门锁通通都没有对关门进行检测,只进行了短暂的延时就确认关锁,或是增加许多额外的传感器用来专门检测门锁。
基于以上社会背景,我决定研发一款智能门禁系统,它在实现大部分门锁基础功能上,能够实现对开关门的判断,同时可以为用户提供可视化界面,以供更好的人机交互和更优秀的用户体验,作为普及性最高以及利用率最高的门禁系统,我们将使用场景设定为个体户家居门禁,以此作为整体设计开发的依据。
为了实现该项目目标,提升门禁系统的安全性与便利性,本项目旨在开发一套先进的智慧家居门锁系统。该系统将集成多项现代化技术,包括可视化交互界面、门禁检测和门状态监测功能,以提供全面的安全解决方案。
作为家用门禁系统最直接的交互模式,可视化交互界面需要不仅能够为用户提供直观且友好的操作体验,还将涵盖密码开锁、修改门锁密码和连接云端等功能,以此实现对门锁的全面控制和管理,与同类型智能门锁相比较我们遥遥领先。
为了优化传感器配置,降低系统复杂性,并有效控制设备成本,我们引入了一种创新的动态检测设计。这一设计仅需一个RFID读卡器和一个固定在门上的NFC卡,无需配置多个传感器来分别检测门锁信号和门状态。RFID读卡器通过舵机的可旋转的特性,可以在不同位置对信号进行探测,从而显著提升系统的灵活性和检测精度。这种设计不仅简化了硬件结构,使得系统的安装和维护更加便捷,同时还有效减少了生产成本。通过动态检测设计,我们能够实现对门锁和门状态的精准监测,确保系统在各种使用场景下的可靠性。此外,这种设计方案还提升了系统的响应速度,使得用户在使用过程中能够获得更为流畅和高效的体验。整体而言,动态检测设计在保障功能完整性的同时,显著优化了系统的性能和成本效益,为智慧家居门锁系统的广泛应用奠定了坚实基础。
该门禁系统不同于其他传统智能门禁系统之处在于对关门有了非接触式检测,对家庭环境有了更高效的安全保障。门状态监测功能将通过传感器实时监测门的开关状态,并将状态信息显示在可视化界面上。在门未关闭的情况下,系统将在一定时间后发出声音通知,提醒用户及时关门,防止安全隐患的发生。
系统将通过云平台实现动态门锁的远程管理和监控。这一功能的主要组件包括本地门锁系统、云服务器、用户移动应用和管理控制台。通过云平台,门锁系统能够实现实时的数据上报和下发功能,从而实现对门锁开关状态的精确控制。用户可以通过移动应用随时随地查看门锁状态、接收异常报警,并进行远程开锁或上锁操作。同时,管理控制台提供了对所有门锁设备的集中监控和管理功能,使管理员能够方便地进行权限分配、日志查询和设备维护。云平台不仅提升了门锁系统的智能化程度,还大大增强了安全性和便捷性,确保了用户和管理者的良好体验。
综上所述,我们项目中的智慧家居门锁系统通过创新技术和功能整合,不仅让用户轻松管理门禁,还显著增强了安全防护能力。它适用于各种场所,比如公司大楼、学校和公共场所等,为现代社会的安全管理带来了全新的解决方案和可能性。
系统将分为云端、PC可视界面和硬件三个主要模块。
其中硬件部分通过ESP32连接RFID传感器、舵机、电机、人体传感器、LED灯组成。当ESP32得到RFID传感器的门禁信号后旋转舵机将RFID传感器转至侧面检测关门信号。如果检测到关门信号后转移到正面准备检测信号。具体结构如图。
综上,需使用一个SR505人体传感器,一个LED灯和按钮,一个舵机,一个有刷步进电机,一个CH340串口通信模块,一个RFID-RC522模块实现。
我这里用的是01studio家的ESP32,还有手册可以辅助编写(没有广告)。
具体连线里,最重要的是RFID,RFID模块通过SPI通信,根据ESP32手册定义,SDA接口接入15脚,SCL接14脚,MOSI接13脚,MISO接12脚,RST可以自定义接5脚,需要注意的是该供电为3.3V。CH340直接使用系统的串口2,17脚为TX,16脚为RX,与CH340模块互相反接,并共地LED和按键模块使用系统硬件自带的模块,接通0和2脚,人体模块接入33脚,因为ESP32支持所有管脚的PWM,所以随意选择27脚为舵机管脚,设备中的电机需要用H桥来接通实现正反转,但没有电源和Mos管,所以直接接入25和26脚实现简单工作即可。
设备端的软件总体分为光照控制部分和门锁控制部分。
光照控制部分中,我们在判断是否要开灯的逻辑里,需要对是否有开灯信号进行判断,如果是白天,只要没有外界开灯信号我们都关灯,如果有开灯信号则开灯3秒关闭开灯信号,如果是晚上,我们也先对是否有开灯信号进行判断,如果有则开灯三秒关闭开灯信号,如果没有再对人体传感器进行判断,如果检测到人则开灯,如果没有检测到人就不开灯。
该设备对灯控系统做繁琐的控制,是因为不仅要为使用者在平时使用的时候及时开灯,也要照顾到小孩和残疾人群体如果行动不便或不够高没办法被人体传感器检测到导致无法自动开灯问题。
门锁控制中,我们可以通过云端下发,或PC端通过密码或是通过门禁卡打开,根据系统总体架构中的结构图可以得出,我们需要对分别门锁以及RFID的识别进行控制,所以,程序设计中,得到信号转动之前要对当前门状态进行判断,如果是锁门状态,则将舵机旋转至90度,电机正转,直到触碰按键,如果是开门状态,则将舵机旋转回0度,电机反转,直到触碰按键。
PC端通过串口进行通信,提供图形化界面,用以更好的交互操作体验。为了使设备具有更好的灵活性,在连接到串口后,会等待ESP32的Token需求,在得到Token需求时,自动生成发出,实现实时Token交互,实现代码。当PC端打开时,需要展示输入密码界面,同时也需要能够在线更改密码。所以除了从0-9的按钮外,也需要修改密码和确认密码并开锁按钮,为了照顾到特殊人群,结合上文中的灯控系统,PC端开灯按钮也需要加上。同时需要实时展示门锁状态、灯状态 、人体信号、当前为白天和黑夜,以及输入的密码的界文本框,同时展示,便于交互。
ESP32会实时向PC发送设备信息,所以当PC端获取到设备信息时,对信号内容进行判断,内容基本分为锁状态、灯状态、天气状态、人体传感器状态,将获取到对应的状态进行展示。同时可以获取密码,对密码进行判断,如果是密码则开门锁,如果不是就报警并返回先前的界面。修改密码操作也要先获取密码,验证密码正确后再完成修改密码操作。
ESP32在得到PC端的Topic数据之后,会将数据结合设备名、网址、用户名等信息进行MQTT连接,向设备对应的Topic进行订阅,并设置要发送的数据的Topic。
ESP32端通过定时器每1秒将人体数据、是否开灯、是否开锁和RFID检测的门禁卡号数据传给云平台。云平台通过ujson协议进行解包,将数据分解后加入数据流。
云平台上对数据流进行建立,定义与ESP32相同的数据流名称,并标定单位的名称和符号。
在设备管理中,我们设置实时刷新后,数据就会实时更新显示,同时在也会将数据记录保存下来,
此外,我们也能够支持数据可视化,在可视化界面建立一个项目后,拖动图表,将开关门锁的实时状态展示在上面。我们将数据选择为物联网平台,然后获取我们的accesskey、userid和产品id,就可以自动找到对应的文件,通过文件找到对应的数据流
onenet平台使用数据流模式,方便结构和操作。
参考代码如下
main.c代码
from read_all import do_read
import network, time
from machine import UART ,Pin,ADC,Timer,PWM
import random
from threading import Thread
# from simple import MQTTClient #导入MQTT板块
# import simple.py
import onewire,ds18x20
import time,dht,ujson
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
uart=UART(2,9600) #设置串口号2和波特率,TX--Y9--17,RX--Y10--16
#message=do_read()
#print("xxis",message)
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7F) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0")
msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5
i = 1
while sz > 0x7F:
premsg[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
self.sock.write(premsg, i + 2)
self.sock.write(msg )
# print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"\xe0\0")
self.sock.close()
def ping(self):
self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
# print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xF0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
#启动硬件读取传感器
LED=Pin(2,Pin.OUT)
KEY=Pin(0,Pin.IN,Pin.PULL_UP) #构建 KEY 对象
ren=Pin(33,Pin.IN)#
S1 = PWM(Pin(27), freq=50, duty=0) # Servo1 的引脚是 18
moto1=Pin(25,Pin.OUT)#电机转动
moto2=Pin(26,Pin.OUT)#电机转动2
def Servo(angle):
S1.duty(int(((angle+90)*2/180+0.5)/20*1023))
doorflag=0
rfid_mes="0"
qidong=1
day=0
ledflag=0
renflag=0
cnt=0
uartflag=0
def Key_Fun():
if KEY.value()==0: #按键被按下
time.sleep_ms(10) #消除抖动
if KEY.value()==0: #确认按键被按下
return 1
return 0
def ren_Fun():
if ren.value()==1:
time.sleep_ms(10) #消除抖动
if ren.value()==1: #确认
return 1
return 0
def door_statu():
global doorflag,qidong
if(doorflag==1):
if qidong==0:
print("doorflag",doorflag,"qidong",qidong)
qidong=1#只启动一次
Servo(90)
time.sleep(0.1)
#-90 度
Servo(45)
time.sleep(0.1)
#-90 度
Servo(0)
time.sleep(0.1)
while(1):
moto1.value(1)
moto2.value(0)
if Key_Fun():
moto1.value(0)
return
elif(doorflag==0):
if qidong==0:
print("doorflag",doorflag,"qidong",qidong)
qidong=1#只启动一次
Servo(0)
time.sleep(0.1)
#-90 度
Servo(45)
time.sleep(0.1)
#-90 度
Servo(90)
while(1):
moto1.value(1)
moto2.value(0)
if Key_Fun():
moto1.value(0)
return
def light_statu():
global day,renflag,ledflag
if day==1:
LED.value(ledflag)
if ledflag==1:
uart.write(("command=arduino3;\n").encode("utf-8"))
print("白天开灯")
else:
uart.write(("command=arduino4;\n").encode("utf-8"))
elif day==0:
if ledflag==1:
LED.value(1)
print("晚上开灯")
uart.write(("command=arduino3;\n").encode("utf-8"))
elif(ledflag==0):
if renflag==1:
LED.value(1)
uart.write(("command=arduino3;\n").encode("utf-8"))
elif renflag==0:
LED.value(0)
uart.write(("command=arduino4;\n").encode("utf-8"))
def chuankou():
global doorflag,qidong,day,ledflag
#time.sleep(0.3)
if uart.any():
#time.sleep(0.3)
text=uart.read().decode("utf-8")
print("uart read ",text)
#time.sleep(0.3)
if text.startswith("command=arduino2"):
print("uno2,ledstatu",str(ledflag),"daystatu",str(day))
ledflag=1
time.sleep(0.3)
if text.startswith("command=arduino1"):
print("doorflag",str(doorflag),"qidong",str(qidong))
if doorflag==0:
doorflag=1
qidong=0
uart.write(("command=arduino1;\n").encode("utf-8"))
time.sleep(0.3)
def sendchuankou():
global doorflag,rfid_mes,day,ledflag,renflag,uartflag
time.sleep(0.3)
if uartflag==0:
uart.write(("command=arduino5;renti="+str(renflag)+";\n").encode("utf-8"))
if uartflag==1:
uart.write(("command=arduino6;led="+str(ledflag)+";\n").encode("utf-8"))
if uartflag==2:
uart.write(("command=arduino7;rfid="+str(rfid_mes)+";\n").encode("utf-8"))
if uartflag==3:
uart.write(("command=arduino8;day="+str(day)+";\n").encode("utf-8"))
if uartflag==4:
if doorflag==1:
uart.write(("command=arduino1;\n").encode("utf-8"))
else:
uart.write(("command=arduino2;\n").encode("utf-8"))
time.sleep(0.3)
uartflag+=1
uartflag%=5
#定时器数据处理发送
def PostProperty_Fun():
global renflag,ledflag,cnt,doorflag,rfid_mes
renflag=ren_Fun()
print("renflag=",str(renflag))
chuankou()
door_statu()#门锁状态,让他及时跑
light_statu()#灯状态
if ledflag==1:
cnt+=1
if cnt>=6:
ledflag=0
cnt=0
if ledflag==1:
lightmqtt=1
else:
lightmqtt=renflag
if rfid_mes!="0":
data_Message = {"id":1,"dp":{"renti":[{"v":renflag}],"door":[{"v":doorflag}],"light":[{"v":lightmqtt}],"rfid":[{"v":rfid_mes}]}}
else:
data_Message = {"id":1,"dp":{"renti":[{"v":renflag}],"door":[{"v":doorflag}],"light":[{"v":lightmqtt}]}}
#print(data_Message)
sendchuankou()
Property_Message=ujson.dumps(data_Message)
client.publish(topic=Post_PropPost_Property_Topic, msg=Property_Message, retain=False, qos=0)
#接收信号回调函数
def MQTT_callback(topic, msg):
global doorflag,day,ledflag,qidong
print(msg.decode("utf-8"))
datadp=ujson.loads(msg)["dp"]
print(datadp)
if datadp['day'][0]['v']=="1":#白天
day=1
if datadp['day'][0]['v']=="0":#晚上
day=0
if datadp['led'][0]['v']=="on":#白天或晚上开灯
ledflag=1
if datadp['led'][0]['v']=="off":#关灯
ledflag=0
if datadp['door'][0]['v']==1:#开门
doorflag=1
qidong=0
print("doorflag",doorflag)
if datadp['door'][0]['v']==0:#锁门
doorflag=0
qidong=0
print("doorflag",doorflag)
def MQTT_PostSubscribe(tim):
client.check_msg()##检查服务器是否有待处理消息
PostProperty_Fun()
#WiFi连接
def WIFI_Connect():
wlan=network.WLAN(network.STA_IF)
wlan.active(True)
start_time=time.time()
if not wlan.isconnected():
print('connecting to network...')
#wlan.connect('Xiaomi 6','Jhzdhl1314.')
wlan.connect('0032','12345678')
while not wlan.isconnected():
if time.time()-start_time > 15:
print('WIFI Connected Timeout!')
break
if wlan.isconnected():
print("is connected")
return True
else:
return False
if WIFI_Connect():
SERVER = "mqtts.heclouds.com"
device_name="这是设备名"
PORT = 1883
product_id="这里是项目id"
product_access_key="这里填你的acceskey"
#topic_get=""
topic_post="$sys/5FIWxZXlN3/temp/dp/post/json"
username = "这里是项目id"
password=""
while(1):
uart.write("startlink".encode("utf-8"))
if uart.any():
text=uart.read().decode("utf-8")
print(text)
if text.startswith("version"):
password = text
uart.write("linkstart".encode("utf-8"))
break
print("linkstart")
Post_PropPost_Property_Topic = "$sys/"+product_id+"/"+device_name+"/dp/post/json"
REV_TOPIC="$sys/"+product_id+"/"+device_name+"/cmd/request/+"
client=MQTTClient(device_name, SERVER, PORT, username, password, 60) #建立客户端对象
client.connect()
client.set_callback(MQTT_callback)#配置回调函数
client.subscribe(REV_TOPIC) #订阅主题
Tim_RTOS = Timer(-1)
Tim_RTOS.init(
period=1500,mode=Timer.PERIODIC, callback=MQTT_PostSubscribe
)
while 1:
rfid_mes="0"
rfid_mes=do_read()
if(doorflag==0):#关锁状态开锁
doorflag=1
uart.write(("command=arduino1;\n").encode("utf-8"))
qidong= 0
elif(doorflag==1):#开锁状态关锁
doorflag=0
uart.write(("command=arduino2;\n").encode("utf-8"))
qidong=0
print("RFID",str(rfid_mes),"doorflag",str(doorflag),"qidong",str(qidong))
time.sleep(2)
print("delay done")
mfrc522.py代码
from machine import Pin, SPI
from os import uname
#SDA-----15
#SCK-----14
#MOSI----13
#MISO----12
#RST-----5
class MFRC522:
OK = 0
NOTAGERR = 1
ERR = 2
REQIDL = 0x26
REQALL = 0x52
AUTHENT1A = 0x60
AUTHENT1B = 0x61
def __init__(self, sck, mosi, miso, rst, cs):
self.sck = Pin(sck, Pin.OUT)
self.mosi = Pin(mosi, Pin.OUT)
self.miso = Pin(miso)
self.rst = Pin(rst, Pin.OUT)
self.cs = Pin(cs, Pin.OUT)
self.rst.value(0)
self.cs.value(1)
board = uname()[0]
self.spi = SPI(baudrate=100000, polarity=0, phase=0, sck=self.sck, mosi=self.mosi, miso=self.miso)
self.spi.init()
self.rst.value(1)
self.init()
def _wreg(self, reg, val):
self.cs.value(0)
self.spi.write(b'%c' % int(0xff & ((reg << 1) & 0x7e)))
self.spi.write(b'%c' % int(0xff & val))
self.cs.value(1)
def _rreg(self, reg):
self.cs.value(0)
self.spi.write(b'%c' % int(0xff & (((reg << 1) & 0x7e) | 0x80)))
val = self.spi.read(1)
self.cs.value(1)
return val[0]
def _sflags(self, reg, mask):
self._wreg(reg, self._rreg(reg) | mask)
def _cflags(self, reg, mask):
self._wreg(reg, self._rreg(reg) & (~mask))
def _tocard(self, cmd, send):
recv = []
bits = irq_en = wait_irq = n = 0
stat = self.ERR
if cmd == 0x0E:
irq_en = 0x12
wait_irq = 0x10
elif cmd == 0x0C:
irq_en = 0x77
wait_irq = 0x30
self._wreg(0x02, irq_en | 0x80)
self._cflags(0x04, 0x80)
self._sflags(0x0A, 0x80)
self._wreg(0x01, 0x00)
for c in send:
self._wreg(0x09, c)
self._wreg(0x01, cmd)
if cmd == 0x0C:
self._sflags(0x0D, 0x80)
i = 2000
while True:
n = self._rreg(0x04)
i -= 1
if ~((i != 0) and ~(n & 0x01) and ~(n & wait_irq)):
break
self._cflags(0x0D, 0x80)
if i:
if (self._rreg(0x06) & 0x1B) == 0x00:
stat = self.OK
if n & irq_en & 0x01:
stat = self.NOTAGERR
elif cmd == 0x0C:
n = self._rreg(0x0A)
lbits = self._rreg(0x0C) & 0x07
if lbits != 0:
bits = (n - 1) * 8 + lbits
else:
bits = n * 8
if n == 0:
n = 1
elif n > 16:
n = 16
for _ in range(n):
recv.append(self._rreg(0x09))
else:
stat = self.ERR
return stat, recv, bits
def _crc(self, data):
self._cflags(0x05, 0x04)
self._sflags(0x0A, 0x80)
for c in data:
self._wreg(0x09, c)
self._wreg(0x01, 0x03)
i = 0xFF
while True:
n = self._rreg(0x05)
i -= 1
if not ((i != 0) and not (n & 0x04)):
break
return [self._rreg(0x22), self._rreg(0x21)]
def init(self):
self.reset()
self._wreg(0x2A, 0x8D)
self._wreg(0x2B, 0x3E)
self._wreg(0x2D, 30)
self._wreg(0x2C, 0)
self._wreg(0x15, 0x40)
self._wreg(0x11, 0x3D)
self.antenna_on()
def reset(self):
self._wreg(0x01, 0x0F)
def antenna_on(self, on=True):
if on and ~(self._rreg(0x14) & 0x03):
self._sflags(0x14, 0x03)
else:
self._cflags(0x14, 0x03)
def request(self, mode):
self._wreg(0x0D, 0x07)
(stat, recv, bits) = self._tocard(0x0C, [mode])
if (stat != self.OK) | (bits != 0x10):
stat = self.ERR
return stat, bits
def anticoll(self):
ser_chk = 0
ser = [0x93, 0x20]
self._wreg(0x0D, 0x00)
(stat, recv, bits) = self._tocard(0x0C, ser)
if stat == self.OK:
if len(recv) == 5:
for i in range(4):
ser_chk = ser_chk ^ recv[i]
if ser_chk != recv[4]:
stat = self.ERR
else:
stat = self.ERR
return stat, recv
def select_tag(self, ser):
buf = [0x93, 0x70] + ser[:5]
buf += self._crc(buf)
(stat, recv, bits) = self._tocard(0x0C, buf)
return self.OK if (stat == self.OK) and (bits == 0x18) else self.ERR
def auth(self, mode, addr, sect, ser):
return self._tocard(0x0E, [mode, addr] + sect + ser[:4])[0]
def stop_crypto1(self):
self._cflags(0x08, 0x08)
def read(self, addr):
data = [0x30, addr]
data += self._crc(data)
(stat, recv, _) = self._tocard(0x0C, data)
return recv if stat == self.OK else None
def write(self, addr, data):
buf = [0xA0, addr]
buf += self._crc(buf)
(stat, recv, bits) = self._tocard(0x0C, buf)
if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A):
stat = self.ERR
else:
buf = []
for i in range(16):
buf.append(data[i])
buf += self._crc(buf)
(stat, recv, bits) = self._tocard(0x0C, buf)
if not (stat == self.OK) or not (bits == 4) or not ((recv[0] & 0x0F) == 0x0A):
stat = self.ERR
return stat
read_all.py代码
import mfrc522
from os import uname
ESP32_HSPI_CLOCK = 14 #P14 (HSPICLK in ESP32 Docs)
ESP32_HSPI_SLAVE_SELECT = 15 #P15 (HSPICS0 in ESP32 Docs)
ESP32_HSPI_MISO = 12 #P12 (HSPIQ[uery] in ESP32 Docs)
ESP32_HSPI_MOSI = 13 #P13 (HSPID[ata] in ESP32 Docs)
ESP32_MFRC522_RST = 5 #SD3, as plain GPIO - can b any unused GPIO
def do_read():
if uname()[0] == 'WiPy':
rdr = mfrc522.MFRC522("GP14", "GP16", "GP15", "GP22", "GP17")
elif uname()[0] == 'esp32':
rdr = mfrc522.MFRC522(ESP32_HSPI_CLOCK, ESP32_HSPI_MOSI, ESP32_HSPI_MISO, ESP32_MFRC522_RST, ESP32_HSPI_SLAVE_SELECT)
else:
raise RuntimeError("Unsupported platform")
print("")
#print("Place card before reader to read from address 0x08")
print("")
try:
while True:
(stat, tag_type) = rdr.request(rdr.REQIDL)
if stat == rdr.OK:
(stat, raw_uid) = rdr.anticoll()
if stat == rdr.OK:
#print("New card detected")
#print(" - tag type: 0x%02x" % tag_type)
print(" - uid : 0x%02x%02x%02x%02x" % (raw_uid[0], raw_uid[1], raw_uid[2], raw_uid[3]))
print("")
xx=("0x%02x%02x%02x%02x" % (raw_uid[0], raw_uid[1], raw_uid[2], raw_uid[3]))
return xx
if rdr.select_tag(raw_uid) == rdr.OK:
key = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF]
if rdr.auth(rdr.AUTHENT1A, 8, key, raw_uid) == rdr.OK:
#print("Address 8 data: %s" % rdr.read(8))
rdr.stop_crypto1()
else:
print("Authentication error")
else:
print("Failed to select tag")
except KeyboardInterrupt:
print("Bye")
云平台设计这里不多赘述,基本就是创建项目的方法,这里还使用了串口连接PC端,详见PC端的代码。
from tkinter import *
from tkinter import font,simpledialog,messagebox
from onenet import tokenn
import base64
import hmac
import time
from urllib.parse import quote
import serial,threading
recflag=1
ser=serial.Serial("COM4",9600)
ser.bytesize=8
ser.stopbits=1
ser.parity=serial.PARITY_NONE#奇偶校验位
def rece():
global recflag
while 1:
# print(122233)
time.sleep(1)
a = ser.read_all();
print((a.decode('utf-8')))
print("")
if (a.decode('utf-8')).find("startlink") != -1:
my_token = tokenn()
ser.write(my_token.encode('utf-8'))
print(my_token)
pass
if (a.decode('utf-8')).find("command=arduino1") != -1:
l1['text']="门开锁"
pass
if (a.decode('utf-8')).find("command=arduino2") != -1:
l1['text'] = "门关锁"
pass
if (a.decode('utf-8')).find("command=arduino3") != -1:
l2['text'] = "灯已开"
pass
if (a.decode('utf-8')).find("command=arduino4") != -1:
l2['text'] = "灯已关"
pass
if (a.decode('utf-8')).find("command=arduino5") != -1:
tt = a.decode('utf-8').split(";")[1].split('=')[1]
print(tt)
if tt=='1':
l3['text'] = "有人"
elif tt=='0':
l3['text'] = "没人"
pass
if (a.decode('utf-8')).find("command=arduino6") != -1:
tt = a.decode('utf-8').split(";")[1].split('=')[1]
if tt == "1":
l2['text'] = "灯已开"
else:
l2['text'] = "灯已关"
pass
# if (a.decode('utf-8')).find("command=arduino7") != -1:
# tt = a.split(";")[1].split('=')[1]
#
# pass
if (a.decode('utf-8')).find("command=arduino8") != -1:
tt = a.decode('utf-8').split(";")[1].split('=')[1]
if tt == "1":
l4['text'] = "白天"
else:
l4['text'] = "黑夜"
pass
thread1=threading.Thread(target=rece)
thread1.start()
def tokenn():
version = '2018-10-31'
res = 'products/%s' % '写你的设备id' # 通过产品'5FIWxZXlN3'访问产品API
# 用户自定义token过期时间
et = str(int(time.time()) + 3600)
#et="1672735919"
# 签名方法,支持md5、sha1、sha256
method = 'sha1'
# 对'QmlOM0g3Y0o1MjdIQ1M3bUdGd2JxRUYzT3JZekdnTjc='进行decode
key = base64.b64decode('写你的accesskey')
# 计算sign
org = et + '\n' + method + '\n' + res + '\n' + version
sign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)
sign = base64.b64encode(sign_b.digest()).decode()
# value 部分进行url编码,method/res/version值较为简单无需编码
sign = quote(sign, safe='')
res = quote(res, safe='')
# token参数拼接
token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)
return token
######tkinter图形界面
passwd=""
cng_passwd_flag=0
password="123456"
def cng_passwd():
global cng_passwd_flag,passwd
if cng_passwd_flag==0:
passwd = ""
cng_passwd_flag=2
l5['text']="旧密码"
b1['text'] = "取消修改"
b3['text'] = "确认密码"
elif cng_passwd_flag==1 or cng_passwd_flag==2:
passwd = ""
cng_passwd_flag=0
l5['text'] = "欢迎回家"
b1['text'] = "修改密码"
b3['text'] = "打开门锁"
pass
top=Tk()
top.title("门禁系统PC端")
top.geometry("350x350")
large_font = font.Font(family="Helvetica", size=30, weight="bold")
def light():
ser.write("command=arduino2".encode('utf-8'))
pass
def lock():
global passwd,password,cng_passwd_flag
if cng_passwd_flag==0:
if passwd==password:
messagebox.showinfo("成功", "开锁成功")
ser.write("command=arduino1".encode('utf-8'))
else:
messagebox.showinfo("失败", "密码错误")
l5['text'] ="欢迎回家"
passwd = ""
if cng_passwd_flag == 1:
password=passwd
messagebox.showinfo("成功", "修改成功")
cng_passwd_flag = 0
l5['text'] = "欢迎回家"
b1['text'] = "修改密码"
b3['text'] = "打开门锁"
passwd = ""
print("新密码:",password)
if cng_passwd_flag == 2:
if passwd==password:
cng_passwd_flag = 1
l5['text'] = "新密码"
b3['text'] = "确认修改"
passwd = ""
else:
messagebox.showinfo("错误", "密码错误,修改失败")
passwd = ""
cng_passwd_flag = 0
l5['text'] = "欢迎回家"
b1['text'] = "修改密码"
b3['text'] = "打开门锁"
pass
def cmd1():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd+='1'
l5['text']="密码"+passwd
if cng_passwd_flag == 1:
passwd += '1'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '1'
l5['text'] = "旧密码:" + passwd
pass
def cmd2():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '2'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '2'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '2'
l5['text'] = "旧密码:" + passwd
pass
def cmd3():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '3'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '3'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '3'
l5['text'] = "旧密码:" + passwd
pass
def cmd4():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '4'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '4'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '4'
l5['text'] = "旧密码:" + passwd
pass
def cmd5():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '5'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '5'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '5'
l5['text'] = "旧密码:" + passwd
pass
def cmd6():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '6'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '6'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '6'
l5['text'] = "旧密码:" + passwd
pass
def cmd7():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '7'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '7'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '7'
l5['text'] = "旧密码:" + passwd
pass
def cmd8():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '8'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '8'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '8'
l5['text'] = "旧密码:" + passwd
pass
def cmd9():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '9'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '9'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '9'
l5['text'] = "旧密码:" + passwd
pass
def cmd0():
global passwd,cng_passwd_flag
if cng_passwd_flag == 0:
passwd += '0'
l5['text'] = "密码" + passwd
if cng_passwd_flag == 1:
passwd += '0'
l5['text'] = "新密码:" + passwd
if cng_passwd_flag == 2:
passwd += '0'
l5['text'] = "旧密码:" + passwd
pass
l5 = Label(top, text="欢迎回家", font=large_font)
l5.place(x=100, y=10)
l1 = Label(top, text="门状态", font=font.Font(family="Helvetica", size=10,weight="bold"))
l1.place(x=10, y=100)
l2 = Label(top, text="灯", font=font.Font(family="Helvetica", size=10,weight="bold"))
l2.place(x=10, y=150)
l3 = Label(top, text="检测人", font=font.Font(family="Helvetica", size=10,weight="bold"))
l3.place(x=10, y=200)
l4 = Label(top, text="白天",font=font.Font(family="Helvetica", size=10,weight="bold"))
l4.place(x=10, y=250)
# l6 = Label(top, text="rfid",font=font.Font(family="Helvetica", size=10,weight="bold"))
# l6.place(x=10, y=300)
b1 = Button(top, text="修改密码",activebackground='#00DE8B', command=cng_passwd)
b1.place(x=90, y=250)
b2 = Button(top, text="开灯",activebackground='#C9DE13', command=light)
b2.place(x=180, y=300)
b3 = Button(top, text="打开门锁",activebackground='#8658E8', command=lock)
b3.place(x=250, y=250)
b4 = Button(top, text="退出应用",activebackground='#DE173F', command=top.destroy)
b4.place(x=250, y=300)
block1=Button(top, text="1",padx='10',activebackground='#72E1FF', command=cmd1)
block1.place(x=100,y=100)
block2=Button(top, text="2",padx='10',activebackground='#72E1FF', command=cmd2)
block2.place(x=180,y=100)
block3=Button(top, text="3",padx='10',activebackground='#72E1FF', command=cmd3)
block3.place(x=260,y=100)
block4=Button(top, text="4",padx='10',activebackground='#FFC90E', command=cmd4)
block4.place(x=100,y=150)
block5=Button(top, text="5",padx='10',activebackground='#FFC90E', command=cmd5)
block5.place(x=180,y=150)
block6=Button(top, text="6",padx='10',activebackground='#FFC90E', command=cmd6)
block6.place(x=260,y=150)
block7=Button(top, text="7",padx='10',activebackground='#FF8282', command=cmd7)
block7.place(x=100,y=200)
block8=Button(top, text="8",padx='10',activebackground='#FF8282', command=cmd8)
block8.place(x=180,y=200)
block9=Button(top, text="9",padx='10',activebackground='#FF8282', command=cmd9)
block9.place(x=260,y=200)
block0=Button(top, text="0",padx='10',activebackground='#FFC1FE', command=cmd0)
block0.place(x=180,y=250)
top.mainloop()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。