
Kafka in Practice (12): KafkaProducer Source Code Walkthrough and Debugging

This post walks through the producer source code to get familiar with how records are sent. For compiling and debugging the Kafka source in IDEA, see the earlier post on building and debugging the Kafka source locally. The version analyzed here is kafka-1.0.0.

Contents

1. Environment Setup

2. The Send Path and the KafkaProducer Class

2.1 Record Pre-processing (Interceptors, Serialization, Partitioner, Accumulator)

2.2 The Sender Thread

3. Producer Demo and Debugging


 

1. Environment Setup

The earlier post already covered running ZooKeeper (3.4.12) on Windows and compiling the Kafka source; see: building and debugging the Kafka source locally. In IDEA, add a configuration under Run --> Debug --> Configurations that creates the topic yzg (3 partitions, 1 replica), then start the broker locally.

2. The Send Path and the KafkaProducer Class

KafkaProducer lives in the org.apache.kafka.clients.producer package (all producer-side source sits under this package). To use the producer you instantiate KafkaProducer, which defines the entire sending mechanism; KafkaProducer implements the Producer interface. A producer instance sends data by constructing a KafkaProducer and calling its send() method. The path breaks down as follows:

① The record first passes through the interceptor chain.

② send() delegates to doSend(), which serializes the key and value with the configured serializers.

③ partition() takes the original record plus the serialized key/value and picks a partition.

④ RecordAccumulator.append() is called to buffer the processed record in the RecordAccumulator (the client-side cache); it returns a RecordAppendResult.

⑤ Inside append(), records are queued into a per-partition Deque whose elements are ProducerBatch objects.

⑥ Once the record is buffered, this.sender.wakeup() wakes the sender thread, whose single job is to push the data over the network. A condensed sketch of this path follows.
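To make the flow concrete before diving into the real code, here is a condensed sketch of doSend(). Method and field names follow the 1.0.0 source, but the body is heavily trimmed: metadata waits, timestamps, callbacks and error handling are omitted, so treat it as an outline rather than the verbatim implementation.

    // Simplified sketch of KafkaProducer.doSend(); not the verbatim source.
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        // (2) serialize key and value with the configured serializers
        byte[] serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        // (3) pick the partition and build the TopicPartition
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        TopicPartition tp = new TopicPartition(record.topic(), partition);
        // (4)(5) append into the per-partition Deque<ProducerBatch> held by the RecordAccumulator
        RecordAccumulator.RecordAppendResult result = accumulator.append(
                tp, record.timestamp(), serializedKey, serializedValue, record.headers(), callback, maxBlockTimeMs);
        // (6) wake the sender thread; the record actually leaves on the Sender's I/O thread
        if (result.batchIsFull || result.newBatchCreated)
            this.sender.wakeup();
        return result.future;
    }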

The KafkaProducer constructor is shown below. Given the cluster config and the serializers (no topic name is passed at this point), it instantiates every collaborator the producer needs; the main objects are the partitioner, the key/value serializers, the interceptors, the cluster Metadata, the RecordAccumulator, the Sender, and the KafkaThread (I/O thread) that runs the Sender:

    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = Time.SYSTEM;
            String clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
            String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                    (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
            LogContext logContext;
            if (transactionalId == null)
                logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
            else
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
            log = logContext.logger(KafkaProducer.class);
            log.trace("Starting the Kafka producer");

            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            reporters.add(new JmxReporter(JMX_PREFIX));
            this.metrics = new Metrics(metricConfig, reporters, time);
            ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);

            this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
            long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
            if (keySerializer == null) {
                this.keySerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                        Serializer.class));
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
                this.keySerializer = ensureExtended(keySerializer);
            }
            if (valueSerializer == null) {
                this.valueSerializer = ensureExtended(config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                        Serializer.class));
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                this.valueSerializer = ensureExtended(valueSerializer);
            }

            // load interceptors and make sure they get clientId
            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    true, true, clusterResourceListeners);
            this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
            this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
            this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.transactionManager = configureTransactionState(config, logContext, log);
            int retries = configureRetries(config, transactionManager != null, log);
            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
            short acks = configureAcks(config, transactionManager != null, log);
            this.apiVersions = new ApiVersions();

            this.accumulator = new RecordAccumulator(logContext,
                    config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                    this.totalMemorySize,
                    this.compressionType,
                    config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                    retryBackoffMs,
                    metrics,
                    time,
                    apiVersions,
                    transactionManager);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
            NetworkClient client = new NetworkClient(
                    new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                            this.metrics, time, "producer", channelBuilder, logContext),
                    this.metadata,
                    clientId,
                    maxInflightRequests,
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
                    config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                    this.requestTimeoutMs,
                    time,
                    true,
                    apiVersions,
                    throttleTimeSensor,
                    logContext);
            this.sender = new Sender(logContext,
                    client,
                    this.metadata,
                    this.accumulator,
                    maxInflightRequests == 1,
                    config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                    acks,
                    retries,
                    metricsRegistry.senderMetrics,
                    Time.SYSTEM,
                    this.requestTimeoutMs,
                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                    this.transactionManager,
                    apiVersions);
            String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
            this.ioThread.start();
            this.errors = this.metrics.sensor("errors");
            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
            log.debug("Kafka producer started");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
            close(0, TimeUnit.MILLISECONDS, true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka producer", t);
        }
    }

2.1 Record Pre-processing (Interceptors, Serialization, Partitioner, Accumulator)

① After building its Properties, the application instantiates KafkaProducer and calls send(), possibly from multiple threads. If no interceptors are configured (interceptors is an instance of ProducerInterceptors), the record passes through unchanged; otherwise ProducerInterceptors.onSend() is invoked and may rewrite or filter the record. Interceptors are purely a user extension point; the producer source ships no filtering logic of its own. A sketch of a custom interceptor follows the snippet below.

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
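For illustration, a minimal custom interceptor might look like the sketch below. The class name TimestampPrefixInterceptor and its prefixing rule are invented for this example; only the ProducerInterceptor callbacks are the real API. It would be registered through the interceptor.classes producer config.

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Hypothetical interceptor that prefixes every value with a send timestamp.
    public class TimestampPrefixInterceptor implements ProducerInterceptor<Integer, String> {

        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
            // called on the send() path, before serialization and partitioning
            String newValue = System.currentTimeMillis() + "|" + record.value();
            return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                    record.key(), newValue, record.headers());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // called when the broker acks the record or the send fails; keep this cheap,
            // it runs on the producer's I/O thread
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }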

② doSend() is called next. It first makes sure metadata for the topic is available, serializes the key and value, and then calls partition() with the original record and the serialized key/value:

    // make sure the topic and cluster metadata are available
    ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
    Cluster cluster = clusterAndWaitTime.cluster;
    // run the partitioner
    int partition = partition(record, serializedKey, serializedValue, cluster);
    tp = new TopicPartition(record.topic(), partition);

    // if the record carries no explicit partition, fall back to the configured partitioner.partition()
    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }
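For reference, this is roughly how a custom Partitioner plugs into that call. The class name and the "keyless records go to partition 0" rule are made up for illustration; the keyed branch mirrors what DefaultPartitioner does (murmur2 hash of the serialized key modulo the partition count). It would be enabled via the partitioner.class config.

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Hypothetical partitioner: keyed records are hashed like the default partitioner,
    // keyless records are pinned to partition 0 instead of being spread round-robin.
    public class KeyHashOrZeroPartitioner implements Partitioner {

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            if (keyBytes == null)
                return 0; // no key: always partition 0 (illustrative choice)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }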

③ doSend() then calls RecordAccumulator.append() on the accumulator object held by KafkaProducer, which returns a RecordAppendResult:

    RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
            serializedValue, headers, interceptCallback, remainingWaitMs);

    // implementation of append(), which returns a RecordAppendResult
    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }
            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    return appendResult;
                }
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
                dq.addLast(batch);
                incomplete.add(batch);
                buffer = null;
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }

④ Continuing from the code above: the producer keeps all not-yet-sent records in a client-side buffer pool, the RecordAccumulator, and a background I/O thread later turns those buffered records into network requests. append() returns a RecordAppendResult, and its future field is what producer.send() ultimately hands back to the caller:

    public final static class RecordAppendResult {
        public final FutureRecordMetadata future;
        public final boolean batchIsFull;
        public final boolean newBatchCreated;

        public RecordAppendResult(FutureRecordMetadata future, boolean batchIsFull, boolean newBatchCreated) {
            this.future = future;
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

RecordAccumulator obtains the per-partition queue via getOrCreateDeque(tp); the deque holds ProducerBatch objects, and a ProducerBatch is the smallest unit that goes over the wire. The accumulator estimates the batch size, allocates buffer memory, and keeps appending ProducerBatch objects to the deque:

Deque<ProducerBatch> dq = getOrCreateDeque(tp);

That concludes the logic of KafkaProducer.send(): the original record has been transformed and parked in a per-partition Deque inside the local accumulator. The two boolean flags in RecordAppendResult are what decide whether the sender gets woken up, as shown below.
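The tail of doSend() ties the two pieces together; paraphrased (not quoted verbatim) it does roughly this:

    // paraphrased tail of KafkaProducer.doSend(): wake the sender when the current batch
    // filled up or a new batch had to be created, then hand the future back to the caller
    if (result.batchIsFull || result.newBatchCreated)
        this.sender.wakeup();
    return result.future;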

2.2 The Sender Thread

The Sender is created in the KafkaProducer constructor, where it is handed to a KafkaThread (the I/O thread) and started immediately; doSend() merely calls this.sender.wakeup() to nudge it out of its poll. The Sender holds a NetworkClient instance, and its run() method contains the logic that drives the network requests:

    // the NetworkClient built in the KafkaProducer constructor and handed to the Sender
    NetworkClient client = new NetworkClient(
            new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                    this.metrics, time, "producer", channelBuilder, logContext),
            this.metadata,
            clientId,
            maxInflightRequests,
            config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
            config.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
            config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
            config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
            this.requestTimeoutMs,
            time,
            true,
            apiVersions,
            throttleTimeSensor,
            logContext);

    // transactional/idempotence bookkeeping at the top of Sender.run(long now)
    void run(long now) {
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                // ... remainder of run() omitted in this excerpt

3. Producer Demo and Debugging

With the source compiled and running, we effectively have a local Kafka cluster. The producer class under the examples package of the source tree shows the sending flow: define a KafkaProducer, then call its send() method; much of the work has already happened when KafkaProducer was instantiated.

  1. The producer class defines the key and value types, the topic name, and sync vs. async mode; its constructor takes the cluster address, an optional producer id, and the serializers.

  2. The thread's run() method is an endless while loop. Depending on the sync/async flag (recent versions send asynchronously underneath either way), it calls send(); the async branch passes a ProducerRecord (built from the topic, messageNo as the key, and messageStr as the value) plus a DemoCallBack instance as the completion callback (a callback class, not a function).

  3. The callback class carries the start time, the auto-incremented messageNo, and the messageStr string, and overrides onCompletion() to handle success and failure.

For debugging I tweaked the example slightly: the cluster is the local one running in IDEA (127.0.0.1:9092), the topic is yezonggang, sending is asynchronous, and producer.send() is invoked inside the thread's run() method:

    package demo;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;

    public class DemoForProducer extends Thread {

        public static void main(String[] args) {
            DemoForProducer dfp = new DemoForProducer("yezonggang", true);
            dfp.run();
        }

        private final KafkaProducer<Integer, String> producer;
        private final String topic;
        private final Boolean isAsync;

        public DemoForProducer(String topic, Boolean isAsync) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "127.0.0.1:9092");
            props.put("client.id", "DemoForProducer");
            props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<>(props);
            this.topic = topic;
            this.isAsync = isAsync;
        }

        public DemoForProducer(KafkaProducer<Integer, String> producer, String topic, Boolean isAsync) {
            this.producer = producer;
            this.topic = topic;
            this.isAsync = isAsync;
        }

        // thread body: an endless loop that sends one message per iteration, either
        // asynchronously with a DemoForCallBack completion callback or synchronously via get()
        public void run() {
            int messageNo = 1;
            while (true) {
                String messageStr = "Message_" + messageNo;
                long startTime = System.currentTimeMillis();
                if (isAsync) { // send asynchronously
                    producer.send(new ProducerRecord<>(topic,
                            messageNo,
                            messageStr), new DemoForCallBack(startTime, messageNo, messageStr));
                } else { // send synchronously
                    try {
                        producer.send(new ProducerRecord<>(topic,
                                messageNo,
                                messageStr)).get();
                        System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
                ++messageNo;
            }
        }
    }
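The demo references a DemoForCallBack class that is not shown above. A minimal sketch, modeled on the DemoCallBack class in the Kafka examples package (the class name here matches the demo, the exact printout is illustrative), could look like this:

    package demo;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Completion callback used by the async branch above; invoked on the producer's
    // I/O thread once the broker acknowledges the record (or the send fails).
    public class DemoForCallBack implements Callback {

        private final long startTime;
        private final int key;
        private final String message;

        public DemoForCallBack(long startTime, int key, String message) {
            this.startTime = startTime;
            this.key = key;
            this.message = message;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            if (metadata != null) {
                System.out.println("message(" + key + ", " + message + ") sent to partition("
                        + metadata.partition() + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
            } else if (exception != null) {
                exception.printStackTrace();
            }
        }
    }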

As soon as KafkaProducer is instantiated, the effective producer configuration is resolved and logged (see the ProducerConfig dump below); client.id=DemoForProducer shows the producer is fully configured before send() has even run. A record built by the demo looks like this:

ProducerRecord(topic=yezonggang, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=1, value=Message_1, timestamp=null)

    INFO ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [127.0.0.1:9092]
        buffer.memory = 33554432
        client.id = DemoForProducer
        compression.type = none
        connections.max.idle.ms = 540000
        enable.idempotence = false
        interceptor.classes = null
        key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 0
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
     (org.apache.kafka.clients.producer.ProducerConfig)