当前位置:   article > 正文

linux发布订阅通讯方式,NATS学习 概念学习之消息(Message)与发布订阅(PublishSubscribe)...

performant communications system nats

1 理论篇

1.1 来自官方的介绍

NATS acts as a central nervous system for distributed systems such as mobile devices, IoT networks, enterprise microservices and cloud native infrastructure. Unlike traditional enterprise messaging systems, NATS provides an always on ‘dial-tone’.

NATS在分布式系统中扮演一个中央神经系统,这些分布式系统包括移动设备,物联网,企业微服务和原生云基础设施。不同于传统的企业级消息系统,NATS提供永远在线服务。

NATS was created by Derek Collison, Founder and CEO at Apcera. Derek has spent over twenty years designing, building and using publish-subscribe messaging systems. Learn why traditional enterprise messaging systems don’t work for today’s distributed infrastructure, in this talk from Derek at SCALE on designing a new cloud-native messaging framework.

1.2 NATS 的消息(Message)

NATS messaging involves the electronic exchange of data among computer applications.

NATS provides a layer between the application and the underlying physical network. Application data is encoded as a message and sent by the publisher. The message is received, decoded, and processed by one or more subscribers. A subscriber can process a NATS message asynchronously or synchronously.

NATS的消息是指在不同计算机应用程序之间的消息交互。

NATS在应用与底层网络之间提供了一个抽象层。应用数据被包装为消息(message)并由发布者发送。消息被一个或多个订阅者接收,解码并且处理。订阅者可以异步或同步地处理消息。

1bf026001dde00f8176fb2be046fb6d2.png

1.2.1 Asynchronous processing

Asynchronous processing uses a callback message handler to process messages. When a message arrives, the registered callback handler receives control to process the message. The client or consuming application is not blocked from performing other work while it is waiting for a message. Asynchronous processing lets you create multi-threaded dispatching designs.

异步消息处理

异步消息处理使用一个消息回调处理器来处理消息,当消息到达的时候,已经注册的回调处理器来处理该消息。客户端或消费者程序不会从其他事件中阻塞在等待消息的时候。异步消息处理可以让程序员创建多线程的分发设计。

1.2.2 Synchronous processing

Synchronous processing requires that application code explicitly call a method to process an incoming message. Typically an explicit call is a blocking call that suspends processing until a message becomes available. If no message is available, the period for which the message processing call blocks is set by the client. Synchronous processing is typically used by a server whose purpose is to wait for and process incoming request messages, and to send replies to the requesting application.

同步消息处理

同步消息处理需要应用程序代码显示地调用函数来处理到达的消息。通常显示的函数调用是一个阻塞的调用,它会等待一知道消息可用。如果没有可用的消息,客户端在调用消息的时候就会一直处于阻塞状态。同步消息处理通常是由一个服务器端充当,它的职责就是等待并处理到达的消息,并且给发送消息的一方发送回复内容。

1.3 NATS的发布订阅(Publish Subscribe)

NATS implements a publish subscribe messaging model. NATS publish subscribe is a one-to-many communication. A publisher sends a message on a subject. Any active subscriber listening on that subject receives the message. Subscribers can register interest in wildcard subjects.

NATS实现了一个基于发布订阅的消息模型。NATS的发布订阅模型是一个一对多的通信模型。消息的发布者发送消息到一个主体(subject),任何的活动的订阅者坚挺这个主题并收到消息。订阅者可以使用通配符注册感兴趣的主题。

6b93e8f18df0160dd44a510c21758a87.png

If a subscriber is not listening on the subject (no subject match), or is not active when the message is sent, the message is not received. NATS is a fire-and-forget messaging system. If you need higher levels of service, you build it into the client.

如果订阅者没有监听的主题(或者没有匹配的主题),或者不在线,当消息发送的时候它将不会收到消息。NATS是一个一劳永逸的消息系统。如果需穴ky"http://www.it165.net/qq/" target="_blank" class="keylink">qq4/LjftcS3/s7xo6y9q8v8xNrWw9Tav827p7bLoaM8L3A+CjxibG9ja3F1b3RlPgoJPHA+SW4gYW4gYXN5bmNocm9ub3VzIGV4Y2hhbmdlLCBtZXNzYWdlcyBhcmUgZGVsaXZlcmVkIHRvIHRoZSBzdWJzY3JpYmVyJnJzcXVvO3MgbWVzc2FnZSBoYW5kbGVyLiBJZiB0aGVyZSBpcyBubyBoYW5kbGVyLCB0aGUgc3Vic2NyaXB0aW9uIGlzIHN5bmNocm9ub3VzIGFuZCB0aGUgY2xpZW50IG1heSBiZSBibG9ja2VkIHVudGlsIGl0IGNhbiBwcm9jZXNzIHRoZSBtZXNzYWdlLjwvcD4KPC9ibG9ja3F1b3RlPgo8cD7U2tLssr3P+8+itKbA7dbQo6zP+8+ivau74bG7t6LLzbW9tqnUxNXftcTP+8+iu9i197SmwO3G98nPo6zI57n7w7vT0M/7z6K72LX3tKbA7cb3o6zV4rj2tqnUxNXfvs3Kx82ssr21xKOsv827p7bLvau74dfoyPujrNaqtcDL/LSmwO3By8/7z6KhozwvcD4KPGgzIGlkPQ=="2-实践篇">2 实践篇

首先确保NATS服务器程序已经搭建好了。

参考

http://blog.csdn.net/frankcheng5143/article/details/51141804

2.1 启动服务

在命令行输入

gnastd

693d7a7f4a28c194db9d9c6d66b5724b.png

2.2 客户端程序的编写

依赖的jar包

io.nats

jnats

0.4.1

其他依赖

没有日志它会报错,这里我用的是log4j

org.slf4j

slf4j-api

1.7.21

org.slf4j

slf4j-log4j12

1.7.21

log4j

log4j

1.2.17

log4j的配置

log4j.properties

#---------------------------------------------------------

# Log4J Settings for log4j 1.2.x (via jakarta-commons-logging)

#

# The five logging levels used by Log are (in order):

#

# 1. DEBUG (the least serious)

# 2. INFO

# 3. WARN

# 4. ERROR

# 5. FATAL (the most serious)

#

#---------------------------------------------------------

# Root-Categroy

#---------------------------------------------------------

log4j.rootCategory=INFO, stdout, file

#---------------------------------------------------------

# stdout

#---------------------------------------------------------

log4j.appender.stdout=org.apache.log4j.ConsoleAppender

log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %c{1}:%L - %m%n

#---------------------------------------------------------

# file (log)

#---------------------------------------------------------

log4j.appender.file=org.apache.log4j.DailyRollingFileAppender

log4j.appender.file.File=/home/gwcheng/info.log

log4j.appender.file.layout=org.apache.log4j.PatternLayout

log4j.appender.file.Append=true

log4j.appender.file.DatePattern='.'yyyy-MM-dd

log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss:SSS} %p [%M] %c %m%n

#---------------------------------------------------------

# file (html)

#---------------------------------------------------------

#log4j.appender.html=org.apache.log4j.FileAppender

#log4j.appender.html.File=### THE PATH HERE ###

#log4j.appender.html.layout=org.apache.log4j.HTMLLayout

#---------------------------------------------------------

# customer

#---------------------------------------------------------

# my definition

io.netty=debug

好的,开始写代码。

这里模拟一个发布者,三个订阅者,拓扑图和上面的图一样。

2.2.1 发布者代码

Publish.java

package com.gwc.nats.nats.test;

import java.io.IOException;

import java.util.ArrayList;

import java.util.HashMap;

import java.util.Map;

import java.util.Scanner;

import java.util.concurrent.TimeoutException;

import io.nats.client.Connection;

import io.nats.client.ConnectionFactory;

import io.nats.client.Message;

/**

* Hello world!

* @author gwcheng

*/

public class Publish {

public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

ConnectionFactory cf = new ConnectionFactory('nats://127.0.0.1:4222');

Connection nc = cf.createConnection();

// 消息

Message msg = new Message();

// 设置主题

msg.setSubject('foo');

@SuppressWarnings('resource')

Scanner scanner = new Scanner(System.in);

System.out.println('请输入字符串:');

while (true) {

String line = scanner.nextLine();

msg.setData(line.getBytes());

// 发布消息

nc.publish(msg);

}

}

}

2.2.2 发布者代码

Subscribe.java

package com.gwc.nats.nats.test;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

import io.nats.client.Connection;

import io.nats.client.ConnectionFactory;

import io.nats.client.Message;

import io.nats.client.MessageHandler;

public class Subscribe {

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory cf = new ConnectionFactory('nats://127.0.0.1:4222');

Connection nc = cf.createConnection();

// Lambda 表达式写法

/*

* nc.subscribe('foo', m -> { System.out.printf(

* '收到的消息:%s

', new String(m.getData())); });

*/

nc.subscribe('foo', new MessageHandler() {

@Override

public void onMessage(Message msg) {

System.out.println('收到的消息:' + new String(msg.getData()));

}

});

}

}

写上三个一模一样的Subscribe,除了类名不一样。

2.3 测试

先运行Publish,并输入一些字符,发布到foo这个主题上,这里没有订阅者,一会儿有了订阅者也不会收到刚才发布的字符。

c692cb291af37ff8e2d2715e29474680.png

运行三个Subscribe

0b988402c47b3b4a8711e1bf581dc074.png

然后在Publish的console输入一些字符,这个时候三个订阅者都将会收到该字符

Publish

44eb04bd6938fbeb903f0f8332a87d63.png

Subscribe

8690568ad5d1390fe5679256413f784d.png

Subscribe1

c264e9ec0160c3546001a52b389c450c.png

我们让Subscribe2 订阅的主题不是foo,看一下运行结果

Subscribe

295280ced551b2a4b525d4e2b19de053.png

Subscribe1

70ebda8f981f7abde2cbea3118f90538.png

Subscribe2

2d9aa093bff6e32178db669a6ad198a0.png

说明Subscribe2并没有收到消息,因为它没有关注foo主题。

参考文献

http://nats.io/

http://nats.io/documentation/concepts/nats-messaging/

http://nats.io/documentation/concepts/nats-pub-sub/

https://github.com/nats-io/jnats

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/786220
推荐阅读
相关标签
  

闽ICP备14008679号