当前位置:   article > 正文

Jmeter Kafka Producer Sampler_jmeter kafka插件

jmeter kafka插件

目录

前言

Jmeter Kafka插件开发之Sampler篇

插件界面如下:

主要有3个区域:

broker地址:Kafka Broker地址

topic名称:Kafka topic

body:Kafka 报文区域

项目说明:

实现方式:

UI代码

业务代码:

pom文件

界面如下:

 测试验证​编辑


前言

之前写了一个Jmeter kafka 插件是基于公司对kafka二次封装写的,这次基于原生kafka写一个插件,如下,废话不多说,中心思想是一样的,可以参考我上篇文章

Jmeter Kafka插件开发之Sampler篇

插件界面如下:

主要有3个区域:

broker地址:Kafka Broker地址

用于填写Kafka broker地址,一般为IP+端口号如:127.0.0.1:1234

topic名称:Kafka topic

用于填写Kafka主题名称,如:test

body:Kafka 报文区域

填写Kafka发送的报文,如:test123

项目说明:

需要建一个gui包和一个sampler包,Jmeter在加载插件的时候会加载gui里面的类,可以参考JMeter源码的包命名方式,以下是参考的项目目录结构:

实现方式:

  • 继承AbstractSamplerGui方法
  • 重写createTestElement()方法:此方法应创建TestElement类的新实例,然后将其传递给modifyTestElement(TestElement) 方法
  • 重写configure()方法:通过调用此方法,可以用测试元素对象的内容初始化新创建的GUI组件
  • 重写modifyTestElement()方法:将GUI元素中的数据移动到TestElement中
  • 主要重写2、3、4这3个方法,其他的包括重写clearGui()用于恢复GUI到初始状态,getStaticLabel()返回插件名称
  • 引入ApacheJMeter_java、ApacheJMeter_core以及需要使用的其他库
  • GUI类主要负责输入数据然后将数据传送到TestElement中,然后sampler类拿到数据进行业务逻辑处理。
     

UI代码

  1. package com.jmeter.gui;
  2. import com.jmeter.sampler.KafkaSampler;
  3. import org.apache.jmeter.gui.util.JSyntaxTextArea;
  4. import org.apache.jmeter.gui.util.JTextScrollPane;
  5. import org.apache.jmeter.gui.util.VerticalPanel;
  6. import org.apache.jmeter.samplers.gui.AbstractSamplerGui;
  7. import org.apache.jmeter.testelement.TestElement;
  8. import org.apache.jorphan.gui.layout.VerticalLayout;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import javax.swing.*;
  12. import java.awt.*;
  13. import java.awt.event.ActionEvent;
  14. import java.awt.event.ActionListener;
  15. public class KafkaSamplerUI extends AbstractSamplerGui implements ActionListener {
  16. private static final long serialVersionUID = 1L;
  17. private static final Logger log = LoggerFactory.getLogger(KafkaSamplerUI.class);
  18. private JLabel brokerAddressLable;
  19. private JTextField brokerAddressField;
  20. // private JLabel clusterNameLabel;
  21. // private JTextField clusterNameField;
  22. private JLabel topicNameLabel;
  23. private JTextField topicNameField;
  24. // private JLabel chekCodeLabel;
  25. // private JTextField chekCodeField;
  26. private JSyntaxTextArea textMessage = new JSyntaxTextArea(40, 50);
  27. private JLabel textArea = new JLabel();
  28. private JTextScrollPane textPanel = new JTextScrollPane(textMessage);
  29. // private JLabel kafkaLabel;
  30. // private JRadioButton readWriteMode;
  31. // private JRadioButton writeMode;
  32. // private JRadioButton readMode;
  33. Box brokerAddressPanel, topicNamePanel, kafkaPanel;
  34. // static Box filterPanel, readNumPanel;
  35. JPanel mainPanel = new VerticalPanel();
  36. JPanel extendPanel = new VerticalPanel();
  37. static JPanel contentPanel;
  38. public KafkaSamplerUI() {
  39. super();
  40. this.init();
  41. }
  42. //创建消息输入框
  43. private JPanel createContentPanel() {
  44. JPanel ContentPanel = new VerticalPanel();
  45. JPanel messageContentPanel = new JPanel(new BorderLayout());
  46. messageContentPanel.add(this.textArea, BorderLayout.NORTH);
  47. messageContentPanel.add(this.textPanel, BorderLayout.CENTER);
  48. ContentPanel.add(messageContentPanel);
  49. ContentPanel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createLineBorder(Color.gray), "写入数据"));
  50. return ContentPanel;
  51. }
  52. private Box createTextFiledPanle(String name) {
  53. JLabel jLabel = new JLabel(name);
  54. JTextField jTextField = new JTextField(6);
  55. Box box = Box.createHorizontalBox();
  56. box.add(jLabel);
  57. box.add(jTextField);
  58. return box;
  59. }
  60. private void init() {
  61. log.info("初始化UI界面");
  62. brokerAddressLable = new JLabel("broker endpoint(s)地址:");
  63. brokerAddressField = new JTextField(6);
  64. topicNameLabel = new JLabel("主题名称:");
  65. topicNameField = new JTextField(6);
  66. // kafkaLabel = new JLabel("kafka:");
  67. //
  68. // readWriteMode = new JRadioButton("读写模式");
  69. // writeMode = new JRadioButton("写模式");
  70. // readMode = new JRadioButton("读模式");
  71. // kafkaPanel = Box.createHorizontalBox();
  72. //
  73. // kafkaPanel.add(kafkaLabel);
  74. // kafkaPanel.add(readWriteMode);
  75. // kafkaPanel.add(writeMode);
  76. // kafkaPanel.add(readMode);
  77. //设置默认读写方式
  78. // writeMode.setSelected(true);
  79. // readWriteMode.setSelected(false);
  80. // readMode.setSelected(false);
  81. contentPanel = createContentPanel();
  82. contentPanel.setVisible(true);
  83. // filterPanel = createTextFiledPanle("筛选条件");
  84. //
  85. // readNumPanel = createTextFiledPanle("读取条数");
  86. setLayout(new VerticalLayout(5, VerticalLayout.BOTH, VerticalLayout.TOP));
  87. setBorder(makeBorder());
  88. add(makeTitlePanel());
  89. brokerAddressPanel = Box.createHorizontalBox();
  90. brokerAddressPanel.add(brokerAddressLable);
  91. brokerAddressPanel.add(brokerAddressField);
  92. topicNamePanel = Box.createHorizontalBox();
  93. topicNamePanel.add(topicNameLabel);
  94. topicNamePanel.add(topicNameField);
  95. mainPanel.add(brokerAddressPanel);
  96. mainPanel.add(topicNamePanel);
  97. // mainPanel.add(kafkaPanel);
  98. extendPanel.add(contentPanel);
  99. // extendPanel.add(filterPanel);
  100. // extendPanel.add(readNumPanel);
  101. extendPanel.setVisible(true);
  102. add(mainPanel, BorderLayout.CENTER);
  103. add(extendPanel, BorderLayout.CENTER);
  104. }
  105. /**
  106. * 此方法应创建TestElement类的新实例,然后将其传递给modifyTestElement(TestElement) 方法
  107. *
  108. * @return
  109. */
  110. public TestElement createTestElement() {
  111. KafkaSampler sampler = new KafkaSampler();
  112. modifyTestElement((TestElement) sampler);
  113. return (TestElement) sampler;
  114. }
  115. @Override
  116. public void clearGui() {
  117. super.clearGui();
  118. brokerAddressField.setText("");
  119. topicNameField.setText("");
  120. }
  121. /**
  122. * 一定要调用super.configure(e)。这将为您填充一些数据,例如元素的名称。
  123. * 使用此方法将数据设置到GUI元素中
  124. * 通过调用此方法,可以用测试元素对象的内容初始化新创建的GUI组件。组件负责查询测试元素对象,以获取要在其GUI中显示的相关信息。
  125. *
  126. * @param element
  127. */
  128. @Override
  129. public void configure(TestElement element) {
  130. super.configure(element);
  131. KafkaSampler sampler = (KafkaSampler) element;
  132. brokerAddressField.setText(sampler.getBrokerraddress());
  133. log.info("设置broker集群地址为:" + sampler.getBrokerraddress());
  134. topicNameField.setText(sampler.getTopicName());
  135. log.info("设置集群主题为:" + sampler.getTopicName());
  136. textMessage.setText(sampler.getMessage());
  137. log.info("设置发送消息为:" + sampler.getMessage());
  138. }
  139. @Override
  140. public String getStaticLabel() {
  141. return "Kafka Sampler";
  142. }
  143. public String getLabelResource() {
  144. throw new IllegalStateException("This shouldn't be called");
  145. }
  146. /**
  147. * 将GUI元素中的数据移动到TestElement,
  148. *
  149. * @param e
  150. */
  151. public void modifyTestElement(TestElement e) {
  152. e.clear();
  153. // 调用super.configureTestElement(e)。这将处理一些默认数据
  154. configureTestElement(e);
  155. KafkaSampler sampler = new KafkaSampler();
  156. ((KafkaSampler) e).setBrokerAddress(brokerAddressField.getText());
  157. // log.info("填入的broker集群地址为:" + this.brokerAddressField.getText());
  158. ((KafkaSampler) e).setTopicName(topicNameField.getText());
  159. // log.info("填入的主题名称地址为:" + this.topicNameField.getText());
  160. ((KafkaSampler) e).setMessage(textMessage.getText());
  161. // log.info("填入消息为:" + this.textMessage.getText());
  162. //添加监听
  163. // readWriteMode.addActionListener(this);
  164. // readMode.addActionListener(this);
  165. // writeMode.addActionListener(this);
  166. }
  167. public void actionPerformed(ActionEvent e) {
  168. // if (e.getSource() == readWriteMode) {
  169. // readWriteMode.setSelected(true);
  170. // readMode.setSelected(false);
  171. // writeMode.setSelected(false);
  172. // log.info("读写模式新增contentPanel");
  173. //
  174. // contentPanel.setVisible(true);
  175. //
  176. // log.info("读写模式新增filterPanel");
  177. //
  178. // filterPanel.setVisible(true);
  179. //
  180. // log.info("读写模式新增readNumPanel");
  181. //
  182. // readNumPanel.setVisible(true);
  183. //
  184. //
  185. // extendPanel.setVisible(true);
  186. // updateUI();
  187. // repaint();
  188. //
  189. // }
  190. // if (e.getSource() == writeMode) {
  191. //
  192. // writeMode.setSelected(true);
  193. // readWriteMode.setSelected(false);
  194. // readMode.setSelected(false);
  195. //
  196. // log.info("写模式新增contentPanel");
  197. // contentPanel.setVisible(true);
  198. //
  199. // filterPanel.setVisible(false);
  200. //
  201. // readNumPanel.setVisible(false);
  202. //
  203. // extendPanel.setVisible(false);
  204. // extendPanel.setVisible(true);
  205. //
  206. // updateUI();
  207. // repaint();
  208. //
  209. //
  210. // }
  211. // if (e.getSource() == readMode) {
  212. // readMode.setSelected(true);
  213. // writeMode.setSelected(false);
  214. // readWriteMode.setSelected(false);
  215. // log.info("读模式新增filterPanel");
  216. //
  217. // contentPanel.setVisible(false);
  218. // filterPanel.setVisible(true);
  219. //
  220. // log.info("读模式新增readNumPanel");
  221. // readNumPanel.setVisible(true);
  222. //
  223. // extendPanel.setVisible(true);
  224. //
  225. // updateUI();
  226. // repaint();
  227. //
  228. // }
  229. }
  230. }

业务代码:

  1. package com.jmeter.kafka;
  2. public interface IKafkaProducer {
  3. public void close();
  4. public void sendString(String topicName, String message);
  5. }
  1. package com.jmeter.kafka;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import java.util.Properties;
  4. public class ProduceConfig extends Properties{
  5. Properties kafkaProps = new Properties();
  6. public ProduceConfig(String brokerraddress) {
  7. kafkaProps.put("bootstrap.servers", brokerraddress);
  8. kafkaProps.put("key.serializer",
  9. "org.apache.kafka.common.serialization.StringSerializer");
  10. kafkaProps.put("value.serializer",
  11. "org.apache.kafka.common.serialization.StringSerializer");
  12. }
  13. }
  1. package com.jmeter.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. public class ProducerPool extends ProduceConfig implements IKafkaProducer {
  5. KafkaProducer<String, String> producer;
  6. public ProducerPool(String brokerraddress) {
  7. super(brokerraddress);
  8. }
  9. public void close() {
  10. if (producer!=null){
  11. producer.close();
  12. }
  13. }
  14. public void sendString(String topicName, String message) {
  15. ProducerRecord<String,String> record = new ProducerRecord<String,String>(topicName,message); // 主题,key,value
  16. producer = new KafkaProducer<String, String>(kafkaProps);
  17. producer.send(record);
  18. }
  19. }
  1. package com.jmeter.sampler;
  2. import com.jmeter.kafka.IKafkaProducer;
  3. import com.jmeter.kafka.ProducerPool;
  4. import org.apache.jmeter.samplers.AbstractSampler;
  5. import org.apache.jmeter.samplers.Entry;
  6. import org.apache.jmeter.samplers.SampleResult;
  7. import org.apache.jmeter.testelement.TestElement;
  8. public class KafkaSampler extends AbstractSampler implements TestElement {
  9. public static final String BROKERRADDRESS = "KafkaSampler.brokerAddress";
  10. public static final String TOPICNAME = "SFKafkaSampler.topicName";
  11. public static final String MESSAGE = "SFKafkaSampler.message";
  12. public void setBrokerAddress(String clusterAddress) {
  13. setProperty(BROKERRADDRESS, clusterAddress);
  14. }
  15. public void setTopicName(String topicName) {
  16. setProperty(TOPICNAME, topicName);
  17. }
  18. public void setMessage(String message) {
  19. setProperty(MESSAGE, message);
  20. }
  21. public String getMessage() {
  22. return getPropertyAsString(MESSAGE);
  23. }
  24. public String getBrokerraddress() {
  25. return getPropertyAsString(BROKERRADDRESS);
  26. }
  27. public String getTopicName() {
  28. return getPropertyAsString(TOPICNAME);
  29. }
  30. public KafkaSampler() {
  31. setName("Kafka Sampler");
  32. }
  33. public SampleResult sample(Entry entry) {
  34. SampleResult result = new SampleResult();
  35. result.setSampleLabel(getName());
  36. try {
  37. result.sampleStart();
  38. //写入业务数据
  39. IKafkaProducer kafkaProducer = new ProducerPool(getBrokerraddress());
  40. kafkaProducer.sendString(getTopicName(), getMessage());
  41. result.setSamplerData("请求集群地址为:" + getBrokerraddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "\n" + "请求消息为:" + getMessage());
  42. result.setResponseData(getMessage(), "utf-8");
  43. result.sampleEnd();
  44. result.setSuccessful(true);
  45. result.setResponseCodeOK();
  46. } catch (Exception e) {
  47. result.setSamplerData("请求集群地址为:" + getBrokerraddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "请求消息为:" + getMessage());
  48. result.sampleEnd(); // stop stopwatch
  49. result.setSuccessful(false);
  50. result.setResponseMessage("Exception: " + e);
  51. // get stack trace as a String to return as document data
  52. java.io.StringWriter stringWriter = new java.io.StringWriter();
  53. e.printStackTrace(new java.io.PrintWriter(stringWriter));
  54. result.setResponseData(stringWriter.toString(), null);
  55. result.setDataType(org.apache.jmeter.samplers.SampleResult.TEXT);
  56. result.setResponseCode("FAILED");
  57. }
  58. return result;
  59. }
  60. }

pom文件

  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>org.example</groupId>
  5. <artifactId>JmeterKafkaSampler</artifactId>
  6. <version>1.0-SNAPSHOT</version>
  7. <packaging>jar</packaging>
  8. <name>JmeterKafkaSampler</name>
  9. <url>http://maven.apache.org</url>
  10. <properties>
  11. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  12. </properties>
  13. <dependencies>
  14. <!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_java -->
  15. <dependency>
  16. <groupId>org.apache.jmeter</groupId>
  17. <artifactId>ApacheJMeter_java</artifactId>
  18. <version>5.5</version>
  19. </dependency>
  20. <!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_core -->
  21. <dependency>
  22. <groupId>org.apache.jmeter</groupId>
  23. <artifactId>ApacheJMeter_core</artifactId>
  24. <version>5.5</version>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.apache.kafka</groupId>
  28. <artifactId>kafka-clients</artifactId>
  29. <version>3.2.1</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>junit</groupId>
  33. <artifactId>junit</artifactId>
  34. <version>4.12</version>
  35. <scope>test</scope>
  36. </dependency>
  37. </dependencies>
  38. <build>
  39. <plugins>
  40. <plugin>
  41. <artifactId> maven-assembly-plugin </artifactId>
  42. <configuration>
  43. <descriptorRefs>
  44. <descriptorRef>jar-with-dependencies</descriptorRef>
  45. </descriptorRefs>
  46. <archive>
  47. <manifest>
  48. <mainClass>com.xx.util.GenerateSortCode</mainClass>
  49. </manifest>
  50. </archive>
  51. </configuration>
  52. <executions>
  53. <execution>
  54. <id>make-assembly</id>
  55. <phase>package</phase>
  56. <goals>
  57. <goal>single</goal>
  58. </goals>
  59. </execution>
  60. </executions>
  61. </plugin>
  62. </plugins>
  63. </build>
  64. </project>

然后用maven打完包就可以放到Jmeter/lib/ext目录下,重启JMeter后新建Sampler可以查看到自己定义的Kafka Sampler如下图:

界面如下:

 测试验证

经测试数据是成功写入到对应的kafka主题中了,到此就完成了Kafka插件的写入功能开发

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/秋刀鱼在做梦/article/detail/824522
推荐阅读
相关标签
  

闽ICP备14008679号