当前位置:   article > 正文

Kafka详解二Java程序实现发送与接收数据_java开发接收数据程序

java开发接收数据程序

一、上一节讲了kafka的安装,这节讲一下Java开发kafka程序。

所需jar包

https://download.csdn.net/download/hezhihuahzh/10647732 

可下载

二、写代码前先大概说一下kafka的生产发送和接收消费的细节。

       1、Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。

            2、Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即便是来自不同的consumer group的也不行。它不能像AMQ那样可以多个BET作为consumer去处理message,这是因为多个BET去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。

              3、如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。

三、生产者(consumer)代码。发送简单的String类型数据。

      1、生产者配置文件  producer_config.properties

  1. #kafka服务的ip和端口
  2. bootstrap.servers=192.168.169.128:9092
  3. #ack方式有三种方式,0,1,-1,这里选择全部
  4. acks=all
  5. retries=30
  6. batch.size=16384
  7. linger.ms=50
  8. buffer.memory=33554432
  9. auto.commit.interval.ms=1000
  10. compression.type=gzip
  11. reconnect.backoff.ms=20000
  12. retry.backoff.ms=20000
  13. #默认的key序列化方式
  14. key.serializer=org.apache.kafka.common.serialization.StringSerializer
  15. #默认的value序列化方式,发送String简单类型的即可,发送对象或者其他复杂数据,需要自定义序列化方式
  16. value.serializer=org.apache.kafka.common.serialization.StringSerializer

     2、生产者Java代码

  1. //定义topic
  2. private final static String TOPIC1 = "appreportdata_700091";
  3. public static void send() {
  4. Producer<String, Object> producer = null;
  5. Properties props = null;
  6. try {
  7. //初始化kafka配置信息
  8. props = PropertyUtils.load("producer_config.properties");
  9. producer = new KafkaProducer<>(props);
  10. //发送1万条数据
  11. for(int i=0;i++;i<=10000){
  12. producer.send(new ProducerRecord<String, Object>(TOPIC1, i));
  13. }
  14. } catch (IOException e1) {
  15. e1.printStackTrace();
  16. }
  17. try {
  18. producer.close();
  19. } catch (IOException e) {
  20. e.printStackTrace();
  21. }
  22. }

PS:producer一定要关闭,否则会丢失数据。或者调用flush方法。

四、消费者(consumer)代码,接收String简单类型数据。

    1、消费者端配置文件 consumer_config.properties

  1. bootstrap_servers=192.168.39.50:9092
  2. #groupID标示一个消费组,里边的消费者均衡消费某个topic的消息,不会重复
  3. group_id=appreportdata_900050
  4. #消费消息后是否自动提交事务
  5. enable_auto_commit=true
  6. auto_commit_interval_ms=1000
  7. auto_offset_reset=earliest
  8. session_timeout_ms=30000
  9. #key-value反序列化方式
  10. key_deserializer=org.apache.kafka.common.serialization.StringDeserializer
  11. value_deserializer=org.apache.kafka.common.serialization.StringDeserializer

2、消费者代码

  1. public static void getMessage() {
  2. //定义topic
  3. String TOPIC ="appreportdata_700091";
  4. ConsumerRecords<String, Object> records = null;
  5. Properties props = null;
  6. KafkaConsumer<String, Object> consumer = null;
  7. Properties props = null;
  8. try {
  9. props = PropertyUtils.load("consumer_config.properties");
  10. } catch (IOException e1) {
  11. e1.printStackTrace();
  12. }
  13. try {
  14. props = getProperties();
  15. consumer = new KafkaConsumer<>(props);
  16. // 订阅topic
  17. consumer.subscribe(Arrays.asList(TOPIC));
  18. while (true) {
  19. try {
  20. //这里500是每隔500毫秒拉取一次数据,每次拉取的数据个数由kafka配置决定,默认拉取条数是500条,如果少于500条,每次拉取条数随机
  21. records = consumer.poll(500);
  22. count=records.count();
  23. System.err.println(count);
  24. for (ConsumerRecord<String, Object> record : records) {
  25. String content= record.value();
  26. //把接收到的数据进行处理
  27. offset = record.offset();
  28. System.out.println("接收数据条数"+offset);
  29. }
  30. } catch (Exception e) {
  31. }
  32. }
  33. } catch (Exception e) {
  34. e.printStackTrace();
  35. }
  36. }

3、properties加载工具类

  1. import java.io.File;
  2. import java.io.FileInputStream;
  3. import java.io.IOException;
  4. import java.io.InputStream;
  5. import java.util.Properties;
  6. import org.apache.commons.io.IOUtils;
  7. public class PropertyUtils {
  8. private PropertyUtils(){
  9. }
  10. public static Properties load(File file) throws IOException{
  11. InputStream in = null;
  12. try {
  13. in = new FileInputStream(file);
  14. Properties props = new Properties();
  15. props.load(in);
  16. return props;
  17. }finally{
  18. IOUtils.closeQuietly(in);
  19. }
  20. }
  21. public static Properties load(String path) throws IOException{
  22. InputStream in = null;
  23. try {
  24. in = PropertyUtils.class.getClassLoader().getResourceAsStream(path);
  25. Properties props = new Properties();
  26. props.load(in);
  27. return props;
  28. }finally{
  29. IOUtils.closeQuietly(in);
  30. }
  31. }
  32. }

一个简单的发送接收demo就完成了。希望能给大家带来帮助。下节说一下kafka发送对象数据,自定义序列化和反序列化。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/一键难忘520/article/detail/828945
推荐阅读
相关标签
  

闽ICP备14008679号