当前位置:   article > 正文

Kafka从入门到高级(一篇完结)_message-send-max-retries

message-send-max-retries

一、Kafka的概述

1.1 消息队列的简介

1.1.1 为什么要有消息队列

1. 当服务器在一次性处理大量的数据时,可能会造成服务器瘫痪,或者数据丢失这种情况
2. 如果将要处理的数据先维护在一个缓存系统中,然后再慢慢的处理,这样就可以避免上述情况。
3. 缓存系统就是用来存储消息的,该系统中至少要维护一个用来存储消息的先后顺序的队列(数据结构)
4. 为什么要使用队列(Queue,Deque,LinkedList),而不是其他的数据结构(Arraylist),因为消息要进行频繁的生产和消费(增删操作)
  • 1
  • 2
  • 3
  • 4

1.1.2 什么是消息队列

消息,就是指的网络中传输的数据,比如行为日志,文本、视频,音频、图片
队列,用来存储消息的容器,该容器是一个首尾相接的环形队列,规定的是FIFO

消息队列就是两者的结合,以及提供了各种API,和底层优化设计的应用程序(框架)
  • 1
  • 2
  • 3
  • 4

1.1.3 消息队列的分类

主要分为两大类,一类是点对点模式,一类是发布/订阅模式

1)点对点模式

1. 可以叫  peer-to-peer  ,也可以叫point-to-point
2. 角色分为:消息队列(Queue)、发送者(Sender)、接收者(Receiver)
3. 发送者发送消息到队列中,该消息只能被一个接收者所接受,即使有多个接收者同时侦听到了这一条消息。
4. 该消息一旦被消费,则不存储在消息队列中,比如打电话
5. 支持异步/同步操作
  • 1
  • 2
  • 3
  • 4
  • 5

在这里插入图片描述

2)发布/订阅模式

1. 叫 pub/sub模式
2. 角色分为: 
	-- 消息队列(Queue),
	-- 发布者(Publisher、也叫producer)、
	-- 订阅者(Subscriber,也叫consumer),
	-- 主题(Topic)     用来将消息进行逻辑分类的
3. 一个消息可以被多个消费者消费,互不影响,比如我发布一个微博:关注我的人都能够看到。
4. 消费者在消费数据时,
	--可以是push模式(消息队列主动将信息push给消费者),
	--也可以使pull模式(消费者主动拉取消息对列中的消息),该模式的优点,消费者可以在自己处理消息的能力范围内,进行消费数据,
5. 支持异步/同步操作
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

在这里插入图片描述

1.1.4 消息队列的应用场景

1. 解耦:  消息系统可以作为中间件,  下游的软件和上游的软件无需了解彼此
2. 冗余:  消息可以持久化到磁盘上,或者做备份处理,避免数据丢失
3. 扩展:  消息队列提供了统计的生产者/消费者接口,
          任何软件都可以调用生产者接口API,作为生产者
          软件软件都可以调用消费者接口API,作为消费者
4. 销(削)峰能力:避免流量高峰期造成的系统瘫痪, 消息队列可以缓存这一时期的数据,慢慢处理
5. 可靠性:  消息队列中部分数据丢失,是有副本策略的,可以恢复数据
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

1.1.5 常见的消息队列系统

RabbitMQ
Redis   :  本身是一个KV形式的NoSql数据,但是也有消息队列功能
ZeroMQ
ActiveMQ JMS
Kafka/Jafka : 高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理
MetaQ/RocketMQ
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

1.2 Kafka的简介

1.2.1 Kafka的官网介绍

apache官网:https://kafka.apache.org/documentation/#quickstart
中文官网:https://kafka.apachecn.org/
  • 1
  • 2

1.2.2 Kafka是什么

1. 是一个分布式、用于处理消息的发布/订阅消息系统
2. 是用scala语言编写的(scala编写另外一门比较火的框架是spark)
3. 具有以下特点:
	-- 高吞吐量: 可以满足每秒百万级别消息的生产和消费——生产消费。
	-- 持久化(保存在磁盘上一定时间,默认7天,区别于永久性)
	-- 分布式:基于分布式的扩展和容错机制;Kafka的数据都会复制到几台服务器上。当某一台故障失效时,生产者和消费者转而使用其它的机器——整体
	-- 健壮性:稳定,API接口的通用
4. 同时适应在线流处理和离线批处理
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

二、Kafka的架构与安装

2.1 Kafka的组织架构

2.1.1 架构图

在这里插入图片描述

2.1.2 Kafka架构中的核心概念

1)架构中的角色
--borker: kafka集群中各个节点的名称, 值得注意的是,每个broker都有一个唯一标识符,broker间的标识符不能重复。
--producer: 就是生产消息到Kafka集群的应用程序
--consumer: 就是从Kafka集群中消费消息的应用程序
--consumer-group:为了方便管理消费者,比如设置消费者的一些配置属性,引入了消费者组的概念,进而统一设置。
--zookeeper:作为协调Kafka集群工作的角色,比如多个broker谁是controller,broker的动态上下线。分区的副本冗余策略,leader的选举
  • 1
  • 2
  • 3
  • 4
  • 5
2)工作机制中的概念
--message:  生产者产生的、消费者消费的数据,就是message,也可以称之为event。
--topic:    用于将message进行逻辑划分,即划分成不同的主题。比如有一些消息是关于美食的,一些是关于旅游的,一些是关于宠物等
--partition:是kafka集群中真正缓存数据的地方
			    1)本质是目录。
			    2)并发消费:每个主题可以有多个分区(分区多,消费者就可以并发处理消息)
			    3)可靠性:每个分区可以有多个副本,同一个分区的多个副本不能在同一个节点上。
--leader:  同一个分区的多个副本中,要有一个leader角色。
				 1)生产者向leader生产消息,
				 2)消费者从leader上消费消息,
				 3)follower从leader上同步消息
--follower: 同一个分区的多个副本中,除了leader角色,剩下的都是follower角色
             作用:就是提高消息的安全可靠性,副本冗余
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

2.2 KAFKA的安装

2.2.1 安装步骤

步骤1)上传、解压、更名、配置环境变量

[root@qianfeng01 ~]# tar -zxvf kafka_2.11-1.1.1.tgz -C /usr/local/
[root@qianfeng01 ~]# cd /usr/local/
[root@qianfeng01 ~]# mv kafka_2.11-1.1.1/ kafka
[root@qianfeng01 ~]# vim /etc/profile
.....省略....
#kafka environment
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin

[root@qianfeng01 ~]# source /etc/profile
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

步骤2)修改server.properties

[root@qianfeng01 kafka]# cd config
[root@qianfeng01 config]# vim server.properties

# 每个节点的唯一标识符的配置
broker.id=0
# 设置消息的存储位置,如果有多个目录,可以用逗号隔开
log.dirs=/usr/local/kafka/data
# 设置zookeeper的集群地址,同时指定kafka在zookeeper上的各个节点的父znode
zookeeper.connect=qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka


# 下面属性可以改可不改
# 发送消息的缓存大小,100K
socket.send.buffer.bytes=102400
# 接收消息的缓存大小,100K
socket.receive.buffer.bytes=102400
# 服务端处理发送过来的数据的最大字节数 100M
socket.request.max.bytes=104857600
# 消息对应的文件保留的时间,默认使7天
log.retention.hours=168
# 消息对应的文件的最大字节数,1G
log.segment.bytes=1073741824
# 用来检查消息对应的文件是否过期或者是大于1G的时间周期,默认是300秒一检查
log.retention.check.interval.ms=300000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

步骤3)同步到其他节点上

[root@qianfeng01 local]# scp -r kafka/ qianfeng02:/usr/local/
[root@qianfeng01 local]# scp -r kafka/ qianfeng03:/usr/local/

[root@qianfeng01 local]# scp /etc/profile qianfeng02:/etc/
[root@qianfeng01 local]# scp /etc/profile qianfeng03:/etc/
  • 1
  • 2
  • 3
  • 4
  • 5

步骤4)修改其他节点上的brokerId

qianfeng02的broker.id   为  1
qianfeng03的broker.id   为  2
  • 1
  • 2

2.2.2 启动操作

步骤1)先启动zookeeper      
	可以使用myzkServer.sh脚本,同时启动三台机器的zookeeper
步骤2)启动三台机器的kafka
      方式1: 正常启动, 注意,带上配置文件,进行后台启动
           kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
           关闭:
           kafka-server-stop.sh -daemon /usr/local/kafka/config/server.properties
      方式2: 自己编写一个启动三台机器kafka的脚本
            比如  mykafka.sh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2.3 查看Zookeeper维护的数据

kafka在zookeeper上维护了多个znode节点,分别用于存储不同的信息

--cluster/id  :  用于存储kafka集群的唯一标识
   {
   "version":"1","id":"mO77ods8Q-ek99Y7lTOwUg"}
--controller  :  用于记录多个broker中谁是控制角色
   {
   "version":1,"brokerid":0,"timestamp":"1644464244261"}
--controller_epoch  :  记录的是第几次选举controller角色
   3
--brokers/ids:  以子znode的形式记录所有的broker唯一标识符
              brokers/ids/0 : 记录着自己的信息
              {
   ..."endpoints":["PLAINTEXT://qianfeng01:9092"],"host":"qianfeng01","port":9092}
              brokers/ids/1 : 记录着自己的信息
              {
   ..."endpoints":["PLAINTEXT://qianfeng02:9092"],"host":"qianfeng02","port":9092}
              brokers/ids/2 : 记录着自己的信息
              {
   ..."endpoints":["PLAINTEXT://qianfeng03:9092"],"host":"qianfeng03","port":9092}
--brokers/topics : 以子znode的形式记录kafka集群中的所有主题名
--consumers :  旧版本用来记录消费者消费消息的偏移量,以便下次继续消费,但是新版本不用该znode,
					而是以一个主题"__consumer_offsets"来记录各个消费者的偏移量             
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

三、Kafka的基本操作

3.1 topic的CRUD

3.1.1 帮助信息

对应的脚本是:kafka-topics.sh, 用法直接输入脚本名称回车

[root@qianfeng01 data]# kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  修改一个主题的分区数量,副本,以及配置等
--config <String: name=value>            修改主题的一个配置。
--create                                 创建一个新的主题
--delete                                 删除一个主题
--delete-config <String: name>           移除一个配置
--describe                               列出指定的主题的详情信息
--disable-rack-aware                     禁用机架感知副本分配
--force                                  抑制控制台提示
--help                                   打印帮助信息
--if-exists                              如果在更改或删除主题时设置该操作,则该操作只会在主题存在时执行
--if-not-exists                          如果在创建主题时设置,则只在主题不存在时执行该操作
--list                                   列出所有的主题名称
--partitions <number>  						  `必需属性`,创建或者修改时的分区数量
--replica-assignment <.....>             正在创建或更改的主题的手动分区到代理分配列表。
--replication-factor <Integer>           `必需属性`,创建主题时的副本因子
--topic <String: topic>                  要创建,修改或者描述的主题名称
--topics-with-overrides                  如果在描述主题时设置,则只显示覆盖了配置的主题
--unavailable-partitions                 如果在描述主题时设置,只显示leader不可用的分区
--under-replicated-partitions            如果在描述主题时设置,则只显示除了leader的副本
--zookeeper <String: hosts>              `必需属性`,用于zookeeper连接的连接字符串,格式为host:port。可以指定多个主机以允许故障转移。                                   
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

3.1.2 创建主题

案例1:
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--create \
--topic food \
--partitions 4 \
--replication-factor 2

案例2:
[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--create \
--topic pet \
--partitions 2 \
--replication-factor 3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

小贴士:

1)  副本因子不能大于broker的数量, 否则报以下错误:
ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException Replication factor: 4 larger than available brokers: 3

2) zookeeper的路径,必需写到server.properties里指定的kafka的根znode.
3) 创建时,必需指定分区数量,副本因子,zookeeper路径
  • 1
  • 2
  • 3
  • 4
  • 5

3.1.3 列出所有的主题

[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--list
  • 1
  • 2
  • 3

原理:

列出所有主题名称的逻辑:其实就是访问zookeeper的/kafka/brokers/topics/ 子节点的名字
  • 1

3.1.4 查看指定主题

[root@qianfeng01 data]# kafka-topics.sh \
--zookeeper qianfeng01:2181,qianfeng02:2181,qianfeng03:2181/kafka \
--describe \
--topic food 
  • 1
  • 2
  • 3
  • 4

获取的描述信息如下:

Topic:food   	PartitionCount:4        ReplicationFactor:2     
Configs:
        Topic: food     Partition: 0    Leader: 0       Replicas: 0,1   Isr: 0,1
        Topic: food     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: food     Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: food     Partition: 3    Leader: 0       Replicas: 0,2   Isr: 0,2

可以获取四个信息:
Topic: 主题名
PartitionCount: 该主题的分区数量
ReplicationFactor: 该主题的每个分区的副本因子
Configs: 列出的是每个分区的详情信息
         
摘抄一条解析分区的详情信息: 
Topic: food     Partition: 0    Leader: 0                       Replicas: 0,1            Isr: 0,1
     ^             ^                 ^                               ^                    ^
     |             |                 |                               |                    |
 主题名称        分区号码        该分区所有副本的leader的所在broker    副本所在的broker    当前可用的副本的broker位置
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3.1.5 修改主题

[root@qianfeng01 ~]# kafka-topics.sh \
--zookeeper qianfeng01,qianfeng02,qianfeng03/kafka \
--alter \
--topic food \
--partitions 5 
  • 1
  • 2
  • 3
  • 4
  • 5

小贴士

1. 修改主题的分区时,数量只能增加,不能减少
2. 副本因子不能被修改
  • 1
  • 2

3.1.6 删除主题

[root@qianfeng01 ~]# kafka-topics.sh \
--zookeeper qianfeng01,qianfeng02,qianfeng03/kafka \
--delete \
--topic pet
  • 1
  • 2
  • 3
  • 4

删除的原理:

1. 将zookeeper里对应的znode删除
2. 将kafka的存储目录下的该主题的分区目录打标记,过一会,再删除.
  • 1
  • 2

3.2 生产和消费消息

3.2.1 生产者生产消息

1)帮助信息

使用 kafka-console-producer.sh脚本,回车,即可显示帮助信息。

[root@qianfeng01 data]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             如果没有同步发送消息,则在单个批处理中发送的消息数。(默认:200)
--broker-list <String: broker-list>      `REQUIRED`: 形式为HOST1:PORT1,HOST2:PORT2的代理列表字符串。
--compression-codec [compression-codec]    'none','gzip', 'snappy', or 'lz4'.  默认值为'gzip'
--key-serializer <className>               用于序列化key的消息编码器实现的类名。(默认:kafka.serializer.DefaultEncoder)
--line-reader <String: reader_class>       用于从标准中读取行的类的类名。默认情况下,每行读取为单独的消息。
                                          (default: kafka.tools. ConsoleProducer$LineMessageReader)
--max-block-ms <Long>                     生产者在发送请求期间阻塞的最大时间 (default: 60000)
--max-memory-bytes <Long>                生产者用来缓冲等待发送到服务器的记录的总内存。(default: 33554432)
--max-partition-memory-bytes <Long>      分配给分区的缓冲区大小。当接收到小于这个大小的记录时,
														生产者将尝试乐观地将它们组合在一起,直到达到这个大小。
                                           (default: 16384)
--message-send-max-retries <Integer>       重试次数(default: 3)
--producer-property <String>              将用户定义的属性以key=value形式传递给生产者的机制。
--producer.config <String: config file>   引用生产者的配置文件
--property <String: prop>                  自定义属性
--queue-enqueuetimeout-ms <Integer:        消息排队超时时间 (default:2147483647)
--queue-size <Integer: queue_size>        如果设置并且生产者以异步模式运行,这将给出等待足够批处理
														大小的消息队列的最大数量。(default: 10000)
--request-required-acks <String:          生产者请求所需的ack   (default: 1)
--request-timeout-ms <Integer: request    生产者请求的ack超时。值必须是非负且非零 (default: 1500)
--retry-backoff-ms <Integer>             在每次重试之前,生产者会刷新相关主题的元数据。由于leader选举需要一些时间,
														这个属性指定生产者在刷新元数据之前等待的时间。 (default: 100)
--socket-buffer-size <Integer: size>     tcp协议的缓存大小。(default: 102400)
--sync                                   如果设置消息,发送到代理的请求是同步的,每次一个。
--timeout <Integer: timeout_ms>         如果设置并且生产者在异步模式下运行,这将给出消息队列等待足够大的批处理的最大时间。
														该值的单位是ms。(default: 1000)
--topic <String: topic>                  `REQUIRED`: 生产的消息去往的主题名称.
--value-serializer <String:              kafka的消息对应的value的序列化实现类
													 (default: kafka. serializer.DefaultEncoder)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
2)生产数据到food主题的分区里

该脚本的作用是读取控制台的数据,发送到kafka集群

[root@qianfeng01 data]# kafka-console-producer.sh \
--broker-list qianfeng01:9092,qianfeng02:9092,qianfeng03:9092 \
--topic food
  • 1
  • 2
  • 3

3.2.2 消费者消费消息

1)帮助信息

使用 kafka-console-consumer.sh脚本,回车,即可显示帮助信息。

    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/912211
    推荐阅读
    相关标签
      

    闽ICP备14008679号