当前位置:   article > 正文

第五章 Kafka 构建TB级异步消息系统_使用kafaka发异步通知

使用kafaka发异步通知

第五章 Kafka 构建TB级异步消息系统

1、阻塞队列

在这里插入图片描述

模拟阻塞队列

public class BlockingQueueTest {
   
    public static void main(String[] args) {
   
        // 容量为10的阻塞队列
        BlockingQueue queue = new ArrayBlockingQueue(10);
        // 生产者线程
        new Thread(new Producer(queue)).start();
        // 消费者线程
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();

    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

生产者

// 生产者
class Producer implements Runnable {
   

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue queue) {
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
        try {
   
            for (int i = 0; i < 100; i++) {
   
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (Exception e) {
   
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

消费者

// 消费者
class Consumer implements Runnable {
   

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue queue) {
   
        this.queue = queue;
    }

    @Override
    public void run() {
   
        try {
   
            while (true) {
   
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
   
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

2、kafka入门

在这里插入图片描述

Broker:kafka中每一台服务器
Zookeeper:用来管理集群,可以单独使用,kafka中也有内置

Topic:主题,kafka采用消息订阅模式
Partition:分区
offset:分区内从索引

副本
Leader Replica:主副本
Follow Replica:从副本

1. 下载

官方地址:https://kafka.apache.org/downloads
在这里插入图片描述
下载完毕,直接解压即可

配置

修改zookeeper配置文件zookeeper.properties

dataDir=d:/attachment/kafka_2.13-3.2.0/data/zookeeper_logs
  • 1

修改kafka配置文件server.properties

log.dirs=d:/attachment/kafka_2.13-3.2.0/data/kafka_logs
  • 1

kafka基本命令

官方地址:https://kafka.apache.org/documentation/#quickstart

启动zookeeper

PS D:\attachment\kafka_2.13-3.2.0> .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  • 1

启动kafka

PS D:\attachment\kafka_2.13-3.2.0> .\bin\windows\kafka-server-start.bat .\config\server.properties
  • 1

创建主题: test 一个副本 一个分区

PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1  --topic test
  • 1

查看当前服务器主题

PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-topics.bat --list  --bootstrap-server localhost:9092
test
  • 1
  • 2

创建生产者,向指定主题发消息

PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-console-producer.bat --broker-list localhost:9092 --topic test
>hello
>world
  • 1
  • 2
  • 3

消费者

PS D:\attachment\kafka_2.13-3.2.0\bin\windows> .\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
hello
world
  • 1
  • 2
  • 3

关闭zookeeper服务器

zookeeper-server-stop.bat
  • 1

关闭kafka服务器

kafka-server-stop.bat
  • 1

删除主题

在server.properties中增加设置,默认未开启

delete.topic.enable=true
  • 1

删除主题命令

/bin/kafka-topics --delete --topic test --zookeeper localhost:2181
  • 1

删除主题中的数据

如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现

bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//修改保留时间为三秒,但不是修改后三秒就马上删掉,kafka是采用轮训的方式,轮训到这个主题发现三秒前的数据都是删掉。时间由自己在server.properties里面设置,设置见下面。
  • 1
  • 2

server.properties中的数据保留时间配置

log.retention.hours=168  //保留时间,单位小时
log.retention.check.interval.ms=300000 //保留时间检查间隔,单位毫秒
  • 1
  • 2

3、Spring 整合Kafka

在这里插入图片描述

1.引入依赖

<!--kafka-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.配置kafka

# kafka
spring.kafka.bootstrap-servers=localhost:9092
# 消费组id
spring.kafka.consumer.group-id=test-consumer-group
# 是否自动提交
spring.kafka.consumer.enable-auto-commit=true
# 自动提交频率(毫秒)
spring.kafka.consumer.auto-commit-interval=3000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

3.访问kafka

不要忘记启动 zookeeper 和 kafka

@RunWith(SpringRunner.class)
@SpringBootTest(classes = CommunityApplication.class)
public class KafkaTest {
   
    @Autowired
    private KafkaProducer kafkaProducer;


    @Test
    public void testKafka(){
   
        kafkaProducer.sendMessage("test","你好");
        kafkaProducer.sendMessage("test","在吗");

        try {
   
            Thread.sleep(1000*10);
        } catch (InterruptedException e) {
   
            e.printStackTrace();
        }

    }
}

@Component
class KafkaProducer{
   
    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic,String content){
   
        kafkaTemplate.send(topic,content
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/572903
推荐阅读
相关标签
  

闽ICP备14008679号