当前位置:   article > 正文

双队列的一种实现

两类pcd队列csdn

介绍

双队列是一种高效的内存数据结构,在多线程编程中,能保证生产者线程的写入和消费者的读出尽量做到最低的影响,避免了共享队列的锁开销。本文将介绍一种双队列的设计,并给出实现代码,然后会举例使用的场景。该双队列在项目中使用,性能也得到了验证。

设计

接下来具体介绍双队列的设计,并且会粘贴少量方法代码,帮助介绍。
本文中讲述的双队列,本质上是两个数组保存写入的Object,一个数组负责写入,另一个被消费者读出,两个数组都对应一个重入锁。数组内写入的数据会被计数。
  1. public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
  2. BlockingQueue<T>, java.io.Serializable {
  3. private static final long serialVersionUID = 1L;
  4. private static int default_line_limit = 1000;
  5. private static long max_cache_size = 67108864L;
  6. private int lineLimit;
  7. private long cacheSize;
  8. private T[] itemsA;
  9. private T[] itemsB;
  10. private ReentrantLock readLock, writeLock;
  11. private Condition notFull;
  12. private Condition awake;
  13. /**
  14. * writeArray : in reader's eyes, reader get data from data source and write
  15. * data to this line array. readArray : in writer's eyes, writer put data to
  16. * data destination from this line array.
  17. *
  18. * Because of this is doubleQueue mechanism, the two line will exchange when
  19. * time is suitable.
  20. *
  21. */
  22. private T[] writeArray, readArray;
  23. private volatile int writeCount, readCount;
  24. private int writeArrayTP, readArrayHP;
  25. private volatile boolean closed = false;
  26. private int spillSize = 0;
  27. private long lineRx = 0;
  28. private long lineTx = 0;

队列实现了阻塞队列的接口,所以在向队列offer数据的时候是阻塞的,同样,取出操作poll也会阻塞。两个数组会在适当的时候进行queueSwitch操作。queueSwitch的条件就是当读者把queue读空了之后,且写入的queue此时不为空的时候,两个queue就会进行交换。在交换的时候,写入queue会被上锁,此时生产者不能让队列里写入数据。一般情况下,queue互换其实就是两个数组的引用互换,将相应的计数器也重置,写队列的计数器此时就清零了,因为queue交换是因为读队列已经被读空。
  1. private long queueSwitch(long timeout, boolean isInfinite)
  2. throws InterruptedException {
  3. System.out.println("queue switch");
  4. writeLock.lock();
  5. try {
  6. if (writeCount <= 0) {
  7. if (closed) {
  8. return -2;
  9. }
  10. try {
  11. if (isInfinite && timeout <= 0) {
  12. awake.await();
  13. return -1;
  14. } else {
  15. return awake.awaitNanos(timeout);
  16. }
  17. } catch (InterruptedException ie) {
  18. awake.signal();
  19. throw ie;
  20. }
  21. } else {
  22. T[] tmpArray = readArray;
  23. readArray = writeArray;
  24. writeArray = tmpArray;
  25. readCount = writeCount;
  26. readArrayHP = 0;
  27. writeCount = 0;
  28. writeArrayTP = 0;
  29. notFull.signal();
  30. // logger.debug("Queue switch successfully!");
  31. return -1;
  32. }
  33. } finally {
  34. writeLock.unlock();
  35. }
  36. }

上面queue交换的时候,可以看到当要被交换的写队列也已经为空的时候,会做一次检查。如果此时queue已经被显示地关闭了,那么poll操作就会返回空,读者此时应该检查queue是否已经被closed了,若已经closed了,那么读者已经把queue里的数据读完了。这里的显示close是我们给双队列加的一个状态,close这件事的作用是为了让读者知道:生产者已经停止往queue里写新数据了,但是queue里其实可能还有未取完的数据(在写queue里,此时还差一次queue switch),你往queue poll取数据的时候,如果取到空了,那么应该做一次check,如果queue已经关闭了,那么读者就知道本次读的任务完全结束了。反过来,close状态其实不影响写,生产者如果还想写的话,其实也是可以的,但是我不推荐这么做。
  1. public void close() {
  2. writeLock.lock();
  3. try {
  4. closed = true;
  5. //System.out.println(this);
  6. awake.signalAll();
  7. } finally {
  8. writeLock.unlock();
  9. }
  10. }

如果没有这个close标志位的话,可能就需要消费者放入一个EOF让读者知道。这在只有一个生产者和一个消费者的情况下是可行的,但是如果是一个多对一,一对多,甚至多对多的情况呢?一对一的情况是最简单的,也是双队列被创造出来最合适的场景。因为双队列完全分离了一个生产者和一个消费者的锁争抢情况,各自只要获得自己的读/写队列的锁就可以了。在本文阐述的双队列中,唯一产生一些开销的就是queue swtich的情况,如果queue频繁交换的话,还是会产生一些性能开销的。


一对多

上面已经大致介绍了双队列的读写。在实际项目中,一对多的场景需要注意的地方有两:
  • 单个生产者需要在结束的时候关闭queue
  • 多个消费者需要知道任务结束(知道其他线程已经完成任务)
第一点很简单,比如读文件的话,当生产者readLine()最后为空的时候,就认为数据源已经读完,调用方法把queue close()。而消费者在读queue的时候,有时候可能会由于延迟、queue交换等原因取到空数据,此时就如上面一节所说,消费者线程拿到空数据后应该检查queue的状态,如果queue没有关闭,那么应该等待一小会儿后继续poll数据;如果queue关闭了,那么其实说明该线程已经完成了任务。同理,其他消费者线程也应该在取到空的时候做这样的操作。


消费者之间或者外部有一方需要知道各个消费者线程的存活情况,这样才能知道本次任务完成。比如如果外面有一个上帝的话,可以加一个CountDownLatch计数,每个消费者完成后就countDown一次,外部调用await()直到大家都已经退出,那么整个任务结束。如果没有上帝,线程之间互相知道对方情况的话,我的做法是让生产者放入一个EOF,当某线程取到EOF的时候,他知道自己是第一个遇到尽头的人,他会置一个布尔,而其他线程在取到空的时候会检查该布尔值,这样就能知道是否已经有小伙伴已经拿到EOF了,那么这时候就可以countDown了,而拿到EOF的线程进程countDown后就await(),最后退出。
下面是我自己针对这种场景,使用双队列的方式,其中的fromQueue是一个ConcurrentLinkedQueue,大家可以忽略,toQueue是双队列,可以注意一下用法。特别是往里面写的时候,需要while循环重试直到写入成功。
  1. @Override
  2. public void run() {
  3. long start = System.currentTimeMillis();
  4. log.debug(Thread.currentThread() + " Unpacker started at " + start);
  5. Random r = new Random(start);
  6. Bundle bundle = null;
  7. boolean shoudShutdown = false;
  8. try {
  9. while(!shoudShutdown) {
  10. bundle = (Bundle) fromQueue.poll();
  11. if (bundle == null) {
  12. if (seeEOF.get()) {
  13. // 当取到空,并且其他线程已经取到EOF,那么本线程将Latch减1,并退出循环
  14. latch.countDown();
  15. shoudShutdown = true;
  16. } else {
  17. // 如果EOF还没被取到,本线程小睡一会后继续取
  18. try {
  19. sleep(r.nextInt(10));
  20. } catch (InterruptedException e) {
  21. log.error("Interrupted when taking a nap", e);
  22. }
  23. }
  24. } else if (!bundle.isEof()) {
  25. // bundle非空且非EOF,则往双队列写入一个Bundle
  26. byte[] lineBytes = BundleUtil.getDecompressedData(bundle);
  27. // 放入双队列时,若offer失败则重试
  28. while (!toQueue.offer(new UnCompressedBundle(bundle.getId(), ByteUtil.bytes2Lines(lineBytes, lineDelim), bundle.getIndex(), bundle.getJobId()))) {
  29. log.info("Unpacker put failed, will retry");
  30. }
  31. log.info("After enqueue, queue size is " + toQueue.size());
  32. } else {
  33. // Unpacker获得到了EOF
  34. seeEOF.set(true);
  35. // 自己将Lacth减1,并等待其他线程退出
  36. latch.countDown();
  37. try {
  38. latch.await();
  39. } catch (InterruptedException e) {
  40. log.error("Interrupted when waiting the latch ");
  41. }
  42. // 其他线程已经退出,本线程放入EOF
  43. while (!toQueue.offer(new UnCompressedBundle(-1L, new Line[0], -1L, -1L))) {
  44. log.info("Unpacker put EOF failed, will retry");
  45. }
  46. // 关闭Queue
  47. toQueue.close();
  48. // 退出循环
  49. shoudShutdown = true;
  50. }
  51. }
  52. log.debug(Thread.currentThread() + " Unpacker finished in " + (System.currentTimeMillis()-start) + " ms");
  53. } catch (Exception e) {
  54. log.error("Exception when unpacker is running ", e);
  55. // 将latch减1,表示自己异常退出,且不再工作
  56. // latch.countDown();
  57. log.debug(Thread.currentThread() + " Unpacker occured exception and stopped. ");
  58. } finally {
  59. }
  60. }

多对一

多个生产者的情况下,写入队列无可避免发送锁争抢,但是能保证消费者的稳定读出过程。没有什么特殊处理的地方,这里就不啰嗦了。

总结分析

本文介绍了一种经典双队列的设计和实现,也给出了一些代码演示。文章末尾我会贴出整个双队列的代码实现,需要的同学也可以留言,我把.java发给你。如果使用的时候有发现问题,不吝赐教,这个双队列的实现也还不是很完美。使用的时候也存在需要注意的地方。
其实双队列的目的还是在于让写和读互相没有影响,而且更加照顾了写的速度。因为一般写的速度可能会比较快,而读的人读出之后还会做一些额外的处理,所以写的这一方借助双队列,可以持续写的过程,而且如果读的一方慢的话,可以多起几个消费者线程,就像"一对多"场景里阐述的那样来使用双队列。

下面是整个实现。各位可以仔细看看,发现问题一定记得通知我 :)

  1. import java.util.AbstractQueue;
  2. import java.util.Collection;
  3. import java.util.Iterator;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.TimeUnit;
  6. import java.util.concurrent.locks.Condition;
  7. import java.util.concurrent.locks.ReentrantLock;
  8. import lombok.ToString;
  9. import lombok.extern.log4j.Log4j;
  10. /**
  11. * Represents a region with two swap spaces, one for storing data which from
  12. * data source, the other one for storing data which will be transferred to data
  13. * destination.
  14. * <br>
  15. * A classical DoubleCachedQueue, In beginning, space A and space B both
  16. * empty, then loading task begin to load data to space A, when A is almost
  17. * full, let the data from data source being loaded to space B, then dumping
  18. * task begin to dump data from space A to data source. When space A is empty,
  19. * switch the two spaces for load and dump task. Repeat the above operation.
  20. *
  21. */
  22. @Log4j
  23. @ToString
  24. public class DoubleCachedQueue<T> extends AbstractQueue<T> implements
  25. BlockingQueue<T>, java.io.Serializable {
  26. private static final long serialVersionUID = 1L;
  27. private static int default_line_limit = 1000;
  28. private static long max_cache_size = 67108864L;
  29. private int lineLimit;
  30. private long cacheSize;
  31. private T[] itemsA;
  32. private T[] itemsB;
  33. private ReentrantLock readLock, writeLock;
  34. private Condition notFull;
  35. private Condition awake;
  36. /**
  37. * writeArray : in reader's eyes, reader get data from data source and write
  38. * data to this line array. readArray : in writer's eyes, writer put data to
  39. * data destination from this line array.
  40. *
  41. * Because of this is doubleQueue mechanism, the two line will exchange when
  42. * time is suitable.
  43. *
  44. */
  45. private T[] writeArray, readArray;
  46. private volatile int writeCount, readCount;
  47. private int writeArrayTP, readArrayHP;
  48. private volatile boolean closed = false;
  49. private int spillSize = 0;
  50. private long lineRx = 0;
  51. private long lineTx = 0;
  52. /**
  53. * Get info of line number in {@link DoubleCachedQueue} space.
  54. *
  55. * @return Information of line number.
  56. *
  57. */
  58. public String info() {
  59. return String.format("Write Array: %s/%s; Read Array: %s/%s", writeCount, writeArray.length, readCount, readArray.length);
  60. }
  61. /**
  62. * Use the two parameters to construct a {@link DoubleCachedQueue} which hold the
  63. * swap areas.
  64. *
  65. * @param lineLimit
  66. * Limit of the line number the {@link DoubleCachedQueue} can hold.
  67. *
  68. * @param byteLimit
  69. * Limit of the bytes the {@link DoubleCachedQueue} can hold.
  70. *
  71. */
  72. public DoubleCachedQueue(int lineLimit) {
  73. if (lineLimit <= 0) {
  74. this.lineLimit = default_line_limit;
  75. }else{
  76. this.lineLimit = lineLimit;
  77. }
  78. itemsA = (T[])new Object[lineLimit];
  79. itemsB = (T[])new Object[lineLimit];
  80. readLock = new ReentrantLock();
  81. writeLock = new ReentrantLock();
  82. notFull = writeLock.newCondition();
  83. awake = writeLock.newCondition();
  84. readArray = itemsA;
  85. writeArray = itemsB;
  86. spillSize = lineLimit * 8 / 10;
  87. }
  88. public DoubleCachedQueue(long cacheSize){
  89. if (cacheSize <= 0) {
  90. throw new IllegalArgumentException(
  91. "Queue initial capacity can't less than 0!");
  92. }
  93. this.cacheSize = cacheSize > max_cache_size ? max_cache_size : cacheSize;
  94. readLock = new ReentrantLock();
  95. writeLock = new ReentrantLock();
  96. notFull = writeLock.newCondition();
  97. awake = writeLock.newCondition();
  98. readArray = itemsA;
  99. writeArray = itemsB;
  100. spillSize = lineLimit * 8 / 10;
  101. }
  102. /**
  103. * Get line number of the {@link DoubleCachedQueue}
  104. *
  105. * @return lineLimit Limit of the line number the {@link DoubleCachedQueue} can
  106. * hold.
  107. *
  108. */
  109. public int getLineLimit() {
  110. return lineLimit;
  111. }
  112. /**
  113. * Set line number of the {@link DoubleCachedQueue}.
  114. *
  115. * @param capacity
  116. * Limit of the line number the {@link DoubleCachedQueue} can hold.
  117. *
  118. */
  119. public void setLineLimit(int capacity) {
  120. this.lineLimit = capacity;
  121. }
  122. /**
  123. * Insert one line of record to a apace which buffers the swap data.
  124. *
  125. * @param line
  126. * The inserted line.
  127. *
  128. */
  129. private void insert(T line) {
  130. writeArray[writeArrayTP] = line;
  131. ++writeArrayTP;
  132. ++writeCount;
  133. ++lineRx;
  134. }
  135. /**
  136. * Insert a line array(appointed the limit of array size) of data to a apace
  137. * which buffers the swap data.
  138. *
  139. * @param lines
  140. * Inserted line array.
  141. *
  142. * @param size
  143. * Limit of inserted size of the line array.
  144. *
  145. */
  146. private void insert(T[] lines, int size) {
  147. if(size > 0){
  148. System.arraycopy(lines, 0, writeArray, writeArrayTP, size);
  149. writeArrayTP = writeArrayTP + size;
  150. writeCount = writeCount + size;
  151. lineRx = lineRx + size;
  152. }
  153. // for (int i = 0; i < size; ++i) {
  154. // writeArray[writeArrayTP] = lines[i];
  155. // ++writeArrayTP;
  156. // ++writeCount;
  157. // ++lineRx;
  158. // if(lines[i] != null && lines[i].getLine() != null){
  159. // byteRx += lines[i].getLine().length();
  160. // }
  161. // }
  162. }
  163. /**
  164. * Extract one line of record from the space which contains current data.
  165. *
  166. * @return line A line of data.
  167. *
  168. */
  169. private T extract() {
  170. T e = readArray[readArrayHP];
  171. readArray[readArrayHP] = null;
  172. ++readArrayHP;
  173. --readCount;
  174. ++lineTx;
  175. return e;
  176. }
  177. /**
  178. * Extract a line array of data from the space which contains current data.
  179. *
  180. * @param ea
  181. * @return Extracted line number of data.
  182. *
  183. */
  184. private int extract(T[] ea) {
  185. int readsize = Math.min(ea.length, readCount);
  186. if(readsize > 0){
  187. readCount = readCount - readsize;
  188. lineTx = lineTx + readsize;
  189. System.arraycopy(readArray, readArrayHP, ea, 0, readsize);
  190. readArrayHP = readArrayHP + readsize;
  191. }
  192. // for (int i = 0; i < readsize; ++i) {
  193. // ea[i] = readArray[readArrayHP];
  194. // readArray[readArrayHP] = null;
  195. // ++readArrayHP;
  196. // --readCount;
  197. // ++lineTx;
  198. // }
  199. return readsize;
  200. }
  201. /**
  202. * switch condition: read queue is empty && write queue is not empty.
  203. * Notice:This function can only be invoked after readLock is grabbed,or may
  204. * cause dead lock.
  205. *
  206. * @param timeout
  207. *
  208. * @param isInfinite
  209. * whether need to wait forever until some other thread awake it.
  210. *
  211. * @return
  212. *
  213. * @throws InterruptedException
  214. *
  215. */
  216. private long queueSwitch(long timeout, boolean isInfinite)
  217. throws InterruptedException {
  218. System.out.println("queue switch");
  219. writeLock.lock();
  220. try {
  221. if (writeCount <= 0) {
  222. if (closed) {
  223. return -2;
  224. }
  225. try {
  226. if (isInfinite && timeout <= 0) {
  227. awake.await();
  228. return -1;
  229. } else {
  230. return awake.awaitNanos(timeout);
  231. }
  232. } catch (InterruptedException ie) {
  233. awake.signal();
  234. throw ie;
  235. }
  236. } else {
  237. T[] tmpArray = readArray;
  238. readArray = writeArray;
  239. writeArray = tmpArray;
  240. readCount = writeCount;
  241. readArrayHP = 0;
  242. writeCount = 0;
  243. writeArrayTP = 0;
  244. notFull.signal();
  245. // logger.debug("Queue switch successfully!");
  246. return -1;
  247. }
  248. } finally {
  249. writeLock.unlock();
  250. }
  251. }
  252. /**
  253. * If exists write space, it will return true, and write one line to the
  254. * space. otherwise, it will try to do that in a appointed time,when time is
  255. * out if still failed, return false.
  256. *
  257. * @param line
  258. * a Line.
  259. *
  260. * @param timeout
  261. * appointed limit time
  262. *
  263. * @param unit
  264. * time unit
  265. *
  266. * @return True if success,False if failed.
  267. *
  268. */
  269. public boolean offer(T line, long timeout, TimeUnit unit)
  270. throws InterruptedException {
  271. if (line == null) {
  272. throw new NullPointerException();
  273. }
  274. long nanoTime = unit.toNanos(timeout);
  275. writeLock.lockInterruptibly();
  276. if(itemsA == null || itemsB == null){
  277. initArray(line);
  278. }
  279. try {
  280. for (;;) {
  281. if (writeCount < writeArray.length) {
  282. insert(line);
  283. if (writeCount == 1) {
  284. awake.signal();
  285. }
  286. return true;
  287. }
  288. // Time out
  289. if (nanoTime <= 0) {
  290. return false;
  291. }
  292. // keep waiting
  293. try {
  294. nanoTime = notFull.awaitNanos(nanoTime);
  295. } catch (InterruptedException ie) {
  296. notFull.signal();
  297. throw ie;
  298. }
  299. }
  300. } finally {
  301. writeLock.unlock();
  302. }
  303. }
  304. private void initArray(T line) {
  305. long recordLength = computeSize(line);
  306. long size = cacheSize/recordLength;
  307. if(size <= 0){
  308. size = default_line_limit;
  309. }
  310. lineLimit = (int) size;
  311. itemsA = (T[])new Object[(int) size];
  312. itemsB = (T[])new Object[(int) size];
  313. readArray = itemsA;
  314. writeArray = itemsB;
  315. }
  316. public long computeSize(T line){
  317. return 1;
  318. }
  319. /**
  320. * If exists write space, it will return true, and write a line array to the
  321. * space.<br>
  322. * otherwise, it will try to do that in a appointed time,when time out if
  323. * still failed, return false.
  324. *
  325. * @param lines
  326. * line array contains lines of data
  327. *
  328. * @param size
  329. * Line number needs to write to the space.
  330. *
  331. * @param timeout
  332. * appointed limit time
  333. *
  334. * @param unit
  335. * time unit
  336. *
  337. * @return status of this operation, true or false.
  338. *
  339. * @throws InterruptedException
  340. * if being interrupted during the try limit time.
  341. *
  342. */
  343. public boolean offer(T[] lines, int size, long timeout, TimeUnit unit)
  344. throws InterruptedException {
  345. if (lines == null || lines.length == 0) {
  346. throw new NullPointerException();
  347. }
  348. long nanoTime = unit.toNanos(timeout);
  349. writeLock.lockInterruptibly();
  350. if(itemsA == null || itemsB == null){
  351. initArray(lines[0]);
  352. }
  353. try {
  354. for (;;) {
  355. if (writeCount + size <= writeArray.length) {
  356. insert(lines, size);
  357. if (writeCount >= spillSize) {
  358. awake.signalAll();
  359. }
  360. return true;
  361. }
  362. // Time out
  363. if (nanoTime <= 0) {
  364. return false;
  365. }
  366. // keep waiting
  367. try {
  368. nanoTime = notFull.awaitNanos(nanoTime);
  369. } catch (InterruptedException ie) {
  370. notFull.signal();
  371. throw ie;
  372. }
  373. }
  374. } finally {
  375. writeLock.unlock();
  376. }
  377. }
  378. /**
  379. * Close the synchronized lock and one inner state.
  380. *
  381. */
  382. public void close() {
  383. writeLock.lock();
  384. try {
  385. closed = true;
  386. //System.out.println(this);
  387. awake.signalAll();
  388. } finally {
  389. writeLock.unlock();
  390. }
  391. }
  392. public boolean isClosed() {
  393. return closed;
  394. }
  395. /**
  396. *
  397. *
  398. * @param timeout
  399. * appointed limit time
  400. *
  401. * @param unit
  402. * time unit
  403. */
  404. public T poll(long timeout, TimeUnit unit) throws InterruptedException {
  405. long nanoTime = unit.toNanos(timeout);
  406. readLock.lockInterruptibly();
  407. try {
  408. for (;;) {
  409. if (readCount > 0) {
  410. return extract();
  411. }
  412. if (nanoTime <= 0) {
  413. return null;
  414. }
  415. nanoTime = queueSwitch(nanoTime, true);
  416. }
  417. } finally {
  418. readLock.unlock();
  419. }
  420. }
  421. /**
  422. *
  423. * @param ea
  424. * line buffer
  425. *
  426. *
  427. * @param timeout
  428. * a appointed limit time
  429. *
  430. * @param unit
  431. * a time unit
  432. *
  433. * @return line number of data.if less or equal than 0, means fail.
  434. *
  435. * @throws InterruptedException
  436. * if being interrupted during the try limit time.
  437. */
  438. public int poll(T[] ea, long timeout, TimeUnit unit)
  439. throws InterruptedException {
  440. long nanoTime = unit.toNanos(timeout);
  441. readLock.lockInterruptibly();
  442. try {
  443. for (;;) {
  444. if (readCount > 0) {
  445. return extract(ea);
  446. }
  447. if (nanoTime == -2) {
  448. return -1;
  449. }
  450. if (nanoTime <= 0) {
  451. return 0;
  452. }
  453. nanoTime = queueSwitch(nanoTime, false);
  454. }
  455. } finally {
  456. readLock.unlock();
  457. }
  458. }
  459. public Iterator<T> iterator() {
  460. return null;
  461. }
  462. /**
  463. * Get size of {@link Storage} in bytes.
  464. *
  465. * @return Storage size.
  466. *
  467. * */
  468. @Override
  469. public int size() {
  470. return (writeCount + readCount);
  471. }
  472. @Override
  473. public int drainTo(Collection<? super T> c) {
  474. return 0;
  475. }
  476. @Override
  477. public int drainTo(Collection<? super T> c, int maxElements) {
  478. return 0;
  479. }
  480. /**
  481. * If exists write space, it will return true, and write one line to the
  482. * space.<br>
  483. * otherwise, it will try to do that in a appointed time(20
  484. * milliseconds),when time out if still failed, return false.
  485. *
  486. * @param line
  487. * a Line.
  488. *
  489. * @see DoubleCachedQueue#offer(Line, long, TimeUnit)
  490. *
  491. */
  492. @Override
  493. public boolean offer(T line) {
  494. try {
  495. return offer(line, 20, TimeUnit.MILLISECONDS);
  496. } catch (InterruptedException e1) {
  497. log.debug(e1.getMessage(), e1);
  498. }
  499. return false;
  500. }
  501. @Override
  502. public void put(T e) throws InterruptedException {
  503. }
  504. @Override
  505. public int remainingCapacity() {
  506. return 0;
  507. }
  508. @Override
  509. public T take() throws InterruptedException {
  510. return null;
  511. }
  512. @Override
  513. public T peek() {
  514. return null;
  515. }
  516. @Override
  517. public T poll() {
  518. try {
  519. return poll(1*1000, TimeUnit.MILLISECONDS);
  520. } catch (InterruptedException e) {
  521. log.debug(e.getMessage(), e);
  522. }
  523. return null;
  524. }
  525. }


(全文完)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/891796?site
推荐阅读
相关标签
  

闽ICP备14008679号