赞
踩
生产者连接rabbitMQ的代码:
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- public class RabbitProducer {
- private static final String EXCHANGE_NAME = "exchange_demo";
- private static final String ROUTING_KEY = "routingkey_demo";
- private static final String QUEUE_NAME = "queue_demo";
- private static final String IP_ADDRESS = "127.0.0.1";
- private static final int PORT = 5672; // RabbitMQ服务端默认端口号为5672
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(IP_ADDRESS);
- factory.setPort(PORT);
- factory.setUsername("zifeiy");
- factory.setPassword("passwd");
- Connection connection = factory.newConnection(); // 建立连接
- Channel channel = connection.createChannel(); // 创建信道
- // 创建一个type="direct"、持久化的、非自动删除的交换器
- channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
- // 创建一个持久化、非排他的、非自动删除的队列
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- // 将交换器和队列通过路由绑定
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
- // 发送一条持久化的消息:hello world!
- String message = "hello,world!";
- channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes());
- // 关闭资源
- channel.close();
- connection.close();
- }
- }
消费者连接rabbitMQ的代码:
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Address;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.Connection;
-
- public class RabbitConsumer {
- private static final String QUEUE_NAME = "queue_demo";
- private static final String IP_ADDRESS = "127.0.0.1";
- private static final int PORT = 5672;
-
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- Address[] addresses = new Address[] {
- new Address(IP_ADDRESS, PORT)
- };
- ConnectionFactory factory = new ConnectionFactory();
- factory.setUsername("zifeiy");
- factory.setPassword("passwd");
- // 这里的连接方式与生产者的demo略有不同,注意区分
- Connection connection = factory.newConnection(addresses); // 创建连接
- final Channel channel = connection.createChannel(); // 创建信道
- channel.basicQos(64); // 设置客户端最多接受未被ack的消息的个数
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("recv message: " + new String(body));
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- channel.basicConsume(QUEUE_NAME, consumer);
- // 等待回调函数执行完毕后,关闭资源
- TimeUnit.SECONDS.sleep(5);
- channel.close();
- connection.close();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。