- 时间拦截器:在消息内容前加上时间戳
-
- import java.util.Map;
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
- public class TimeInterceptor implements ProducerInterceptor<String, String> {
-
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
-
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- // 创建一个新的record,把时间戳写入消息体的最前部
- return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
- System.currentTimeMillis() + "," + record.value().toString());
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
-
- }
-
- @Override
- public void close() {
-
- }
- }
-
- 计数拦截器:统计消息发送成功和失败的次数
- import java.util.Map;
- import org.apache.kafka.clients.producer.ProducerInterceptor;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.clients.producer.RecordMetadata;
-
- public class CounterInterceptor implements ProducerInterceptor<String, String>{
- private int errorCounter = 0;
- private int successCounter = 0;
-
- @Override
- public void configure(Map<String, ?> configs) {
-
- }
-
- @Override
- public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
- return record;
- }
-
- @Override
- public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
- // 统计成功和失败的次数
- if (exception == null) {
- successCounter++;
- } else {
- errorCounter++;
- }
- }
-
- @Override
- public void close() {
- // 保存结果
- System.out.println("Successful sent: " + successCounter);
- System.out.println("Failed sent: " + errorCounter);
- }
- }
-
- 主程序
-
- import java.util.ArrayList;
- import java.util.List;
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
-
- public class InterceptorProducer {
-
- public static void main(String[] args) throws Exception {
- // 1 设置配置信息
- Properties props = new Properties();
- props.put("bootstrap.servers", "hadoop01:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- // 2 构建拦截链
- List<String> interceptors = new ArrayList<>();
- interceptors.add("TimeInterceptor"); interceptors.add("CounterInterceptor");
- props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
-
- String topic = "first";
- Producer<String, String> producer = new KafkaProducer<>(props);
-
- // 3 发送消息
- for (int i = 0; i < 10; i++) {
-
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
- producer.send(record);
- }
-
- // 4 一定要关闭producer,这样才会调用interceptor的close方法
- producer.close();
- }
- }
-
运行结果
API端
Linux端