赞
踩
【问题】消息队列的使用场景有很多,最常见的使用场景有以下几个(基本作用)。
- 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
异步处理:当一个应用程序需要处理大量的请求时,使用消息队列可以将请求发送到队列中,并异步地处理它们。这样可以避免应用程序阻塞,提高处理速度。
- 系统解耦:使用消息队列可以将不同的模块解耦,各模块之间可以通过消息队列传递消息。这样当其中一个模块发生变化时,其他模块不会受到影响,提高系统的可维护性和可扩展性。
数据缓存:当一个应用程序需要读取数据时,使用消息队列可以将数据缓存在队列中,提高数据的访问速度。例如,将热门文章缓存在消息队列中,可以减轻数据库的压力。
分布式系统:在分布式系统中,使用消息队列可以在不同的节点之间传递消息。这样可以实现节点之间的解耦,提高系统的可靠性和可扩展性。
日志处理:使用消息队列可以将应用程序的日志信息发送到队列中,并异步地处理它们。这样可以避免日志信息对应用程序的性能产生影响,提高系统的稳定性。
【问题】MyQM的缺点有哪些?
- 系统复杂性:使用消息队列会增加系统的复杂性,需要开发人员具备更多的知识和经验,同时也需要更多的测试和维护工作。
- 消息顺序问题:由于消息队列是异步处理的,因此消息可能会以不同的顺序到达,这可能会导致一些问题。
- 网络延迟:使用消息队列需要通过网络发送和接收消息,如果网络延迟较大,则可能会影响系统的性能。
- 队列管理问题:在使用消息队列时需要管理队列的大小、消息的过期时间和消息的优先级等问题,这可能会带来一些挑战。
【问题】如何保证消息队列的高可用?
集群部署:消息队列应该以集群的方式部署,保证至少有多个实例在运行,即使某个节点出现故障,其他节点也能够接管其工作。
数据备份和恢复:对于重要的消息,应该进行备份,以便在节点故障或数据损坏的情况下能够恢复数据。
心跳检测(监控机制):通过发送心跳包来检测消息队列实例的状态,如果发现实例出现故障,可以及时进行处理。
负载均衡:通过负载均衡将消息队列实例分配到不同的服务器上,以实现负载均衡和容错能力。
自动扩展:当消息队列的负载增加时,应该及时扩展节点或增加机器,以保证消息队列的高可用性。
【问题】如何保证消息不被重复消费?或者说,如何保证消息消费的幂等性?
消费者端去重:在消费者端记录已经消费过的消息 ID,避免重复消费相同的消息。
消息去重机制:在消息队列中设置消息去重机制,确保相同的消息只会被消费一次,例如使用消息 ID 或者消息内容生成唯一标识。
消费者端幂等性设计:在消费者端进行幂等性设计,确保即使相同的消息被重复消费,也不会导致重复操作。例如,可以在消费者端进行幂等性检查,避免重复的操作。
使用分布式锁:在多个消费者同时处理消息时,可以使用分布式锁进行同步,确保只有一个消费者能够处理该消息。
保证消息生产的幂等性:在消息生产者端进行幂等性设计,确保相同的消息只会被生产一次,避免消息重复。
【问题】如何保证消息的可靠性传输?或者说,如何处理消息丢失的问题?
消息确认机制:在消息发送后,应该等待消息的确认响应,确保消息已经被成功接收。如果没有收到确认响应,可以尝试重新发送消息或者将消息放入一个重试队列。
消息持久化:在消息队列中,可以将消息持久化到磁盘上,确保即使消息队列服务器出现故障,也能够在恢复后重新加载消息。对于消息的生产者和消费者,也可以将消息持久化到数据库或者日志中,以便在发生故障时能够恢复消息。
镜像队列:支持镜像队列功能,可以将队列中的消息复制到多个节点上。这可以确保即使一个节点宕机,消息也可以在其他节点上继续处理。
事务机制:支持事务机制,允许生产者将消息发送到队列时开启事务,以确保消息被正确发送。如果发送过程中发生任何错误,可以回滚事务并重试。
【问题】如何保证消息的顺序性?
单个队列:将所有相关的消息发送到同一个队列中,这样可以确保消息按照发送的顺序依次处理。这种方式可以保证消息的顺序性,但会限制并发性能。
分区队列:将不同顺序的消息发送到不同的队列中,例如按照消息的创建时间分配到不同的队列。这种方式可以减少单个队列的负载,并保证消息的顺序性。
消息标识:对于需要保证顺序的消息,可以在消息中添加一个序号或唯一标识符。消费者在处理消息时,可以通过序号或标识符来保证消息的顺序性。
全局锁:在处理需要保证顺序的消息时,可以使用全局锁来确保只有一个消费者在处理消息。这种方式可以保证消息的顺序性,但会降低系统的并发性能。
【问题】大量消息在 MQ 里长时间积压,该如何解决?
调整 MQ 配置:增加队列大小、缓存大小、同时处理的消息数量等,以提高系统的处理能力。
添加更多的消费者:可以添加更多的消费者,以提高消息的处理速度。
增加 MQ 节点:可以增加 MQ 集群的节点数量,以分担消息处理压力。
设置消息过期时间:可以为消息设置过期时间,在一定时间内未被消费则持久化并自动删除,避免消息长时间积压。
优化消费者的消费速度:可以优化消费者的消费速度,例如增加消费者的并发数、调整消费者的处理逻辑等,以提高消费者的处理效率。
监控消息队列:定期监控消息队列中的消息数量和处理速度,可以及时发现消息积压的情况,以采取相应的措施。
分流消息:将消息分散到不同的消息队列中,可以有效缓解单个消息队列的积压问题。
使用缓存:使用缓存可以减少对消息队列的读写操作,从而降低消息积压的风险。
定期清理历史数据:对于一些历史数据已经处理完成的消息,需要及时清理,减少不必要的积压。
【问题】MQ 中的消息过期失效了怎么办?仿照RabbitMQ在消息队列中,可以通过设置消息的过期时间来控制消息的有效期。如果消息过期后仍未被消费者消费,则该消息会被自动删除。
死信队列是一种特殊的队列,它用于存放那些无法被正常消费的消息。当一个消息无法被成功消费并返回确认或失败的时候,MQ将该消息转移到死信队列中。
提高消息可靠性:将无法被正常消费的消息转移到死信队列中,可以保证消息不会被丢失,从而提高消息可靠性。
减少资源浪费:将无法被正常消费的消息保留在主队列中,会占用大量的存储空间和处理资源。将这些消息转移到死信队列中,可以减少资源的浪费。
消息消费失败:当消息在被消费者消费时出现异常,例如消费者进程崩溃或消费者处理消息的代码逻辑出现错误,消息也会被转移到死信队列中。
消息过期:当消息在指定的时间内未被消费,MQ会将其转移到死信队列中。
延迟队列是一种特殊的队列,它用于延迟消息的消费。当消息被发送到延迟队列时,该消息不会立即被消费者消费,而是在指定的时间后才会被消费。
订单超时:当用户下单后,如果在一定时间内未完成支付,可以将订单消息发送到延迟队列中。如果在规定的时间内未完成支付,就可以将该订单取消。
消息重试:当消费者无法正常消费消息时,可以将消息发送到延迟队列中。在一定的时间后,可以重新将消息发送到主队列中,以便消费者重新尝试消费。
定时任务:当需要执行一些定时任务时,可以将任务消息发送到延迟队列中。在指定的时间后,可以消费该消息并执行定时任务。
【问题】使用过其他消息队列吗?为什么选择RabbitMQ?
(1)RabbitMQ:
- 优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置。
- 缺点:性能和吞吐量不太理想,不易进行二次开发。
(2)RocketMQ:
- 优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区。
- 缺点:兼容性上不是太好。
(3)Kafka:
- 优点:拥有强大的性能及吞吐量,兼容性很好。
- 缺点:由于“攒一波再处理”导致延迟比较高。
总的来说,Kafka适用于需要处理大量实时数据的场景,而RabbitMQ适用于需要可靠消息传递的场景。
【问题】RabbitMQ 有哪些重要的角色?
Producer(生产者):生产者是指发送消息的应用程序。它将消息发送到 RabbitMQ 中的 Exchange(交换机)中,交换机会根据消息的路由键(routing key)将消息路由到一个或多个 Queue(队列)中。
Exchange(交换机):Exchange 是消息的中转站,用于接收生产者发送的消息,并根据消息的路由键将消息路由到一个或多个 Queue 中。Exchange 有四种类型:direct、fanout、topic 和 headers。
Queue(队列):Queue 用于存储消息。消费者会从 Queue 中获取消息并进行处理。每个消息都会被路由到一个或多个 Queue 中,这取决于 Exchange 的类型和配置方式。
Channel(信道):Channel 是在 Connection 上创建的一个虚拟连接,用于传输消息。Channel 可以看作是一个独立的会话,通过它可以进行消息的发送和接收,以及声明 Exchange、Queue、Binding 等操作。
Consumer(消费者):消费者是指接收并处理消息的应用程序。消费者从 Queue 中获取消息,并进行处理。消费者可以单独启动,也可以作为一个消费者组一起工作。
Broker(消息代理):消息代理是 RabbitMQ 的核心组件,它接收和转发消息。Broker 负责管理 Exchange、Queue、Binding(绑定)等对象,并且维护这些对象之间的关系。RabbitMQ 的所有消息都必须经过 Broker 进行转发。
【问题】RabbitMQ 有几种队列模式?
简单模式:也称为点对点模型,一个生产者向一个消费者发送消息。消息被发送到一个队列中,然后由一个消费者接收和处理。
工作模式:也称为任务队列模型,一个生产者向多个消费者发送消息。消息被发送到一个队列中,然后由多个消费者竞争接收和处理。
发布/订阅模式:一个生产者向多个消费者广播消息。消息被发送到一个交换机(Exchange),它将消息分发给所有与之绑定的队列,然后由各个队列的消费者接收和处理。
路由模式:根据消息的路由键(Routing Key)将消息发送到特定的队列。生产者指定消息的路由键,而消费者只接收与之匹配的路由键的消息。
主题模式:类似于路由模式,但使用更灵活的通配符匹配规则。生产者可以使用通配符表达式指定消息的路由键,消费者可以使用相同的表达式来匹配接收的消息。
【问题】Kafka的优点?
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写。
【问题】Kafka的缺点?由于是批量发送,数据并非真正的实时;对于mqtt协议不支持;不支持物联网传感数据直接接入;仅支持统一分区内消息有序,无法实现全局消息有序;监控不完善,需要安装插件;依赖zookeeper进行元数据管理;
【问题】Kafka的使用场景?
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、HBase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和 Flink。
【问题】Kafka 是如何做到消息的有序性?kafka 中的每个 partition 中的消息在写入时都是有序的,而且单独一个 partition 只能由一个消费者去消费,可以在里面保证消息的顺序性。但是分区之间的消息是不保证有序的。
【问题】Kafka 可以脱离 zookeeper 单独使用吗?为什么?Kafka 在早期版本中使用 ZooKeeper 来管理和协调 broker 和消费者之间的通信和状态,包括存储元数据信息和分配 partition 等。但是,从 Kafka 2.8 版本开始,Kafka 可以脱离 ZooKeeper 单独使用,引入了 Kafka Raft Metadata Mode 来管理和存储元数据信息。
【问题】Kafka 有几种数据保留的策略?
时间保留策略:根据消息的时间戳来决定消息是否被保留。可以设置保留时间,超过指定时间的消息将被删除。可以通过配置文件中的
log.retention.hours
参数来设置消息的保留时间。大小保留策略:根据主题日志文件的大小来决定消息是否被保留。可以设置日志文件的最大大小,当文件大小超过指定大小时,Kafka 会自动创建一个新的日志文件,并将消息写入新的日志文件。可以通过配置文件中的
log.retention.bytes
参数来设置消息日志文件的最大大小。日志压缩保留策略:根据压缩后的消息日志文件大小来决定消息是否被保留。Kafka 可以自动对消息进行压缩,可以设置消息压缩后的最大大小,当压缩后的日志文件大小超过指定大小时,Kafka 会删除最早的消息。可以通过配置文件中的
log.cleaner
参数来设置消息压缩后的最大大小。删除策略:可以通过配置文件中的
log.cleanup.policy
参数来设置删除策略。支持的删除策略包括删除旧数据、删除未使用的数据、删除压缩的数据等。如果多种数据保留策略同时使用,Kafka 会优先采用时间保留策略。此外,在 Kafka 的新版本中,还支持基于日志段(log segment)的数据保留策略。
【问题】Kafka 的多分区副本?
- 在Kafka中,一个主题(Topic)可以被划分为多个分区(Partition),每个分区由多个副本(Replica)组成。每个分区都有一个Leader副本和多个Follower副本,Leader副本负责读写操作,而Follower副本只负责从Leader副本同步数据。
- 当Producer向Kafka发送消息时,消息首先被写入Leader副本中,然后异步地复制到所有Follower副本中。如果Leader副本出现故障,Kafka会自动选举一个新的Leader副本来替代它。这种多分区副本机制保证了即使在某些副本出现故障的情况下,Kafka仍然能够保证数据的完整性和可用性。
多分区副本机制还可以提高Kafka的并发读取能力。当多个Consumer同时消费同一个分区时,Kafka可以通过分配不同的Follower副本来实现并发读取,从而提高整个系统的吞吐量。增加副本数会增加系统的负载和存储开销,同时,过多的副本数也会增加数据同步的延迟,影响系统的实时性。
【问题】Kafka 的分区策略有哪些?
Round Robin 分区策略:将消息均匀地分布到所有分区中。这种策略适用于所有的生产者,并且分区数量相对较少的情况下效果最佳。
Hash 分区策略:根据消息的 key 值对分区进行哈希计算,将同一个 key 值的消息分配到同一个分区中。这种策略适用于希望具有相同 key 值的消息被写入到同一个分区中的情况。
Range 分区策略:将一定范围内的 key 值分配到同一个分区中。这种策略适用于希望具有相似 key 值的消息被写入到同一个分区中的情况。
Sticky 分区策略:基于消息的 key 值,将同一组 key 值的消息写入到同一个分区中,并尽量将同一组 key 值的消息写入到同一个 broker 上。这种策略适用于希望在同一分区中保持消息的顺序的情况。
自定义分区策略:如果以上分区策略不能满足需求,可以自定义分区策略。自定义分区策略需要实现 Kafka 提供的
org.apache.kafka.clients.producer.Partitioner
接口,并在生产者配置中指定。
【问题】谈下你对 Zookeeper 的认识?Zookeeper 是一个分布式协调服务,它是 Apache 软件基金会的一个开源项目。Zookeeper 的主要功能是提供分布式应用程序的协调服务,例如分布式锁、配置管理、命名服务、分布式队列等,能够使得分布式应用程序能够更加简单、可靠地实现分布式协同工作。
分布式锁:Zookeeper 可以提供分布式锁的实现,帮助分布式应用程序协调并发访问共享资源的问题。
配置管理:Zookeeper 可以作为分布式应用程序的配置中心,管理各种应用程序的配置信息。
命名服务:Zookeeper 可以提供分布式应用程序的命名服务,以便分布式应用程序能够更加方便地访问各种服务。
分布式队列:Zookeeper 可以提供分布式队列的实现,用于分布式应用程序的消息传递和处理。
Leader 选举:Zookeeper 可以提供分布式应用程序的 Leader 选举功能,用于实现主从模式的分布式应用程序。
Zookeeper 的主要特点是高性能、高可用、数据一致性和可靠性等。Zookeeper 的数据存储采用内存数据库,具有高速读写和高容错性,能够支持大量的并发连接。Zookeeper 采用了 Paxos 算法来保证数据的一致性,能够保证数据的可靠性和稳定性。Zookeeper 还提供了类似于文件系统的命名空间,并允许用户以树形结构组织数据,从而更加灵活地管理数据。
【问题】谈下你对 ZAB 协议的了解?ZAB(Zookeeper Atomic Broadcast)协议是 Zookeeper 中用于实现一致性的协议,是 Zookeeper 的核心组件之一。ZAB 协议通过保证所有服务器上的数据副本都是一致的来确保 Zookeeper 系统的高可用性和数据可靠性。ZAB 协议采用了 Paxos 算法的两阶段提交协议来保证数据的一致性,同时还具有更高的性能和可靠性。ZAB 协议分为两个阶段,分别是 Leader 选举和数据广播。
【问题】Zookeeper 怎么保证主从节点的状态同步?在 Zookeeper 中,主从节点之间的状态同步是通过 ZAB(Zookeeper Atomic Broadcast)协议来实现的。
- 在 Zookeeper 中,Leader 节点会负责将客户端请求转化为事务并广播给所有 Follower 节点。一旦 Follower 节点收到 Leader 的广播请求,就会确认该请求,并将确认信息返回给 Leader。当 Leader 节点收到了超过半数的 Follower 节点的确认信息后,就可以将事务提交到自己的数据库中,并将提交结果广播给所有 Follower 节点。Follower 节点接收到 Leader 广播的提交结果后,也会将事务提交到自己的数据库中,从而实现了主从节点之间的状态同步。
- 除了 ZAB 协议之外,Zookeeper 还通过心跳机制来保证主从节点之间的状态同步。具体来说,每个 Follower 节点会定期向 Leader 节点发送心跳请求,以确保与 Leader 节点保持连接,并及时获取 Leader 节点上的最新状态。如果某个 Follower 节点长时间没有收到 Leader 节点的心跳请求,就会认为 Leader 节点已经失效,并开始选举新的 Leader 节点。
【问题】Zookeeper 有几种部署模式?
单机模式:在单机模式下,Zookeeper 只运行在一台机器上,所有的数据都存储在该机器上的本地文件系统中。这种部署方式主要用于测试和开发环境。
集群模式:在集群模式下,Zookeeper 运行在多台机器上,每台机器都有相同的数据副本,其中一台机器被选举为 Leader,其他机器作为 Follower。客户端通过连接到任意一个节点与 Zookeeper 进行通信。这种部署方式可以提高系统的可用性和可靠性,适用于生产环境。
带观察者的集群模式:带观察者的集群模式在集群模式的基础上增加了观察者角色。观察者节点可以接收集群中的广播消息,但不参与投票和事务处理。观察者节点可以帮助分担 Leader 节点的负载,并提高系统的可伸缩性。
【问题】说一下 Zookeeper 的通知机制?通知机制是 Zookeeper 的核心功能之一,它能够实现 Zookeeper 的分布式协调和同步。
节点数据变更通知:客户端可以通过在节点上注册一个 Watcher 监听器来监控该节点的数据变化。当该节点的数据发生变化时,Zookeeper 会将变化通知给客户端,客户端可以根据需要重新获取节点的数据。需要注意的是,一次 Watcher 监听器只会接收一次通知,如果需要持续监听节点的变化,需要在接收到通知后重新注册一个新的 Watcher 监听器。
节点状态变更通知:Zookeeper 还可以通过 Session 事件和 Connection 事件来通知客户端节点的状态变化。当客户端连接到 Zookeeper 服务器时,会创建一个 Session,如果该 Session 超时或者因为其他原因失效,Zookeeper 会触发 Session 事件通知客户端。另外,如果客户端与 Zookeeper 服务器的连接发生异常,Zookeeper 也会触发 Connection 事件通知客户端。客户端可以根据这些事件及时感知节点的状态变化,并采取相应的措施。
【问题】Zookeeper 宕机如何处理?
重新启动 Zookeeper:如果 Zookeeper 宕机只是临时性的,可以尝试重新启动 Zookeeper。当 Zookeeper 重新启动后,会根据数据日志和快照文件恢复数据,尽可能地保留之前的状态和数据。需要注意的是,在重新启动 Zookeeper 之前,需要确保所有依赖于 Zookeeper 的服务已经停止。
故障转移:如果 Zookeeper 宕机的时间较长,或者出现了数据损坏等情况,可能需要进行故障转移。故障转移是指将原本运行在宕机节点上的 Zookeeper 服务迁移到另一个节点上。在进行故障转移之前,需要确保新节点的配置和数据与原节点相同,并确保新节点已经正常运行。
备份和恢复:如果 Zookeeper 宕机后无法通过其他手段进行恢复,可以尝试使用备份和恢复的方法。备份和恢复是指将 Zookeeper 的数据和日志文件备份到另一台机器上,然后在新的机器上重新启动 Zookeeper。备份和恢复的方法可以确保数据的完整性,但需要耗费较长的时间和较大的存储空间。
【问题】Zookeeper 和 Dubbo 的关系?在 Dubbo 中,Zookeeper 主要用于服务的注册和发现。当服务提供者启动后,会将自己的 IP 地址、端口号和服务名称等信息注册到 Zookeeper 中,并保持心跳连接,以保证服务的可用性。当服务消费者需要调用某个服务时,会通过 Zookeeper 获取该服务提供者的地址,并建立连接,以调用服务。除了服务注册和发现,Zookeeper 还可以用于 Dubbo 的动态配置。Dubbo 可以在 Zookeeper 中注册一个配置中心,将系统中的一些配置信息存储在 Zookeeper 中,并在运行时通过订阅 Zookeeper 中的配置信息来动态更新应用程序的配置。
【问题】集群中为什么要有主节点?它可以掌控整个集群的状态,决定任务的分配和执行,并保证集群的数据一致性和正常运行。同时,主节点的存在也可以简化系统的实现,提高集群的效率和性能。
控制和协调:主节点负责控制和协调整个集群的工作,包括任务的分配、节点的管理、状态的同步等。主节点能够掌控整个集群的状态,决定哪些任务由哪些节点执行,并保证集群的数据一致性。
故障转移:主节点还负责处理节点的故障和宕机情况,当主节点出现故障时,从节点可以接管主节点的工作,并重新选举一个新的主节点,以保证整个集群的正常运行。
提高效率:主节点可以避免多个从节点之间的竞争和冲突,从而提高集群的效率和性能。主节点可以对任务进行合理的分配和调度,避免资源的浪费和重复执行。
简化实现:主节点模式可以简化分布式系统的实现,避免分布式算法和复杂的协调机制,从而减少系统的复杂性和维护成本。
【问题】集群中有 3 台服务器,其中一个节点宕机,这个时候 Zookeeper 还可以使用吗?
- Zookeeper 可以容忍部分节点宕机,但是具体情况取决于宕机的节点数量以及它们所在的角色。如果在一个由 3 个节点组成的集群中,其中一个节点宕机,那么 Zookeeper 仍然可以使用。因为 Zookeeper 使用的是过半数机制,只要超过一半的节点正常工作,集群就可以继续运行。
- 但是,如果宕机的是主节点,那么需要重新选举新的主节点,才能保证集群的正常运行。这种情况下,需要满足以下条件:(1)还剩下的节点数要大于等于一半;(2)宕机的节点不能是最后一个主节点,否则需要手动恢复数据并重新启动集群。
- 如果在这个例子中,宕机的是主节点,而且只剩下了两个从节点,那么集群就无法继续工作了,因为无法选举新的主节点。需要等待宕机的节点重新启动或者加入新的节点才能恢复集群的工作。
我们可使用 Queue 来实现消息队列,Queue 大体可分为以下三类:
- 双端队列(Deque)是 Queue 的子类也是 Queue 的补充类,头部和尾部都支持元素插入和获取;
- 阻塞队列指的是在元素操作时(添加或删除),如果没有成功,会阻塞等待执行,比如当添加元素时,如果队列元素已满,队列则会阻塞等待直到有空位时再插入;
- 非阻塞队列,和阻塞队列相反,它会直接返回操作的结果,而非阻塞等待操作,双端队列也属于非阻塞队列。
import java.util.LinkedList; import java.util.Queue; public class CustomQueue { // 定义消息队列 private static Queue<String> queue = new LinkedList<>(); public static void main(String[] args) { producer(); // 调用生产者 consumer(); // 调用消费者 } // 生产者 public static void producer() { // 添加消息 queue.add("first message."); queue.add("second message."); queue.add("third message."); } // 消费者 public static void consumer() { while (!queue.isEmpty()) { // 消费消息 System.out.println(queue.poll()); } } }实现自定义延迟队列需要实现 Delayed 接口,重写 getDelay() 方法,延迟队列完整实现代码如下:
import lombok.Getter; import lombok.Setter; import java.text.DateFormat; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * 自定义延迟队列 */ public class CustomDelayQueue { // 延迟消息队列 private static DelayQueue delayQueue = new DelayQueue(); public static void main(String[] args) throws InterruptedException { producer(); // 调用生产者 consumer(); // 调用消费者 } // 生产者 public static void producer() { // 添加消息 delayQueue.put(new MyDelay(1000, "消息1")); delayQueue.put(new MyDelay(3000, "消息2")); } // 消费者 public static void consumer() throws InterruptedException { System.out.println("开始执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); while (!delayQueue.isEmpty()) { System.out.println(delayQueue.take()); } System.out.println("结束执行时间:" + DateFormat.getDateTimeInstance().format(new Date())); } /** * 自定义延迟队列 */ static class MyDelay implements Delayed { // 延迟截止时间(单位:毫秒) long delayTime = System.currentTimeMillis(); // 借助 lombok 实现 @Getter @Setter private String msg; /** * 初始化 * @param delayTime 设置延迟执行时间 * @param msg 执行的消息 */ public MyDelay(long delayTime, String msg) { this.delayTime = (this.delayTime + delayTime); this.msg = msg; } // 获取剩余时间 @Override public long getDelay(TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 队列里元素的排序依据 @Override public int compareTo(Delayed o) { if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) { return 1; } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) { return -1; } else { return 0; } } @Override public String toString() { return this.msg; } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。