赞
踩
目录
XOA(Xena Open-Source Automation)是一个开源的测试自动化框架,追求“高效、易用、灵活”的跨操作系统的开发框架。能与Xena现有解决方案无缝配合,借助XOA可调用Xena(Z系列打流仪、E系列损伤仪)完成自动化测试任务。同时它提供了操作接口可将其他仪表/设备做并栈集成测试验证,统一整理输出测试报告。
XOA包含:
XOA CLI、XOA Python API、XOA Python TestSuite、XOA Converter
XOA CLI 提供了一套简洁直观的基于文本语言的独立命令,用以控制和集成 Xena测试仪硬件,实现各种测试任务的自动化。
任何客户端平台/编程语言(如 Python、Tcl、Bash)都可与 XOA CLI 配合使用。
CLI 可在远程登录终端上使用,直接向 Xena 测试仪发送命令。
借助测试端口配置文件(.xpc),可在 ValkyrieManager 与 XOA CLI 环境之间无缝转换。
XOA Python API与XOA CLI和XenaManager无缝集成
- import asyncio
- from contextlib import suppress
- from xoa_driver import testers
- from xoa_driver import modules
- from xoa_driver import ports
- from xoa_driver import utils
- from xoa_driver import enums
- from xoa_driver import exceptions
- from ipaddress import IPv4Address, IPv6Address
- from binascii import hexlify
- from xoa_driver.misc import Hex
-
- #---------------------------
- # Global parameters
- #---------------------------
-
CHASSIS_IP = "10.165.16.70"  # Chassis IP address or hostname
USERNAME = "XOA"  # Username
MODULE_INDEX = 4  # Module index
TX_PORT_INDEX = 0  # TX port index

FRAME_SIZE_BYTES = 4178  # Frame size on wire including the FCS.
FRAME_COUNT = 20  # The number of frames including the first, the middle, and the last.
REPETITION = 1  # The number of repetitions of the frame sequence, set to 0 if you want the port to repeat over and over
TRAFFIC_RATE_FPS = 100  # Traffic rate in frames per second
# Port rate as a fraction expressed in ppm (1,000,000 = 100%), i.e. 40% here.
# NOTE: despite the "PERCENT" in the name, the value is NOT a percentage.
# Integer arithmetic is used so the result never depends on float rounding.
TRAFFIC_RATE_PERCENT = 4 * 1_000_000 // 10

SHOULD_BURST = False  # Whether the middle frames should be bursty
BURST_SIZE_FRAMES = 9  # Burst size in frames for the middle frames
INTER_BURST_GAP_BYTES = 3000  # The inter-burst gap in bytes
INTRA_BURST_GAP_BYTES = 1000  # The inter-frame gap within a burst, aka. intra-burst gap, in bytes
-
-
-
- #---------------------------
- # Header content for streams
- #---------------------------
class Ethernet:
    """Ethernet header (destination MAC, source MAC, EtherType) as hex text.

    ``str(instance)`` returns the concatenated fields as an uppercase hex
    string with no separators, ready to be used as raw header data.
    """

    # Separator characters removed from MAC strings before concatenation, so
    # "aaaa.bbbb.cccc", "aa:aa:bb:bb:cc:cc" and "aa-aa-bb-bb-cc-cc" all work.
    _MAC_SEPARATORS = str.maketrans("", "", ".:-")

    def __init__(self) -> None:
        self.dst_mac = "0000.0000.0000"
        self.src_mac = "0000.0000.0000"
        self.ethertype = "86DD"  # hex string, emitted verbatim

    def __str__(self) -> str:
        """Return dst MAC + src MAC + EtherType as uppercase hex."""
        dst = self.dst_mac.translate(self._MAC_SEPARATORS)
        src = self.src_mac.translate(self._MAC_SEPARATORS)
        return f"{dst}{src}{self.ethertype}".upper()
-
class IPV4:
    """IPv4 header (no options) rendered as an uppercase hex string.

    Integer attributes are formatted to fixed hex widths; ``identification``
    and ``checksum`` are hex strings that are emitted verbatim. Values are not
    range-checked, so callers must keep each field within its bit width.
    """

    def __init__(self) -> None:
        self.version = 4
        self.header_length = 5
        self.dscp = 0
        self.ecn = 0
        self.total_length = 42
        self.identification = "0000"  # hex string, 4 digits
        self.flags = 0
        self.offset = 0
        self.ttl = 255
        self.proto = 255
        self.checksum = "0000"  # hex string, 4 digits; not computed here
        self.src = "0.0.0.0"
        self.dst = "0.0.0.0"

    def __str__(self) -> str:
        """Return the header fields concatenated as uppercase hex."""
        # DSCP occupies the upper 6 bits and ECN the lower 2 bits of one byte.
        dscp_ecn = (self.dscp << 2) + self.ecn
        # Flags occupy the upper 3 bits, fragment offset the lower 13 bits.
        flags_offset = (self.flags << 13) + self.offset
        src = IPv4Address(self.src).packed.hex()
        dst = IPv4Address(self.dst).packed.hex()
        return (
            f"{self.version:01X}{self.header_length:01X}{dscp_ecn:02X}"
            f"{self.total_length:04X}{self.identification}{flags_offset:04X}"
            f"{self.ttl:02X}{self.proto:02X}{self.checksum}{src}{dst}"
        ).upper()
-
class IPV6:
    """IPv6 fixed header rendered as an uppercase hex string (80 hex digits).

    The first 32-bit word is packed as 4-bit version, 8-bit traffic class and
    20-bit flow label. ``next_header`` is a two-digit hex string emitted
    verbatim (the default "11" is 0x11).

    Raises:
        ValueError: from ``__str__`` if an integer field does not fit in its
            bit width (which would otherwise misalign every following field).
    """

    def __init__(self) -> None:
        self.version = 6
        self.traff_class = 8
        self.flow_label = 0
        self.payload_length = 0
        self.next_header = "11"  # hex string, 2 digits
        self.hop_limit = 1
        self.src = "2000::2"
        self.dst = "2000::100"

    @staticmethod
    def _fit(name: str, value: int, bits: int) -> int:
        """Return *value* unchanged if it fits in *bits* bits, else raise."""
        if not 0 <= value < (1 << bits):
            raise ValueError(f"{name}={value!r} does not fit in {bits} bits")
        return value

    def __str__(self) -> str:
        """Return the header fields concatenated as uppercase hex."""
        # Pack version / traffic class / flow label into a single 32-bit word.
        # Formatting traffic class as one hex digit and flow label as six (as
        # a naive per-field format would) yields a 4/4/24-bit split and shifts
        # the traffic class into the wrong nibble; with the 4/8/20 split the
        # default traff_class=8 is emitted as 0x60800000.
        first_word = (
            (self._fit("version", self.version, 4) << 28)
            | (self._fit("traff_class", self.traff_class, 8) << 20)
            | self._fit("flow_label", self.flow_label, 20)
        )
        payload_length = self._fit("payload_length", self.payload_length, 16)
        hop_limit = self._fit("hop_limit", self.hop_limit, 8)
        src = IPv6Address(self.src).packed.hex()
        dst = IPv6Address(self.dst).packed.hex()
        return (
            f"{first_word:08X}{payload_length:04X}{self.next_header}"
            f"{hop_limit:02X}{src}{dst}"
        ).upper()
-
class UDP:
    """UDP header rendered as an uppercase hex string (16 hex digits).

    All four fields are integers formatted as 4 hex digits each. ``length``
    and ``checksum`` are emitted as given; nothing is computed here.
    """

    def __init__(self) -> None:
        self.src_port = 0
        self.dst_port = 0
        self.length = 0
        self.checksum = 0

    def __str__(self) -> str:
        """Return src port + dst port + length + checksum as uppercase hex."""
        return (
            f"{self.src_port:04X}{self.dst_port:04X}"
            f"{self.length:04X}{self.checksum:04X}"
        ).upper()
-
class ROCEV2:
    """RoCEv2 transport header fields rendered as uppercase hex (24 digits).

    Layout produced by ``__str__`` (widths in hex digits):
    opcode (2), flags byte (2), partition key (4), reserved (2),
    destination queue pair (6), ack-request byte (2), packet sequence
    number (6). Values are not range-checked.
    """

    def __init__(self) -> None:
        self.opcode = 0
        self.solicited_event = 0
        self.mig_req = 0
        self.pad_count = 1
        self.header_version = 0
        self.partition_key = 65535
        self.reserved = 7
        self.dest_queue_pair = 2
        self.ack_request = 0
        self.reserved_7bits = 0
        self.packet_seq_number = 0

    def __str__(self) -> str:
        """Return the header fields concatenated as uppercase hex."""
        # One byte: solicited_event at bit 7, mig_req at bit 6, pad_count
        # starting at bit 4, header_version in the low bits.
        flags_byte = (
            (self.solicited_event << 7)
            + (self.mig_req << 6)
            + (self.pad_count << 4)
            + self.header_version
        )
        # One byte: ack_request at bit 7, reserved_7bits in the low 7 bits.
        ack_byte = (self.ack_request << 7) + self.reserved_7bits
        return (
            f"{self.opcode:02X}{flags_byte:02X}{self.partition_key:04X}"
            f"{self.reserved:02X}{self.dest_queue_pair:06X}"
            f"{ack_byte:02X}{self.packet_seq_number:06X}"
        ).upper()
-
-
- #------------------------------
- # def my_awesome_func()
- #------------------------------
async def _configure_roce_stream(
    stream,
    *,
    comment: str,
    packet_count: int,
    tpld_id: int,
    header_hex: str,
    should_burst: bool,
    burst_size: int = 1,
    intra_burst_gap: int = 0,
    inter_burst_gap: int = 0,
) -> None:
    """Apply the settings shared by the first/middle/last streams.

    :param stream: stream object returned by ``txport.streams.create()``.
    :param comment: stream description.
    :param packet_count: number of packets the stream may send.
    :param tpld_id: test payload identifier of the stream.
    :param header_hex: full packet header as a hex string.
    :param should_burst: when true, also configure burstiness and gaps.
    :param burst_size: burst size in frames (only used when bursting).
    :param intra_burst_gap: gap between frames inside a burst, in bytes.
    :param inter_burst_gap: gap between bursts, in bytes.
    """
    await utils.apply(
        stream.enable.set_on(),
        stream.packet.limit.set(packet_count=packet_count),
        stream.comment.set(comment),
        stream.rate.fraction.set(stream_rate_ppm=10000),
        stream.packet.header.protocol.set(segments=[
            enums.ProtocolOption.ETHERNET,
            enums.ProtocolOption.IPV6,
            enums.ProtocolOption.UDP,
            enums.ProtocolOption.RAW_12,
        ]),
        stream.packet.header.data.set(hex_data=Hex(header_hex)),
        stream.packet.length.set(
            length_type=enums.LengthType.FIXED,
            min_val=FRAME_SIZE_BYTES,
            max_val=FRAME_SIZE_BYTES,
        ),
        stream.payload.content.set(
            payload_type=enums.PayloadType.PATTERN,
            hex_data=Hex("AABBCCDD"),
        ),
        stream.tpld_id.set(test_payload_identifier=tpld_id),
        stream.insert_packets_checksum.set_on(),
    )
    if should_burst:
        await stream.burst.burstiness.set(size=burst_size, density=100)
        await stream.burst.gap.set(
            inter_packet_gap=intra_burst_gap,
            inter_burst_gap=inter_burst_gap,
        )


async def my_awesome_func(stop_event: asyncio.Event, should_burst: bool) -> None:
    """Configure the TX port with a first / middle / last RoCEv2 stream set.

    Connects to the chassis, reserves and resets the TX port, applies the port
    settings, then creates three streams: one "first" packet, ``FRAME_COUNT-2``
    "middle" packets (with an incrementing modifier on the sequence number)
    and one "last" packet. If the module is a Chimera module, nothing is
    configured and the function returns silently.

    :param stop_event: currently not read by this function; kept for callers.
    :type stop_event: asyncio.Event
    :param should_burst: Whether the middle frames should be bursty.
    :type should_burst: bool
    :raises ValueError: if ``FRAME_COUNT`` is smaller than 3.
    """
    # The sequence needs a first, at least one middle, and a last frame;
    # otherwise the middle stream's packet limit would be zero or negative.
    if FRAME_COUNT < 3:
        raise ValueError(f"FRAME_COUNT must be at least 3, got {FRAME_COUNT}")

    # Create the tester instance and establish the connection. The context
    # manager closes the session when configuration is done or on error.
    async with testers.L23Tester(CHASSIS_IP, USERNAME, enable_logging=False) as tester:

        # access the module on the tester
        module = tester.modules.obtain(MODULE_INDEX)

        # Chimera modules are skipped: nothing is configured for them.
        if isinstance(module, modules.ModuleChimera):
            return

        # access the txport on the module
        txport = module.ports.obtain(TX_PORT_INDEX)

        #---------------------------
        # Port reservation
        #---------------------------
        print("#---------------------------")
        print("# Port reservation")
        print("#---------------------------")
        if txport.is_released():
            print("The txport is released (not owned by anyone). Will reserve the txport to continue txport configuration.")
            await txport.reservation.set_reserve()  # the txport is now controlled by our session
        elif not txport.is_reserved_by_me():
            print("The txport is reserved by others. Will relinquish and reserve the txport to continue txport configuration.")
            await txport.reservation.set_relinquish()  # take the txport away from its current owner
            await txport.reservation.set_reserve()  # the txport is now controlled by our session

        #---------------------------
        # Start port configuration
        #---------------------------
        print("#---------------------------")
        print("# Start port configuration")
        print("#---------------------------")

        print("Reset the txport")
        await txport.reset.set()

        print("Configure the txport")
        await utils.apply(
            # txport.speed.mode.selection.set(mode=enums.PortSpeedMode.F100G),
            txport.comment.set(comment="RoCE2 on Loki"),
            txport.tx_config.enable.set_on(),
            txport.latency_config.offset.set(offset=0),
            txport.latency_config.mode.set(mode=enums.LatencyMode.LAST2LAST),
            txport.tx_config.burst_period.set(burst_period=0),
            txport.tx_config.packet_limit.set(packet_count_limit=FRAME_COUNT*REPETITION),
            txport.max_header_length.set(max_header_length=128),
            txport.autotrain.set(interval=0),
            txport.loop_back.set_none(),  # If you want loopback the port TX to its own RX, change it to set_txoff2rx()
            txport.checksum.set(offset=0),
            txport.tx_config.delay.set(delay_val=0),
            txport.tpld_mode.set_normal(),
            txport.payload_mode.set_normal(),
            # txport.rate.pps.set(port_rate_pps=TRAFFIC_RATE_FPS),  # Alternative: control the traffic rate in frames per second.
            txport.rate.fraction.set(TRAFFIC_RATE_PERCENT),  # Traffic rate as a fraction in ppm. 1,000,000 = 100%
        )
        if should_burst:
            await txport.tx_config.mode.set_burst()
        else:
            await txport.tx_config.mode.set_sequential()

        #--------------------------------------
        # Header content shared by all streams
        #--------------------------------------
        eth = Ethernet()
        eth.src_mac = "aaaa.aaaa.0005"
        eth.dst_mac = "bbbb.bbbb.0005"

        ipv6 = IPV6()
        ipv6.src = "2001::5"
        ipv6.dst = "2002::5"

        udp = UDP()
        udp.src_port = 4791
        udp.dst_port = 4791

        rocev2 = ROCEV2()
        rocev2.dest_queue_pair = 2

        def build_header(opcode: int, sequence_number: int) -> str:
            """Return Ethernet + IPv6 + UDP + RoCEv2 header hex for one stream."""
            rocev2.opcode = opcode
            rocev2.packet_seq_number = sequence_number
            return str(eth) + str(ipv6) + str(udp) + str(rocev2)

        #--------------------------------------
        # Configure stream_0 on the txport
        #--------------------------------------
        print("   Configure first-packet stream on the txport")

        stream_0 = await txport.streams.create()
        await _configure_roce_stream(
            stream_0,
            comment="First packet",
            packet_count=1,
            tpld_id=0,
            header_hex=build_header(opcode=0, sequence_number=0),
            should_burst=should_burst,
        )

        #--------------------------------------
        # Configure stream_1 on the txport
        #--------------------------------------
        print("   Configure middle-packets stream on the txport")

        stream_1 = await txport.streams.create()
        await _configure_roce_stream(
            stream_1,
            comment="Middle packets",
            packet_count=FRAME_COUNT-2,
            tpld_id=1,
            header_hex=build_header(opcode=1, sequence_number=1),
            should_burst=should_burst,
            burst_size=BURST_SIZE_FRAMES,
            intra_burst_gap=INTRA_BURST_GAP_BYTES,
            inter_burst_gap=INTER_BURST_GAP_BYTES,
        )

        # Configure a modifier on the stream_1
        await stream_1.packet.header.modifiers.configure(1)

        # Modifier on the SQN. The sequence number is the last 3 bytes of the
        # header (14 Ethernet + 40 IPv6 + 8 UDP + 12 RoCEv2 = 74 bytes), so
        # position 72 presumably targets its low 16 bits -- confirm against
        # the modifier mask semantics of the driver.
        modifier = stream_1.packet.header.modifiers.obtain(0)
        await modifier.specification.set(position=72, mask=Hex("FFFF0000"), action=enums.ModifierAction.INC, repetition=1)
        await modifier.range.set(min_val=1, step=1, max_val=FRAME_COUNT-2)

        #--------------------------------------
        # Configure stream_2 on the txport
        #--------------------------------------
        print("   Configure last-packet stream on the txport")

        stream_2 = await txport.streams.create()
        await _configure_roce_stream(
            stream_2,
            comment="Last packet",
            packet_count=1,
            tpld_id=2,
            header_hex=build_header(opcode=2, sequence_number=FRAME_COUNT-1),
            should_burst=should_burst,
        )
-
-
-
async def main():
    """Run the port configuration once and signal ``stop_event`` on interrupt."""
    stop_event = asyncio.Event()
    try:
        await my_awesome_func(stop_event, should_burst=SHOULD_BURST)
    except KeyboardInterrupt:
        stop_event.set()
    except asyncio.CancelledError:
        # asyncio.run() on Python 3.11+ delivers Ctrl+C by cancelling the main
        # task, so the interrupt arrives here as a cancellation. Signal the
        # event, then re-raise so the cancellation is not swallowed.
        stop_event.set()
        raise


if __name__ == "__main__":
    asyncio.run(main())
无效代码 var code = "eeb31072-6429-44ff-8a6f-18ac2155687f"
XOA Python 测试套件是一个测试框架,为开发人员和测试专家执行和集成 Xena 测试套件提供了定义明确的 API。
该框架以自动化方式处理各种任务,如测试资源管理、测试执行和发布测试结果。
每个 RFC 测试套件都被设计成独立的 "插件",可根据需要有选择性地集成到项目中。
目前,XOA Python 测试套件包括:
- RFC2544
- RFC2889
- RFC3918
如果您希望将当前的 Xena 测试套件配置快速迁移到 XOA,现在使用 XOA 转换器工具比以往任何时候都更容易。
以前,Xena的测试套件应用程序仅与Windows兼容。但今后,所有现有和未来的测试套件都将并入 XOA Python 测试套件,从而消除 Windows 限制。
为了简化过渡,我们推出了 XOA 转换器。该工具允许用户将现有的 Xena 测试套件配置(Xena2544、Xena2889 和 Xena3918)从 Xena 的 Windows 桌面应用程序无缝迁移到 XOA Python 测试套件中。有了 XOA 转换器,迁移过程变得轻松简单。
GitHub 是我们托管 XOA 源代码的首选平台,因为它具有出色的版本控制和协作能力。它为管理代码变更提供了一个极佳的环境,确保项目的历史记录完备且易于访问。我们崇尚开放,鼓励每个人使用、分享、贡献和反馈我们的源代码。GitHub 允许进行无缝协作,并促进以社区为导向的方法,让每个人都能积极参与 XOA 的开发和改进。我们重视来自社区的意见和贡献,因为这能提高源代码的整体质量和创新性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。