赞
踩
模拟阻塞队列
public class BlockingQueueTest {
public static void main(String[] args) {
// 容量为10的阻塞队列
BlockingQueue queue = new ArrayBlockingQueue(10);
// 生产者线程
new Thread(new Producer(queue)).start();
// 消费者线程
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
生产者
// 生产者 class Producer implements Runnable { private BlockingQueue<Integer> queue; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { for (int i = 0; i < 100; i++) { Thread.sleep(20); queue.put(i); System.out.println(Thread.currentThread().getName() + "生产:" + queue.size()); } } catch (Exception e) { e.printStackTrace(); } } }
消费者
// 消费者 class Consumer implements Runnable { private BlockingQueue<Integer> queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { try { while (true) { Thread.sleep(new Random().nextInt(1000)); queue.take(); System.out.println(Thread.currentThread().getName() + "消费:" + queue.size()); } } catch (Exception e) { e.printStackTrace(); } } }
Broker:kafka中每一台服务器
Zookeeper:用来管理集群,可以单独使用,kafka中也有内置
Topic:主题,kafka采用消息订阅模式
Partition:分区
offset:分区内从索引
副本
Leader Replica:主副本
Follow Replica:从副本
官方地址:https://kafka.apache.org/downloads
下载完毕,直接解压即可
配置
修改zookeeper配置文件zookeeper.properties
dataDir=d:/attachment/kafka_2.13-3.2.0/data/zookeeper_logs
修改kafka配置文件server.properties
log.dirs=d:/attachment/kafka_2.13-3.2.0/data/kafka_logs
kafka基本命令
官方地址:https://kafka.apache.org/documentation/#quickstart
启动zookeeper
PS D:\attachment\kafka_2.13-3.2.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
启动kafka
PS D:\attachment\kafka_2.13-3.2.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
创建主题: test 一个副本 一个分区
PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
查看当前服务器主题
PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-topics.bat --list --bootstrap-server localhost:9092
test
创建生产者,向指定主题发消息
PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-console-producer.bat --broker-list localhost:9092 --topic test
>hello
>world
消费者
PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world
关闭zookeeper服务器
zookeeper-server-stop.bat
关闭kafka服务器
kafka-server-stop.bat
删除主题
在server.properties中增加设置,默认未开启
delete.topic.enable=true
删除主题命令
/bin/kafka-topics --delete --topic test --zookeeper localhost:2181
删除主题中的数据
如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//修改保留时间为三秒,但不是修改后三秒就马上删掉,kafka是采用轮训的方式,轮训到这个主题发现三秒前的数据都是删掉。时间由自己在server.properties里面设置,设置见下面。
server.properties中的数据保留时间配置
log.retention.hours=168 //保留时间,单位小时
log.retention.check.interval.ms=300000 //保留时间检查间隔,单位毫秒
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
# kafka
spring.kafka.bootstrap-servers=localhost:9092
# 消费组id
spring.kafka.consumer.group-id=test-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交频率(毫秒)
spring.kafka.consumer.auto-commit-interval=3000
不要忘记启动 zookeeper 和 kafka
@RunWith(SpringRunner.class) @SpringBootTest(classes = CommunityApplication.class) public class KafkaTest { @Autowired private KafkaProducer kafkaProducer; @Test public void testKafka(){ kafkaProducer.sendMessage("test","你好"); kafkaProducer.sendMessage("test","在吗"); try { Thread.sleep(1000*10); } catch (InterruptedException e) { e.printStackTrace(); } } } @Component class KafkaProducer{ @Autowired private KafkaTemplate kafkaTemplate; public void sendMessage(String topic,String content){ kafkaTemplate.send(topic,content
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。