当前位置:   article > 正文

rabbitmq连接java代码_rabbitmq java代码

rabbitmq java代码

生产者连接rabbitMQ的代码: 

  1. import java.io.IOException;
  2. import java.util.concurrent.TimeoutException;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. import com.rabbitmq.client.MessageProperties;
  7. public class RabbitProducer {
  8. private static final String EXCHANGE_NAME = "exchange_demo";
  9. private static final String ROUTING_KEY = "routingkey_demo";
  10. private static final String QUEUE_NAME = "queue_demo";
  11. private static final String IP_ADDRESS = "127.0.0.1";
  12. private static final int PORT = 5672; // RabbitMQ服务端默认端口号为5672
  13. public static void main(String[] args) throws IOException, TimeoutException {
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost(IP_ADDRESS);
  16. factory.setPort(PORT);
  17. factory.setUsername("zifeiy");
  18. factory.setPassword("passwd");
  19. Connection connection = factory.newConnection(); // 建立连接
  20. Channel channel = connection.createChannel(); // 创建信道
  21. // 创建一个type="direct"、持久化的、非自动删除的交换器
  22. channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
  23. // 创建一个持久化、非排他的、非自动删除的队列
  24. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
  25. // 将交换器和队列通过路由绑定
  26. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
  27. // 发送一条持久化的消息:hello world!
  28. String message = "hello,world!";
  29. channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
  30. MessageProperties.PERSISTENT_TEXT_PLAIN,
  31. message.getBytes());
  32. // 关闭资源
  33. channel.close();
  34. connection.close();
  35. }
  36. }

消费者连接rabbitMQ的代码:

  1. import java.io.IOException;
  2. import java.util.concurrent.TimeUnit;
  3. import java.util.concurrent.TimeoutException;
  4. import com.rabbitmq.client.AMQP;
  5. import com.rabbitmq.client.Address;
  6. import com.rabbitmq.client.Channel;
  7. import com.rabbitmq.client.ConnectionFactory;
  8. import com.rabbitmq.client.Consumer;
  9. import com.rabbitmq.client.DefaultConsumer;
  10. import com.rabbitmq.client.Envelope;
  11. import com.rabbitmq.client.Connection;
  12. public class RabbitConsumer {
  13. private static final String QUEUE_NAME = "queue_demo";
  14. private static final String IP_ADDRESS = "127.0.0.1";
  15. private static final int PORT = 5672;
  16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  17. Address[] addresses = new Address[] {
  18. new Address(IP_ADDRESS, PORT)
  19. };
  20. ConnectionFactory factory = new ConnectionFactory();
  21. factory.setUsername("zifeiy");
  22. factory.setPassword("passwd");
  23. // 这里的连接方式与生产者的demo略有不同,注意区分
  24. Connection connection = factory.newConnection(addresses); // 创建连接
  25. final Channel channel = connection.createChannel(); // 创建信道
  26. channel.basicQos(64); // 设置客户端最多接受未被ack的消息的个数
  27. Consumer consumer = new DefaultConsumer(channel) {
  28. @Override
  29. public void handleDelivery(String consumerTag, Envelope envelope,
  30. AMQP.BasicProperties properties, byte[] body) throws IOException {
  31. System.out.println("recv message: " + new String(body));
  32. try {
  33. TimeUnit.SECONDS.sleep(1);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. channel.basicAck(envelope.getDeliveryTag(), false);
  38. }
  39. };
  40. channel.basicConsume(QUEUE_NAME, consumer);
  41. // 等待回调函数执行完毕后,关闭资源
  42. TimeUnit.SECONDS.sleep(5);
  43. channel.close();
  44. connection.close();
  45. }
  46. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/384281
推荐阅读
相关标签
  

闽ICP备14008679号