赞
踩
推送:
- /**
- * 推送数据到kafka
- * @param key 推送数据 key 值
- * @param value 推送数据 value 值
- */
- public static void NormalProducer(String key, String value){
- Properties properties = new Properties();
- // 1.配置生产者启动的关键属性参数
-
- // 1.1 BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari-1:9092");
- // 1.2 CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
- // properties.put(ProducerConfig.CLIENT_ID_CONFIG, clientIdConfig);
- // 1.3 KEY_SERIALIZER_CLASS_CONFIG VALUE_SERIALIZER_CLASS_CONFIG
- // Q: 对 kafka的 key 和 value 做序列化,为什么需要序列化?
- // A: 因为KAFKA Broker 在接收消息的时候,必须要以二进制的方式接收,所以必须要对KEY和VALUE进行序列化
- // 字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
- // KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- // VALUE: 实际发送消息的内容
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-
- // 2.创建kafka生产者对象 传递properties属性参数集合
- KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
-
- // 3.构造消息内容
- ProducerRecord<String, String> record =
- //test:是 topic(主题) 名称,key:推送的key值,value:推送的value值
- new ProducerRecord<String, String>("test",key,value);
-
- // 4.发送消息
- try {
- RecordMetadata metadata = producer.send(record).get();
- System.out.println("消息发送成功,offset:" + metadata.offset());
- } catch (Exception e) {
- System.out.println("消息发送失败:" + e.getMessage());
- }
- // 5.关闭生产者
- producer.close();
-
-
- }
测试:
- /**
- * 测试获取kafaka推送数据
- */
- @Test
- public void runAJobNow() {
-
- // 1. 配置属性参数
- Properties properties = new Properties();
-
- properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ambari-1:9092");
-
- // org.apache.kafka.common.serialization.StringDeserializer
- properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- // 非常重要的属性配置:与我们消费者订阅组有关系
- properties.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-group");
- // 常规属性:会话连接超时时间
- properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
- // 消费者提交offset: 自动提交 & 手工提交,默认是自动提交
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
- properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
- properties.put(ProducerConfig.ACKS_CONFIG, "all");
- properties.put(ProducerConfig.RETRIES_CONFIG, 1);
- // props.put("batch.size", 16384);
- properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
- properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
-
- // 2. 创建消费者对象
- KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
-
- // 3. 订阅你感兴趣的主题:test
- consumer.subscribe(Collections.singletonList("test"));
-
- System.err.println("quickstart consumer started...");
-
- try {
- // 4.采用拉取消息的方式消费数据
- while(true) {
- // 等待多久拉取一次消息
- // 拉取TOPIC_QUICKSTART主题里面所有的消息
- // topic 和 partition是 一对多的关系,一个topic可以有多个partition
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
- // 因为消息是在partition中存储的,所以需要遍历partition集合
- for(TopicPartition topicPartition : records.partitions()) {
- // 通过TopicPartition获取指定的消息集合,获取到的就是当前topicPartition下面所有的消息
- List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
- // 获取TopicPartition对应的主题名称
- String topic = topicPartition.topic();
- // 获取当前topicPartition下的消息条数
- int size = partitionRecords.size();
-
- System.err.println(String.format("--- 获取topic: %s, 分区位置:%s, 消息总数: %s",
- topic,
- topicPartition.partition(),
- size));
-
- for(int i = 0; i < size; i++) {
- ConsumerRecord<String, String> consumerRecord = partitionRecords.get(i);
- // 实际的数据内容
- String value = consumerRecord.value();
- String key = consumerRecord.key();
- // 当前获取的消息偏移量
- long offset = consumerRecord.offset();
- // ISR : High Watermark, 如果要提交的话,比如提交当前消息的offset+1
- // 表示下一次从什么位置(offset)拉取消息
- long commitOffser = offset + 1;
- System.err.println(String.format("获取实际消息 key:%s, value:%s, 消息offset: %s, 提交offset: %s",
- key, value, offset, commitOffser));
- }
- }
- }
- } finally {
- consumer.close();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。