当前位置:   article > 正文

Apache apusic---ActiveMQ中间件在JMS1.1、J2EE中的使用_apusicmq demo实例

apusicmq demo实例

ActiveMQ中间件在JMS1.1、J2EE中的使用

Apache的中间件---ActiveMQ对于企业级的应用很广泛。支持JMS1.1、J2EE1.4以上版本,可运行于JVM和大部分web容器中、支持多种语言客户端(Java、C、C++,ActionScript、Ajax等)、支持多种协议(stomp、openWire等)、对spring的支持也很良好、速度很快是JbossMQ的十倍、与openJMS、JbossMQ等开源框架相比,ActiveMQ有Apache的支持可以持续发展。
启动ActiveMQ的后台管理为:http://localhost:8161/admin/(登陆时候默认的user and password is admin/admin)
Queue(Point-to-Point)与Topic(publisher/Subscriber)的比较
1:jms queue执行load balancer语义
一条消息仅仅能够被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将会被保存到处理该message的consumer可用。如果一个consumer收到一条message后却不响应它,那么这条消息将被转移到另外一个consumer那儿。一个Queue可以有很多的consumer,并且在多个可用的consumer中负载均衡。
2:Topic实现publish和subscribe语义
一条消息被publish时,它将发送到所用感兴趣的订阅者,所以零到多个subscribe将被接受到消息的一个拷贝,但是在消息代理接收到消息的时,只有激活订阅的subscribe能够获得消息的一个拷贝。
下面为程序的实现例子:对java的支持
producer(sender)消息生产对象
package com.clark.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
private static final int SEND_NUMBER = 5;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionFactory  = null;
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection = null;
//Session:one sender or receiver message singleton thread and provide a transaction context
Session session = null;
/**
* Destination:message destination,sender who
* it include two message domain:Point-to-Point(Destinational call queue)
* and Publisher/Sucscribler Model(Destinational call topic)
* Point-To-Point character:one message can only one consumer
* between message produer and consumer have no relationship.No matter consumer whether is running
* status when producer send message,It can receiver message
* Publisher/Subscribler:one message has many consumer
* producer and consumer have relationship.Subscribler one topic consumer can only 
* consumer publisher send message
*/

Destination destination = null;
//MessageProducer
MessageProducer producer;
//TextMessage message
//constructor ConntectionFactory Instance
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
try {
connection = connectionFactory.createConnection();
//start
connection.start();
/**
* Session.AUTO_ACKNOWLEDGE:when consumer receive message succes or MessageListener.onMessage return success
* Session.CLIENT_ACKNOWLEDGE:it is session acknowledge,as long as make sure one, all is be ok
* Session.DUPS_ACKNOWLEDGE:session delay message commit
*/
session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("FirstQueue");
//get message producer(sender)
producer = session.createProducer(destination);
/**
* JMS provide message commit model:PERSISTENT and NON_PERSISTENT
*/
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//constructor message
sendMessage(session,producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
}
}

private static void sendMessage(Session session, MessageProducer producer) throws JMSException {
for (int i = 0; i < SEND_NUMBER; i++) {
/**
* JMS message type:TextMessage、MapMessage、BytesMessage、StreamMessage and ObjectMessage
*/
TextMessage message = session.createTextMessage("ActiveMQ send message:"+i);
// 发送消息到目的地方
            System.out.println("send message:" + "ActiveMq send message" + i);
            producer.send(message);
}

}
}
receive(consumer)消费者对象
package com.clark.activemq;


import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory :连接工厂,JMS 用它创建连接
        ConnectionFactory connectionFactory;
        // Connection :JMS 客户端到JMS Provider 的连接
        Connection connection = null;
        // Session: 一个发送或接收消息的线程
        Session session;
        // Destination :消息的目的地;消息发送给谁.
        Destination destination;
        // 消费者,消息接收者
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");//URL must be this value ,else will throw exception
        try {
            // 构造从工厂得到连接对象
            connection = connectionFactory.createConnection();
            // 启动
            connection.start();
            // 获取操作连接
            session = connection.createSession(Boolean.FALSE,
                    Session.AUTO_ACKNOWLEDGE);
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            destination = session.createQueue("FirstQueue");//this queue name can own create--->will generator in ActiveMQ Admin console
            consumer = session.createConsumer(destination);
            while (true) {
                //设置接收者接收消息的时间,为了便于测试,这里定为100s
            /**
            * consumer method has two type:synchronization and asynchronization
            * syschronization:we will use consumer.receive() display receive message,it can block util message come here
            * asynchronization:consumer can register a message listener,use define take action when message come here
            */
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("收到消息" + message.getText());
                } else {
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}

===========测试方法=================================
先运行Receiver.java class类,console不会输出任何的信息
当运行Sender.java类的时候,在Sender类中的console会输出
send message:ActiveMq send message0
send message:ActiveMq send message1
send message:ActiveMq send message2
send message:ActiveMq send message3
send message:ActiveMq send message4


此时我们在Receiver类中的console会输出

收到消息ActiveMQ send message:0
收到消息ActiveMQ send message:1
收到消息ActiveMQ send message:2
收到消息ActiveMQ send message:3
收到消息ActiveMQ send message:4

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/522744
推荐阅读
相关标签
  

闽ICP备14008679号