
Big Data — Kafka: Implementing Kafka Operations in Java

一、Configuring the Maven pom

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

二、Producer Methods

(1)、Producer

Messages produced from Java are written to Kafka, where a consumer can pick them up. Start a console consumer on the server to watch them:

[root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;

/**
 * Produce messages from Java; consume them in Xshell.
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Producer configuration
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        // Key serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Value serializer
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        /*
         * ack (acknowledgment) settings:
         *   0   - do not wait for any acknowledgment
         *   1   - wait for the partition leader to write the record
         *   all - wait for the full set of in-sync replicas
         */
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        Scanner scanner = new Scanner(System.in);
        while (true) {
            System.out.println("Enter a message to send to Kafka:");
            String msg = scanner.next();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>("kb22", msg);
            producer.send(record);
        }
    }
}
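Note that send() is asynchronous and, as written above, fire-and-forget: a broker-side failure would go unnoticed. A minimal sketch of the same producer using the kafka-clients Callback overload (the class name MyProducerCallback and the message text are illustrative, not part of the original example):

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducerCallback {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("kb22", "hello with callback");
        // The callback runs once the broker acknowledges the record (or the send fails)
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    e.printStackTrace(); // the send failed
                } else {
                    System.out.println("sent to partition " + metadata.partition()
                            + " at offset " + metadata.offset());
                }
            }
        });
        producer.close(); // flush buffered records before exiting
    }
}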

(2)、Multi-threaded Producer

Running producers on multiple threads is a common practice for increasing concurrency and throughput. By spreading the message-production work across several threads that send in parallel, system resources are used more effectively and messages go out faster.

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MyProducer2 {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) { // i: one task per producer thread
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
                    properties.put(ProducerConfig.ACKS_CONFIG, "0");
                    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
                    // j: one message per iteration
                    for (int j = 0; j < 100; j++) {
                        String msg = Thread.currentThread().getName() + " " + j;
                        System.out.println(msg);
                        ProducerRecord<String, String> re = new ProducerRecord<String, String>("kb22", msg);
                        producer.send(re);
                    }
                    // Flush buffered records and release resources;
                    // without this, buffered acks=0 sends can be lost on exit
                    producer.close();
                }
            });
        }
        executorService.shutdown();
        while (true) {
            if (executorService.isTerminated()) {
                System.out.println("game over");
                break;
            }
        }
    }
}
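KafkaProducer is documented as thread-safe, so an alternative to the one-producer-per-thread pattern above is a single shared instance, which generally batches records better and opens fewer broker connections. A minimal sketch under that assumption (the class name MySharedProducer and the thread/message counts are illustrative):

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MySharedProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        // One producer shared by all tasks: KafkaProducer is thread-safe
        final KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        String msg = Thread.currentThread().getName() + " " + j;
                        producer.send(new ProducerRecord<String, String>("kb22", msg));
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES); // wait for the tasks instead of spinning
        producer.close();
    }
}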

三、Consumer Methods

(1)、Consumer

Implementing a consumer in Java:

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Whether offsets are committed automatically after records are fetched
        // (this is how Kafka records that the app has already consumed a message):
        // false = commit manually; true = commit automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        /*
         * auto.offset.reset (applies when the group has no committed offset for this topic):
         *   earliest - start from the first record
         *   latest   - start from the newest records only
         *   none     - throw an exception
         * group.id identifies the consumer group.
         */
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // After creating the consumer, subscribe to the topic to consume
        consumer.subscribe(Collections.singleton("kb22"));
        while (true) {
            Duration mills = Duration.ofMillis(100);
            ConsumerRecords<String, String> records = consumer.poll(mills);
            for (ConsumerRecord<String, String> record : records) {
                String topic = record.topic();
                int partition = record.partition();
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                long timestamp = record.timestamp();
                System.out.println("topic:" + topic + "\tpartition:" + partition + "\toffset:" + offset
                        + "\tkey:" + key + "\tvalue:" + value + "\ttimestamp:" + timestamp);
            }
            //consumer.commitAsync(); // manual commit
        }
    }
}
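With enable.auto.commit set to false, nothing is committed unless the code does it, so a restarted consumer re-reads from the group's last committed offset. One way to commit, shown as a sketch with the same properties as above (the class name MyConsumerManualCommit is illustrative): call the blocking commitSync() after each processed batch; commitAsync() is the non-blocking variant shown commented out above.

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumerManualCommit {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singleton("kb22"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ": " + record.value());
            }
            if (!records.isEmpty()) {
                // Block until the fetched offsets are committed, so a restart resumes here
                consumer.commitSync();
            }
        }
    }
}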

(2)、Simulating Multiple Consumers

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MyConsumerThread {
    // Simulate several consumers accessing the topic at once
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    Properties properties = new Properties();
                    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
                    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    // false = commit offsets manually; true = commit automatically
                    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
                    // earliest / latest / none: see the notes in MyConsumer above
                    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "GROUP3");
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
                    consumer.subscribe(Collections.singleton("kb22"));
                    while (true) {
                        // poll fetches the next batch of records
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                        for (ConsumerRecord<String, String> record : records) {
                            String topic = record.topic();
                            int partition = record.partition();
                            long offset = record.offset();
                            String key = record.key();
                            String value = record.value();
                            long timestamp = record.timestamp();
                            String name = Thread.currentThread().getName();
                            System.out.println("name:" + name
                                    + "\ttopic:" + topic
                                    + "\tpartition:" + partition
                                    + "\toffset:" + offset
                                    + "\tkey:" + key
                                    + "\tvalue:" + value
                                    + "\ttimestamp:" + timestamp);
                        }
                    }
                }
            }).start();
        }
    }
}
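Within a single group ("GROUP3" here), Kafka assigns each partition to at most one consumer, so the three threads above only share work if kb22 has at least three partitions; otherwise the extra threads sit idle. A sketch of creating such a topic with the kafka-clients AdminClient (the class name CreateTopic is illustrative; replication factor 1 assumes the single broker at 192.168.153.144):

package nj.zb.kb22.Kafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.153.144:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions so each of the 3 consumer threads can own one;
            // replication factor 1 for a single-broker cluster
            NewTopic topic = new NewTopic("kb22", 3, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}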
