当前位置:   article > 正文

python实现modbus通讯-modbus_tk_python modbus rtu

python modbus rtu

使用Modbus_Tk实现Modbus RTU和Modbus TCP客户端/服务端

简介

Modbus_Tk是一个用于Python编程语言的Modbus协议库,它提供了实现Modbus RTU和Modbus TCP通信的功能。本文将介绍如何使用Modbus_Tk库来实现Modbus RTU和Modbus TCP的客户端和服务端。

准备工作

在开始之前,我们需要完成以下准备工作:

  1. 安装Python:确保你已经安装了Python解释器,可以从Python官方网站下载并安装最新版本。
  2. 安装Modbus_Tk:使用pip命令运行pip install modbus_tk来安装Modbus_Tk库。

Modbus RTU客户端

下面是一个示例代码,展示了如何使用Modbus_Tk实现Modbus RTU客户端:

from modbus_tk import modbus_rtu

# 创建一个Modbus RTU主站
# master = modbus_rtu.RtuMaster('/dev/ttyUSB0')
master = modbus_rtu.RtuMaster(
            serial.Serial(port='/dev/ttyUSB0', baudrate=9600, bytesize=8, parity='N', stopbits=1, xonxoff=0)
        )

# 设置串口参数
master.set_timeout(5.0)
master.set_verbose(True)

# 连接到设备
master.open()

# 读取保持寄存器的值
result = master.execute(1, modbus_rtu.READ_HOLDING_REGISTERS, 0, 10)

# 打印结果
print(result)

# 关闭连接
master.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

在上面的代码中,我们首先导入了modbus_rtu模块,然后创建了一个Modbus RTU主站对象,指定了串口设备路径。接下来,我们设置了串口的超时时间和详细输出模式。然后,我们使用execute方法发送了一个读取保持寄存器的请求,并打印出了结果。最后,我们关闭了连接。

Modbus RTU服务端

下面是一个示例代码,展示了如何使用Modbus_Tk实现Modbus RTU服务端:

from modbus_tk import modbus_rtu

# 创建一个Modbus RTU从站
# slave = modbus_rtu.RtuSlave('/dev/ttyUSB0')
slave = modbus_rtu.RtuSlave( 
	serial.Serial(port='/dev/ttyUSB0', baudrate=9600, bytesize=8, parity='N', 			stopbits=1, xonxoff=0)
)

# 设置从站ID
slave.set_slave(1)

# 启动从站
slave.start()

# 设置保持寄存器的初始值
slave.set_values('holding_registers', 0, [1, 2, 3, 4, 5])

# 运行从站
slave.join()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

在上面的代码中,我们同样创建了一个Modbus RTU从站对象,并指定了串口设备路径。然后,我们设置了从站的ID和保持寄存器的初始值。最后,我们通过调用join方法来运行从站。

Modbus TCP客户端

下面是一个示例代码,展示了如何使用Modbus_Tk实现Modbus TCP客户端:

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp, hooks
import logging

def main():
    """main"""
    logger = modbus_tk.utils.create_logger("console", level=logging.DEBUG)

    def on_after_recv(data):
        master, bytes_data = data
        logger.info(bytes_data)

    hooks.install_hook('modbus.Master.after_recv', on_after_recv)

    try:

        def on_before_connect(args):
            master = args[0]
            logger.debug("on_before_connect {0} {1}".format(master._host, master._port))

        hooks.install_hook("modbus_tcp.TcpMaster.before_connect", on_before_connect)

        def on_after_recv(args):
            response = args[1]
            logger.debug("on_after_recv {0} bytes received".format(len(response)))

        hooks.install_hook("modbus_tcp.TcpMaster.after_recv", on_after_recv)

        # Connect to the slave
        master = modbus_tcp.TcpMaster()
        master.set_timeout(5.0)
        logger.info("connected")

        logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 3))

        # logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='f'))

        # Read and write floats
        # master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, starting_address=0, output_value=[3.14], data_format='>f')
        # logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 0, 2, data_format='>f'))

        # send some queries
        # logger.info(master.execute(1, cst.READ_COILS, 0, 10))
        # logger.info(master.execute(1, cst.READ_DISCRETE_INPUTS, 0, 8))
        # logger.info(master.execute(1, cst.READ_INPUT_REGISTERS, 100, 3))
        # logger.info(master.execute(1, cst.READ_HOLDING_REGISTERS, 100, 12))
        # logger.info(master.execute(1, cst.WRITE_SINGLE_COIL, 7, output_value=1))
        # logger.info(master.execute(1, cst.WRITE_SINGLE_REGISTER, 100, output_value=54))
        # logger.info(master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=[1, 1, 0, 1, 1, 0, 1, 1]))
        # logger.info(master.execute(1, cst.WRITE_MULTIPLE_REGISTERS, 100, output_value=xrange(12)))

    except modbus_tk.modbus.ModbusError as exc:
        logger.error("%s- Code=%d", exc, exc.get_exception_code())

if __name__ == "__main__":
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

在上面的代码中,我们创建了一个Modbus TCP主站对象,指定了连接的IP地址和端口号。然后,我们使用execute方法发送了一个读取保持寄存器的请求,并打印出了结果。

Modbus TCP服务端

下面是一个示例代码,展示了如何使用Modbus_Tk实现Modbus TCP服务端:

import sys

import modbus_tk
import modbus_tk.defines as cst
from modbus_tk import modbus_tcp


def main():
    """main"""

    logger = modbus_tk.utils.create_logger(name="console", record_format="%(message)s")

    try:
        #Create the server
        server = modbus_tcp.TcpServer()
        logger.info("running...")
        logger.info("enter 'quit' for closing the server")

        server.start()

        slave_1 = server.add_slave(1)
        slave_1.add_block('0', cst.HOLDING_REGISTERS, 0, 100)

        while True:
            cmd = sys.stdin.readline()
            args = cmd.split(' ')

            if cmd.find('quit') == 0:
                sys.stdout.write('bye-bye\r\n')
                break

            elif args[0] == 'add_slave':
                slave_id = int(args[1])
                server.add_slave(slave_id)
                sys.stdout.write('done: slave %d added\r\n' % slave_id)

            elif args[0] == 'add_block':
                slave_id = int(args[1])
                name = args[2]
                block_type = int(args[3])
                starting_address = int(args[4])
                length = int(args[5])
                slave = server.get_slave(slave_id)
                slave.add_block(name, block_type, starting_address, length)
                sys.stdout.write('done: block %s added\r\n' % name)

            elif args[0] == 'set_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                values = []
                for val in args[4:]:
                    values.append(int(val))
                slave = server.get_slave(slave_id)
                slave.set_values(name, address, values)
                values = slave.get_values(name, address, len(values))
                sys.stdout.write('done: values written: %s\r\n' % str(values))

            elif args[0] == 'get_values':
                slave_id = int(args[1])
                name = args[2]
                address = int(args[3])
                length = int(args[4])
                slave = server.get_slave(slave_id)
                values = slave.get_values(name, address, length)
                sys.stdout.write('done: values read: %s\r\n' % str(values))

            else:
                sys.stdout.write("unknown command %s\r\n" % args[0])
    finally:
        server.stop()


if __name__ == "__main__":
    main()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75

在上面的代码中,我们同样创建了一个Modbus TCP从站对象,并指定了监听的IP地址和端口号。然后,我们设置了从站的ID和保持寄存器的初始值。最后,我们通过调用join方法来运行从站。

总结

通过使用Modbus_Tk库,我们可以方便地实现Modbus RTU和Modbus TCP的客户端和服务端。本文提供了示例代码,帮助读者快速上手。如果你对Modbus_Tk库有更多的需求,可以查阅官方文档以获取更多信息和示例代码。

希望本文能够对你有所帮助!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/991506
推荐阅读
相关标签
  

闽ICP备14008679号