赞
踩
1 理论篇
1.1 来自官方的介绍
NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always on ‘dial-tone’.
NATS在分布式系统中扮演一个中央神经系统,这些分布式系统包括移动设备,物联网,企业微服务和原生云基础设施。不同于传统的企业级消息系统,NATS提供永远在线服务。
NATS was created by Derek Collison, Founder and CEO at Apcera. Derek has spent over twenty years designing, building and using publish-subscribe messaging systems. Learn why traditional enterprise messaging systems don’t work for today’s distributed infrastructure, in this talk from Derek at SCALE on designing a new cloud-native messaging framework.
1.2 NATS 的消息(Message)
NATS messaging involves the electronic exchange of data among computer applications.
NATS provides a layer between the application and the underlying physical network. Application data is encoded as a message and sent by the publisher. The message is received, decoded, and processed by one or more subscribers. A subscriber can process a NATS message asynchronously or synchronously.
NATS的消息是指在不同计算机应用程序之间的消息交互。
NATS在应用与底层网络之间提供了一个抽象层。应用数据被包装为消息(message)并由发布者发送。消息被一个或多个订阅者接收,解码并且处理。订阅者可以异步或同步地处理消息。
1.2.1 Asynchronous processing
Asynchronous processing uses a callback message handler to process messages. When a message arrives, the registered callback handler receives control to process the message. The client or consuming application is not blocked from performing other work while it is waiting for a message. Asynchronous processing lets you create multi-threaded dispatching designs.
异步消息处理
异步消息处理使用一个消息回调处理器来处理消息,当消息到达的时候,已经注册的回调处理器来处理该消息。客户端或消费者程序不会从其他事件中阻塞在等待消息的时候。异步消息处理可以让程序员创建多线程的分发设计。
1.2.2 Synchronous processing
Synchronous processing requires that application code explicitly call a method to process an incoming message. Typically an explicit call is a blocking call that suspends processing until a message becomes available. If no message is available, the period for which the message processing call blocks is set by the client. Synchronous processing is typically used by a server whose purpose is to wait for and process incoming request messages, and to send replies to the requesting application.
同步消息处理
同步消息处理需要应用程序代码显示地调用函数来处理到达的消息。通常显示的函数调用是一个阻塞的调用,它会等待一知道消息可用。如果没有可用的消息,客户端在调用消息的时候就会一直处于阻塞状态。同步消息处理通常是由一个服务器端充当,它的职责就是等待并处理到达的消息,并且给发送消息的一方发送回复内容。
1.3 NATS的发布订阅(Publish Subscribe)
NATS implements a publish subscribe messaging model. NATS publish subscribe is a one-to-many communication. A publisher sends a message on a subject. Any active subscriber listening on that subject receives the message. Subscribers can register interest in wildcard subjects.
NATS实现了一个基于发布订阅的消息模型。NATS的发布订阅模型是一个一对多的通信模型。消息的发布者发送消息到一个主体(subject),任何的活动的订阅者坚挺这个主题并收到消息。订阅者可以使用通配符注册感兴趣的主题。
If a subscriber is not listening on the subject (no subject match), or is not active when the message is sent, the message is not received. NATS is a fire-and-forget messaging system. If you need higher levels of service, you build it into the client.
如果订阅者没有监听的主题(或者没有匹配的主题),或者不在线,当消息发送的时候它将不会收到消息。NATS是一个一劳永逸的消息系统。如果需穴ky"http://www.it165.net/qq/" target="_blank" class="keylink">qq4/LjftcS3/s7xo6y9q8v8xNrWw9Tav827p7bLoaM8L3A+CjxibG9ja3F1b3RlPgoJPHA+SW4gYW4gYXN5bmNocm9ub3VzIGV4Y2hhbmdlLCBtZXNzYWdlcyBhcmUgZGVsaXZlcmVkIHRvIHRoZSBzdWJzY3JpYmVyJnJzcXVvO3MgbWVzc2FnZSBoYW5kbGVyLiBJZiB0aGVyZSBpcyBubyBoYW5kbGVyLCB0aGUgc3Vic2NyaXB0aW9uIGlzIHN5bmNocm9ub3VzIGFuZCB0aGUgY2xpZW50IG1heSBiZSBibG9ja2VkIHVudGlsIGl0IGNhbiBwcm9jZXNzIHRoZSBtZXNzYWdlLjwvcD4KPC9ibG9ja3F1b3RlPgo8cD7U2tLssr3P+8+itKbA7dbQo6zP+8+ivau74bG7t6LLzbW9tqnUxNXftcTP+8+iu9i197SmwO3G98nPo6zI57n7w7vT0M/7z6K72LX3tKbA7cb3o6zV4rj2tqnUxNXfvs3Kx82ssr21xKOsv827p7bLvau74dfoyPujrNaqtcDL/LSmwO3By8/7z6KhozwvcD4KPGgzIGlkPQ=="2-实践篇">2 实践篇
首先确保NATS服务器程序已经搭建好了。
参考
http://blog.csdn.net/frankcheng5143/article/details/51141804
2.1 启动服务
在命令行输入
gnastd
2.2 客户端程序的编写
依赖的jar包
io.nats
jnats
0.4.1
其他依赖
没有日志它会报错,这里我用的是log4j
org.slf4j
slf4j-api
1.7.21
org.slf4j
slf4j-log4j12
1.7.21
log4j
log4j
1.2.17
log4j的配置
log4j.properties
#---------------------------------------------------------
# Log4J Settings for log4j 1.2.x (via jakarta-commons-logging)
#
# The five logging levels used by Log are (in order):
#
# 1. DEBUG (the least serious)
# 2. INFO
# 3. WARN
# 4. ERROR
# 5. FATAL (the most serious)
#
#---------------------------------------------------------
# Root-Categroy
#---------------------------------------------------------
log4j.rootCategory=INFO, stdout, file
#---------------------------------------------------------
# stdout
#---------------------------------------------------------
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n
#---------------------------------------------------------
# file (log)
#---------------------------------------------------------
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.File=/home/gwcheng/info.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.Append=true
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss:SSS} %p [%M] %c %m%n
#---------------------------------------------------------
# file (html)
#---------------------------------------------------------
#log4j.appender.html=org.apache.log4j.FileAppender
#log4j.appender.html.File=### THE PATH HERE ###
#log4j.appender.html.layout=org.apache.log4j.HTMLLayout
#---------------------------------------------------------
# customer
#---------------------------------------------------------
# my definition
io.netty=debug
好的,开始写代码。
这里模拟一个发布者,三个订阅者,拓扑图和上面的图一样。
2.2.1 发布者代码
Publish.java
package com.gwc.nats.nats.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Message;
/**
* Hello world!
* @author gwcheng
*/
public class Publish {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory cf = new ConnectionFactory('nats://127.0.0.1:4222');
Connection nc = cf.createConnection();
// 消息
Message msg = new Message();
// 设置主题
msg.setSubject('foo');
@SuppressWarnings('resource')
Scanner scanner = new Scanner(System.in);
System.out.println('请输入字符串:');
while (true) {
String line = scanner.nextLine();
msg.setData(line.getBytes());
// 发布消息
nc.publish(msg);
}
}
}
2.2.2 发布者代码
Subscribe.java
package com.gwc.nats.nats.test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import io.nats.client.Connection;
import io.nats.client.ConnectionFactory;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
public class Subscribe {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory cf = new ConnectionFactory('nats://127.0.0.1:4222');
Connection nc = cf.createConnection();
// Lambda 表达式写法
/*
* nc.subscribe('foo', m -> { System.out.printf(
* '收到的消息:%s
', new String(m.getData())); });
*/
nc.subscribe('foo', new MessageHandler() {
@Override
public void onMessage(Message msg) {
System.out.println('收到的消息:' + new String(msg.getData()));
}
});
}
}
写上三个一模一样的Subscribe,除了类名不一样。
2.3 测试
先运行Publish,并输入一些字符,发布到foo这个主题上,这里没有订阅者,一会儿有了订阅者也不会收到刚才发布的字符。
运行三个Subscribe
然后在Publish的console输入一些字符,这个时候三个订阅者都将会收到该字符
Publish
Subscribe
Subscribe1
我们让Subscribe2 订阅的主题不是foo,看一下运行结果
Subscribe
Subscribe1
Subscribe2
说明Subscribe2并没有收到消息,因为它没有关注foo主题。
参考文献
http://nats.io/
http://nats.io/documentation/concepts/nats-messaging/
http://nats.io/documentation/concepts/nats-pub-sub/
https://github.com/nats-io/jnats
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。