当前位置:   article > 正文

java:连接kafka推送示例_java kafka推送数据代码

java kafka推送数据代码

推送:

  1. /**
  2. * 推送数据到kafka
  3. * @param key 推送数据 key 值
  4. * @param value 推送数据 value 值
  5. */
  6. public static void NormalProducer(String key, String value){
  7. Properties properties = new Properties();
  8. // 1.配置生产者启动的关键属性参数
  9. // 1.1 BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
  10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari-1:9092");
  11. // 1.2 CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
  12. // properties.put(ProducerConfig.CLIENT_ID_CONFIG, clientIdConfig);
  13. // 1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
  14. // Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
  15. // A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
  16. // 字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
  17. // KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
  18. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  19. // VALUE: 实际发送消息的内容
  20. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  21. // 2.创建kafka生产者对象 传递properties属性参数集合
  22. KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
  23. // 3.构造消息内容
  24. ProducerRecord<String, String> record =
  25. //test:是 topic(主题) 名称,key:推送的key值,value:推送的value值
  26. new ProducerRecord<String, String>("test",key,value);
  27. // 4.发送消息
  28. try {
  29. RecordMetadata metadata = producer.send(record).get();
  30. System.out.println("消息发送成功,offset:" + metadata.offset());
  31. } catch (Exception e) {
  32. System.out.println("消息发送失败:" + e.getMessage());
  33. }
  34. // 5.关闭生产者
  35. producer.close();
  36. }

测试:

  1. /**
  2. * 测试获取kafaka推送数据
  3. */
  4. @Test
  5. public void runAJobNow() {
  6. // 1. 配置属性参数
  7. Properties properties = new Properties();
  8. properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari-1:9092");
  9. // org.apache.kafka.common.serialization.StringDeserializer
  10. properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  11. properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  12. // 非常重要的属性配置:与我们消费者订阅组有关系
  13. properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
  14. // 常规属性:会话连接超时时间
  15. properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
  16. // 消费者提交offset: 自动提交 & 手工提交,默认是自动提交
  17. properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  18. properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
  19. properties.put(ProducerConfig.ACKS_CONFIG, "all");
  20. properties.put(ProducerConfig.RETRIES_CONFIG, 1);
  21. // props.put("batch.size", 16384);
  22. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  23. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  24. // 2. 创建消费者对象
  25. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
  26. // 3. 订阅你感兴趣的主题:test
  27. consumer.subscribe(Collections.singletonList("test"));
  28. System.err.println("quickstart consumer started...");
  29. try {
  30. // 4.采用拉取消息的方式消费数据
  31. while(true) {
  32. // 等待多久拉取一次消息
  33. // 拉取TOPIC_QUICKSTART主题里面所有的消息
  34. // topic 和 partition是 一对多的关系,一个topic可以有多个partition
  35. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  36. // 因为消息是在partition中存储的,所以需要遍历partition集合
  37. for(TopicPartition topicPartition : records.partitions()) {
  38. // 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
  39. List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
  40. // 获取TopicPartition对应的主题名称
  41. String topic = topicPartition.topic();
  42. // 获取当前topicPartition下的消息条数
  43. int size = partitionRecords.size();
  44. System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
  45. topic,
  46. topicPartition.partition(),
  47. size));
  48. for(int i = 0; i < size; i++) {
  49. ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
  50. // 实际的数据内容
  51. String value = consumerRecord.value();
  52. String key = consumerRecord.key();
  53. // 当前获取的消息偏移量
  54. long offset = consumerRecord.offset();
  55. // ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
  56. // 表示下一次从什么位置(offset)拉取消息
  57. long commitOffser = offset + 1;
  58. System.err.println(String.format("获取实际消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s",
  59. key, value, offset, commitOffser));
  60. }
  61. }
  62. }
  63. } finally {
  64. consumer.close();
  65. }
  66. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/828946
推荐阅读
相关标签
  

闽ICP备14008679号