赞
踩
implementation 'io.netty:netty-all:4.1.4.Final'
ExponentialBackOffRetry.java
package com.lanyu.cloud.netty;
import com.lanyu.cloud.utils.LogUtil;
import java.util.Random;
/**
* <p>Retry policy that retries a set number of times with increasing sleep time between retries</p>
*/
public class ExponentialBackOffRetry implements RetryPolicy {
private static final int MAX_RETRIES_LIMIT = 29;
private static final int DEFAULT_MAX_SLEEP_MS = Integer.MAX_VALUE;
private final Random random = new Random();
private final long baseSleepTimeMs;
private final int maxRetries;
private final int maxSleepMs;
public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries) {
this(baseSleepTimeMs, maxRetries, DEFAULT_MAX_SLEEP_MS);
}
public ExponentialBackOffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) {
this.maxRetries = maxRetries;
this.baseSleepTimeMs = baseSleepTimeMs;
this.maxSleepMs = maxSleepMs;
}
@Override
public boolean allowRetry(int retryCount) {
if (retryCount < maxRetries) {
return true;
}
return false;
}
@Override
public long getSleepTimeMs(int retryCount) {
if (retryCount < 0) {
throw new IllegalArgumentException("retries count must greater than 0.");
}
if (retryCount > MAX_RETRIES_LIMIT) {
LogUtil.e(String.format("maxRetries too large (%d). Pinning to %d", maxRetries, MAX_RETRIES_LIMIT));
retryCount = MAX_RETRIES_LIMIT;
}
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << retryCount));
if (sleepMs > maxSleepMs) {
LogUtil.e(String.format("Sleep extension too large (%d). Pinning to %d", sleepMs, maxSleepMs));
sleepMs = maxSleepMs;
}
return sleepMs;
}
}
INettyMessageListener.java
package com.lanyu.cloud.netty;
public interface INettyMessageListener {
void onReceive(String message);
void onConnectSuccess();
void onError();
}
NettyChatClient.java
package com.lanyu.cloud.netty;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
public class NettyChatClient {
private static NettyChatClient mNettyChatClient;
private static String mServerIp;
private static int mPort;
private Channel mChannel;
/**
* 重连策略
*/
private RetryPolicy mRetryPolicy;
private Bootstrap mBootstrap;
public void init(INettyMessageListener messageListener) {
mRetryPolicy = new ExponentialBackOffRetry(3000, Integer.MAX_VALUE, 3 * 1000);
EventLoopGroup group = new NioEventLoopGroup();
// bootstrap 可重用, 只需在TcpClient实例化的时候初始化即可.
mBootstrap = new Bootstrap();
mBootstrap.group(group)
.option(ChannelOption.TCP_NODELAY, true)
.channel(NioSocketChannel.class)
.handler(new ClientHandlersInitializer());
}
public static NettyChatClient newInstance(String serverIp, int port) {
if (mNettyChatClient == null) {
mNettyChatClient = new NettyChatClient();
mServerIp = serverIp;
mPort = port;
}
return mNettyChatClient;
}
public static NettyChatClient getInstance() {
return mNettyChatClient;
}
/**
* 向远程TCP服务器请求连接
*/
public void connect() {
synchronized (mBootstrap) {
ChannelFuture future = mBootstrap.connect(mServerIp, mPort);
future.addListener(getConnectionListener());
this.mChannel = future.channel();
}
}
private ChannelFutureListener getConnectionListener() {
return future -> {
if (!future.isSuccess()) {
future.channel().pipeline().fireChannelInactive();
}
};
}
private class ClientHandlersInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));
pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new ReconnectHandler(mNettyChatClient));
}
}
public RetryPolicy getRetryPolicy() {
return mRetryPolicy;
}
public void sendMessage(String content) {
if (mChannel != null) {
mChannel.writeAndFlush(content);
}
}
}
ReconnectHandler.java
package com.lanyu.cloud.netty;
import android.annotation.SuppressLint;
import com.lanyu.cloud.utils.LogUtil;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
@ChannelHandler.Sharable
public class ReconnectHandler extends SimpleChannelInboundHandler<String> {
private int retries = 0;
private RetryPolicy mRetryPolicy;
private NettyChatClient mNettyChatClient;
public ReconnectHandler(NettyChatClient nettyChatClient) {
this.mNettyChatClient = nettyChatClient;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LogUtil.e("Successfully established a connection to the server.");
retries = 0;
ctx.fireChannelActive();
}
@SuppressLint("DefaultLocale")
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
if (retries == 0) {
LogUtil.e("Lost the TCP connection with the server.");
ctx.close();
}
boolean allowRetry = getRetryPolicy().allowRetry(retries);
if (allowRetry) {
long sleepTimeMs = getRetryPolicy().getSleepTimeMs(retries);
LogUtil.e(String.format("Try to reconnect to the server after %dms. Retry count: %d.", sleepTimeMs, ++retries));
final EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
System.out.println("Reconnecting ...");
mNettyChatClient.connect();
}, sleepTimeMs, TimeUnit.MILLISECONDS);
}
ctx.fireChannelInactive();
}
/**
* 心跳监测
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
ctx.close();
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
default:
}
LogUtil.e(ctx.channel().remoteAddress() + " " + eventType);
}
}
/**
* 收到消息后调用
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, String message) {
LogUtil.e("======================");
LogUtil.e("收到消息" + message);
}
private RetryPolicy getRetryPolicy() {
if (this.mRetryPolicy == null) {
this.mRetryPolicy = mNettyChatClient.getRetryPolicy();
}
return this.mRetryPolicy;
}
}
RetryPolicy.java
package com.lanyu.cloud.netty;
public interface RetryPolicy {
/**
* Called when an operation has failed for some reason. This method should return
* true to make another attempt.
*
* @param retryCount the number of times retried so far (0 the first time)
* @return true/false
*/
boolean allowRetry(int retryCount);
/**
* get sleep time in ms of current retry count.
*
* @param retryCount current retry count
* @return the time to sleep
*/
long getSleepTimeMs(int retryCount);
}
使用如下:
private void initSocket() {
NettyChatClient nettyChatClient = NettyChatClient.newInstance(Constant.SERVERIP, Constant.PORT);
nettyChatClient.init(new INettyMessageListener() {
@Override
public void onReceive(String message) {
for (INettyMessageListener nettyMessageListener : mIMessageListenerList) {
nettyMessageListener.onReceive(message);
}
}
@Override
public void onConnectSuccess() {
for (INettyMessageListener nettyMessageListener : mIMessageListenerList) {
nettyMessageListener.onConnectSuccess();
}
}
@Override
public void onError() {
for (INettyMessageListener nettyMessageListener : mIMessageListenerList) {
nettyMessageListener.onError();
}
}
});
nettyChatClient.connect();
}
NettyChatClient.getInstance().sendMessage(cmdJson.toJSONString());
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。