rpm -ivh erlang-21.3-1.el7.x86_64.rpm #erlang环境
yum install socat -y #socat依赖
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm #rabbitmq安装
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
/sbin/service rabbitmq-server start
/sbin/service rabbitmq-server status
/sbin/service rabbitmq-server stop
开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
创建账号 rabbitmqctl add_user admin 123 设置用户角色 rabbitmqctl set_user_tags admin administrator#超级管理员 设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read> rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限 当前用户和角色 rabbitmqctl list_users 关闭应用的命令为 rabbitmqctl stop_app 清除的命令为 rabbitmqctl reset 重新启动命令为 rabbitmqctl start_app
<!-- 指定 jdk 编译版本--> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <!--rabbitmq 依赖客户端--> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- 操作文件流的一个依赖--> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies>
public class Producer { public static final String QUEUE_NAME="hello"; public static final String HOST=""; public static final String USER_NAME="admin"; public static final String PASSWORD="123"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /** * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); /** * @param exchange the exchange to publish the message to * @param routingKey the routing key *@param mandatory true if the 'mandatory' flag is to be set * @param immediate true if the 'immediate' flag is to be set. Note that the RabbitMQ server does not support this flag. @param props other properties for the message - routing headers etc @param body the message body * */ String message="hello world!"; channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("发送消息完毕!"); } }
“By doing so we would simply make the program move on, close everything, and exit! This would be awkward because we want the process to stay alive while the consumer is listening asynchronously for messages to arrive.”
public class Consumer { public static final String QUEUE_NAME="hello"; public static final String HOST=""; public static final String USER_NAME="admin"; public static final String PASSWORD="123"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Channel channel = factory.newConnection().createChannel(); /** * @param queue the name of the queue * @param autoAck true if the server should consider messages * acknowledged once delivered; false if the server should expect * explicit acknowledgements * @param deliverCallback callback when a message is delivered * @param shutdownSignalCallback callback when the channel/connection is shut down * */ channel.basicConsume(QUEUE_NAME, true, (String consumerTag, Delivery message)->{ //处理消息 System.out.println("message:"+new String(message.getBody())); }, (String consumerTag)->{ System.out.println("消息被中断;"+consumerTag); }); System.out.println("消息接收完毕!"); } }
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to a queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
/** * 工作队列生产者 * */ public class Sender { public static final String QUEUE_NAME="hello"; public static void main(String[] args) { Channel channel = ChannelUtil.getChannel(); Scanner sc = new Scanner(System.in); while (sc.hasNext()){ String next = sc.next(); try { channel.queueDeclare(QUEUE_NAME, false,false, false, null); channel.basicPublish("", QUEUE_NAME,null, next.getBytes() ); System.out.println("工作队列1已经发送消息"+next); }catch (Exception e){ System.out.println("发送异常:"+e.getMessage()); } } } }
/** * 工作队列处理者,处理生产者产生的大量消息 * */ public class Worker { public static final String QUEUE_NAME="hello"; public static void main(String[] args) { new Thread(()->{ Channel channel = ChannelUtil.getChannel(); try { channel.basicConsume(QUEUE_NAME, false, (String consumerTag, Delivery message)->{ System.out.println("队列1:"+new String(message.getBody())); /** * @param1:消息标记,哪个消息应答了 * @param2:取消应答同一信道所有消息 * */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }, (String consumerTag)->{}); }catch (Exception e){ System.out.println("消息消费异常"); } },"线程1").start(); new Thread(()->{ Channel channel = ChannelUtil.getChannel(); try { channel.basicConsume(QUEUE_NAME, false, (String consumerTag, Delivery message)->{ System.out.println("队列2:"+new String(message.getBody())); /** * @param1:消息标记,哪个消息应答了 * @param2:取消应答同一信道所有消息 * */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }, (String consumerTag)->{}); }catch (Exception e){ System.out.println("消息消费异常"); } },"线程2").start(); } }
channel.queueDeclare(QUEUE_NAME, true,false, false, null);
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, next.getBytes() );
