summaryrefslogtreecommitdiffstats
path: root/jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java')
-rw-r--r--jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java601
1 files changed, 601 insertions, 0 deletions
diff --git a/jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java b/jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java
new file mode 100644
index 0000000..711b47b
--- /dev/null
+++ b/jsr166-tests/src/test/java/jsr166/SynchronousQueueTest.java
@@ -0,0 +1,601 @@
+/*
+ * Written by Doug Lea with assistance from members of JCP JSR-166
+ * Expert Group and released to the public domain, as explained at
+ * http://creativecommons.org/publicdomain/zero/1.0/
+ * Other contributors include Andrew Wright, Jeffrey Hayes,
+ * Pat Fisher, Mike Judd.
+ */
+
+package jsr166;
+
+import junit.framework.*;
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+public class SynchronousQueueTest extends JSR166TestCase {
+
+ public static class Fair extends BlockingQueueTest {
+ protected BlockingQueue emptyCollection() {
+ return new SynchronousQueue(true);
+ }
+ }
+
+ public static class NonFair extends BlockingQueueTest {
+ protected BlockingQueue emptyCollection() {
+ return new SynchronousQueue(false);
+ }
+ }
+
+ /**
+ * Any SynchronousQueue is both empty and full
+ */
+ public void testEmptyFull() { testEmptyFull(false); }
+ public void testEmptyFull_fair() { testEmptyFull(true); }
+ public void testEmptyFull(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ assertTrue(q.isEmpty());
+ assertEquals(0, q.size());
+ assertEquals(0, q.remainingCapacity());
+ assertFalse(q.offer(zero));
+ }
+
+ /**
+ * offer fails if no active taker
+ */
+ public void testOffer() { testOffer(false); }
+ public void testOffer_fair() { testOffer(true); }
+ public void testOffer(boolean fair) {
+ SynchronousQueue q = new SynchronousQueue(fair);
+ assertFalse(q.offer(one));
+ }
+
+ /**
+ * add throws IllegalStateException if no active taker
+ */
+ public void testAdd() { testAdd(false); }
+ public void testAdd_fair() { testAdd(true); }
+ public void testAdd(boolean fair) {
+ SynchronousQueue q = new SynchronousQueue(fair);
+ assertEquals(0, q.remainingCapacity());
+ try {
+ q.add(one);
+ shouldThrow();
+ } catch (IllegalStateException success) {}
+ }
+
+ /**
+ * addAll(this) throws IllegalArgumentException
+ */
+ public void testAddAll_self() { testAddAll_self(false); }
+ public void testAddAll_self_fair() { testAddAll_self(true); }
+ public void testAddAll_self(boolean fair) {
+ SynchronousQueue q = new SynchronousQueue(fair);
+ try {
+ q.addAll(q);
+ shouldThrow();
+ } catch (IllegalArgumentException success) {}
+ }
+
+ /**
+ * addAll throws ISE if no active taker
+ */
+ public void testAddAll_ISE() { testAddAll_ISE(false); }
+ public void testAddAll_ISE_fair() { testAddAll_ISE(true); }
+ public void testAddAll_ISE(boolean fair) {
+ SynchronousQueue q = new SynchronousQueue(fair);
+ Integer[] ints = new Integer[1];
+ for (int i = 0; i < ints.length; i++)
+ ints[i] = i;
+ Collection<Integer> coll = Arrays.asList(ints);
+ try {
+ q.addAll(coll);
+ shouldThrow();
+ } catch (IllegalStateException success) {}
+ }
+
+ /**
+ * put blocks interruptibly if no active taker
+ */
+ public void testBlockingPut() { testBlockingPut(false); }
+ public void testBlockingPut_fair() { testBlockingPut(true); }
+ public void testBlockingPut(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ Thread.currentThread().interrupt();
+ try {
+ q.put(99);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ assertFalse(Thread.interrupted());
+
+ pleaseInterrupt.countDown();
+ try {
+ q.put(99);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ assertFalse(Thread.interrupted());
+ }});
+
+ await(pleaseInterrupt);
+ assertThreadStaysAlive(t);
+ t.interrupt();
+ awaitTermination(t);
+ assertEquals(0, q.remainingCapacity());
+ }
+
+ /**
+ * put blocks interruptibly waiting for take
+ */
+ public void testPutWithTake() { testPutWithTake(false); }
+ public void testPutWithTake_fair() { testPutWithTake(true); }
+ public void testPutWithTake(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ final CountDownLatch pleaseTake = new CountDownLatch(1);
+ final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ pleaseTake.countDown();
+ q.put(one);
+
+ pleaseInterrupt.countDown();
+ try {
+ q.put(99);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ assertFalse(Thread.interrupted());
+ }});
+
+ await(pleaseTake);
+ assertEquals(0, q.remainingCapacity());
+ try { assertSame(one, q.take()); }
+ catch (InterruptedException e) { threadUnexpectedException(e); }
+
+ await(pleaseInterrupt);
+ assertThreadStaysAlive(t);
+ t.interrupt();
+ awaitTermination(t);
+ assertEquals(0, q.remainingCapacity());
+ }
+
+ /**
+ * timed offer times out if elements not taken
+ */
+ public void testTimedOffer() { testTimedOffer(false); }
+ public void testTimedOffer_fair() { testTimedOffer(true); }
+ public void testTimedOffer(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ long startTime = System.nanoTime();
+ assertFalse(q.offer(new Object(), timeoutMillis(), MILLISECONDS));
+ assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
+ pleaseInterrupt.countDown();
+ try {
+ q.offer(new Object(), 2 * LONG_DELAY_MS, MILLISECONDS);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ }});
+
+ await(pleaseInterrupt);
+ assertThreadStaysAlive(t);
+ t.interrupt();
+ awaitTermination(t);
+ }
+
+ /**
+ * poll return null if no active putter
+ */
+ public void testPoll() { testPoll(false); }
+ public void testPoll_fair() { testPoll(true); }
+ public void testPoll(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ assertNull(q.poll());
+ }
+
+ /**
+ * timed poll with zero timeout times out if no active putter
+ */
+ public void testTimedPoll0() { testTimedPoll0(false); }
+ public void testTimedPoll0_fair() { testTimedPoll0(true); }
+ public void testTimedPoll0(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ try { assertNull(q.poll(0, MILLISECONDS)); }
+ catch (InterruptedException e) { threadUnexpectedException(e); }
+ }
+
+ /**
+ * timed poll with nonzero timeout times out if no active putter
+ */
+ public void testTimedPoll() { testTimedPoll(false); }
+ public void testTimedPoll_fair() { testTimedPoll(true); }
+ public void testTimedPoll(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ long startTime = System.nanoTime();
+ try { assertNull(q.poll(timeoutMillis(), MILLISECONDS)); }
+ catch (InterruptedException e) { threadUnexpectedException(e); }
+ assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
+ }
+
+ /**
+ * timed poll before a delayed offer times out, returning null;
+ * after offer succeeds; on interruption throws
+ */
+ public void testTimedPollWithOffer() { testTimedPollWithOffer(false); }
+ public void testTimedPollWithOffer_fair() { testTimedPollWithOffer(true); }
+ public void testTimedPollWithOffer(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ final CountDownLatch pleaseOffer = new CountDownLatch(1);
+ final CountDownLatch pleaseInterrupt = new CountDownLatch(1);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ long startTime = System.nanoTime();
+ assertNull(q.poll(timeoutMillis(), MILLISECONDS));
+ assertTrue(millisElapsedSince(startTime) >= timeoutMillis());
+
+ pleaseOffer.countDown();
+ startTime = System.nanoTime();
+ assertSame(zero, q.poll(LONG_DELAY_MS, MILLISECONDS));
+ assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
+
+ Thread.currentThread().interrupt();
+ try {
+ q.poll(LONG_DELAY_MS, MILLISECONDS);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ assertFalse(Thread.interrupted());
+
+ pleaseInterrupt.countDown();
+ try {
+ q.poll(LONG_DELAY_MS, MILLISECONDS);
+ shouldThrow();
+ } catch (InterruptedException success) {}
+ assertFalse(Thread.interrupted());
+ }});
+
+ await(pleaseOffer);
+ long startTime = System.nanoTime();
+ try { assertTrue(q.offer(zero, LONG_DELAY_MS, MILLISECONDS)); }
+ catch (InterruptedException e) { threadUnexpectedException(e); }
+ assertTrue(millisElapsedSince(startTime) < MEDIUM_DELAY_MS);
+
+ await(pleaseInterrupt);
+ assertThreadStaysAlive(t);
+ t.interrupt();
+ awaitTermination(t);
+ }
+
+ /**
+ * peek() returns null if no active putter
+ */
+ public void testPeek() { testPeek(false); }
+ public void testPeek_fair() { testPeek(true); }
+ public void testPeek(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ assertNull(q.peek());
+ }
+
+ /**
+ * element() throws NoSuchElementException if no active putter
+ */
+ public void testElement() { testElement(false); }
+ public void testElement_fair() { testElement(true); }
+ public void testElement(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ try {
+ q.element();
+ shouldThrow();
+ } catch (NoSuchElementException success) {}
+ }
+
+ /**
+ * remove() throws NoSuchElementException if no active putter
+ */
+ public void testRemove() { testRemove(false); }
+ public void testRemove_fair() { testRemove(true); }
+ public void testRemove(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ try {
+ q.remove();
+ shouldThrow();
+ } catch (NoSuchElementException success) {}
+ }
+
+ /**
+ * contains returns false
+ */
+ public void testContains() { testContains(false); }
+ public void testContains_fair() { testContains(true); }
+ public void testContains(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ assertFalse(q.contains(zero));
+ }
+
+ /**
+ * clear ensures isEmpty
+ */
+ public void testClear() { testClear(false); }
+ public void testClear_fair() { testClear(true); }
+ public void testClear(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ q.clear();
+ assertTrue(q.isEmpty());
+ }
+
+ /**
+ * containsAll returns false unless empty
+ */
+ public void testContainsAll() { testContainsAll(false); }
+ public void testContainsAll_fair() { testContainsAll(true); }
+ public void testContainsAll(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Integer[] empty = new Integer[0];
+ assertTrue(q.containsAll(Arrays.asList(empty)));
+ Integer[] ints = new Integer[1]; ints[0] = zero;
+ assertFalse(q.containsAll(Arrays.asList(ints)));
+ }
+
+ /**
+ * retainAll returns false
+ */
+ public void testRetainAll() { testRetainAll(false); }
+ public void testRetainAll_fair() { testRetainAll(true); }
+ public void testRetainAll(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Integer[] empty = new Integer[0];
+ assertFalse(q.retainAll(Arrays.asList(empty)));
+ Integer[] ints = new Integer[1]; ints[0] = zero;
+ assertFalse(q.retainAll(Arrays.asList(ints)));
+ }
+
+ /**
+ * removeAll returns false
+ */
+ public void testRemoveAll() { testRemoveAll(false); }
+ public void testRemoveAll_fair() { testRemoveAll(true); }
+ public void testRemoveAll(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Integer[] empty = new Integer[0];
+ assertFalse(q.removeAll(Arrays.asList(empty)));
+ Integer[] ints = new Integer[1]; ints[0] = zero;
+ assertFalse(q.containsAll(Arrays.asList(ints)));
+ }
+
+ /**
+ * toArray is empty
+ */
+ public void testToArray() { testToArray(false); }
+ public void testToArray_fair() { testToArray(true); }
+ public void testToArray(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Object[] o = q.toArray();
+ assertEquals(0, o.length);
+ }
+
+ /**
+ * toArray(Integer array) returns its argument with the first
+ * element (if present) nulled out
+ */
+ public void testToArray2() { testToArray2(false); }
+ public void testToArray2_fair() { testToArray2(true); }
+ public void testToArray2(boolean fair) {
+ final SynchronousQueue<Integer> q
+ = new SynchronousQueue<Integer>(fair);
+ Integer[] a;
+
+ a = new Integer[0];
+ assertSame(a, q.toArray(a));
+
+ a = new Integer[3];
+ Arrays.fill(a, 42);
+ assertSame(a, q.toArray(a));
+ assertNull(a[0]);
+ for (int i = 1; i < a.length; i++)
+ assertEquals(42, (int) a[i]);
+ }
+
+ /**
+ * toArray(null) throws NPE
+ */
+ public void testToArray_null() { testToArray_null(false); }
+ public void testToArray_null_fair() { testToArray_null(true); }
+ public void testToArray_null(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ try {
+ Object o[] = q.toArray(null);
+ shouldThrow();
+ } catch (NullPointerException success) {}
+ }
+
+ /**
+ * iterator does not traverse any elements
+ */
+ public void testIterator() { testIterator(false); }
+ public void testIterator_fair() { testIterator(true); }
+ public void testIterator(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Iterator it = q.iterator();
+ assertFalse(it.hasNext());
+ try {
+ Object x = it.next();
+ shouldThrow();
+ } catch (NoSuchElementException success) {}
+ }
+
+ /**
+ * iterator remove throws ISE
+ */
+ public void testIteratorRemove() { testIteratorRemove(false); }
+ public void testIteratorRemove_fair() { testIteratorRemove(true); }
+ public void testIteratorRemove(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Iterator it = q.iterator();
+ try {
+ it.remove();
+ shouldThrow();
+ } catch (IllegalStateException success) {}
+ }
+
+ /**
+ * toString returns a non-null string
+ */
+ public void testToString() { testToString(false); }
+ public void testToString_fair() { testToString(true); }
+ public void testToString(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ String s = q.toString();
+ assertNotNull(s);
+ }
+
+ /**
+ * offer transfers elements across Executor tasks
+ */
+ public void testOfferInExecutor() { testOfferInExecutor(false); }
+ public void testOfferInExecutor_fair() { testOfferInExecutor(true); }
+ public void testOfferInExecutor(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ final CheckedBarrier threadsStarted = new CheckedBarrier(2);
+
+ executor.execute(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ assertFalse(q.offer(one));
+ threadsStarted.await();
+ assertTrue(q.offer(one, LONG_DELAY_MS, MILLISECONDS));
+ assertEquals(0, q.remainingCapacity());
+ }});
+
+ executor.execute(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ threadsStarted.await();
+ assertSame(one, q.take());
+ }});
+
+ joinPool(executor);
+ }
+
+ /**
+ * timed poll retrieves elements across Executor threads
+ */
+ public void testPollInExecutor() { testPollInExecutor(false); }
+ public void testPollInExecutor_fair() { testPollInExecutor(true); }
+ public void testPollInExecutor(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ final CheckedBarrier threadsStarted = new CheckedBarrier(2);
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ executor.execute(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ assertNull(q.poll());
+ threadsStarted.await();
+ assertSame(one, q.poll(LONG_DELAY_MS, MILLISECONDS));
+ assertTrue(q.isEmpty());
+ }});
+
+ executor.execute(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ threadsStarted.await();
+ q.put(one);
+ }});
+
+ joinPool(executor);
+ }
+
+ /**
+ * a deserialized serialized queue is usable
+ */
+ public void testSerialization() {
+ final SynchronousQueue x = new SynchronousQueue();
+ final SynchronousQueue y = new SynchronousQueue(false);
+ final SynchronousQueue z = new SynchronousQueue(true);
+ assertSerialEquals(x, y);
+ assertNotSerialEquals(x, z);
+ SynchronousQueue[] qs = { x, y, z };
+ for (SynchronousQueue q : qs) {
+ SynchronousQueue clone = serialClone(q);
+ assertNotSame(q, clone);
+ assertSerialEquals(q, clone);
+ assertTrue(clone.isEmpty());
+ assertEquals(0, clone.size());
+ assertEquals(0, clone.remainingCapacity());
+ assertFalse(clone.offer(zero));
+ }
+ }
+
+ /**
+ * drainTo(c) of empty queue doesn't transfer elements
+ */
+ public void testDrainTo() { testDrainTo(false); }
+ public void testDrainTo_fair() { testDrainTo(true); }
+ public void testDrainTo(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ ArrayList l = new ArrayList();
+ q.drainTo(l);
+ assertEquals(0, q.size());
+ assertEquals(0, l.size());
+ }
+
+ /**
+ * drainTo empties queue, unblocking a waiting put.
+ */
+ public void testDrainToWithActivePut() { testDrainToWithActivePut(false); }
+ public void testDrainToWithActivePut_fair() { testDrainToWithActivePut(true); }
+ public void testDrainToWithActivePut(boolean fair) {
+ final SynchronousQueue q = new SynchronousQueue(fair);
+ Thread t = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ q.put(one);
+ }});
+
+ ArrayList l = new ArrayList();
+ long startTime = System.nanoTime();
+ while (l.isEmpty()) {
+ q.drainTo(l);
+ if (millisElapsedSince(startTime) > LONG_DELAY_MS)
+ fail("timed out");
+ Thread.yield();
+ }
+ assertTrue(l.size() == 1);
+ assertSame(one, l.get(0));
+ awaitTermination(t);
+ }
+
+ /**
+ * drainTo(c, n) empties up to n elements of queue into c
+ */
+ public void testDrainToN() throws InterruptedException {
+ final SynchronousQueue q = new SynchronousQueue();
+ Thread t1 = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ q.put(one);
+ }});
+
+ Thread t2 = newStartedThread(new CheckedRunnable() {
+ public void realRun() throws InterruptedException {
+ q.put(two);
+ }});
+
+ ArrayList l = new ArrayList();
+ delay(SHORT_DELAY_MS);
+ q.drainTo(l, 1);
+ assertEquals(1, l.size());
+ q.drainTo(l, 1);
+ assertEquals(2, l.size());
+ assertTrue(l.contains(one));
+ assertTrue(l.contains(two));
+ awaitTermination(t1);
+ awaitTermination(t2);
+ }
+
+}