当前位置:   article > 正文

大数据——使用Java连接至Kafka（如何用 Java 程序将数据投递到 Kafka）

使用Java连接至Kafka

使用Java连接至Kafka

  • 创建一个topic,并查看详情
[root@hadoop100 ~]# kafka-topics.sh --zookeeper 192.168.136.100:2181 --create --topic kb09two --partitions 3 --replication-factor 1
[root@hadoop100 ~]# kafka-topics.sh --zookeeper 192.168.136.100:2181 --topic kb09two --describe
  • 1
  • 2

在这里插入图片描述

  • 打开IDEA
  • 添加依赖包
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 所有的依赖包
<dependencies>
    <!-- JUnit: unit-testing framework, test scope only -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core -->
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.6.0</version>
    </dependency>
    <!-- Kafka Java client: provides KafkaProducer / KafkaConsumer used below -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.0.0</version>
    </dependency>
  </dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 创建生产者MyProducer类
package nj.zb.kb09;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class MyProducer {
	/**
	 * Demo producer: sends 200 string messages ("hello world0" .. "hello world199")
	 * to the "kb09two" topic on broker 192.168.136.100:9092, one every 100 ms.
	 *
	 * Fixes vs. the original: the producer is now closed via try-with-resources
	 * (close() flushes buffered records — without it the last batch can be lost
	 * when main() exits), and InterruptedException restores the interrupt flag
	 * instead of being swallowed with printStackTrace().
	 */
	public static void main(String[] args) {
		Properties prop = new Properties();
		prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
		prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		// "all" is the documented alias of "-1": wait for the full ISR to
		// acknowledge — the strongest delivery guarantee.
		prop.put(ProducerConfig.ACKS_CONFIG, "all");

		try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
			for (int i = 0; i < 200; i++) {
				ProducerRecord<String, String> record =
						new ProducerRecord<>("kb09two", "hello world" + i);
				producer.send(record);
				try {
					Thread.sleep(100); // throttle to ~10 msg/s so the demo is observable
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt(); // restore interrupt status
					break; // stop sending when interrupted
				}
			}
		} // close() flushes and releases sockets/buffers
		System.out.println("Game over");
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 运行程序在这里插入图片描述
  • 进入kafka,查看topic中各个分区中的数据数量
[root@hadoop100 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.136.100:9092 --topic kb09two --time -1 --offsets 1
  • 1

在这里插入图片描述

  • 从消费者拉取生产者传入的数据
[root@hadoop100 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.136.100:9092 --topic kb09two --from-beginning
  • 1

在这里插入图片描述

  • 创建消费者单线程MyConsumer类拉取数据
package nj.zb.kb09;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
	/**
	 * Single-threaded demo consumer: the only member of group "G1", polling the
	 * "kb09two" topic forever and printing offset/key/value of each record.
	 *
	 * Fixes vs. the original: auto-commit is disabled but the original never
	 * committed at all, so offsets were never saved and every restart re-read
	 * the whole topic — commitSync() is now called after each processed batch.
	 * Also uses poll(Duration), replacing the poll(long) overload deprecated
	 * in kafka-clients 2.0.
	 */
	public static void main(String[] args) {
		Properties prop = new Properties();
		prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
		prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
		// Manual offset management: we commit explicitly after processing.
		// (AUTO_COMMIT_INTERVAL_MS is ignored when auto-commit is off, so it is dropped.)
		prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		// Where to start when the group has no committed offset:
		//   earliest — from the beginning of the partition
		//   latest   — only new records
		//   none     — throw an exception
		prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");

		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
		// Subscribe to the topic; group "G1" has this single consumer.
		consumer.subscribe(Collections.singleton("kb09two"));

		while (true) {
			ConsumerRecords<String, String> records =
					consumer.poll(java.time.Duration.ofMillis(100));
			for (ConsumerRecord<String, String> record : records) {
				System.out.println(record.offset() + "\t" + record.key() + "\t" + record.value());
			}
			// Persist the position of this batch; without this, nothing is ever
			// committed because auto-commit is disabled above.
			consumer.commitSync();
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

结果展示:
在这里插入图片描述

  • 创建消费者多线程MyConsumer2类拉取数据
package nj.zb.kb09;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer2 {
	/**
	 * Multi-threaded demo consumer: starts 3 threads, each with its OWN
	 * KafkaConsumer, all in group "G2". With the 3-partition "kb09two" topic
	 * each consumer is assigned one partition (a partition is consumed by at
	 * most one consumer of a group at a time).
	 *
	 * Fixes vs. the original: commitSync() added (auto-commit is disabled, so
	 * the original never committed offsets), poll(Duration) replaces the
	 * deprecated poll(long), and threads get explicit names so the printed
	 * output can be matched to a consumer.
	 */
	public static void main(String[] args) {
		final Properties prop = new Properties();
		prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.100:9092");
		prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
		// Manual offset management; AUTO_COMMIT_INTERVAL_MS is ignored when
		// auto-commit is off, so it is dropped.
		prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
		// earliest/latest/none — see MyConsumer for the meaning of each value.
		prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
		// All three consumers share group "G2", so partitions are split among them.
		prop.put(ConsumerConfig.GROUP_ID_CONFIG, "G2");

		for (int i = 0; i < 3; i++) {
			new Thread(new Runnable() {
				@Override
				public void run() {
					// KafkaConsumer is NOT thread-safe: each thread builds its own.
					KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
					consumer.subscribe(Collections.singleton("kb09two"));
					while (true) {
						ConsumerRecords<String, String> records =
								consumer.poll(java.time.Duration.ofMillis(100));
						for (ConsumerRecord<String, String> record : records) {
							System.out.println(Thread.currentThread().getName() + "\t"
									+ record.offset() + "\t" + record.key() + "\t" + record.value());
						}
						// Commit this thread's partition position (auto-commit is off).
						consumer.commitSync();
					}
				}
			}, "kafka-consumer-" + i).start();
		}
	}
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

结果展示:
在这里插入图片描述
注意:这里我们创建的topic是三个分区,所以我们设置的i为3,让它们分别去每个分区拉取数据(一个分区的数据同时只能让一个消费者去拉取)。也可以把i设置为大于3的数,但拉取数据时,只会等上一个消费者拉取数据结束后,才会去拉取数据

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/958846
推荐阅读
相关标签
  

闽ICP备14008679号