当前位置:   article > 正文

android 地图导航demo,BaiduNaviSDKDemo-基于Java开发的一个应有于Android的地图导航功能_android 百度导航

android 百度导航

由于项目需要、这两天几天一直在研究百度导航的功能、通过不断的实践终于有结果了、不愿意独享。

现在我把我的研究成果和大家分享一下、其实百度的 API 已经相当不错了。

文件:url80.ctfile.com/f/25127180-740368925-b800f3?p=551685 (访问密码: 551685)


主从同步的实现逻辑主要在HAService中,在DefaultMessageStore的构造函数中,对HAService进行了实例化,并在start方法中,启动了HAService:

public class DefaultMessageStore implements MessageStore {
public DefaultMessageStore(final MessageStoreConfig messageStoreConfig, final BrokerStatsManager brokerStatsManager,
final MessageArrivingListener messageArrivingListener, final BrokerConfig brokerConfig) throws IOException {
// …
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
// 初始化HAService
this.haService = new HAService(this);
} else {
this.haService = null;
}
// …
}

public void start() throws Exception {
    // ...
    if (!messageStoreConfig.isEnableDLegerCommitLog()) {
        // 启动HAService
        this.haService.start();
        this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());
    }
    // ...
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

}
在HAService的构造函数中,创建了AcceptSocketService、GroupTransferService和HAClient,在start方法中主要做了如下几件事:

调用AcceptSocketService的beginAccept方法,这一步主要是进行端口绑定,在端口上监听从节点的连接请求(可以看做是运行在master节点的);
调用AcceptSocketService的start方法启动服务,这一步主要为了处理从节点的连接请求,与从节点建立连接(可以看做是运行在master节点的);
调用GroupTransferService的start方法,主要用于在主从同步的时候,等待数据传输完毕(可以看做是运行在master节点的);
调用HAClient的start方法启动,里面与master节点建立连接,向master汇报主从同步进度并存储master发送过来的同步数据(可以看做是运行在从节点的);
public class HAService {
public HAService(final DefaultMessageStore defaultMessageStore) throws IOException {
this.defaultMessageStore = defaultMessageStore;
// 创建AcceptSocketService
this.acceptSocketService =
new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
this.groupTransferService = new GroupTransferService();
// 创建HAClient
this.haClient = new HAClient();
}

public void start() throws Exception {
    // 开始监听从服务器的连接
    this.acceptSocketService.beginAccept();
    // 启动服务
    this.acceptSocketService.start();
    // 启动GroupTransferService
    this.groupTransferService.start();
    // 启动
    this.haClient.start();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

}

监听从节点连接请求
AcceptSocketService的beginAccept方法里面首先获取了ServerSocketChannel,然后进行端口绑定,并在selector上面注册了OP_ACCEPT事件的监听,监听从节点的连接请求:

public class HAService {
class AcceptSocketService extends ServiceThread {
/**
* 监听从节点的连接
*
* @throws Exception If fails.
*/
public void beginAccept() throws Exception {
// 创建ServerSocketChannel
this.serverSocketChannel = ServerSocketChannel.open();
// 获取selector
this.selector = RemotingUtil.openSelector();
this.serverSocketChannel.socket().setReuseAddress(true);
// 绑定端口
this.serverSocketChannel.socket().bind(this.socketAddressListen);
// 设置非阻塞
this.serverSocketChannel.configureBlocking(false);
// 注册OP_ACCEPT连接事件的监听
this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
}
}
处理从节点连接请求
AcceptSocketService的run方法中,对监听到的连接请求进行了处理,处理逻辑大致如下:

从selector中获取到监听到的事件;
如果是OP_ACCEPT连接事件,创建与从节点的连接对象HAConnection,与从节点建立连接,然后调用HAConnection的start方法进行启动,并创建的HAConnection对象加入到连接集合中,HAConnection中封装了Master节点和从节点的数据同步逻辑;
public class HAService {
class AcceptSocketService extends ServiceThread {
@Override
public void run() {
log.info(this.getServiceName() + " service started");
// 如果服务未停止
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 获取监听到的事件
Set selected = this.selector.selectedKeys();
// 处理事件
if (selected != null) {
for (SelectionKey k : selected) {
// 如果是连接事件
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();
if (sc != null) {
HAService.log.info(“HAService receive new connection, "
+ sc.socket().getRemoteSocketAddress());
try {
// 创建HAConnection,建立连接
HAConnection conn = new HAConnection(HAService.this, sc);
// 启动
conn.start();
// 添加连接
HAService.this.addConnection(conn);
} catch (Exception e) {
log.error(“new HAConnection exception”, e);
sc.close();
}
}
} else {
log.warn(“Unexpected ops in select " + k.readyOps());
}
}
selected.clear();
}
} catch (Exception e) {
log.error(this.getServiceName() + " service has exception.”, e);
}
}
log.info(this.getServiceName() + " service end”);
}
}
}
等待主从复制传输结束
GroupTransferService的run方法主要是为了在进行主从数据同步的时候,等待从节点数据同步完毕。

在运行时首先进会调用waitForRunning进行等待,因为此时可能还有没有开始主从同步,所以先进行等待,之后如果有同步请求,会唤醒该线程,然后调用doWaitTransfer方法等待数据同步完成:

public class HAService {
class GroupTransferService extends ServiceThread {

     public void run() {
        log.info(this.getServiceName() + " service started");
        // 如果服务未停止
        while (!this.isStopped()) {
            try {
                // 等待运行
                this.waitForRunning(10);
                // 如果被唤醒,调用doWaitTransfer等待主从同步完成
                this.doWaitTransfer();
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

}
在看doWaitTransfer方法之前,首先看下是如何判断有数据需要同步的。

Master节点中,当消息被写入到CommitLog以后,会调用submitReplicaRequest方法处主从同步,首先判断当前Broker的角色是否是SYNC_MASTER,如果是则会构建消息提交请求GroupCommitRequest,然后调用HAService的putRequest添加到请求集合中,并唤醒GroupTransferService中在等待的线程:

public class CommitLog {
public CompletableFuture submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
// 构建GroupCommitRequest
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
// 添加请求
service.putRequest(request);
// 唤醒GroupTransferService中在等待的线程
service.getWaitNotifyObject().wakeupAll();
return request.future();
}
else {
return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
}
在doWaitTransfer方法中,会判断CommitLog提交请求集合requestsRead是否为空,如果不为空,表示有消息写入了CommitLog,Master节点需要等待将数据传输给从节点:

push2SlaveMaxOffset记录了从节点已经同步的消息偏移量,判断push2SlaveMaxOffset是否大于本次CommitLog提交的偏移量,也就是请求中设置的偏移量;
获取请求中设置的等待截止时间;
开启循环,判断数据是否还未传输完毕,并且未超过截止时间,如果是则等待1s,然后继续判断传输是否完毕,不断进行,直到超过截止时间或者数据已经传输完毕;
(向从节点发送的消息最大偏移量push2SlaveMaxOffset超过了请求中设置的偏移量表示本次同步数据传输完毕);
唤醒在等待数据同步完毕的线程;
public class HAService {
// CommitLog提交请求集合
private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

class GroupTransferService extends ServiceThread {

    private void doWaitTransfer() {
        // 如果CommitLog提交请求集合不为空
        if (!this.requestsRead.isEmpty()) {
            // 处理消息提交请求
            for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                // 判断传输到从节点最大偏移量是否超过了请求中设置的偏移量
                boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                // 获取截止时间
                long deadLine = req.getDeadLine();
                // 如果从节点还未同步完毕并且未超过截止时间
                while (!transferOK && deadLine - System.nanoTime() > 0) {
                    // 等待
                    this.notifyTransferObject.waitForRunning(1000);
                    // 判断从节点同步的最大偏移量是否超过了请求中设置的偏移量
                    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                }
                // 唤醒
                req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }

            this.requestsRead = new LinkedList<>();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

}
启动HAClient
HAClient可以看做是在从节点上运行的,主要进行的处理如下:

调用connectMaster方法连接Master节点,Master节点上也会运行,但是它本身就是Master没有可连的Master节点,所以可以忽略;
调用isTimeToReportOffset方法判断是否需要向Master节点汇报同步偏移量,如果需要则调用reportSlaveMaxOffset方法将当前的消息同步偏移量发送给Master节点;
调用processReadEvent处理网络请求中的可读事件,也就是处理Master发送过来的消息,将消息存入CommitLog;
public class HAService {
class HAClient extends ServiceThread {

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            try {
                // 连接Master节点
                if (this.connectMaster()) {
                    // 是否需要报告消息同步偏移量
                    if (this.isTimeToReportOffset()) {
                        // 向Master节点发送同步偏移量
                        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                        if (!result) {
                            this.closeMaster();
                        }
                    }
                    this.selector.select(1000);
                    // 处理读事件,也就是Master节点发送的数据
                    boolean ok = this.processReadEvent();
                    if (!ok) {
                        this.closeMaster();
                    }
                    // ...
                } else {
                    this.waitForRunning(1000 * 5);
                }
            } catch (Exception e) {
                log.warn(this.getServiceName() + " service has exception. ", e);
                this.waitForRunning(1000 * 5);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

}
连接主节点
connectMaster方法中会获取Master节点的地址,并转换为SocketAddress对象,然后向Master节点请求建立连接,并在selector注册OP_READ可读事件监听:

public class HAService {
class HAClient extends ServiceThread {
// 当前的主从复制进度
private long currentReportedOffset = 0;

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            // 将地址转为SocketAddress
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                // 连接master
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // 注册OP_READ可读事件监听
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // 获取CommitLog中当前最大的偏移量
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        // 更新上次写入时间
        this.lastWriteTimestamp = System.currentTimeMillis();
    }
    return this.socketChannel != null;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

}
发送主从同步消息拉取偏移量
在isTimeToReportOffset方法中,首先获取当前时间与上一次进行主从同步的时间间隔interval,如果时间间隔interval大于配置的发送心跳时间间隔,表示需要向Master节点发送从节点消息同步的偏移量,接下来会调用reportSlaveMaxOffset方法发送同步偏移量,也就是说从节点会定时向Master节点发送请求,反馈CommitLog中同步消息的偏移量:

public class HAService {
class HAClient extends ServiceThread {
// 当前从节点已经同步消息的偏移量大小
private long currentReportedOffset = 0;

   private boolean isTimeToReportOffset() {
        // 获取距离上一次主从同步的间隔时间
        long interval =
            HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
        // 判断是否超过了配置的发送心跳包时间间隔
        boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
            .getHaSendHeartbeatInterval();

        return needHeart;
    }

    // 发送同步偏移量,传入的参数是当前的主从复制偏移量currentReportedOffset
    private boolean reportSlaveMaxOffset(final long maxOffset) {
        this.reportOffset.position(0);
        this.reportOffset.limit(8); // 设置数据传输大小为8个字节
        this.reportOffset.putLong(maxOffset);// 设置同步偏移量
        this.reportOffset.position(0);
        this.reportOffset.limit(8);

        for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
            try {
                // 向Master节点发送拉取偏移量
                this.socketChannel.write(this.reportOffset);
            } catch (IOException e) {
                log.error(this.getServiceName()
                    + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                return false;
            }
        }
        // 更新发送时间
        lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
        return !this.reportOffset.hasRemaining();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34

}
处理网络可读事件
processReadEvent方法中处理了可读事件,也就是处理Master节点发送的同步数据, 首先从socketChannel中读取数据到byteBufferRead中,byteBufferRead是读缓冲区,读取数据的方法会返回读取到的字节数,对字节数大小进行判断:

如果可读字节数大于0表示有数据需要处理,调用dispatchReadRequest方法进行处理;
如果可读字节数为0表示没有可读数据,此时记录读取到空数据的次数,如果连续读到空数据的次数大于3次,将终止本次处理;
class HAClient extends ServiceThread {
// 读缓冲区,会将从socketChannel读入缓冲区
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        while (this.byteBufferRead.hasRemaining()) {
            try {
                // 从socketChannel中读取数据到byteBufferRead中,返回读取到的字节数
                int readSize = this.socketChannel.read(this.byteBufferRead);
                if (readSize > 0) {
                    // 重置readSizeZeroTimes
                    readSizeZeroTimes = 0;
                    // 处理数据
                    boolean result = this.dispatchReadRequest();
                    if (!result) {
                        log.error("HAClient, dispatchReadRequest error");
                        return false;
                    }
                } else if (readSize == 0) {
                    // 记录读取到空数据的次数
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.info("HAClient, processReadEvent read socket < 0");
                    return false;
                }
            } catch (IOException e) {
                log.info("HAClient, processReadEvent read socket exception", e);
                return false;
            }
        }

        return true;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

}
消息写入ComitLog
dispatchReadRequest方法中会将从节点读取到的数据写入CommitLog,dispatchPosition记录了已经处理的数据在读缓冲区中的位置,从读缓冲区byteBufferRead获取剩余可读取的字节数,如果可读数据的字节数大于一个消息头的字节数(12个字节),表示有数据还未处理完毕,反之表示消息已经处理完毕结束处理。
对数据的处理逻辑如下:

从缓冲区中读取数据,首先获取到的是消息在master节点的物理偏移量masterPhyOffset;
向后读取8个字节,得到消息体内容的字节数bodySize;
获取从节点当前CommitLog的最大物理偏移量slavePhyOffset,如果不为0并且不等于masterPhyOffset,表示与Master节点的传输偏移量不一致,也就是数据不一致,此时终止处理;
如果可读取的字节数大于一个消息头的字节数 + 消息体大小,表示有消息可处理,继续进行下一步;
计算消息体在读缓冲区中的起始位置,从读缓冲区中根据起始位置,读取消息内容,将消息追加到从节点的CommitLog中;
更新dispatchPosition的值为消息头大小 + 消息体大小,dispatchPosition之前的数据表示已经处理完毕;

class HAClient extends ServiceThread {
    // 已经处理的数据在读缓冲区中的位置,初始化为0
    private int dispatchPosition = 0;
    // 读缓冲区
    private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

    private boolean dispatchReadRequest() {
        // 消息头大小
        final int msgHeaderSize = 8 + 4; // phyoffset + size
        // 开启循环不断读取数据
        while (true) {
            // 获可读取的字节数
            int diff = this.byteBufferRead.position() - this.dispatchPosition;
            // 如果字节数大于一个消息头的字节数
            if (diff >= msgHeaderSize) {
                // 获取消息在master节点的物理偏移量
                long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                // 获取消息体大小
                int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                // 获取从节点当前CommitLog的最大物理偏移量
                long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                if (slavePhyOffset != 0) {
                    // 如果不一致结束处理
                    if (slavePhyOffset != masterPhyOffset) {
                        log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                            + slavePhyOffset + " MASTER: " + masterPhyOffset);
                        return false;
                    }
                }
                // 如果可读取的字节数大于一个消息头的字节数 + 消息体大小
                if (diff >= (msgHeaderSize + bodySize)) {
                    // 将度缓冲区的数据转为字节数组
                    byte[] bodyData = byteBufferRead.array();
                    // 计算消息体在读缓冲区中的起始位置
                    int dataStart = this.dispatchPosition + msgHeaderSize;
                    // 从读缓冲区中根据消息的位置,读取消息内容,将消息追加到从节点的CommitLog中
                    HAService.this.defaultMessageStore.appendToCommitLog(
                            masterPhyOffset, bodyData, dataStart, bodySize);
                    // 更新dispatchPosition的值为消息头大小+消息体大小
                    this.dispatchPosition += msgHeaderSize + bodySize;
                    if (!reportSlaveMaxOffsetPlus()) {
                        return false;
                    }
                    continue;
                }
            }
            if (!this.byteBufferRead.hasRemaining()) {
                this.reallocateByteBuffer();
            }

            break;
        }

        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

HAConnection
HAConnection中封装了Master节点与从节点的网络通信处理,分别在ReadSocketService和WriteSocketService中。

ReadSocketService
ReadSocketService启动后处理监听到的可读事件,前面知道HAClient中从节点会定时向Master节点汇报从节点的消息同步偏移量,Master节点对汇报请求的处理就在这里,如果从网络中监听到了可读事件,会调用processReadEvent处理读事件:

public class HAConnection {
class ReadSocketService extends ServiceThread {
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 处理可读事件
boolean ok = this.processReadEvent();
if (!ok) {
HAConnection.log.error(“processReadEvent error”);
break;
}
// …
} catch (Exception e) {
HAConnection.log.error(this.getServiceName() + " service has exception.“, e);
break;
}
}
// …
HAConnection.log.info(this.getServiceName() + " service end”);
}
}
}
处理可读事件
processReadEvent中从网络中处理读事件的方式与上面HAClient的dispatchReadRequest类似,都是将网络中的数据读取到读缓冲区中,并用一个变量记录已读取数据的位置,processReadEvent方法的处理逻辑如下:

从socketChannel读取数据到读缓冲区byteBufferRead中,返回读取到的字节数;
如果读取到的字节数大于0,进入下一步,如果读取到的字节数为0,记录连续读取到空字节数的次数是否超过三次,如果超过终止处理;
判断剩余可读取的字节数是否大于等于8,前面知道,从节点发送同步消息拉取偏移量的时候设置的字节大小为8,所以字节数大于等于8的时候表示需要读取从节点发送的偏移量;
计算数据在缓冲区中的位置,从缓冲区读取从节点发送的同步偏移量readOffset;
更新processPosition的值,processPosition表示读缓冲区中已经处理数据的位置;
更新slaveAckOffset为从节点发送的同步偏移量readOffset的值;
如果当前Master节点记录的从节点的同步偏移量slaveRequestOffset小于0,表示还未进行同步,此时将slaveRequestOffset更新为从节点发送的同步偏移量;
如果从节点发送的同步偏移量比当前Master节点的最大物理偏移量还要大,终止本次处理;
调用notifyTransferSome,更新Master节点记录的向从节点同步消息的偏移量;
public class HAConnection {

 class ReadSocketService extends ServiceThread {
 // 读缓冲区    
 private final ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
 // 读缓冲区中已经处理的数据位置
 private int processPosition = 0;

 private boolean processReadEvent() {
        int readSizeZeroTimes = 0;
        // 如果没有可读数据
        if (!this.byteBufferRead.hasRemaining()) {
            this.byteBufferRead.flip();
            // 处理位置置为0
            this.processPosition = 0;
        }
        // 如果数据未读取完毕
        while (this.byteBufferRead.hasRemaining()) {
            try {
                // 从socketChannel读取数据到byteBufferRead中,返回读取到的字节数
                int readSize = this.socketChannel.read(this.byteBufferRead);
                // 如果读取数据字节数大于0
                if (readSize > 0) {
                    // 重置readSizeZeroTimes
                    readSizeZeroTimes = 0;
                    // 获取上次处理读事件的时间戳
                    this.lastReadTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    // 判断剩余可读取的字节数是否大于等于8
                    if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
                        // 获取偏移量内容的结束位置
                        int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
                        // 从结束位置向前读取8个字节得到从点发送的同步偏移量
                        long readOffset = this.byteBufferRead.getLong(pos - 8);
                        // 更新处理位置
                        this.processPosition = pos;
                        // 更新slaveAckOffset为从节点发送的同步进度
                        HAConnection.this.slaveAckOffset = readOffset;
                        // 如果记录的从节点的同步进度小于0,表示还未进行同步
                        if (HAConnection.this.slaveRequestOffset < 0) {
                            // 更新为从节点发送的同步进度
                            HAConnection.this.slaveRequestOffset = readOffset;
                            log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
                        } else if (HAConnection.this.slaveAckOffset > HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset()) {
                            // 如果从节点发送的拉取偏移量比当前Master节点的最大物理偏移量还要大
                            log.warn("slave[{}] request offset={} greater than local commitLog offset={}. ",
                                    HAConnection.this.clientAddr,
                                    HAConnection.this.slaveAckOffset,
                                    HAConnection.this.haService.getDefaultMessageStore().getMaxPhyOffset());
                            return false;
                        }
                        // 更新Master节点记录的向从节点同步消息的偏移量
                        HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
                    }
                } else if (readSize == 0) 
                    // 判断连续读取到空数据的次数是否超过三次
                    if (++readSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    log.error("read socket[" + HAConnection.this.clientAddr + "] < 0");
                    return false;
                }
            } catch (IOException e) {
                log.error("processReadEvent exception", e);
                return false;
            }
        }

        return true;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

}
前面在GroupTransferService中可以看到是通过push2SlaveMaxOffset的值判断本次同步是否完成的,在notifyTransferSome方法中可以看到当Master节点收到从节点反馈的消息拉取偏移量时,对push2SlaveMaxOffset的值进行了更新:

public class HAService {
// 向从节点推送的消息最大偏移量
private final GroupTransferService groupTransferService;

public void notifyTransferSome(final long offset) {
    // 如果传入的偏移大于push2SlaveMaxOffset记录的值,进行更新
    for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) {
        // 更新向从节点推送的消息最大偏移量
        boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset);
        if (ok) {
            this.groupTransferService.notifyTransferSome();
            break;
        } else {
            value = this.push2SlaveMaxOffset.get();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

}
WriteSocketService
WriteSocketService用于Master节点向从节点发送同步消息,处理逻辑如下:

根据从节点发送的主从同步消息拉取偏移量slaveRequestOffset进行判断:

如果slaveRequestOffset值为-1,表示还未收到从节点报告的同步偏移量,此时睡眠一段时间等待从节点发送消息拉取偏移量;
如果slaveRequestOffset值不为-1,表示已经开始进行主从同步进行下一步;
判断nextTransferFromWhere值是否为-1,nextTransferFromWhere记录了下次需要传输的消息在CommitLog中的偏移量,如果值为-1表示初次进行数据同步,此时有两种情况:

如果从节点发送的拉取偏移量slaveRequestOffset为0,就从当前CommitLog文件最大偏移量开始同步;
如果slaveRequestOffset不为0,则从slaveRequestOffset位置处进行数据同步;
判断上次写事件是否已经将数据都写入到从节点

如果已经写入完毕,判断距离上次写入数据的时间间隔是否超过了设置的心跳时间,如果超过,为了避免连接空闲被关闭,需要发送一个心跳包,此时构建心跳包的请求数据,调用transferData方法传输数据;
如果上次的数据还未传输完毕,调用transferData方法继续传输,如果还是未完成,则结束此处处理;
根据nextTransferFromWhere从CommitLog中获取消息,如果未获取到消息,等待100ms,如果获取到消息,从CommitLog中获取消息进行传输:
(1)如果获取到消息的字节数大于最大传输的大小,设置最最大传输数量,分批进行传输;
(2)更新下次传输的偏移量地址也就是nextTransferFromWhere的值;
(3)从CommitLog中获取的消息内容设置到将读取到的消息数据设置到selectMappedBufferResult中;
(4)设置消息头信息,包括消息头字节数、拉取消息的偏移量等;
(5)调用transferData发送数据;

public class HAConnection {
class WriteSocketService extends ServiceThread {
private final int headerSize = 8 + 4;// 消息头大小
@Override
public void run() {
HAConnection.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
this.selector.select(1000);
// 如果slaveRequestOffset为-1,表示还未收到从节点报告的拉取进度
if (-1 == HAConnection.this.slaveRequestOffset) {
// 等待一段时间
Thread.sleep(10);
continue;
}
// 初次进行数据同步
if (-1 == this.nextTransferFromWhere) {
// 如果拉取进度为0
if (0 == HAConnection.this.slaveRequestOffset) {
// 从master节点最大偏移量从开始传输
long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
masterOffset =
masterOffset
- (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
.getMappedFileSizeCommitLog());

                        if (masterOffset < 0) {
                            masterOffset = 0;
                        }
                        // 更新nextTransferFromWhere
                        this.nextTransferFromWhere = masterOffset;
                    } else {
                        // 根据从节点发送的偏移量开始数据同步
                        this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
                    }

                    log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
                        + "], and slave request " + HAConnection.this.slaveRequestOffset);
                }
                // 判断上次传输是否完毕
                if (this.lastWriteOver) {
                    // 获取当前时间距离上次写入数据的时间间隔
                    long interval =
                        HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                    // 如果距离上次写入数据的时间间隔超过了设置的心跳时间
                    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
                        .getHaSendHeartbeatInterval()) {
                        // 构建header
                        this.byteBufferHeader.position(0);
                        this.byteBufferHeader.limit(headerSize);
                        this.byteBufferHeader.putLong(this.nextTransferFromWhere);
                        this.byteBufferHeader.putInt(0);
                        this.byteBufferHeader.flip();
                        // 发送心跳包
                        this.lastWriteOver = this.transferData();
                        if (!this.lastWriteOver)
                            continue;
                    }
                } else {
                    // 未传输完毕,继续上次的传输
                    this.lastWriteOver = this.transferData();
                    // 如果依旧未完成,结束本次处理
                    if (!this.lastWriteOver)
                        continue;
                }
                // 根据偏移量获取消息数据
                SelectMappedBufferResult selectResult =
                    HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
                if (selectResult != null) {// 获取消息不为空
                    // 获取消息内容大小
                    int size = selectResult.getSize();
                    // 如果消息的字节数大于最大传输的大小
                    if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
                        // 设置为最大传输大小
                        size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
                    }

                    long thisOffset = this.nextTransferFromWhere;
                    // 更新下次传输的偏移量地址
                    this.nextTransferFromWhere += size;

                    selectResult.getByteBuffer().limit(size);
                    // 将读取到的消息数据设置到selectMappedBufferResult
                    this.selectMappedBufferResult = selectResult;

                    // 设置消息头
                    this.byteBufferHeader.position(0);
                    // 设置消息头大小
                    this.byteBufferHeader.limit(headerSize);
                    // 设置偏移量地址
                    this.byteBufferHeader.putLong(thisOffset);
                    // 设置消息内容大小
                    this.byteBufferHeader.putInt(size);
                    this.byteBufferHeader.flip();
                    // 发送数据
                    this.lastWriteOver = this.transferData();
                } else {
                    // 等待100ms
                    HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
                }
            } catch (Exception e) {

                HAConnection.log.error(this.getServiceName() + " service has exception.", e);
                break;
            }
        }

        HAConnection.this.haService.getWaitNotifyObject().removeFromWaitingThreadTable();

        // ...
        HAConnection.log.info(this.getServiceName() + " service end");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

}
发送数据
transferData方法的处理逻辑如下:

发送消息头数据;
消息头数据发送完毕之后,发送消息内容,前面知道从CommitLog中读取的消息内容放入到了selectMappedBufferResult,将selectMappedBufferResult的内容发送给从节点;
public class HAConnection {
class WriteSocketService extends ServiceThread {
private boolean transferData() throws Exception {
int writeSizeZeroTimes = 0;
// 写入消息头
while (this.byteBufferHeader.hasRemaining()) {
// 发送消息头数据
int writeSize = this.socketChannel.write(this.byteBufferHeader);
if (writeSize > 0) {
writeSizeZeroTimes = 0;
// 记录发送时间
this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
} else if (writeSize == 0) {
if (++writeSizeZeroTimes >= 3) {
break;
}
} else {
throw new Exception(“ha master write header error < 0”);
}
}

        if (null == this.selectMappedBufferResult) {
            return !this.byteBufferHeader.hasRemaining();
        }

        writeSizeZeroTimes = 0;

        // 消息头数据发送完毕之后,发送消息内容
        if (!this.byteBufferHeader.hasRemaining()) {
            while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                // 发送消息内容
                int writeSize = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                if (writeSize > 0) {
                    writeSizeZeroTimes = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else if (writeSize == 0) {
                    if (++writeSizeZeroTimes >= 3) {
                        break;
                    }
                } else {
                    throw new Exception("ha master write body error < 0");
                }
            }
        }
        // ...
        return result;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

}

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/297681
推荐阅读
相关标签
  

闽ICP备14008679号