当前位置:   article > 正文

使用python-can和cantools实现arxml报文解析、发送和接收的完整指南_python cantools 发送报文

python cantools 发送报文


背景

在汽车行业中,CAN (Controller Area Network) 总线是用于车辆内部通信的关键技术。arxml文件是一种用于描述CAN消息和信号的标准格式。在本指南中,我们将介绍如何使用python-cancantools库来解析arxml文件,并在Python中发送和接收CAN消息,我大概以下面的逻辑去实现整个发送和接收的框架,CANMsgSender和CANMsgReceiver建议通过线程去实现。

在这里插入图片描述

一、硬件支持

1、python-can的库支持多种硬件,另外还支持虚拟接口,方便调试的时候使用。

点击查看python-can硬件介绍

在这里插入图片描述


二、环境准备

1、python解释器安装

1)请确保安装的python解释器大于3.8,因为我们在发送报文时会用到send periodic()函数,这个函数在python-can4.3.0版本之后才开始增加回调函数的方法(如下图示),当设定send_periodic(register_message,period = cycle_time,modifier_callback = update_msg_data),发送报文时会自动回调update_msg_data函数,我们可以在update_msg_data函数中更新报文数据。

点击查看python-can release信息

在这里插入图片描述

2、python库安装

1)首先,确保你的Python环境中已经安装了python-cancantools库,这里的python-can的版本一定要大于等于4.3.0。如果没有安装,可以使用pip进行安装:

pip install python-can
pip install cantools
  • 1
  • 2

三、 收发案例

这个例子可以简单地实现报文的发送与接收,这里我就不展示完整的代码了,因为每个人的需求都不一样,可以按上面提供的框架进行开发。

import can
import time
import cantools


#创建虚拟总线对象
send_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True)
recv_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True)

#cantools解析arxml
database = cantools.database.load_file('demo.arxml' , strict=False)

#这个是send_periodic的回调函数,python-can会周期回调这个函数
#另外这个message还有其他属性
def update_msg_data(message):
    for msg in database.messages:
        if message.arbitration_id == msg.frame_id:
            #这里可以处理message的data
            message.data = message.data
            print(f"正在回调{message.arbitration_id}报文")                        


if __name__ == "__main__":

    #注册周期报文
    for msg in database.messages:
        encode_data = []
        msg_data = None
        #判断是否含有pdu
        if msg.is_container:
            for contain_msg in msg.contained_messages:
                #获取signal的字典,值就默认填0
                signals_dict = {signal.name : 0 for signal in contain_msg.signals}
                contain_msg_data = contain_msg.encode(signals_dict)
                encode_data.append((contain_msg,contain_msg_data))
            #获取报文的发送周期
            cycle_time = contain_msg.cycle_time if contain_msg.cycle_time is not None else 20
            #encode每个pdu的data
            msg_data = msg.encode(encode_data)
        #如果是报文类型
        else:
            signals_dict = {signal.name : 0 for signal in msg.signals}
            cycle_time = msg.cycle_time if msg.cycle_time is not None else 20
            msg_data = msg.encode(signals_dict)
        #注册周期发送报文
        register_message = can.Message(timestamp=time.time(), arbitration_id=msg.frame_id, is_extended_id=msg.is_extended_frame, is_fd=True, data=msg_data)
        send_bus.send_periodic(register_message,period = cycle_time /1000,modifier_callback = update_msg_data)


    #循环接收报文
    database_msg_dict = {msg.frame_id : msg for msg in database.messages}
    while True:
        decode_msg_dict = {}
        recv_msg = recv_bus.recv()
        frame_id = recv_msg.arbitration_id
        data = recv_msg.data
        try:
            if database_msg_dict[frame_id].is_container:
                contain_msgs = database.decode_message(frame_id,data,decode_choices=False,decode_containers=True,allow_truncated=True)
                for contain_msg in contain_msgs:
                    contain_msg_signals = contain_msg[1]
                    if isinstance(contain_msg_signals,dict):
                        decode_msg_dict.update(contain_msg[1])
            else:
                decode_msg_dict =  database.decode_message(frame_id,data,decode_choices=False,decode_containers=False)
            print(decode_msg_dict)
        except Exception as e:
            print(e)
            continue
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

循环打印结果:
在这里插入图片描述


四、 方法拓展

1、canoe硬件调用

1)这里以canoe为例,首先在canoe中定义Application pycan(这个名字随便取),然后再分配到真实的canoe通道中去。
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

2)修改通道定义

#send_bus = can.interface.Bus('test', interface='virtual', preserve_timestamps=True)
send_bus= can.interfaces.vector.canlib.VectorBus(app_name='pycan', channel=0, bitrate=500000, data_bitrate=2000000, fd=True)
  • 1
  • 2

3)其他硬件调用
点击查看python-can硬件介绍


2、回调函数介绍

1)回调函数中的message有其他属性,其他属性可以搭配cantools一起使用。

def update_msg_data(message):
    for msg in database.messages:
        if message.arbitration_id == msg.frame_id:
            #这里可以处理message的data
            message.data = message.data
            print(f"正在回调{message.arbitration_id}报文")       
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
'
运行

在这里插入图片描述


结论

通过本指南,你应该能够理解如何使用python-cancantools库来处理CAN通信。这些库提供了强大的功能,可以帮助你在Python环境中轻松地实现CAN消息的发送和接收,以及arxml文件的解析。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/寸_铁/article/detail/1011008
推荐阅读
相关标签
  

闽ICP备14008679号