当前位置:   article > 正文

Java客户端实现RabbitMQ简单队列_rabbitmq jdk1.7

rabbitmq jdk1.7

一.导包

maven中添加依赖。

  1. <dependency>
  2.     <groupId>com.rabbitmq</groupId>
  3.     <artifactId>amqp-client</artifactId>
  4.     <version>4.9.0</version>
  5. </dependency>

之前我导入了RabbitMQ最新的包(5.5.0),然后了一个错误

Unsupported major.minor version 52.0

是因为我本机装的jdk1.7,但5.x系列的版本需要JDK 8支持,4.x系列的版本支持JDK 6,我又换成了4.X的包。

附stanford parser和jdk版本对应关系:

  1. J2SE 8 = 52,
  2. J2SE 7 = 51,
  3. J2SE 6.0 = 50,
  4. J2SE 5.0 = 49,
  5. JDK 1.4 = 48,
  6. JDK 1.3 = 47,
  7. JDK 1.2 = 46,
  8. JDK 1.1 = 45

二.创建连接工具类

  1. package utill;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * 获取MQ连接工具
  8. * @author liucong 2018/11/13
  9. */
  10. public class ConnectionUtils {
  11. /**
  12. * 获取连接方法
  13. * @return connection
  14. */
  15. public static Connection getConnection() throws IOException, TimeoutException {
  16. //1.定义连接工厂
  17. ConnectionFactory connectionFactory = new ConnectionFactory();
  18. //2.设置服务地址
  19. connectionFactory.setHost("127.0.0.1");
  20. //3.设置端口(AMQP协议的端口,类似Mysql的3306)
  21. connectionFactory.setPort(5672);
  22. //4.设置账号,密码,vhost
  23. connectionFactory.setVirtualHost("/");
  24. connectionFactory.setUsername("guest");
  25. connectionFactory.setPassword("guest");
  26. //5.获得连接
  27. Connection connection = connectionFactory.newConnection();
  28. return connection;
  29. }
  30. }

三.创建生产者

  1. package producer;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import org.junit.Test;
  5. import utill.ConnectionUtils;
  6. import java.io.IOException;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * 生产者发送数据到消息队列
  10. * @author liucong 2018/11/13
  11. */
  12. public class Producer {
  13. /**
  14. * 队列名称
  15. */
  16. private static final String QUEUE_NAME = "QUEUE_test";
  17. /**
  18. * 发送消息方法
  19. * @throws IOException
  20. * @throws TimeoutException
  21. */
  22. @Test
  23. public void sendMsg() throws IOException, TimeoutException {
  24. //1.通过工具类获取MQ连接
  25. Connection connection = ConnectionUtils.getConnection();
  26. //2.从连接中创建通道
  27. Channel channel = connection.createChannel();
  28. //3.创建队列(让路由器知道往哪个队列里发送消息)
  29. boolean durable = false;
  30. boolean exclusive = false;
  31. boolean autoDelete = false;
  32. channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete,null);
  33. //4.创建消息
  34. String msg = "Hello QUEUE_test !!";
  35. //5.发送消息
  36. channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
  37. System.out.println(msg + "发送成功");
  38. //6.关闭连接
  39. channel.close();
  40. connection.close();
  41. }
  42. }

发送一条消息试试。

发送成功了。

四.创建消费者

  1. package customer;
  2. import com.rabbitmq.client.*;
  3. import utill.ConnectionUtils;
  4. import java.io.IOException;
  5. import java.util.concurrent.TimeoutException;
  6. /**
  7. * 消费者消费消息
  8. * @author liucong 2018/11/13
  9. */
  10. public class Customer {
  11. /**
  12. * 队列名称
  13. */
  14. private static final String QUEUE_NAME = "QUEUE_test1";
  15. public static void main(String[] arge) throws IOException, TimeoutException {
  16. //1.获取连接
  17. Connection connection = ConnectionUtils.getConnection();
  18. //2.创建管道
  19. Channel channel = connection.createChannel();
  20. //3.获取消息
  21. Consumer consumer = new DefaultConsumer(channel){
  22. /**
  23. * @param consumerTag consumerTag
  24. * @param envelope 存放生产者相关信息
  25. * @param properties properties
  26. * @param body 消息实体
  27. * @throws IOException
  28. */
  29. @Override
  30. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  31. String msg = new String(body,"UTF-8");
  32. System.out.println("已接收到" + msg);
  33. }
  34. };
  35. //4.自动回复队列应答(RabbitMQ中的消息确认机制)
  36. channel.basicConsume(QUEUE_NAME, true, consumer);
  37. //5.关闭连接
  38. channel.close();
  39. connection.close();
  40. }
  41. }

一个简单的消息队列就完成了。

简单队列的不足: 耦合性高  生产消费一一对应,如果有多个消费者想都消费这个消息,队列名称变更时需要同时更改 。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/568801
推荐阅读
相关标签
  

闽ICP备14008679号