赞
踩
拉取带有控制台的rabbitmq镜像
docker pull rabbitmq:management
运行
docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
控制台访问, RabbitMQ默认的用户名:guest,密码:guest
http://localhost:15672
rabbitmq不建议使用guest用户进行操作, 先新建一个用户, 并设置为admin
添加一个virtual host, 默认只有一个/
的host
进入新建的host, 为新建的user设置拥有该host的权限
简单体验rabbitmq的使用
首先建立一个连接
fun getConnection(): Connection {
return ConnectionFactory().apply {
host = "你的IP地址"
port = 5672
virtualHost = "/vhost_test"
username = "oleolema"
password = "yqh"
}.newConnection()
}
const val QUEUE_NAME = "test_simple_queue"
建立生产者
fun main() {
val connection = getConnection()
// 获取通道
val channel = connection.createChannel()
connection.use {
channel.use {
// 创建队列声明
it.queueDeclare(QUEUE_NAME, false, false, false, null)
val msg = "Hello RabbitMQ"
// 发送信息
it.basicPublish("", QUEUE_NAME, null, msg.toByteArray())
}
}
}
建立消费者(多个消费者代码相同)
fun main() { val connection = getConnection() // channel val channel = connection.createChannel() // 队列声明 // durable 用于消息持久化 (服务器重启rabbitmq的消息也不会丢失) channel.queueDeclare(QUEUE_NAME, false, false, false, null) // autoAck: 自动告知已收到 // 接收消息 channel.basicConsume(QUEUE_NAME, true, // 接收到消息 DeliverCallback { consumerTag, message -> println(consumerTag) println(message.body.toString(Charset.defaultCharset())) }, // 消息被取消 CancelCallback { println("cancel") }) }
上面使用的轮询分发的模式, 如果有多个消费者, 生产者的消息会平均分发到每个一个消费者. 这种模式的弊端显而易见, 如果消费者的消费实力不等, 便会造成一边空闲, 一边负载, 造成服务器资源利用不均, 因此就出现了 能者多劳 的模式
能者多劳: 当消费者处理完成后, 主动回应消息队列, 消息队列才会给它发送信息
队列名称
const val QUEUE_WORK = "test_work_queue"
生产者
fun main() {
getConnection().use {
val channel = it.createChannel()
channel.queueDeclare(QUEUE_WORK, false, false, false, null)
for (i in 1..100) {
val msg = "Hello RabbitMQ $i"
channel.basicPublish("", QUEUE_WORK, null, msg.toByteArray())
println("product $msg")
}
}
}
消费者 (多个消费者代码相同)
fun main() { val channel = getConnection().createChannel() channel.queueDeclare(QUEUE_WORK, false, false, false, null) // 一次只分发一个 channel.basicQos(1) // autoAck = false channel.basicConsume(QUEUE_WORK, false, DeliverCallback { consumerTag, message -> try { println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(100) } finally { // 主动回执 channel.basicAck(message.envelope.deliveryTag, false) } }, CancelCallback { println("cancel") }) }
订阅模式 : 消费者C订阅消息 , 多个消费者收到同一条消息. 它的原理是: 生产者P的消息不直接发送到消息队列, 而是发送到交换机X(交换机不具备存储功能, 只负责消息转发), 交换机再发送到跟自身绑定的消息队列, 因此, 每个消费者需要有不同消息队列, 将其绑定到交换机上. 这样, 生产者将消息发送给交换机, 交换机再转发到各个消息队列, 间接的完成了消息的发送
生产者, 发送消息到交换机
const val QUEUE_FANOUT1 = "test_fanout_queue1"
const val QUEUE_FANOUT2 = "test_fanout_queue2"
const val EXCHANGE_NAME = "test_simple_exchange"
fun main() {
getConnection().use {
val channel = it.createChannel()
// fanout : 分发
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT)
for (i in 1..50) {
val msg = "Hello RabbitMQ $i"
channel.basicPublish(EXCHANGE_NAME, "", null, msg.toByteArray())
println("product $msg")
}
}
}
消费者1, 订阅不同的消息队列 QUEUE_FANOUT1
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT) // 定义队列 channel.queueDeclare(QUEUE_FANOUT1, false, false, false, null) // 队列绑定到交换机 channel.queueBind(QUEUE_FANOUT1, EXCHANGE_NAME, "") channel.basicConsume(QUEUE_FANOUT1, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(100) }, CancelCallback { println("cancel") }) }
消费者2, 订阅不同的消息队列 QUEUE_FANOUT2
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT) // 定义队列 channel.queueDeclare(QUEUE_FANOUT2, false, false, false, null) // 队列绑定到交换机 channel.queueBind(QUEUE_FANOUT2, EXCHANGE_NAME, "") channel.basicConsume(QUEUE_FANOUT2, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(200) }, CancelCallback { println("cancel") }) }
交换机
fanout
: 分发, 不处理路由键, 只要和交换机绑定, 便会收到消息.
direct
: 路由键完全匹配
topic
: 按规则匹配路由键
关于topic
的规则:
*
匹配一个#
匹配一个或多个Direct
生产者
fun main() {
getConnection().use {
val channel = it.createChannel()
channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT)
val routeKey = "warning"
val msg = "Hello RabbitMQ $routeKey"
channel.basicPublish(EXCHANGE_DIRECT, routeKey, null, msg.toByteArray())
println("product $msg")
}
}
消费者1, 绑定路由键: info, warning, error
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT) // 定义队列 channel.queueDeclare(QUEUE_DIRECT1, false, false, false, null) // 队列绑定到交换机 指定路由键为error warning info channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "error") channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "info") channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "warning") channel.basicConsume(QUEUE_DIRECT1, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(200) }, CancelCallback { println("cancel") }) }
消费者2 绑定路由键: error
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT) // 定义队列 channel.queueDeclare(QUEUE_DIRECT2, false, false, false, null) // 队列绑定到交换机 指定路由键为error channel.queueBind(QUEUE_DIRECT2, EXCHANGE_DIRECT, "error") channel.basicConsume(QUEUE_DIRECT2, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(200) }, CancelCallback { println("cancel") }) }
Topic
生产者:
fun main() {
getConnection().use {
val channel = it.createChannel()
channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC)
val routeKey = "goods.add"
val msg = "Hello RabbitMQ $routeKey"
channel.basicPublish(EXCHANGE_TOPIC, routeKey, null, msg.toByteArray())
println("product $msg")
}
}
消费者1, 路由键 goods.select
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC) // 定义队列 channel.queueDeclare(QUEUE_TOPIC1, false, false, false, null) // 队列绑定到交换机 指定路由键为error warning info channel.queueBind(QUEUE_TOPIC1, EXCHANGE_TOPIC, "goods.select") channel.basicConsume(QUEUE_TOPIC1, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(200) }, CancelCallback { println("cancel") }) }
消费者2, 路由键: goods.#
fun main() { val channel = getConnection().createChannel() // 定义交换机 channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC) // 定义队列 channel.queueDeclare(QUEUE_TOPIC2, false, false, false, null) // 队列绑定到交换机 指定路由键为error channel.queueBind(QUEUE_TOPIC2, EXCHANGE_TOPIC, "goods.#") channel.basicConsume(QUEUE_TOPIC2, true, DeliverCallback { consumerTag, message -> println(message.body.toString(StandardCharsets.UTF_8)) // 消费需要时间 Thread.sleep(200) }, CancelCallback { println("cancel") }) }
事务
channel.txSelect()
开启事务
channel.txCommit()
提交事务
channel.txRollback()
回滚事务
生产者
fun main() { getConnection().use { val channel = it.createChannel() channel.queueDeclare(QUEUE_TRANSACTION, false, false, false, null) try { // 开启事务 channel.txSelect() val msg = "Hello RabbitMQ" channel.basicPublish("", QUEUE_TRANSACTION, null, msg.toByteArray()) val a = 1 / 0 println("product $msg") // 提交事务 channel.txCommit() } catch (e: Exception) { // 回滚事务 channel.txRollback() println("error: $e") } } }
消费者
fun main() {
val channel = getConnection().createChannel()
channel.queueDeclare(QUEUE_TRANSACTION, false, false, false, null)
channel.basicConsume(QUEUE_TRANSACTION, true, DeliverCallback { consumerTag, message ->
println(message.body.toString(StandardCharsets.UTF_8))
// 消费需要时间
Thread.sleep(200)
}, CancelCallback {
println("cancel")
})
}
回执
消息发送成功后通知生产者. 当RabbitMQ将消息存储到消息队列, 表示发送成功, 如果开启了持久化, RabbitMQ将消息写入磁盘后表示成功
生产者
fun main() { getConnection().use { val channel = it.createChannel() channel.queueDeclare(QUEUE_CONFIRM, false, false, false, null) // 开启发送成功回执(RabbitMQ将消息存储到消息队列, 表示发送成功, 如果开启了持久化, RabbitMQ将消息写入磁盘后表示成功) channel.confirmSelect() for(i in 1..5){ val msg = "Hello RabbitMQ $i" channel.basicPublish("", QUEUE_CONFIRM, null, msg.toByteArray()) println("product $msg") } // 判断是否发送成功 if (channel.waitForConfirms()) { println("发送成功") } else { println("发送失败") } } }
消费者
fun main() {
val channel = getConnection().createChannel()
channel.queueDeclare(QUEUE_CONFIRM, false, false, false, null)
channel.basicConsume(QUEUE_CONFIRM, true, DeliverCallback { consumerTag, message ->
println(message.body.toString(StandardCharsets.UTF_8))
// 消费需要时间
Thread.sleep(200)
}, CancelCallback {
println("cancel")
})
}
SpringBoot 整合 RabbitMQ
配置application.properties
#rabbitmq
spring.rabbitmq.host=101.200.43.221
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/vhost_test
spring.rabbitmq.username=oleolema
spring.rabbitmq.password=yqh
启动类 @EnableRabbit
@EnableRabbit
@SpringBootApplication
class RabbitmqDemoApplication
fun main(args: Array<String>) {
runApplication<RabbitmqDemoApplication>(*args)
}
配置队列和交换机
@Configuration class RabbitConfig { //队列 起名:TestDirectQueue @Bean fun TestDirectQueue(): Queue? { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return Queue("TestDirectQueue", false) } //Direct交换机 起名:TestDirectExchange @Bean fun TestDirectExchange(): DirectExchange? { // return new DirectExchange("TestDirectExchange",true,true); return DirectExchange("TestDirectExchange", false, false) } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting @Bean fun bindingDirect(): Binding? { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting") } }
生产者
@RestController class ProducerController( private val rabbitTemplate: RabbitTemplate ) { @GetMapping("/producer") fun producer(): String { val messageId = UUID.randomUUID().toString() val messageData = "test message, hello!" val createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) val map: MutableMap<String, Any> = HashMap() map["messageId"] = messageId map["messageData"] = messageData map["createTime"] = createTime //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map) return "success" } }
消费者
@Component
@RabbitListener(queues = ["TestDirectQueue"]) //监听的队列名称 TestDirectQueue
class Consumer {
@RabbitHandler
fun process(testMessage: Map<*, *>) {
println("DirectReceiver消费者收到消息 : $testMessage")
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。