赞
踩
Producer.java中的run()方法调用了producer.send方法,点进去send方法返回doSend。 doSend方法封装了producer的核心流程,对应前面画的那张流程图,之后还有一些细节的实现:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { // first make sure the metadata for the topic is available /** * 步骤一: * waitOnMetadata 同步等待拉取元数据 * maxBlockTimeMs 这个参数代表最多等待多久 * 返回一个 ClusterAndWaitTime 的结果 * */ ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); // clusterAndWaitTime.waitedOnMetadataMs 代表拉取元数据用了多少时间 // 最多等待多久的时间 - 拉取元数据用了多长时间 = 还剩余多少时间可以用 代表消息没有发送成功 // 重新发送还可以有多少时间 超过这个时间会抛出异常 long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs); // 更新集群的元数据 Cluster cluster = clusterAndWaitTime.cluster; /** * 步骤二: * 对消息的 key和value 进行序列化 * */ byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer"); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer"); } /** * 步骤三: * 根据分区器选择消息应该发送的分区 * */ int partition = partition(record, serializedKey, serializedValue, cluster); int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue); /** * 步骤四: * 确认消息的大小是否超过了最大值 * KafkaProducer初始化的时候, 指定producer最大能发送的一条消息有多大 * 默认是1M, 根据公司实际业务进行修改 * */ ensureValidRecordSize(serializedSize); /** * 步骤五: * 根据元数据信息, 封装分区对象 * * */ tp = new TopicPartition(record.topic(), partition); long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); // producer callback will make sure to call both 'callback' and interceptor callback /** * 步骤六: * 消息是异步发送, 给每一条消息绑定回调函数 * */ Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp); /** * 步骤七: * 把消息放入RecordAccumulator, (32兆的一块内存) * 然后由accumulator把消息封装成一个一个批次去发送 * */ RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); // 唤醒线程的条件: // 批次满了 或者 新创建一个批次 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); /** * 步骤八: * 唤醒sender线程, 这里是真正发送数据 * */ this.sender.wakeup(); } return result.future;
本节中值得学习的技术:
例如第四步中判断消息大小是否超过最大值的方法ensureValidRecordSize,点进去后发现, kafka自定义了自己的异常, 在底层代码中往上抛出, 在核心代码中捕获异常, 能让用户更好的知道在哪里出现了错误, 更利于错误排查。这种编程思想可用于很多项目。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。