当前位置:   article > 正文

Kafka 消息中间件_卡夫卡中间件

卡夫卡中间件

一消息中间件

消息中间件是在消息的传输过程中保存消息的容器。消息中间件在将消息从消息生产者到消费者时充当中间人的作用。队列的主要目的是提供路由并保证消息的传送; 如果发送消息时接收者不可用,消息对列会保留消息,直到可以成功地传递它为止,当然,消息队列保存消息也是有期限的。

二消息中间件特点

  • 1)解耦:

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  • 2)冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  • 3)扩展性:

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

  • 4)消峰

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 5)可恢复性:

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 6)顺序保证:

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性)

  • 7)缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

  • 8)异步通信:

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

消息模型

在JMS标准中,有两种消息模型点对点(Point to Point),发布/订阅(Pub/Sub)。

P2P模式

P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

Pub/sub模式

包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber)。消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。

Kafka是一种分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础,具有高水平扩展和高吞吐量。目前越来越多的开源分布式处理系统如Apache flume、Apache Storm、Spark、Elasticsearch都支持与Kafka集成。 产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。

首先打个比方,kafka好比就是电视台,而电视台下面有很多节目,生产者就是制作节目的团队,而消费者就是我们观看这个节目的人,一开始在zookeeper创建一个节目,假设就叫cctv1,有了这个节目名后,我们就得请一个团队来填充这个节目,比如放电视剧之类的数据,而我们消费者要观看这个节目的话就得需要zookeeper来授权给我们。电视台则只是存数据的,相当于一个中间人,和现在中介差不多个意思

kafka基本概念

  • producer:生产者,发布消息到 kafka 集群的终端或服务。
  • consumer:消费者,从 kafka 集群中消费消息的终端或服务。
  • topic: 消息以topic为类别记录,每一类的消息称之为一个主题(Topic),可以理解为一个队列
  • broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic), 并从Broker拉数据,从而消费这些已发布的消息。
  • Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Segment:partition物理上由多个segment组成,每个segment file对应两个文件,分别是以.log结尾的数据文件和以.index结尾的索引文件。
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  • Consumer Group 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
  • Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka。
消息发送的流程:
  1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
  2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
  3. Consumer从kafka集群pull数据,并控制获取消息的offset

Broker保存消息

存储方式
物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

  1. [atguigu@hadoop102 logs]$ ll
  2. drwxrwxr-x. 2 atguigu atguigu 4096 86 14:37 first-0
  3. drwxrwxr-x. 2 atguigu atguigu 4096 86 14:35 first-1
  4. drwxrwxr-x. 2 atguigu atguigu 4096 86 14:37 first-2

 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中如下:

  1. [atguigu@hadoop102 logs]$ cd first-0
  2. [atguigu@hadoop102 first-0]$ ll
  3. -rw-rw-r--. 1 atguigu atguigu 10485760 86 14:33 00000000000000000000.index
  4. -rw-rw-r--. 1 atguigu atguigu 219 86 15:07 00000000000000000000.log
  5. -rw-rw-r--. 1 atguigu atguigu 10485756 86 14:33 00000000000000000000.timeindex
  6. -rw-rw-r--. 1 atguigu atguigu 8 86 14:37 leader-epoch-checkpoint

存储策略
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:

  1. 基于时间:log.retention.hours=168
  2. 基于文件大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

Kafka消息发送方式

Kafka消息发送分同步(sync)、异步(async)两种方式
默认是使用同步方式,可通过producer.type属性进行配置;
Kafka保证消息被安全生产,有三个选项分别是0,1,-1
通过request.required.acks属性进行配置:
0代表:不进行消息接收是否成功的确认(默认值);
1代表:当Leader副本接收成功后,返回接收成功确认信息;
-1代表:当Leader和Follower副本都接收成功后,返回接收成功确认信息;
六种发送场景消息丢失的场景
网络异常
acks设置为0时,不和Kafka集群进行消息接受确认,当网络发生异常等情况时,存在消息丢失的可能;
客户端异常
异步发送时,消息并没有直接发送至Kafka集群,而是在Client端按一定规则缓存并批量发送。在这期间,如果客户端发生死机等情况,都会导致消息的丢失;
缓冲区满了
异步发送时,Client端缓存的消息超出了缓冲池的大小,也存在消息丢失的可能;
Leader副本异常
acks设置为1时,Leader副本接收成功,Kafka集群就返回成功确认信息,而Follower副本可能还在同步。这时Leader副本突然出现异常,新Leader副本(原Follower副本)未能和其保持一致,就会出现消息丢失的情况;
以上就是消息丢失的几种情况,在日常应用中,我们需要结合自身的应用场景来选择不同的配置。
想要更高的吞吐量就设置:异步、ack=0;想要不丢失消息数据就选:同步、ack=-1策略

消息接收的三种模式

kafka的消费模式总共有3种:最多一次,最少一次,正好一次。为什么会有这3种模式,是因为客户端处理消息,提交反馈(commit)这两个动作不是原子性。

  1. 最多一次:客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。
  2. 最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。
  3. 正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。

kafka的备份策略

Kafka的备份的单元是partition,也就是每个partition都会有leader partiton和follow partiton。其中leader partition是用来进行和producer进行写交互,follow从leader副本进行拉数据进行同步,从而保证数据的冗余,防止数据丢失的目的。当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息

Kafka的应用场景:

1、消息队列

比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。

2、行为跟踪

Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3、元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4、日志收集

日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5、流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。

6、事件源

事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7、持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

安装Kafka

一. 首先确认下jdk有没有安装

java -version
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:

https://www.oracle.com/technetwork/java/javase/downloads/index.html 

到这个地址下载jdk11版本,我下载的是jdk-11.0.1_linux-x64_bin.tar.gz,然后解压到/usr/local/jdk/下。然后打开/etc/profile文件

vim /etc/profile
把下面这段代码写到文件里
 
  1. export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
  2. export CLASSPATH=.:$JAVA_HOME/lib
  3. #export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
  4. export PATH=$JAVA_HOME/bin:$PATH
最后让jdk就生效
source /etc/profile
可以使用 java -version验证下
出现问题Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=gasp解决方法
1. 打开配置文件"/etc/profile". 2. 在最后一行加上"unset _JAVA_OPTIONS".

安装Kafka解压即可

  1. tar -xzf kafka_2.12-1.1.1.tgz
  2. cd kafka_2.12-1.1.0

启动Zookeeper server

[root@apple kafka_2.12-1.1.0]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

启动Kafka server 

[root@apple kafka_2.12-1.1.0]# sh bin/kafka-server-start.sh config/server.properties &

下面(a) (b)可以测试是否安装成功!安装成功后可以通过代码实现生产者和消费者,不需要启动。

(a)运行生产者producer,并监听topic的test

[root@apple kafka_2.12-1.1.0]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(b)运行消费者consumer,并监听topic的test
 
  1. #old version <0.9
  2. [root@apple kafka_2.12-1.1.0]# sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

这样,在producer那边输入内容,consumer马上就能接收到。

当有跨机的producer或consumer连接时需要配置config/server.properties的advertised.listeners如果没有设置,就使用 listeners,让注册到ZK上的IP是外网IP。新版配置

advertised.listeners=PLAINTEXT://59.64.11.22:9092 
停止kafka命令 
[root@apple kafka_2.12-1.1.0]# bin/kafka-server-stop.sh

PHP 使用kafka需要装扩展

1安装kafka的扩展之前,在安装php-rdkafka之前,需要先安装librdkafka

  1. git clone https://github.com/edenhill/librdkafka.git
  2. cd librdkafka
  3. ./configure
  4. make && make install

2安装rdkafka

  1. git clone https://github.com/arnaud-lb/php-rdkafka.git
  2. cd php-rdkafka
  3. phpize
  4. ./configure --with-php-config=/usr/local/php/bin/php-config ###你安装的php下的php-config路径
  5. make && make install

 3、在php.ini中添加行 ###在php下面的/etc/php.ini 编辑

extension=rdkafka.so

 4、重启php后访问php测试页面验证:

注意事项:如果你的服务器上存在多个版本的php,编译的时候要将 –with-php-config 指定到目标PHP 版本的安装目录。

高级API的特点

优点
● 高级API写起来简单
● 不需要去自行去管理offset,系统通过zookeeper自行管理
● 不需要管理分区,副本等情况,系统自动管理
● 消费者断线会自动根据上一次记录在 zookeeper中的offset去接着获取数据(默认设置5s更新一下 zookeeper 中存的的offset),版本为0.10.2
● 可以使用group来区分对访问同一个topic的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
缺点
● 不能自行控制 offset(对于某些特殊需求来说)
● 不能细化控制如分区、副本、zk 等

低级API的特点

优点
● 能够开发者自己控制offset,想从哪里读取就从哪里读取。
● 自行控制连接分区,对分区自定义进行负载均衡
● 对 zookeeper 的依赖性降低(如:offset 不一定非要靠 zk 存储,自行存储offset 即可,比如存在文件或者内存中)
缺点
● 太过复杂,需要自行控制 offset,连接哪个分区,找到分区 leader 等

根据不同的业务需求,编写生产者消费者,需要选择不同的API接口代码


发送消息:在基于Kafka的消息中,仅仅支持部分简单的类型如:String,Integer。但通常使用中,需要传递到复杂对象,数组,队列等,可以使用JSON可以很容易的转换为String

  1. <?php
  2. try {
  3. $rcf = new RdKafka\Conf();
  4. // 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于这个topic分区的数量是没有意义的。
  5. $rcf->set('group.id', 'test');
  6. $cf = new RdKafka\TopicConf();
  7. $cf->set('offset.store.method', 'broker');
  8. //当没有初始偏移量时,从哪里开始读取
  9. $cf->set('auto.offset.reset', 'smallest');
  10. $rk = new RdKafka\Producer($rcf);
  11. $rk->setLogLevel(LOG_DEBUG);
  12. $rk->addBrokers("127.0.0.1");
  13. $topic = $rk->newTopic("ordersMq", $cf);
  14. for ($i = 0; $i < 1000; $i++) {
  15. $message = "test kafka message" . $i;
  16. $topic->produce(0, 0, $message);
  17. }
  18. } catch (Exception $e) {
  19. echo $e->getMessage();
  20. }

接收消息

  1. <?php
  2. try {
  3. $rcf = new RdKafka\Conf();
  4. $rcf->set('group.id', 'test');
  5. $cf = new RdKafka\TopicConf();
  6. //$cf->set('offset.store.method', 'file');
  7. $cf->set('auto.offset.reset', 'smallest');
  8. $cf->set('auto.commit.enable', true);
  9. $rk = new RdKafka\Consumer($rcf);
  10. $rk->setLogLevel(LOG_DEBUG);
  11. // 指定 broker 地址,多个地址用"," 分割
  12. $rk->addBrokers("127.0.0.1");
  13. $topic = $rk->newTopic("ordersMq", $cf);
  14. //$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
  15. while (true) {
  16. $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
  17. // 第一个参数是分区号
  18. // 第二个参数是超时时间
  19. $msg = $topic->consume(0, 1000);
  20. // var_dump($msg);
  21. if ($msg->err) {
  22. echo $msg->errstr(), "\n";
  23. break;
  24. } else {
  25. echo $msg->payload, "\n";
  26. }
  27. $topic->consumeStop(0);
  28. sleep(1);
  29. }
  30. } catch (Exception $e) {
  31. echo $e->getMessage();
  32. }

运行消费者

php consumer.php

rabbitmq kafka 实际场景选择

mq是适合业务中间件 kafka是数据中间件。在实际生产应用中,通常会使用kafka作为消息传输的数据管道,rabbitmq作为交易数据作为数据传输管道,主要的取舍因素则是是否存在丢数据的可能;rabbitmq在金融场景中经常使用,具有较高的严谨性,数据丢失的可能性更小,同事具备更高的实时性;而kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但从严谨性角度来讲,大不如rabbitmq;而且由于kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况;

Java Spring整合Kafka

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.7.2</version>
  5. </dependency>

配置类

  1. import org.apache.kafka.clients.consumer.ConsumerConfig;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringDeserializer;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.context.annotation.Bean;
  7. import org.springframework.context.annotation.Configuration;
  8. import org.springframework.kafka.annotation.EnableKafka;
  9. import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
  10. import org.springframework.kafka.core.ConsumerFactory;
  11. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  12. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  13. import org.springframework.kafka.core.KafkaTemplate;
  14. import org.springframework.kafka.core.ProducerFactory;
  15. import org.springframework.kafka.listener.ContainerProperties;
  16. import java.util.HashMap;
  17. import java.util.Map;
  18. /**
  19. * Kafka的配置类
  20. */
  21. @Configuration
  22. @EnableKafka
  23. public class KafkaConfig {
  24. @Value("${spring.kafka.bootstrap.servers}")
  25. private String bootstrapServers;
  26. @Value("${spring.kafka.group.id}")
  27. private String groupId;
  28. @Value("${spring.kafka.retries}")
  29. private String retries;
  30. /**
  31. * kafka消息监听器容器的工厂类
  32. *
  33. * @return
  34. */
  35. @Bean
  36. ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
  37. ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  38. // 3个KafkaMessageListenerContainer并发监听
  39. factory.setConcurrency(3);
  40. // 消费者工厂
  41. factory.setConsumerFactory(consumerFactory());
  42. ContainerProperties containerProperties = factory.getContainerProperties();
  43. // 当Acknowledgment.acknowledge()方法被调用即提交offset
  44. containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  45. // 调用commitAsync()异步提交
  46. containerProperties.setSyncCommits(false);
  47. return factory;
  48. }
  49. /**
  50. * 消费者工厂
  51. *
  52. * @return
  53. */
  54. @Bean
  55. public ConsumerFactory<Integer, String> consumerFactory() {
  56. return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  57. }
  58. /**
  59. * 消费者拉取消息配置
  60. *
  61. * @return
  62. */
  63. @Bean
  64. public Map<String, Object> consumerConfigs() {
  65. Map<String, Object> props = new HashMap<>(16);
  66. // kafka集群地址
  67. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  68. // groupId
  69. props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  70. // 开启自动提交
  71. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  72. // 自动提交offset到zk的时间间隔,时间单位是毫秒
  73. // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
  74. // session超时设置,15秒,超过这个时间会认为此消费者挂掉,将其从消费组中移除
  75. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  76. //键的反序列化方式,key表示分区
  77. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  78. //值的反序列化方式
  79. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  80. return props;
  81. }
  82. /**
  83. * 生产者工厂
  84. *
  85. * @return
  86. */
  87. @Bean
  88. public ProducerFactory<String, String> producerFactory() {
  89. return new DefaultKafkaProducerFactory<>(producerConfigs());
  90. }
  91. /**
  92. * 生产者发送消息配置
  93. *
  94. * @return
  95. */
  96. @Bean
  97. public Map<String, Object> producerConfigs() {
  98. Map<String, Object> props = new HashMap<>();
  99. // kafka集群地址
  100. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  101. // 消息发送确认方式
  102. props.put(ProducerConfig.ACKS_CONFIG, "1");
  103. // 消息发送重试次数
  104. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  105. // 重试间隔时间
  106. props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000");
  107. // 控制批处理大小,单位为字节
  108. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  109. // 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
  110. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  111. // 生产者可以使用的总内存字节来缓冲等待发送到服务器的记录
  112. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  113. //键的反序列化方式,key表示分区
  114. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  115. //值的反序列化方式
  116. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  117. return props;
  118. }
  119. /**
  120. * Kafka模版类,用来发送消息
  121. *
  122. * @return
  123. */
  124. @Bean
  125. public KafkaTemplate<String, String> kafkaTemplate() {
  126. return new KafkaTemplate<String, String>(producerFactory());
  127. }
  128. }

消息发布

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.kafka.support.SendResult;
  6. import org.springframework.util.concurrent.ListenableFuture;
  7. @Slf4j
  8. public class KafkaProducer {
  9. @Autowired
  10. private KafkaTemplate kafkaTemplate;
  11. @Value("${spring.kafka.bootstrap.topic}")
  12. private String topic;
  13. public void sendMessage(String key, String data) {
  14. try {
  15. log.info("create ag offline,key:{},data: {}", key, data);
  16. ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, data);
  17. //future.addCallback(success -> log.info("发送消息成功!"), failure -> log.error("发送消息失败!失败原因是:{}", failure.getMessage()));
  18. } catch (Exception e) {
  19. log.error("KafkaProducerService sendMessage: {},e:{}", data, e);
  20. }
  21. }
  22. }

 监听消息消费

  1. @Slf4j
  2. @Component
  3. public class CustomKafkaListener /**implements MessageListener<String,String>*/ {
  4. @KafkaListener(topics = {KafkaConfig.TOPIC_NAME},id = KafkaConfig.TOPIC_NAME)
  5. public void onMessage1(String msg){
  6. log.info("onMessage1消费kafka消息 {} ",msg);
  7. }
  8. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/289130
推荐阅读
相关标签
  

闽ICP备14008679号