
[Big Data Platform] Exploring the Kafka Rest API with Confluent (Part 4)

  • Kafka Rest API: specifying the partition

  • Kafka message partitioning rules

Let's start by stepping into KafkaProducer's send method:

```java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
```

Then step into doSend:

```java
/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // ... ...
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        // ... ...
        int partition = partition(record, serializedKey, serializedValue, cluster);
        // ... ...
    } // ... (catch clauses omitted)
}
```

The partition method it calls looks like this:

```java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // A partition set explicitly on the record wins; otherwise the configured Partitioner decides.
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```

By default (when partitioner.class is not configured), the partitioner here is:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

```java
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
```
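The keyless branch above does not really pick a partition at random. In the pre-2.4 Kafka client this snippet comes from, nextValue(topic) reads a per-topic AtomicInteger that is seeded with a random value and incremented on every call, so consecutive keyless records rotate through the available partitions. The sketch below models that behaviour with JDK classes only; the class name KeylessPartitionModel is ours, and the Cluster metadata lookup is replaced by a plain list of available partition numbers.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative model of the keyless branch of DefaultPartitioner; names are ours, not Kafka's API.
class KeylessPartitionModel {
    // One counter per topic, seeded with a random start value on first use.
    private final Map<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }

    // Same arithmetic as the snippet: clear the sign bit (what Utils.toPositive does),
    // then take the remainder over the partitions that currently have a leader.
    int partition(String topic, List<Integer> availablePartitions) {
        int part = (nextValue(topic) & 0x7fffffff) % availablePartitions.size();
        return availablePartitions.get(part);
    }

    public static void main(String[] args) {
        KeylessPartitionModel model = new KeylessPartitionModel();
        List<Integer> available = List.of(0, 1, 2);
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append(model.partition("rest_test2", available)).append(' ');
        }
        // Consecutive keyless records rotate through the available partitions.
        System.out.println(sb.toString().trim());
    }
}
```

Because the counter starts at a random value, the first partition differs between runs, but the rotation after that is deterministic.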

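From DefaultPartitioner we can see that when no key is given, the partition is not chosen at random: nextValue(topic) returns a per-topic counter that starts at a random value and is incremented on every call, so keyless records are spread round-robin over the available partitions (Kafka 2.4 and later replaced this with a "sticky" strategy for keyless records). When a key is given, the key is hashed with MurmurHash2 and the result is taken modulo the total number of partitions. Note that the key being hashed is not the key object you passed in but its serialized bytes; as doSend shows, it is the result of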

keySerializer.serialize(record.topic(), record.headers(), record.key());

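In the Serializer interface, this three-argument method is a default method that simply delegates to the two-argument version: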

```java
default byte[] serialize(String topic, Headers headers, T data) {
    return serialize(topic, data);
}
```

Here are a few of its implementations:

```java
public class ShortSerializer implements Serializer<Short> {
    public byte[] serialize(String topic, Short data) {
        if (data == null)
            return null;
        return new byte[] {
            (byte) (data >>> 8),
            data.byteValue()
        };
    }
}

public class LongSerializer implements Serializer<Long> {
    public byte[] serialize(String topic, Long data) {
        if (data == null)
            return null;
        return new byte[] {
            (byte) (data >>> 56),
            (byte) (data >>> 48),
            (byte) (data >>> 40),
            (byte) (data >>> 32),
            (byte) (data >>> 24),
            (byte) (data >>> 16),
            (byte) (data >>> 8),
            data.byteValue()
        };
    }
}

public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";
    // ... ...
    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }
}

public class ByteArraySerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        return data;
    }
}
```
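To see why the serialized form matters, the sketch below rebuilds the byte layouts of LongSerializer and StringSerializer with JDK classes only (ByteBuffer writes big-endian by default, which matches the >>> 56 ... byteValue() sequence above). The same digits give two different byte arrays, hence generally different MurmurHash2 values and possibly different partitions. The class name KeyBytesDemo is illustrative.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Reproduces the byte layout of LongSerializer and StringSerializer without Kafka on the classpath.
class KeyBytesDemo {
    // 8 bytes, big-endian: same result as the (data >>> 56) ... data.byteValue() sequence.
    static byte[] longKeyBytes(long key) {
        return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
    }

    // UTF-8 bytes of the string, as StringSerializer produces with its default encoding.
    static byte[] stringKeyBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(longKeyBytes(33669988L)));    // [0, 0, 0, 0, 2, 1, -61, 100]
        System.out.println(Arrays.toString(stringKeyBytes("33669988"))); // [51, 51, 54, 54, 57, 57, 56, 56]
    }
}
```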

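So the partition is determined by the serialized key bytes: they are hashed with MurmurHash2, and the hash is taken modulo the number of partitions.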
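The keyed branch can be reproduced outside Kafka, for example to check offline which partition a key will land in. The sketch below is our transcription of Utils.murmur2 and Utils.toPositive from kafka-clients into a dependency-free class (KeyedPartition is a name we made up); with kafka-clients on the classpath, calling the real Utils methods is the safer choice. It assumes the key bytes are exactly what the serializer produced, here UTF-8 as with StringSerializer.

```java
import java.nio.charset.StandardCharsets;

// Dependency-free version of the keyed branch: toPositive(murmur2(keyBytes)) % numPartitions.
class KeyedPartition {
    static int murmur2(final byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995; // mixing constants of MurmurHash2
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix 4 bytes at a time, read little-endian.
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit; unlike Math.abs it never returns a negative number.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return toPositive(murmur2(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // Assumption: the key was serialized by StringSerializer (UTF-8).
        byte[] key = "33669988".getBytes(StandardCharsets.UTF_8);
        System.out.println(partitionForKey(key, 3));
    }
}
```

Since the modulo is taken over the total partition count, adding partitions to a topic changes where existing keys are routed.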

  • Specifying the partition in Kafka Rest Proxy

Now look at the Kafka Rest Proxy source code:

https://github.com/confluentinc/kafka-rest

In the kafka-rest module you will find the two classes introduced earlier, ProducerPool and ProduceTask. Sending messages is implemented by ProducerPool's produce method:

```java
public <K, V> void produce(
        String topic,
        Integer partition,
        EmbeddedFormat recordFormat,
        SchemaHolder schemaHolder,
        Collection<? extends ProduceRecord<K, V>> records,
        ProduceRequestCallback callback
) {
    ProduceTask task = new ProduceTask(schemaHolder, records.size(), callback);
    log.trace("Starting produce task " + task.toString());
    RestProducer restProducer = producers.get(recordFormat);
    restProducer.produce(task, topic, partition, records);
}
```

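This delegates to the produce method defined by the RestProducer interface. Taking the simplest case, messages without a schema, the implementation is NoSchemaRestProducer, whose produce method is: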

```java
@Override
public void produce(
        ProduceTask task,
        String topic,
        Integer partition,
        Collection<? extends ProduceRecord<K, V>> produceRecords
) {
    for (ProduceRecord<K, V> record : produceRecords) {
        Integer recordPartition = partition;
        if (recordPartition == null) {
            recordPartition = record.partition();
        }
        producer.send(
                new ProducerRecord(topic, recordPartition, record.getKey(), record.getValue()),
                task.createCallback()
        );
    }
}
```

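This producer is the KafkaProducer analysed above, so the rules described earlier apply from here on: an explicit partition (taken from the URL, or else from the record in the request JSON) is used as-is, and only when neither is given does the partitioner decide, hashing the key if there is one.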
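Putting the two snippets together, the partition of a record sent through the REST proxy is resolved in up to three steps. As far as we can tell from the kafka-rest v2 resources, the partition argument is filled from the URL when you POST to /topics/{topic}/partitions/{partition} and is null for POST /topics/{topic}. The model below (PartitionResolution and the stand-in partitioner are our own names) captures that precedence with JDK types only.

```java
import java.util.function.ToIntFunction;

// Illustrative model of: REST request -> NoSchemaRestProducer -> KafkaProducer.partition().
class PartitionResolution {
    static int resolve(Integer urlPartition, Integer bodyPartition,
                       ToIntFunction<byte[]> partitioner, byte[] keyBytes) {
        // Step 1 (NoSchemaRestProducer): a partition from the URL overrides the record's own.
        Integer recordPartition = urlPartition != null ? urlPartition : bodyPartition;
        // Step 2 (KafkaProducer): an explicit partition is used as-is, else the partitioner decides.
        return recordPartition != null ? recordPartition : partitioner.applyAsInt(keyBytes);
    }

    public static void main(String[] args) {
        // Stand-in partitioner that always answers 2, only to make the precedence visible.
        ToIntFunction<byte[]> fake = key -> 2;
        byte[] key = {1, 2, 3};
        System.out.println(resolve(0, 1, fake, key));       // 0: URL partition wins
        System.out.println(resolve(null, 1, fake, key));    // 1: partition from the record JSON
        System.out.println(resolve(null, null, fake, key)); // 2: left to the partitioner
    }
}
```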

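Since the Kafka Rest API supports sending messages to a specified partition, we can get the effect of a custom partitioning rule, much like implementing the Partitioner interface in the Java client: compute the partition up front according to our own business rules and write it into the request JSON.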
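A minimal sketch of that idea, assuming the binary embedded format (application/vnd.kafka.binary.v2+json, where key and value are base64-encoded) and a made-up business rule that gives each sales region its own partition of rest_test2. The v2 produce API accepts an optional partition field on each record; everything else here (BusinessPartitionBody, REGION_TO_PARTITION, the region names) is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

// Sketch: pick the partition with our own (hypothetical) business rule and write it
// into a v2 produce request body for the binary embedded format.
class BusinessPartitionBody {
    // Hypothetical rule: each sales region owns one partition of rest_test2 (3 partitions).
    static final Map<String, Integer> REGION_TO_PARTITION = Map.of("north", 0, "east", 1, "south", 2);

    static int businessPartition(String region) {
        Integer partition = REGION_TO_PARTITION.get(region);
        if (partition == null) {
            throw new IllegalArgumentException("unknown region: " + region);
        }
        return partition;
    }

    // Binary embedded format: keys and values travel base64-encoded inside the JSON.
    static String base64(String s) {
        return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
    }

    // {"records":[{"key":...,"value":...,"partition":N}]}; base64 output needs no JSON escaping.
    static String produceBody(String region, String key, String value) {
        return "{\"records\":[{\"key\":\"" + base64(key)
                + "\",\"value\":\"" + base64(value)
                + "\",\"partition\":" + businessPartition(region) + "}]}";
    }

    public static void main(String[] args) {
        // To be POSTed to /topics/rest_test2 with Content-Type application/vnd.kafka.binary.v2+json.
        System.out.println(produceBody("east", "order-1", "hello"));
        // {"records":[{"key":"b3JkZXItMQ==","value":"aGVsbG8=","partition":1}]}
    }
}
```

Because the caller fixes the partition, the producer's partitioner is bypassed entirely, so the rule has to be kept in sync with the topic's partition count by hand.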

  • Testing

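The test uses the topic rest_test2, which has 3 partitions.

We first used Postman to check which partition each of the following keys is assigned to:

KEY = 33669988: Postman shows Partition 0

KEY = 15935725: Postman shows Partition 1

KEY = 13572468: Postman shows Partition 2

We then sent 1000 messages with each of these three keys, checked for every request whether the partition in the response matched the one observed in Postman, counted the matches, and compared the totals at the end.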

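Code (as listed, it sends 1000, 500 and 200 messages for the three keys, i.e. the unequal-volume run described at the end):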

```java
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// sendMessage, MD5Util and HttpClientPoolTool are the author's own helpers;
// their code is not shown in this post.
public class TestPartition {
    public static int DATA_SIZE = 1000;
    public static String REST_HOST = "xxx";
    public static int REST_PORT = 8085;
    public static String TOPIC = "rest_test2";
    public static String CONTENT_TYPE = "application/vnd.kafka.binary.v2+json";
    public static String ENCODE = "utf-8";
    public static String KEY_IN_PARTITION_0 = "33669988";
    public static String KEY_IN_PARTITION_1 = "15935725";
    public static String KEY_IN_PARTITION_2 = "13572468";

    public static void main(String[] args) {
        Random random = new Random();
        // 1000, 500 and 200 messages for the three keys; count responses that report
        // the partition observed with Postman.
        int count0 = countHits(KEY_IN_PARTITION_0, 0, DATA_SIZE, random);
        int count1 = countHits(KEY_IN_PARTITION_1, 1, DATA_SIZE / 2, random);
        int count2 = countHits(KEY_IN_PARTITION_2, 2, DATA_SIZE / 5, random);
        System.out.println(count0);
        System.out.println(count1);
        System.out.println(count2);
        HttpClientPoolTool.closeConnectionPool();
    }

    // Sends `times` messages with the given key (MD5 of a random number as the value)
    // and returns how many of them landed in `expectedPartition`.
    private static int countHits(String key, int expectedPartition, int times, Random random) {
        int hits = 0;
        for (int i = 0; i < times; i++) {
            String response = sendMessage(REST_HOST, REST_PORT, TOPIC, key,
                    MD5Util.encrypt(String.valueOf(random.nextInt(999999))),
                    CONTENT_TYPE, ENCODE);
            int partition = getPartition(response);
            if (partition == expectedPartition) {
                hits++;
            } else {
                System.out.println("Need " + expectedPartition + " but " + partition);
            }
        }
        return hits;
    }

    // Extracts the "partition" field from a v2 produce response such as
    // {"offsets":[{"partition":0,"offset":...}],...}; returns -1 if it is missing.
    public static int getPartition(String response) {
        try {
            Pattern pattern = Pattern.compile("(?<=(\"partition\":)).*(?=(,\"offset\"))");
            Matcher matcher = pattern.matcher(response);
            if (matcher.find()) {
                return Integer.parseInt(matcher.group(0).trim());
            }
        } catch (Exception e) {
            System.out.println(response);
            e.printStackTrace();
        }
        return -1;
    }
}
```
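TestPartition relies on sendMessage, which the post does not show. Purely as an assumption about what such a helper might do, the sketch below posts a single binary-format record to the topic endpoint with the JDK 11+ HttpClient instead of the author's HttpClientPoolTool. SendMessageSketch, buildBody and buildRequest are hypothetical names, and the real helper may differ, for example in how it encodes the key.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.Charset;
import java.util.Base64;

// Hypothetical stand-in for the author's sendMessage helper.
class SendMessageSketch {
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Binary embedded format: key and value are base64-encoded. There is no "partition"
    // field, so the proxy's producer hashes the decoded key bytes to pick the partition.
    static String buildBody(String key, String value, Charset charset) {
        Base64.Encoder b64 = Base64.getEncoder();
        return "{\"records\":[{\"key\":\"" + b64.encodeToString(key.getBytes(charset))
                + "\",\"value\":\"" + b64.encodeToString(value.getBytes(charset)) + "\"}]}";
    }

    static HttpRequest buildRequest(String host, int port, String topic, String key,
                                    String value, String contentType, String encode) {
        Charset charset = Charset.forName(encode);
        return HttpRequest.newBuilder(URI.create("http://" + host + ":" + port + "/topics/" + topic))
                .header("Content-Type", contentType)
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(key, value, charset), charset))
                .build();
    }

    // Same parameter list as the calls in TestPartition; returns the raw JSON response.
    // Checked exceptions are wrapped so callers need no try/catch, as in TestPartition.
    static String sendMessage(String host, int port, String topic, String key, String value,
                              String contentType, String encode) {
        HttpRequest request = buildRequest(host, port, topic, key, value, contentType, encode);
        try {
            return CLIENT.send(request, HttpResponse.BodyHandlers.ofString()).body();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```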

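All three counts were 1000. A second run with unequal volumes per key, 1000, 500 and 200 messages respectively, gave the same result: messages with the same key always landed in the same partition.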

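Disclaimer: This article was contributed by a user and does not represent the position of the wpsshop blog; copyright belongs to the original author, and the site assumes no legal responsibility. If you find infringing content, please contact us. When reposting, please credit the source: https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/79830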