赞
踩
不足:
工具类,获取mq连接
/** * 获取MQ连接 * @return * @throws IOException * @throws TimeoutException */ public static Connection getConnection() throws IOException, TimeoutException { //定义工厂 ConnectionFactory factory = new ConnectionFactory(); //服务地址 factory.setHost("127.0.0.1"); //AMQP端口 factory.setPort(5672); //vhost factory.setVirtualHost("/"); //用户密码 factory.setUsername("guest"); factory.setPassword("guest"); return factory.newConnection(); }
private static final String QUEUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); //从连接中获取通道 Channel channel = connection.createChannel(); //创建队列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //发布消息 String msg = "hello simple"; channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); channel.close(); connection.close(); }
出现队列
队列进行Get Message(s)
取出消息,Message变为0。
public class Consumer { private static final String QUEUE_NAME="test_simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = ConnectionUtils.getConnection(); //创建频道 Channel channel = connection.createChannel(); //队列声明(生产者中声明过可以不写) channel.queueDeclare(QUEUE_NAME,false,false,false,null); //消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //事件模型,一旦有消息进入队列即触发方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("接收到的消息:" + msg); } }; //监听队列 channel.basicConsume(QUEUE_NAME
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。