当前位置:   article > 正文

kafka使用_kafuka使用

kafuka使用

好久没有写博客了,现在在复习一下kafka的使用,于是就出现了这样一篇博客,进行一个系统的概述,方便自己的同时也可以给新学习kafka的同学提供帮助,可能写的不够好,请大牛批评指正!
1.kafka是一个处理流的系统,而不是一个消息队列,但是在使用中一般将他作为一个消息队列来使用
2.kafka是基于zookeep的一个流程系统,也就是说在使用kafka的时候,需要安装zookeep和jdk,至于zookeep和jdk的安装环境,这里不做过多的介绍
3.安装kafka:(需要先安装好zookeep和jdk,并且操作kafka的时候需要启动zookeep)
a.将下载过来的tar包放入Linux系统中
b.通过解压命令,将kafka解压到指定位置
c.解压完成后,修改kafka中的server.properities文件
d.修改内容
1.修改监听用的listeners,将其注释放开,配置上本地ip
2.修改advertised.listeners放开,配置上本地ip
3.修改log.dirs的存储路径,这个kafka的日志信息位置,可修改也可以不修改,但是在项目中会进行修改,因为存储空间比较大,如果不修改的话可能存不下来
4.修改zookeep的配置信息,默认是本地的zookeep信息,端口是2181,超时时间是6000,可修改成指定ip的zookeep位置,并且可以配置集群
4.kafka的基本命令在这里插入图片描述
1.启动:
bin/kafka-server-start.sh config/server.properties &
2.停止:
bin/kafka-server-stop.sh
3.创建topic
bin/kafka-topics.sh --create --zookeeper loclhost:2182 --replicaltion-factor 1 --partiitions 1 --topic jiangzh -topic
4.查看已经创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.发送消息
bin/kafka-comsole-producer.sh --broker-list 192.168.1.111:9092 --topic jiangzh-topic
6.接收消息
bin/kafka-console-consumer.sh --boostarta-server 192.168.1.111:9092 --topic jiangzh-topic --from-beginning
5.kafka的几个基本概念
1.topic 一个虚拟的概念,由一个到多个partitions组成
2.prititon 实际消息存储单位
3. producer 消息的生产者
4. consumer 消息的消费者
5. kafka的通信就如一个三角形,生产者通过topic将消息放入第三方中,也就是这里的partition中,进行存储,而消费者就通过指定的topic去找对应的topic进行获取到partitions中的每一个partition,进行解析数据,消费生产者提供的消息
6. kafka在java客户端的操作
在这里插入图片描述
kafka在默认的情况下就是一个集群,一个节点也是,也就是说没有单机的情况
这里体现了kafka的四大API,producers,consumers,connectors,stream processors。在通常情况下,使用的producers和consumer,在涉及大数据的时候会使用到stream processors,最后一个就是adminapi是处理kafka管理使用的,叫做AdminClinet Api
在这里插入图片描述
在这里插入图片描述
7. AdminClientApi的使用
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
查询配置文件
在这里插入图片描述
修改配置文件信息
在这里插入图片描述

    public static void alterConfig(){
        AdminClient adminClient = adminClint();
//        这是新的,但是对于部分的kakfka版本配置会存在些许问题,使用起来是和下面的一样的(2.2或者2.3以上)
        Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
        ConfigEntry configEntry = new ConfigEntry("preallocat","true");
        configs.put(configResource,Arrays.asList(new AlterConfigOp(configEntry,AlterConfigOp.OpType.SET)));
        adminClient.incrementalAlterConfigs(configs);
        /*书写config配置文件*/
//        Map<ConfigResource, Config> configs = new HashMap<>();
//        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
//        Config config = new Config(Arrays.asList(new ConfigEntry("preallocat","true")));
//        configs.put(configResource,config);
//        adminClient.alterConfigs(configs);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

修改partition(增加partition数量 )
在这里插入图片描述
8. kafka客户端Producer API操作
producer发送的三种模式
同步发送
异步发送
异步回调发送
三种方式实现代码

    public static void main(String[] args) {
        sendKafka();
    }
    /*异步发送消息*/
    private static void sendKafka(){
        Properties properties = new Properties();
        /*配置kafka服务ip*/
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> msg = new ProducerRecord<>("topic","","");

        producer.send(msg);
        producer.close();

    }
    /*同步发送*/
    private static void sendKafkaSyn() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        /*配置kafka服务ip*/
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> msg = new ProducerRecord<>("topic","","");
       /*通过get方法阻塞消息,实现同步发送*/
        Future<RecordMetadata> send =  producer.send(msg);
        RecordMetadata recordMetadata = send.get();
        System.out.println(recordMetadata.offset()+recordMetadata.partition());
        producer.close();
    }
    /*异步回调发送*/
    private static void sendKafkaOnWith() throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        /*配置kafka服务ip*/
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123");
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.RETRIES_CONFIG,"0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        Producer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> msg = new ProducerRecord<>("topic","","");
        /*通过get方法阻塞消息,实现同步发送*/
        Future<RecordMetadata> send =  producer.send(msg, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                System.out.println(recordMetadata.offset()+recordMetadata.partition());
                producer.close();
            }
        });

    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67

producer的加载过程
在这里插入图片描述
通过源码分析,可以总结出,producer是线程安全的,并且他不是一条一条的发送,是在一定时间内,批量发送多少条数据的

对于producer的send方法来说,就进行两部分操作,创建一个消息发送的批次和向批次中追加消息,通过计算分区判断消息具体进入那个partition,其次进行计算批次,通过accumulator.append方法加入批次
消息发送流程
在这里插入图片描述
在这里插入图片描述
我们还可以设置kafka的负载均衡,通过实现partition类,重写里面的partition方法,通过方法的值不一样进行负载发送
在这里插入图片描述
kafka为我们提供了三种的消息传递保障机制
最多一次(消费者消费0-1次)
最少一次(消费者最少消费1次)
正好一次(消费者有且只有一次消费)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/902740
推荐阅读
相关标签
  

闽ICP备14008679号