赞
踩
一、在Java项目的pom.xml中添加Kafka依赖
<dependencies>
    <!-- JUnit 4: unit-testing framework, limited to the test classpath. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>

    <!-- Kafka Java client library (producer and consumer APIs). -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>

    <!-- Kafka core artifact built against Scala 2.12; same version as the client. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>
二、生产者方法
(1)、Producer
在Java中编写生产者,把控制台输入的内容发送到Kafka的topic中,消费者即可从该topic读取这些消息。可以先在服务器上启动一个控制台消费者来验证:
[root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22
- package nj.zb.kb22.Kafka;
-
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
- import java.util.Properties;
- import java.util.Scanner;
-
- /**
- * 用java生产消息 在xshell消费消息
- */
- public class MyProducer {
- public static void main(String[] args) {
- Properties properties = new Properties();
- //生产者的配置文件
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
- //key的序列化
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- //value的序列化
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
- /**
- * ack应答机制
- * 0
- * 1
- * all
- */
- properties.put(ProducerConfig.ACKS_CONFIG,"1");
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
- Scanner scanner = new Scanner(System.in);
- while (true){
- System.out.println("请输入kafka的内容");
-
- String msg =scanner.next();
- ProducerRecord<String,String> record = new ProducerRecord<String, String>("kb22",msg);
- producer.send(record);
- }
- }
- }
(2)、Producer进行多线程操作
生产者多线程是一种常见的技术实践,可以提高消息生产的并发性和吞吐量。通过将消息生产任务分配给多个线程来并行地发送消息,可以有效地利用系统资源,加快消息的发送速度。
- package nj.zb.kb22.Kafka;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringSerializer;
- import java.util.Properties;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
-
- public class MyProducer2 {
- public static void main(String[] args) {
- ExecutorService executorService = Executors.newCachedThreadPool();
- for (int i = 0; i < 10; i++) {//i代表线程
- Thread thread =new Thread(new Runnable() {
- @Override
- public void run() {
- Properties properties = new Properties();
-
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
- properties.put(ProducerConfig.ACKS_CONFIG,"0");
- KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
- //多线程操作 j代表消息
- for (int j = 0; j < 100; j++) {
- String msg=Thread.currentThread().getName()+" "+ j;
- System.out.println(msg);
- ProducerRecord<String, String> re = new ProducerRecord<String, String>("kb22", msg);
- producer.send(re);
-
- }
-
- }
- });
- executorService.execute(thread);
- }
- executorService.shutdown();
- while (true){
- if (executorService.isTerminated()){
- System.out.println("game over");
- break;
- }
-
- }
- }
- }
三、消费者方法
(1)、Consumer
通过java来实现消费者
- package nj.zb.kb22.Kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- public class MyConsumer {
- public static void main(String[] args) {
- Properties properties = new Properties();
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
-
- //设置拉取信息后是否自动提交(kafka记录当前app是否已经获取到此信息),false 手动提交 ;true 自动提交
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
-
- /**
- * earliest 第一条数据开始拉取(当前应该没有获取过此topic信息)
- * latest 获取最新的数据(当前没有获取过此topic信息)
- * none
- * group消费者分组的概念
- */
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"GROUP3");
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
- //创建好kafka消费者对象后,订阅消息,指定消费的topic
- consumer.subscribe(Collections.singleton("kb22"));
-
- while (true){
- Duration mills = Duration.ofMillis(100);
- ConsumerRecords<String, String> records = consumer.poll(mills);
- for (ConsumerRecord<String,String> record:records){
- String topic = record.topic();
- int partition = record.partition();
- long offset = record.offset();
- String key = record.key();
- String value = record.value();
- long timestamp = record.timestamp();
- System.out.println("topic:"+topic+"\tpartition"+partition+"\toffset"+offset+"\tkey"+key+"\tvalue"+value+"\ttimestamp"+timestamp);
- }
- //consumer.commitAsync();//手动提交
- }
- }
- }
(2)、使用多线程模拟多个消费者同时消费
- package nj.zb.kb22.Kafka;
-
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.common.serialization.StringDeserializer;
-
- import java.time.Duration;
- import java.util.Collections;
- import java.util.Properties;
-
- public class MyConsumerThread {
- //模仿多人访问
- public static void main(String[] args) {
- for (int i = 0; i <3; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- Properties properties = new Properties();
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.153.144:9092");
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
-
- //设置拉取信息后是否自动提交(kafka记录当前app是否已经获取到此信息),false 手动提交 ;true 自动提交
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
-
- /**
- * earliest 第一条数据开始拉取(当前应该没有获取过此topic信息)
- * latest 获取最新的数据(当前没有获取过此topic信息)
- * none
- * group消费者分组的概念
- */
- properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
- properties.put(ConsumerConfig.GROUP_ID_CONFIG,"GROUP3");
-
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- consumer.subscribe(Collections.singleton("kb22"));
- while (true){
- //poll探寻数据
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- for (ConsumerRecord<String,String>record:records){
- String topic = record.topic();
- int partition = record.partition();
- long offset = record.offset();
- String key = record.key();
- String value = record.value();
- long timestamp = record.timestamp();
- String name = Thread.currentThread().getName();
- System.out.println("name"+name
- +"\ttopic:"+topic
- +"\tpartition" +partition
- +"\toffset"+offset
- +"\tkey"+key
- +"\tvalue"+value
- +"\ttimestamp"+timestamp
- );
- }
- }
- }
- }).start();
-
- }
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。