当前位置:   article > 正文

大数据最佳实践-kafka

大数据最佳实践-kafka

目录

架构

在这里插入图片描述在这里插入图片描述

优缺点

优点:
支持多个生产者和消费者1
支持broker的横向拓展
副本集机制,实现数据冗余,保证数据不丢失
通过topic将数据进行分类
通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量
支持多种模式的消息
基于磁盘实现数据的持久化
高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟
一个消费者可以支持多种topic的消息
对CPU和内存的消耗比较小
对网络开销也比较小
支持跨数据中心的数据复制
支持镜像集群

缺点:
由于是批量发送,所以数据达不到真正的实时
对于mqtt协议不支持
不支持物联网传感数据直接接入
只能支持统一分区内消息有序,无法实现全局消息有序
监控不完善,需要安装插件
需要配合zookeeper进行元数据管理
会丢失数据,并且不支持事务
可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

硬件

磁盘存储

使用多块磁盘,并配置为 Kafka 专用的磁盘;
JBOD vs RAID10;
JBOD(Just a Bunch of Disks,简单来说它表示一个没有控制软件提供协调控制的磁盘集合,它将多个物理磁盘串联起来,提供一个巨大的逻辑磁盘,数据是按序存储,它的性能与单块磁盘类似)
JBOD 的一些缺陷:
任何磁盘的损坏都会导致异常关闭,并且需要较长的时间恢复;
数据不保证一致性;
多级目录;
社区也正在解决这么问题,可以关注 KIP 112、113:
必要的工具用于管理 JBOD;
自动化的分区管理;
磁盘损坏时,Broker 可以将 replicas 迁移到好的磁盘上;
在同一个 Broker 的磁盘间 reassign replicas;

RAID 10 的特点:
可以允许单磁盘的损坏;
性能和保护;
不同磁盘间的负载均衡;
高命中来减少 space;
单一的 mount point;
文件系统:
使用 EXT 或 XFS;
SSD;

kafka机器数

Kafka机器数量 =2*(峰值生产速度副本数/100)+1
(例如峰值生产速度是50M/s。副本数为2。
Kafka机器数量=2
(50*2/100)+ 1=3台)

确定topic分区数

创建一个只有1个分区的topic,然后测试这个topic的producer吞吐量和consumer吞吐量。假设它们的值分别是Tp和Tc,单位可以是MB/s。然后假设总的目标吞吐量是Tt,Partitions = max(NP, NC)=max(TT/TP,TT/TC)

每个broker的分区不应超过4000个,集群中的分区不应超过200000个。

如何确定Partition的副本数

Partition的副本数不要超过节点个数
Kafka中Topic的Partition的副本是为了提升数据的可靠性而存在的,同一个Partition的副本会分布在不同的节点,因此副本数不允许超过节点个数

同一个组的消费者的数量建议与待消费的Topic下的Partition数保持一致

若同一个组的消费者数量多于Topic的Partition数时,会有多余的消费者一直无法消费该Topic的消息,若消费者数量少于Topic的Partition数时,并发消费得不到完全体现,因此建议两者相等。

每个消费者消费几个分区

将分区按数字顺序排行序,消费者按消费者名称的字典序排好序
然后,用分区总数除以消费者总数。如果能够除尽,则皆大欢喜,平均分配;若除不尽,则位于排序前面的消费者将多负责一个分区

只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range。
partition.assignment.strategy: RangeAssignor、RoundRobinAssignor

Range策略

Range策略是对每个主题而言的,首先对同一个主题里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。在我们的例子里面,排完序的分区将会是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消费者线程排完序将会是C1-0, C2-0, C2-1。然后将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。

如果除不尽,那么前面几个消费者线程将会多消费一个分区
Range策略中一个很明显的短板就是对于多topic的情况,可能会存在某一个消费线程压力过大的问题,无法做到真正的均衡。

Roundrobin策略

同一个Consumer Group里面的所有消费者的num.streams必须相等;
每个消费者订阅的主题必须相同

所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序。在例子里面,加入按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:最后按照round-robin风格将分区分别分配给不同的消费者线程。

分区策略

如果有key
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
key为null时,从缓存中取分区id或者随机取一个

if(key == null) {
     // 如果没有指定key
    val id = sendPartitionPerTopicCache.get(topic)  // 先看看Kafka有没有缓存的现成的分区Id
    id match {
   
      case Some(partitionId) =>  
        partitionId  // 如果有的话直接使用这个分区Id就好了
      case None => // 如果没有的话,
        val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)  //找出所有可用分区的leader所在的broker
        if (availablePartitions.isEmpty)
          throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
        val index = Utils.abs(Random.nextInt) % availablePartitions.size  // 从中随机挑一个
        val partitionId = availablePartitions(index).partitionId
        sendPartitionPerTopicCache.put(topic, partitionId) // 更新缓存以备下一次直接使用
        partitionId
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

如何避免重复消费

Kafka 的事务机制结合 Flink 的两段提交协议实现端到端的仅一次语义

同一个 Consumer Group 的所有消费端实例,会按分区均匀分配。如果用户的消费端个数 N 可以整除 24(默认分区个数) 分区对消费端的分配就是均匀的

如果消费端个数 N 不能整除 24(默认分区个数),那么有的消费端处理的分区个数会比其他消费端大 1

Java 客户端设置回调是否会影响消息发送的速度

主要取决于您的回调里的处理是否耗时,另外跟 max.in.flight.requests.per.connection 这个参数的设置有关。

为了减少回调里的处理耗时,最好不要过于频繁地在回调里面做耗时的处理,可以积累一定量 Ack 后再做批量的回调处理,或者把处理放到异步的另一个线程去处理,不阻塞回调的完成。
在阻塞结束之前,最多能发的消息数由 max.in.flight.requests.per.connection 决定

producer-参数

Kafka使用异步发布/订阅模型。当生产者调用send()时,返回的结果是未来。future提供了一些方法,让您可以检查正在处理的信息的状态。当批处理就绪时,生产者将其发送给代理。Kafka代理等待事件,接收结果,然后响应事务已完成。

如果不使用future,可以只获取一条记录,等待结果,然后发送响应。延迟很低,但吞吐量也很低。如果每个事务需要5毫秒,则吞吐量为每秒200个事件—比预期的每秒100000个事件慢。

当你使用生产者.send(),则填充生产者上的缓冲区。当缓冲区已满时,生产者将缓冲区发送给Kafka代理并开始重新填充缓冲区。

acks
acks=0:表示producer不需要等待任何确认收到的信息,副本将立即加到socket buffer并认为已经发送。没有任何保障可以保证此种情况下server已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的offset会总是设置为-1。无需服务端的Response、性能较高、丢数据风险较大。
acks=1:这意味着至少要等待leader已经成功将数据写入本地log,但是并没有等待所有follower是否成功写入。如果follower没有成功备份数据,而此时leader又无法提供服务,则消息会丢失。服务端主节点写成功即返回Response、性能中等、丢数据风险中等、主节点宕机可能导致数据丢失。

acks=all:这意味着leader需要等待所有备份都成功写入日志,只有任何一个备份存活,数据都不会丢失。服务端主节点写成功且备节点同步成功才返回Response、性能较差、数据较为安全、主节点和备节点都宕机才会导致数据丢失。

一般建议选择 acks=1,重要的服务可以设置 acks=all

retries
结合实际业务调整
客户端发送消息的重试次数。值大于0时,这些数据发送失败后,客户端会重新发送。
注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。

request.timeout.ms
结合实际业务调整
设置一个请求最大等待时间,超过这个时间则会抛Timeout异常。
超时时间如果设置大一些,如120000(120秒),高并发的场景中,能减少发送失败的情况。

block.on.buffer.full
TRUE表示当我们内存用尽时,停止接收新消息记录或者抛出错误。
默认情况下,这个设置为TRUE。然而某些阻塞可能不值得期待,因此立即抛出错误更好。如果设置为false,则producer抛出一个异常错误:BufferExhaustedException

batch.size
批量大小以总字节(而不是消息数)度量批大小。它控制在向Kafka代理发送消息之前要收集的数据字节数。在不超过可用内存的情况下,将其设置为尽可能高的值。默认值为16384。如果您增加缓冲区的大小,它可能永远不会满。生产者最终会根据其他触发器发送信息,例如以毫秒为单位的延迟时间。虽然可以通过将缓冲区批处理大小设置得太高来降低内存使用率,但这不会影响延迟。

如果你的producer一直在发送,你可能会得到最好的吞吐量。如果生产者经常处于空闲状态,您可能没有编写足够的数据来保证当前的资源分配。

发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器 有助于提高吞吐
默认的批量处理消息字节数上限。producer将试图批处理消息记录,以减少请求次数。这将改善client与server之间的性能。不会试图处理大于这个字节数的消息字节数。

发送到brokers的请求将包含多个批量处理,其中会包含对每个partition的一个请求。

较小的批量处理数值比较少用,并且可能降低吞吐量(0则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特定批量处理数值的内存大小。

linger.ms
每条消息待在缓存中的最长时间。若超过这个时间,就会忽略 batch.size 的限制,然后客户端立即把消息发往服务器。
有助于控制延迟
设置在异步模式下缓冲数据的最长时间。例如,设置为100意味着它一次发送100毫秒的消息。这提高了吞吐量,但缓冲增加了消息传递延迟。
默认情况下,生产者不等待。它在数据可用时发送缓冲区。

Instead of sending immediately, you can set linger.ms to 5 and send more messages in one batch. This would reduce the number of requests sent, but would add up to 5 milliseconds of latency to records sent, even if the load on the system does not warrant the delay.

The farther away the broker is from the producer, the more overhead required to send messages. Increase linger.ms for higher latency and higher throughput in your producer.

buffer.memory
所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.size 和 linger.ms 的限制
producer可以用来缓存数据的内存大小。如果数据产生速度大于向broker发送的速度,producer会阻塞或者抛出异常,以“block.on.buffer.full”来表明。

这项设置将和producer能够使用的总内存相关,但并不是一个硬性的限制,因为不是producer使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些用于维护请求。

reties: 提交失败重试次数

max.in.flight.requests.per.connection
生产者受到服务器响应之前还可以发送多少消息,设置为1可以保证消息是顺序写入的(即使失败重试)

compression.type: snappy/gzip/lz4可选,推荐 少CPU,高性价比的snappy
得知数据的发送情况

设置回调

import org.apache.kafka.clients.producer.Callback
//定义回调函数
private class DemoProducerCallback extends Callback {
   
    @override
    def onCompletion(recordMetadata: RecordMetadata, e: Exception) {
   
        if (e != null) {
   
            e.printStackTrace()
        }
    }
}
//设置回调函数
producer.send(record, new DemoProducerCallback())
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

ISR

AR:Assigned Replicas 所有副本

ISR是由leader维护,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。

HW:High Watermark 高水位,取一个partition对应的ISR中最小的LEO作为HW,consumer最多只能消费到HW所在的位置上一条信息。
LEO:LogEndOffset 当前日志文件中下一条待写信息的offset

HW/LEO这两个都是指最后一条的下一条的位置而不是指最后一条的位置。

LSO:Last Stable Offset 对未完成的事务而言,LSO 的值等于事务中第一条消息的位置(firstUnstableOffset),对已完成的事务而言,它的值同 HW 相同

LW:Low Watermark 低水位, 代表 AR 集合中最小的 logStartOffset 值

如何确保消息有序

为了保证topic整个有序,那么将partition调整为1

消息生产过程

拦截器->序列化器->分区器

主线程负责创建消息,然后通过分区器、序列化器、拦截器作用之后缓存到累加器RecordAccumulator中。
Sender线程负责将RecordAccumulator中消息发送到kafka中.

拦截器

主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:
(1)configure(configs)
获取配置信息和初始化数据时调用。
(2)onSend(ProducerRecord):
该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
(3)onAcknowledgement(RecordMetadata, Exception):
该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率
(4)close:
关闭interceptor,主要用于执行一些资源清理工作
如前所述,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

public class TimeInterceptor implements ProducerInterceptor<String, String> {
   

	@Override
	public void configure(Map<String, ?> configs) {
   

	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
   
		// 创建一个新的record,把时间戳写入消息体的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
   

	}

	@Override
	public void close() {
   

	}
}
public class CounterInterceptor implements ProducerInterceptor<String, String>{
   
    private int errorCounter = 0;
    private int successCounter = 0;

	@Override
	public void configure(Map<String, ?> configs) {
   
		
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
   
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
   
		// 统计成功和失败的次数
        if (exception == null) {
   
            successCounter++;
        } else {
   
            errorCounter++;
        }
	}

	@Override
	public void close() {
   
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}
}

List<String> interceptors = new ArrayList<>();
		interceptors.add("com.alex.kafka.interceptor.TimeInterceptor"); 	interceptors.add("com.alex.kafka.interceptor.CounterInterceptor"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

重要消息

最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志

Brokers相关参数设置

操作系统相关的内核参数影响Kafka的整体性能。这些参数可以在运行时通过sysctl进行配置。要使内核配置更改持久化(即,重新启动后使用调整后的参数),请编辑/etc/系统控制组态.以下部分描述了一些重要的内核设置。

由于Kafka处理许多日志段文件和网络连接,因此在生产部署中,如果代理托管许多分区,则在某些情况下可能需要增加最大进程文件描述符设置。例如,Kafka代理至少需要以下数量的文件描述符来跟踪日志段文件:
(number of partitions)*(partition size / segment size)

代理需要额外的文件描述符,以便通过网络套接字与外部方(如客户机、其他代理、Zookeeper、Sentry和Kerberos)通信。
To review FD limit currently set for a running Kafka broker, run cat /proc/KAFKA_BROKER_PID/limits, and look for Max open files.
To see open file descriptors, run:
lsof -p KAFKA_BROKER_PID

文件系统

Linux records when a file was created (ctime), modified (mtime) and accessed (atime). The value noatime is a special mount option for filesystems (such as EXT4) in Linux that tells the kernel not to update inode information every time a file is accessed (that is, when it was last read). Using this option may result in write performance gain. Kafka is not relying on atime. The value relatime is another mounting option that optimizes how atime is persisted. Access time is only updated if the previous atime was earlier than the current modified time.
Linux记录文件创建(ctime)、修改(mtime)和访问(atime)的时间。noatime值是Linux中文件系统(如EXT4)的一个特殊装载选项,它告诉内核不要在每次访问文件时(即上次读取文件时)更新inode信息。使用此选项可能会提高写入性能。卡夫卡不依赖时间。值relatime是另一个优化atime持久化方式的装载选项。只有当上一个时间早于当前修改的时间时,访问时间才会更新。
只有当上一个时间早于当前修改的时间时,访问时间才会更新。
To view mounting options, run mount -l or cat /etc/fstab command.

虚拟内存处理
Kafka广泛使用系统页缓存来生成和使用消息。Linux内核参数,虚拟机交换,是一个0-100之间的值,用于控制将应用程序数据(作为匿名页)从物理内存交换到磁盘上的虚拟内存。值越高,从物理内存中交换出的非活动进程就越频繁。值越低,交换的文件就越少,从而强制清空文件系统缓冲区。它是Kafka的一个重要内核参数,因为分配给交换空间的内存越多,分配给页缓存的内存就越少。Cloudera建议设置虚拟机交换值为1

要检查已交换到磁盘的内存,请运行vmstat并查找交换列。

Kafka严重依赖于磁盘I/O性能。vm.u比率以及vm.dirty\u background\u比率是控制脏页刷新到磁盘的频率的内核参数。较高的vm.u比率导致对磁盘的刷新频率降低。

要显示系统中脏页的实际数量,请运行egrep“dirty | writeback”/proc/vmstat

网络参数
卡夫卡被设计用来处理大量的网络流量。默认情况下,Linux内核不适合这种情况。可能需要根据用例或特定Kafka工作负载调整以下内核设置:
net.core.wmem_default: Default send socket buffer size.
net.core.rmem_default: Default receive socket buffer size.
net.core.wmem_max: Maximum send socket buffer size.
net.core.rmem_max: Maximum receive socket buffer size.
net.ipv4.tcp_wmem: Memory reserved for TCP send buffers.
net.ipv4.tcp_rmem: Memory reserved for TCP receive buffers.
net.ipv4.tcp_window_scaling: TCP Window Scaling option.
net.ipv4.tcp_max_syn_backlog: Maximum number of outstanding TCP SYN requests (connection requests).
net.core.netdev_max_backlog: Maximum number of queued packets on the kernel input side (useful to deal with spike of network requests).

要指定参数,可以使用Cloudera企业参考体系结构作为指导。

配置JMX临时端口
Kafka为JMX使用两个高编号的临时端口。当您查看Kafka代理进程的netstat-anp信息时,会列出这些端口。

您可以通过向Cloudera Manager中的Additional Broker Java Options(Broker\u Java\u opts)字段添加类似于以下的命令来更改第一个端口的编号。

-Dcom.sun.management.jmxremote.rmi.port=port
JMXèu端口配置映射到com.sun.management公司.jmxremote.port端口默认情况下。
要通过JConsole访问JMX,请运行 jconsole ${BROKER_HOST}:9393
用于JMX通信的第二个临时端口是为JRMP协议实现的,不能更改。

用于JMX通信的第二个临时端口是为JRMP协议实现的,不能更改。

broker.id.generation.enable=true
broker.id
必填参数,broker的唯一标识
num.partitions
默认分区数配置
log.retention.ms
数据最多保留的时间,根据 业务与数据情况、可用磁盘容量 设置数据最多保留的时间期限
log.segment.bytes
数据片段的滚动大小,太小将会频繁写文件,太大将会影响日志过期
log.segment.ms
日志片段的过期时间
message.max.bytes
单条消息最大的大小,默认1M
replication.factor
副本数配置,副本数越多,可靠性越高、故障率越低,同时 使用的存储成本也也高、性能越差
min.insync.replicas
最少同步副本数配置,同步的副本数要求越多生产者的效率可能就越低,但是 整体的数据可靠性就越高。
unclean.leader.election
不完全选举(默认为true),开配置开启后 允许不同步的副本成为 Leader,这将导致 部分数据丢失 的后果,但是 提升了系统整体可用性。
严格要求数据一致性 的场景中,unclean.leader.election 是需要被关闭
log.cleaner.enable 配置,相同键保留最新值
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件)
log.dirs
/tmp/kafka-logs Kafka数据存放的目录。可以指定多个目录,中间用逗号分隔,当新partition被创建的时会被存放到当前存放partition最少的目录。
port
9092 BrokerServer接受客户端连接的端口号
zookeeper.connect
Zookeeper的连接串,格式为:hostname1:port1,hostname2:port2,hostname3:port3。可以填一个或多个,为了提高可靠性,建议都填上。注意,此配置允许我们指定一个zookeeper路径来存放此kafka集群的所有数据,为了与其他应用集群区分开,建议在此配置中指定本集群存放目录,格式为:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path 。需要注意的是,消费者的参数要和此参数一致。
delete.topic.enable
false 启用deletetopic参数,建议设置为true。

JVM和垃圾收集
垃圾收集对基于JVM的应用程序的性能有着巨大的影响。建议对Kafka代理使用垃圾优先(G1)垃圾收集器。在Cloudera Manager中,在Kafka服务配置的附加Broker Java选项下指定以下内容:

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
-Djava.awt.headless=true -Djava.net.preferIPv4Stack=true

  • 1
  • 2
  • 3
  • 4

Cloudera建议根据您的用例为代理设置4-8gb的JVM堆大小内存。由于Kafka的性能在很大程度上取决于操作系统的页缓存,因此不建议将其与其他需要内存的应用程序并置。

当代理分配大块时,大消息可能会导致较长的垃圾收集(GC)暂停。监视GC日志和服务器日志。

将此添加到代理Java选项:

-XX:+PrintGC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-Xloggc:</path/to/file.txt>

如果长时间的GC暂停导致Kafka放弃ZooKeeper会话,则可能需要配置更长的超时值,有关详细信息,请参阅Kafka ZooKeeper性能调优。

网络和I/O线程

Kafka代理使用网络线程来处理客户端请求。传入的请求(如produce和fetch请求)被放入一个请求队列中,I/O线程从那里接收并处理它们。在处理请求之后,响应被放入一个内部响应队列中,网络线程从中获取响应并将响应发送回客户端。

网络线程数是一个重要的集群范围的设置,用于确定用于处理网络请求(即,接收请求和发送响应)的线程数。主要根据生产者、消费者和副本获取者的数量设置此值。

排队的最大请求数控制在阻塞网络线程之前请求队列中允许的请求数。

io线程数指定代理用于处理来自请求队列(可能包括磁盘I/O)的请求的线程数。

ISR管理

为主题分区设置的同步副本(ISR)包含所有跟随者副本,这些跟随者副本被引导分区捕获,并且位于活动的代理上。

如果复制副本落后于分区前导“太远”,它将从ISR集中删除。太远的定义由配置设置控制复制延迟时间.最大ms。如果跟随者至少在这段时间内没有发送任何获取请求,或者没有消耗到leaders日志结束偏移量,那么leader将从ISR集中删除跟随者。

num.replica.fetchers个是一个群集范围的配置设置,用于控制代理中的获取程序线程数。这些线程负责从源代理复制消息(即分区引线所在的位置)。增加此值将导致更高的I/O并行性和获取程序吞吐量。当然,有一个权衡:经纪人使用更多的CPU和网络。

replica.fetch.min复制.bytes控制从跟随副本获取的最小字节数。如果没有足够的字节,请等待到replica.fetch.wait文件.最大ms.

replica.fetch.wait文件.最大ms控制在从获取程序副本检查新消息之前睡眠的时间。此值应小于复制延迟时间.最大ms,否则复制副本将从ISR集中踢出。

要检查主题分区的ISR集,请运行以下命令:

kafka–zookeeper z o o k e e p e r H ˘ O S T N A M E : 2181 / k a f k a − − 描述 − − t o p i c {zookeeper\u HOSTNAME}:2181/kafka--描述--topic zookeeperH˘OSTNAME:2181/kafka描述topic{topic}

如果分区引线死亡,则从ISR集中选择一个新引线。不会有数据丢失。如果没有ISR,不干净的领导人选举可能会导致数据丢失。

不干净的领导人选举发生在不干净的领导选举.enable设置为true。默认情况下,该值设置为false。

Log Cleaner
As discussed in Record Management, the log cleaner implements log compaction. The following cluster-wide configuration settings can be used to fine tune log compaction:

log.cleaner.threads线程控制有多少后台线程负责日志压缩。增加此值会以增加I/O活动为代价提高日志压缩的性能。

日志.cleaner.io.每个最大字节数.second限制日志清理器的I/O活动,使其读写总和平均小于此值。

log.cleaner.dedupe删除.缓冲区大小指定用于跨所有清理线程进行日志压缩的内存。

日志.cleaner.io.缓冲区大小控制所有清理器线程中用于日志清理器I/O缓冲区的总内存。

log.cleaner.min最小值.压实滞后ms控制消息未压缩的时间长度。

日志.cleaner.io.缓冲负载系数控制重复数据消除缓冲区的日志清理器负载因子。增加此值允许系统一次清理更多日志,但会增加哈希冲突。

log.cleaner.backoff文件.ms控制在没有要压缩的日志时等待到下一次检查的时间。

num.replica.fetchers
群集范围内的配置设置,用于控制代理中有多少个提取程序线程。

replica.fetch.min.bytes

replica.fetch.wait.max.ms

unclean.leader.election.enable=false

Topics are divided into partitions. Each partition has a leader. Topics that are properly configured for reliability will consist of a leader partition and 2 or more follower partitions. When the leaders are not balanced properly, one might be overworked, compared to others.

Depending on your system and how critical your data is, you want to be sure that you have sufficient replication sets to preserve your data. For each topic, Cloudera recommends starting with one partition per physical storage disk and one consumer per partition.

日志清理

如记录管理中所述,日志清理器实现日志压缩。以下群集范围的配置设置可用于微调日志压缩:

1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824

log.cleaner.threads控制有多少后台线程负责日志压缩。增大此值将以增加I / O活动为代价提高日志压缩的性能。
log.cleaner.io.max.bytes.per.second 限制日志清除器的I / O活动,以便其读取和写入的总和平均小于该值。
log.cleaner.dedupe.buffer.size 指定用于所有清理程序线程之间的日志压缩的内存。
log.cleaner.io.buffer.size 控制所有清理程序线程中用于日志清理程序I / O缓冲区的总内存。
log.cleaner.min.compaction.lag.ms 控制不压缩邮件的时间。
log.cleaner.io.buffer.load.factor控制重复数据删除缓冲区的日志清理器的加载因子。增大该值可使系统立即清除更多日志,但会增加哈希冲突。
log.cleaner.backoff.ms 控制直到下一次检查是否没有日志要压缩的时间。

KAFKA_GC_LOG_OPTS="-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M

-Dcom.sun.management.jmxremote.rmi.port = 端口
jconsole $ {BROKER_HOST}:9393

组网参数

net.core.wmem_default默认发送套接字缓冲区大小。
net.core.rmem_default:默认接收套接字缓冲区大小。
net.core.wmem_max:最大发送套接字缓冲区大小。
net.core.rmem_max:最大接收套接字缓冲区大小。
net.ipv4.tcp_wmem:为TCP发送缓冲区保留的内存。
net.ipv4.tcp_rmem:为TCP接收缓冲区保留的内存。
net.ipv4.tcp_window_scaling:“ TCP窗口缩放”选项。
net.ipv4.tcp_max_syn_backlog:未完成的TCP SYN请求(连接请求)的最大数量。
net.core.netdev_max_backlog:内核输入端排队的数据包的最大数量(用于处理网络请求的峰值)。

consumer相关参数配置

消费者可以在管道的另一端产生吞吐量问题。主题的使用者组中使用者的最大数目等于分区的数目。您需要足够的分区来处理与生产者保持同步所需的所有消费者。

同一消费者组中的消费者在他们之间分割分区。向组中添加更多使用者可以提高性能(最多可增加分区数)。添加更多的消费群体不会影响性能。

每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset。

消息队列Kafka版Consumer会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为消费位点ConsumerOffset。

剩余的未消费的条数(也称为消息堆积量)=MaxOffset-ConsumerOffset。
消费实

例的个数不要大于分区的数量除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡
剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset

auto.commit.enable
如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程无法提供服务时,由新的consumer使用。
约束: 设置为false后,需要先成功消费再提交,这样可以避免消息丢失。

fetch.message.max.bytes
必须大于等于Producer客户端每次产生的消息最大字节数。如果参数的值太小,可能导致Producer产生的消息无法被Consumer成功消费。

auto.commit.interval.ms: 默认值为 1000,也即 1s。
如果距离当前时间已经超过参数auto.commit.interval.ms规定的时长,则客户端会启动位点提交动作
因此,如果将enable.auto.commit设置为true,则需要在每次poll数据时,确保前一次poll出来的数据已经消费完毕,否则可能导致位点跳跃。
如果想自己控制位点提交,请把enable.auto.commit设为false,并调用commit(offsets)函数自行控制位点提交。

connections.max.idle.ms
空连接的超时时间,设置为30000可以在网络异常场景下减少请求卡顿的时间。

auto.offset.reset
建议设置成latest,而不要设置成earliest,避免因位点非法时从头开始消费,从而造成大量重复。
如果是您自己管理位点,可以设置成none。
拉取大消息
消费过程是由客户端主动去服务端拉取消息的,在拉取大消息时,需要注意控制拉取速度,注意修改配置:

当服务端不存在曾经提交过的位点时(例如客户端第一次上线)。
当从非法位点拉取消息时(例如某个分区最大位点是10,但客户端却从11开始拉取消息)。
Java客户端可以通过auto.offset.reset来配置重置策略,主要有三种策略:
latest:从最大位点开始消费。
earliest:从最小位点开始消费。
none:不做任何操作,即不重置。

强烈建议设置成“latest”
如果是客户自己管理位点,可以设置成”none”
没有初始化offset或者offset被删除时,可以设置以下值:
earliest:自动复位offset为最早
latest:自动复位offset为最新
none:如果没有发现offset则向消费者抛出异常
anything else:向消费者抛出异常。

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

“max.poll.records”, 如果单条消息超过1MB,建议这里设置为1.
“fetch.max.bytes”, 设置比单条消息的大小略大一点.
建议设置成公网带宽的一半(注意该参数的单位是bytes,公网带宽的单位是bits)

“max.partition.fetch.bytes”, 设置比单条消息的大小略大一点。建议设置成fetch.max.bytes的三分之一或者四分之一。
拉取大消息的核心是逐条拉取的。

消息重复和消费幂等
消息队列Kafka版消费的语义是at least once, 也就是至少投递一次,保证消息不丢失,但是无法保证消息不重复。在出现网络问题、客户端重启时均有可能造成少量重复消息,此时应用消费端如果对消息重复比较敏感(例如订单交易类),则应该做消息幂等。

“max.partition.fetch.bytes”, 建议设置成fetch.max.bytes的三分之一或者四分之一。
fetch.min.bytes: 消费者从 Broker 最小拉取的字节数,调大可以降低负载、提高吞吐
session.timeout.ms: 消费者端发送心跳的超时时间(默认3s),调高可以 避免gc等情况下触发的rebalance

限流
在一写多读场景中, 如果某一个 Consumer 操作大量读磁盘, 会影响 Produce 级其他消费者操作的延迟。因此,通过 Kafka Quota 机制对 Consume 限流及支持动态调整阈值也是我们后续的方向
quota.producer.default
quota.producer.default = 10485760
quota.consumer.default = 10485760
/config /clients下的ZooKeeper

pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了避免这种情况,我们在我们的拉请求中有参数,允许消费者请求在等待数据到达的“长轮询”中进行阻塞(并且可选地等待到给定的字节数,以确保大的传输大小)。

消费堆积

消费端拿到消息后,执行消费逻辑,通常会执行一些远程调用,如果这个时候同步等待结果,则有可能造成一直等待,消费进程无法向前推进。

消费端应该竭力避免堵塞消费线程,如果存在等待调用结果的情况,建议设置等待的超时时间,超时后作为消费失败进行处理。

如何实现消息幂等

以数据库类应用为例,
发送消息时,传入 key 作为唯一流水号ID;
消费消息时,判断 key 是否已经消费过,如果已经消费过了,则忽略,如果没消费过,则消费一次;

实践中通常会打印失败的消息、或者存储到某个服务(比如创建一个 Topic 专门用来放失败的消息),然后定时 check 失败消息的情况,分析失败原因,根据情况处理。

  • 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
  • 比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
  • 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。
  • 比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据。

如何提高消费速度

Batch机制,消息队列Kafka版Producer端主要通过两个参数进行控制:
batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)。达到设置的数值时,就会触发一次网络请求,然后Producer客户端把消息批量发往服务器。
linger.ms : 每条消息在缓存中的最长时间。若超过这个时间,Producer客户端就会忽略batch.size的限制,立即把消息发往服务器。
因此,消息队列Kafka版Producer客户端什么时候把消息批量发送至服务器是由batch.size和linger.ms共同决定的。您可以根据具体业务需求进行调整。

增加Consumer实例个数。
可以在进程内直接增加(需要保证每个实例对应一个线程,否则没有太大意义),也可以部署多个消费实例进程;需要注意的是,实例个数超过分区数量后就不再能提高速度,将会有消费实例不工作。
增加消费线程。
增加Consumer实例本质上也是增加线程的方式来提升速度,因此更加重要的性能提升方式是增加消费线程,最基本的步骤如下:
建议设置等待的超时时间,超时后作消费失败处理。
增加消费线程
1、定义一个线程池;
2、Poll 数据;
3、把数据提交到线程池进行并发处理;
4、等并发结果返回成功后,再次 poll 数据执行

调整生产者、代理和消费者,使其在可管理的时间内发送、处理和接收尽可能多的批处理,从而使Kafka集群的延迟和吞吐量达到最佳平衡。

//同步提交、失败重试、安全、延迟高consumer.commitSync
//异步提交、无失败重试、不安全、延迟低consumer.commitAsync
auto.commit.offset
ConsumerRecords<> records = consumer.poll();
for (ConsumerRecord<> record : records){
。。。
tyr{
consumer.commitSync()
}
。。。
}
记录和偏移量一起写入db中,这个动作保持事务,并通过 分区再平衡监听器 处理消费者变动影响

ConsumerRebalanceListener 有两个方法可以在此方案中使用:
onPartitionsRevoked:消费者停止后,rebalance开始前调用,将记录和最新偏移量一起写入db中,防止偏移量丢失
onPartitionsAssigned:rebalance开始后,消费者开始读数据前调用,从db中读取偏移量,调用seek从指定的偏移量开始消费

在开了多个线程同时访问的情况下,如果队列里已经没有消息了,其实不需要所有的线程都在poll,只需要有一个线程poll各分区的消息就足够了,当在polling的线程发现队列中有消息,可以唤醒其他线程一起消费消息,以达到快速响应的目的。

这种方案适用于对消费消息的实时性要求不高的应用场景。如果要求准实时消费消息,则建议保持所有消费者处于活跃状态。

消费者(Consumer)和消息分区(Partition)并不强制数量相等,Kafka的poll(long)方法帮助实现获取消息、分区平衡、消费者与Kafka broker节点间的心跳检测等功能。
因此在对消费消息的实时性要求不高场景下,当消息数量不多的时候,可以选择让一部分消费者处于wait状态。

public class Config
{
   
    private static final String PRODUCE_CONFIG = "producer.properties";

    private static final String CONSUME_CONFIG = "consumer.properties";

    private static final String SASL_CONFIG = "dms_kafka_client_jaas.conf";

    private static final String TRUSTSTORE_PATH = "client.truststore.jks";

    public static Properties getProducerConfig() throws IOException
    {
   
        return getPropertyFromClassPath(PRODUCE_CONFIG);
    }

    public static Properties getConsumerConfig() throws IOException
    {
   
        return getPropertyFromClassPath(CONSUME_CONFIG);
    }

    public static String getSaslConfig() throws IOException
    {
   
        return getClassLoader().getResource(SASL_CONFIG).getPath();
    }

    public static String getTrustStorePath() throws IOException
    {
   
        return getClassLoader().getResource(TRUSTSTORE_PATH).getPath();
    }

    private static Properties getPropertyFromClassPath(String resourceName) throws IOException
    {
   
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null)
        {
   
            classLoader = Config.class.getClassLoader();
        }

        Properties properties = new Properties();
        properties.load(classLoader.getResourceAsStream(resourceName));
        return properties;
    }

    private static ClassLoader getClassLoader()
    {
   
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null)
        {
   
            classLoader = Config.class.getClassLoader();
        }
        return classLoader;
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
public class DmsKafkaProduceDemo
{
   
    public static void main(String[] args) throws IOException
    {
   
        Properties producerConfig = Config.getProducerConfig();

        producerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
        System.setProperty("java.security.auth.login.config", Config.getSaslConfig());

        Producer<String, String> producer = new KafkaProducer<>(producerConfig);
        for (int i = 0; i < 10; i++)
        {
   
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<String, String>(
                        producerConfig.getProperty("topic"),
                        null, "hello, dms kafka."));
            RecordMetadata rm;
            try
            {
   
                rm = future.get();
                System.out.println("Succeed to send msg: " + rm.offset());
            }
            catch (InterruptedException | ExecutionException e)
            {
   
                e.printStackTrace();
            }
        }
        producer.close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
public class DmsKafkaConsumeDemo
{
   
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);

    public static void WorkerFunc(int workerId, KafkaConsumer<String, String> kafkaConsumer) throws IOException
    {
   
        Properties consumerConfig = Config.getConsumerConfig();
        RecordReceiver receiver = new RecordReceiver(workerId, kafkaConsumer, consumerConfig.getProperty("topic"));
        while (true)
        {
   
            ConsumerRecords<String, String> records = receiver.receiveMessage();
            Iterator<ConsumerRecord<String, String>> iter = records.iterator();
            while (iter.hasNext())
            {
   
                ConsumerRecord<String, String> cr = iter.next();
                System.out.println("Thread" + workerId + " recievedrecords" + cr.value());
                logger.info("Thread" + workerId + " recievedrecords" + cr.value());

            }

        }
    }

    public static KafkaConsumer<String, String> getConsumer() throws IOException
    {
   
        Properties consumerConfig = Config.getConsumerConfig();

        consumerConfig.put("ssl.truststore.location", Config.getTrustStorePath());
        System.setProperty("java.security.auth.login.config", Config.getSaslConfig());

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        kafkaConsumer.subscribe(Arrays.asList(consumerConfig.getProperty("topic")),
                new ConsumerRebalanceListener()
                {
   
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> arg0)
                    {
   

                    }

                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> tps)
                    {
   

                    }
                });
        return kafkaConsumer;
    }

    public static void main(String[] args) throws IOException
    {
   

        //创建当前消费组的consumer
        final KafkaConsumer<String, String> consumer1 = getConsumer();
        Thread thread1 = new Thread(new Runnable()
        {
   
            public void run()
            {
   
                try
                {
   
                    WorkerFunc(1, consumer1);
                }
                catch (IOException e)
                {
   
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        final KafkaConsumer<String, String> consumer2 = getConsumer();

        Thread thread2 = new Thread(new Runnable()
        {
   
            public void run()
            {
   
                try
                {
   
                    WorkerFunc(2, consumer2);
                }
                catch (IOException e)
                {
   
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        final KafkaConsumer<String, String> consumer3 = getConsumer();

        Thread thread3 = new Thread(new Runnable()
        {
   
            public void run()
            {
   
                try
                {
   
                    WorkerFunc(3, consumer3);
                }
                catch (IOException e)
                {
   
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        //启动线程
        thread1.start();
        thread2.start();
        thread3.start();

        try
        {
   
            Thread.sleep(5000);
        }
        catch (InterruptedException e)
        {
   
            e.printStackTrace();
        }
        //线程加入
        try
        {
   
            thread1.join();
            thread2.join();
            thread3.join();
        }
        catch (InterruptedException e)
        {
   
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
public class RecordReceiver
{
   
    private static Logger logger = Logger.getLogger(DmsKafkaProduceDemo.class);
    
    //polling的间隔时间
    public static final int WAIT_SECONDS = 10 * 1000;

    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();

    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;

    protected String topicName;

    protected KafkaConsumer<String, String> kafkaConsumer;

    protected int workerId;

    public RecordReceiver(int id, KafkaConsumer<String, String> kafkaConsumer, String queue)
    {
   
        this.kafkaConsumer = kafkaConsumer;
        this.topicName = queue;
        this.workerId = id;

        synchronized (sLockObjMap)
        {
   
            lockObj = sLockObjMap.get(topicName);
            if (lockObj == null)
            {
   
                lockObj = new Object();
                sLockObjMap.put(topicName, lockObj);
            }
        }
    }

    public boolean setPolling()
    {
   
        synchronized (lockObj)
        {
   
            Boolean ret = sPollingMap.get(topicName);
            if (ret == null || !ret)
            {
   
                sPollingMap.put(topicName, true);
                return true;
            }
            return false;
        }
    }

    //唤醒全部线程
    public void clearPolling()
    {
   
        synchronized (lockObj)
        {
   
            sPollingMap.put(topicName, false);
            lockObj.notifyAll();
            System.out.println("Everyone WakeUp and Work!");
            logger.info("Everyone WakeUp and Work!");
        }
    }

    public ConsumerRecords<String, String> receiveMessage()
    {
   
        boolean polling = false;
        while (true)
        {
   
            //检查线程的poll状态,必要时休眠
            synchronized (lockObj)
            {
   
                Boolean p = sPollingMap.get(topicName);
                if (p != null && p)
                {
   
                    try
                    {
   
                        System.out.println("Thread" + workerId + " Have a nice sleep!");
                        logger.info("Thread" + workerId +" Have a nice sleep!");
                        polling = false;
                        lockObj.wait();
                    }
                    catch (InterruptedException e)
                    {
   
                        System.out.println("MessageReceiver Interrupted! topicName is " + topicName);
                        logger.error("MessageReceiver Interrupted! topicName is "+topicName);

                        return null;
                    }
                }
            }

            //开始消费,必要时唤醒其他线程消费
            try
            {
   
                ConsumerRecords<String, String> Records = null;
                if (!polling)
                {
   
                    Records = kafkaConsumer.poll(100);                    
                    if (Records.count() == 0)
                    {
   
                        polling = true;
                        continue;
                    }
                }
                else
                {
   
                    if (setPolling())
                    {
   
                        System.out.println("Thread" + workerId + " Polling!");
                        logger.info("Thread " + workerId + " Polling!");
                    }
                    else
                    {
   
                        continue;
                    }
                    do
                    {
   
                        System.out.println("Thread" + workerId + " KEEP Poll records!");
                        logger.info("Thread" + workerId + " KEEP Poll records!");
                        try
                        {
   
                            Records = kafkaConsumer.poll(WAIT_SECONDS);
                        }
                        catch (Exception e)
                        {
   
                            System.out.println("Exception Happened when polling records: " + e);
                            logger.error("Exception Happened when polling records: " + e);

                        }
                    } while (Records.count()==0);
                    clearPolling();
                }
                //消息确认
                kafkaConsumer.commitSync();
                return Records;
            }
            catch (Exception e)
            {
   
                System.out.println("Exception Happened when poll records: " + e);
                logger.error("Exception Happened when poll records: " + e);
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169

消息过滤

如果过滤的种类不多,可以采取多个 Topic 的方式达到过滤的目的;
如果过滤的种类多,则最好在客户端业务层面自行过滤。

消息广播

Kafka 自身没有消息广播的语义,可以通过创建不同的 Consumer Group 来模拟实现。
创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks“all”生成。这将确保生产者在大多数副本没有收到写操作时引发异常。

消息格式

crc32(4B):crc32校验值。校验范围为magic至value之间。
magic(1B):消息格式版本号,此版本的magic值为0。
attributes(1B):消息的属性。总共占1个字节,低3位表示压缩类型:0表示NONE、1表示GZIP、2表示SNAPPY、3表示LZ4(LZ4自Kafka 0.9.x引入),其余位保留。
key length(4B):表示消息的key的长度。如果为-1,则表示没有设置key,即key=null。
key:可选,如果没有key则无此字段。
value length(4B):实际消息体的长度。如果为-1,则表示消息为空。
value:消息体。可以为空,比如tomnstone消息

开发规范

相同属性的业务数据推送至同一Topic中
命名规范:
Topic_业务名/业务表名
英文字母统一小写
短横杠"-"以下划线代替 " _ "

topic的分区数可不可以增加
可以
topic的分区数可不可以减少
不可以

topic使用规范
推荐3副本,同步复制,最小同步副本数为2,且同步副本数不能等于topic副本数,否则宕机1个副本会导致无法生产消息。

创建方式:支持选择是否开启kafka自动创建Topic的开关。选择开启后,表示生产或消费一个未创建的Topic时,会自动创建一个包含3个分区和3个副本的Topic。

单topic最大分区数建议为100。
连接数限制:3000
消息大小:不能超过10MB

使用sasl_ssl协议访问Kafka:确保DNS具有反向解析能力,或者在hosts文件配置kafka所有节点ip和主机名映射,避免Kafka client做反向解析,阻塞连接建立。

磁盘容量申请超过业务量 * 副本数的2倍,即保留磁盘空闲50%左右。
业务进程JVM内存使用确保无频繁FGC,否则会阻塞消息的生产和消费。

消息内容规范
btype:业务类型标识
recordgid:数据记录唯一标识
gtime:数据获取的业务时间
utime:数据推送时间
data:业务数据json串

producer使用规范
同步复制客户端需要配合使用:acks=all
配置发送失败重试:retries=3
发送优化:linger.ms=0
生产端的JVM内存要足够,避免内存不足导致发送阻塞
retry.backoff.ms,重试间隔,建议设置为1000。
发送接口是异步的,如果您想接收发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。
如果消息发送量较大,建议不要设置Key,并使用黏性分区策略

控制台的默认分区个数是12,可以满足绝大部分场景的需求。您可以根据业务使用量进行增加。
注意 分区增加后,将不能减少,请小幅度调整

Consumer Group订阅多个Topic。
一个Consumer Group可以订阅多个Topic,多个Topic的消息被Cosumer Group中的Consumer均匀消费。例如Consumer Group A订阅了Topic A、Topic B、Topic C,则这三个Topic中的消息,被Consumer Group中的Consumer均匀消费。

String topicStr = kafkaProperties.getProperty
    声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/652477
    推荐阅读
    相关标签
      

    闽ICP备14008679号