当前位置:   article > 正文

kafka应用及运行原理_华为kafka recordmetadata

华为kafka recordmetadata

一、集群启动脚本

       1、集群环境:192.168.2.131 (主机名master),192.168.2.130(主机名slave1)

  1. #!/bin/bash
  2. zkPath=/home/master/zookeeper-3.4.14/bin
  3. kafkaPath=/home/master/kafka_2.11-2.2.1
  4. function sshStartZk(){
  5. echo "登陆主机"$1
  6. ssh root@$1 << 'EOF'
  7. cd /home/master/zookeeper-3.4.14/bin
  8. ./zkServer.sh stop
  9. ./zkServer.sh start
  10. echo $1"启动->zookeeper成功"
  11. exit
  12. EOF
  13. }
  14. function sshStartKafka(){
  15. echo "登陆主机"$1
  16. ssh root@$1 << 'EOF'
  17. cd /home/master/kafka_2.11-2.2.1/bin
  18. ./kafka-server-stop.sh
  19. ./kafka-server-start.sh ../config/server.properties
  20. echo "启动->kafka成功"$1
  21. exit
  22. EOF
  23. }
  24. echo "开始启动zk"
  25. sshStartZk "master";
  26. sshStartZk "slave1";
  27. echo "启动zk完成"
  28. echo "开始启动kafka"
  29. sshStartKafka "master";
  30. sshStartKafka "slave1";
  31. echo "启动kafka完成"

二、java的api操作kafka

        1、加入maven依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.12</artifactId>
  4. <version>2.2.1</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka-clients</artifactId>
  9. <version>2.2.1</version>
  10. </dependency>

2、调用工具类

  1. public class KafkaConnectUtils {
  2. private final static Logger LOGGER = LoggerFactory.getLogger(KafkaConnectUtils.class);
  3. /**
  4. * 创建生产者
  5. *
  6. * @return
  7. */
  8. public static KafkaProducer<String, Object> createProducer() {
  9. Properties props = new Properties();
  10. // Kafka服务端的主机名和端口号
  11. props.put("bootstrap.servers", "master:9092");
  12. // 等待所有副本节点的应答
  13. props.put("acks", "all");
  14. // 消息发送最大尝试次数
  15. props.put("retries", 0);
  16. // 一批消息处理大小
  17. props.put("batch.size", 16384);
  18. // 增加服务端请求延时
  19. props.put("linger.ms", 1);
  20. // 发送缓存区内存大小 32m
  21. props.put("buffer.memory", 33554432);
  22. // key序列化
  23. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  24. // value序列化
  25. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  26. return new KafkaProducer<>(props);
  27. }
  28. /**
  29. * 发送消息
  30. */
  31. public static void kafkaSendMsg(String topic, String key, Object value) {
  32. KafkaProducer<String, Object> producer = KafkaConnectUtils.createProducer();
  33. ProducerRecord<String, Object> record = new ProducerRecord<>(topic, key, value);
  34. producer.send(record, (recordMetadata, ex) -> {
  35. if (recordMetadata == null) {
  36. LOGGER.info("kafkaSendMsg->发送消息失败:" + ex.toString());
  37. } else {
  38. LOGGER.info("kafkaSendMsg->发送消息成功:topic=" + recordMetadata.topic()
  39. + ",key=" +recordMetadata.serializedKeySize()+",value="+recordMetadata.serializedValueSize());
  40. }
  41. });
  42. producer.close();
  43. }
  44. /**
  45. * 创建消费者
  46. *
  47. * @param
  48. * @return
  49. */
  50. public static KafkaConsumer<String, Object> createKafkaConsumer() {
  51. Properties props = new Properties();
  52. // 定义kakfa 服务的地址,不需要将所有broker指定上
  53. props.put("bootstrap.servers", "master:9092");
  54. //制定consumer group
  55. props.put("group.id", "AppGroup");
  56. // 是否自动确认offset
  57. props.put("enable.auto.commit", "true");
  58. // 自动确认offset的时间间隔
  59. props.put("auto.commit.interval.ms", "1000");
  60. // key的序列化类
  61. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  62. // value的序列化类
  63. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  64. return new KafkaConsumer<>(props);
  65. }
  66. /**
  67. * 拉取消费信息
  68. */
  69. public static void pullKafkaMsg(List<String> topics) {
  70. KafkaConsumer<String, Object> kafkaConsumer = createKafkaConsumer();
  71. ///订阅多个主题topic
  72. kafkaConsumer.subscribe(topics);
  73. ConsumerRecords<String, Object> consumerRecord = kafkaConsumer.poll(Duration.ofSeconds(10));
  74. for (ConsumerRecord<String, Object> next : consumerRecord) {
  75. LOGGER.info("消息消费{}topic=" + next.topic() + ",key=" + next.key()+",value="+next.value());
  76. }
  77. }
  78. }

3、测试调用

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class AppTest {
  4. @Test
  5. public void kafkaSendMsg(){
  6. KafkaConnectUtils.kafkaSendMsg("testdemo","liucui","刘翠");
  7. KafkaConnectUtils.kafkaSendMsg("testdemo","liuping","刘萍");
  8. }
  9. @Test
  10. public void kafkaPullMsg(){
  11. List<String> list=new ArrayList<>();
  12. list.add("testdemo");
  13. KafkaConnectUtils.pullKafkaMsg(list);
  14. }
  15. }

三、关于kafka的角色及特性

       1、zookeeper:用于协调broker集群的,保存broker的元数据(与rokcetmq的nameserver作用类 似,用于存储broker的信息),选举broker的controller.

       2、 broker : mq队列存储的服务实体

       3、topic: 主题是mq在broker上的逻辑分区

       4、partition物理分区,一个topic可以分成若干个分区,一般与broker集群的个数一致partition使用主从复制,leader负责读写数据,follower并作为备份副本repilca, partition的leader节点选举是从ISR集中产生的,不是通过zookeeper实现的

       5、AR   、ISR 、OSR   用于描述主从节点复制状态的

    (1) AR:  all  replicas                 (所有副本)

    (2)ISR: in sync replicas        (与leader节点同步的副本集)

    (3)OSR: out  sync replicas    (与leader节点同步不一致的副本集合)

       6、生产者写消息的ACK机制

     (1) 0  不管消息写入内存落盘是否成功直接返回发送成功

     (2)1    leader写入消息落盘成功时返回成功

     (3)-1  leader和isr都写入成功后返回

       7、 消费组再均衡Rebalance

        为了保证大体上partition和consumer的均衡性,提高topic的并发消费能力,就有了Rebalance的协议,规定ConsumerGroup下的所有consumer如何达成一致,来分配topic下的分区。Rebalance触发的条件:

    (1)有新的Consumer加入 

    (2)有consumer挂了   

     (3)有新的topic增加   

    (4)有consumer取消对topic的订阅

      8、顺序写和零拷贝

     (1)磁盘顺序写

       磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大 量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断 追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了 显著提升。

       (2)零拷贝

          网络编程传统的文件传输模型及问题

          

         首先要明确一个概念上下文切换,所谓的上上下文切换是从用户态切换至内核态,等待内核完成任务后再从内核态切换回用户态。 从上图可以看出,一次文件拷贝发生了四次用户态和内核态的上下文切换,以及四次数据拷贝,也就是在这个地方产生了大量不必要的损耗。可以很清楚地看到数据拷贝地过程, 第一次拷贝将磁盘中的数据拷贝到内核的缓冲区中,第二次拷贝内核将数据处理完,接着拷贝到用户缓冲区中,第三次拷贝此时需要通过socket将数据发送出去,将用户缓冲区中的数据拷贝至内核中socket的缓冲区中, 第四次拷贝:把内核中socket缓冲区的数据拷贝到网卡的缓冲区中,通过网卡将数据发送出去。所以要想优化传输性能,就要从减少数据拷贝和用户态内核态的上下文切换下手,这也就是零拷贝技术的由来。

         零拷贝就是一种避免CPU将数据从一块存储拷贝到另外一块存储的技术。linux操作系统“零拷贝”机制使用了 sendfile方法,允许操作系统将数据从 Page Cache直接发送到网络,只需要最后一步的copy操作将数据复制到NIC缓冲 区,这样避免重新复制数据.

          Linux系统中常用的零拷贝技术有sendfile、mamp等

         sendfile的作用是直接在两个文件描述符之间传递数据。由于整个操作完全在内核中(直接从内核缓冲区拷贝到socket缓冲区),从而避免了内核缓冲区和用户缓冲区之间的数据拷贝,是专门为了在网络上传输文件而实现的函数。

        mmap用于申请一段内存空间,也就是我们在进程间通信中提到过的共享内存,通过将内核缓冲区的数据映射到用户空间中,两者通过共享缓冲区直接访问统一资源,此时内核与用户空间就不需要再进行任何的数据拷贝操作.

    

    

       

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/428610
推荐阅读
相关标签
  

闽ICP备14008679号