当前位置:   article > 正文

信雅纳网络测试的二次开发集成:XOA(Xena Open-Source Automation)开源自动化测试

信雅纳网络测试的二次开发集成:XOA(Xena Open-Source Automation)开源自动化测试

目录

XOA是什么

XOA CLI

XOA Python API

​XOA Python Test Suite/测试套件

XOA Converter

Source Code


XOA是什么

XOA(Xena Open-Source Automation)是一个开源的测试自动化框架,追求“高效、易用、灵活”的跨操作系统的开发框架。能与Xena现有解决方案无缝配合,借助XOA可调用Xena(Z系列打流仪、E系列损伤仪)完成自动化测试任务。同时它提供了操作接口可将其他仪表/设备做并栈集成测试验证,统一整理输出测试报告。

XOA包含:
XOA CLI、XOA Python API、XOA Python TestSuite、XOA Converter

信雅纳:网络测试自动化

XOA CLI

XOA CLI 提供了一套简洁直观的基于文本语言的独立命令,用以控制和集成 Xena测试仪硬件,实现各种测试任务的自动化。
任何客户端平台/编程语言(如 Python、Tcl、Bash)都可与 XOA CLI 配合使用。
CLI 可在远程登录终端上使用,直接向 Xena 测试仪发送命令。
ValkyrieManager通过测试端口配置文件(.xpc ),可在 XOA CLI 环境之间无缝转换。

XOA Python API

XOA Python API与XOA CLI和XenaManager无缝集成

  • 面向对象的高级抽象: XOA Python API采用面向对象的方法,提供了更高层次的抽象,加快了自动化脚本的开发。
  • 集成开发环境自动完成,内置手册: XOA Python API 包含 IDE 自动完成以及类、函数和 API 内置手册等功能,可显著提高开发效率。
  • 命令分组和响应自动匹配:该功能允许命令分组和响应自动匹配,从而优化了测试执行效率。
  • 服务器到客户端推送通知订阅: XOA Python API 支持服务器到客户端的推送通知订阅,降低了用户代码的复杂性。
  • 支持 Python 3.8 及更高版本: XOA Python API 兼容 Python 3.8 及更高版本,确保与现代 Python 环境兼容。
    1. import asyncio
    2. from contextlib import suppress
    3. from xoa_driver import testers
    4. from xoa_driver import modules
    5. from xoa_driver import ports
    6. from xoa_driver import utils
    7. from xoa_driver import enums
    8. from xoa_driver import exceptions
    9. from ipaddress import IPv4Address, IPv6Address
    10. from binascii import hexlify
    11. from xoa_driver.misc import Hex
    12. #---------------------------
    13. # Global parameters
    14. #---------------------------
    15. CHASSIS_IP = "10.165.16.70" # Chassis IP address or hostname
    16. USERNAME = "XOA" # Username
    17. MODULE_INDEX = 4 # Module index
    18. TX_PORT_INDEX = 0 # TX Port index
    19. FRAME_SIZE_BYTES = 4178 # Frame size on wire including the FCS.
    20. FRAME_COUNT = 20 # The number of frames including the first, the middle, and the last.
    21. REPETITION = 1 # The number of repetitions of the frame sequence, set to 0 if you want the port to repeat over and over
    22. TRAFFIC_RATE_FPS = 100 # Traffic rate in frames per second
    23. TRAFFIC_RATE_PERCENT = int(4/10 * 1000000)
    24. SHOULD_BURST = False # Whether the middle frames should be bursty
    25. BURST_SIZE_FRAMES = 9 # Burst size in frames for the middle frames
    26. INTER_BURST_GAP_BYTES = 3000 # The inter-burst gap in bytes
    27. INTRA_BURST_GAP_BYTES = 1000 # The inter-frame gap within a burst, aka. intra-burst gap, in bytes
    28. #---------------------------
    29. # Header content for streams
    30. #---------------------------
    31. class Ethernet:
    32. def __init__(self):
    33. self.dst_mac = "0000.0000.0000"
    34. self.src_mac = "0000.0000.0000"
    35. self.ethertype = "86DD"
    36. def __str__(self):
    37. _dst_mac = self.dst_mac.replace(".", "")
    38. _src_mac = self.src_mac.replace(".", "")
    39. _ethertype = self.ethertype
    40. return f"{_dst_mac}{_src_mac}{_ethertype}".upper()
    41. class IPV4:
    42. def __init__(self):
    43. self.version = 4
    44. self.header_length = 5
    45. self.dscp = 0
    46. self.ecn = 0
    47. self.total_length = 42
    48. self.identification = "0000"
    49. self.flags = 0
    50. self.offset = 0
    51. self.ttl = 255
    52. self.proto = 255
    53. self.checksum = "0000"
    54. self.src = "0.0.0.0"
    55. self.dst = "0.0.0.0"
    56. def __str__(self):
    57. _ver = '{:01X}'.format(self.version)
    58. _header_length = '{:01X}'.format(self.header_length)
    59. _dscp_ecn = '{:02X}'.format((self.dscp<<2)+self.ecn)
    60. _total_len = '{:04X}'.format(self.total_length)
    61. _ident = self.identification
    62. _flag_offset = '{:04X}'.format((self.flags<<13)+self.offset)
    63. _ttl = '{:02X}'.format(self.ttl)
    64. _proto = '{:02X}'.format(self.proto)
    65. _check = self.checksum
    66. _src = hexlify(IPv4Address(self.src).packed).decode()
    67. _dst = hexlify(IPv4Address(self.dst).packed).decode()
    68. return f"{_ver}{_header_length}{_dscp_ecn}{_total_len}{_ident}{_flag_offset}{_ttl}{_proto}{_check}{_src}{_dst}".upper()
    69. class IPV6:
    70. def __init__(self):
    71. self.version = 6
    72. self.traff_class = 8
    73. self.flow_label = 0
    74. self.payload_length = 0
    75. self.next_header = "11"
    76. self.hop_limit = 1
    77. self.src = "2000::2"
    78. self.dst = "2000::100"
    79. def __str__(self):
    80. _ver = '{:01X}'.format(self.version)
    81. _traff_class = '{:01X}'.format(self.traff_class)
    82. _flow_label = '{:06X}'.format(self.flow_label)
    83. _payload_len = '{:04X}'.format(self.payload_length)
    84. _next_header = self.next_header
    85. _hop_limit = '{:02X}'.format(self.hop_limit)
    86. _src = hexlify(IPv6Address(self.src).packed).decode()
    87. _dst = hexlify(IPv6Address(self.dst).packed).decode()
    88. return f"{_ver}{_traff_class}{_flow_label}{_payload_len}{_next_header}{_hop_limit}{_src}{_dst}".upper()
    89. class UDP:
    90. def __init__(self):
    91. self.src_port = 0
    92. self.dst_port = 0
    93. self.length = 0
    94. self.checksum = 0
    95. def __str__(self):
    96. _src_port = '{:04X}'.format(self.src_port)
    97. _dst_port = '{:04X}'.format(self.dst_port)
    98. _length = '{:04X}'.format(self.length)
    99. _checksum = '{:04X}'.format(self.checksum)
    100. return f"{_src_port}{_dst_port}{_length}{_checksum}".upper()
    101. class ROCEV2:
    102. def __init__(self):
    103. self.opcode = 0
    104. self.solicited_event = 0
    105. self.mig_req = 0
    106. self.pad_count = 1
    107. self.header_version = 0
    108. self.partition_key = 65535
    109. self.reserved = 7
    110. self.dest_queue_pair = 2
    111. self.ack_request = 0
    112. self.reserved_7bits = 0
    113. self.packet_seq_number =0
    114. def __str__(self):
    115. _opcode = '{:02X}'.format(self.opcode)
    116. _combo_1 = '{:02X}'.format((self.solicited_event<<7)+(self.mig_req<<6)+(self.pad_count<<4)+self.header_version)
    117. _pk = '{:04X}'.format(self.partition_key)
    118. _reserved = '{:02X}'.format(self.reserved)
    119. _qp = '{:06X}'.format(self.dest_queue_pair)
    120. _combo_2 = '{:02X}'.format((self.ack_request<<7)+self.reserved_7bits)
    121. _ps = '{:06X}'.format(self.packet_seq_number)
    122. return f"{_opcode}{_combo_1}{_pk}{_reserved}{_qp}{_combo_2}{_ps}".upper()
    123. #------------------------------
    124. # def my_awesome_func()
    125. #------------------------------
    126. async def my_awesome_func(stop_event: asyncio.Event, should_burst: bool) -> None:
    127. """This Python function uses XOA Python API to configure the TX port
    128. :param stop_event:
    129. :type stop_event: asyncio.Event
    130. :param should_burst: Whether the middle frames should be bursty.
    131. :type should_burst: bool
    132. """
    133. # create tester instance and establish connection
    134. tester = await testers.L23Tester(CHASSIS_IP, USERNAME, enable_logging=False)
    135. # access the module on the tester
    136. module = tester.modules.obtain(MODULE_INDEX)
    137. # check if the module is of type Loki-100G-5S-2P
    138. if not isinstance(module, modules.ModuleChimera):
    139. # access the txport on the module
    140. txport = module.ports.obtain(TX_PORT_INDEX)
    141. #---------------------------
    142. # Port reservation
    143. #---------------------------
    144. print(f"#---------------------------")
    145. print(f"# Port reservation")
    146. print(f"#---------------------------")
    147. if txport.is_released():
    148. print(f"The txport is released (not owned by anyone). Will reserve the txport to continue txport configuration.")
    149. await txport.reservation.set_reserve() # set reservation , means txport will be controlled by our session
    150. elif not txport.is_reserved_by_me():
    151. print(f"The txport is reserved by others. Will relinquish and reserve the txport to continue txport configuration.")
    152. await txport.reservation.set_relinquish() # send relinquish the txport
    153. await txport.reservation.set_reserve() # set reservation , means txport will be controlled by our session
    154. #---------------------------
    155. # Start port configuration
    156. #---------------------------
    157. print(f"#---------------------------")
    158. print(f"# Start port configuration")
    159. print(f"#---------------------------")
    160. print(f"Reset the txport")
    161. await txport.reset.set()
    162. print(f"Configure the txport")
    163. await utils.apply(
    164. # txport.speed.mode.selection.set(mode=enums.PortSpeedMode.F100G),
    165. txport.comment.set(comment="RoCE2 on Loki"),
    166. txport.tx_config.enable.set_on(),
    167. txport.latency_config.offset.set(offset=0),
    168. txport.latency_config.mode.set(mode=enums.LatencyMode.LAST2LAST),
    169. txport.tx_config.burst_period.set(burst_period=0),
    170. txport.tx_config.packet_limit.set(packet_count_limit=FRAME_COUNT*REPETITION),
    171. txport.max_header_length.set(max_header_length=128),
    172. txport.autotrain.set(interval=0),
    173. txport.loop_back.set_none(), # If you want loopback the port TX to its own RX, change it to set_txoff2rx()
    174. txport.checksum.set(offset=0),
    175. txport.tx_config.delay.set(delay_val=0),
    176. txport.tpld_mode.set_normal(),
    177. txport.payload_mode.set_normal(),
    178. #txport.rate.pps.set(port_rate_pps=TRAFFIC_RATE_FPS), # If you want to control traffic rate with FPS, uncomment this.
    179. txport.rate.fraction.set(TRAFFIC_RATE_PERCENT), # If you want to control traffic rate with fraction, uncomment this. 1,000,000 = 100%
    180. )
    181. if should_burst:
    182. await txport.tx_config.mode.set_burst()
    183. else:
    184. await txport.tx_config.mode.set_sequential()
    185. #--------------------------------------
    186. # Configure stream_0 on the txport
    187. #--------------------------------------
    188. print(f" Configure first-packet stream on the txport")
    189. stream_0 = await txport.streams.create()
    190. eth = Ethernet()
    191. eth.src_mac = "aaaa.aaaa.0005"
    192. eth.dst_mac = "bbbb.bbbb.0005"
    193. ipv4 = IPV4()
    194. ipv4.src = "1.1.1.5"
    195. ipv4.dst = "2.2.2.5"
    196. ipv6 = IPV6()
    197. ipv6.src = "2001::5"
    198. ipv6.dst = "2002::5"
    199. udp = UDP()
    200. udp.src_port = 4791
    201. udp.dst_port = 4791
    202. rocev2 = ROCEV2()
    203. rocev2.opcode = 0
    204. rocev2.dest_queue_pair = 2
    205. rocev2.packet_seq_number = 0
    206. await utils.apply(
    207. stream_0.enable.set_on(),
    208. stream_0.packet.limit.set(packet_count=1),
    209. stream_0.comment.set(f"First packet"),
    210. stream_0.rate.fraction.set(stream_rate_ppm=10000),
    211. stream_0.packet.header.protocol.set(segments=[
    212. enums.ProtocolOption.ETHERNET,
    213. enums.ProtocolOption.IPV6,
    214. enums.ProtocolOption.UDP,
    215. enums.ProtocolOption.RAW_12,
    216. ]),
    217. stream_0.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
    218. stream_0.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
    219. stream_0.payload.content.set(
    220. payload_type=enums.PayloadType.PATTERN,
    221. hex_data=Hex("AABBCCDD")
    222. ),
    223. stream_0.tpld_id.set(test_payload_identifier = 0),
    224. stream_0.insert_packets_checksum.set_on()
    225. )
    226. if should_burst:
    227. await stream_0.burst.burstiness.set(size=1, density=100)
    228. await stream_0.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)
    229. #--------------------------------------
    230. # Configure stream_1 on the txport
    231. #--------------------------------------
    232. print(f" Configure middle-packets stream on the txport")
    233. stream_1 = await txport.streams.create()
    234. rocev2.opcode = 1
    235. rocev2.dest_queue_pair = 2
    236. rocev2.packet_seq_number = 1
    237. await utils.apply(
    238. stream_1.enable.set_on(),
    239. stream_1.packet.limit.set(packet_count=FRAME_COUNT-2),
    240. stream_1.comment.set(f"Middle packets"),
    241. stream_1.rate.fraction.set(stream_rate_ppm=10000),
    242. stream_1.packet.header.protocol.set(segments=[
    243. enums.ProtocolOption.ETHERNET,
    244. enums.ProtocolOption.IPV6,
    245. enums.ProtocolOption.UDP,
    246. enums.ProtocolOption.RAW_12,
    247. ]),
    248. stream_1.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
    249. stream_1.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
    250. stream_1.payload.content.set(
    251. payload_type=enums.PayloadType.PATTERN,
    252. hex_data=Hex("AABBCCDD")
    253. ),
    254. stream_1.tpld_id.set(test_payload_identifier = 1),
    255. stream_1.insert_packets_checksum.set_on()
    256. )
    257. if should_burst:
    258. await stream_1.burst.burstiness.set(size=BURST_SIZE_FRAMES, density=100)
    259. await stream_1.burst.gap.set(inter_packet_gap=INTRA_BURST_GAP_BYTES, inter_burst_gap=INTER_BURST_GAP_BYTES)
    260. # Configure a modifier on the stream_1
    261. await stream_1.packet.header.modifiers.configure(1)
    262. # Modifier on the SQN
    263. modifier = stream_1.packet.header.modifiers.obtain(0)
    264. await modifier.specification.set(position=72, mask="FFFF0000", action=enums.ModifierAction.INC, repetition=1)
    265. await modifier.range.set(min_val=1, step=1, max_val=FRAME_COUNT-2)
    266. #--------------------------------------
    267. # Configure stream_2 on the txport
    268. #--------------------------------------
    269. print(f" Configure last-packet stream on the txport")
    270. stream_2 = await txport.streams.create()
    271. rocev2.opcode = 2
    272. rocev2.dest_queue_pair = 2
    273. rocev2.packet_seq_number = FRAME_COUNT-1
    274. await utils.apply(
    275. stream_2.enable.set_on(),
    276. stream_2.packet.limit.set(packet_count=1),
    277. stream_2.comment.set(f"Last packet"),
    278. stream_2.rate.fraction.set(stream_rate_ppm=10000),
    279. stream_2.packet.header.protocol.set(segments=[
    280. enums.ProtocolOption.ETHERNET,
    281. enums.ProtocolOption.IPV6,
    282. enums.ProtocolOption.UDP,
    283. enums.ProtocolOption.RAW_12,
    284. ]),
    285. stream_2.packet.header.data.set(hex_data=Hex(str(eth)+str(ipv6)+str(udp)+str(rocev2))),
    286. stream_2.packet.length.set(length_type=enums.LengthType.FIXED, min_val=FRAME_SIZE_BYTES, max_val=FRAME_SIZE_BYTES),
    287. stream_2.payload.content.set(
    288. payload_type=enums.PayloadType.PATTERN,
    289. hex_data=Hex("AABBCCDD")
    290. ),
    291. stream_2.tpld_id.set(test_payload_identifier = 2),
    292. stream_2.insert_packets_checksum.set_on()
    293. )
    294. if should_burst:
    295. await stream_2.burst.burstiness.set(size=1, density=100)
    296. await stream_2.burst.gap.set(inter_packet_gap=0, inter_burst_gap=0)
    297. async def main():
    298. stop_event =asyncio.Event()
    299. try:
    300. await my_awesome_func(stop_event, should_burst=SHOULD_BURST)
    301. except KeyboardInterrupt:
    302. stop_event.set()
    303. if __name__=="__main__":
    304. asyncio.run(main())

无效代码 var code = "eeb31072-6429-44ff-8a6f-18ac2155687f"

信雅纳:Python XOA测试自动化

XOA Python Test Suite/测试套件

XOA Python 测试套件是一个测试框架,为开发人员和测试专家执行和集成 Xena 测试套件提供了定义明确的 API。
该框架以自动化方式处理各种任务,如测试资源管理、测试执行和发布测试结果。
每个 RFC 测试套件都被设计成独立的 "插件",可根据需要有选择性地集成到项目中。
目前,XOA Python 测试套件包括
- RFC2544
- RFC2889
- RFC3918

XOA Converter

如果您希望将当前的 Xena 测试套件配置快速迁移到 XOA,现在使用 XOA 转换器工具比以往任何时候都更容易。

以前,Xena的测试套件应用程序仅与Windows兼容。但今后,所有现有和未来的测试套件都将并入 XOA Python 测试套件,从而消除 Windows 限制。

为了简化过渡,我们推出了 XOA 转换器。该工具允许用户将现有的Xena测试套件配置(Xena2544、Xena2889和Xena3918)从Xena窗口桌面应用程序无缝迁移到XOA Python测试套件中。有了 XOA 转换器,迁移过程变得轻松简单。

信雅纳:网络测试仪自动化测试

Source Code

GitHub 是我们托管 XOA 源代码的首选平台,因为它具有出色的版本控制和协作能力。它为管理代码变更提供了一个极佳的环境,确保项目的历史记录完备且易于访问。我们崇尚开放,鼓励每个人使用、分享、贡献和反馈我们的源代码。GitHub 允许进行无缝协作,并促进以社区为导向的方法,让每个人都能积极参与 XOA 的开发和改进。我们重视来自社区的意见和贡献,因为这能提高源代码的整体质量和创新性。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/花生_TL007/article/detail/269258
推荐阅读
相关标签
  

闽ICP备14008679号