当前位置:   article > 正文

RabbitMQ 实战_rabbitmq warren

rabbitmq warren

1、简单队列

一个生产者对应一个消费者。
1、生产者

// Connect to the broker on the local machine: host, port, virtual host, user, password.
Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");
        try {
            Channel channel = connection.createChannel();
            try {
                // queueDeclare(queue, durable, exclusive, autoDelete, arguments)
                channel.queueDeclare(QUERY_NAME, false, false, false, null);

                String message = "hello word 22222";

                // basicPublish(exchange, routingKey, properties, body):
                // the empty exchange name selects the default exchange, so the
                // routing key is the name of the target queue.
                channel.basicPublish("", QUERY_NAME, null, message.getBytes());
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the connection so it is not left open on error.
            connection.close();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2、消费者

// Connect to the broker on the local machine: host, port, virtual host, user, password.
Connection conn = ConnectionUtil.getConnection(
                "127.0.0.1", 5672, "/", "admin_p", "123");
        Channel ch = conn.createChannel();

        // Declare the queue this consumer reads from.
        ch.queueDeclare(QUERY_NAME, false, false, false, null);

        // Register a blocking consumer on the queue; 'true' is the auto-ack flag.
        QueueingConsumer consumer = new QueueingConsumer(ch);
        ch.basicConsume(QUERY_NAME, true, consumer);

        // Consume forever: fetch the next delivery and print its payload.
        for (;;) {
            byte[] payload = consumer.nextDelivery().getBody();
            System.out.print("获取到的消息----------------" + new String(payload));
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2、work 模式

一个生产者对应多个消费者
1、生产者

package rabbmitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Producer for the work-queue example: publishes ten messages
 * ("work0" .. "work9") to the {@code work_queue} queue.
 */
public class Produce {

    private final static String QUEUE_NAME = "work_queue";

    /**
     * Connects to the local broker, declares the queue and publishes ten
     * messages, pausing 10 ms between them.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if the pause between messages is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        // Connection: host, port, virtual host, user, password.
        Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672
                , "/",
                "admin_p", "123");
        try {
            // Channel
            Channel cha = connection.createChannel();
            try {
                // Queue
                cha.queueDeclare(QUEUE_NAME, false, false, false, null);

                for (int i = 0; i < 10; i++) {
                    String message = "work" + i;

                    // Default exchange ("") with the queue name as routing key.
                    cha.basicPublish("", QUEUE_NAME, null, message.getBytes());

                    System.out.println(message);
                    Thread.sleep(10);
                }
            } finally {
                // Close the channel even when publishing or sleeping failed.
                cha.close();
            }
        } finally {
            // Always release the connection so it is not left open on error.
            connection.close();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

2、消费者

消费者1

package rabbmitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * First worker of the work-queue example: reads {@code work_queue},
 * prints each message and acknowledges it manually.
 */
public class Consumer {

    private static final String QUEUE_NAME = "work_queue";

    /**
     * Connects to the local broker and consumes messages forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery or sleeping is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection(
                "127.0.0.1", 5672, "/", "admin_p", "123");
        Channel ch = connection.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(ch);

        // Only one message is sent to this consumer at a time.
        ch.basicQos(1);
        // 'false' is the auto-ack flag: deliveries are acknowledged explicitly below.
        ch.basicConsume(QUEUE_NAME, false, consumer);

        for (;;) {
            QueueingConsumer.Delivery next = consumer.nextDelivery();
            System.out.println(new String(next.getBody()));
            Thread.sleep(10);
            // Send the acknowledgement for this delivery back to the broker.
            long tag = next.getEnvelope().getDeliveryTag();
            ch.basicAck(tag, false);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

消费者2

package rabbmitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Second worker of the work-queue example: reads {@code work_queue},
 * prints each message prefixed with "------", sleeps one second and
 * then acknowledges it manually.
 */
public class ConsumerTwo {

    private static final String QUEUE_NAME = "work_queue";

    /**
     * Connects to the local broker and consumes messages forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery or sleeping is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection(
                "127.0.0.1", 5672, "/", "admin_p", "123");
        Channel ch = connection.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(ch);

        // "The more capable do more work" mode: prefetch a single message.
        ch.basicQos(1);
        // 'false' is the auto-ack flag: deliveries are acknowledged explicitly below.
        ch.basicConsume(QUEUE_NAME, false, consumer);

        for (;;) {
            QueueingConsumer.Delivery next = consumer.nextDelivery();
            String text = new String(next.getBody());
            System.out.println("------" + text);
            Thread.sleep(1000);
            long tag = next.getEnvelope().getDeliveryTag();
            ch.basicAck(tag, false);
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

3、交换机的四种模式

交换器分为四种,分别是:direct、fanout、topic和 headers。
3.1、direct模式
如果路由键完全匹配的话,消息才会被投放到相应的队列。
>生产者

package rabbmitmq.root;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Producer for the direct-exchange example: publishes one message to the
 * {@code ex_direct} exchange with routing key {@code update}.
 */
public class rProduce {


    public  static final String EX_CHANGENAME="ex_direct";

    /**
     * Declares the direct exchange and publishes a single message to it.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {

        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(EX_CHANGENAME, "direct");

                String message = "hell0 word direct";

                // Arguments: exchange, routing key, mandatory flag, properties, body.
                channel.basicPublish(EX_CHANGENAME, "update", false, null,
                        message.getBytes());
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the connection so it is not left open on error.
            connection.close();
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

消费者1

package rabbmitmq.root;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * First consumer of the direct-exchange example: binds queue {@code direct1}
 * to {@code ex_direct} with routing key {@code select} and prints every
 * message it receives, prefixed with "1=========".
 */
public class Consomer1 {

    public  static  final  String Queen_NAME="direct1";


    public  static final String EX_CHANGENAME="ex_direct";

    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {


        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");
        Channel channel =connection.createChannel();

        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(EX_CHANGENAME, "direct");
        channel.queueDeclare(Queen_NAME, false, false ,false , null);
        channel.queueBind(Queen_NAME, EX_CHANGENAME,"select");
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(Queen_NAME, queueingConsumer);

        while (true)
        {
            QueueingConsumer.Delivery delivery =queueingConsumer.nextDelivery();

            String message =new String(delivery.getBody());

            System.out.println("1========="+message);

            // Acknowledge this delivery to the broker.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

消费者2

package rabbmitmq.root;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Second consumer of the direct-exchange example: binds queue {@code direct2}
 * to {@code ex_direct} with routing key {@code update} and prints every
 * message it receives, prefixed with "2=========".
 */
public class Consumer2 {

    public  static  final  String Queen_NAME="direct2";

    public  static final String EX_CHANGENAME="ex_direct";


    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");

        Channel channel = connection.createChannel();

        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(EX_CHANGENAME, "direct");

        channel.queueDeclare(Queen_NAME, false, false, false, null);

        channel.queueBind(Queen_NAME, EX_CHANGENAME, "update");

        QueueingConsumer queueingConsumer= new QueueingConsumer(channel);

        channel.basicConsume(Queen_NAME, queueingConsumer);
        while (true)
        {

            QueueingConsumer.Delivery delivery =queueingConsumer.nextDelivery();

            String message =new String(delivery.getBody());

            System.out.println("2========="+message);

            // Acknowledge this delivery to the broker.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44

3.2、fanout模式
当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

生产者

package rabbmitmq.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Producer for the fanout-exchange example: publishes one message to the
 * {@code fanout_exchange} exchange with an empty routing key.
 */
public class xProduce {

    private   final  static String ex_changename="fanout_exchange";

    /**
     * Declares the fanout exchange and publishes a single message to it.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(ex_changename, "fanout");

                String message = "hello word fanout oooo";

                channel.basicPublish(ex_changename, "", null, message.getBytes());
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the connection so it is not left open on error.
            connection.close();
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

消费者1

package rabbmitmq.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * First consumer of the fanout-exchange example: binds queue
 * {@code fanout_queue_1} to {@code fanout_exchange} and prints every
 * message it receives, prefixed with "1".
 */
public class xConsumer {

    private final static String QUEUE_NAME = "fanout_queue_1";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");

        Channel channel =connection.createChannel();

        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Prefetch a single message at a time.
        channel.basicQos(1);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // 'false' is the auto-ack flag: deliveries are acknowledged explicitly below.
        channel.basicConsume(QUEUE_NAME, false, queueingConsumer);

        while(true)
        {
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();

            System.out.println("1"+new String(delivery.getBody()));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

消费者2

package rabbmitmq.exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Second consumer of the fanout-exchange example: binds queue
 * {@code fanout_queue_2} to {@code fanout_exchange} and prints every
 * message it receives, prefixed with "2".
 */
public class xConsumer2 {
    private final static String QUEUE_NAME = "fanout_queue_2";

    private final static String EXCHANGE_NAME = "fanout_exchange";

    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws Exception if a broker operation fails or waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws Exception {

        // Loopback address of the local broker, matching the producer and the
        // first consumer of this example (was "127.0.0.0").
        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");

        Channel channel =connection.createChannel();

        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // Prefetch a single message at a time.
        channel.basicQos(1);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // 'false' is the auto-ack flag: deliveries are acknowledged explicitly below.
        channel.basicConsume(QUEUE_NAME, false, queueingConsumer);

        while(true)
        {
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();

            System.out.println("2"+new String(delivery.getBody()));
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

3.3、topic模式
设置模糊的绑定方式:路由键以“.”为分隔符被切分成若干个单词;“*”匹配恰好一个单词(而不是单个字符),“#”匹配零个或多个单词。例如绑定键“update.*”可以匹配“update.user”,但不能匹配“update.user.name”;而绑定键“update.#”两者都能匹配。

生产者

package rabbmitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Producer for the topic-exchange example: publishes one message to the
 * {@code ex_topic} exchange.
 */
public class tProduce {

    private static  final String Exchangename="ex_topic";

    /**
     * Declares the topic exchange and publishes a single message to it.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     */
    public static void main(String[] args) throws IOException {

        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");
        try {
            Channel channel = connection.createChannel();
            try {
                channel.exchangeDeclare(Exchangename, "topic");

                String message = "hello topic";

                // NOTE(review): wildcards are normally used in binding keys, not in
                // the routing key of a published message; a concrete key such as
                // "update.user" would illustrate topic routing better - confirm.
                channel.basicPublish(Exchangename, "update.*", null,
                        message.getBytes());
            } finally {
                // Close the channel even when declaring or publishing failed.
                channel.close();
            }
        } finally {
            // Always release the connection so it is not left open on error.
            connection.close();
        }

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

消费者1

package rabbmitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * First consumer of the topic-exchange example: binds queue {@code topic_1}
 * to {@code ex_topic} with binding key {@code update.*} and prints every
 * message it receives, prefixed with "1=================".
 */
public class Consumer1 {

    private static  final String Exchangename="ex_topic";
    private static final String QueenName="topic_1";

    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {

        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p", "123");

        Channel channel =connection.createChannel();

        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(Exchangename, "topic");

        channel.queueDeclare(QueenName, false, false, false, null);

        channel.queueBind(QueenName, Exchangename,"update.*");

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(QueenName, queueingConsumer);


        while (true)
        {
            QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();
            String outMessage=new String(delivery.getBody());
            System.out.println("1================="+outMessage);

            // Acknowledge this delivery to the broker.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

消费者2

package rabbmitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import rabbmitmq.Untity.ConnectionUtil;

import java.io.IOException;

/**
 * Second consumer of the topic-exchange example: binds queue {@code topic_2}
 * to {@code ex_topic} with binding key {@code select.*} and prints every
 * message it receives, prefixed with "2============".
 */
public class Consumer2 {
    private static  final String Exchangename="ex_topic";
    private static final String QueenName="topic_2";

    /**
     * Declares the exchange and queue, binds them and consumes forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws InterruptedException if waiting for a delivery is interrupted
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = ConnectionUtil.getConnection("127.0.0.1",
                5672, "/", "admin_p","123");
        Channel channel = connection.createChannel();
        // Declare the exchange here as well (same arguments as the producer)
        // so the bind below does not depend on the producer having run first.
        channel.exchangeDeclare(Exchangename, "topic");
        channel.queueDeclare(QueenName, false, false, false, null);
        channel.queueBind(QueenName, Exchangename, "select.*");
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(QueenName, queueingConsumer);
         while (true)
         {

             QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();
             String outMessage =new String(delivery.getBody());
             System.out.println("2============"+outMessage);

             // Acknowledge this delivery to the broker.
             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
         }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4、 交换机模式下的mq工作方式。

在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/AllinToyou/article/detail/598216
推荐阅读
相关标签
  

闽ICP备14008679号