当前位置:   article > 正文

Kafka应用场景_kafka producer close使用

kafka producer close使用

在学习一门新技术之前,我们需要先去了解一下这门技术的具体应用场景,使用它能够做什么,能够达到什么目的,学习kafka的初衷是用作消息队列;但是还可以使用Kafka Stream进行一些实时的流计算,多用于大数据处理;也可以做日志收集汇总、网站活动跟踪等任务。

消息队列

kafka可以很好的替代一些传统的消息系统,kafka具有更好的吞吐量,内置的分区使kafka具有更好的容错和伸缩性,这些特性使它可以替代传统的消息系统,成为大型消息处理应用的首选方案。

场景:异步、解耦、削峰填谷

  1. 生成订单:给不同的产品业务线分配同一个topic的不同partition,用户下单后根据订单类型发送到对应的partition
  2. 消息通知:用户登录后计算积分
  • 消息生产者

    1. public static void main(String[] args) throws Exception {
    2. Properties prop = new Properties();
    3. prop.put("bootstrap.servers", "127.0.0.1:9092");
    4. prop.put("acks", "all");
    5. prop.put("retries", "0");
    6. // 缓冲区大小
    7. prop.put("batch.size", "10");
    8. prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    9. prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    10. KafkaProducer<String, String> producer = new KafkaProducer<>(prop);
    11. for (int i = 0; i < 101; i++) {
    12. ProducerRecord<String, String> record = new ProducerRecord<>("my_topics", "value_" + i);
    13. // 阻塞到消息发送完成
    14. producer.send(record).get();
    15. }
    16. // 刷新缓冲区,发送到分区,并清空缓冲区
    17. // producer.flush();
    18. // 关闭生产者,会阻塞到缓冲区内的数据发送完
    19. producer.close();
    20. // producer.close(Duration.ofMillis(1000));
    21. }

    生产者发送消息是先将消息放到缓冲区,当缓冲区存满之后会自动flush,或者手动调用flush()方法

  • 消息消费者

    1. public static void main(String[] args) {
    2. Properties properties = new Properties();
    3. properties.put("bootstrap.servers", "127.0.0.1:9092");
    4. properties.put("group.id", "cc_consumer");
    5. properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    6. properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    7. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    8. // 指定topic
    9. consumer.subscribe(Arrays.asList("my_topics"));
    10. // 指定topic的partition
    11. // TopicPartition partition0 = new TopicPartition("my_topics", 10);
    12. // consumer.assign(Arrays.asList(partition0));
    13. try {
    14. while (true) {
    15. ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    16. for (ConsumerRecord<String, String> record : records) {
    17. System.out.println(record.toString());
    18. }
    19. }
    20. } finally {
    21. consumer.close(Duration.ofMillis(2000));
    22. }
    23. }

流计算

[todo]

日志收集

应用程序的日志可以通过log4j收集日志信息,并将日志直接打到kafka中:客户端—>应用—>kafka

SpringBoot中默认使用的是logback,所以要在引入SpringBoot的jar包时排除掉logback的jar包

日志消息发送有同步和异步两种方式,由KafkaAppender中的syncSend属性决定,默认为true(同步)

  1. > <Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false">
  2. >
  • pom.xml
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. <exclusions>
  5. <exclusion>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-logging</artifactId>
  8. </exclusion>
  9. </exclusions>
  10. </dependency>
  11. <!-- springboot 1.3.x之前版本是log4j,之后版本都是log4j2 -->
  12. <dependency>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-log4j2</artifactId>
  15. </dependency>
  • log4j2.xml
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <Configuration status="off">
  3. <Properties>
  4. </Properties>
  5. <Appenders>
  6. <Console name="STDOUT" target="SYSTEM_OUT">
  7. <PatternLayout pattern="%d %p %c{1.} %t %m%n"/>
  8. </Console>
  9. <!--kafka topic-->
  10. <Kafka name="KAFKA-LOGGER" topic="my_topics">
  11. <!--JsonLayout:日志格式为json,方便在ES中处理-->
  12. <JsonLayout/>
  13. <!--kafka server的ip:port-->
  14. <Property name="bootstrap.servers">127.0.0.1:9092</Property>
  15. <Property name="retries">3</Property>
  16. <Property name="linger.ms">1000</Property>
  17. <Property name="buffer.memory">10485760</Property>
  18. </Kafka>
  19. <Async name="ASYNC-KAFKA-LOGGER">
  20. <AppenderRef ref="KAFKA-LOGGER"/>
  21. <LinkedTransferQueue/>
  22. </Async>
  23. </Appenders>
  24. <Loggers>
  25. <!--日志级别大于info都会被记录到Kafka-->
  26. <Logger name="cc.kevinlu.springbootkafka.controller.MessageController" level="info"
  27. additivity="false">
  28. <AppenderRef ref="KAFKA-LOGGER"/>
  29. </Logger>
  30. <!-- Root表示所有Logger用Root中的Appender打印日志 -->
  31. <Root level="info">
  32. <AppenderRef ref="STDOUT"/>
  33. </Root>
  34. </Loggers>
  35. </Configuration>
  • code
  1. @GetMapping("/log")
  2. public String sendLog() {
  3. for (int i = 0; i < 10; i++) {
  4. log.info("kafka log i = " + i);
  5. }
  6. return "success";
  7. }
  • consumer视图

image-20200419032218971

网站活动跟踪

  1. 前端Nodejs控制

    Node接入kafka需要使用kafka-node库,下面是网上的例子

    1. var kafka = require('kafka-node'),
    2. Producer = kafka.Producer,
    3. client = new kafka.KafkaClient({kafkaHost: 'localhost:9092'});
    4. /**
    5. * 定义生产类
    6. * partitionerType 定义
    7. * 0:默认模式 只产生数据在第一个分区
    8. * 1:随机分配,在分区个数内,随机产生消息到各分区
    9. * 2:循环分配,在分区个数内,按顺序循环产生消息到各分区
    10. */
    11. var producerOption = {
    12. requireAcks: 1,
    13. ackTimeoutMs: 100,
    14. partitionerType: 0 //默认为第一个分区
    15. };
    16. var producer = new Producer(client,producerOption);
    17. /**
    18. * TOPIC的创建需要在命令行进行创建,以便指定分区个数以及备份个数
    19. * PS:kafka-node的创建topic不行,不能创建分区
    20. * 产生消息,如果不指定partition
    21. * 则根据 partitionerType 的值来指定发送数据到哪个分区
    22. * 我们创建的topic-test-one只有一个分区,所以只能产生数据到第1个分区(下标0),否则不会生产数据
    23. */
    24. function getPayloads(){
    25. return [
    26. {topic:"topic-test-one",messages:JSON.stringify({"name":"jack","age":"120"}),partition:0}
    27. ];
    28. }
    29. producer.on("ready",function(){
    30. setInterval(function(){
    31. producer.send(getPayloads(),function(err,data){
    32. if(!err){
    33. console.log("send message complete!data:"+JSON.stringify(data),new Date());
    34. }
    35. });
    36. },1000);
    37. });
    38. producer.on('error', function (err) {console.log("send message error!\r\n"+err);})
  2. 后端日志控制

    后端也可以使用log4j的日志系统来完成,拦截所有需要监控的api请求,使用log4j输出日志到kafka队列中,和上述日志收集方法相同。若同一个应用中需要通过日志输出到kafka的多个topic中,可以使用log4j的Marker标记来区分,配置如下:

    1. <?xml version="1.0" encoding="UTF-8"?>
    2. <Configuration status="off">
    3. <Properties>
    4. </Properties>
    5. <Appenders>
    6. <Console name="STDOUT" target="SYSTEM_OUT">
    7. <PatternLayout pattern="%d %p %c{1.} %t %m%n"/>
    8. </Console>
    9. <!-- 日志收集 -->
    10. <Kafka name="KAFKA-LOGGER" topic="cc_log_test" syncSend="false">
    11. <JsonLayout/>
    12. <Property name="bootstrap.servers">127.0.0.1:9092</Property>
    13. <Property name="retries">3</Property>
    14. <Property name="linger.ms">1000</Property>
    15. <Property name="buffer.memory">10485760</Property>
    16. <Filters>
    17. <!-- 通过Marker过滤消息 -->
    18. <MarkerFilter marker="Kafka" onMatch="ACCEPT" onMismatch="DENY"/>
    19. </Filters>
    20. </Kafka>
    21. <!-- 轨迹跟踪 -->
    22. <Kafka name="KAFKA-TRACK-LOGGER" topic="cc_test1" syncSend="false">
    23. <JsonLayout/>
    24. <Property name="bootstrap.servers">127.0.0.1:9092</Property>
    25. <Property name="retries">3</Property>
    26. <Property name="linger.ms">1000</Property>
    27. <Property name="buffer.memory">10485760</Property>
    28. <Filters>
    29. <!-- 通过Marker过滤消息 -->
    30. <MarkerFilter marker="Track" onMatch="ACCEPT" onMismatch="DENY"/>
    31. </Filters>
    32. </Kafka>
    33. <Async name="ASYNC-KAFKA-LOGGER">
    34. <AppenderRef ref="KAFKA-LOGGER"/>
    35. <AppenderRef ref="KAFKA-TRACK-LOGGER"/>
    36. <LinkedTransferQueue/>
    37. </Async>
    38. </Appenders>
    39. <Loggers>
    40. <Logger name="cc.kevinlu.springbootkafka.controller" level="info"
    41. additivity="false">
    42. <AppenderRef ref="KAFKA-LOGGER"/>
    43. <AppenderRef ref="KAFKA-TRACK-LOGGER"/>
    44. </Logger>
    45. <Root level="info">
    46. <AppenderRef ref="STDOUT"/>
    47. </Root>
    48. </Loggers>
    49. </Configuration>
    1. private final static Marker KAFKA_MARKER = MarkerManager.getMarker("Kafka");
    2. private final static Marker KAFKA_TRACK_MARKER = MarkerManager.getMarker("Track");
    3. @GetMapping("/log")
    4. public String sendLog() {
    5. // 轨迹跟踪
    6. log.info(KAFKA_TRACK_MARKER, "send async message!");
    7. for (int i = 0; i < 10; i++) {
    8. // 日志收集
    9. log.info(KAFKA_MARKER, "kafka log i = {}", i);
    10. }
    11. return "success";
    12. }
  3. 前端+后端组合

    后端提供API供前端传递轨迹,后端接收到请求之后将消息同步到kafka中。

声明:本文内容由网友自发贡献,转载请注明出处:【wpsshop】
推荐阅读
相关标签
  

闽ICP备14008679号