赞
踩
设计模式在软件开发中起着至关重要的作用,它们是解决常见问题的经过验证的解决方案。而Netty
作为一个优秀的网络应用程序框架,同样也采用了许多设计模式来提供高性能和可扩展性。在本文中,我们将探讨Netty
中使用的一些关键设计模式,以及它们在构建强大网络应用程序中的应用。
Netty中MqttEncoder
这个编码器就用到了单例模式,它将构造函数私有化,并基于饿汉式的方式全局创建单例一个MqttEncoder
的单例 。
@ChannelHandler.Sharable
public final class MqttEncoder extends MessageToMessageEncoder<MqttMessage> {
public static final MqttEncoder INSTANCE = new MqttEncoder();
private MqttEncoder() { }
//略
}
这使得我们后续需要使用这个编码器的话,只能使用这个全局维护的示例对象INSTANCE
,避免了重复创建的开销。
nioSocketChannel.pipeline().addLast(new StringDecoder()).addLast(MqttEncoder.INSTANCE)
对此我们将其梳理为类图,对应的如下图所示:
同样的Netty
对于异常的管理也处理的很精细,例如ReadTimeoutException
,就是基于饿汉式的方式创建一个单例INSTANCE
:
public final class ReadTimeoutException extends TimeoutException {
private static final long serialVersionUID = 169287984113283421L;
public static final ReadTimeoutException INSTANCE = new ReadTimeoutException();
private ReadTimeoutException() { }
}
对于ReadTimeoutException
这个异常类的使用,也是全局仅用这个示例进行传播:
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
对应的类图如下所示:
策略模式最典型的特点就是:
在Netty
中最典型的实现就是DefaultEventExecutorChooserFactory
,这个工厂会工具用户传入executors
动态创建EventExecutorChooser
:
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
//根据用户传入的executors动态返回一个executors轮询选择器
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//如果executors的长度是2的次幂则返回PowerOfTowEventExecutorChooser,反之返回GenericEventExecutorChooser
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
对应的类图如下所示:
基于装饰者模式,实现基于拓展的方式对类进行增强,做到尽可能避免没必要的修改,在Netty
中WrappedByteBuf
就是最经典的装饰着模式,其实现思路为:
WrappedByteBuf
继承ByteBuf
获取ByteBuf
的所有行为方法。ByteBuf
。ByteBuf
的所有行为方法,通过传入的ByteBuf
进行实现,并返回WrappedByteBuf
而不是传入的buf
,从而做对外屏蔽内部实现。class WrappedByteBuf extends ByteBuf {
protected final ByteBuf buf;
protected WrappedByteBuf(ByteBuf buf) {
if (buf == null) {
throw new NullPointerException("buf");
}
this.buf = buf;
}
@Override
public final boolean hasMemoryAddress() {
return buf.hasMemoryAddress();
}
@Override
public final long memoryAddress() {
return buf.memoryAddress();
}
@Override
public final int capacity() {
return buf.capacity();
}
@Override
public ByteBuf capacity(int newCapacity) {
buf.capacity(newCapacity);
return this;
}
@Override
public final int maxCapacity() {
return buf.maxCapacity();
}
@Override
public final ByteBufAllocator alloc() {
return buf.alloc();
}
//使用buf实现,返回当前对象,从而对外屏蔽内部实现细节
@Override
public ByteBuf retain() {
buf.retain();
return this;
}
//略
}
对应的类图如下所示:
与之同理的还有UnreleasableByteBuf
,这里就不多做赘述了:
final class UnreleasableByteBuf extends WrappedByteBuf {
private SwappedByteBuf swappedBuf;
UnreleasableByteBuf(ByteBuf buf) {
super(buf);
}
@Override
public ByteBuf order(ByteOrder endianness) {
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (endianness == order()) {
return this;
}
SwappedByteBuf swappedBuf = this.swappedBuf;
if (swappedBuf == null) {
this.swappedBuf = swappedBuf = new SwappedByteBuf(this);
}
return swappedBuf;
}
@Override
public ByteBuf asReadOnly() {
return new UnreleasableByteBuf(buf.asReadOnly());
}
@Override
public ByteBuf readSlice(int length) {
return new UnreleasableByteBuf(buf.readSlice(length));
}
@Override
public ByteBuf readRetainedSlice(int length) {
return new UnreleasableByteBuf(buf.readRetainedSlice(length));
}
//略
}
我们在使用Netty
时,可能经常会进行这样一段的代码编写,通过addListener
方法注册监听器,确保端口绑定成功后,回调通知我们的监听器:
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
bind(serverBootstrap, port + 1);
}
});
步入DefaultPromise
的addListener
源码之后,可以看到它会通过addListener0
完成监听器注册:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
synchronized (this) {
addListener0(listener);
}
if (isDone()) {
notifyListeners();
}
return this;
}
最终就会通过异步的方式通知我们的监听器。
private void notifyListeners() {
EventExecutor executor = executor();
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
//通知所有的监听器
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//通知所有的监听器
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
对应的类图如下图所示:
迭代器模式不是很常见,我们还是以Netty
中的一段示例代码来了解这种涉及模式,代码执行步骤如下:
ByteBuf
,分别是header
和body
。ByteBuf
添加到CompositeByteBuf
。CompositeByteBuf
迭代这两个ByteBuf
数组。 public static void main(String[] args) {
//创建两个ByteBuf ,分别是header和body
ByteBuf header = Unpooled.wrappedBuffer(new byte[]{1, 2, 3});
ByteBuf body = Unpooled.wrappedBuffer(new byte[]{4, 5, 6});
CompositeByteBuf byteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(2);
//这两个ByteBuf 添加到CompositeByteBuf`
byteBuf.addComponent(true, header);
byteBuf.addComponent(true, body);
//基于CompositeByteBuf 迭代这两个ByteBuf数组
Iterator<ByteBuf> iterator = byteBuf.iterator();
while (iterator.hasNext()){
ByteBuf next = iterator.next();
int len = next.readableBytes();
byte[] bytes=new byte[len];
next.readBytes(bytes);
for (byte b : bytes) {
System.out.println(b);
}
}
}
这种模式在java
中非常常见,其特点为:
首先看看集合类:
Iterable
获取迭代器的能力。addComponent
实现集合添加能力。public class CompositeByteBuf extends AbstractReferenceCountedByteBuf implements Iterable<ByteBuf> {
private static final Iterator<ByteBuf> EMPTY_ITERATOR = Collections.<ByteBuf>emptyList().iterator();
@Override
public Iterator<ByteBuf> iterator() {
ensureAccessible();
if (components.isEmpty()) {
return EMPTY_ITERATOR;
}
return new CompositeByteBufIterator();
}
public CompositeByteBuf addComponent(boolean increaseWriterIndex, ByteBuf buffer) {
checkNotNull(buffer, "buffer");
addComponent0(increaseWriterIndex, components.size(), buffer);
consolidateIfNeeded();
return this;
}
}
查看迭代器:
Iterator
实现迭代能力。//以内部类的形式继承Iterator实现迭代能力
private final class CompositeByteBufIterator implements Iterator<ByteBuf> {
private final int size = components.size();
private int index;
@Override
public boolean hasNext() {
return size > index;
}
@Override
public ByteBuf next() {
if (size != components.size()) {
throw new ConcurrentModificationException();
}
if (!hasNext()) {
throw new NoSuchElementException();
}
try {
return components.get(index++).buf;
} catch (IndexOutOfBoundsException e) {
throw new ConcurrentModificationException();
}
}
@Override
public void remove() {
throw new UnsupportedOperationException("Read-Only");
}
}
责任链模式在Netty
中最常见了,例如我们希望读处理器在一条链上不断传播,我们就会通过这样一段代码:
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
而fireChannelRead
就会通过findContextInbound
找到下一个处理器继续调用channelRead
像一条链条一样传播下去。
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
查看findContextInbound
细节可知,这些ChannelInboundHandlerAdapter
都是通过AbstractChannelHandlerContext
封装再进行串联,确保每个消息都会通过这条链传播到所有ChannelInboundHandlerAdapter
:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
对应的类图如下所示:
netty设计模式-单例模式:https://blog.csdn.net/fst438060684/article/details/81156825
迭代器模式:https://www.runoob.com/design-pattern/iterator-pattern.html
设计模型之责任链模式含UML完整实例):https://blog.csdn.net/atu1111/article/details/105558539
一次性搞懂设计模式–迭代器模式:https://zhuanlan.zhihu.com/p/537080924
行为型模式:https://design-patterns.readthedocs.io/zh_CN/latest/behavioral_patterns/behavioral.html
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。