Let's step into KafkaProducer's send method:
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
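For reference before diving deeper, here is a minimal caller-side sketch, assuming a broker at localhost:9092 and the rest_test2 topic used in the tests later in this post:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // no partition given, so the partitioner picks one from the key
            RecordMetadata meta = producer.send(
                    new ProducerRecord<>("rest_test2", "33669988", "hello")).get();
            System.out.println("landed in partition " + meta.partition());
        }
    }
}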
Now step into the doSend method:
/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        ... ...
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        ... ...
        int partition = partition(record, serializedKey, serializedValue, cluster);
        ... ...
}
It calls the partition method:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
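Note the precedence here: if the ProducerRecord already carries a partition, the partitioner is never consulted. A small sketch (topic name reused from the test later in this post):

import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionPrecedenceDemo {
    public static void main(String[] args) {
        // explicit partition: record.partition() != null, so the Partitioner is skipped
        ProducerRecord<String, String> pinned =
                new ProducerRecord<>("rest_test2", 1, "anyKey", "v"); // always partition 1

        // no partition: record.partition() == null, the configured Partitioner decides
        ProducerRecord<String, String> routed =
                new ProducerRecord<>("rest_test2", "anyKey", "v");

        System.out.println(pinned.partition());  // 1
        System.out.println(routed.partition());  // null
    }
}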
The partitioner here is an instance of:
org.apache.kafka.clients.producer.internals.DefaultPartitioner
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
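This means you can predict, entirely client-side, which partition a keyed record will land in, which is exactly what the Postman probing later in this post exploits. A sketch reusing Kafka's internal org.apache.kafka.common.utils.Utils class, assuming all partitions of the topic are available and the key is serialized by StringSerializer (UTF-8):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionPredictor {
    // mirrors DefaultPartitioner's keyed branch, assuming all partitions are available
    static int predict(byte[] keyBytes, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // StringSerializer produces UTF-8 bytes, so getBytes(UTF_8) matches what the producer hashes
        byte[] key = "33669988".getBytes(StandardCharsets.UTF_8);
        System.out.println(predict(key, 3)); // 3 partitions, like rest_test2 below
    }
}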
From DefaultPartitioner we can see that if no key is specified, the partition is chosen in round-robin fashion: nextValue(topic) increments a per-topic counter that starts at a random value, and the result is taken modulo the number of available partitions. If a key is specified, the key bytes are hashed with MurmurHash2 and the result is taken modulo the total partition count. Note that the key hashed here is not the key object you pass when sending; as is easy to see in doSend, it is
keySerializer.serialize(record.topic(), record.headers(), record.key());
The interface method invoked here is a default method:
default byte[] serialize(String topic, Headers headers, T data) {
    return serialize(topic, data);
}
Look at a few of its implementations:
public class ShortSerializer implements Serializer<Short> {
    public byte[] serialize(String topic, Short data) {
        if (data == null)
            return null;

        return new byte[] {
            (byte) (data >>> 8),
            data.byteValue()
        };
    }
}

public class LongSerializer implements Serializer<Long> {
    public byte[] serialize(String topic, Long data) {
        if (data == null)
            return null;

        return new byte[] {
            (byte) (data >>> 56),
            (byte) (data >>> 48),
            (byte) (data >>> 40),
            (byte) (data >>> 32),
            (byte) (data >>> 24),
            (byte) (data >>> 16),
            (byte) (data >>> 8),
            data.byteValue()
        };
    }
}

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    ... ...

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

public class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}
So the producer hashes the serialized key bytes and takes the result modulo the partition count.
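A small sketch to make that concrete: the "same" logical key serialized as a String versus as a Long produces different bytes, and therefore (in general) a different hash and possibly a different partition. The topic name and partition count below are placeholders:

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;

public class SerializedKeyDemo {
    public static void main(String[] args) {
        int numPartitions = 3; // placeholder partition count

        // the "same" key, serialized two different ways
        byte[] asString = new StringSerializer().serialize("t", "33669988"); // 8 UTF-8 bytes
        byte[] asLong = new LongSerializer().serialize("t", 33669988L);      // 8 big-endian bytes

        // different bytes -> different hash -> possibly a different partition
        System.out.println(Utils.toPositive(Utils.murmur2(asString)) % numPartitions);
        System.out.println(Utils.toPositive(Utils.murmur2(asLong)) % numPartitions);
    }
}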
Now let's look at the Kafka REST Proxy source code:
https://github.com/confluentinc/kafka-rest
Focusing on the kafka-rest module, you can find the two classes introduced earlier: ProducerPool and ProduceTask. The produce method of ProducerPool implements message sending.
public <K, V> void produce(
    String topic,
    Integer partition,
    EmbeddedFormat recordFormat,
    SchemaHolder schemaHolder,
    Collection<? extends ProduceRecord<K, V>> records,
    ProduceRequestCallback callback
) {
  ProduceTask task = new ProduceTask(schemaHolder, records.size(), callback);
  log.trace("Starting produce task " + task.toString());
  RestProducer restProducer = producers.get(recordFormat);
  restProducer.produce(task, topic, partition, records);
}
produce is declared on the RestProducer interface. Taking the simplest case, schema-less messages, the implementation is NoSchemaRestProducer, whose produce method looks like this:
@Override
public void produce(
    ProduceTask task,
    String topic,
    Integer partition,
    Collection<? extends ProduceRecord<K, V>> produceRecords
) {
  for (ProduceRecord<K, V> record : produceRecords) {
    Integer recordPartition = partition;
    if (recordPartition == null) {
      recordPartition = record.partition();
    }
    producer.send(
        new ProducerRecord(topic, recordPartition, record.getKey(), record.getValue()),
        task.createCallback()
    );
  }
}
This producer is just a KafkaProducer, so the rest needs no further explanation.
Since the Kafka REST API supports sending messages to a specific partition, I can get the same effect as implementing the Partitioner interface in the Java client: pre-assign partitions according to my own business rules and write the partition into the request JSON.
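For example, with the v2 binary embedded format, each record in the request body may carry an optional partition field. Key and value are base64-encoded; the topic and key below are the test values from this post, and the value is just "hello":

POST /topics/rest_test2 HTTP/1.1
Content-Type: application/vnd.kafka.binary.v2+json

{
  "records": [
    { "partition": 1, "key": "MzM2Njk5ODg=", "value": "aGVsbG8=" }
  ]
}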
I tested with the topic rest_test2, which has 3 partitions.
Probing with Postman, the following keys map to these partitions:
KEY = 33669988 goes to Partition 0
KEY = 15935725 goes to Partition 1
KEY = 13572468 goes to Partition 2
Then, using these three keys, send 1,000 messages each; for every message, check whether the partition in the response matches the Postman result, count the matches, and inspect the totals at the end.
The code:
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TestPartition {

    public static int DATA_SIZE = 1000;
    public static String REST_HOST = "xxx";
    public static int REST_PORT = 8085;
    public static String TOPIC = "rest_test2";
    public static String CONTENT_TYPE = "application/vnd.kafka.binary.v2+json";
    public static String ENCODE = "utf-8";
    public static String KEY_IN_PARTITION_0 = "33669988";
    public static String KEY_IN_PARTITION_1 = "15935725";
    public static String KEY_IN_PARTITION_2 = "13572468";

    public static void main(String[] args) {
        Random random = new Random();

        int count0 = 0;
        int count1 = 0;
        int count2 = 0;

        for (int i = 0; i < DATA_SIZE; i++) {
            // send a message (sendMessage is the HTTP helper from earlier in this post)
            String response = sendMessage(
                    REST_HOST,
                    REST_PORT,
                    TOPIC,
                    KEY_IN_PARTITION_0,
                    MD5Util.encrypt(String.valueOf(random.nextInt(999999))),
                    CONTENT_TYPE,
                    ENCODE);
            // verify it landed in Partition 0
            if (0 == getPartition(response)) {
                count0++;
            } else {
                System.out.println("Need 0 but " + getPartition(response));
            }
        }

        for (int i = 0; i < DATA_SIZE / 2; i++) {
            String response = sendMessage(
                    REST_HOST,
                    REST_PORT,
                    TOPIC,
                    KEY_IN_PARTITION_1,
                    MD5Util.encrypt(String.valueOf(random.nextInt(999999))),
                    CONTENT_TYPE,
                    ENCODE);
            if (1 == getPartition(response)) {
                count1++;
            } else {
                System.out.println(getPartition(response));
            }
        }

        for (int i = 0; i < DATA_SIZE / 5; i++) {
            String response = sendMessage(
                    REST_HOST,
                    REST_PORT,
                    TOPIC,
                    KEY_IN_PARTITION_2,
                    MD5Util.encrypt(String.valueOf(random.nextInt(999999))),
                    CONTENT_TYPE,
                    ENCODE);
            if (2 == getPartition(response)) {
                count2++;
            } else {
                System.out.println(getPartition(response));
            }
        }

        System.out.println(count0);
        System.out.println(count1);
        System.out.println(count2);

        HttpClientPoolTool.closeConnectionPool();
    }

    // extract the partition field from the REST Proxy produce response
    public static int getPartition(String response) {
        try {
            Pattern pattern = Pattern.compile("(?<=(\"partition\":)).*(?=(,\"offset\"))");
            Matcher matcher = pattern.matcher(response);
            if (matcher.find()) {
                return Integer.parseInt(matcher.group(0).trim());
            }
            return -1;
        } catch (Exception e) {
            System.out.println(response);
            e.printStackTrace();
        }
        return -1;
    }
}
All three counts came out at 1000. I then tested with unequal volumes per key, sending 1000, 500, and 200 messages respectively; the result was the same: messages with the same key always land in the same partition.
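As an aside, the regex extraction in getPartition above is brittle (it grabs everything between "partition": and ,"offset"). A sketch of the same extraction with a proper JSON parser, assuming Jackson is on the classpath:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PartitionExtractor {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // reads the partition of the first entry in a v2 produce response's
    // "offsets" array, e.g. {"offsets":[{"partition":1,"offset":42,...}],...};
    // returns -1 on failure
    public static int getPartition(String response) {
        try {
            JsonNode offsets = MAPPER.readTree(response).path("offsets");
            if (offsets.isArray() && offsets.size() > 0) {
                return offsets.get(0).path("partition").asInt(-1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return -1;
    }
}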