赞
踩
四种模式通用的内容
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
封装连接工具类
public class RabbitUtils { private static ConnectionFactory connectionFactory = new ConnectionFactory(); static { connectionFactory.setHost("192.168.10.180"); connectionFactory.setPort(5672);//5672是RabbitMQ的默认端口号 connectionFactory.setUsername("lee_curry"); connectionFactory.setPassword("leecurry"); connectionFactory.setVirtualHost("baiqi"); } public static Connection getConnection(){ Connection conn = null; try { conn = connectionFactory.newConnection(); return conn; } catch (Exception e) { throw new RuntimeException(e); } } }
封装常量
public class RabbitConstant {
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
下面展示一些 内联代码片
。
生产者
public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null); String message = "hello白起666"; //四个参数 //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到 //队列名称 //额外的设置属性 //最后一个参数是要传递的消息字节数组 channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null,message.getBytes()); channel.close(); conn.close(); System.out.println("===发送成功==="); } }
消费者
public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection conn = RabbitUtils.getConnection(); //创建通信“通道”,相当于TCP中的虚拟连接 Channel channel = conn.createChannel(); //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列 //第一个参数:队列名称ID //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失 //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问 //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列 //其他额外的参数, null channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD,false, false, false, null); //从MQ服务器中获取数据 //创建一个消息消费者 //第一个参数:队列名 //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法 //第三个参数要传入DefaultConsumer的实现类 channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel)); } } class Reciver extends DefaultConsumer { private Channel channel; //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到 public Reciver(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body); System.out.println("消费者接收到的消息:"+message); System.out.println("消息的TagId:"+envelope.getDeliveryTag()); //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息 channel.basicAck(envelope.getDeliveryTag(), false); } }
代码
public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 1 ; i <= 100 ; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } }
public class SMS { private String name; private String mobile; private String content; public SMS(String name, String mobile, String content) { this.name = name; this.mobile = mobile; this.content = content; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMobile() { return mobile; } public void setMobile(String mobile) { this.mobile = mobile; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
public class SMSSender1 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); }
public class SMSSender2 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender2-短信发送成功:" + jsonSMS); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
public class SMSSender3 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender3-短信发送成功:" + jsonSMS); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
模式
天气预报,是由某部门统一发布,然后新浪,百度等才会接受,采用广播模式处理
public class WeatherBureau { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); String input = new Scanner(System.in).next(); Channel channel = connection.createChannel(); //第一个参数交换机名字 其他参数和之前的一样 channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes()); channel.close(); connection.close(); } }
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
public class BiaDu { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
具体代码实现
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128"); channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201128天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
public class BiaDu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127"); channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
具体代码
public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new LinkedHashMap<String, String>(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry<String, String> me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
public class Sina { public static void main(String[] args) throws IOException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "us.#"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
public class BiaDu { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127"); // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。