当前位置:   article > 正文

zookeeper 请求认证

r-o mode will be unavailable


认证
认证请求有头部,响应有头部


请求头

  1. class RequestHeader {
  2. int xid;
  3. int type;
  4. }

 


认证请求

  1. class AuthPacket {
  2. int type;
  3. ustring scheme;
  4. buffer auth;
  5. }

 


响应头

  1. class ReplyHeader {
  2. int xid;
  3. long zxid;
  4. int err;
  5. }

 




  1. public class AuthPacketTest {
  2. private static final Logger log = LoggerFactory.getLogger(AuthPacketTest.class);
  3. private static SocketChannel channel;
  4. private static volatile boolean isSessionOpened = false;
  5. private static long sid = 0;
  6. @BeforeClass
  7. public static void initialize() throws IOException {
  8. channel = SocketChannel.open();
  9. //SocketAddress socketAddress = new InetSocketAddress("localhost", 2182);
  10. SocketAddress socketAddress = new InetSocketAddress("localhost", 2191);
  11. channel.connect(socketAddress);
  12. }
  13. private static class WriteWorker extends Thread {
  14. private void isSessionAlive() {
  15. heartbeat();
  16. }
  17. private void heartbeat() {
  18. ping();
  19. }
  20. private void ping() {
  21. try {
  22. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  23. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
  24. boa.writeInt(-1, "len"); // We'll fill this in later
  25. RequestHeader h = new RequestHeader(-2, OpCode.ping);
  26. h.serialize(boa, "header");
  27. baos.close();
  28. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  29. bb.putInt(bb.capacity() - 4);
  30. bb.rewind();
  31. channel.write(bb);
  32. } catch (IOException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. private void auth() {
  37. try {
  38. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  39. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
  40. boa.writeInt(-1, "len"); // We'll fill this in later
  41. RequestHeader requestHeader = new RequestHeader(-4, OpCode.auth);
  42. requestHeader.serialize(boa, "header");
  43. //AuthPacket authPacket = new AuthPacket(0, "auth", "".getBytes());
  44. AuthPacket authPacket = new AuthPacket(0, "ip", "127.0.0.1".getBytes());
  45. authPacket.serialize(boa, "request");
  46. baos.close();
  47. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  48. bb.putInt(bb.capacity() - 4);
  49. bb.rewind();
  50. channel.write(bb);
  51. } catch (IOException e) {
  52. e.printStackTrace();
  53. }
  54. }
  55. private void connect() {
  56. try {
  57. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  58. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
  59. ConnectRequest connectRequest = new ConnectRequest(0, 0, 10000, 0, new byte[16]);
  60. boa.writeInt(-1, "len"); // We'll fill this in later
  61. connectRequest.serialize(boa, "connect");
  62. boa.writeBool(true, "readOnly");
  63. baos.close();
  64. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  65. bb.putInt(bb.capacity() - 4);
  66. bb.rewind();
  67. channel.write(bb);
  68. } catch (IOException e) {
  69. e.printStackTrace();
  70. }
  71. }
  72. public void run() {
  73. connect();
  74. for (;;) {
  75. isSessionAlive();
  76. auth();
  77. try {
  78. Thread.sleep(1000);
  79. } catch (InterruptedException e) {
  80. e.printStackTrace();
  81. }
  82. }
  83. }
  84. }
  85. private static class ReaderWorker extends Thread {
  86. private void handConnectResponse() {
  87. try {
  88. ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
  89. while (true) {
  90. channel.read(lenBuffer);
  91. if (! lenBuffer.hasRemaining()) {
  92. break;
  93. }
  94. }
  95. lenBuffer.flip();
  96. int len = lenBuffer.getInt();
  97. if (len < 0 || len >= ClientCnxn.packetLen) {
  98. throw new IOException("Packet len" + len + " is out of range!");
  99. }
  100. ByteBuffer buffer = ByteBuffer.allocate(len);
  101. while (true) {
  102. channel.read(buffer);
  103. if (! buffer.hasRemaining()) {
  104. break;
  105. }
  106. }
  107. buffer.flip();
  108. ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
  109. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  110. ConnectResponse conRsp = new ConnectResponse();
  111. conRsp.deserialize(bbia, "connect");
  112. // read "is read-only" flag
  113. boolean isRO = false;
  114. try {
  115. isRO = bbia.readBool("readOnly");
  116. } catch (IOException e) {
  117. // this is ok -- just a packet from an old server which
  118. // doesn't contain readOnly field
  119. log.warn("Connected to an old server; r-o mode will be unavailable");
  120. }
  121. sid = conRsp.getSessionId();
  122. isSessionOpened = true;
  123. System.out.println(ToStringBuilder.reflectionToString(conRsp, ToStringStyle.MULTI_LINE_STYLE));
  124. } catch (IOException e) {
  125. e.printStackTrace();
  126. }
  127. }
  128. private void handleResponse() {
  129. try {
  130. ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
  131. while (true) {
  132. channel.read(lenBuffer);
  133. if (! lenBuffer.hasRemaining()) {
  134. break;
  135. }
  136. }
  137. lenBuffer.flip();
  138. int len = lenBuffer.getInt();
  139. if (len < 0 || len >= ClientCnxn.packetLen) {
  140. throw new IOException("Packet len" + len + " is out of range!");
  141. }
  142. ByteBuffer buffer = ByteBuffer.allocate(len);
  143. while (true) {
  144. channel.read(buffer);
  145. if (! buffer.hasRemaining()) {
  146. break;
  147. }
  148. }
  149. buffer.flip();
  150. ByteBufferInputStream bbis = new ByteBufferInputStream(buffer);
  151. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  152. ReplyHeader replyHdr = new ReplyHeader();
  153. replyHdr.deserialize(bbia, "header");
  154. System.out.println(ToStringBuilder.reflectionToString(replyHdr, ToStringStyle.MULTI_LINE_STYLE));
  155. } catch (IOException e) {
  156. e.printStackTrace();
  157. }
  158. }
  159. public void run() {
  160. for (;;) {
  161. try {
  162. Thread.sleep(1000);
  163. } catch (InterruptedException e) {
  164. e.printStackTrace();
  165. }
  166. if (! isSessionOpened) {
  167. handConnectResponse();
  168. } else {
  169. handleResponse();
  170. }
  171. }
  172. }
  173. }
  174. @Test
  175. public void connect() throws IOException {
  176. Thread wt = new WriteWorker();
  177. wt.start();
  178. Thread rt = new ReaderWorker();
  179. rt.start();
  180. try {
  181. wt.join();
  182. rt.join();
  183. } catch (InterruptedException e) {
  184. e.printStackTrace();
  185. }
  186. }
  187. }

 






启动运行后:

服务器打印出如下日志:

2017-04-16 02:46:47,000 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:NIOServerCnxnFactory@192] - Accepted socket connection from /127.0.0.1:3681
2017-04-16 02:46:47,078 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@900] - Client attempting to establish new session at /127.0.0.1:3681
2017-04-16 02:46:47,203 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperServer@645] - Established session 0x15b71a085380002 with negotiated timeout 10000 for client /127.0.0.1:3681
2017-04-16 02:46:47,218 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@924] - got auth packet /127.0.0.1:3681
2017-04-16 02:46:47,375 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2191:ZooKeeperServer@958] - auth success /127.0.0.1:3681
2017-04-16 02:46:58,062 [myid:1] - INFO  [CommitProcessor:1:NIOServerCnxn@1008] - Closed socket connection for client /127.0.0.1:3681 which had sessionid 0x15b71a085380002


客户端输出:

org.apache.zookeeper.proto.ConnectResponse@1e0be38[
  protocolVersion=0
  timeOut=10000
  sessionId=97796751162343430
  passwd={-32,49,98,88,-1,-10,-5,13,19,-80,10,87,71,61,41,-28}
]
org.apache.zookeeper.proto.ReplyHeader@1aaa14a[
  xid=-4
  zxid=0
  err=0
]

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/983213
推荐阅读
相关标签
  

闽ICP备14008679号