当前位置:   article > 正文

RabbitMQ - 安装配置及使用方法_rabbitmq配置ip 管理员

rabbitmq配置ip 管理员

安装

拉取带有控制台的rabbitmq镜像

docker pull rabbitmq:management
  • 1

运行

docker run --name rabbitmq -d -p 15672:15672 -p 5672:5672 rabbitmq:management
  • 1

控制台访问, RabbitMQ默认的用户名:guest,密码:guest

http://localhost:15672
  • 1

使用

rabbitmq不建议使用guest用户进行操作, 先新建一个用户, 并设置为admin
在这里插入图片描述
添加一个virtual host, 默认只有一个/的host
在这里插入图片描述
进入新建的host, 为新建的user设置拥有该host的权限
在这里插入图片描述
简单体验rabbitmq的使用
首先建立一个连接

fun getConnection(): Connection {
    return ConnectionFactory().apply {
        host = "你的IP地址"
        port = 5672
        virtualHost = "/vhost_test"
        username = "oleolema"
        password = "yqh"
    }.newConnection()
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
const val QUEUE_NAME = "test_simple_queue"
  • 1

建立生产者

fun main() {
    val connection = getConnection()
//    获取通道
    val channel = connection.createChannel()
    connection.use {
        channel.use {
//        创建队列声明
            it.queueDeclare(QUEUE_NAME, false, false, false, null)
            val msg = "Hello RabbitMQ"
//        发送信息
            it.basicPublish("", QUEUE_NAME, null, msg.toByteArray())
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

建立消费者(多个消费者代码相同)

fun main() {
    val connection = getConnection()
    // channel
    val channel = connection.createChannel()
//    队列声明
//    durable 用于消息持久化 (服务器重启rabbitmq的消息也不会丢失)
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
//     autoAck: 自动告知已收到
//	接收消息
    channel.basicConsume(QUEUE_NAME, true,
    		// 接收到消息
            DeliverCallback { consumerTag, message ->
                println(consumerTag)
                println(message.body.toString(Charset.defaultCharset()))
            },
            // 消息被取消
            CancelCallback {
                println("cancel")
            })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

在这里插入图片描述
上面使用的轮询分发的模式, 如果有多个消费者, 生产者的消息会平均分发到每个一个消费者. 这种模式的弊端显而易见, 如果消费者的消费实力不等, 便会造成一边空闲, 一边负载, 造成服务器资源利用不均, 因此就出现了 能者多劳 的模式
能者多劳: 当消费者处理完成后, 主动回应消息队列, 消息队列才会给它发送信息
在这里插入图片描述
队列名称

const val QUEUE_WORK = "test_work_queue"
  • 1

生产者

fun main() {
    getConnection().use {
        val channel = it.createChannel()
        channel.queueDeclare(QUEUE_WORK, false, false, false, null)
        for (i in 1..100) {
            val msg = "Hello RabbitMQ $i"
            channel.basicPublish("", QUEUE_WORK, null, msg.toByteArray())
            println("product $msg")
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

消费者 (多个消费者代码相同)

fun main() {
    val channel = getConnection().createChannel()
    channel.queueDeclare(QUEUE_WORK, false, false, false, null)
//    一次只分发一个
    channel.basicQos(1)
//    autoAck = false
    channel.basicConsume(QUEUE_WORK, false, DeliverCallback { consumerTag, message ->
        try {
            println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
            Thread.sleep(100)
        } finally {
//            主动回执
            channel.basicAck(message.envelope.deliveryTag, false)
        }
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

订阅模式 : 消费者C订阅消息 , 多个消费者收到同一条消息. 它的原理是: 生产者P的消息不直接发送到消息队列, 而是发送到交换机X(交换机不具备存储功能, 只负责消息转发), 交换机再发送到跟自身绑定的消息队列, 因此, 每个消费者需要有不同消息队列, 将其绑定到交换机上. 这样, 生产者将消息发送给交换机, 交换机再转发到各个消息队列, 间接的完成了消息的发送
生产者, 发送消息到交换机
在这里插入图片描述

const val QUEUE_FANOUT1 = "test_fanout_queue1"
const val QUEUE_FANOUT2 = "test_fanout_queue2"
const val EXCHANGE_NAME = "test_simple_exchange"
  • 1
  • 2
  • 3
fun main() {
    getConnection().use {
        val channel = it.createChannel()
//        fanout : 分发
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT)
        for (i in 1..50) {
            val msg = "Hello RabbitMQ $i"
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.toByteArray())
            println("product $msg")
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

消费者1, 订阅不同的消息队列 QUEUE_FANOUT1

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT)
//    定义队列
    channel.queueDeclare(QUEUE_FANOUT1, false, false, false, null)
//    队列绑定到交换机
    channel.queueBind(QUEUE_FANOUT1, EXCHANGE_NAME, "")
    channel.basicConsume(QUEUE_FANOUT1, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(100)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

消费者2, 订阅不同的消息队列 QUEUE_FANOUT2

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT)
//    定义队列
    channel.queueDeclare(QUEUE_FANOUT2, false, false, false, null)
//    队列绑定到交换机
    channel.queueBind(QUEUE_FANOUT2, EXCHANGE_NAME, "")
    channel.basicConsume(QUEUE_FANOUT2, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

交换机
fanout: 分发, 不处理路由键, 只要和交换机绑定, 便会收到消息.
direct: 路由键完全匹配
topic: 按规则匹配路由键

关于topic的规则:

  • *匹配一个
  • #匹配一个或多个

Direct
在这里插入图片描述
生产者

fun main() {
    getConnection().use {
        val channel = it.createChannel()
        channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT)
        val routeKey = "warning"
        val msg = "Hello RabbitMQ $routeKey"
        channel.basicPublish(EXCHANGE_DIRECT, routeKey, null, msg.toByteArray())
        println("product $msg")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消费者1, 绑定路由键: info, warning, error

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT)
//    定义队列
    channel.queueDeclare(QUEUE_DIRECT1, false, false, false, null)
//    队列绑定到交换机 指定路由键为error warning info
    channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "error")
    channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "info")
    channel.queueBind(QUEUE_DIRECT1, EXCHANGE_DIRECT, "warning")
    channel.basicConsume(QUEUE_DIRECT1, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费者2 绑定路由键: error

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT)
//    定义队列
    channel.queueDeclare(QUEUE_DIRECT2, false, false, false, null)
//    队列绑定到交换机 指定路由键为error
    channel.queueBind(QUEUE_DIRECT2, EXCHANGE_DIRECT, "error")
    channel.basicConsume(QUEUE_DIRECT2, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

Topic
在这里插入图片描述
生产者:

fun main() {
    getConnection().use {
        val channel = it.createChannel()
        channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC)
        val routeKey = "goods.add"
        val msg = "Hello RabbitMQ $routeKey"
        channel.basicPublish(EXCHANGE_TOPIC, routeKey, null, msg.toByteArray())
        println("product $msg")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

消费者1, 路由键 goods.select

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC)
//    定义队列
    channel.queueDeclare(QUEUE_TOPIC1, false, false, false, null)
//    队列绑定到交换机 指定路由键为error warning info
    channel.queueBind(QUEUE_TOPIC1, EXCHANGE_TOPIC, "goods.select")
    channel.basicConsume(QUEUE_TOPIC1, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

消费者2, 路由键: goods.#

fun main() {
    val channel = getConnection().createChannel()
//    定义交换机
    channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC)
//    定义队列
    channel.queueDeclare(QUEUE_TOPIC2, false, false, false, null)
//    队列绑定到交换机 指定路由键为error
    channel.queueBind(QUEUE_TOPIC2, EXCHANGE_TOPIC, "goods.#")
    channel.basicConsume(QUEUE_TOPIC2, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

事务
channel.txSelect() 开启事务
channel.txCommit() 提交事务
channel.txRollback() 回滚事务

生产者

fun main() {
    getConnection().use {
        val channel = it.createChannel()
        channel.queueDeclare(QUEUE_TRANSACTION, false, false, false, null)
        try {
//            开启事务
            channel.txSelect()
            val msg = "Hello RabbitMQ"
            channel.basicPublish("", QUEUE_TRANSACTION, null, msg.toByteArray())
            val a = 1 / 0
            println("product $msg")
//            提交事务
            channel.txCommit()
        } catch (e: Exception) {
//            回滚事务
            channel.txRollback()
            println("error: $e")
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

消费者

fun main() {
    val channel = getConnection().createChannel()
    channel.queueDeclare(QUEUE_TRANSACTION, false, false, false, null)
    channel.basicConsume(QUEUE_TRANSACTION, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

回执
消息发送成功后通知生产者. 当RabbitMQ将消息存储到消息队列, 表示发送成功, 如果开启了持久化, RabbitMQ将消息写入磁盘后表示成功

生产者

fun main() {
    getConnection().use {
        val channel = it.createChannel()
        channel.queueDeclare(QUEUE_CONFIRM, false, false, false, null)
//            开启发送成功回执(RabbitMQ将消息存储到消息队列, 表示发送成功, 如果开启了持久化, RabbitMQ将消息写入磁盘后表示成功)
        channel.confirmSelect()
        for(i in 1..5){
            val msg = "Hello RabbitMQ $i"
            channel.basicPublish("", QUEUE_CONFIRM, null, msg.toByteArray())
            println("product $msg")
        }
//            判断是否发送成功
        if (channel.waitForConfirms()) {
            println("发送成功")
        } else {
            println("发送失败")
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

消费者

fun main() {
    val channel = getConnection().createChannel()
    channel.queueDeclare(QUEUE_CONFIRM, false, false, false, null)
    channel.basicConsume(QUEUE_CONFIRM, true, DeliverCallback { consumerTag, message ->
        println(message.body.toString(StandardCharsets.UTF_8))
//            消费需要时间
        Thread.sleep(200)
    }, CancelCallback {
        println("cancel")
    })
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

SpringBoot 整合 RabbitMQ

配置application.properties

#rabbitmq
spring.rabbitmq.host=101.200.43.221
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/vhost_test
spring.rabbitmq.username=oleolema
spring.rabbitmq.password=yqh
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

启动类 @EnableRabbit

@EnableRabbit
@SpringBootApplication
class RabbitmqDemoApplication

fun main(args: Array<String>) {
    runApplication<RabbitmqDemoApplication>(*args)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

配置队列和交换机

@Configuration
class RabbitConfig {

    //队列 起名:TestDirectQueue
    @Bean
    fun TestDirectQueue(): Queue? {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //   return new Queue("TestDirectQueue",true,true,false);

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return Queue("TestDirectQueue", false)
    }

    //Direct交换机 起名:TestDirectExchange
    @Bean
    fun TestDirectExchange(): DirectExchange? {
        //  return new DirectExchange("TestDirectExchange",true,true);
        return DirectExchange("TestDirectExchange", false, false)
    }

    //绑定  将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting
    @Bean
    fun bindingDirect(): Binding? {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

生产者

@RestController
class ProducerController(
        private val rabbitTemplate: RabbitTemplate
) {

    @GetMapping("/producer")
    fun producer(): String {
        val messageId = UUID.randomUUID().toString()
        val messageData = "test message, hello!"
        val createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
        val map: MutableMap<String, Any> = HashMap()
        map["messageId"] = messageId
        map["messageData"] = messageData
        map["createTime"] = createTime
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", map)
        return "success"
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

消费者

@Component
@RabbitListener(queues = ["TestDirectQueue"]) //监听的队列名称 TestDirectQueue
class Consumer {
    @RabbitHandler
    fun process(testMessage: Map<*, *>) {
        println("DirectReceiver消费者收到消息  : $testMessage")
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/769778
推荐阅读
相关标签
  

闽ICP备14008679号