当前位置:   article > 正文

深度解析Kafka中CopyOnWriteMap实现原理_kafka copyonwritemap

kafka copyonwritemap

原创不易,转载请注明出处


前言

Kafka消息生产者端 业务在发送消息的时候,会将消息追加写到RecordAccumulator这个内存缓冲区的队列中,在追加的时候首先会根据消息的topic与partition 去一个map中获取对应的Deque队列,如果对应的队列不存在,就会创建一个,并添加到map中,然后才会将消息追加到队列队尾的batch元素中,我们知道,Kafka发送消息是支持多线程并发的,也就是往内存缓冲区中追加也是多线程并发的,其实在往Deque队列中追加写的过程是使用了synchronized来保证并发安全性,但是在获取topic-partition对应Deque队列的时候如果使用普通的HashMap是会有并发安全问题的,因为队列不存在,会创建一个,然后put到map中这个操作是线程不安全的。当然面对这种问题我们有很多解决方案,比如说加锁串行化,也可以使用并发安全的map ConcurrentHashMap这种分段锁实现的,但是Kafka没有使用这几种方案,而是自己实现了一个更加切合业务场景的map,CopyOnWriteMap。

1. 加锁copy写

我们先来看它的定义,成员与构造

public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    // 可见性
    private volatile Map<K, V> map;

    public CopyOnWriteMap() {

        /// 创建一个空map
        this.map = Collections.emptyMap();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

CopyOnWriteMap 实现ConcurrentMap ,成员就一个,是个map,这个成员是volatile 关键字修饰的,一旦发生了改变,所有线程可见,关于volatile不知道的同学可以上网查阅下资料,这是个java 并发编程中非常重要的点,也是面试常问的点。构造就是创建了一个普通的map。
接下来我们看下它比较典型的写方法putIfAbsent与put。

 // 加锁
 @Override
 public synchronized V putIfAbsent(K k, V v) {
     if (!containsKey(k))// 如果不存在 就会添加
         return put(k, v);
     else

         // 如果存在的话返回 存在的
         return get(k);
 }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以看到这个putIfAbsent 方法是由synchronized 修饰的,也就是多线程并发安全的,判断如果不存在这个key,就调用put方法添加
如果存在了就调用get 方法获取。
我们看下这个put方法

// put的时候加锁
@Override
public synchronized V put(K k, V v) {
    // cp之前的那个map
    Map<K, V> copy = new HashMap<K, V>(this.map);
    // 添加
    V prev = copy.put(k, v);
    // 重新赋值
    this.map = Collections.unmodifiableMap(copy);
    return prev;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

可以看到这个put方法也是synchronized 修饰的,先会cp一份之前的那个map,然后在cp的那个map中添加,接下来重要的来了,给map重新赋值。由于成员map是volatile 修饰的,拥有可见性,那些读的线程就会立马感知到map的引用变了,变成了一个新的map。

其实还有这些 putAll ,remove ,replace 有关写的方法都是加锁的cp 操作的。

2. 无锁并发读

我们再来看下并发无锁并发读
拿最常用的get方法来说

@Override
public V get(Object k) {
    return map.get(k);
}
  • 1
  • 2
  • 3
  • 4

就是调用map的get 方法来读的,而且是无锁的。
像containsValue,containsKey,entrySet,isEmpty,keySet,size,values这些有关读操作都是无锁的。

总结

比如说我们在put的时候,会将并发put的操作串行化,但是它并没有锁map,而是锁的这个方法,这样做的好处就是你在put的时候,不影响那些并发读的线程,但是这种写时cp的方式有个问题就是,如果写很频繁的时候,容易造成大量内存的浪费,gc 成本增加,所以Kafka这个实现的这个CopyOnWriteMap特别适用于读多写少的场景,特别是这种无锁并发读,大大提高并发度,它使用在获取Deque这个业务中是正好的,因为这个topic与partition是不经常变动的,就是在消息生产者在开始的时候,可能会出现出现并发写,一旦初始化好了,基本都是读操作了。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/597153
推荐阅读
相关标签
  

闽ICP备14008679号