
A Detailed Look at How HBase Completes a Get Operation (client side)


 

 

This source walkthrough is based on HBase-0.20.6.

Start with the code of the get() method in the HTable class:

 

   HTable.java

    /**
     * Extracts certain cells from a given row.
     * @param get The object that specifies what data to fetch and from which row.
     * @return The data coming from the specified row, if it exists. If the row
     * specified doesn't exist, the {@link Result} instance returned won't
     * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
     * @throws IOException if a remote or network exception occurs.
     * @since 0.20.0
     */
    public Result get(final Get get) throws IOException {
      return connection.getRegionServerWithRetries(
          new ServerCallable<Result>(connection, tableName, get.getRow()) {
            public Result call() throws IOException {
              return server.get(location.getRegionInfo().getRegionName(), get);
            }
          }
      );
    }

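This code is a bit convoluted, but at least it tells us to look at the connection's getRegionServerWithRetries method. So what is connection?

It is a field declared in HTable:

private final HConnection connection;

When is it instantiated? In the HTable constructor:

this.connection = HConnectionManager.getConnection(conf);

Here conf is an HBaseConfiguration object passed in as a parameter of the HTable constructor. OK, let's move on to HConnectionManager and see where this connection comes from: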

HConnectionManager.java

    /**
     * Get the connection object for the instance specified by the configuration
     * If no current connection exists, create a new connection for that instance
     * @param conf
     * @return HConnection object for the instance specified by the configuration
     */
    public static HConnection getConnection(HBaseConfiguration conf) {
      TableServers connection;
      synchronized (HBASE_INSTANCES) {
        connection = HBASE_INSTANCES.get(conf);
        if (connection == null) {
          connection = new TableServers(conf);
          HBASE_INSTANCES.put(conf, connection);
        }
      }
      return connection;
    }
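The get-or-create logic in getConnection can be tried out in isolation. The following is only a minimal sketch of the pattern, not HBase code: FakeConnection and ConnectionCache are hypothetical names, and a plain String key stands in for the HBaseConfiguration object that the real map uses.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TableServers; not an HBase class.
final class FakeConnection {
    final String clusterKey;

    FakeConnection(String clusterKey) {
        this.clusterKey = clusterKey;
    }
}

final class ConnectionCache {
    // One shared map guarded by its own monitor, in the spirit of HBASE_INSTANCES.
    // Assumption: a String key replaces the HBaseConfiguration key of the original.
    private static final Map<String, FakeConnection> INSTANCES = new HashMap<>();

    static FakeConnection getConnection(String clusterKey) {
        synchronized (INSTANCES) {
            FakeConnection connection = INSTANCES.get(clusterKey);
            if (connection == null) {
                // First request for this key: create and remember the connection.
                connection = new FakeConnection(clusterKey);
                INSTANCES.put(clusterKey, connection);
            }
            return connection;
        }
    }

    public static void main(String[] args) {
        FakeConnection a = getConnection("cluster-A");
        FakeConnection b = getConnection("cluster-A");
        FakeConnection c = getConnection("cluster-B");
        System.out.println((a == b) + " " + (a == c));
    }
}
```

Two lookups with an equal key return the same object, while a different key gets its own connection. Because the lookup and the insertion happen under one lock, two threads cannot both create a connection for the same key.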

 

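Now we know that each conf maps to one connection, specifically a TableServers object (which implements the HConnection interface), and that all connections are kept in a pool. So what is a connection actually for? To find out, we need to look at the definition of the HConnection interface.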

 

HConnection.java

    /**
     * Cluster connection.
     * {@link HConnectionManager} manages instances of this class.
     */
    public interface HConnection {
      /**
       * Retrieve ZooKeeperWrapper used by the connection.
       * @return ZooKeeperWrapper handle being used by the connection.
       * @throws IOException
       */
      public ZooKeeperWrapper getZooKeeperWrapper() throws IOException;
      /**
       * @return proxy connection to master server for this instance
       * @throws MasterNotRunningException
       */
      public HMasterInterface getMaster() throws MasterNotRunningException;
      /** @return - true if the master server is running */
      public boolean isMasterRunning();
      /**
       * Checks if <code>tableName</code> exists.
       * @param tableName Table to check.
       * @return True if table exists already.
       * @throws MasterNotRunningException
       */
      public boolean tableExists(final byte [] tableName)
      throws MasterNotRunningException;
      /**
       * A table that isTableEnabled == false and isTableDisabled == false
       * is possible. This happens when a table has a lot of regions
       * that must be processed.
       * @param tableName
       * @return true if the table is enabled, false otherwise
       * @throws IOException
       */
      public boolean isTableEnabled(byte[] tableName) throws IOException;
      /**
       * @param tableName
       * @return true if the table is disabled, false otherwise
       * @throws IOException
       */
      public boolean isTableDisabled(byte[] tableName) throws IOException;
      /**
       * @param tableName
       * @return true if all regions of the table are available, false otherwise
       * @throws IOException
       */
      public boolean isTableAvailable(byte[] tableName) throws IOException;
      /**
       * List all the userspace tables. In other words, scan the META table.
       *
       * If we wanted this to be really fast, we could implement a special
       * catalog table that just contains table names and their descriptors.
       * Right now, it only exists as part of the META table's region info.
       *
       * @return - returns an array of HTableDescriptors
       * @throws IOException
       */
      public HTableDescriptor[] listTables() throws IOException;
      /**
       * @param tableName
       * @return table metadata
       * @throws IOException
       */
      public HTableDescriptor getHTableDescriptor(byte[] tableName)
      throws IOException;
      /**
       * Find the location of the region of <i>tableName</i> that <i>row</i>
       * lives in.
       * @param tableName name of the table <i>row</i> is in
       * @param row row key you're trying to find the region of
       * @return HRegionLocation that describes where to find the reigon in
       * question
       * @throws IOException
       */
      public HRegionLocation locateRegion(final byte [] tableName,
          final byte [] row)
      throws IOException;
      /**
       * Find the location of the region of <i>tableName</i> that <i>row</i>
       * lives in, ignoring any value that might be in the cache.
       * @param tableName name of the table <i>row</i> is in
       * @param row row key you're trying to find the region of
       * @return HRegionLocation that describes where to find the reigon in
       * question
       * @throws IOException
       */
      public HRegionLocation relocateRegion(final byte [] tableName,
          final byte [] row)
      throws IOException;
      /**
       * Establishes a connection to the region server at the specified address.
       * @param regionServer - the server to connect to
       * @return proxy for HRegionServer
       * @throws IOException
       */
      public HRegionInterface getHRegionConnection(HServerAddress regionServer)
      throws IOException;
      /**
       * Establishes a connection to the region server at the specified address.
       * @param regionServer - the server to connect to
       * @param getMaster - do we check if master is alive
       * @return proxy for HRegionServer
       * @throws IOException
       */
      public HRegionInterface getHRegionConnection(
          HServerAddress regionServer, boolean getMaster)
      throws IOException;
      /**
       * Find region location hosting passed row
       * @param tableName
       * @param row Row to find.
       * @param reload If true do not use cache, otherwise bypass.
       * @return Location of row.
       * @throws IOException
       */
      HRegionLocation getRegionLocation(byte [] tableName, byte [] row,
          boolean reload)
      throws IOException;
      /**
       * Pass in a ServerCallable with your particular bit of logic defined and
       * this method will manage the process of doing retries with timed waits
       * and refinds of missing regions.
       *
       * @param <T> the type of the return value
       * @param callable
       * @return an object of type T
       * @throws IOException
       * @throws RuntimeException
       */
      public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
      throws IOException, RuntimeException;
      /**
       * Pass in a ServerCallable with your particular bit of logic defined and
       * this method will pass it to the defined region server.
       * @param <T> the type of the return value
       * @param callable
       * @return an object of type T
       * @throws IOException
       * @throws RuntimeException
       */
      public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable)
      throws IOException, RuntimeException;
      /**
       * Process a batch of Puts. Does the retries.
       * @param list A batch of Puts to process.
       * @param tableName The name of the table
       * @return Count of committed Puts. On fault, < list.size().
       * @throws IOException
       */
      public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
      throws IOException;
      /**
       * Process a batch of Deletes. Does the retries.
       * @param list A batch of Deletes to process.
       * @return Count of committed Deletes. On fault, < list.size().
       * @param tableName The name of the table
       * @throws IOException
       */
      public int processBatchOfDeletes(ArrayList<Delete> list, byte[] tableName)
      throws IOException;
    }

 

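The code above is the complete interface definition. We now know that it wraps the handling of client-side requests: operations such as put and delete are wrapped in a callable and executed through the method

public <T> T getRegionServerWithRetries(ServerCallable<T> callable). This is exactly what we just saw in HTable.get().

At this point we need to look at TableServers.getRegionServerWithRetries(ServerCallable<T> callable), so on with the code: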

    public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
    throws IOException, RuntimeException {
      List<Throwable> exceptions = new ArrayList<Throwable>();
      for(int tries = 0; tries < numRetries; tries++) {
        try {
          callable.instantiateServer(tries!=0);
          return callable.call();
        } catch (Throwable t) {
          t = translateException(t);
          exceptions.add(t);
          if (tries == numRetries - 1) {
            throw new RetriesExhaustedException(callable.getServerName(),
                callable.getRegionName(), callable.getRow(), tries, exceptions);
          }
        }
        try {
          Thread.sleep(getPauseTime(tries));
        } catch (InterruptedException e) {
          // continue
        }
      }
      return null;
    }

 

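The core of this code is just two statements: first the callable object does the work of locating the RegionServer, then call is executed to issue the request. Note that this call method is the one overridden in the anonymous inner class at the very, very beginning, in HTable.get. Here is part of the ServerCallable class: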

    public abstract class ServerCallable<T> implements Callable<T> {
      protected final HConnection connection;
      protected final byte [] tableName;
      protected final byte [] row;
      protected HRegionLocation location;
      protected HRegionInterface server;
      /**
       * @param connection
       * @param tableName
       * @param row
       */
      public ServerCallable(HConnection connection, byte [] tableName, byte [] row) {
        this.connection = connection;
        this.tableName = tableName;
        this.row = row;
      }
      /**
       *
       * @param reload set this to true if connection should re-find the region
       * @throws IOException
       */
      public void instantiateServer(boolean reload) throws IOException {
        this.location = connection.getRegionLocation(tableName, row, reload);
        this.server = connection.getHRegionConnection(location.getServerAddress());
      }

 

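So a ServerCallable object holds tableName, row and so on, receives a connection reference through its constructor, and calls that connection's getHRegionConnection method to obtain a handle for talking to the RegionServer. (I honestly don't know what to call it. It can't be called a connection, since that name is already taken. The names in the HBase code are frustrating and easy to misread.)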
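To see how the retry loop and ServerCallable.instantiateServer(reload) cooperate, here is a small self-contained sketch. It assumes nothing from HBase: MiniCallable and RetryingCaller are hypothetical names, the pause is a fixed number of milliseconds rather than getPauseTime(tries), and unlike the original it only retries IOException and gives up when the thread is interrupted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of ServerCallable: locate first, then call.
abstract class MiniCallable<T> {
    int locateCount = 0;  // how many location lookups happened
    int reloadCount = 0;  // how many of them were asked to bypass the cache

    void instantiateServer(boolean reload) throws IOException {
        locateCount++;
        if (reload) {
            reloadCount++;
        }
    }

    abstract T call() throws IOException;
}

final class RetryingCaller {
    static <T> T callWithRetries(MiniCallable<T> callable, int numRetries, long pauseMillis)
            throws IOException {
        List<Throwable> exceptions = new ArrayList<>();
        for (int tries = 0; tries < numRetries; tries++) {
            try {
                // On every attempt after the first, ask for a fresh region location.
                callable.instantiateServer(tries != 0);
                return callable.call();
            } catch (IOException e) {
                exceptions.add(e);
                if (tries == numRetries - 1) {
                    throw new IOException("retries exhausted after " + exceptions.size() + " attempts", e);
                }
            }
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting to retry", ie);
            }
        }
        throw new IOException("numRetries must be positive");
    }

    // Runs a callable that fails twice before succeeding and reports what happened.
    static String demo() {
        final int[] failuresLeft = {2};
        MiniCallable<String> flaky = new MiniCallable<String>() {
            @Override
            String call() throws IOException {
                if (failuresLeft[0]-- > 0) {
                    throw new IOException("region moved");
                }
                return "row-data";
            }
        };
        try {
            String value = callWithRetries(flaky, 5, 1L);
            return value + " locates=" + flaky.locateCount + " reloads=" + flaky.reloadCount;
        } catch (IOException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "row-data locates=3 reloads=2"
    }
}
```

The first attempt may use a cached location (reload is false). Each retry passes reload as true, which is the point of the tries != 0 argument in the original loop: a failed call is taken as a hint that the cached region location may be stale.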

 

Here is how this new thing is obtained:

HConnectionManager.java

    public HRegionInterface getHRegionConnection(
        HServerAddress regionServer, boolean getMaster)
    throws IOException {
      if (getMaster) {
        getMaster();
      }
      HRegionInterface server;
      synchronized (this.servers) {
        // See if we already have a connection
        server = this.servers.get(regionServer.toString());
        if (server == null) { // Get a connection
          try {
            server = (HRegionInterface)HBaseRPC.waitForProxy(
                serverInterfaceClass, HBaseRPCProtocolVersion.versionID,
                regionServer.getInetSocketAddress(), this.conf,
                this.maxRPCAttempts, this.rpcTimeout);
          } catch (RemoteException e) {
            throw RemoteExceptionHandler.decodeRemoteException(e);
          }
          this.servers.put(regionServer.toString(), server);
        }
      }
      return server;
    }

 

Digging further, here is where this server comes from (in the HBaseRPC class; waitForProxy ends up calling getProxy):

    public static VersionedProtocol getProxy(Class<?> protocol,
        long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory)
    throws IOException {
      VersionedProtocol proxy =
          (VersionedProtocol) Proxy.newProxyInstance(
              protocol.getClassLoader(), new Class[] { protocol },
              new Invoker(addr, ticket, conf, factory));
      long serverVersion = proxy.getProtocolVersion(protocol.getName(),
          clientVersion);
      if (serverVersion == clientVersion) {
        return proxy;
      }
      throw new VersionMismatch(protocol.getName(), clientVersion,
          serverVersion);
    }

 

 

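These two pieces of code show that Java's dynamic proxy mechanism is used: server is a dynamic proxy object that implements the interface given by the variable serverInterfaceClass. Here that is HRegionInterface, so server implements everything in that interface. Which methods does the interface define?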

 

    public interface HRegionInterface extends HBaseRPCProtocolVersion {
      /**
       * Get metainfo about an HRegion
       *
       * @param regionName name of the region
       * @return HRegionInfo object for region
       * @throws NotServingRegionException
       */
      public HRegionInfo getRegionInfo(final byte [] regionName)
      throws NotServingRegionException;
      /**
       * Return all the data for the row that matches <i>row</i> exactly,
       * or the one that immediately preceeds it.
       *
       * @param regionName region name
       * @param row row key
       * @param family Column family to look for row in.
       * @return map of values
       * @throws IOException
       */
      public Result getClosestRowBefore(final byte [] regionName,
          final byte [] row, final byte [] family)
      throws IOException;
      /**
       *
       * @return the regions served by this regionserver
       */
      public HRegion [] getOnlineRegionsAsArray();
      /**
       * Perform Get operation.
       * @param regionName name of region to get from
       * @param get Get operation
       * @return Result
       * @throws IOException
       */
      public Result get(byte [] regionName, Get get) throws IOException;
      /**
       * Perform exists operation.
       * @param regionName name of region to get from
       * @param get Get operation describing cell to test
       * @return true if exists
       * @throws IOException
       */
      public boolean exists(byte [] regionName, Get get) throws IOException;
      /**
       * Put data into the specified region
       * @param regionName
       * @param put the data to be put
       * @throws IOException
       */
      public void put(final byte [] regionName, final Put put)
      throws IOException;
      /**
       * Put an array of puts into the specified region
       *
       * @param regionName
       * @param puts
       * @return The number of processed put's. Returns -1 if all Puts
       * processed successfully.
       * @throws IOException
       */
      public int put(final byte[] regionName, final Put [] puts)
      throws IOException;
      /**
       * Deletes all the KeyValues that match those found in the Delete object,
       * if their ts <= to the Delete. In case of a delete with a specific ts it
       * only deletes that specific KeyValue.
       * @param regionName
       * @param delete
       * @throws IOException
       */
      public void delete(final byte[] regionName, final Delete delete)
      throws IOException;
      /**
       * Put an array of deletes into the specified region
       *
       * @param regionName
       * @param deletes
       * @return The number of processed deletes. Returns -1 if all Deletes
       * processed successfully.
       * @throws IOException
       */
      public int delete(final byte[] regionName, final Delete [] deletes)
      throws IOException;
      /**
       * Atomically checks if a row/family/qualifier value match the expectedValue.
       * If it does, it adds the put.
       *
       * @param regionName
       * @param row
       * @param family
       * @param qualifier
       * @param value the expected value
       * @param put
       * @throws IOException
       * @return true if the new put was execute, false otherwise
       */
      public boolean checkAndPut(final byte[] regionName, final byte [] row,
          final byte [] family, final byte [] qualifier, final byte [] value,
          final Put put)
      throws IOException;
      /**
       * Atomically increments a column value. If the column value isn't long-like,
       * this could throw an exception.
       *
       * @param regionName
       * @param row
       * @param family
       * @param qualifier
       * @param amount
       * @param writeToWAL whether to write the increment to the WAL
       * @return new incremented column value
       * @throws IOException
       */
      public long incrementColumnValue(byte [] regionName, byte [] row,
          byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
      throws IOException;
      //
      // remote scanner interface
      //
      /**
       * Opens a remote scanner with a RowFilter.
       *
       * @param regionName name of region to scan
       * @param scan configured scan object
       * @return scannerId scanner identifier used in other calls
       * @throws IOException
       */
      public long openScanner(final byte [] regionName, final Scan scan)
      throws IOException;
      /**
       * Get the next set of values
       * @param scannerId clientId passed to openScanner
       * @return map of values; returns null if no results.
       * @throws IOException
       */
      public Result next(long scannerId) throws IOException;
      /**
       * Get the next set of values
       * @param scannerId clientId passed to openScanner
       * @param numberOfRows the number of rows to fetch
       * @return Array of Results (map of values); array is empty if done with this
       * region and null if we are NOT to go to the next region (happens when a
       * filter rules that the scan is done).
       * @throws IOException
       */
      public Result [] next(long scannerId, int numberOfRows) throws IOException;
      /**
       * Close a scanner
       *
       * @param scannerId the scanner id returned by openScanner
       * @throws IOException
       */
      public void close(long scannerId) throws IOException;
      /**
       * Opens a remote row lock.
       *
       * @param regionName name of region
       * @param row row to lock
       * @return lockId lock identifier
       * @throws IOException
       */
      public long lockRow(final byte [] regionName, final byte [] row)
      throws IOException;
      /**
       * Releases a remote row lock.
       *
       * @param regionName
       * @param lockId the lock id returned by lockRow
       * @throws IOException
       */
      public void unlockRow(final byte [] regionName, final long lockId)
      throws IOException;
      /**
       * Method used when a master is taking the place of another failed one.
       * @return All regions assigned on this region server
       * @throws IOException
       */
      public HRegionInfo[] getRegionsAssignment() throws IOException;
      /**
       * Method used when a master is taking the place of another failed one.
       * @return The HSI
       * @throws IOException
       */
      public HServerInfo getHServerInfo() throws IOException;
    }

 

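As you can see, HRegionInterface defines the concrete methods for querying a RegionServer.

Now, looking back: once the server dynamic proxy object has been instantiated, ServerCallable.call() eventually calls server.get(). Under Java's proxy mechanism, that call is forwarded to the new Invoker(addr, ticket, conf, factory) object that was passed in when the proxy was constructed, and that object performs the actual work.

Put simply, this Invoker object uses HBase's RPC client to talk to the RegionServer, sending the request and receiving the result.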
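For readers who have not used java.lang.reflect.Proxy, the following minimal sketch shows the mechanism on its own. RegionService, RecordingInvoker and ProxyDemo are hypothetical names invented for this illustration. The handler just records the method name and fabricates a return value, whereas the real Invoker hands the call to the RPC client.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical one-method stand-in for HRegionInterface.
interface RegionService {
    String get(String regionName, String row);
}

// Plays the role of Invoker: every call on the proxy lands in invoke().
final class RecordingInvoker implements InvocationHandler {
    final List<String> calls = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            // toString/hashCode/equals on the proxy are answered by the handler itself.
            return method.invoke(this, args);
        }
        // A real RPC handler would serialize the method name and arguments here.
        calls.add(method.getName());
        return "value-of-" + args[1] + "@" + args[0];
    }
}

final class ProxyDemo {
    static RegionService newProxy(InvocationHandler handler) {
        return (RegionService) Proxy.newProxyInstance(
            RegionService.class.getClassLoader(),
            new Class<?>[] { RegionService.class },
            handler);
    }

    public static void main(String[] args) {
        RecordingInvoker invoker = new RecordingInvoker();
        RegionService server = newProxy(invoker);
        System.out.println(server.get("region-1", "row-42")); // prints "value-of-row-42@region-1"
        System.out.println(invoker.calls);                    // prints "[get]"
    }
}
```

The caller only ever sees the RegionService interface. No class implementing it is written by hand, which is why the HBase client can call server.get() without a hand-written network stub for each method.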

Let's see what this RPC client looks like:

    public Invoker(InetSocketAddress address, UserGroupInformation ticket,
        Configuration conf, SocketFactory factory) {
      this.address = address;
      this.ticket = ticket;
      this.client = CLIENTS.getClient(conf, factory); // client is the RPC client
    }

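This client is an HBaseClient object; HBaseClient is the class HBase uses as its RPC client. HBaseClient instances are pooled here as well, which I don't really understand... The comment in the code reads: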

      // Construct & cache client.  The configuration is only used for timeout,
      // and Clients have connection pools.  So we can either (a) lose some
      // connection pooling and leak sockets, or (b) use the same timeout for all
      // configurations.  Since the IPC is usually intended globally, not
      // per-job, we choose (a).

 

Moving on, here is how such a client completes the final request:

    public Writable call(Writable param, InetSocketAddress addr,
        UserGroupInformation ticket)
    throws IOException {
      Call call = new Call(param);
      Connection connection = getConnection(addr, ticket, call);
      connection.sendParam(call); // send the parameter
      boolean interrupted = false;
      synchronized (call) {
        while (!call.done) {
          try {
            call.wait(); // wait for the result
          } catch (InterruptedException ie) {
            // save the fact that we were interrupted
            interrupted = true;
          }
        }
        if (interrupted) {
          // set the interrupt flag now that we are done waiting
          Thread.currentThread().interrupt();
        }
        if (call.error != null) {
          if (call.error instanceof RemoteException) {
            call.error.fillInStackTrace();
            throw call.error;
          }
          // local exception
          throw wrapException(addr, call.error);
        }
        return call.value;
      }
    }

 

Here is connection yet again, but this time the connection is a thread used to send and receive data. Judging from getConnection(addr, ticket, call), this is another pool, and sure enough:

 

    /** Get a connection from the pool, or create a new one and add it to the
     * pool. Connections to a given host/port are reused. */
    private Connection getConnection(InetSocketAddress addr,
        UserGroupInformation ticket,
        Call call)
    throws IOException {
      if (!running.get()) {
        // the client is stopped
        throw new IOException("The client is stopped");
      }
      Connection connection;
      /* we could avoid this allocation for each RPC by having a
       * connectionsId object and with set() method. We need to manage the
       * refs for keys in HashMap properly. For now its ok.
       */
      ConnectionId remoteId = new ConnectionId(addr, ticket);
      do {
        synchronized (connections) {
          connection = connections.get(remoteId);
          if (connection == null) {
            connection = new Connection(remoteId);
            connections.put(remoteId, connection);
          }
        }
      } while (!connection.addCall(call));
      //we don't invoke the method below inside "synchronized (connections)"
      //block above. The reason for that is if the server happens to be slow,
      //it will take longer to establish a connection and that will slow the
      //entire system down.
      connection.setupIOstreams();
      return connection;
    }

 

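In other words, as long as the addr of the target RegionServer and the user group information are the same, the calls share one connection. Once obtained, the connection puts the current call into an internal table (it maintains a mapping from call id to call). When the call completes, the call's state is updated: mainly the completion flag Call.done, plus the result of the request, which is stored in Call.value.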
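The interplay between the waiting caller and the connection thread can be sketched as follows. This is an illustration under simplifying assumptions, not the HBaseClient code: MiniCall, MiniConnection and CallDemo are hypothetical names, the payload is a String instead of a Writable, and the "response" is fabricated by a helper thread instead of being read from a socket.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical miniature of HBaseClient.Call: a parameter, a result and a done flag.
final class MiniCall {
    final int id;
    final String param;
    private String value;
    private boolean done;

    MiniCall(int id, String param) {
        this.id = id;
        this.param = param;
    }

    // Invoked by the receiving thread: store the result and wake the waiting caller.
    synchronized void complete(String result) {
        value = result;
        done = true;
        notifyAll();
    }

    // Invoked by the calling thread: block until the result has arrived.
    synchronized String await() throws InterruptedException {
        while (!done) {
            wait();
        }
        return value;
    }
}

// Hypothetical miniature of Connection: keeps the call id -> call table.
final class MiniConnection {
    private final Map<Integer, MiniCall> calls = new HashMap<>();

    synchronized void addCall(MiniCall call) {
        calls.put(call.id, call);
    }

    // Looks up the pending call for a response and completes it.
    synchronized boolean receiveResponse(int callId, String result) {
        MiniCall call = calls.remove(callId);
        if (call == null) {
            return false;
        }
        call.complete(result);
        return true;
    }

    synchronized int pending() {
        return calls.size();
    }
}

final class CallDemo {
    static String demo() {
        MiniConnection connection = new MiniConnection();
        MiniCall call = new MiniCall(7, "get row-42");
        connection.addCall(call);
        // Stands in for the connection thread that reads responses off the socket.
        Thread receiver = new Thread(() -> connection.receiveResponse(7, "result-for-" + call.param));
        receiver.start();
        try {
            String value = call.await();
            receiver.join();
            return value + " pending=" + connection.pending();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "result-for-get row-42 pending=0"
    }
}
```

Because every call carries its own id, many callers can wait on the same connection at once, and the receiving thread uses the id to decide which caller to wake up.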

 

With that in place, let's see how the connection sends the request data.

    /** Initiates a call by sending the parameter to the remote server.
     * Note: this is not called from the Connection thread, but by other
     * threads.
     * @param call
     */
    public void sendParam(Call call) {
      if (shouldCloseConnection.get()) {
        return;
      }
      DataOutputBuffer d=null;
      try {
        synchronized (this.out) {
          if (LOG.isDebugEnabled())
            LOG.debug(getName() + " sending #" + call.id);
          //for serializing the
          //data to be written
          d = new DataOutputBuffer();
          d.writeInt(call.id);
          call.param.write(d);
          byte[] data = d.getData();
          int dataLength = d.getLength();
          out.writeInt(dataLength); //first put the data length
          out.write(data, 0, dataLength);//write the data
          out.flush();
        }
      } catch(IOException e) {
        markClosed(e);
      } finally {
        //the buffer is just an in-memory buffer, but it is still polite to
        // close early
        IOUtils.closeStream(d);
      }
    }

 

The code shows that sending a request is synchronized, so the problem mentioned in the previous post can occur.
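The wire layout that sendParam produces, a length prefix followed by the call id and the serialized parameter, can be reproduced with plain java.io streams. The sketch below is only an approximation: FrameDemo is a hypothetical name, and raw UTF-8 bytes stand in for the output of call.param.write(d), whose actual Writable encoding is not shown in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class FrameDemo {
    // Layout: [int dataLength][int callId][payload bytes], following sendParam's order of writes.
    static byte[] encode(int callId, byte[] payload) throws IOException {
        // Step 1: serialize call id and parameter into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream d = new DataOutputStream(buffer);
        d.writeInt(callId);
        d.write(payload); // assumption: raw bytes instead of a Writable
        d.flush();
        byte[] data = buffer.toByteArray();

        // Step 2: write the length first, then the buffered data.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        out.writeInt(data.length);
        out.write(data, 0, data.length);
        out.flush();
        return wire.toByteArray();
    }

    // Reads one frame back and renders it as "callId:payload".
    static String decode(byte[] frame) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
        int dataLength = in.readInt();
        int callId = in.readInt();
        byte[] payload = new byte[dataLength - 4]; // the call id took 4 of the bytes
        in.readFully(payload);
        return callId + ":" + new String(payload, StandardCharsets.UTF_8);
    }

    static String demo() {
        try {
            byte[] frame = encode(7, "get row-42".getBytes(StandardCharsets.UTF_8));
            return frame.length + " bytes -> " + decode(frame);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "18 bytes -> 7:get row-42"
    }
}
```

Serializing into a buffer first is what makes the length prefix possible. It also hints at why the original wraps the writes in synchronized (this.out): the length and the data must reach the stream back to back, so two threads sharing one connection cannot interleave their frames.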

That is enough of the HBase client code for now.

The diagram below helps in understanding the various pools described above.



 
