赞
踩
解决线程通信的问题
阻塞的方法有put、take
生产的数据放在队列当中,当队列满的时候生产者线程就被阻塞住(什么都不做,不会消耗资源),等待消费者线程进行消费
消费线程从队列当中拿数据进行消费,当数据非消费完了的时候,消费者线程被阻塞住,等待生产者线程进行生产
阻塞队列的作用就是用来在两个线程之间,避免cpu资源的消耗,能够提高系统的性能。
/**
* @Description: 阻塞队列测试
* @Author:啵啵啵啵啵啵唧~~~
* @Date:2022/5/28
*/
public class BlockingQueueTests {
public static void main(String[] args) {
//实例化一个阻塞队列给生产则和消费者共同使用
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
//多开几个消费者线程进行消费
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
/**
* 生产者
*/
class Producer implements Runnable {
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
//模拟生产者生产数据,隔20ms放一个数据
try {
for (int i = 0; i < 100; i++) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 消费者
*/
class Consumer implements Runnable {
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
//模拟消费者,消费者是一直进行消费
try {
while (true) {
//消费者消费给他模拟的时间是随机的0~1000,所以说消费者消费的时间大概率大于生产者生产的时间
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName() + "消费了:" + queue.size());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Kafka简介
Kafka特点(性能最好的消息队列)
Kafka术语
下载Kafka解压之后进行一些配置
启动zookeeper(切换到kafka目录下)
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
bin\windows\kafka-server-start.bat config\server.properties
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
kafka-topics.bat --list --bootstrap-server localhost:9092
kafka-console-producer.bat --broker-list localhost:9092 --topic test
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
zookeeper-server-stop.bat
kafka-server-stop.bat
导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
配置kafka
# Kafka配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
#配置是否自动提交消费者的偏移量
spring.kafka.consumer.enable-auto-commit=true
#配置自动提交的频率
spring.kafka.consumer.auto-commit-interval=3000
访问kfka
生产者
消费者
public void handleMessage(ConsumerRecord record) {}
测试一下(记得启动kafak和zookeeper)
/**
* @Description:
* @Author:啵啵啵啵啵啵唧~~~
* @Date:2022/5/28
*/
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
/**
* 测试生产者生产者消息消费者是否能够消费掉
*/
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 生产者发消息是主动去发送的
*/
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送消息
*
* @param topic 消息主题(分区)
* @param content 消息的内容
*/
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
/**
* 消费者消费消息是被动去消费的
*/
@Component
class KafkaConsumer {
//绑定需要监听的主题,只要有消息就调用handleMessage方法进行消费,没有消息就阻塞住
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。