diff options
Diffstat (limited to 'luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java')
-rw-r--r-- | luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java | 849 |
1 files changed, 709 insertions, 140 deletions
diff --git a/luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java b/luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java index a622832..e30ab67 100644 --- a/luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java +++ b/luni/src/main/java/java/util/concurrent/ArrayBlockingQueue.java @@ -1,12 +1,17 @@ /* * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain + * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; -import java.util.concurrent.locks.*; -import java.util.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.lang.ref.WeakReference; // BEGIN android-note // removed link to collections framework docs @@ -73,11 +78,20 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> /** Main lock guarding all access */ final ReentrantLock lock; + /** Condition for waiting takes */ private final Condition notEmpty; + /** Condition for waiting puts */ private final Condition notFull; + /** + * Shared state for currently active iterators, or null if there + * are known not to be any. Allows queue operations to update + * iterator state. + */ + transient Itrs itrs = null; + // Internal helper methods /** @@ -94,16 +108,12 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> return ((i == 0) ? items.length : i) - 1; } - @SuppressWarnings("unchecked") - static <E> E cast(Object item) { - return (E) item; - } - /** * Returns item at index i. */ final E itemAt(int i) { - return this.<E>cast(items[i]); + @SuppressWarnings("unchecked") E x = (E) items[i]; + return x; } /** @@ -120,10 +130,12 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ - private void insert(E x) { + private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; items[putIndex] = x; putIndex = inc(putIndex); - ++count; + count++; notEmpty.signal(); } @@ -131,42 +143,57 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ - private E extract() { + private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; final Object[] items = this.items; - E x = this.<E>cast(items[takeIndex]); + @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; takeIndex = inc(takeIndex); - --count; + count--; + if (itrs != null) + itrs.elementDequeued(); notFull.signal(); return x; } /** - * Deletes item at position i. - * Utility for remove and iterator.remove. + * Deletes item at array index removeIndex. + * Utility for remove(Object) and iterator.remove. * Call only when holding lock. */ - void removeAt(int i) { + void removeAt(final int removeIndex) { + // assert lock.getHoldCount() == 1; + // assert items[removeIndex] != null; + // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; - // if removing front item, just advance - if (i == takeIndex) { + if (removeIndex == takeIndex) { + // removing front item; just advance items[takeIndex] = null; takeIndex = inc(takeIndex); + count--; + if (itrs != null) + itrs.elementDequeued(); } else { + // an "interior" remove + // slide over all others up through putIndex. - for (;;) { - int nexti = inc(i); - if (nexti != putIndex) { - items[i] = items[nexti]; - i = nexti; + final int putIndex = this.putIndex; + for (int i = removeIndex;;) { + int next = inc(i); + if (next != putIndex) { + items[i] = items[next]; + i = next; } else { items[i] = null; - putIndex = i; + this.putIndex = i; break; } } + count--; + if (itrs != null) + itrs.removedAt(removeIndex); } - --count; notFull.signal(); } @@ -271,7 +298,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> if (count == items.length) return false; else { - insert(e); + enqueue(e); return true; } } finally { @@ -293,7 +320,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> try { while (count == items.length) notFull.await(); - insert(e); + enqueue(e); } finally { lock.unlock(); } @@ -320,7 +347,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> return false; nanos = notFull.awaitNanos(nanos); } - insert(e); + enqueue(e); return true; } finally { lock.unlock(); @@ -331,7 +358,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> final ReentrantLock lock = this.lock; lock.lock(); try { - return (count == 0) ? null : extract(); + return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } @@ -343,7 +370,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> try { while (count == 0) notEmpty.await(); - return extract(); + return dequeue(); } finally { lock.unlock(); } @@ -359,7 +386,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> return null; nanos = notEmpty.awaitNanos(nanos); } - return extract(); + return dequeue(); } finally { lock.unlock(); } @@ -438,11 +465,15 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> final ReentrantLock lock = this.lock; lock.lock(); try { - for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { - if (o.equals(items[i])) { - removeAt(i); - return true; - } + if (count > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + if (o.equals(items[i])) { + removeAt(i); + return true; + } + } while ((i = inc(i)) != putIndex); } return false; } finally { @@ -464,9 +495,14 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> final ReentrantLock lock = this.lock; lock.lock(); try { - for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) - if (o.equals(items[i])) - return true; + if (count > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + if (o.equals(items[i])) + return true; + } while ((i = inc(i)) != putIndex); + } return false; } finally { lock.unlock(); @@ -522,8 +558,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> * The following code can be used to dump the queue into a newly * allocated array of {@code String}: * - * <pre> - * String[] y = x.toArray(new String[0]);</pre> + * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> * * Note that {@code toArray(new Object[0])} is identical in function to * {@code toArray()}. @@ -589,12 +624,20 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> final ReentrantLock lock = this.lock; lock.lock(); try { - for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) - items[i] = null; - count = 0; - putIndex = 0; - takeIndex = 0; - notFull.signalAll(); + int k = count; + if (k > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + items[i] = null; + } while ((i = inc(i)) != putIndex); + takeIndex = putIndex; + count = 0; + if (itrs != null) + itrs.queueIsEmpty(); + for (; k > 0 && lock.hasWaiters(notFull); k--) + notFull.signal(); + } } finally { lock.unlock(); } @@ -607,32 +650,7 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { - checkNotNull(c); - if (c == this) - throw new IllegalArgumentException(); - final Object[] items = this.items; - final ReentrantLock lock = this.lock; - lock.lock(); - try { - int i = takeIndex; - int n = 0; - int max = count; - while (n < max) { - c.add(this.<E>cast(items[i])); - items[i] = null; - i = inc(i); - ++n; - } - if (n > 0) { - count = 0; - putIndex = 0; - takeIndex = 0; - notFull.signalAll(); - } - return n; - } finally { - lock.unlock(); - } + return drainTo(c, Integer.MAX_VALUE); } /** @@ -651,21 +669,33 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> final ReentrantLock lock = this.lock; lock.lock(); try { - int i = takeIndex; - int n = 0; - int max = (maxElements < count) ? maxElements : count; - while (n < max) { - c.add(this.<E>cast(items[i])); - items[i] = null; - i = inc(i); - ++n; - } - if (n > 0) { - count -= n; - takeIndex = i; - notFull.signalAll(); + int n = Math.min(maxElements, count); + int take = takeIndex; + int i = 0; + try { + while (i < n) { + @SuppressWarnings("unchecked") E x = (E) items[take]; + c.add(x); + items[take] = null; + take = inc(take); + i++; + } + return n; + } finally { + // Restore invariants even if c.add() threw + if (i > 0) { + count -= i; + takeIndex = take; + if (itrs != null) { + if (count == 0) + itrs.queueIsEmpty(); + else if (i > take) + itrs.takeIndexWrapped(); + } + for (; i > 0 && lock.hasWaiters(notFull); i--) + notFull.signal(); + } } - return n; } finally { lock.unlock(); } @@ -675,12 +705,12 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> * Returns an iterator over the elements in this queue in proper sequence. * The elements will be returned in order from first (head) to last (tail). * - * <p>The returned {@code Iterator} is a "weakly consistent" iterator that + * <p>The returned iterator is a "weakly consistent" iterator that * will never throw {@link java.util.ConcurrentModificationException - * ConcurrentModificationException}, - * and guarantees to traverse elements as they existed upon - * construction of the iterator, and may (but is not guaranteed to) - * reflect any modifications subsequent to construction. + * ConcurrentModificationException}, and guarantees to traverse + * elements as they existed upon construction of the iterator, and + * may (but is not guaranteed to) reflect any modifications + * subsequent to construction. * * @return an iterator over the elements in this queue in proper sequence */ @@ -689,88 +719,627 @@ public class ArrayBlockingQueue<E> extends AbstractQueue<E> } /** - * Iterator for ArrayBlockingQueue. To maintain weak consistency - * with respect to puts and takes, we (1) read ahead one slot, so - * as to not report hasNext true but then not have an element to - * return -- however we later recheck this slot to use the most - * current value; (2) ensure that each array slot is traversed at - * most once (by tracking "remaining" elements); (3) skip over - * null slots, which can occur if takes race ahead of iterators. - * However, for circular array-based queues, we cannot rely on any - * well established definition of what it means to be weakly - * consistent with respect to interior removes since these may - * require slot overwrites in the process of sliding elements to - * cover gaps. So we settle for resiliency, operating on - * established apparent nexts, which may miss some elements that - * have moved between calls to next. + * Shared data between iterators and their queue, allowing queue + * modifications to update iterators when elements are removed. + * + * This adds a lot of complexity for the sake of correctly + * handling some uncommon operations, but the combination of + * circular-arrays and supporting interior removes (i.e., those + * not at head) would cause iterators to sometimes lose their + * places and/or (re)report elements they shouldn't. To avoid + * this, when a queue has one or more iterators, it keeps iterator + * state consistent by: + * + * (1) keeping track of the number of "cycles", that is, the + * number of times takeIndex has wrapped around to 0. + * (2) notifying all iterators via the callback removedAt whenever + * an interior element is removed (and thus other elements may + * be shifted). + * + * These suffice to eliminate iterator inconsistencies, but + * unfortunately add the secondary responsibility of maintaining + * the list of iterators. We track all active iterators in a + * simple linked list (accessed only when the queue's lock is + * held) of weak references to Itr. The list is cleaned up using + * 3 different mechanisms: + * + * (1) Whenever a new iterator is created, do some O(1) checking for + * stale list elements. + * + * (2) Whenever takeIndex wraps around to 0, check for iterators + * that have been unused for more than one wrap-around cycle. + * + * (3) Whenever the queue becomes empty, all iterators are notified + * and this entire data structure is discarded. + * + * So in addition to the removedAt callback that is necessary for + * correctness, iterators have the shutdown and takeIndexWrapped + * callbacks that help remove stale iterators from the list. + * + * Whenever a list element is examined, it is expunged if either + * the GC has determined that the iterator is discarded, or if the + * iterator reports that it is "detached" (does not need any + * further state updates). Overhead is maximal when takeIndex + * never advances, iterators are discarded before they are + * exhausted, and all removals are interior removes, in which case + * all stale iterators are discovered by the GC. But even in this + * case we don't increase the amortized complexity. + * + * Care must be taken to keep list sweeping methods from + * reentrantly invoking another such method, causing subtle + * corruption bugs. + */ + class Itrs { + + /** + * Node in a linked list of weak iterator references. + */ + private class Node extends WeakReference<Itr> { + Node next; + + Node(Itr iterator, Node next) { + super(iterator); + this.next = next; + } + } + + /** Incremented whenever takeIndex wraps around to 0 */ + int cycles = 0; + + /** Linked list of weak iterator references */ + private Node head; + + /** Used to expunge stale iterators */ + private Node sweeper = null; + + private static final int SHORT_SWEEP_PROBES = 4; + private static final int LONG_SWEEP_PROBES = 16; + + Itrs(Itr initial) { + register(initial); + } + + /** + * Sweeps itrs, looking for and expunging stale iterators. + * If at least one was found, tries harder to find more. + * Called only from iterating thread. + * + * @param tryHarder whether to start in try-harder mode, because + * there is known to be at least one iterator to collect + */ + void doSomeSweeping(boolean tryHarder) { + // assert lock.getHoldCount() == 1; + // assert head != null; + int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; + Node o, p; + final Node sweeper = this.sweeper; + boolean passedGo; // to limit search to one full sweep + + if (sweeper == null) { + o = null; + p = head; + passedGo = true; + } else { + o = sweeper; + p = o.next; + passedGo = false; + } + + for (; probes > 0; probes--) { + if (p == null) { + if (passedGo) + break; + o = null; + p = head; + passedGo = true; + } + final Itr it = p.get(); + final Node next = p.next; + if (it == null || it.isDetached()) { + // found a discarded/exhausted iterator + probes = LONG_SWEEP_PROBES; // "try harder" + // unlink p + p.clear(); + p.next = null; + if (o == null) { + head = next; + if (next == null) { + // We've run out of iterators to track; retire + itrs = null; + return; + } + } + else + o.next = next; + } else { + o = p; + } + p = next; + } + + this.sweeper = (p == null) ? null : o; + } + + /** + * Adds a new iterator to the linked list of tracked iterators. + */ + void register(Itr itr) { + // assert lock.getHoldCount() == 1; + head = new Node(itr, head); + } + + /** + * Called whenever takeIndex wraps around to 0. + * + * Notifies all iterators, and expunges any that are now stale. + */ + void takeIndexWrapped() { + // assert lock.getHoldCount() == 1; + cycles++; + for (Node o = null, p = head; p != null;) { + final Itr it = p.get(); + final Node next = p.next; + if (it == null || it.takeIndexWrapped()) { + // unlink p + // assert it == null || it.isDetached(); + p.clear(); + p.next = null; + if (o == null) + head = next; + else + o.next = next; + } else { + o = p; + } + p = next; + } + if (head == null) // no more iterators to track + itrs = null; + } + + /** + * Called whenever an interior remove (not at takeIndex) occured. + * + * Notifies all iterators, and expunges any that are now stale. + */ + void removedAt(int removedIndex) { + for (Node o = null, p = head; p != null;) { + final Itr it = p.get(); + final Node next = p.next; + if (it == null || it.removedAt(removedIndex)) { + // unlink p + // assert it == null || it.isDetached(); + p.clear(); + p.next = null; + if (o == null) + head = next; + else + o.next = next; + } else { + o = p; + } + p = next; + } + if (head == null) // no more iterators to track + itrs = null; + } + + /** + * Called whenever the queue becomes empty. + * + * Notifies all active iterators that the queue is empty, + * clears all weak refs, and unlinks the itrs datastructure. + */ + void queueIsEmpty() { + // assert lock.getHoldCount() == 1; + for (Node p = head; p != null; p = p.next) { + Itr it = p.get(); + if (it != null) { + p.clear(); + it.shutdown(); + } + } + head = null; + itrs = null; + } + + /** + * Called whenever an element has been dequeued (at takeIndex). + */ + void elementDequeued() { + // assert lock.getHoldCount() == 1; + if (count == 0) + queueIsEmpty(); + else if (takeIndex == 0) + takeIndexWrapped(); + } + } + + /** + * Iterator for ArrayBlockingQueue. + * + * To maintain weak consistency with respect to puts and takes, we + * read ahead one slot, so as to not report hasNext true but then + * not have an element to return. + * + * We switch into "detached" mode (allowing prompt unlinking from + * itrs without help from the GC) when all indices are negative, or + * when hasNext returns false for the first time. This allows the + * iterator to track concurrent updates completely accurately, + * except for the corner case of the user calling Iterator.remove() + * after hasNext() returned false. Even in this case, we ensure + * that we don't remove the wrong element by keeping track of the + * expected element to remove, in lastItem. Yes, we may fail to + * remove lastItem from the queue if it moved due to an interleaved + * interior remove while in detached mode. */ private class Itr implements Iterator<E> { - private int remaining; // Number of elements yet to be returned - private int nextIndex; // Index of element to be returned by next - private E nextItem; // Element to be returned by next call to next - private E lastItem; // Element returned by last call to next - private int lastRet; // Index of last element returned, or -1 if none + /** Index to look for new nextItem; NONE at end */ + private int cursor; + + /** Element to be returned by next call to next(); null if none */ + private E nextItem; + + /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ + private int nextIndex; + + /** Last element returned; null if none or not detached. */ + private E lastItem; + + /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ + private int lastRet; + + /** Previous value of takeIndex, or DETACHED when detached */ + private int prevTakeIndex; + + /** Previous value of iters.cycles */ + private int prevCycles; + + /** Special index value indicating "not available" or "undefined" */ + private static final int NONE = -1; + + /** + * Special index value indicating "removed elsewhere", that is, + * removed by some operation other than a call to this.remove(). + */ + private static final int REMOVED = -2; + + /** Special value for prevTakeIndex indicating "detached mode" */ + private static final int DETACHED = -3; Itr() { + // assert lock.getHoldCount() == 0; + lastRet = NONE; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { - lastRet = -1; - if ((remaining = count) > 0) + if (count == 0) { + // assert itrs == null; + cursor = NONE; + nextIndex = NONE; + prevTakeIndex = DETACHED; + } else { + final int takeIndex = ArrayBlockingQueue.this.takeIndex; + prevTakeIndex = takeIndex; nextItem = itemAt(nextIndex = takeIndex); + cursor = incCursor(takeIndex); + if (itrs == null) { + itrs = new Itrs(this); + } else { + itrs.register(this); // in this order + itrs.doSomeSweeping(false); + } + prevCycles = itrs.cycles; + // assert takeIndex >= 0; + // assert prevTakeIndex == takeIndex; + // assert nextIndex >= 0; + // assert nextItem != null; + } } finally { lock.unlock(); } } + boolean isDetached() { + // assert lock.getHoldCount() == 1; + return prevTakeIndex < 0; + } + + private int incCursor(int index) { + // assert lock.getHoldCount() == 1; + index = inc(index); + if (index == putIndex) + index = NONE; + return index; + } + + /** + * Returns true if index is invalidated by the given number of + * dequeues, starting from prevTakeIndex. + */ + private boolean invalidated(int index, int prevTakeIndex, + long dequeues, int length) { + if (index < 0) + return false; + int distance = index - prevTakeIndex; + if (distance < 0) + distance += length; + return dequeues > distance; + } + + /** + * Adjusts indices to incorporate all dequeues since the last + * operation on this iterator. Call only from iterating thread. + */ + private void incorporateDequeues() { + // assert lock.getHoldCount() == 1; + // assert itrs != null; + // assert !isDetached(); + // assert count > 0; + + final int cycles = itrs.cycles; + final int takeIndex = ArrayBlockingQueue.this.takeIndex; + final int prevCycles = this.prevCycles; + final int prevTakeIndex = this.prevTakeIndex; + + if (cycles != prevCycles || takeIndex != prevTakeIndex) { + final int len = items.length; + // how far takeIndex has advanced since the previous + // operation of this iterator + long dequeues = (cycles - prevCycles) * len + + (takeIndex - prevTakeIndex); + + // Check indices for invalidation + if (invalidated(lastRet, prevTakeIndex, dequeues, len)) + lastRet = REMOVED; + if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) + nextIndex = REMOVED; + if (invalidated(cursor, prevTakeIndex, dequeues, len)) + cursor = takeIndex; + + if (cursor < 0 && nextIndex < 0 && lastRet < 0) + detach(); + else { + this.prevCycles = cycles; + this.prevTakeIndex = takeIndex; + } + } + } + + /** + * Called when itrs should stop tracking this iterator, either + * because there are no more indices to update (cursor < 0 && + * nextIndex < 0 && lastRet < 0) or as a special exception, when + * lastRet >= 0, because hasNext() is about to return false for the + * first time. Call only from iterating thread. + */ + private void detach() { + // Switch to detached mode + // assert lock.getHoldCount() == 1; + // assert cursor == NONE; + // assert nextIndex < 0; + // assert lastRet < 0 || nextItem == null; + // assert lastRet < 0 ^ lastItem != null; + if (prevTakeIndex >= 0) { + // assert itrs != null; + prevTakeIndex = DETACHED; + // try to unlink from itrs (but not too hard) + itrs.doSomeSweeping(true); + } + } + + /** + * For performance reasons, we would like not to acquire a lock in + * hasNext in the common case. To allow for this, we only access + * fields (i.e. nextItem) that are not modified by update operations + * triggered by queue modifications. + */ public boolean hasNext() { - return remaining > 0; + // assert lock.getHoldCount() == 0; + if (nextItem != null) + return true; + noNext(); + return false; + } + + private void noNext() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + // assert cursor == NONE; + // assert nextIndex == NONE; + if (!isDetached()) { + // assert lastRet >= 0; + incorporateDequeues(); // might update lastRet + if (lastRet >= 0) { + lastItem = itemAt(lastRet); + // assert lastItem != null; + detach(); + } + } + // assert isDetached(); + // assert lastRet < 0 ^ lastItem != null; + } finally { + lock.unlock(); + } } public E next() { + // assert lock.getHoldCount() == 0; + final E x = nextItem; + if (x == null) + throw new NoSuchElementException(); final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { - if (remaining <= 0) - throw new NoSuchElementException(); + if (!isDetached()) + incorporateDequeues(); + // assert nextIndex != NONE; + // assert lastItem == null; lastRet = nextIndex; - E x = itemAt(nextIndex); // check for fresher value - if (x == null) { - x = nextItem; // we are forced to report old value - lastItem = null; // but ensure remove fails + final int cursor = this.cursor; + if (cursor >= 0) { + nextItem = itemAt(nextIndex = cursor); + // assert nextItem != null; + this.cursor = incCursor(cursor); + } else { + nextIndex = NONE; + nextItem = null; } - else - lastItem = x; - while (--remaining > 0 && // skip over nulls - (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) - ; - return x; } finally { lock.unlock(); } + return x; } public void remove() { + // assert lock.getHoldCount() == 0; final ReentrantLock lock = ArrayBlockingQueue.this.lock; lock.lock(); try { - int i = lastRet; - if (i == -1) + if (!isDetached()) + incorporateDequeues(); // might update lastRet or detach + final int lastRet = this.lastRet; + this.lastRet = NONE; + if (lastRet >= 0) { + if (!isDetached()) + removeAt(lastRet); + else { + final E lastItem = this.lastItem; + // assert lastItem != null; + this.lastItem = null; + if (itemAt(lastRet) == lastItem) + removeAt(lastRet); + } + } else if (lastRet == NONE) throw new IllegalStateException(); - lastRet = -1; - E x = lastItem; - lastItem = null; - // only remove if item still at index - if (x != null && x == items[i]) { - boolean removingHead = (i == takeIndex); - removeAt(i); - if (!removingHead) - nextIndex = dec(nextIndex); - } + // else lastRet == REMOVED and the last returned element was + // previously asynchronously removed via an operation other + // than this.remove(), so nothing to do. + + if (cursor < 0 && nextIndex < 0) + detach(); } finally { lock.unlock(); + // assert lastRet == NONE; + // assert lastItem == null; } } - } + /** + * Called to notify the iterator that the queue is empty, or that it + * has fallen hopelessly behind, so that it should abandon any + * further iteration, except possibly to return one more element + * from next(), as promised by returning true from hasNext(). + */ + void shutdown() { + // assert lock.getHoldCount() == 1; + cursor = NONE; + if (nextIndex >= 0) + nextIndex = REMOVED; + if (lastRet >= 0) { + lastRet = REMOVED; + lastItem = null; + } + prevTakeIndex = DETACHED; + // Don't set nextItem to null because we must continue to be + // able to return it on next(). + // + // Caller will unlink from itrs when convenient. + } + + private int distance(int index, int prevTakeIndex, int length) { + int distance = index - prevTakeIndex; + if (distance < 0) + distance += length; + return distance; + } + + /** + * Called whenever an interior remove (not at takeIndex) occured. + * + * @return true if this iterator should be unlinked from itrs + */ + boolean removedAt(int removedIndex) { + // assert lock.getHoldCount() == 1; + if (isDetached()) + return true; + + final int cycles = itrs.cycles; + final int takeIndex = ArrayBlockingQueue.this.takeIndex; + final int prevCycles = this.prevCycles; + final int prevTakeIndex = this.prevTakeIndex; + final int len = items.length; + int cycleDiff = cycles - prevCycles; + if (removedIndex < takeIndex) + cycleDiff++; + final int removedDistance = + (cycleDiff * len) + (removedIndex - prevTakeIndex); + // assert removedDistance >= 0; + int cursor = this.cursor; + if (cursor >= 0) { + int x = distance(cursor, prevTakeIndex, len); + if (x == removedDistance) { + if (cursor == putIndex) + this.cursor = cursor = NONE; + } + else if (x > removedDistance) { + // assert cursor != prevTakeIndex; + this.cursor = cursor = dec(cursor); + } + } + int lastRet = this.lastRet; + if (lastRet >= 0) { + int x = distance(lastRet, prevTakeIndex, len); + if (x == removedDistance) + this.lastRet = lastRet = REMOVED; + else if (x > removedDistance) + this.lastRet = lastRet = dec(lastRet); + } + int nextIndex = this.nextIndex; + if (nextIndex >= 0) { + int x = distance(nextIndex, prevTakeIndex, len); + if (x == removedDistance) + this.nextIndex = nextIndex = REMOVED; + else if (x > removedDistance) + this.nextIndex = nextIndex = dec(nextIndex); + } + else if (cursor < 0 && nextIndex < 0 && lastRet < 0) { + this.prevTakeIndex = DETACHED; + return true; + } + return false; + } + + /** + * Called whenever takeIndex wraps around to zero. + * + * @return true if this iterator should be unlinked from itrs + */ + boolean takeIndexWrapped() { + // assert lock.getHoldCount() == 1; + if (isDetached()) + return true; + if (itrs.cycles - prevCycles > 1) { + // All the elements that existed at the time of the last + // operation are gone, so abandon further iteration. + shutdown(); + return true; + } + return false; + } + +// /** Uncomment for debugging. */ +// public String toString() { +// return ("cursor=" + cursor + " " + +// "nextIndex=" + nextIndex + " " + +// "lastRet=" + lastRet + " " + +// "nextItem=" + nextItem + " " + +// "lastItem=" + lastItem + " " + +// "prevCycles=" + prevCycles + " " + +// "prevTakeIndex=" + prevTakeIndex + " " + +// "size()=" + size() + " " + +// "remainingCapacity()=" + remainingCapacity()); +// } + } } |