赞
踩
一、上一节讲了kafka的安装,这节讲一下Java开发kafka程序。
所需jar包
https://download.csdn.net/download/hezhihuahzh/10647732
可下载。
二、写代码前先大概说一下kafka的生产发送和接收消费的细节。
1、Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。
2、Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。
3、如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。
三、生产者(consumer)代码。发送简单的String类型数据。
1、生产者配置文件 producer_config.properties
- #kafka服务的ip和端口
- bootstrap.servers=192.168.169.128:9092
- #ack方式有三种方式,0,1,-1,这里选择全部
- acks=all
- retries=30
- batch.size=16384
- linger.ms=50
- buffer.memory=33554432
- auto.commit.interval.ms=1000
- compression.type=gzip
- reconnect.backoff.ms=20000
- retry.backoff.ms=20000
- #默认的key序列化方式
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- #默认的value序列化方式,发送String简单类型的即可,发送对象或者其他复杂数据,需要自定义序列化方式
- value.serializer=org.apache.kafka.common.serialization.StringSerializer

2、生产者Java代码
- //定义topic
- private final static String TOPIC1 = "appreportdata_700091";
- public static void send() {
- Producer<String, Object> producer = null;
- Properties props = null;
- try {
- //初始化kafka配置信息
- props = PropertyUtils.load("producer_config.properties");
- producer = new KafkaProducer<>(props);
- //发送1万条数据
- for(int i=0;i++;i<=10000){
- producer.send(new ProducerRecord<String, Object>(TOPIC1, i));
- }
-
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- try {
- producer.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }

PS:producer一定要关闭,否则会丢失数据。或者调用flush方法。
四、消费者(consumer)代码,接收String简单类型数据。
1、消费者端配置文件 consumer_config.properties
- bootstrap_servers=192.168.39.50:9092
- #groupID标示一个消费组,里边的消费者均衡消费某个topic的消息,不会重复
- group_id=appreportdata_900050
- #消费消息后是否自动提交事务
- enable_auto_commit=true
- auto_commit_interval_ms=1000
- auto_offset_reset=earliest
- session_timeout_ms=30000
- #key-value反序列化方式
- key_deserializer=org.apache.kafka.common.serialization.StringDeserializer
- value_deserializer=org.apache.kafka.common.serialization.StringDeserializer
2、消费者代码
- public static void getMessage() {
- //定义topic
- String TOPIC ="appreportdata_700091";
- ConsumerRecords<String, Object> records = null;
- Properties props = null;
- KafkaConsumer<String, Object> consumer = null;
- Properties props = null;
- try {
- props = PropertyUtils.load("consumer_config.properties");
- } catch (IOException e1) {
- e1.printStackTrace();
- }
- try {
- props = getProperties();
- consumer = new KafkaConsumer<>(props);
- // 订阅topic
- consumer.subscribe(Arrays.asList(TOPIC));
- while (true) {
- try {
- //这里500是每隔500毫秒拉取一次数据,每次拉取的数据个数由kafka配置决定,默认拉取条数是500条,如果少于500条,每次拉取条数随机
- records = consumer.poll(500);
- count=records.count();
- System.err.println(count);
-
- for (ConsumerRecord<String, Object> record : records) {
- String content= record.value();
- //把接收到的数据进行处理
- offset = record.offset();
- System.out.println("接收数据条数"+offset);
- }
- } catch (Exception e) {
-
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
-
- }

3、properties加载工具类
-
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
-
- import org.apache.commons.io.IOUtils;
- public class PropertyUtils {
- private PropertyUtils(){
-
- }
-
- public static Properties load(File file) throws IOException{
-
- InputStream in = null;
- try {
- in = new FileInputStream(file);
- Properties props = new Properties();
- props.load(in);
-
- return props;
-
- }finally{
- IOUtils.closeQuietly(in);
- }
- }
-
- public static Properties load(String path) throws IOException{
-
- InputStream in = null;
- try {
- in = PropertyUtils.class.getClassLoader().getResourceAsStream(path);
- Properties props = new Properties();
- props.load(in);
-
- return props;
-
- }finally{
- IOUtils.closeQuietly(in);
- }
- }
- }

一个简单的发送接收demo就完成了。希望能给大家带来帮助。下节说一下kafka发送对象数据,自定义序列化和反序列化。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。