赞
踩
好久没有写博客了,现在在复习一下kafka的使用,于是就出现了这样一篇博客,进行一个系统的概述,方便自己的同时也可以给新学习kafka的同学提供帮助,可能写的不够好,请大牛批评指正!
1.kafka是一个处理流的系统,而不是一个消息队列,但是在使用中一般将他作为一个消息队列来使用
2.kafka是基于zookeep的一个流程系统,也就是说在使用kafka的时候,需要安装zookeep和jdk,至于zookeep和jdk的安装环境,这里不做过多的介绍
3.安装kafka:(需要先安装好zookeep和jdk,并且操作kafka的时候需要启动zookeep)
a.将下载过来的tar包放入Linux系统中
b.通过解压命令,将kafka解压到指定位置
c.解压完成后,修改kafka中的server.properities文件
d.修改内容
1.修改监听用的listeners,将其注释放开,配置上本地ip
2.修改advertised.listeners放开,配置上本地ip
3.修改log.dirs的存储路径,这个kafka的日志信息位置,可修改也可以不修改,但是在项目中会进行修改,因为存储空间比较大,如果不修改的话可能存不下来
4.修改zookeep的配置信息,默认是本地的zookeep信息,端口是2181,超时时间是6000,可修改成指定ip的zookeep位置,并且可以配置集群
4.kafka的基本命令
1.启动:
bin/kafka-server-start.sh config/server.properties &
2.停止:
bin/kafka-server-stop.sh
3.创建topic
bin/kafka-topics.sh --create --zookeeper loclhost:2182 --replicaltion-factor 1 --partiitions 1 --topic jiangzh -topic
4.查看已经创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.发送消息
bin/kafka-comsole-producer.sh --broker-list 192.168.1.111:9092 --topic jiangzh-topic
6.接收消息
bin/kafka-console-consumer.sh --boostarta-server 192.168.1.111:9092 --topic jiangzh-topic --from-beginning
5.kafka的几个基本概念
1.topic 一个虚拟的概念,由一个到多个partitions组成
2.prititon 实际消息存储单位
3. producer 消息的生产者
4. consumer 消息的消费者
5. kafka的通信就如一个三角形,生产者通过topic将消息放入第三方中,也就是这里的partition中,进行存储,而消费者就通过指定的topic去找对应的topic进行获取到partitions中的每一个partition,进行解析数据,消费生产者提供的消息
6. kafka在java客户端的操作
kafka在默认的情况下就是一个集群,一个节点也是,也就是说没有单机的情况
这里体现了kafka的四大API,producers,consumers,connectors,stream processors。在通常情况下,使用的producers和consumer,在涉及大数据的时候会使用到stream processors,最后一个就是adminapi是处理kafka管理使用的,叫做AdminClinet Api
7. AdminClientApi的使用
查询配置文件
修改配置文件信息
public static void alterConfig(){
AdminClient adminClient = adminClint();
// 这是新的,但是对于部分的kakfka版本配置会存在些许问题,使用起来是和下面的一样的(2.2或者2.3以上)
Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
ConfigEntry configEntry = new ConfigEntry("preallocat","true");
configs.put(configResource,Arrays.asList(new AlterConfigOp(configEntry,AlterConfigOp.OpType.SET)));
adminClient.incrementalAlterConfigs(configs);
/*书写config配置文件*/
// Map<ConfigResource, Config> configs = new HashMap<>();
// ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
// Config config = new Config(Arrays.asList(new ConfigEntry("preallocat","true")));
// configs.put(configResource,config);
// adminClient.alterConfigs(configs);
}
修改partition(增加partition数量 )
8. kafka客户端Producer API操作
producer发送的三种模式
同步发送
异步发送
异步回调发送
三种方式实现代码
public static void main(String[] args) { sendKafka(); } /*异步发送消息*/ private static void sendKafka(){ Properties properties = new Properties(); /*配置kafka服务ip*/ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.RETRIES_CONFIG,"0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG,"1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String, String> msg = new ProducerRecord<>("topic","",""); producer.send(msg); producer.close(); } /*同步发送*/ private static void sendKafkaSyn() throws ExecutionException, InterruptedException { Properties properties = new Properties(); /*配置kafka服务ip*/ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.RETRIES_CONFIG,"0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG,"1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String, String> msg = new ProducerRecord<>("topic","",""); /*通过get方法阻塞消息,实现同步发送*/ Future<RecordMetadata> send = producer.send(msg); RecordMetadata recordMetadata = send.get(); System.out.println(recordMetadata.offset()+recordMetadata.partition()); producer.close(); } /*异步回调发送*/ private static void sendKafkaOnWith() throws ExecutionException, InterruptedException { Properties properties = new Properties(); /*配置kafka服务ip*/ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"10.1.1.123"); properties.put(ProducerConfig.ACKS_CONFIG,"all"); properties.put(ProducerConfig.RETRIES_CONFIG,"0"); properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384"); properties.put(ProducerConfig.LINGER_MS_CONFIG,"1"); properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); Producer<String,String> producer = new KafkaProducer<String, String>(properties); ProducerRecord<String, String> msg = new ProducerRecord<>("topic","",""); /*通过get方法阻塞消息,实现同步发送*/ Future<RecordMetadata> send = producer.send(msg, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println(recordMetadata.offset()+recordMetadata.partition()); producer.close(); } }); }
producer的加载过程
通过源码分析,可以总结出,producer是线程安全的,并且他不是一条一条的发送,是在一定时间内,批量发送多少条数据的
对于producer的send方法来说,就进行两部分操作,创建一个消息发送的批次和向批次中追加消息,通过计算分区判断消息具体进入那个partition,其次进行计算批次,通过accumulator.append方法加入批次
消息发送流程
我们还可以设置kafka的负载均衡,通过实现partition类,重写里面的partition方法,通过方法的值不一样进行负载发送
kafka为我们提供了三种的消息传递保障机制
最多一次(消费者消费0-1次)
最少一次(消费者最少消费1次)
正好一次(消费者有且只有一次消费)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。