赞
踩
下载window的kafka地址
window的kafka只是为了方便学习
安装地址:kafka.apache.org/
解压zip为文件夹
kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。
zookeeper-server-start.bat ..\..\config\zookeeper.properties
//启动zookeeper
kafka-server-start.bat ..\..\config\server.properties
//启动kafka
先启动zookeeper在启动kafka
管理员方式打开cmd,启动zookeeper之后启动kafka
运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
另启动一个cmd窗口启动kafka
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。
**注意根据kafka的版本来决定操作命令 版本,操作系统window或Linux等都决定kafka的命令操作 **
运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181。
运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092。
和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic
# 创建topic
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qil0820
# 查询topic
kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
# 删除topic
kafka-topics.bat --delete --zookeeper localhost:2181 --topic qil0820
windows下的kafka命令操作
创建一个生产者,在订阅一个消费者来进行监听消费
kafka-console-producer.bat --broker-list localhost:9092 --topic wslKafkaTest
# 测试生产消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic wslKafkaTest --from-beginning
# 测试消息消费
Offset Explorer (kafkatool.com)
生产者向Topic中发送消息
消费者通过指定监听Topic来进行消费消息
<!--kafka的依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
group-id: wslKafkaTest // 设置默认的生产者消费者所属组id
@Api("Kafka生产者")
@RestController
public class KafkaProduceController {
@Resource
private KafkaTemplate<String,String> kafkaTemplate;
@ApiOperation("测试kafka发送消息")
@PostMapping("/sendMessage")
public void sendMessage(String str) {
System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+str);
kafkaTemplate.send("wslKafkaTest",str);//使用send方法发送消息,需要传入topic名称
}
}
@Component
public class MessageListener {
@KafkaListener(topics = "wslKafkaTest")
public void onMessage(ConsumerRecord<String,String> record){
System.out.println("已完成短信发送业务(kafka),id:"+record.value());
}
}
spring: application: name: hello-kafka kafka: listener: #设置是否批量消费,默认 single(单条),batch(批量) type: single # 集群地址 bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # 生产者配置 producer: # 重试次数 retries: 3 # 应答级别 # acks=0 把消息发送到kafka就认为发送成功 # acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功 # acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功 acks: all # 批量处理的最大大小 单位 byte batch-size: 4096 # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka buffer-memory: 33554432 # 客户端ID client-id: hello-kafka # Key 序列化类 key-serializer: org.apache.kafka.common.serialization.StringSerializer # Value 序列化类 value-serializer: org.apache.kafka.common.serialization.StringSerializer # 消息压缩:none、lz4、gzip、snappy,默认为 none。 compression-type: gzip properties: partitioner: #指定自定义分区器 class: top.zysite.hello.kafka.partitioner.MyPartitioner linger: # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka ms: 1000 max: block: # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms ms: 6000 # 消费者配置 consumer: # 默认消费者组 group-id: testGroup # 自动提交 offset 默认 true enable-auto-commit: false # 自动提交的频率 单位 ms auto-commit-interval: 1000 # 批量消费最大数量 max-poll-records: 100 # Key 反序列化类 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # Value 反序列化类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 当kafka中没有初始offset或offset超出范围时将自动重置offset # earliest:重置为分区中最小的offset # latest:重置为分区中最新的offset(消费分区中新产生的数据) # none:只要有一个分区不存在已提交的offset,就抛出异常 auto-offset-reset: latest properties: interceptor: classes: top.zysite.hello.kafka.interceptor.MyConsumerInterceptor session: timeout: # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作 ms: 120000 request: timeout: # 请求超时 ms: 120000
// 根据生产者工厂构建kafkaTemplate
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
return kafkaTemplate;
}
// 将一个生产者工厂注册到spring容器中
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
}
KafkaTemplate提供了几个发送消息的接口如下:
topic:指定要发送消息的topic名称
partition:向指定的partition发送消息
key:消息的key
data:消息的data
timestamp:时间信息,一般默认为当前时间
record:ProducerRecord结构,是对key和value的一层封装,直接发送key和value,也会在内部被封装成ProducerRecord然后再发送出去
message:包含消息头(topic、partition、字符集)等信息和消息的封装格式
// 向默认的topic发送消息
public ListenableFuture<SendResult<K, V>> sendDefault(V data);
public ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
public ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
// 向指定的topic发送消息
public ListenableFuture<SendResult<K, V>> send(String topic, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
public ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
public ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
public ListenableFuture<SendResult<K, V>> send(Message<?> message)
Kafka的消息监听一般可以分为:1.单条数据监听;2.批量数据监听。GenericMessageListener
是SpringKafka
的消息监听器接口,也是一个函数式接口,利用接口的onMessage
方法可以实现消费数据。
public interface GenericMessageListener<T> { void onMessage(T data); default void onMessage(T data, @Nullable Acknowledgment acknowledgment) { throw new UnsupportedOperationException("Container should never call this"); } default void onMessage(T data, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) { throw new UnsupportedOperationException("Container should never call this"); } }
基于此接口可以实现单条数据消息监听器接口MessageListenen
、多条数据消息监听器接口BatchMessageListener
、带ACK机制的消息监听器AcknowledgingMessageListener
和BatchAcknowledgingMessageListener
消息监听器MessageListener
是由消费监听器容器MessageListenerContainer
接口来承载,使用setupMessageListenner()
方法启动一个监听器。其中还有定义了操作消息的resume()
、pause()
等方法。
public interface MessageListenerContainer extends SmartLifecycle {
// 启动一个消息监听器
void setupMessageListener(Object messageListener);
// 获取消费者的指标信息
Map<String, Map<MetricName, ? extends Metric>> metrics();
}
spring-kafka提供了两个容器KafkaMessageListenerContainer
和ConcurrentMessageListenerContainer
。
消息监听器容器由容器工厂KafkaListenerContainerFactory
统一创建并管理
public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {
// 根据endpoint创建监听器容器
C createListenerContainer(KafkaListenerEndpoint endpoint);
// 根据topic、partition和offset的配置
C createContainer(TopicPartitionOffset... topicPartitions);
// 根据topic创建监听器容器
C createContainer(String... topics);
// 根据topic的正则表达式创建监听器容器
C createContainer(Pattern topicPattern);
}
spring-kafka提供了监听器容器工厂ConcurrentKafkaListenerContainerFactory
,其有两个重要的配置
ContainerProperties
和ConsumerFactory
ContainerProperties
定义了要消费消息的topic
,消息处理的MessageListener
等信息。
因此要实现一个消息监听器的流程如下:
SpringKafka的消费者是由一个消费监听器容器ListenerConatiner
去承载的,容器对应一个配置文件为ContainerProperties
,ContainerProperties
继承自消费者配置类ConsumerProperties
,并且承载了消息监听器的设置
首先介绍非注解式的消息监听器,类似于ProducerFactory
,消费者需要创建一个ConsumerFactory
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
然后建立监听器容器工厂ConcurrentKafkaListenerContainerFactory
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
有了容器工厂之后,就可以通过注册bean
的方式生成一个MessageListenerContainer
@Bean
public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProperties = new ContainerProperties("numb");
containerProperties.setMessageListener(
(MessageListener<String, String>) data -> System.out.println("收到消息: " + data.value()));
return new KafkaMessageListenerContainer(consumerFactory, containerProperties);
}
在这个kafkaMessageListenerContainer
中,通过ContainerProperties
配置了消费的topic和messageListener。之后启动项目后,spring会将kafkaMessageListenerContainer
注册到ConcurrentKafkaListenerContainerFactory
中,这样获取到数据后会自动调用消息监听器进行数据处理。
测试消费者消费数据
@Test public void test_send_and_consume() { ExecutorService threadPool = Executors.newCachedThreadPool(); threadPool.submit(() -> { while (true) { kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发送完成"); } }); while (true); }
输出:
发送完成
发送成功
收到消息: kv
发送完成
发送成功
收到消息: kv
之前配置了容器监听器工厂ConcurrentKafkaListenerContainerFactory
之后,还需要用代码配置MessageListenerContainer
, 指定消费的topic、消息监听器处理等。其实上面这步完全可以通过注解@KafkaListener
实现。
@Component @Slf4j public class MessageHandler { @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb" // , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})} ) public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) { try { String message = (String) record.value(); log.info("收到消息: {}", message); } catch (Exception e) { log.error(e.getMessage(), e); } finally { // 手动提交 offset acknowledgment.acknowledge(); } } }
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。