赞
踩
消息队列, 由scala 和java 编写, 是一种高吞吐量的分布式发布与订阅消息系统
消息队列简称: MQ
Message: 多台设备生产通信的数据名称, 可以是视频, 文本, 音频等等
Queue: 一种特殊的线性表, 满足先进先出原则
p2p: peer to peer
pub/sub: 发布与订阅
共同点:
消息生产者将消息生产到队列, 消费组从队列中消费消息
不同点:
p2p: 一个生产者的消息只能被一个消费组消费, 如: 打电话
pub/sub: 每个消息都可以有多个消费者, 消息存在队列, 使用偏移量方式来管理消息
存储能力, 容器一般是队列
消息入列, 生产
消费出列, 消费
rabbitmq: erlang 编写, 支持负载均衡, 数据持久化, pub/sub/p2p
redis: kv 的nosql 缓存数据库, 也支持pub/sub, 对于短消息(小于10kb)的性能高于 rabbitmq
zeromq: 轻量级的mq, p2p
activemq: jms 实现, p2p, 持久化, 分布式事务
kafka/jafka: 高性能跨语言分布式事务, 基于发布-订阅的全分布式支持持久化, 可以离线或者实时处理数据
racketmq: 纯java 实现, 发布-订阅, 本地事务和分布式事务
1. rabbitmq: erlang 编写, 支持负载均衡, 数据持久化, pub/sub/p2p
2. zeromq: 轻量级的mq, p2p
3. activemq: jms 实现, p2p, 持久化, 分布式事务
4. kafka/jafka: 高性能跨语言分布式事务, 基于发布-订阅的全分布式支持持久化, 可以离线或者实时处理数据
5. racketmq: 纯java 实现, 发布-订阅, 本地事务和分布式事务
1. topic: 主题, kafka 处理的消息分为不同的分类, 按照主题来划分
2. broker: 消息服务器的代理, kakfa 集群中的一个节点一般称为 broket, 主要来存储消息, 存在硬盘中
3. partition: 分区, topic 的物理上的分组, 一个topic 在broket上被分为一个或多个partition,
分区在创建主题时指定
4. message: 消息, 通信的基本单元, 每个消息属于某一个 partition
5. producer: 生产者, 消息和数据都是由这个组件产生, 由它发送到kafka 集群中
6. consumer: 消费者, 消息和数据都是由这个组件来消费
7. zookeeper: zk做分布式协调, 在kafka 2.8 后不在必须依赖
下载地址: https://kafka.apache.org/downloads 这里选择 2.13, 下载的版本是 3.x
官方下载慢, 提供阿里镜像地址和阿里云网盘下载地址, 已经放到文末
解压到指定目录即可, 下载的是 tgz 文件 :
linux: tar -zvxf kafka-3.6.0.tgz -C /home/kafka (这里解压到 /home/kafka 目录)
windows: 使用解压工具即可, 同时使用 windows 安装
bin: 默认存放linux 系统的一些脚本
windows: 存放pc 系统的脚步
config: kafka 配置文件存放目录
kraft: kraft 模式的配置文件
libs: 依赖目录
licenses: 许可证存放
logs: 服务日志目录
site-docs: kafka 文档
1. 修改config/zookeeper.properties, 可以自定义端口号, 一般默认即可
clientPort: 端口号
2. 修改config/server.properties,
listener: plaintext://[ip]:9092 , 可以自定义ip
advertised.listener:plaintext://ip:9092, 这个后面不在本地部署时需要配置
zookeeper.connect: ip:2181 , 修改为zookeeper.properties 中的端口
# 本地安装可使用默认, 如果存在端口占用, 修改端口即可, 其他默认即可
这里因为系统是 windows , 所以相关命令在 bin/windows 下面, 在安装目录下打开 cmd
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\window\kafka-server-start.bat config\server.properties
bin\windows\kafka-topics.bat --create --if-not-exists --topic test --bootstrap-server localhost:9092
# 查看主题列表
bin\windows\kafka-topics.bat --list --bootstrap-server locahost:9092
# --if-not-exists: 当创建的 test 主题不存在时才创建
#生产者:
bin\windows\kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
#消费者:
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
生产者-发送消息
消费者-消费消息
#创建随机uuid
bin\windows\kafka-storage random
bin\windows\kafka-storage.bat format -t nkA_YUlRQEmIdLk4dr35xA <uuid> -c config\kraft\server.properties
bin\windows\kafka-server-start.bat config\kraft\server.properties
生产者和消费者和原来一样使用
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.1</version>
</dependency>
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.Scanner;
public class KafkaProducerDemo {
public static void main(String[] args) throws InterruptedException {
Scanner sc = new Scanner(System.in);
Producer<String, Object> producer = createProducer();
String mess = null;
boolean flag = true;
while (flag) {
System.out.print("生产消息(输入exit 退出): ");
mess = sc.nextLine();
if (mess != null && (flag = (!"exit".equals(mess)))) {
producer.send(new ProducerRecord<>("test", mess));
Thread.sleep(1000);
System.out.println("send message success....");
}
}
}
/**
* 创建生产者
*
* @return
*/
public static Producer<String, Object> createProducer() {
// 使用生产者配置
Properties properties = buildProducerProperties();
// 创建生产者对象
KafkaProducer<String, Object> producer = new KafkaProducer<>(properties);
return producer;
}
/**
* 构建生产者配置
*
* @return
*/
public static Properties buildProducerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("acks", "all");
properties.put("retries", 0);
properties.put("batch.size", 16384);
properties.put("linger.ms", 1);
properties.put("buffer.memory", 33554432);
properties.put("key.serializer", StringSerializer.class.getName());
properties.put("value.serializer", StringSerializer.class.getName());
return properties;
}
}
package org.example;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
//创建消费者, 并消费消息
Consumer<String, Object> consumer = createConsumer();
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(100);
for (ConsumerRecord<String, Object> record : records) {
System.out.println("消费消息: " + record.value());
}
}
}
/**
* 创建消费者
*
* @return
*/
public static Consumer<String, Object> createConsumer() {
// 使用消费者配置
Properties properties = buildConsumerProperties();
// 创建消费者对象
Consumer<String, Object> consumer = new KafkaConsumer<>(properties);
// 订阅 test 主题
consumer.subscribe(Arrays.asList("test"));
// 返回消费者
return consumer;
}
/**
* 构建消费者配置
*
* @return
*/
public static Properties buildConsumerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("group.id", "test");
properties.put("enable.auto.commit", false);
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", StringDeserializer.class.getName());
properties.put("value.deserializer", StringDeserializer.class.getName());
return properties;
}
}
springboot 集成 kafka 以及自定义注解实现消息消费可参考: Springboot 集成 Kafka
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。