赞
踩
文章结构
1.RecordAccumulator写入流程
2.batches :CopyOnWriteMap解析
3.线程安全和高并发效率的保证
这是生产者的缓冲组件,可以在生产者的send方法中看到这句
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
下面进入append方法看下
经过分区器后向RecordAccumulator里面的batches缓存,一开始要判断该topic是否已经有队列
batches是concurrentMap实现类,k 是topic+partiton封装的类, v是队列。
在getOrCreateDeque方法中,如果batches没有tp,那就新建一个队列并返回。
为什么是putIfAbsent(里面判断是否存在),而不是直接put,这个是在我了解到:CopyOnWriteMap之后才明白的,后面会说。
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
至此得到了对应tp的队列。
然后锁住这个队列,只能有一个send线程获得dq对象,往里面tryAppend
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
再进入tryAppend看下,取deque最后一个元素,如果有,那就试着加进去(有可能因为size问题或者根本没有元素不成功),返回的future就是空的。如果最后一个元素不是空且新消息的size没问题,那就进last.tryAppend。因为新数据没有初始化RecordBuilder,还是不会添加成功。具体可以去看ProducerBatch类的tryAppend(不是RecordAccumulator的tryAppend哦,差点看混了)
总结下
如果队列有最后一个batch,且batch有地方容纳消息,那就加进去,然后就结束了
如果队列有最后一个batch,但是batch没地方了,last.tryAppend返回空,last关闭写入,继续往后走
如果队列没有最后一个batch(空),往后走
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
下一步估计消息大小,给内存分配计算,取batchSize和消息最大值。
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic,
compression, key, value, headers));
buffer = free.allocate(size, maxTimeToBlock);
分配好内存后再次锁住dq开始搞事情
又一次tryAppend(这步真没看懂作用,难道是free.allocate改变了什么,后续明白了会回补这块)
下一步新建刚才估算大小尺寸的recordsBuilder,封装进batch,batch装消息
dq装batch
至此消息成功装进RecordAccumulator
synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new KafkaException("Producer closed while send in progress"); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); }
实现的ConcurrentMap方法,用读写分离来实现线程安全,多个线程在读取这个map时候,会得到这个指向这个map的指针。如果有线程想修改map内容,系统就复制一份map,在这个线程修改好以后,把新的指针赋给ConcurrentMap实现类。可以看到下面的put方法是 synchronized修饰的,因此同一时间只能有一个线程修改内容。在修改的时候别的线程依然可以用老的指针读取。这个非常适合读多写少的场景。
public synchronized V put(K k, V v) {
Map<K, V> copy = new HashMap<K, V>(this.map);
V prev = copy.put(k, v);
this.map = Collections.unmodifiableMap(copy);
return prev;
}
每个消息都会走一次getOrCreateDeque方法,batches随便读,即使有新的tp来了去修改batches时候也能访问。消息的数量远远多于tp数量,因此这个结构划算。
前面留下的疑问,putIfAbsent而不是put,假设现在该tp是新出现的的,多个线程都get到tp所在的dq返回null之后都去尝试向batches放新的队列,put操作是synchronized的,因此第一个获得锁的线程添加好之后,该dq建好了,别的线程要判断下。
private Deque<ProducerBatch> getOrCreateDeque(TopicPartition tp) {
Deque<ProducerBatch> d = this.batches.get(tp);
if (d != null)
return d;
d = new ArrayDeque<>();
Deque<ProducerBatch> previous = this.batches.putIfAbsent(tp, d);
if (previous == null)
return d;
else
return previous;
}
batches新增dq方法是synchronized只限一位,copyonwrite复制map内容写好后返回新map指针,不锁老map
因为不锁老map多个线程拿dq指针时候不受限制
写dq时候锁dq对象,但是map从来不锁上,因此dq指针随便拿
自己画图展现了下多线程效率
线程1在添加键值对时候,
线程3在取dq2
线程2在改dq3,dq3锁上了
线程4在拿dq3指针
这些动作同时都能做,太牛了!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。