认证
认证请求有头部,响应有头部
请求头
- class RequestHeader {
- int xid;
- int type;
- }
认证请求
- class AuthPacket {
- int type;
- ustring scheme;
- buffer auth;
- }
响应头
- class ReplyHeader {
- int xid;
- long zxid;
- int err;
- }
- public class AuthPacketTest {
-
- private static final Logger log = LoggerFactory.getLogger(AuthPacketTest.class);
-
- private static SocketChannel channel;
-
- private static volatile boolean isSessionOpened = false;
-
- private static long sid = 0;
-
- @BeforeClass
- public static void initialize() throws IOException {
- channel = SocketChannel.open();
-
- //SocketAddress socketAddress = new InetSocketAddress("localhost", 2182);
- SocketAddress socketAddress = new InetSocketAddress("localhost", 2191);
- channel.connect(socketAddress);
- }
-
- private static class WriteWorker extends Thread {
- private void isSessionAlive() {
- heartbeat();
- }
-
- private void heartbeat() {
- ping();
- }
-
- private void ping() {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
- boa.writeInt(-1, "len"); // We'll fill this in later
-
- RequestHeader h = new RequestHeader(-2, OpCode.ping);
- h.serialize(boa, "header");
- baos.close();
-
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.capacity() - 4);
- bb.rewind();
-
- channel.write(bb);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void auth() {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
- boa.writeInt(-1, "len"); // We'll fill this in later
-
- RequestHeader requestHeader = new RequestHeader(-4, OpCode.auth);
- requestHeader.serialize(boa, "header");
-
- //AuthPacket authPacket = new AuthPacket(0, "auth", "".getBytes());
- AuthPacket authPacket = new AuthPacket(0, "ip", "127.0.0.1".getBytes());
- authPacket.serialize(boa, "request");
- baos.close();
-
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.capacity() - 4);
- bb.rewind();
-
- channel.write(bb);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void connect() {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-
- ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]);
-
- boa.writeInt(-1, "len"); // We'll fill this in later
- connectRequest.serialize(boa, "connect");
- boa.writeBool(true, "readOnly");
- baos.close();
-
- ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
- bb.putInt(bb.capacity() - 4);
- bb.rewind();
-
- channel.write(bb);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- public void run() {
- connect();
- for (;;) {
- isSessionAlive();
- auth();
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private static class ReaderWorker extends Thread {
- private void handConnectResponse() {
- try {
- ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
- while (true) {
- channel.read(lenBuffer);
- if (! lenBuffer.hasRemaining()) {
- break;
- }
- }
- lenBuffer.flip();
- int len = lenBuffer.getInt();
- if (len < 0 || len >= ClientCnxn.packetLen) {
- throw new IOException("Packet len" + len + " is out of range!");
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(len);
- while (true) {
- channel.read(buffer);
- if (! buffer.hasRemaining()) {
- break;
- }
- }
- buffer.flip();
-
- ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ConnectResponse conRsp = new ConnectResponse();
- conRsp.deserialize(bbia, "connect");
-
- // read "is read-only" flag
- boolean isRO = false;
- try {
- isRO = bbia.readBool("readOnly");
- } catch (IOException e) {
- // this is ok -- just a packet from an old server which
- // doesn't contain readOnly field
- log.warn("Connected to an old server; r-o mode will be unavailable");
- }
-
- sid = conRsp.getSessionId();
- isSessionOpened = true;
- System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- private void handleResponse() {
- try {
- ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
- while (true) {
- channel.read(lenBuffer);
- if (! lenBuffer.hasRemaining()) {
- break;
- }
- }
- lenBuffer.flip();
- int len = lenBuffer.getInt();
- if (len < 0 || len >= ClientCnxn.packetLen) {
- throw new IOException("Packet len" + len + " is out of range!");
- }
-
- ByteBuffer buffer = ByteBuffer.allocate(len);
- while (true) {
- channel.read(buffer);
- if (! buffer.hasRemaining()) {
- break;
- }
- }
- buffer.flip();
-
- ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-
- ReplyHeader replyHdr = new ReplyHeader();
- replyHdr.deserialize(bbia, "header");
-
- System.out.println(ToStringBuilder.reflectionToString(replyHdr, ToStringStyle.MULTI_LINE_STYLE));
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void run() {
- for (;;) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- if (! isSessionOpened) {
- handConnectResponse();
- } else {
- handleResponse();
- }
- }
- }
- }
-
- @Test
- public void connect() throws IOException {
- Thread wt = new WriteWorker();
- wt.start();
- Thread rt = new ReaderWorker();
- rt.start();
-
- try {
- wt.join();
- rt.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
启动运行后:
服务器打印出如下日志:
2017-04-16 02:46:47,000 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:3681
2017-04-16 02:46:47,078 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:3681
2017-04-16 02:46:47,203 [myid:1] - INFO [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b71a085380002 with negotiated timeout 10000 for client /127.0.0.1:3681
2017-04-16 02:46:47,218 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@924] - got auth packet /127.0.0.1:3681
2017-04-16 02:46:47,375 [myid:1] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@958] - auth success /127.0.0.1:3681
2017-04-16 02:46:58,062 [myid:1] - INFO [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:3681 which had sessionid 0x15b71a085380002
客户端输出:
org.apache.zookeeper.proto.ConnectResponse@1e0be38[
protocolVersion=0
timeOut=10000
sessionId=97796751162343430
passwd={-32,49,98,88,-1,-10,-5,13,19,-80,10,87,71,61,41,-28}
]
org.apache.zookeeper.proto.ReplyHeader@1aaa14a[
xid=-4
zxid=0
err=0
]