赞
踩
原创不易,转载请注明出处
Kafka消息生产者端 业务在发送消息的时候,会将消息追加写到RecordAccumulator这个内存缓冲区的队列中,在追加的时候首先会根据消息的topic与partition 去一个map中获取对应的Deque队列,如果对应的队列不存在,就会创建一个,并添加到map中,然后才会将消息追加到队列队尾的batch元素中,我们知道,Kafka发送消息是支持多线程并发的,也就是往内存缓冲区中追加也是多线程并发的,其实在往Deque队列中追加写的过程是使用了synchronized来保证并发安全性,但是在获取topic-partition对应Deque队列的时候如果使用普通的HashMap是会有并发安全问题的,因为队列不存在,会创建一个,然后put到map中这个操作是线程不安全的。当然面对这种问题我们有很多解决方案,比如说加锁串行化,也可以使用并发安全的map ConcurrentHashMap这种分段锁实现的,但是Kafka没有使用这几种方案,而是自己实现了一个更加切合业务场景的map,CopyOnWriteMap。
我们先来看它的定义,成员与构造
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
// 可见性
private volatile Map<K, V> map;
public CopyOnWriteMap() {
/// 创建一个空map
this.map = Collections.emptyMap();
}
CopyOnWriteMap 实现ConcurrentMap ,成员就一个,是个map,这个成员是volatile 关键字修饰的,一旦发生了改变,所有线程可见,关于volatile不知道的同学可以上网查阅下资料,这是个java 并发编程中非常重要的点,也是面试常问的点。构造就是创建了一个普通的map。
接下来我们看下它比较典型的写方法putIfAbsent与put。
// 加锁
@Override
public synchronized V putIfAbsent(K k, V v) {
if (!containsKey(k))// 如果不存在 就会添加
return put(k, v);
else
// 如果存在的话返回 存在的
return get(k);
}
可以看到这个putIfAbsent 方法是由synchronized 修饰的,也就是多线程并发安全的,判断如果不存在这个key,就调用put方法添加
如果存在了就调用get 方法获取。
我们看下这个put方法
// put的时候加锁
@Override
public synchronized V put(K k, V v) {
// cp之前的那个map
Map<K, V> copy = new HashMap<K, V>(this.map);
// 添加
V prev = copy.put(k, v);
// 重新赋值
this.map = Collections.unmodifiableMap(copy);
return prev;
}
可以看到这个put方法也是synchronized 修饰的,先会cp一份之前的那个map,然后在cp的那个map中添加,接下来重要的来了,给map重新赋值。由于成员map是volatile 修饰的,拥有可见性,那些读的线程就会立马感知到map的引用变了,变成了一个新的map。
其实还有这些 putAll ,remove ,replace 有关写的方法都是加锁的cp 操作的。
我们再来看下并发无锁并发读
拿最常用的get方法来说
@Override
public V get(Object k) {
return map.get(k);
}
就是调用map的get 方法来读的,而且是无锁的。
像containsValue,containsKey,entrySet,isEmpty,keySet,size,values这些有关读操作都是无锁的。
比如说我们在put的时候,会将并发put的操作串行化,但是它并没有锁map,而是锁的这个方法,这样做的好处就是你在put的时候,不影响那些并发读的线程,但是这种写时cp的方式有个问题就是,如果写很频繁的时候,容易造成大量内存的浪费,gc 成本增加,所以Kafka这个实现的这个CopyOnWriteMap特别适用于读多写少的场景,特别是这种无锁并发读,大大提高并发度,它使用在获取Deque这个业务中是正好的,因为这个topic与partition是不经常变动的,就是在消息生产者在开始的时候,可能会出现出现并发写,一旦初始化好了,基本都是读操作了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。