当前位置:   article > 正文

Apusic应用服务器文档_com.apusic.ams.embed

com.apusic.ams.embed
JAVA 消息服务(JMS)定义了Java 中访问消息中间件的接口。JMS 只是接口,并没有给予实现,实现JMS 接口的消息中间件称为JMS Provider,Apusic应用服务器实现了JMS接口,用户可以通过使用JMS接口,在Apusic中进行JMS编程。 Apusic支持JMS1.0.2版本,JMS1.0.2是本文档书写时JMS API最新发布版。有关JMS1.0.2的特性,参见 JMS1.0.2特性

消息中间件提供企业数据的异步传输,通过消息中间件,一些原本互相孤立的业务组件可以组合成一个可靠的、灵活的系统。
    消息中间件大致分为两类:Point-to-Point(PTP) 和 Publish-Subscribe(Pub/Sub),Apusic支持这两种模型。
    PTP是点对点传输消息,建立在消息队列的基础上,每个客户端对应一个消息队列,客户端发送消息到对方的消息队列中,从自己的消息队列读取消息。
    Pub/Sub是将消息定位到某个层次结构栏目的节点上,Pub/Sub 通常是匿名的并能够动态发布消息,Pub/Sub 必须保证某个节点的所有发布者(Publisher)发布的信息准确无误地发送到这个节点的所有消息订阅者(Subscriber)。

JMS 支持两种消息类型PTP 和Pub/Sub,分别称作:PTP Domain 和Pub/Sub Domain,这两种接口都继承统一的JMS Parent 接口,JMS 主要接口如下所示:
JMS Parent   PTPDomainPub/Sub Domain
ConnectionFactoryQueueConnectionFactoryTopicConnectionFactory
ConnectionQueueConnectionTopicConnection
DestinationQueueTopic
SessionQueueSessionTopicSession
MessageProducerQueueSenderTopicPublisher
MessageConsumerQueueReceiver,QueueBrowserTopicSubscriber

    以下是对这些接口的简单描述:
    ConnectionFactory :连接工厂,JMS 用它创建连接
    Connection :JMS 客户端到JMS Provider 的连接
    Destination :消息的目的地
    Session: 一个发送或接收消息的线程
    MessageProducer: 由Session 对象创建的用来发送消息的对象
    MessageConsumer: 由Session 对象创建的用来接收消息的对象

JDBC: JMS 客户端可以使用JDBC 接口,可以将JDBC 和JMS 包含在一个事务里。这种包含可以在EJB 里,也可以直接调用JTA(Java Transaction API)接口实现。
    JavaBeans: JavaBeans可以用JMS Session 发送接收消息。
    EJB: EJB2.0 规范中定义了新的Message-Driven Beans 组件模型。
    JTA(Java Transaction API): JMS 客户端可以用JTA 启动事务。JMS Provider 可以选择是否支持分布式事务。
    JTS(Java Transaction Service): JMS 可以和JTS 一起组成一个分布式事务,如将发送接收消息和更新数据库包含在一个事务里。
    JNDI: JMS客户端通过JNDI 调用JMS 中的对象。

JMS 消息由以下几部分组成:消息头,属性,消息体。
    消息头(Header) - 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性如:JMSDestination,JMSMessageID 等。
消息头由谁设置
JMSDestinationsend 或 publish 方法
JMSDeliveryModesend 或 publish 方法
JMSExpirationsend 或 publish 方法
JMSPrioritysend 或 publish 方法
JMSMessageIDsend 或 publish 方法
JMSTimestampsend 或 publish 方法
JMSCorrelationID客户
JMSReplyTo客户
JMSType客户
JMSRedeliveredJMS Provider
属性(Properties) - 除了消息头中定义好的标准属性外,JMS 提供一种机制增加新属性到消息头中,这种新属性包含以下几种:
    1. 应用需要用到的属性;
    2. 消息头中原有的一些可选属性;
    3. JMS Provider 需要用到的属性。
    标准的JMS 消息头包含以下属性: 
   JMSDestination --消息发送的目的地 
   JMSDeliveryMode --传递模式, 有两种模式: PERSISTENT 和NON_PERSISTENT,PERSISTENT 表示该消息一定要被送到目的地,否则会导致应用错误。NON_PERSISTENT 表示偶然丢失该消息是被允许的,这两种模式使开发者可以在消息传递的可靠性和吞吐量之间找到平衡点。
   JMSMessageID 唯一识别每个消息的标识,由JMS Provider 产生。
   JMSTimestamp 一个消息被提交给JMS Provider 到消息被发出的时间。
   JMSCorrelationID 用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息。
   JMSReplyTo 提供本消息回复消息的目的地址。
   JMSRedelivered 如果一个客户端收到一个设置了JMSRedelivered 属性的消息,则表示可能该客户端曾经在早些时候收到过该消息,但并没有签收(acknowledged)。
   JMSType 消息类型的识别符。
   JMSExpiration 消息过期时间,等于QueueSender 的send 方法中的timeToLive 值或TopicPublisher 的publish 方法中的timeToLive 值加上发送时刻的GMT 时间值。如果timeToLive值等于零,则JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
  
JMSPriority 消息优先级,从0-9 十个级别,0-4 是普通消息,5-9 是加急消息。JMS 不要求JMS Provider 严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达。

   
消息体(Body) - JMS API 定义了5种消息体格式,也叫消息类型,你可以使用不同形式发送接收数据并可以兼容现有的消息格式,下面描述这5种类型:
消息类型消息体
TextMessagejava.lang.String对象,如xml文件内容
MapMessage名/值对的集合,名是String对象,值类型可以是Java任何基本类型
BytesMessage字节流
StreamMessageJava中的输入输出流
ObjectMessageJava中的可序列化对象
Message没有消息体,只有消息头和属性。
下例演示创建并发送一个TextMessage到一个队列:
TextMessage message = queueSession.createTextMessage();
message.setText(msg_text); // msg_text is a String
queueSender.send(message);
下例演示接收消息并转换为合适的消息类型:
Message m = queueReceiver.receive();
if (m instanceof TextMessage) {
TextMessage message = (TextMessage) m;
System.out.println("Reading message: " + message.getText());
} else {
// Handle error
}

消息的同步接收
同步接收是指客户端主动去接收消息,JMS 客户端可以采用MessageConsumer 的receive方法去接收下一个消息。
    消息的异步接收
异步接收是指当消息到达时,主动通知客户端。JMS 客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer,这样,每当消息到达时,JMS Provider 会调用MessageListener中的onMessage 方法。

PTP(Point-to-Point)模型是基于队列的,发送方发消息到队列,接收方从队列接收消息,队列的存在使得消息的异步传输成为可能。和邮件系统中的邮箱一样,队列可以包含各种消息,JMS Provider 提供工具管理队列的创建、删除。JMS PTP 模型定义了客户端如何向队列发送消息,从队列接收消息,浏览队列中的消息。
    下面描述JMS PTP 模型中的主要概念和对象:
名称描述
Queue由JMS Provider 管理,队列由队列名识别,客户端可以通过JNDI 接口用队列名得到一个队列对象。
TemporaryQueue由QueueConnection 创建,而且只能由创建它的QueueConnection 使用。
QueueConnectionFactory 客户端用QueueConnectionFactory 创建QueueConnection 对象。
QueueConnection 一个到JMS PTP provider 的连接,客户端可以用QueueConnection 创建QueueSession 来发送和接收消息。
QueueSession 提供一些方法创建QueueReceiver 、QueueSender、QueueBrowser 和TemporaryQueue。如果在QueueSession 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当接收者下次连接到相同的队列时,这些消息还会被再次接收。
QueueReceiver 客户端用QueueReceiver 接收队列中的消息,如果用户在QueueReceiver 中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。
QueueSender 客户端用QueueSender 发送消息到队列。
QueueBrowser 客户端可以QueueBrowser 浏览队列中的消息,但不会收走消息。
QueueRequestor JMS 提供QueueRequestor 类简化消息的收发过程。QueueRequestor 的构造函数有两个参数:QueueSession 和queue,QueueRequestor 通过创建一个临时队列来完成最终的收发消息请求。
可靠性(Reliability)队列可以长久地保存消息直到接收者收到消息。接收者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。

JMS Pub/Sub 模型定义了如何向一个内容节点发布和订阅消息,这些节点被称作主题(topic)。
    主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传送。
    下面描述JMS Pub/Sub 模型中的主要概念和对象:
名称描述
订阅(subscription)消息订阅分为非持久订阅(non-durable subscription)和持久订阅(durable subscrip-tion),非持久订阅只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。持久订阅时,客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
Topic主题由JMS Provider 管理,主题由主题名识别,客户端可以通过JNDI 接口用主题名得到一个主题对象。JMS 没有给出主题的组织和层次结构的定义,由JMS Provider 自己定义。
TemporaryTopic临时主题由TopicConnection 创建,而且只能由创建它的TopicConnection 使用。临时主题不能提供持久订阅功能。
TopicConnectionFactory客户端用TopicConnectionFactory 创建TopicConnection 对象。
TopicConnectionTopicConnection 是一个到JMS Pub/Sub provider 的连接,客户端可以用TopicConnection创建TopicSession 来发布和订阅消息。
TopicSessionTopicSession 提供一些方法创建TopicPublisher、TopicSubscriber、TemporaryTopic 。它还提供unsubscribe 方法取消消息的持久订阅。
TopicPublisher客户端用TopicPublisher 发布消息到主题。
TopicSubscriber客户端用TopicSubscriber 接收发布到主题上的消息。可以在TopicSubscriber 中设置消息过滤功能,这样,不符合要求的消息不会被接收。
Durable TopicSubscriber如果一个客户端需要持久订阅消息,可以使用Durable TopicSubscriber,TopSession 提供一个方法createDurableSubscriber创建Durable TopicSubscriber 对象。
恢复和重新派送(Recovery and Redelivery)非持久订阅状态下,不能恢复或重新派送一个未签收的消息。只有持久订阅才能恢复或重新派送一个未签收的消息。
TopicRequestorJMS 提供TopicRequestor 类简化消息的收发过程。TopicRequestor 的构造函数有两个参数:TopicSession 和topic。TopicRequestor 通过创建一个临时主题来完成最终的发布和接收消息请求。
可靠性(Reliability)当所有的消息必须被接收,则用持久订阅模式。当丢失消息能够被容忍,则用非持久订阅模式。
Apusic JMS 配置文件包括两部分: /usr/apusic/config/apusic.conf 和/usr/apusic/config/jms.xml,假设Apusic 的安装目录是/usr/apusic。
广义上说,一个JMS 应用是几个JMS 客户端交换消息,开发JMS 客户端应用由以下几步构成:
  • 用JNDI 得到ConnectionFactory 对象;
  • 用JNDI 得到目标队列或主题对象,即Destination 对象;
  • 用ConnectionFactory 创建Connection 对象;
  • 用Connection 对象创建一个或多个JMS Session;
  • 用Session 和Destination 创建MessageProducer 和MessageConsumer;
  • 通知Connection 开始传递消息。
PTP 模型主要包含消息的发送和接收,下面我们分别举例说明:

发送消息

第一,启动Apusic应用服务器。

第二,打开管理工具:在浏览器中输入http://localhost:6882,缺省管理员为admin,密码也是admin。

第三,配置队列连接创建器:在管理工具中点击“基本配置”-->“消息”,点击“队列连接创建器”,如果存在JNDI名为 “jms/QueueConnectionFactory”的队列连接创建器,则无须创建,否则点击“创建”,添加必要信息,“队列连接创建器名称”为 “QueueConnectionFactory”,“队列连接创建器JNDI名”为“jms/QueueConnectionFactory”,“连接 是否可以匿名访问”为“是”,其它项为缺省设置,点击“保存”。如下:

第四,配置队列:在管理工具中点击“基本配置”-->“消息”,点击“队列”,如果存在JNDI名为 “jms/QueueConnectionFactory”的队列,则无须创建,否则点击“创建”,添加必要信息,“队列名称”为 “testQueue”,“队列JNDI名”为“myQueue”,其它项为缺省设置,点击“保存”。如下:

第五,编写发送消息客户端代码:Send.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2002
* Company: Apusic
* @author Michael
* @version 1.0
*/

public class Send{

String queueName = "myQueue";
QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueSender queueSender = null;
TextMessage message = null;

public static void main(String[] args) throws Exception {

InitialContext ic = getInitialContext();
Send sender = new Send();
sender.init(ic) ;
sender.sendMessage();
sender.close();

}

public void init(InitialContext ctx) throws Exception{

queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
queueConnection = queueConnectionFactory.createQueueConnection();
queue = (Queue) ctx.lookup(queueName);
}

public void sendMessage() throws JMSException,RemoteException{

queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.PERSISTENT);
message = queueSession.createTextMessage();
message.setText("The Message from myQueue");
queueSender.send(message);
}

public void close() throws JMSException{

if(queueConnection!=null)
queueConnection.close();
}

private static InitialContext getInitialContext() throws NamingException{

Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory");
env.put(Context.PROVIDER_URL, "rmi://localhost:6888");
return (new InitialContext(env));

}
}
接收消息

如果管理工具中没有JNDI名为“jms/QueueConnectionFactory”的队列连接创建器和“myQueue”的队列,则依照上面一到四步进行设置,否则可直接编写接收消息客户端代码:Receive.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2002
* Company: Apusic
* @author Michael
* @version 1.0
*/

public class Receive{

String queueName = "myQueue";
QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
QueueConnection queueConnection = null;
QueueSession queueSession = null;
QueueReceiver queueReceiver = null;
TextMessage message = null;

public static void main(String[] args) throws Exception {

InitialContext ic = getInitialContext();
Receive receiver = new Receive();
receiver.init(ic) ;
receiver.TBreceiveMessage();//你可以在此处调用YBreceiveMessage
receiver.close();
}

public void init(InitialContext ctx) throws Exception{

queueConnectionFactory = (QueueConnectionFactory)ctx.lookup("jms/QueueConnectionFactory");
queueConnection = queueConnectionFactory.createQueueConnection();
queue = (Queue) ctx.lookup(queueName);
}

public void TBreceiveMessage() throws NamingException, JMSException,RemoteException{

queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
queueConnection.start();
for (;;) {
message = (TextMessage) queueReceiver.receive();
System.out.println("Reading message: " + message.getText());
if (message.getText().equals("quit"))
break;
}
}

public void YBreceiveMessage() throws NamingException, JMSException,RemoteException,IOException{

queueSession = queueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
queueReceiver = queueSession.createReceiver(queue);
//register my textListener which comes from MessageListener
TextMessageListener textListener = new TextMessageListener();
queueReceiver.setMessageListener(textListener);
queueConnection.start();

System.out.println("To end program, enter Q or q, then ");
InputStreamReader reader = new InputStreamReader(System.in);
char answer = '/0';

while (!((answer == 'q') || (answer == 'Q')))
answer = (char)reader.read();
}

public void close() throws JMSException{

if(queueReceiver!=null)
queueReceiver.close();
if(queueSession!=null)
queueSession.close();
if(queueConnection!=null)
queueConnection.close();
}

private static InitialContext getInitialContext() throws NamingException{

Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,
"com.apusic.jndi.InitialContextFactory");
env.put(Context.PROVIDER_URL, "rmi://localhost:6888");
return (new InitialContext(env));

}
}
其中异步接收时使用的TextMessageListener代码如下:
import javax.jms.MessageListener;
import javax.jms.Message;
import javax.jms.TextMessage;
import javax.jms.JMSException;

/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2002
* Company: Apusic
* @author Michael
* @version 1.0
*/

public class TextMessageListener implements MessageListener {

public TextMessageListener() {}

public void onMessage(Message m) {

TextMessage msg = (TextMessage) m;
try {
System.out.println("Async reading message: " + msg.getText() +
" (priority=" + msg.getJMSPriority() + ")");
} catch (JMSException e) {
System.out.println("Exception in onMessage(): " + e.toString());
}
}
}
最后,运行程序:在Windows下打开两个DOS窗口,首先运行Receive 然后运行Send,

如果是异步接收消息,在运行Receive 的窗口中可以看到输出:
To end program, enter Q or q, then
Async reading message: The Second Message from testQueue (priority=4)

如果是同步接收消息,在运行Receive 的窗口中可以看到输出:
Reading message: The Message from testQueue

Pub/Sub 模型主要包含消息的发布和订阅,下面我们分别举例说明:

发布消息

如果管理工具中没有JNDI名为“jms/TopicConnectionFactory”的队列连接创建器和“myTopic”的队列,则依照上面一到四步进行设置,否则可直接编写发布消息客户端代码:Published.java

import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2002
* Company: Apusic
* @author Michael
* @version 1.0
*/

public class Published{

String topicName = "myTopic";
TopicConnectionFactory topicConnectionFactory = null;
Topic topic = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
TopicPublisher topicPublisher = null;
String msgText = null;
TextMessage message = null;

public static void main(String[] args) throws Exception {

InitialContext ic = getInitialContext();
Published publisher = new Published();
publisher.init(ic) ;
publisher.publish();
publisher.close();
}

public void init(InitialContext ctx) throws Exception{

topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
topic = (Topic) ctx.lookup(topicName);
}

public void publish() throws NamingException, JMSException,RemoteException{

topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createTextMessage();
msgText = "This is the published message";
message.setText(msgText);
topicPublisher.publish(message);
}

public void close() throws JMSException{

if(topicConnection!=null)
topicConnection.close();
}

private static InitialContext getInitialContext() throws NamingException{

Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory");
env.put(Context.PROVIDER_URL, "rmi://localhost:6888");
return (new InitialContext(env));

}

}
订阅消息
import javax.jms.*;
import javax.naming.*;
import java.io.*;
import java.util.*;
import java.rmi.RemoteException;
/**
* Title: JMS
* Description: JMS Test
* Copyright: Copyright (c) 2002
* Company: Apusic
* @author Michael
* @version 1.0
*/

public class Subscriber{

String topicName = "myTopic";
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
Topic topic = null;
TopicSession topicSession = null;
TopicSubscriber topicSubscriber = null;
TextMessage message = null;

String id = "durable";

public static void main(String[] args) throws Exception {

InitialContext ic = getInitialContext();
Subscriber subscriber = new Subscriber();
subscriber.init(ic) ;
subscriber.subscribe();
subscriber.close();
}

public void init(InitialContext ctx) throws Exception{

topicConnectionFactory = (TopicConnectionFactory)ctx.lookup("jms/TopicConnectionFactory");
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setClientID(id) ;
topic = (Topic) ctx.lookup(topicName);
}

public void subscribe() throws NamingException, JMSException,RemoteException{

topicSession = topicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(topic,id);
topicConnection.start();
message = (TextMessage) topicSubscriber.receive();
System.out.println("SUBSCRIBER THREAD: Reading message: " + message.getText());

}

public void close() throws JMSException{

if(topicConnection!=null)
topicConnection.close();
}

private static InitialContext getInitialContext() throws NamingException{

Hashtable env = new Hashtable();
env.put(Context.INITIAL_CONTEXT_FACTORY,"com.apusic.jndi.InitialContextFactory");
env.put(Context.PROVIDER_URL, "rmi://localhost:6888");
return (new InitialContext(env));

}

}

最后,运行程序:在Windows下打开两个DOS窗口,首先运行Subscriber,然后运行Published,在运行Subscriber的窗口中可以看到输出:

SUBSCRIBER THREAD: Reading message: This is the published message

利用Apusic JMS 配置文件(%Apusic_home%/config/jms.xml),你可以对队列和主题设定访问权限,可以详细规定Apusic中的用户和组,可以对队列和主题进行何种操作。
    对于队列,可以设定发送(send)、接收(receive)和浏览(browse)权限;对于主题,可以设定发布(publish)、订阅(subscribe)、持久订阅(subscribe-durable)和取消持久订阅(unsubscribe)权限。
    客户端用QueueConnectionFactory 的createQueueConnection(java.lang.String userName,java.lang.String password)方法创建队列连接时输入用户身份,或者用createTopicConnection(java.lang.String userName, java.lang.String password)方法创建主题连接时输入用户身份。
    下面的例子授权用户larry 可以对队列testQueue 进行接收和浏览:

首先创建testQueue队列,如果Apusic不存在testQueue队列,则通过管理工具创建,方法同上。
    建立安全角色(security-role)rbt,将该角色映射到用户larry,使larry 拥有该角色包含的所有权限。如果Apuisc应用服务器不存在larry用户,在管理工具中“用户管理”-->“用户/组” 中创建larry用户。然后可参考apusic.jar中jms.dtd的定义在jms.xml中加入如下片段:       
    <security-role>
        <role-name>rbt</role-name>
        <principal>larry</principal>
    </security-role>

设置安全角色rbt的权限
    <destination-permission>
        <role-name>rbt</role-name>
        <destination-method>
            <queue-name>testQueue</queue-name>
            <method-name>receive</method-name>
            <method-name>browse</method-name>
        </destination-method>
    </destination-permission>

JMS 是J2EE 规范中提出的消息中间件服务(Java Message Service™)规范,提供应用程序间 异步或同步的消息传递和管理服务,Apusic 应用服务器中包含了高效、可靠的消息服务。Apusic应用服务器中的消息服务接口完全遵循JMS API规范。提高了企业应用中各组件的可移植性、松耦合特性,同时更加高效可靠;为分布式企业应用异步交换关键业务数据和事件提供了可靠而灵活的服务。
Apusic消息服务提供对消息队列(Queue)和消息主题(Topic)的管理,消息的发送由消息服务负责完成,原理如下:
Client----->Computer A(Router x)----------Computer B(Router z)
| Connector 3 |
| Connector 1 |Connector 2
| |
Computer C(Router y)--------------------
Computer A,B,C 分别为提供消息服务的应用服务器,称为Router,他们之间的连接被称为Connector,客户端直接与Computer A 连接,客户端要把消息传送到Computer B 的 队列和主题时,只需指明接收消息的主机名和队列或主题的名称发送消息即可,Computer A 中的消息服务将会查找配置文件中的设置,如从A 到B 间有通路连通,消息服务将会按照 最短路径发送消息到指定服务器(Computer B) 的指定队列或主题,如A 到B 将无有效连接, 消息将被保存在某一连通的服务器上,待有有效通路之后再发送消息。
根据J2EE1.3规范,Apusic应用服务器提供了两个主要的消息服务方面的新特性:
多个提供消息服务的应用服务器可以组成一个虚拟的JMS网络,每个提供消息服务的应用服务器可被视为一个JMS网络中的节点,而节点之间的连接则通过声明消息路由提供。只要用户连接到JMS网络中任何一个节点,即可向网络中的其他任意节点发送消息。
为提供消息服务的可靠性,和JMS网络中消息的暂时存储,Apusic应用服务器使用了一个可靠的消息存储机制。
Apusic消息服务消息路由与存储配置的部分存在于Apusic应用服务器目录下config子目录中, apusic.conf文件中的相关配置段。 apusic.conf 文件中可对消息服务的消息存储目录和消息路由进行配置。
Apusic应用服务器的消息存储的配置,通过编辑Apusic应用服务器目录下,config子目录中的 apusic.conf 配置文件的相关配置段进行。缺省的配置段如下:
...
<SERVICE
CLASS="com.apusic.jms.server.JMSServer">
<ATTRIBUTE NAME="BackStoreDirectory" VALUE="store/jms"/>
</SERVICE>
...
此配置段指定了消息服务及其使用的消息存储目录,其中,名为BackStoreDirectory属性的值指定了消 息存储目录的路径,可使用绝对路径如:c:/message_store/或usr/message_store来指定,亦可使用相对于Apusic安装 目录的相对路径如:store/my_message_store来指定,如指定目录不存在,应用服务器将创建指定的目录。
在一个由多台提供消息服务的应用服务器组成的JMS网络上,每个提供消息服务的应用服务器可被视为一个JMS网络中 的节点,消息路由意味着节点之间的路径。只要用户连接到JMS网络中任何一个节点,即可向网络中的其他任意节点发送消息,由消息服务根据消息路由提供消息 在JMS网络中的最佳传输机制。如下图所示:
消息路由图示-1
A、B、C、D、E 五个节点上都是提供消息服务的应用服务器,五个节点连通成一个网络,连接到 A 节点的客户端可以发消息到任意五个节点,消息服务会自动寻找一条最佳路径传递消息到目标节点。如从A 发消息到D,有两条连通路径:A->B->C->D 和A->E->D,其中 A->E->D 经过的节点最少,该路径为最佳路径,应用服务器将根据此路径对消息进行传递。
消息路由图示-2
在一个JMS网络中,如上图所示,每个节点都是一个消息路由器(Router),每个消息路由器有一个路由器名,以区别于其他的路由器。路由器名可以和主机名相同, 也可以不同。
每两个节点之间的直接路径被称为路由连接器(RoutingConnector),网络中每两个节点间的直接路径必 须事先进行申明性的定义。每个路由连接器的申明是单向的,如在A节点上申明到B的路由连接器,说明可以由A到B,但如果需要由B到A,则需要在B节点上申 明到A的路由连接器。
假设上图中的五个节点A、B、C、D、E 是某个实际网络中五个提供消息服务的Apusic应用服务器主机,实际网络中的主机名分别是computerA、computerB、 computerC、computerD、computerE,在由这五个节点组成的JMS网络中,对应的路由器名分别是routerA、 routerB、routerC、routerD、routerE,要使这五个节点结成类似于上图的JMS网络,即每个节点都有一条指向下一个节点的路 径,最后一个节点的路径则指向最开始的节点,形成一个闭合的环,而每个节点都知道环中的其他节点,消息客户连接到环中的任意节点,都可向其他节点发送消 息,则需要在每个节点上申明其他节点,同时申明此节点到其他节点的路径,如:A节点上需要申明B、C、D、E四个节点,表示由A节点可以接收向其他四个节 点发送消息的请求,同时需要申明由A到B的路径,表示由A节点到B节点有一条通路;节点B则需要申明A、C、D、E四个节点,同时需要申明由B到C的路 径,如此类推。
如消息路由图示-2中,如节点B未申明到节点C的路径,消息客户连接到了节点A,发送消息到节点C,由于此时无有效的由A到C的路径,则消息将会被保存在网络中,等待当有有效的路径时再行发送。
对消息服务中路由的配置是通过Apusic应用服务器目录中子目录config下的 apusic.conf配置文件进行的。通过 apusic.conf中的相关配置段可以对消息路由器和路由连接器进行管理和配置。
消息服务的管理是通过配置消息管理对象的属性完成的。消息管理对象包含了管理员生成的消息配置信息,之后由消息客户端使用。
消息服务中定义了两个管理对象:
由于消息服务包含两种消息模型,即Point-to-Point(PTP)和Publish-and- Subscribe(Pub/Sub)模型,Apusic应用服务器对这两种消息模型提供了完整的支持,因此,对于连接创建器而言,提供了两种可配置的连 接创建器类型,面向PTP消息模型的QueueConnectionFactory和面向Pub/Sub模型的 TopicConnectionFactory;对于消息接收站而言,同样提供了两种可配置的消息接收类型站,面向PTP的队列(Queue)站和面向 Pub/Sub的主题(Topic)站。
通过对存在于Apusic应用服务器目录下config子目录中,名为 jms.xml的配置文件进行编辑,实现对Apusic应用服务器消息服务的管理。在此文件中,可以对连接创建器、消息接收站和消息服务安全策略进行配置。
jms.xml文件是一个xml文件,其文档类型定义(DTD)为jms-config_1_2.dtd。
Apusic应用服务器提供了 缺省的jms.xml文件
在jms.xml文件中,每一个连接创建器配置信息对应一个connection-factory标记申明的xml元素,每个connection-factory元素可包含使用以下三种标记所申明的子元素:
实际应用中,当管理员为连接创建器分配JNDI名之后,消息客户即可使用JNDI在服务器的命名空间中对连接创建器进行查找并获得引用,之后通过连接创建器取得与消息服务的连接。
连接创建器的配置属性如下表:
属性描述值类型缺省值
type连接创建器的消息模型(PTP和Pub/Sub)类型“QueueConnectionFactory”或“TopicConnectionFactory”,分别对应PTP和Pub/Sub模型,此属性必须定义
pooled指定此连接创建器是否对其管理的连接使用连接池“True”或“False”“False”
secure指定连接创建器所提供连接的通讯方式“True”或“False”“False”
anonymous是否授权匿名用户访问此连接创建器“True”或“False”“True”
client-id由于标识连接客户状态的标识符,通常被用于Pub/Sub模型中的持久订阅(Durable subscription)字符串,此属性是可选的
default-delivery-mode使用由此连接创建器生成的连接发送消息时,缺省的发送方式“persistent”或“non-persistent”“non-persistent”
default-priority使用由此连接创建器生成的连接发送消息时,缺省的优先级数字(0~9)“4”
default-time-to-live使用由此连接创建器生成的连接发送消息时,对于已发送的消息,消息系统保留此消息的缺省时间长度,单位为毫秒。整型0
min-pool-size此连接创建器对应的连接池中,所保持的最少连接数整型5
min-pool-size此连接创建器对应的连接池中,所保持的最少连接数整型30
idle-timeout连接等待超时时间。当连接池中的某个连接等待被使用的实际时间超过此属性数值时,连接池自动关闭此连接整型,单位是秒300
对应于PTP和Pub/Sub消息模型,Apusic应用服务器中的消息服务提供了两种消息接收站,队列 (Queue)和主题(Topic)。实际应用中,管理员为消息接收站分配JNDI名,消息客户即可使用JNDI在服务器的命名空间中对消息接收站进行查 找并获得引用,在通过连接创建器取得与消息服务的连接之后,消息客户即可向消息接收站同步或异步地发送或接收消息。
在jms.xml文件中,每一个消息接收站配置信息对应一个queue标记或topic标记申明的xml元素。
在jms.xml文件中,每一个队列(Queue)配置信息对应一个queue标记申明的xml元素,每个申明的queue元素可包含三种标记所申明的子元素:
队列的配置属性如下表:
属性描述值类型缺省值
cache-size队列缓冲中保留的消息个数整型20
expiry-check-interval系统检测消息队列中消息是否过期的时间间隔,单位是秒整型60
在jms.xml文件中,每一个主题(Topic)配置信息对应一个topic标记申明的xml元素,每个申明的topic元素可包含三种标记所申明的子元素:
主题的配置属性如下表:
属性描述值类型缺省值
cache-size主题缓冲中保留的消息个数整型20
expiry-check-interval系统检测消息主题中消息是否过期的时间间隔,单位是秒整型60
J2EE™体系中的JMS规范实际上并未包含有关安全方面的内容,因此,系统提供保证消息服务的安全和完整性的机制就极为重要。
Apusic应用服务器提供了对消息服务方面的安全管理,主要根据J2EE™体系中的安全角色(Security Role)和消息客户的操作对消息服务进行保护。
消息服务中基于安全角色的授权方式是指,系统管理员可定义一组安全角色,每个被定义的安全角色对应于系统中的一组用 户或组,然后,根据消息客户用户可对消息接收站(Destination)进行的操作(如对于队列,客户可执行发送、接收、浏览等操作)对前面定义的安全 角色进行授权。
对于一个或者一组jms.xml中申明的消息接收站(Destination)和一个或者一组jms.xml中申明的安全角色,通过使用一个destination-permission标记申明的xml元素来设置这二者之间的对应关系, 每个申明的destination-permission元素可包含三种标记所申明的子元素:
下面范例授权安全角色foo可以对队列bar 进行接收和浏览:
Last modified: 2002年5月27日 8:52:34
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/522746
推荐阅读
相关标签
  

闽ICP备14008679号