赞
踩
加油,新时代打工人!
1、导入Maven Kafka POM依赖
<repositories> <repository> <id>central</id> <url>http://maven.aliyun.com/nexus/content/groups/public//</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>true</enabled> <updatePolicy>always</updatePolicy> <checksumPolicy>fail</checksumPolicy> </snapshots> </repository> </repositories> <dependencies> <!-- kafka客户端工具 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <!-- 工具类 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <!-- SLF桥接LOG4J日志 --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.6</version> </dependency> <!-- SLOG4J日志 --> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.16</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.7.0</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build>
2、代码
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.88.100:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
参考代码:
public class KafkaProducerTest { public static void main(String[] args) { // 1. 创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.88.100:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 2. 创建一个生产者对象KafkaProducer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); // 3. 调用send发送1-100消息到指定Topic test for(int i = 0; i < 100; ++i) { try { // 获取返回值Future,该对象封装了返回值 Future<RecordMetadata> future = producer.send(new ProducerRecord<String, String>("test", null, i + "")); // 调用一个Future.get()方法等待响应 future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } // 5. 关闭生产者 producer.close(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。