赞
踩
目录
之前写过一个 JMeter Kafka 插件,是基于公司对 Kafka 的二次封装实现的;这次基于原生 Kafka 客户端再写一个插件,代码如下。核心思路与之前相同,可以参考我的上一篇文章。
用于填写Kafka broker地址,一般为IP+端口号如:127.0.0.1:1234
用于填写Kafka主题名称,如:test
填写Kafka发送的报文,如:test123
需要新建一个 gui 包和一个 sampler 包:JMeter 在加载插件时会加载 gui 包里的界面类,包的命名方式可以参考 JMeter 源码。以下是参考的项目目录结构:
- package com.jmeter.gui;
-
- import com.jmeter.sampler.KafkaSampler;
- import org.apache.jmeter.gui.util.JSyntaxTextArea;
- import org.apache.jmeter.gui.util.JTextScrollPane;
- import org.apache.jmeter.gui.util.VerticalPanel;
- import org.apache.jmeter.samplers.gui.AbstractSamplerGui;
- import org.apache.jmeter.testelement.TestElement;
- import org.apache.jorphan.gui.layout.VerticalLayout;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import javax.swing.*;
- import java.awt.*;
- import java.awt.event.ActionEvent;
- import java.awt.event.ActionListener;
-
- public class KafkaSamplerUI extends AbstractSamplerGui implements ActionListener {
- private static final long serialVersionUID = 1L;
- private static final Logger log = LoggerFactory.getLogger(KafkaSamplerUI.class);
-
- private JLabel brokerAddressLable;
- private JTextField brokerAddressField;
-
- // private JLabel clusterNameLabel;
- // private JTextField clusterNameField;
-
- private JLabel topicNameLabel;
- private JTextField topicNameField;
-
- // private JLabel chekCodeLabel;
- // private JTextField chekCodeField;
-
-
- private JSyntaxTextArea textMessage = new JSyntaxTextArea(40, 50);
-
- private JLabel textArea = new JLabel();
- private JTextScrollPane textPanel = new JTextScrollPane(textMessage);
-
- // private JLabel kafkaLabel;
- // private JRadioButton readWriteMode;
- // private JRadioButton writeMode;
- // private JRadioButton readMode;
-
-
- Box brokerAddressPanel, topicNamePanel, kafkaPanel;
- // static Box filterPanel, readNumPanel;
-
- JPanel mainPanel = new VerticalPanel();
- JPanel extendPanel = new VerticalPanel();
- static JPanel contentPanel;
-
-
- public KafkaSamplerUI() {
- super();
- this.init();
- }
-
- //创建消息输入框
- private JPanel createContentPanel() {
-
- JPanel ContentPanel = new VerticalPanel();
- JPanel messageContentPanel = new JPanel(new BorderLayout());
- messageContentPanel.add(this.textArea, BorderLayout.NORTH);
- messageContentPanel.add(this.textPanel, BorderLayout.CENTER);
- ContentPanel.add(messageContentPanel);
- ContentPanel.setBorder(BorderFactory.createTitledBorder(BorderFactory.createLineBorder(Color.gray), "写入数据"));
-
- return ContentPanel;
- }
-
- private Box createTextFiledPanle(String name) {
-
- JLabel jLabel = new JLabel(name);
- JTextField jTextField = new JTextField(6);
-
- Box box = Box.createHorizontalBox();
- box.add(jLabel);
- box.add(jTextField);
- return box;
- }
-
-
- private void init() {
- log.info("初始化UI界面");
-
- brokerAddressLable = new JLabel("broker endpoint(s)地址:");
- brokerAddressField = new JTextField(6);
-
-
- topicNameLabel = new JLabel("主题名称:");
- topicNameField = new JTextField(6);
-
-
- // kafkaLabel = new JLabel("kafka:");
- //
- // readWriteMode = new JRadioButton("读写模式");
- // writeMode = new JRadioButton("写模式");
- // readMode = new JRadioButton("读模式");
-
- // kafkaPanel = Box.createHorizontalBox();
- //
- // kafkaPanel.add(kafkaLabel);
- // kafkaPanel.add(readWriteMode);
- // kafkaPanel.add(writeMode);
- // kafkaPanel.add(readMode);
-
- //设置默认读写方式
- // writeMode.setSelected(true);
- // readWriteMode.setSelected(false);
- // readMode.setSelected(false);
-
- contentPanel = createContentPanel();
- contentPanel.setVisible(true);
- // filterPanel = createTextFiledPanle("筛选条件");
- //
- // readNumPanel = createTextFiledPanle("读取条数");
- setLayout(new VerticalLayout(5, VerticalLayout.BOTH, VerticalLayout.TOP));
-
- setBorder(makeBorder());
- add(makeTitlePanel());
-
- brokerAddressPanel = Box.createHorizontalBox();
-
- brokerAddressPanel.add(brokerAddressLable);
- brokerAddressPanel.add(brokerAddressField);
-
-
- topicNamePanel = Box.createHorizontalBox();
-
- topicNamePanel.add(topicNameLabel);
- topicNamePanel.add(topicNameField);
-
-
- mainPanel.add(brokerAddressPanel);
- mainPanel.add(topicNamePanel);
- // mainPanel.add(kafkaPanel);
-
- extendPanel.add(contentPanel);
- // extendPanel.add(filterPanel);
- // extendPanel.add(readNumPanel);
- extendPanel.setVisible(true);
- add(mainPanel, BorderLayout.CENTER);
- add(extendPanel, BorderLayout.CENTER);
-
-
- }
-
- /**
- * 此方法应创建TestElement类的新实例,然后将其传递给modifyTestElement(TestElement) 方法
- *
- * @return
- */
- public TestElement createTestElement() {
- KafkaSampler sampler = new KafkaSampler();
- modifyTestElement((TestElement) sampler);
- return (TestElement) sampler;
- }
-
- @Override
- public void clearGui() {
- super.clearGui();
- brokerAddressField.setText("");
- topicNameField.setText("");
-
-
- }
-
- /**
- * 一定要调用super.configure(e)。这将为您填充一些数据,例如元素的名称。
- * 使用此方法将数据设置到GUI元素中
- * 通过调用此方法,可以用测试元素对象的内容初始化新创建的GUI组件。组件负责查询测试元素对象,以获取要在其GUI中显示的相关信息。
- *
- * @param element
- */
- @Override
- public void configure(TestElement element) {
- super.configure(element);
-
- KafkaSampler sampler = (KafkaSampler) element;
- brokerAddressField.setText(sampler.getBrokerraddress());
- log.info("设置broker集群地址为:" + sampler.getBrokerraddress());
-
-
- topicNameField.setText(sampler.getTopicName());
- log.info("设置集群主题为:" + sampler.getTopicName());
-
-
- textMessage.setText(sampler.getMessage());
- log.info("设置发送消息为:" + sampler.getMessage());
-
- }
-
-
- @Override
- public String getStaticLabel() {
- return "Kafka Sampler";
- }
-
- public String getLabelResource() {
- throw new IllegalStateException("This shouldn't be called");
- }
-
- /**
- * 将GUI元素中的数据移动到TestElement,
- *
- * @param e
- */
-
- public void modifyTestElement(TestElement e) {
- e.clear();
- // 调用super.configureTestElement(e)。这将处理一些默认数据
- configureTestElement(e);
-
- KafkaSampler sampler = new KafkaSampler();
-
- ((KafkaSampler) e).setBrokerAddress(brokerAddressField.getText());
- // log.info("填入的broker集群地址为:" + this.brokerAddressField.getText());
-
-
- ((KafkaSampler) e).setTopicName(topicNameField.getText());
- // log.info("填入的主题名称地址为:" + this.topicNameField.getText());
-
- ((KafkaSampler) e).setMessage(textMessage.getText());
- // log.info("填入消息为:" + this.textMessage.getText());
-
-
- //添加监听
- // readWriteMode.addActionListener(this);
- // readMode.addActionListener(this);
- // writeMode.addActionListener(this);
-
-
- }
-
- public void actionPerformed(ActionEvent e) {
- // if (e.getSource() == readWriteMode) {
- // readWriteMode.setSelected(true);
- // readMode.setSelected(false);
- // writeMode.setSelected(false);
- // log.info("读写模式新增contentPanel");
- //
- // contentPanel.setVisible(true);
- //
- // log.info("读写模式新增filterPanel");
- //
- // filterPanel.setVisible(true);
- //
- // log.info("读写模式新增readNumPanel");
- //
- // readNumPanel.setVisible(true);
- //
- //
- // extendPanel.setVisible(true);
- // updateUI();
- // repaint();
- //
- // }
- // if (e.getSource() == writeMode) {
- //
- // writeMode.setSelected(true);
- // readWriteMode.setSelected(false);
- // readMode.setSelected(false);
- //
- // log.info("写模式新增contentPanel");
- // contentPanel.setVisible(true);
- //
- // filterPanel.setVisible(false);
- //
- // readNumPanel.setVisible(false);
- //
- // extendPanel.setVisible(false);
- // extendPanel.setVisible(true);
- //
- // updateUI();
- // repaint();
- //
- //
- // }
- // if (e.getSource() == readMode) {
- // readMode.setSelected(true);
- // writeMode.setSelected(false);
- // readWriteMode.setSelected(false);
- // log.info("读模式新增filterPanel");
- //
- // contentPanel.setVisible(false);
- // filterPanel.setVisible(true);
- //
- // log.info("读模式新增readNumPanel");
- // readNumPanel.setVisible(true);
- //
- // extendPanel.setVisible(true);
- //
- // updateUI();
- // repaint();
- //
- // }
-
- }
- }
-
-
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.jmeter.kafka;
-
/**
 * Minimal abstraction over a Kafka producer that sends string messages.
 *
 * <p>Extends {@link AutoCloseable} so implementations can be used in a
 * try-with-resources statement. {@link #close()} declares no checked exception,
 * so existing callers and implementations are unaffected.
 */
public interface IKafkaProducer extends AutoCloseable {

    /** Releases the resources held by this producer. */
    @Override
    void close();

    /**
     * Sends a string message to a topic.
     *
     * @param topicName the target topic
     * @param message   the message value to send
     */
    void sendString(String topicName, String message);
}
- package com.jmeter.kafka;
-
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- import java.util.Properties;
-
/**
 * Producer settings for a string-keyed, string-valued Kafka producer.
 *
 * <p>The settings are available both through the {@code kafkaProps} field (used
 * by subclasses) and through this object itself, since the class is a
 * {@link Properties}.
 */
public class ProduceConfig extends Properties {
    private static final long serialVersionUID = 1L;

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    Properties kafkaProps = new Properties();

    /**
     * Creates the configuration for the given broker address.
     *
     * @param brokerraddress value for {@code bootstrap.servers}, e.g. {@code 127.0.0.1:9092}
     * @throws NullPointerException if {@code brokerraddress} is {@code null}
     */
    public ProduceConfig(String brokerraddress) {
        java.util.Objects.requireNonNull(brokerraddress, "brokerraddress must not be null");

        kafkaProps.put("bootstrap.servers", brokerraddress);
        kafkaProps.put("key.serializer", STRING_SERIALIZER);
        kafkaProps.put("value.serializer", STRING_SERIALIZER);

        // Mirror the settings into this instance; otherwise an object that
        // is-a Properties would always be empty.
        putAll(kafkaProps);
    }
}
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.jmeter.kafka;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- public class ProducerPool extends ProduceConfig implements IKafkaProducer {
- KafkaProducer<String, String> producer;
-
- public ProducerPool(String brokerraddress) {
- super(brokerraddress);
- }
-
- public void close() {
- if (producer!=null){
- producer.close();
- }
- }
-
- public void sendString(String topicName, String message) {
- ProducerRecord<String,String> record = new ProducerRecord<String,String>(topicName,message); // 主题,key,value
-
- producer = new KafkaProducer<String, String>(kafkaProps);
- producer.send(record);
- }
- }
-
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
- package com.jmeter.sampler;
-
- import com.jmeter.kafka.IKafkaProducer;
- import com.jmeter.kafka.ProducerPool;
- import org.apache.jmeter.samplers.AbstractSampler;
- import org.apache.jmeter.samplers.Entry;
- import org.apache.jmeter.samplers.SampleResult;
- import org.apache.jmeter.testelement.TestElement;
-
-
- public class KafkaSampler extends AbstractSampler implements TestElement {
-
-
- public static final String BROKERRADDRESS = "KafkaSampler.brokerAddress";
-
- public static final String TOPICNAME = "SFKafkaSampler.topicName";
-
- public static final String MESSAGE = "SFKafkaSampler.message";
-
- public void setBrokerAddress(String clusterAddress) {
-
- setProperty(BROKERRADDRESS, clusterAddress);
-
- }
-
-
- public void setTopicName(String topicName) {
-
- setProperty(TOPICNAME, topicName);
-
- }
-
-
- public void setMessage(String message) {
-
-
- setProperty(MESSAGE, message);
-
- }
-
-
- public String getMessage() {
-
- return getPropertyAsString(MESSAGE);
-
- }
-
-
- public String getBrokerraddress() {
-
- return getPropertyAsString(BROKERRADDRESS);
-
- }
-
-
- public String getTopicName() {
-
- return getPropertyAsString(TOPICNAME);
-
- }
-
-
- public KafkaSampler() {
-
- setName("Kafka Sampler");
-
- }
-
-
- public SampleResult sample(Entry entry) {
-
- SampleResult result = new SampleResult();
-
- result.setSampleLabel(getName());
-
- try {
-
- result.sampleStart();
-
- //写入业务数据
-
-
-
- IKafkaProducer kafkaProducer = new ProducerPool(getBrokerraddress());
-
-
- kafkaProducer.sendString(getTopicName(), getMessage());
-
-
-
-
- result.setSamplerData("请求集群地址为:" + getBrokerraddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "\n" + "请求消息为:" + getMessage());
-
-
- result.setResponseData(getMessage(), "utf-8");
-
-
- result.sampleEnd();
-
- result.setSuccessful(true);
-
- result.setResponseCodeOK();
-
-
- } catch (Exception e) {
-
- result.setSamplerData("请求集群地址为:" + getBrokerraddress() + "\n" + "请求集群主题为:" + getTopicName() + "\n" + "请求消息为:" + getMessage());
-
-
- result.sampleEnd(); // stop stopwatch
-
- result.setSuccessful(false);
-
- result.setResponseMessage("Exception: " + e);
-
- // get stack trace as a String to return as document data
-
- java.io.StringWriter stringWriter = new java.io.StringWriter();
-
- e.printStackTrace(new java.io.PrintWriter(stringWriter));
-
- result.setResponseData(stringWriter.toString(), null);
-
- result.setDataType(org.apache.jmeter.samplers.SampleResult.TEXT);
-
- result.setResponseCode("FAILED");
-
- }
-
- return result;
-
- }
-
-
- }
-
-
-
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>JmeterKafkaSampler</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>JmeterKafkaSampler</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Compile for Java 8 explicitly instead of relying on the Maven default. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- JMeter itself supplies these jars at runtime, so they are "provided":
             they are needed to compile but must not be bundled into the plugin jar. -->
        <!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_java -->
        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_java</artifactId>
            <version>5.5</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.jmeter/ApacheJMeter_core -->
        <dependency>
            <groupId>org.apache.jmeter</groupId>
            <artifactId>ApacheJMeter_core</artifactId>
            <version>5.5</version>
            <scope>provided</scope>
        </dependency>
        <!-- Bundled into the plugin jar by the assembly plugin below. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds *-jar-with-dependencies.jar during "mvn package".
                 No manifest mainClass is configured: the plugin is loaded by JMeter
                 and has no entry point of its own. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
用 Maven 打包后,把生成的 jar 包放到 JMeter 的 lib/ext 目录下,重启 JMeter,新建 Sampler 时就可以看到自己定义的 Kafka Sampler,如下图:
经测试,数据已成功写入对应的 Kafka 主题。至此,Kafka 插件的写入功能开发完成。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。