赞
踩
在汽车行业中,CAN (Controller Area Network) 总线是用于车辆内部通信的关键技术。arxml文件是一种用于描述CAN消息和信号的标准格式。在本指南中,我们将介绍如何使用python-can
和cantools
库来解析arxml文件,并在Python中发送和接收CAN消息,我大概以下面的逻辑去实现整个发送和接收的框架,CANMsgSender和CANMsgReceiver建议通过线程去实现。
1、python-can的库支持多种硬件,另外还支持虚拟接口,方便调试的时候使用。
1)请确保安装的python解释器大于3.8,因为我们在发送报文时会用到send periodic()
函数,这个函数在python-can4.3.0版本之后才开始增加回调函数的方法(如下图示),当设定send_periodic(register_message,period = cycle_time,modifier_callback = update_msg_data)
,发送报文时会自动回调update_msg_data
函数,我们可以在update_msg_data
函数中更新报文数据。
1)首先,确保你的Python环境中已经安装了python-can
和cantools
库,这里的python-can的版本一定要大于等于4.3.0。如果没有安装,可以使用pip进行安装:
pip install python-can
pip install cantools
这个例子可以简单地实现报文的发送与接收,这里我就不展示完整的代码了,因为每个人的需求都不一样,可以按上面提供的框架进行开发。
import can import time import cantools #创建虚拟总线对象 send_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True) recv_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True) #cantools解析arxml database = cantools.database.load_file('demo.arxml' , strict=False) #这个是send_periodic的回调函数,python-can会周期回调这个函数 #另外这个message还有其他属性 def update_msg_data(message): for msg in database.messages: if message.arbitration_id == msg.frame_id: #这里可以处理message的data message.data = message.data print(f"正在回调{message.arbitration_id}报文") if __name__ == "__main__": #注册周期报文 for msg in database.messages: encode_data = [] msg_data = None #判断是否含有pdu if msg.is_container: for contain_msg in msg.contained_messages: #获取signal的字典,值就默认填0 signals_dict = {signal.name : 0 for signal in contain_msg.signals} contain_msg_data = contain_msg.encode(signals_dict) encode_data.append((contain_msg,contain_msg_data)) #获取报文的发送周期 cycle_time = contain_msg.cycle_time if contain_msg.cycle_time is not None else 20 #encode每个pdu的data msg_data = msg.encode(encode_data) #如果是报文类型 else: signals_dict = {signal.name : 0 for signal in msg.signals} cycle_time = msg.cycle_time if msg.cycle_time is not None else 20 msg_data = msg.encode(signals_dict) #注册周期发送报文 register_message = can.Message(timestamp=time.time(), arbitration_id=msg.frame_id, is_extended_id=msg.is_extended_frame, is_fd=True, data=msg_data) send_bus.send_periodic(register_message,period = cycle_time /1000,modifier_callback = update_msg_data) #循环接收报文 database_msg_dict = {msg.frame_id : msg for msg in database.messages} while True: decode_msg_dict = {} recv_msg = recv_bus.recv() frame_id = recv_msg.arbitration_id data = recv_msg.data try: if database_msg_dict[frame_id].is_container: contain_msgs = database.decode_message(frame_id,data,decode_choices=False,decode_containers=True,allow_truncated=True) for contain_msg in contain_msgs: contain_msg_signals = contain_msg[1] if isinstance(contain_msg_signals,dict): decode_msg_dict.update(contain_msg[1]) else: decode_msg_dict = database.decode_message(frame_id,data,decode_choices=False,decode_containers=False) print(decode_msg_dict) except Exception as e: print(e) continue
循环打印结果:
1)这里以canoe为例,首先在canoe中定义Application pycan(这个名字随便取)
,然后再分配到真实的canoe通道中去。
2)修改通道定义
#send_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True)
send_bus= can.interfaces.vector.canlib.VectorBus(app_name='pycan', channel=0, bitrate=500000, data_bitrate=2000000, fd=True)
3)其他硬件调用
点击查看python-can硬件介绍
1)回调函数中的message有其他属性,其他属性可以搭配cantools一起使用。
def update_msg_data(message):
for msg in database.messages:
if message.arbitration_id == msg.frame_id:
#这里可以处理message的data
message.data = message.data
print(f"正在回调{message.arbitration_id}报文")
通过本指南,你应该能够理解如何使用python-can
和cantools
库来处理CAN通信。这些库提供了强大的功能,可以帮助你在Python环境中轻松地实现CAN消息的发送和接收,以及arxml文件的解析。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。