赞
踩
RabbitMQ技术学习更多资源请访问 https://www.itkc8.com
本文不介绍amqp和rabbitmq相关知识,请自行网上查阅
本文是基于spring-rabbit中间件来实现消息的发送接受功能
see http://www.rabbitmq.com/tutorials/tutorial-one-java.html
see http://www.springsource.org/spring-amqp
- <!-- for rabbitmq -->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>2.8.2</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-amqp</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>com.caucho</groupId>
- <artifactId>hessian</artifactId>
- <version>4.0.7</version>
- </dependency>
- </dependencies>
首先我们需要一个用来在app和rabbitmq之间传递消息的持有对象
public class EventMessage implements Serializable{ private String queueName; private String exchangeName; private byte[] eventData; public EventMessage(String queueName, String exchangeName, byte[] eventData) { this.queueName = queueName; this.exchangeName = exchangeName; this.eventData = eventData; } public EventMessage() { } public String getQueueName() { return queueName; } public String getExchangeName() { return exchangeName; } public byte[] getEventData() { return eventData; } @Override public String toString() { return "EopEventMessage [queueName=" + queueName + ", exchangeName=" + exchangeName + ", eventData=" + Arrays.toString(eventData) + "]"; } }
为了可以发送和接受这个消息持有对象,我们还需要需要一个用来序列化和反序列化的工厂
- public interface CodecFactory {
-
- byte[] serialize(Object obj) throws IOException;
-
- Object deSerialize(byte[] in) throws IOException;
-
- }
下面是编码解码的实现类,用了hessian来实现,大家可以自行选择序列化方式
public class HessionCodecFactory implements CodecFactory { private final Logger logger = Logger.getLogger(HessionCodecFactory.class); @Override public byte[] serialize(Object obj) throws IOException { ByteArrayOutputStream baos = null; HessianOutput output = null; try { baos = new ByteArrayOutputStream(1024); output = new HessianOutput(baos); output.startCall(); output.writeObject(obj); output.completeCall(); } catch (final IOException ex) { throw ex; } finally { if (output != null) { try { baos.close(); } catch (final IOException ex) { this.logger.error("Failed to close stream.", ex); } } } return baos != null ? baos.toByteArray() : null; } @Override public Object deSerialize(byte[] in) throws IOException { Object obj = null; ByteArrayInputStream bais = null; HessianInput input = null; try { bais = new ByteArrayInputStream(in); input = new HessianInput(bais); input.startReply(); obj = input.readObject(); input.completeReply(); } catch (final IOException ex) { throw ex; } catch (final Throwable e) { this.logger.error("Failed to decode object.", e); } finally { if (input != null) { try { bais.close(); } catch (final IOException ex) { this.logger.error("Failed to close stream.", ex); } } } return obj; } }
接下来就先实现发送功能,新增一个接口专门用来实现发送功能
- public interface EventTemplate {
-
- void send(String queueName,String exchangeName,Object eventContent) throws SendRefuseException;
-
- void send(String queueName,String exchangeName,Object eventContent,CodecFactory codecFactory) throws SendRefuseException;
- }
SendRefuseException是自定义的发送失败异常类
下面是它的实现类,主要的任务就是将数据转换为EventMessage
public class DefaultEventTemplate implements EventTemplate { private static final Logger logger = Logger.getLogger(DefaultEventTemplate.class); private AmqpTemplate eventAmqpTemplate; private CodecFactory defaultCodecFactory; // private DefaultEventController eec; // // public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate, // CodecFactory defaultCodecFactory, DefaultEventController eec) { // this.eventAmqpTemplate = eopAmqpTemplate; // this.defaultCodecFactory = defaultCodecFactory; // this.eec = eec; // } public DefaultEventTemplate(AmqpTemplate eopAmqpTemplate,CodecFactory defaultCodecFactory) { this.eventAmqpTemplate = eopAmqpTemplate; this.defaultCodecFactory = defaultCodecFactory; } @Override public void send(String queueName, String exchangeName, Object eventContent) throws SendRefuseException { this.send(queueName, exchangeName, eventContent, defaultCodecFactory); } @Override public void send(String queueName, String exchangeName, Object eventContent, CodecFactory codecFactory) throws SendRefuseException { if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName)) { throw new SendRefuseException("queueName exchangeName can not be empty."); } // if (!eec.beBinded(exchangeName, queueName)) // eec.declareBinding(exchangeName, queueName); byte[] eventContentBytes = null; if (codecFactory == null) { if (eventContent == null) { logger.warn("Find eventContent is null,are you sure..."); } else { throw new SendRefuseException( "codecFactory must not be null ,unless eventContent is null"); } } else { try { eventContentBytes = codecFactory.serialize(eventContent); } catch (IOException e) { throw new SendRefuseException(e); } } // 构造成Message EventMessage msg = new EventMessage(queueName, exchangeName, eventContentBytes); try { eventAmqpTemplate.convertAndSend(exchangeName, queueName, msg); } catch (AmqpException e) { logger.error("send event fail. Event Message : [" + eventContent + "]", e); throw new SendRefuseException("send event fail", e); } } }
注释的地方稍后会用到,主要是防止数据数据发送的地方没有事先声明
然后我们再实现接受消息
首先我们需要一个消费接口,所有的消费程序都实现这个类
- public interface EventProcesser {
- public void process(Object e);
- }
为了能够将不同类型的消息交由对应的程序来处理,我们还需要一个消息处理适配器
- /**
- * MessageListenerAdapter的Pojo
- * <p>消息处理适配器,主要功能:</p>
- * <p>1、将不同的消息类型绑定到对应的处理器并本地缓存,如将queue01+exchange01的消息统一交由A处理器来出来</p>
- * <p>2、执行消息的消费分发,调用相应的处理器来消费属于它的消息</p>
- *
- */
- public class MessageAdapterHandler {
-
- private static final Logger logger = Logger.getLogger(MessageAdapterHandler.class);
-
- private ConcurrentMap<String, EventProcessorWrap> epwMap;
-
- public MessageAdapterHandler() {
- this.epwMap = new ConcurrentHashMap<String, EventProcessorWrap>();
- }
-
- public void handleMessage(EventMessage eem) {
- logger.debug("Receive an EventMessage: [" + eem + "]");
- // 先要判断接收到的message是否是空的,在某些异常情况下,会产生空值
- if (eem == null) {
- logger.warn("Receive an null EventMessage, it may product some errors, and processing message is canceled.");
- return;
- }
- if (StringUtils.isEmpty(eem.getQueueName()) || StringUtils.isEmpty(eem.getExchangeName())) {
- logger.warn("The EventMessage's queueName and exchangeName is empty, this is not allowed, and processing message is canceled.");
- return;
- }
- // 解码,并交给对应的EventHandle执行
- EventProcessorWrap eepw = epwMap.get(eem.getQueueName()+"|"+eem.getExchangeName());
- if (eepw == null) {
- logger.warn("Receive an EopEventMessage, but no processor can do it.");
- return;
- }
- try {
- eepw.process(eem.getEventData());
- } catch (IOException e) {
- logger.error("Event content can not be Deserialized, check the provided CodecFactory.",e);
- return;
- }
- }
-
- protected void add(String queueName, String exchangeName, EventProcesser processor,CodecFactory codecFactory) {
- if (StringUtils.isEmpty(queueName) || StringUtils.isEmpty(exchangeName) || processor == null || codecFactory == null) {
- throw new RuntimeException("queueName and exchangeName can not be empty,and processor or codecFactory can not be null. ");
- }
- EventProcessorWrap epw = new EventProcessorWrap(codecFactory,processor);
- EventProcessorWrap oldProcessorWrap = epwMap.putIfAbsent(queueName + "|" + exchangeName, epw);
- if (oldProcessorWrap != null) {
- logger.warn("The processor of this queue and exchange exists, and the new one can't be add");
- }
- }
-
- protected Set<String> getAllBinding() {
- Set<String> keySet = epwMap.keySet();
- return keySet;
- }
-
- protected static class EventProcessorWrap {
-
- private CodecFactory codecFactory;
-
- private EventProcesser eep;
-
- protected EventProcessorWrap(CodecFactory codecFactory,
- EventProcesser eep) {
- this.codecFactory = codecFactory;
- this.eep = eep;
- }
-
- public void process(byte[] eventData) throws IOException{
- Object obj = codecFactory.deSerialize(eventData);
- eep.process(obj);
- }
- }
- }
这是正常情况下的消息处理方式,如果rabbitmq消息接受发生异常,也要监控到,新增一个消费类专门用来处理错误异常的消息
- public class MessageErrorHandler implements ErrorHandler{
-
- private static final Logger logger = Logger.getLogger(MessageErrorHandler.class);
-
- @Override
- public void handleError(Throwable t) {
- logger.error("RabbitMQ happen a error:" + t.getMessage(), t);
- }
-
- }
接下来我们可能需要一个专门配置和rabbitmq通信的一些信息,比如地址,端口等信息
public class EventControlConfig { private final static int DEFAULT_PORT = 5672; private final static String DEFAULT_USERNAME = "guest"; private final static String DEFAULT_PASSWORD = "guest"; private final static int DEFAULT_PROCESS_THREAD_NUM = Runtime.getRuntime().availableProcessors() * 2; private static final int PREFETCH_SIZE = 1; private String serverHost ; private int port = DEFAULT_PORT; private String username = DEFAULT_USERNAME; private String password = DEFAULT_PASSWORD; private String virtualHost; /** * 和rabbitmq建立连接的超时时间 */ private int connectionTimeout = 0; /** * 事件消息处理线程数,默认是 CPU核数 * 2 */ private int eventMsgProcessNum; /** * 每次消费消息的预取值 */ private int prefetchSize; public EventControlConfig(String serverHost) { this(serverHost,DEFAULT_PORT,DEFAULT_USERNAME,DEFAULT_PASSWORD,null,0,DEFAULT_PROCESS_THREAD_NUM,DEFAULT_PROCESS_THREAD_NUM,new HessionCodecFactory()); } public EventControlConfig(String serverHost, int port, String username, String password, String virtualHost, int connectionTimeout, int eventMsgProcessNum,int prefetchSize,CodecFactory defaultCodecFactory) { this.serverHost = serverHost; this.port = port>0?port:DEFAULT_PORT; this.username = username; this.password = password; this.virtualHost = virtualHost; this.connectionTimeout = connectionTimeout>0?connectionTimeout:0; this.eventMsgProcessNum = eventMsgProcessNum>0?eventMsgProcessNum:DEFAULT_PROCESS_THREAD_NUM; this.prefetchSize = prefetchSize>0?prefetchSize:PREFETCH_SIZE; } public String getServerHost() { return serverHost; } public int getPort() { return port; } public String getUsername() { return username; } public String getPassword() { return password; } public String getVirtualHost() { return virtualHost; } public int getConnectionTimeout() { return connectionTimeout; } public int getEventMsgProcessNum() { return eventMsgProcessNum; } public int getPrefetchSize() { return prefetchSize; } }
具体的发送、接受程序已经好了,接下来也是最重要的就是管理控制和rabbitmq的通信
public interface EventController { /** * 控制器启动方法 */ void start(); /** * 获取发送模版 */ EventTemplate getEopEventTemplate(); /** * 绑定消费程序到对应的exchange和queue */ EventController add(String queueName, String exchangeName, EventProcesser eventProcesser); /*in map, the key is queue name, but value is exchange name*/ EventController add(Map<String,String> bindings, EventProcesser eventProcesser); }
它的实现类如下:
- /**
- * 和rabbitmq通信的控制器,主要负责:
- * <p>1、和rabbitmq建立连接</p>
- * <p>2、声明exChange和queue以及它们的绑定关系</p>
- * <p>3、启动消息监听容器,并将不同消息的处理者绑定到对应的exchange和queue上</p>
- * <p>4、持有消息发送模版以及所有exchange、queue和绑定关系的本地缓存</p>
- * @author yangyong
- *
- */
- public class DefaultEventController implements EventController {
-
- private CachingConnectionFactory rabbitConnectionFactory;
-
- private EventControlConfig config;
-
- private RabbitAdmin rabbitAdmin;
-
- private CodecFactory defaultCodecFactory = new HessionCodecFactory();
-
- private SimpleMessageListenerContainer msgListenerContainer; // rabbitMQ msg listener container
-
- private MessageAdapterHandler msgAdapterHandler = new MessageAdapterHandler();
-
- private MessageConverter serializerMessageConverter = new SerializerMessageConverter(); // 直接指定
- //queue cache, key is exchangeName
- private Map<String, DirectExchange> exchanges = new HashMap<String,DirectExchange>();
- //queue cache, key is queueName
- private Map<String, Queue> queues = new HashMap<String, Queue>();
- //bind relation of queue to exchange cache, value is exchangeName | queueName
- private Set<String> binded = new HashSet<String>();
-
- private EventTemplate eventTemplate; // 给App使用的Event发送客户端
-
- private AtomicBoolean isStarted = new AtomicBoolean(false);
-
- private static DefaultEventController defaultEventController;
-
- public synchronized static DefaultEventController getInstance(EventControlConfig config){
- if(defaultEventController==null){
- defaultEventController = new DefaultEventController(config);
- }
- return defaultEventController;
- }
-
- private DefaultEventController(EventControlConfig config){
- if (config == null) {
- throw new IllegalArgumentException("Config can not be null.");
- }
- this.config = config;
- initRabbitConnectionFactory();
- // 初始化AmqpAdmin
- rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
- // 初始化RabbitTemplate
- RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
- rabbitTemplate.setMessageConverter(serializerMessageConverter);
- eventTemplate = new DefaultEventTemplate(rabbitTemplate,defaultCodecFactory, this);
- }
-
- /**
- * 初始化rabbitmq连接
- */
- private void initRabbitConnectionFactory() {
- rabbitConnectionFactory = new CachingConnectionFactory();
- rabbitConnectionFactory.setHost(config.getServerHost());
- rabbitConnectionFactory.setChannelCacheSize(config.getEventMsgProcessNum());
- rabbitConnectionFactory.setPort(config.getPort());
- rabbitConnectionFactory.setUsername(config.getUsername());
- rabbitConnectionFactory.setPassword(config.getPassword());
- if (!StringUtils.isEmpty(config.getVirtualHost())) {
- rabbitConnectionFactory.setVirtualHost(config.getVirtualHost());
- }
- }
-
- /**
- * 注销程序
- */
- public synchronized void destroy() throws Exception {
- if (!isStarted.get()) {
- return;
- }
- msgListenerContainer.stop();
- eventTemplate = null;
- rabbitAdmin = null;
- rabbitConnectionFactory.destroy();
- }
-
- @Override
- public void start() {
- if (isStarted.get()) {
- return;
- }
- Set<String> mapping = msgAdapterHandler.getAllBinding();
- for (String relation : mapping) {
- String[] relaArr = relation.split("\\|");
- declareBinding(relaArr[1], relaArr[0]);
- }
- initMsgListenerAdapter();
- isStarted.set(true);
- }
-
- /**
- * 初始化消息监听器容器
- */
- private void initMsgListenerAdapter(){
- MessageListener listener = new MessageListenerAdapter(msgAdapterHandler,serializerMessageConverter);
- msgListenerContainer = new SimpleMessageListenerContainer();
- msgListenerContainer.setConnectionFactory(rabbitConnectionFactory);
- msgListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
- msgListenerContainer.setMessageListener(listener);
- msgListenerContainer.setErrorHandler(new MessageErrorHandler());
- msgListenerContainer.setPrefetchCount(config.getPrefetchSize()); // 设置每个消费者消息的预取值
- msgListenerContainer.setConcurrentConsumers(config.getEventMsgProcessNum());
- msgListenerContainer.setTxSize(config.getPrefetchSize());//设置有事务时处理的消息数
- msgListenerContainer.setQueues(queues.values().toArray(new Queue[queues.size()]));
- msgListenerContainer.start();
- }
-
- @Override
- public EventTemplate getEopEventTemplate() {
- return eventTemplate;
- }
-
- @Override
- public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser) {
- return add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
- }
-
- public EventController add(String queueName, String exchangeName,EventProcesser eventProcesser,CodecFactory codecFactory) {
- msgAdapterHandler.add(queueName, exchangeName, eventProcesser, defaultCodecFactory);
- if(isStarted.get()){
- initMsgListenerAdapter();
- }
- return this;
- }
-
- @Override
- public EventController add(Map<String, String> bindings,
- EventProcesser eventProcesser) {
- return add(bindings, eventProcesser,defaultCodecFactory);
- }
-
- public EventController add(Map<String, String> bindings,
- EventProcesser eventProcesser, CodecFactory codecFactory) {
- for(Map.Entry<String, String> item: bindings.entrySet())
- msgAdapterHandler.add(item.getKey(),item.getValue(), eventProcesser,codecFactory);
- return this;
- }
-
- /**
- * exchange和queue是否已经绑定
- */
- protected boolean beBinded(String exchangeName, String queueName) {
- return binded.contains(exchangeName+"|"+queueName);
- }
-
- /**
- * 声明exchange和queue已经它们的绑定关系
- */
- protected synchronized void declareBinding(String exchangeName, String queueName) {
- String bindRelation = exchangeName+"|"+queueName;
- if (binded.contains(bindRelation)) return;
-
- boolean needBinding = false;
- DirectExchange directExchange = exchanges.get(exchangeName);
- if(directExchange == null) {
- directExchange = new DirectExchange(exchangeName, true, false, null);
- exchanges.put(exchangeName, directExchange);
- rabbitAdmin.declareExchange(directExchange);//声明exchange
- needBinding = true;
- }
-
- Queue queue = queues.get(queueName);
- if(queue == null) {
- queue = new Queue(queueName, true, false, false);
- queues.put(queueName, queue);
- rabbitAdmin.declareQueue(queue); //声明queue
- needBinding = true;
- }
-
- if(needBinding) {
- Binding binding = BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
- rabbitAdmin.declareBinding(binding);//声明绑定关系
- binded.add(bindRelation);
- }
- }
-
- }
搞定,现在可以将DefaultEventTemplate里的注释去掉了,接下来最后完成单元测试,为了测试传递对象,建立一个PO
- @SuppressWarnings("serial")
- public class People implements Serializable{
- private int id;
- private String name;
- private boolean male;
- private People spouse;
- private List<People> friends;
- public int getId() {
- return id;
- }
- public void setId(int id) {
- this.id = id;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public boolean isMale() {
- return male;
- }
- public void setMale(boolean male) {
- this.male = male;
- }
- public People getSpouse() {
- return spouse;
- }
- public void setSpouse(People spouse) {
- this.spouse = spouse;
- }
- public List<People> getFriends() {
- return friends;
- }
- public void setFriends(List<People> friends) {
- this.friends = friends;
- }
-
- @Override
- public String toString() {
- // TODO Auto-generated method stub
- return "People[id="+id+",name="+name+",male="+male+"]";
- }
- }
建立单元测试
- public class RabbitMqTest{
-
- private String defaultHost = "127.0.0.1";
-
- private String defaultExchange = "EXCHANGE_DIRECT_TEST";
-
- private String defaultQueue = "QUEUE_TEST";
-
- private DefaultEventController controller;
-
- private EventTemplate eventTemplate;
-
- @Before
- public void init() throws IOException{
- EventControlConfig config = new EventControlConfig(defaultHost);
- controller = DefaultEventController.getInstance(config);
- eventTemplate = controller.getEopEventTemplate();
- controller.add(defaultQueue, defaultExchange, new ApiProcessEventProcessor());
- controller.start();
- }
-
- @Test
- public void sendString() throws SendRefuseException{
- eventTemplate.send(defaultQueue, defaultExchange, "hello world");
- }
-
- @Test
- public void sendObject() throws SendRefuseException{
- eventTemplate.send(defaultQueue, defaultExchange, mockObj());
- }
-
- @Test
- public void sendTemp() throws SendRefuseException, InterruptedException{
- String tempExchange = "EXCHANGE_DIRECT_TEST_TEMP";//以前未声明的exchange
- String tempQueue = "QUEUE_TEST_TEMP";//以前未声明的queue
- eventTemplate.send(tempQueue, tempExchange, mockObj());
- //发送成功后此时不会接受到消息,还需要绑定对应的消费程序
- controller.add(tempQueue, tempExchange, new ApiProcessEventProcessor());
- }
-
- @After
- public void end() throws InterruptedException{
- Thread.sleep(2000);
- }
-
- private People mockObj(){
- People jack = new People();
- jack.setId(1);
- jack.setName("JACK");
- jack.setMale(true);
-
- List<People> friends = new ArrayList<>();
- friends.add(jack);
- People hanMeiMei = new People();
- hanMeiMei.setId(1);
- hanMeiMei.setName("韩梅梅");
- hanMeiMei.setMale(false);
- hanMeiMei.setFriends(friends);
-
- People liLei = new People();
- liLei.setId(2);
- liLei.setName("李雷");
- liLei.setMale(true);
- liLei.setFriends(friends);
- liLei.setSpouse(hanMeiMei);
- hanMeiMei.setSpouse(liLei);
- return hanMeiMei;
- }
-
- class ApiProcessEventProcessor implements EventProcesser{
- @Override
- public void process(Object e) {//消费程序这里只是打印信息
- Assert.assertNotNull(e);
- System.out.println(e);
- if(e instanceof People){
- People people = (People)e;
- System.out.println(people.getSpouse());
- System.out.println(people.getFriends());
- }
- }
- }
- }
源码地址请点击这里
RabbitMQ技术学习更多资源请访问 https://www.itkc8.com
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。