author    Calin Juravle <calin@google.com>  2013-07-09 19:42:18 +0100
committer Calin Juravle <calin@google.com>  2013-08-14 19:06:09 +0100
commit    75a06e56a4cc4599946e21422513e4bafa759509 (patch)
tree      53e7cf102f29e509154291d36230d8a1898feeed
parent    f85a03031c10e0bb17b2ea22cceeba9091123aae (diff)
Synced java.util.concurrent library up to 2013.07.01.
Compared to the reference code base, I re-added AbstractMap as a base class for ConcurrentHashMap.

Change-Id: I0751638784e962425418ea8640721c00d7200873
-rw-r--r--  libdvm/src/main/java/java/lang/Class.java                          |   22
-rw-r--r--  luni/src/main/java/java/util/concurrent/ConcurrentHashMap.java     | 3892
-rw-r--r--  luni/src/main/java/java/util/concurrent/ConcurrentLinkedQueue.java |    2
-rw-r--r--  luni/src/main/java/java/util/concurrent/CountedCompleter.java      |  113
-rw-r--r--  luni/src/main/java/java/util/concurrent/ForkJoinPool.java          | 1647
-rw-r--r--  luni/src/main/java/java/util/concurrent/ForkJoinTask.java          |  108
-rw-r--r--  luni/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java  |   17
7 files changed, 3760 insertions(+), 2041 deletions(-)
diff --git a/libdvm/src/main/java/java/lang/Class.java b/libdvm/src/main/java/java/lang/Class.java
index 2f26688..c8064cb 100644
--- a/libdvm/src/main/java/java/lang/Class.java
+++ b/libdvm/src/main/java/java/lang/Class.java
@@ -54,13 +54,14 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import libcore.util.CollectionUtils;
import libcore.util.EmptyArray;
import org.apache.harmony.kernel.vm.StringUtils;
import libcore.reflect.AnnotationAccess;
import libcore.reflect.GenericSignatureParser;
import libcore.reflect.Types;
-
+import libcore.util.BasicLruCache;
/**
* The in-memory representation of a Java class. This representation serves as
* the starting point for querying class-related information, a process usually
@@ -779,9 +780,17 @@ public final class Class<T> implements Serializable, AnnotatedElement, GenericDe
* void} then an empty array is returned.
*/
public Type[] getGenericInterfaces() {
- GenericSignatureParser parser = new GenericSignatureParser(getClassLoader());
- parser.parseForClass(this, getSignatureAttribute());
- return Types.getClonedTypeArray(parser.interfaceTypes);
+ Type[] result;
+ synchronized (Caches.genericInterfaces) {
+ result = Caches.genericInterfaces.get(this);
+ if (result == null) {
+ GenericSignatureParser parser = new GenericSignatureParser(getClassLoader());
+ parser.parseForClass(this, getSignatureAttribute());
+ result = Types.getClonedTypeArray(parser.interfaceTypes);
+ Caches.genericInterfaces.put(this, result);
+ }
+ }
+ return result;
}
/**
@@ -1262,4 +1271,9 @@ public final class Class<T> implements Serializable, AnnotatedElement, GenericDe
return AnnotationAccess.typeIndexToAnnotationDirectoryOffset(getDex(), getTypeIndex());
}
+ private static class Caches {
+ private static final BasicLruCache<Class, Type[]> genericInterfaces
+ = new BasicLruCache<Class, Type[]>(50);
+ }
+
}
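
The Class.java change above memoizes getGenericInterfaces() results in a 50-entry LRU cache; ConcurrentHashMap's new comparableClassFor() (further down in this diff) calls it on lookups in treeified bins, and re-parsing the generic signature on every call would be wasteful. A minimal sketch of the same check-then-fill pattern, using a hypothetical LinkedHashMap-based stand-in for libcore.util.BasicLruCache:

    import java.lang.reflect.Type;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical stand-in for libcore.util.BasicLruCache: an access-ordered
    // LinkedHashMap that evicts its eldest entry once capacity is exceeded.
    final class LruCache<K, V> extends LinkedHashMap<K, V> {
        private final int maxSize;
        LruCache(int maxSize) { super(16, 0.75f, true); this.maxSize = maxSize; }
        @Override protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > maxSize;
        }
    }

    final class GenericInterfacesCache {
        private static final LruCache<Class<?>, Type[]> CACHE =
                new LruCache<Class<?>, Type[]>(50);

        // Same shape as the patched getGenericInterfaces(): hold the cache
        // monitor, return a hit, or compute once and remember the result.
        static Type[] genericInterfacesOf(Class<?> c) {
            synchronized (CACHE) {
                Type[] result = CACHE.get(c);
                if (result == null) {
                    result = c.getGenericInterfaces(); // stands in for the parser
                    CACHE.put(c, result);
                }
                return result;
            }
        }
    }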
diff --git a/luni/src/main/java/java/util/concurrent/ConcurrentHashMap.java b/luni/src/main/java/java/util/concurrent/ConcurrentHashMap.java
index c85a5cc..515cc38 100644
--- a/luni/src/main/java/java/util/concurrent/ConcurrentHashMap.java
+++ b/luni/src/main/java/java/util/concurrent/ConcurrentHashMap.java
@@ -5,865 +5,729 @@
*/
package java.util.concurrent;
-import java.util.concurrent.locks.*;
-import java.util.*;
+
+import java.io.ObjectStreamField;
import java.io.Serializable;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.ConcurrentModificationException;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantLock;
// BEGIN android-note
// removed link to collections framework docs
+// removed links to hidden api
// END android-note
/**
* A hash table supporting full concurrency of retrievals and
- * adjustable expected concurrency for updates. This class obeys the
+ * high expected concurrency for updates. This class obeys the
* same functional specification as {@link java.util.Hashtable}, and
* includes versions of methods corresponding to each method of
- * <tt>Hashtable</tt>. However, even though all operations are
+ * {@code Hashtable}. However, even though all operations are
* thread-safe, retrieval operations do <em>not</em> entail locking,
* and there is <em>not</em> any support for locking the entire table
* in a way that prevents all access. This class is fully
- * interoperable with <tt>Hashtable</tt> in programs that rely on its
+ * interoperable with {@code Hashtable} in programs that rely on its
* thread safety but not on its synchronization details.
*
- * <p> Retrieval operations (including <tt>get</tt>) generally do not
- * block, so may overlap with update operations (including
- * <tt>put</tt> and <tt>remove</tt>). Retrievals reflect the results
- * of the most recently <em>completed</em> update operations holding
- * upon their onset. For aggregate operations such as <tt>putAll</tt>
- * and <tt>clear</tt>, concurrent retrievals may reflect insertion or
- * removal of only some entries. Similarly, Iterators and
- * Enumerations return elements reflecting the state of the hash table
- * at some point at or since the creation of the iterator/enumeration.
- * They do <em>not</em> throw {@link ConcurrentModificationException}.
- * However, iterators are designed to be used by only one thread at a time.
+ * <p>Retrieval operations (including {@code get}) generally do not
+ * block, so may overlap with update operations (including {@code put}
+ * and {@code remove}). Retrievals reflect the results of the most
+ * recently <em>completed</em> update operations holding upon their
+ * onset. (More formally, an update operation for a given key bears a
+ * <em>happens-before</em> relation with any (non-null) retrieval for
+ * that key reporting the updated value.) For aggregate operations
+ * such as {@code putAll} and {@code clear}, concurrent retrievals may
+ * reflect insertion or removal of only some entries. Similarly,
+ * Iterators and Enumerations return elements reflecting the state of
+ * the hash table at some point at or since the creation of the
+ * iterator/enumeration. They do <em>not</em> throw {@link
+ * ConcurrentModificationException}. However, iterators are designed
+ * to be used by only one thread at a time. Bear in mind that the
+ * results of aggregate status methods including {@code size}, {@code
+ * isEmpty}, and {@code containsValue} are typically useful only when
+ * a map is not undergoing concurrent updates in other threads.
+ * Otherwise the results of these methods reflect transient states
+ * that may be adequate for monitoring or estimation purposes, but not
+ * for program control.
*
- * <p> The allowed concurrency among update operations is guided by
- * the optional <tt>concurrencyLevel</tt> constructor argument
- * (default <tt>16</tt>), which is used as a hint for internal sizing. The
- * table is internally partitioned to try to permit the indicated
- * number of concurrent updates without contention. Because placement
- * in hash tables is essentially random, the actual concurrency will
- * vary. Ideally, you should choose a value to accommodate as many
- * threads as will ever concurrently modify the table. Using a
- * significantly higher value than you need can waste space and time,
- * and a significantly lower value can lead to thread contention. But
- * overestimates and underestimates within an order of magnitude do
- * not usually have much noticeable impact. A value of one is
- * appropriate when it is known that only one thread will modify and
- * all others will only read. Also, resizing this or any other kind of
- * hash table is a relatively slow operation, so, when possible, it is
- * a good idea to provide estimates of expected table sizes in
- * constructors.
+ * <p>The table is dynamically expanded when there are too many
+ * collisions (i.e., keys that have distinct hash codes but fall into
+ * the same slot modulo the table size), with the expected average
+ * effect of maintaining roughly two bins per mapping (corresponding
+ * to a 0.75 load factor threshold for resizing). There may be much
+ * variance around this average as mappings are added and removed, but
+ * overall, this maintains a commonly accepted time/space tradeoff for
+ * hash tables. However, resizing this or any other kind of hash
+ * table may be a relatively slow operation. When possible, it is a
+ * good idea to provide a size estimate as an optional {@code
+ * initialCapacity} constructor argument. An additional optional
+ * {@code loadFactor} constructor argument provides a further means of
+ * customizing initial table capacity by specifying the table density
+ * to be used in calculating the amount of space to allocate for the
+ * given number of elements. Also, for compatibility with previous
+ * versions of this class, constructors may optionally specify an
+ * expected {@code concurrencyLevel} as an additional hint for
+ * internal sizing. Note that using many keys with exactly the same
+ * {@code hashCode()} is a sure way to slow down performance of any
+ * hash table. To ameliorate impact, when keys are {@link Comparable},
+ * this class may use comparison order among keys to help break ties.
*
* <p>This class and its views and iterators implement all of the
* <em>optional</em> methods of the {@link Map} and {@link Iterator}
* interfaces.
*
- * <p> Like {@link Hashtable} but unlike {@link HashMap}, this class
- * does <em>not</em> allow <tt>null</tt> to be used as a key or value.
+ * <p>Like {@link Hashtable} but unlike {@link HashMap}, this class
+ * does <em>not</em> allow {@code null} to be used as a key or value.
*
* @since 1.5
* @author Doug Lea
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
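
A small usage sketch of the semantics this javadoc promises: retrievals never block, a completed update is visible via the happens-before edge, nulls are rejected, and aggregate methods are only estimates under concurrent updates (class and variable names below are illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class ChmSemanticsDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map =
                    new ConcurrentHashMap<String, Integer>();
            map.put("hits", 1);
            // A retrieval that observes a completed put is guaranteed to see
            // its value: the happens-before edge described above.
            System.out.println(map.get("hits")); // 1
            // Unlike HashMap, null keys and values are rejected outright.
            try {
                map.put("hits", null);
            } catch (NullPointerException expected) {
                System.out.println("null value rejected");
            }
            // size()/isEmpty()/containsValue() are estimates while other
            // threads update; use them for monitoring, not program control.
            System.out.println("approximate size: " + map.size());
        }
    }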
-public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
- implements ConcurrentMap<K, V>, Serializable {
+public class ConcurrentHashMap<K,V> extends java.util.AbstractMap<K,V>
+ implements ConcurrentMap<K,V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
/*
- * The basic strategy is to subdivide the table among Segments,
- * each of which itself is a concurrently readable hash table. To
- * reduce footprint, all but one segments are constructed only
- * when first needed (see ensureSegment). To maintain visibility
- * in the presence of lazy construction, accesses to segments as
- * well as elements of segment's table must use volatile access,
- * which is done via Unsafe within methods segmentAt etc
- * below. These provide the functionality of AtomicReferenceArrays
- * but reduce the levels of indirection. Additionally,
- * volatile-writes of table elements and entry "next" fields
- * within locked operations use the cheaper "lazySet" forms of
- * writes (via putOrderedObject) because these writes are always
- * followed by lock releases that maintain sequential consistency
- * of table updates.
- *
- * Historical note: The previous version of this class relied
- * heavily on "final" fields, which avoided some volatile reads at
- * the expense of a large initial footprint. Some remnants of
- * that design (including forced construction of segment 0) exist
- * to ensure serialization compatibility.
+ * Overview:
+ *
+ * The primary design goal of this hash table is to maintain
+ * concurrent readability (typically method get(), but also
+ * iterators and related methods) while minimizing update
+ * contention. Secondary goals are to keep space consumption about
+ * the same or better than java.util.HashMap, and to support high
+ * initial insertion rates on an empty table by many threads.
+ *
+ * This map usually acts as a binned (bucketed) hash table. Each
+ * key-value mapping is held in a Node. Most nodes are instances
+ * of the basic Node class with hash, key, value, and next
+ * fields. However, various subclasses exist: TreeNodes are
+ * arranged in balanced trees, not lists. TreeBins hold the roots
+ * of sets of TreeNodes. ForwardingNodes are placed at the heads
+ * of bins during resizing. ReservationNodes are used as
+ * placeholders while establishing values in computeIfAbsent and
+ * related methods. The types TreeBin, ForwardingNode, and
+ * ReservationNode do not hold normal user keys, values, or
+ * hashes, and are readily distinguishable during search etc
+ * because they have negative hash fields and null key and value
+ * fields. (These special nodes are either uncommon or transient,
+ * so the impact of carrying around some unused fields is
+ * insignificant.)
+ *
+ * The table is lazily initialized to a power-of-two size upon the
+ * first insertion. Each bin in the table normally contains a
+ * list of Nodes (most often, the list has only zero or one Node).
+ * Table accesses require volatile/atomic reads, writes, and
+ * CASes. Because there is no other way to arrange this without
+ * adding further indirections, we use intrinsics
+ * (sun.misc.Unsafe) operations.
+ *
+ * We use the top (sign) bit of Node hash fields for control
+ * purposes -- it is available anyway because of addressing
+ * constraints. Nodes with negative hash fields are specially
+ * handled or ignored in map methods.
+ *
+ * Insertion (via put or its variants) of the first node in an
+ * empty bin is performed by just CASing it to the bin. This is
+ * by far the most common case for put operations under most
+ * key/hash distributions. Other update operations (insert,
+ * delete, and replace) require locks. We do not want to waste
+ * the space required to associate a distinct lock object with
+ * each bin, so instead use the first node of a bin list itself as
+ * a lock. Locking support for these locks relies on builtin
+ * "synchronized" monitors.
+ *
+ * Using the first node of a list as a lock does not by itself
+ * suffice though: When a node is locked, any update must first
+ * validate that it is still the first node after locking it, and
+ * retry if not. Because new nodes are always appended to lists,
+ * once a node is first in a bin, it remains first until deleted
+ * or the bin becomes invalidated (upon resizing).
+ *
+ * The main disadvantage of per-bin locks is that other update
+ * operations on other nodes in a bin list protected by the same
+ * lock can stall, for example when user equals() or mapping
+ * functions take a long time. However, statistically, under
+ * random hash codes, this is not a common problem. Ideally, the
+ * frequency of nodes in bins follows a Poisson distribution
+ * (http://en.wikipedia.org/wiki/Poisson_distribution) with a
+ * parameter of about 0.5 on average, given the resizing threshold
+ * of 0.75, although with a large variance because of resizing
+ * granularity. Ignoring variance, the expected occurrences of
+ * list size k are (exp(-0.5) * pow(0.5, k) / factorial(k)). The
+ * first values are:
+ *
+ * 0: 0.60653066
+ * 1: 0.30326533
+ * 2: 0.07581633
+ * 3: 0.01263606
+ * 4: 0.00157952
+ * 5: 0.00015795
+ * 6: 0.00001316
+ * 7: 0.00000094
+ * 8: 0.00000006
+ * more: less than 1 in ten million
+ *
+ * Lock contention probability for two threads accessing distinct
+ * elements is roughly 1 / (8 * #elements) under random hashes.
+ *
+ * Actual hash code distributions encountered in practice
+ * sometimes deviate significantly from uniform randomness. This
+ * includes the case when N > (1<<30), so some keys MUST collide.
+ * Similarly for dumb or hostile usages in which multiple keys are
+ * designed to have identical hash codes or ones that differ only
+ * in masked-out high bits. So we use a secondary strategy that
+ * applies when the number of nodes in a bin exceeds a
+ * threshold. These TreeBins use a balanced tree to hold nodes (a
+ * specialized form of red-black trees), bounding search time to
+ * O(log N). Each search step in a TreeBin is at least twice as
+ * slow as in a regular list, but given that N cannot exceed
+ * (1<<64) (before running out of addresses) this bounds search
+ * steps, lock hold times, etc, to reasonable constants (roughly
+ * 100 nodes inspected per operation worst case) so long as keys
+ * are Comparable (which is very common -- String, Long, etc).
+ * TreeBin nodes (TreeNodes) also maintain the same "next"
+ * traversal pointers as regular nodes, so can be traversed in
+ * iterators in the same way.
+ *
+ * The table is resized when occupancy exceeds a percentage
+ * threshold (nominally, 0.75, but see below). Any thread
+ * noticing an overfull bin may assist in resizing after the
+ * initiating thread allocates and sets up the replacement
+ * array. However, rather than stalling, these other threads may
+ * proceed with insertions etc. The use of TreeBins shields us
+ * from the worst case effects of overfilling while resizes are in
+ * progress. Resizing proceeds by transferring bins, one by one,
+ * from the table to the next table. To enable concurrency, the
+ * next table must be (incrementally) prefilled with place-holders
+ * serving as reverse forwarders to the old table. Because we are
+ * using power-of-two expansion, the elements from each bin must
+ * either stay at same index, or move with a power of two
+ * offset. We eliminate unnecessary node creation by catching
+ * cases where old nodes can be reused because their next fields
+ * won't change. On average, only about one-sixth of them need
+ * cloning when a table doubles. The nodes they replace will be
+ * garbage collectable as soon as they are no longer referenced by
+ * any reader thread that may be in the midst of concurrently
+ * traversing table. Upon transfer, the old table bin contains
+ * only a special forwarding node (with hash field "MOVED") that
+ * contains the next table as its key. On encountering a
+ * forwarding node, access and update operations restart, using
+ * the new table.
+ *
+ * Each bin transfer requires its bin lock, which can stall
+ * waiting for locks while resizing. However, because other
+ * threads can join in and help resize rather than contend for
+ * locks, average aggregate waits become shorter as resizing
+ * progresses. The transfer operation must also ensure that all
+ * accessible bins in both the old and new table are usable by any
+ * traversal. This is arranged by proceeding from the last bin
+ * (table.length - 1) up towards the first. Upon seeing a
+ * forwarding node, traversals (see class Traverser) arrange to
+ * move to the new table without revisiting nodes. However, to
+ * ensure that no intervening nodes are skipped, bin splitting can
+ * only begin after the associated reverse-forwarders are in
+ * place.
+ *
+ * The traversal scheme also applies to partial traversals of
+ * ranges of bins (via an alternate Traverser constructor)
+ * to support partitioned aggregate operations. Also, read-only
+ * operations give up if ever forwarded to a null table, which
+ * provides support for shutdown-style clearing, which is also not
+ * currently implemented.
+ *
+ * Lazy table initialization minimizes footprint until first use,
+ * and also avoids resizings when the first operation is from a
+ * putAll, constructor with map argument, or deserialization.
+ * These cases attempt to override the initial capacity settings,
+ * but harmlessly fail to take effect in cases of races.
+ *
+ * The element count is maintained using a specialization of
+ * LongAdder. We need to incorporate a specialization rather than
+ * just use a LongAdder in order to access implicit
+ * contention-sensing that leads to creation of multiple
+ * CounterCells. The counter mechanics avoid contention on
+ * updates but can encounter cache thrashing if read too
+ * frequently during concurrent access. To avoid reading so often,
+ * resizing under contention is attempted only upon adding to a
+ * bin already holding two or more nodes. Under uniform hash
+ * distributions, the probability of this occurring at threshold
+ * is around 13%, meaning that only about 1 in 8 puts check
+ * threshold (and after resizing, many fewer do so).
+ *
+ * TreeBins use a special form of comparison for search and
+ * related operations (which is the main reason we cannot use
+ * existing collections such as TreeMaps). TreeBins contain
+ * Comparable elements, but may contain others, as well as
+ * elements that are Comparable but not necessarily Comparable
+ * for the same T, so we cannot invoke compareTo among them. To
+ * handle this, the tree is ordered primarily by hash value, then
+ * by Comparable.compareTo order if applicable. On lookup at a
+ * node, if elements are not comparable or compare as 0 then both
+ * left and right children may need to be searched in the case of
+ * tied hash values. (This corresponds to the full list search
+ * that would be necessary if all elements were non-Comparable and
+ * had tied hashes.) The red-black balancing code is updated from
+ * pre-jdk-collections
+ * (http://gee.cs.oswego.edu/dl/classes/collections/RBCell.java)
+ * based in turn on Cormen, Leiserson, and Rivest "Introduction to
+ * Algorithms" (CLR).
+ *
+ * TreeBins also require an additional locking mechanism. While
+ * list traversal is always possible by readers even during
+ * updates, tree traversal is not, mainly because of tree-rotations
+ * that may change the root node and/or its linkages. TreeBins
+ * include a simple read-write lock mechanism parasitic on the
+ * main bin-synchronization strategy: Structural adjustments
+ * associated with an insertion or removal are already bin-locked
+ * (and so cannot conflict with other writers) but must wait for
+ * ongoing readers to finish. Since there can be only one such
+ * waiter, we use a simple scheme using a single "waiter" field to
+ * block writers. However, readers need never block. If the root
+ * lock is held, they proceed along the slow traversal path (via
+ * next-pointers) until the lock becomes available or the list is
+ * exhausted, whichever comes first. These cases are not fast, but
+ * maximize aggregate expected throughput.
+ *
+ * Maintaining API and serialization compatibility with previous
+ * versions of this class introduces several oddities. Mainly: We
+ * leave untouched but unused constructor arguments referring to
+ * concurrencyLevel. We accept a loadFactor constructor argument,
+ * but apply it only to initial table capacity (which is the only
+ * time that we can guarantee to honor it.) We also declare an
+ * unused "Segment" class that is instantiated in minimal form
+ * only when serializing.
+ *
+ * This file is organized to make things a little easier to follow
+ * while reading than they might otherwise: First the main static
+ * declarations and utilities, then fields, then main public
+ * methods (with a few factorings of multiple public methods into
+ * internal ones), then sizing methods, trees, traversers, and
+ * bulk operations.
*/
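
The Poisson figures quoted in the overview are easy to reproduce; a minimal sketch, assuming the stated parameter of 0.5, that regenerates the table of expected bin-size frequencies:

    public class BinSizeDistribution {
        public static void main(String[] args) {
            // P(k) = exp(-0.5) * pow(0.5, k) / factorial(k): the expected
            // frequency of bins holding exactly k nodes under random hashes.
            double p = Math.exp(-0.5); // k = 0 term: 0.60653066
            for (int k = 0; k <= 8; k++) {
                System.out.printf("%d: %.8f%n", k, p);
                p = p * 0.5 / (k + 1); // step from the k term to the k+1 term
            }
        }
    }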
/* ---------------- Constants -------------- */
/**
- * The default initial capacity for this table,
- * used when not otherwise specified in a constructor.
+ * The largest possible table capacity. This value must be
+ * exactly 1<<30 to stay within Java array allocation and indexing
+ * bounds for power of two table sizes, and is further required
+ * because the top two bits of 32bit hash fields are used for
+ * control purposes.
*/
- static final int DEFAULT_INITIAL_CAPACITY = 16;
+ private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
- * The default load factor for this table, used when not
- * otherwise specified in a constructor.
+ * The default initial table capacity. Must be a power of 2
+ * (i.e., at least 1) and at most MAXIMUM_CAPACITY.
*/
- static final float DEFAULT_LOAD_FACTOR = 0.75f;
+ private static final int DEFAULT_CAPACITY = 16;
/**
- * The default concurrency level for this table, used when not
- * otherwise specified in a constructor.
+ * The largest possible (non-power of two) array size.
+ * Needed by toArray and related methods.
*/
- static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+ static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
- * The maximum capacity, used if a higher value is implicitly
- * specified by either of the constructors with arguments. MUST
- * be a power of two <= 1<<30 to ensure that entries are indexable
- * using ints.
+ * The default concurrency level for this table. Unused but
+ * defined for compatibility with previous versions of this class.
*/
- static final int MAXIMUM_CAPACITY = 1 << 30;
+ private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
- * The minimum capacity for per-segment tables. Must be a power
- * of two, at least two to avoid immediate resizing on next use
- * after lazy construction.
+ * The load factor for this table. Overrides of this value in
+ * constructors affect only the initial table capacity. The
+ * actual floating point value isn't normally used -- it is
+ * simpler to use expressions such as {@code n - (n >>> 2)} for
+ * the associated resizing threshold.
*/
- static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
+ private static final float LOAD_FACTOR = 0.75f;
/**
- * The maximum number of segments to allow; used to bound
- * constructor arguments. Must be power of two less than 1 << 24.
+ * The bin count threshold for using a tree rather than list for a
+ * bin. Bins are converted to trees when adding an element to a
+ * bin with at least this many nodes. The value must be greater
+ * than 2, and should be at least 8 to mesh with assumptions in
+ * tree removal about conversion back to plain bins upon
+ * shrinkage.
*/
- static final int MAX_SEGMENTS = 1 << 16; // slightly conservative
+ static final int TREEIFY_THRESHOLD = 8;
/**
- * Number of unsynchronized retries in size and containsValue
- * methods before resorting to locking. This is used to avoid
- * unbounded retries if tables undergo continuous modification
- * which would make it impossible to obtain an accurate result.
+ * The bin count threshold for untreeifying a (split) bin during a
+ * resize operation. Should be less than TREEIFY_THRESHOLD, and at
+ * most 6 to mesh with shrinkage detection under removal.
*/
- static final int RETRIES_BEFORE_LOCK = 2;
-
- /* ---------------- Fields -------------- */
+ static final int UNTREEIFY_THRESHOLD = 6;
/**
- * Mask value for indexing into segments. The upper bits of a
- * key's hash code are used to choose the segment.
+ * The smallest table capacity for which bins may be treeified.
+ * (Otherwise the table is resized if too many nodes in a bin.)
+ * The value should be at least 4 * TREEIFY_THRESHOLD to avoid
+ * conflicts between resizing and treeification thresholds.
*/
- final int segmentMask;
+ static final int MIN_TREEIFY_CAPACITY = 64;
/**
- * Shift value for indexing within segments.
+ * Minimum number of rebinnings per transfer step. Ranges are
+ * subdivided to allow multiple resizer threads. This value
+ * serves as a lower bound to avoid resizers encountering
+ * excessive memory contention. The value should be at least
+ * DEFAULT_CAPACITY.
*/
- final int segmentShift;
+ private static final int MIN_TRANSFER_STRIDE = 16;
- /**
- * The segments, each of which is a specialized hash table.
+ /*
+ * Encodings for Node hash fields. See above for explanation.
*/
- final Segment<K,V>[] segments;
+ static final int MOVED = 0x8fffffff; // (-1) hash for forwarding nodes
+ static final int TREEBIN = 0x80000000; // hash for roots of trees
+ static final int RESERVED = 0x80000001; // hash for transient reservations
+ static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
+
+ /** Number of CPUS, to place bounds on some sizings */
+ static final int NCPU = Runtime.getRuntime().availableProcessors();
- transient Set<K> keySet;
- transient Set<Map.Entry<K,V>> entrySet;
- transient Collection<V> values;
+ /** For serialization compatibility. */
+ private static final ObjectStreamField[] serialPersistentFields = {
+ new ObjectStreamField("segments", Segment[].class),
+ new ObjectStreamField("segmentMask", Integer.TYPE),
+ new ObjectStreamField("segmentShift", Integer.TYPE)
+ };
+
+ /* ---------------- Nodes -------------- */
/**
- * ConcurrentHashMap list entry. Note that this is never exported
- * out as a user-visible Map.Entry.
+ * Key-value entry. This class is never exported out as a
+ * user-mutable Map.Entry (i.e., one supporting setValue; see
+ * MapEntry below), but can be used for read-only traversals used
+ * in bulk tasks. Subclasses of Node with a negative hash field
+ * are special, and contain null keys and values (but are never
+ * exported). Otherwise, keys and vals are never null.
*/
- static final class HashEntry<K,V> {
+ static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
- volatile V value;
- volatile HashEntry<K,V> next;
+ volatile V val;
+ Node<K,V> next;
- HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
+ Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
- this.value = value;
+ this.val = val;
this.next = next;
}
- /**
- * Sets next field with volatile write semantics. (See above
- * about use of putOrderedObject.)
- */
- final void setNext(HashEntry<K,V> n) {
- UNSAFE.putOrderedObject(this, nextOffset, n);
+ public final K getKey() { return key; }
+ public final V getValue() { return val; }
+ public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
+ public final String toString(){ return key + "=" + val; }
+ public final V setValue(V value) {
+ throw new UnsupportedOperationException();
}
- // Unsafe mechanics
- static final sun.misc.Unsafe UNSAFE;
- static final long nextOffset;
- static {
- try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> k = HashEntry.class;
- nextOffset = UNSAFE.objectFieldOffset
- (k.getDeclaredField("next"));
- } catch (Exception e) {
- throw new Error(e);
+ public final boolean equals(Object o) {
+ Object k, v, u; Map.Entry<?,?> e;
+ return ((o instanceof Map.Entry) &&
+ (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
+ (v = e.getValue()) != null &&
+ (k == key || k.equals(key)) &&
+ (v == (u = val) || v.equals(u)));
+ }
+
+ /**
+ * Virtualized support for map.get(); overridden in subclasses.
+ */
+ Node<K,V> find(int h, Object k) {
+ Node<K,V> e = this;
+ if (k != null) {
+ do {
+ K ek;
+ if (e.hash == h &&
+ ((ek = e.key) == k || (ek != null && k.equals(ek))))
+ return e;
+ } while ((e = e.next) != null);
}
+ return null;
}
}
+ /* ---------------- Static utilities -------------- */
+
/**
- * Gets the ith element of given table (if nonnull) with volatile
- * read semantics. Note: This is manually integrated into a few
- * performance-sensitive methods to reduce call overhead.
+ * Spreads (XORs) higher bits of hash to lower and also forces top
+ * bit to 0. Because the table uses power-of-two masking, sets of
+ * hashes that vary only in bits above the current mask will
+ * always collide. (Among known examples are sets of Float keys
+ * holding consecutive whole numbers in small tables.) So we
+ * apply a transform that spreads the impact of higher bits
+ * downward. There is a tradeoff between speed, utility, and
+ * quality of bit-spreading. Because many common sets of hashes
+ * are already reasonably distributed (so don't benefit from
+ * spreading), and because we use trees to handle large sets of
+ * collisions in bins, we just XOR some shifted bits in the
+ * cheapest possible way to reduce systematic lossage, as well as
+ * to incorporate impact of the highest bits that would otherwise
+ * never be used in index calculations because of table bounds.
*/
- @SuppressWarnings("unchecked")
- static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
- return (tab == null) ? null :
- (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)i << TSHIFT) + TBASE);
+ static final int spread(int h) {
+ return (h ^ (h >>> 16)) & HASH_BITS;
}
/**
- * Sets the ith element of given table, with volatile write
- * semantics. (See above about use of putOrderedObject.)
+ * Returns a power of two table size for the given desired capacity.
+ * See Hacker's Delight, sec 3.2
*/
- static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
- HashEntry<K,V> e) {
- UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
+ private static final int tableSizeFor(int c) {
+ int n = c - 1;
+ n |= n >>> 1;
+ n |= n >>> 2;
+ n |= n >>> 4;
+ n |= n >>> 8;
+ n |= n >>> 16;
+ return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
+
/**
- * Applies a supplemental hash function to a given hashCode, which
- * defends against poor quality hash functions. This is critical
- * because ConcurrentHashMap uses power-of-two length hash tables,
- * that otherwise encounter collisions for hashCodes that do not
- * differ in lower or upper bits.
+ * Returns x's Class if it is of the form "class C implements
+ * Comparable<C>", else null.
*/
- private static int hash(int h) {
- // Spread bits to regularize both segment and index locations,
- // using variant of single-word Wang/Jenkins hash.
- h += (h << 15) ^ 0xffffcd7d;
- h ^= (h >>> 10);
- h += (h << 3);
- h ^= (h >>> 6);
- h += (h << 2) + (h << 14);
- return h ^ (h >>> 16);
+ static Class<?> comparableClassFor(Object x) {
+ if (x instanceof Comparable) {
+ Class<?> c; Type[] ts, as; Type t; ParameterizedType p;
+ if ((c = x.getClass()) == String.class) // bypass checks
+ return c;
+ if ((ts = c.getGenericInterfaces()) != null) {
+ for (int i = 0; i < ts.length; ++i) {
+ if (((t = ts[i]) instanceof ParameterizedType) &&
+ ((p = (ParameterizedType)t).getRawType() ==
+ Comparable.class) &&
+ (as = p.getActualTypeArguments()) != null &&
+ as.length == 1 && as[0] == c) // type arg is c
+ return c;
+ }
+ }
+ }
+ return null;
}
/**
- * Segments are specialized versions of hash tables. This
- * subclasses from ReentrantLock opportunistically, just to
- * simplify some locking and avoid separate construction.
+ * Returns k.compareTo(x) if x matches kc (k's screened comparable
+ * class), else 0.
*/
- static final class Segment<K,V> extends ReentrantLock implements Serializable {
- /*
- * Segments maintain a table of entry lists that are always
- * kept in a consistent state, so can be read (via volatile
- * reads of segments and tables) without locking. This
- * requires replicating nodes when necessary during table
- * resizing, so the old lists can be traversed by readers
- * still using old version of table.
- *
- * This class defines only mutative methods requiring locking.
- * Except as noted, the methods of this class perform the
- * per-segment versions of ConcurrentHashMap methods. (Other
- * methods are integrated directly into ConcurrentHashMap
- * methods.) These mutative methods use a form of controlled
- * spinning on contention via methods scanAndLock and
- * scanAndLockForPut. These intersperse tryLocks with
- * traversals to locate nodes. The main benefit is to absorb
- * cache misses (which are very common for hash tables) while
- * obtaining locks so that traversal is faster once
- * acquired. We do not actually use the found nodes since they
- * must be re-acquired under lock anyway to ensure sequential
- * consistency of updates (and in any case may be undetectably
- * stale), but they will normally be much faster to re-locate.
- * Also, scanAndLockForPut speculatively creates a fresh node
- * to use in put if no node is found.
- */
+ @SuppressWarnings({"rawtypes","unchecked"}) // for cast to Comparable
+ static int compareComparables(Class<?> kc, Object k, Object x) {
+ return (x == null || x.getClass() != kc ? 0 :
+ ((Comparable)k).compareTo(x));
+ }
- private static final long serialVersionUID = 2249069246763182397L;
+ /* ---------------- Table element access -------------- */
- /**
- * The maximum number of times to tryLock in a prescan before
- * possibly blocking on acquire in preparation for a locked
- * segment operation. On multiprocessors, using a bounded
- * number of retries maintains cache acquired while locating
- * nodes.
- */
- static final int MAX_SCAN_RETRIES =
- Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
+ /*
+ * Volatile access methods are used for table elements as well as
+ * elements of in-progress next table while resizing. All uses of
+ * the tab arguments must be null checked by callers. All callers
+ * also paranoically precheck that tab's length is not zero (or an
+ * equivalent check), thus ensuring that any index argument taking
+ * the form of a hash value anded with (length - 1) is a valid
+ * index. Note that, to be correct wrt arbitrary concurrency
+ * errors by users, these checks must operate on local variables,
+ * which accounts for some odd-looking inline assignments below.
+ * Note that calls to setTabAt always occur within locked regions,
+ * and so do not need full volatile semantics, but still require
+ * ordering to maintain concurrent readability.
+ */
- /**
- * The per-segment table. Elements are accessed via
- * entryAt/setEntryAt providing volatile semantics.
- */
- transient volatile HashEntry<K,V>[] table;
+ @SuppressWarnings("unchecked")
+ static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
+ return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
+ }
- /**
- * The number of elements. Accessed only either within locks
- * or among other volatile reads that maintain visibility.
- */
- transient int count;
+ static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
+ Node<K,V> c, Node<K,V> v) {
+ return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
+ }
- /**
- * The total number of mutative operations in this segment.
- * Even though this may overflows 32 bits, it provides
- * sufficient accuracy for stability checks in CHM isEmpty()
- * and size() methods. Accessed only either within locks or
- * among other volatile reads that maintain visibility.
- */
- transient int modCount;
+ static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
+ U.putOrderedObject(tab, ((long)i << ASHIFT) + ABASE, v);
+ }
- /**
- * The table is rehashed when its size exceeds this threshold.
- * (The value of this field is always <tt>(int)(capacity *
- * loadFactor)</tt>.)
- */
- transient int threshold;
+ /* ---------------- Fields -------------- */
- /**
- * The load factor for the hash table. Even though this value
- * is same for all segments, it is replicated to avoid needing
- * links to outer object.
- * @serial
- */
- final float loadFactor;
+ /**
+ * The array of bins. Lazily initialized upon first insertion.
+ * Size is always a power of two. Accessed directly by iterators.
+ */
+ transient volatile Node<K,V>[] table;
- Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
- this.loadFactor = lf;
- this.threshold = threshold;
- this.table = tab;
- }
+ /**
+ * The next table to use; non-null only while resizing.
+ */
+ private transient volatile Node<K,V>[] nextTable;
- final V put(K key, int hash, V value, boolean onlyIfAbsent) {
- HashEntry<K,V> node = tryLock() ? null :
- scanAndLockForPut(key, hash, value);
- V oldValue;
- try {
- HashEntry<K,V>[] tab = table;
- int index = (tab.length - 1) & hash;
- HashEntry<K,V> first = entryAt(tab, index);
- for (HashEntry<K,V> e = first;;) {
- if (e != null) {
- K k;
- if ((k = e.key) == key ||
- (e.hash == hash && key.equals(k))) {
- oldValue = e.value;
- if (!onlyIfAbsent) {
- e.value = value;
- ++modCount;
- }
- break;
- }
- e = e.next;
- }
- else {
- if (node != null)
- node.setNext(first);
- else
- node = new HashEntry<K,V>(hash, key, value, first);
- int c = count + 1;
- if (c > threshold && tab.length < MAXIMUM_CAPACITY)
- rehash(node);
- else
- setEntryAt(tab, index, node);
- ++modCount;
- count = c;
- oldValue = null;
- break;
- }
- }
- } finally {
- unlock();
- }
- return oldValue;
- }
+ /**
+ * Base counter value, used mainly when there is no contention,
+ * but also as a fallback during table initialization
+ * races. Updated via CAS.
+ */
+ private transient volatile long baseCount;
- /**
- * Doubles size of table and repacks entries, also adding the
- * given node to new table
- */
- @SuppressWarnings("unchecked")
- private void rehash(HashEntry<K,V> node) {
- /*
- * Reclassify nodes in each list to new table. Because we
- * are using power-of-two expansion, the elements from
- * each bin must either stay at same index, or move with a
- * power of two offset. We eliminate unnecessary node
- * creation by catching cases where old nodes can be
- * reused because their next fields won't change.
- * Statistically, at the default threshold, only about
- * one-sixth of them need cloning when a table
- * doubles. The nodes they replace will be garbage
- * collectable as soon as they are no longer referenced by
- * any reader thread that may be in the midst of
- * concurrently traversing table. Entry accesses use plain
- * array indexing because they are followed by volatile
- * table write.
- */
- HashEntry<K,V>[] oldTable = table;
- int oldCapacity = oldTable.length;
- int newCapacity = oldCapacity << 1;
- threshold = (int)(newCapacity * loadFactor);
- HashEntry<K,V>[] newTable =
- (HashEntry<K,V>[]) new HashEntry<?,?>[newCapacity];
- int sizeMask = newCapacity - 1;
- for (int i = 0; i < oldCapacity ; i++) {
- HashEntry<K,V> e = oldTable[i];
- if (e != null) {
- HashEntry<K,V> next = e.next;
- int idx = e.hash & sizeMask;
- if (next == null) // Single node on list
- newTable[idx] = e;
- else { // Reuse consecutive sequence at same slot
- HashEntry<K,V> lastRun = e;
- int lastIdx = idx;
- for (HashEntry<K,V> last = next;
- last != null;
- last = last.next) {
- int k = last.hash & sizeMask;
- if (k != lastIdx) {
- lastIdx = k;
- lastRun = last;
- }
- }
- newTable[lastIdx] = lastRun;
- // Clone remaining nodes
- for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
- V v = p.value;
- int h = p.hash;
- int k = h & sizeMask;
- HashEntry<K,V> n = newTable[k];
- newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
- }
- }
- }
- }
- int nodeIndex = node.hash & sizeMask; // add the new node
- node.setNext(newTable[nodeIndex]);
- newTable[nodeIndex] = node;
- table = newTable;
- }
+ /**
+ * Table initialization and resizing control. When negative, the
+ * table is being initialized or resized: -1 for initialization,
+ * else -(1 + the number of active resizing threads). Otherwise,
+ * when table is null, holds the initial table size to use upon
+ * creation, or 0 for default. After initialization, holds the
+ * next element count value upon which to resize the table.
+ */
+ private transient volatile int sizeCtl;
- /**
- * Scans for a node containing given key while trying to
- * acquire lock, creating and returning one if not found. Upon
- * return, guarantees that lock is held. Unlike in most
- * methods, calls to method equals are not screened: Since
- * traversal speed doesn't matter, we might as well help warm
- * up the associated code and accesses as well.
- *
- * @return a new node if key not found, else null
- */
- private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
- HashEntry<K,V> first = entryForHash(this, hash);
- HashEntry<K,V> e = first;
- HashEntry<K,V> node = null;
- int retries = -1; // negative while locating node
- while (!tryLock()) {
- HashEntry<K,V> f; // to recheck first below
- if (retries < 0) {
- if (e == null) {
- if (node == null) // speculatively create node
- node = new HashEntry<K,V>(hash, key, value, null);
- retries = 0;
- }
- else if (key.equals(e.key))
- retries = 0;
- else
- e = e.next;
- }
- else if (++retries > MAX_SCAN_RETRIES) {
- lock();
- break;
- }
- else if ((retries & 1) == 0 &&
- (f = entryForHash(this, hash)) != first) {
- e = first = f; // re-traverse if entry changed
- retries = -1;
- }
- }
- return node;
- }
+ /**
+ * The next table index (plus one) to split while resizing.
+ */
+ private transient volatile int transferIndex;
- /**
- * Scans for a node containing the given key while trying to
- * acquire lock for a remove or replace operation. Upon
- * return, guarantees that lock is held. Note that we must
- * lock even if the key is not found, to ensure sequential
- * consistency of updates.
- */
- private void scanAndLock(Object key, int hash) {
- // similar to but simpler than scanAndLockForPut
- HashEntry<K,V> first = entryForHash(this, hash);
- HashEntry<K,V> e = first;
- int retries = -1;
- while (!tryLock()) {
- HashEntry<K,V> f;
- if (retries < 0) {
- if (e == null || key.equals(e.key))
- retries = 0;
- else
- e = e.next;
- }
- else if (++retries > MAX_SCAN_RETRIES) {
- lock();
- break;
- }
- else if ((retries & 1) == 0 &&
- (f = entryForHash(this, hash)) != first) {
- e = first = f;
- retries = -1;
- }
- }
- }
+ /**
+ * The least available table index to split while resizing.
+ */
+ private transient volatile int transferOrigin;
- /**
- * Remove; match on key only if value null, else match both.
- */
- final V remove(Object key, int hash, Object value) {
- if (!tryLock())
- scanAndLock(key, hash);
- V oldValue = null;
- try {
- HashEntry<K,V>[] tab = table;
- int index = (tab.length - 1) & hash;
- HashEntry<K,V> e = entryAt(tab, index);
- HashEntry<K,V> pred = null;
- while (e != null) {
- K k;
- HashEntry<K,V> next = e.next;
- if ((k = e.key) == key ||
- (e.hash == hash && key.equals(k))) {
- V v = e.value;
- if (value == null || value == v || value.equals(v)) {
- if (pred == null)
- setEntryAt(tab, index, next);
- else
- pred.setNext(next);
- ++modCount;
- --count;
- oldValue = v;
- }
- break;
- }
- pred = e;
- e = next;
- }
- } finally {
- unlock();
- }
- return oldValue;
- }
+ /**
+ * Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
+ */
+ private transient volatile int cellsBusy;
- final boolean replace(K key, int hash, V oldValue, V newValue) {
- if (!tryLock())
- scanAndLock(key, hash);
- boolean replaced = false;
- try {
- HashEntry<K,V> e;
- for (e = entryForHash(this, hash); e != null; e = e.next) {
- K k;
- if ((k = e.key) == key ||
- (e.hash == hash && key.equals(k))) {
- if (oldValue.equals(e.value)) {
- e.value = newValue;
- ++modCount;
- replaced = true;
- }
- break;
- }
- }
- } finally {
- unlock();
- }
- return replaced;
- }
+ /**
+ * Table of counter cells. When non-null, size is a power of 2.
+ */
+ private transient volatile CounterCell[] counterCells;
- final V replace(K key, int hash, V value) {
- if (!tryLock())
- scanAndLock(key, hash);
- V oldValue = null;
- try {
- HashEntry<K,V> e;
- for (e = entryForHash(this, hash); e != null; e = e.next) {
- K k;
- if ((k = e.key) == key ||
- (e.hash == hash && key.equals(k))) {
- oldValue = e.value;
- e.value = value;
- ++modCount;
- break;
- }
- }
- } finally {
- unlock();
- }
- return oldValue;
- }
+ // views
+ private transient KeySetView<K,V> keySet;
+ private transient ValuesView<K,V> values;
+ private transient EntrySetView<K,V> entrySet;
- final void clear() {
- lock();
- try {
- HashEntry<K,V>[] tab = table;
- for (int i = 0; i < tab.length ; i++)
- setEntryAt(tab, i, null);
- ++modCount;
- count = 0;
- } finally {
- unlock();
- }
- }
- }
- // Accessing segments
+ /* ---------------- Public operations -------------- */
/**
- * Gets the jth element of given segment array (if nonnull) with
- * volatile element access semantics via Unsafe. (The null check
- * can trigger harmlessly only during deserialization.) Note:
- * because each element of segments array is set only once (using
- * fully ordered writes), some performance-sensitive methods rely
- * on this method only as a recheck upon null reads.
+ * Creates a new, empty map with the default initial table size (16).
*/
- @SuppressWarnings("unchecked")
- static final <K,V> Segment<K,V> segmentAt(Segment<K,V>[] ss, int j) {
- long u = (j << SSHIFT) + SBASE;
- return ss == null ? null :
- (Segment<K,V>) UNSAFE.getObjectVolatile(ss, u);
+ public ConcurrentHashMap() {
}
/**
- * Returns the segment for the given index, creating it and
- * recording in segment table (via CAS) if not already present.
+ * Creates a new, empty map with an initial table size
+ * accommodating the specified number of elements without the need
+ * to dynamically resize.
*
- * @param k the index
- * @return the segment
- */
- @SuppressWarnings("unchecked")
- private Segment<K,V> ensureSegment(int k) {
- final Segment<K,V>[] ss = this.segments;
- long u = (k << SSHIFT) + SBASE; // raw offset
- Segment<K,V> seg;
- if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
- Segment<K,V> proto = ss[0]; // use segment 0 as prototype
- int cap = proto.table.length;
- float lf = proto.loadFactor;
- int threshold = (int)(cap * lf);
- HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry<?,?>[cap];
- if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
- == null) { // recheck
- Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
- while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
- == null) {
- if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
- break;
- }
- }
- }
- return seg;
- }
-
- // Hash-based segment and entry accesses
-
- /**
- * Gets the segment for the given hash code.
+ * @param initialCapacity The implementation performs internal
+ * sizing to accommodate this many elements.
+ * @throws IllegalArgumentException if the initial capacity of
+ * elements is negative
*/
- @SuppressWarnings("unchecked")
- private Segment<K,V> segmentForHash(int h) {
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- return (Segment<K,V>) UNSAFE.getObjectVolatile(segments, u);
+ public ConcurrentHashMap(int initialCapacity) {
+ if (initialCapacity < 0)
+ throw new IllegalArgumentException();
+ int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
+ MAXIMUM_CAPACITY :
+ tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
+ this.sizeCtl = cap;
}
/**
- * Gets the table entry for the given segment and hash code.
+ * Creates a new map with the same mappings as the given map.
+ *
+ * @param m the map
*/
- @SuppressWarnings("unchecked")
- static final <K,V> HashEntry<K,V> entryForHash(Segment<K,V> seg, int h) {
- HashEntry<K,V>[] tab;
- return (seg == null || (tab = seg.table) == null) ? null :
- (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
+ public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
+ this.sizeCtl = DEFAULT_CAPACITY;
+ putAll(m);
}
- /* ---------------- Public operations -------------- */
-
/**
- * Creates a new, empty map with the specified initial
- * capacity, load factor and concurrency level.
+ * Creates a new, empty map with an initial table size based on
+ * the given number of elements ({@code initialCapacity}) and
+ * initial table density ({@code loadFactor}).
*
* @param initialCapacity the initial capacity. The implementation
- * performs internal sizing to accommodate this many elements.
- * @param loadFactor the load factor threshold, used to control resizing.
- * Resizing may be performed when the average number of elements per
- * bin exceeds this threshold.
- * @param concurrencyLevel the estimated number of concurrently
- * updating threads. The implementation performs internal sizing
- * to try to accommodate this many threads.
- * @throws IllegalArgumentException if the initial capacity is
- * negative or the load factor or concurrencyLevel are
- * nonpositive.
- */
- @SuppressWarnings("unchecked")
- public ConcurrentHashMap(int initialCapacity,
- float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
- throw new IllegalArgumentException();
- if (concurrencyLevel > MAX_SEGMENTS)
- concurrencyLevel = MAX_SEGMENTS;
- // Find power-of-two sizes best matching arguments
- int sshift = 0;
- int ssize = 1;
- while (ssize < concurrencyLevel) {
- ++sshift;
- ssize <<= 1;
- }
- this.segmentShift = 32 - sshift;
- this.segmentMask = ssize - 1;
- if (initialCapacity > MAXIMUM_CAPACITY)
- initialCapacity = MAXIMUM_CAPACITY;
- int c = initialCapacity / ssize;
- if (c * ssize < initialCapacity)
- ++c;
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- while (cap < c)
- cap <<= 1;
- // create segments and segments[0]
- Segment<K,V> s0 =
- new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
- (HashEntry<K,V>[])new HashEntry<?,?>[cap]);
- Segment<K,V>[] ss = (Segment<K,V>[])new Segment<?,?>[ssize];
- UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
- this.segments = ss;
- }
-
- /**
- * Creates a new, empty map with the specified initial capacity
- * and load factor and with the default concurrencyLevel (16).
- *
- * @param initialCapacity The implementation performs internal
- * sizing to accommodate this many elements.
- * @param loadFactor the load factor threshold, used to control resizing.
- * Resizing may be performed when the average number of elements per
- * bin exceeds this threshold.
+ * performs internal sizing to accommodate this many elements,
+ * given the specified load factor.
+ * @param loadFactor the load factor (table density) for
+ * establishing the initial table size
* @throws IllegalArgumentException if the initial capacity of
* elements is negative or the load factor is nonpositive
*
* @since 1.6
*/
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
- this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
+ this(initialCapacity, loadFactor, 1);
}
/**
- * Creates a new, empty map with the specified initial capacity,
- * and with default load factor (0.75) and concurrencyLevel (16).
+ * Creates a new, empty map with an initial table size based on
+ * the given number of elements ({@code initialCapacity}), table
+ * density ({@code loadFactor}), and number of concurrently
+ * updating threads ({@code concurrencyLevel}).
*
* @param initialCapacity the initial capacity. The implementation
- * performs internal sizing to accommodate this many elements.
- * @throws IllegalArgumentException if the initial capacity of
- * elements is negative.
+ * performs internal sizing to accommodate this many elements,
+ * given the specified load factor.
+ * @param loadFactor the load factor (table density) for
+ * establishing the initial table size
+ * @param concurrencyLevel the estimated number of concurrently
+ * updating threads. The implementation may use this value as
+ * a sizing hint.
+ * @throws IllegalArgumentException if the initial capacity is
+ * negative or the load factor or concurrencyLevel are
+ * nonpositive
*/
- public ConcurrentHashMap(int initialCapacity) {
- this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
+ public ConcurrentHashMap(int initialCapacity,
+ float loadFactor, int concurrencyLevel) {
+ if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
+ throw new IllegalArgumentException();
+ if (initialCapacity < concurrencyLevel) // Use at least as many bins
+ initialCapacity = concurrencyLevel; // as estimated threads
+ long size = (long)(1.0 + (long)initialCapacity / loadFactor);
+ int cap = (size >= (long)MAXIMUM_CAPACITY) ?
+ MAXIMUM_CAPACITY : tableSizeFor((int)size);
+ this.sizeCtl = cap;
}
- /**
- * Creates a new, empty map with a default initial capacity (16),
- * load factor (0.75) and concurrencyLevel (16).
- */
- public ConcurrentHashMap() {
- this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
- }
+ // Original (since JDK1.2) Map methods
/**
- * Creates a new map with the same mappings as the given map.
- * The map is created with a capacity of 1.5 times the number
- * of mappings in the given map or 16 (whichever is greater),
- * and a default load factor (0.75) and concurrencyLevel (16).
- *
- * @param m the map
+ * {@inheritDoc}
*/
- public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
- this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
- DEFAULT_INITIAL_CAPACITY),
- DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
- putAll(m);
+ public int size() {
+ long n = sumCount();
+ return ((n < 0L) ? 0 :
+ (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
+ (int)n);
}
/**
- * Returns <tt>true</tt> if this map contains no key-value mappings.
- *
- * @return <tt>true</tt> if this map contains no key-value mappings
+ * {@inheritDoc}
*/
public boolean isEmpty() {
- /*
- * Sum per-segment modCounts to avoid mis-reporting when
- * elements are concurrently added and removed in one segment
- * while checking another, in which case the table was never
- * actually empty at any point. (The sum ensures accuracy up
- * through at least 1<<31 per-segment modifications before
- * recheck.) Methods size() and containsValue() use similar
- * constructions for stability checks.
- */
- long sum = 0L;
- final Segment<K,V>[] segments = this.segments;
- for (int j = 0; j < segments.length; ++j) {
- Segment<K,V> seg = segmentAt(segments, j);
- if (seg != null) {
- if (seg.count != 0)
- return false;
- sum += seg.modCount;
- }
- }
- if (sum != 0L) { // recheck unless no modifications
- for (int j = 0; j < segments.length; ++j) {
- Segment<K,V> seg = segmentAt(segments, j);
- if (seg != null) {
- if (seg.count != 0)
- return false;
- sum -= seg.modCount;
- }
- }
- if (sum != 0L)
- return false;
- }
- return true;
- }
-
- /**
- * Returns the number of key-value mappings in this map. If the
- * map contains more than <tt>Integer.MAX_VALUE</tt> elements, returns
- * <tt>Integer.MAX_VALUE</tt>.
- *
- * @return the number of key-value mappings in this map
- */
- public int size() {
- // Try a few times to get accurate count. On failure due to
- // continuous async changes in table, resort to locking.
- final Segment<K,V>[] segments = this.segments;
- final int segmentCount = segments.length;
-
- long previousSum = 0L;
- for (int retries = -1; retries < RETRIES_BEFORE_LOCK; retries++) {
- long sum = 0L; // sum of modCounts
- long size = 0L;
- for (int i = 0; i < segmentCount; i++) {
- Segment<K,V> segment = segmentAt(segments, i);
- if (segment != null) {
- sum += segment.modCount;
- size += segment.count;
- }
- }
- if (sum == previousSum)
- return ((size >>> 31) == 0) ? (int) size : Integer.MAX_VALUE;
- previousSum = sum;
- }
-
- long size = 0L;
- for (int i = 0; i < segmentCount; i++) {
- Segment<K,V> segment = ensureSegment(i);
- segment.lock();
- size += segment.count;
- }
- for (int i = 0; i < segmentCount; i++)
- segments[i].unlock();
- return ((size >>> 31) == 0) ? (int) size : Integer.MAX_VALUE;
+ return sumCount() <= 0L; // ignore transient negative values
}
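
Both size() and isEmpty() now derive from sumCount() rather than from per-segment lock/retry loops, so the result is a momentary estimate. A minimal sketch of the observable behavior (class name illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class SizeDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("a", 1);
            map.put("b", 2);
            // size() clamps the internal long count into an int; under
            // concurrent updates it is only a momentary estimate.
            System.out.println(map.size());     // 2
            System.out.println(map.isEmpty());  // false
        }
    }
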
/**
@@ -878,18 +742,20 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
- Segment<K,V> s; // manually integrate access methods to reduce overhead
- HashEntry<K,V>[] tab;
- int h = hash(key.hashCode());
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
- (tab = s.table) != null) {
- for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
- e != null; e = e.next) {
- K k;
- if ((k = e.key) == key || (e.hash == h && key.equals(k)))
- return e.value;
+ Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
+ int h = spread(key.hashCode());
+ if ((tab = table) != null && (n = tab.length) > 0 &&
+ (e = tabAt(tab, (n - 1) & h)) != null) {
+ if ((eh = e.hash) == h) {
+ if ((ek = e.key) == key || (ek != null && key.equals(ek)))
+ return e.val;
+ }
+ else if (eh < 0)
+ return (p = e.find(h, key)) != null ? p.val : null;
+ while ((e = e.next) != null) {
+ if (e.hash == h &&
+ ((ek = e.key) == key || (ek != null && key.equals(ek))))
+ return e.val;
}
}
return null;
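
Since null values are rejected at insertion time, a null from get() unambiguously means absence. A minimal usage sketch (class name illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class GetDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
            map.put("k", "v");
            // A null result means "no mapping": null values cannot be
            // stored, so get() never returns null for a present key.
            System.out.println(map.get("k"));       // v
            System.out.println(map.get("absent"));  // null
        }
    }
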
@@ -898,149 +764,121 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
/**
* Tests if the specified object is a key in this table.
*
- * @param key possible key
- * @return <tt>true</tt> if and only if the specified object
+ * @param key possible key
+ * @return {@code true} if and only if the specified object
* is a key in this table, as determined by the
- * <tt>equals</tt> method; <tt>false</tt> otherwise.
+ * {@code equals} method; {@code false} otherwise
* @throws NullPointerException if the specified key is null
*/
- @SuppressWarnings("unchecked")
public boolean containsKey(Object key) {
- Segment<K,V> s; // same as get() except no need for volatile value read
- HashEntry<K,V>[] tab;
- int h = hash(key.hashCode());
- long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
- if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
- (tab = s.table) != null) {
- for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
- (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
- e != null; e = e.next) {
- K k;
- if ((k = e.key) == key || (e.hash == h && key.equals(k)))
- return true;
- }
- }
- return false;
+ return get(key) != null;
}
/**
- * Returns <tt>true</tt> if this map maps one or more keys to the
- * specified value. Note: This method requires a full internal
- * traversal of the hash table, and so is much slower than
- * method <tt>containsKey</tt>.
+ * Returns {@code true} if this map maps one or more keys to the
+ * specified value. Note: This method may require a full traversal
+ * of the map, and is much slower than method {@code containsKey}.
*
* @param value value whose presence in this map is to be tested
- * @return <tt>true</tt> if this map maps one or more keys to the
+ * @return {@code true} if this map maps one or more keys to the
* specified value
* @throws NullPointerException if the specified value is null
*/
public boolean containsValue(Object value) {
- // Same idea as size()
if (value == null)
throw new NullPointerException();
- final Segment<K,V>[] segments = this.segments;
- long previousSum = 0L;
- int lockCount = 0;
- try {
- for (int retries = -1; ; retries++) {
- long sum = 0L; // sum of modCounts
- for (int j = 0; j < segments.length; j++) {
- Segment<K,V> segment;
- if (retries == RETRIES_BEFORE_LOCK) {
- segment = ensureSegment(j);
- segment.lock();
- lockCount++;
- } else {
- segment = segmentAt(segments, j);
- if (segment == null)
- continue;
- }
- HashEntry<K,V>[] tab = segment.table;
- if (tab != null) {
- for (int i = 0 ; i < tab.length; i++) {
- HashEntry<K,V> e;
- for (e = entryAt(tab, i); e != null; e = e.next) {
- V v = e.value;
- if (v != null && value.equals(v))
- return true;
- }
- }
- sum += segment.modCount;
- }
- }
- if ((retries >= 0 && sum == previousSum) || lockCount > 0)
- return false;
- previousSum = sum;
+ Node<K,V>[] t;
+ if ((t = table) != null) {
+ Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
+ for (Node<K,V> p; (p = it.advance()) != null; ) {
+ V v;
+ if ((v = p.val) == value || (v != null && value.equals(v)))
+ return true;
}
- } finally {
- for (int j = 0; j < lockCount; j++)
- segments[j].unlock();
}
- }
-
- /**
- * Legacy method testing if some key maps into the specified value
- * in this table. This method is identical in functionality to
- * {@link #containsValue}, and exists solely to ensure
- * full compatibility with class {@link java.util.Hashtable},
- * which supported this method prior to introduction of the
- * Java Collections framework.
- *
- * @param value a value to search for
- * @return <tt>true</tt> if and only if some key maps to the
- * <tt>value</tt> argument in this table as
- * determined by the <tt>equals</tt> method;
- * <tt>false</tt> otherwise
- * @throws NullPointerException if the specified value is null
- */
- public boolean contains(Object value) {
- return containsValue(value);
+ return false;
}
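
containsValue now scans bins with a Traverser instead of summing segment modCounts, so it remains linear in table size while containsKey stays a hash lookup. A small usage sketch (class name illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class ContainsDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
            map.put(1, "one");
            map.put(2, "two");
            // containsKey is a hash lookup; containsValue walks every bin.
            System.out.println(map.containsKey(1));         // true
            System.out.println(map.containsValue("two"));   // true
            System.out.println(map.containsValue("three")); // false
        }
    }
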
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
- * <p> The value can be retrieved by calling the <tt>get</tt> method
+ * <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
- * @return the previous value associated with <tt>key</tt>, or
- * <tt>null</tt> if there was no mapping for <tt>key</tt>
+ * @return the previous value associated with {@code key}, or
+ * {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
- @SuppressWarnings("unchecked")
public V put(K key, V value) {
- Segment<K,V> s;
- if (value == null)
- throw new NullPointerException();
- int hash = hash(key.hashCode());
- int j = (hash >>> segmentShift) & segmentMask;
- if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
- (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
- s = ensureSegment(j);
- return s.put(key, hash, value, false);
+ return putVal(key, value, false);
}
- /**
- * {@inheritDoc}
- *
- * @return the previous value associated with the specified key,
- * or <tt>null</tt> if there was no mapping for the key
- * @throws NullPointerException if the specified key or value is null
- */
- @SuppressWarnings("unchecked")
- public V putIfAbsent(K key, V value) {
- Segment<K,V> s;
- if (value == null)
- throw new NullPointerException();
- int hash = hash(key.hashCode());
- int j = (hash >>> segmentShift) & segmentMask;
- if ((s = (Segment<K,V>)UNSAFE.getObject
- (segments, (j << SSHIFT) + SBASE)) == null)
- s = ensureSegment(j);
- return s.put(key, hash, value, true);
+ /** Implementation for put and putIfAbsent */
+ final V putVal(K key, V value, boolean onlyIfAbsent) {
+ if (key == null || value == null) throw new NullPointerException();
+ int hash = spread(key.hashCode());
+ int binCount = 0;
+ for (Node<K,V>[] tab = table;;) {
+ Node<K,V> f; int n, i, fh;
+ if (tab == null || (n = tab.length) == 0)
+ tab = initTable();
+ else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
+ if (casTabAt(tab, i, null,
+ new Node<K,V>(hash, key, value, null)))
+ break; // no lock when adding to empty bin
+ }
+ else if ((fh = f.hash) == MOVED)
+ tab = helpTransfer(tab, f);
+ else {
+ V oldVal = null;
+ synchronized (f) {
+ if (tabAt(tab, i) == f) {
+ if (fh >= 0) {
+ binCount = 1;
+ for (Node<K,V> e = f;; ++binCount) {
+ K ek;
+ if (e.hash == hash &&
+ ((ek = e.key) == key ||
+ (ek != null && key.equals(ek)))) {
+ oldVal = e.val;
+ if (!onlyIfAbsent)
+ e.val = value;
+ break;
+ }
+ Node<K,V> pred = e;
+ if ((e = e.next) == null) {
+ pred.next = new Node<K,V>(hash, key,
+ value, null);
+ break;
+ }
+ }
+ }
+ else if (f instanceof TreeBin) {
+ Node<K,V> p;
+ binCount = 2;
+ if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
+ value)) != null) {
+ oldVal = p.val;
+ if (!onlyIfAbsent)
+ p.val = value;
+ }
+ }
+ }
+ }
+ if (binCount != 0) {
+ if (binCount >= TREEIFY_THRESHOLD)
+ treeifyBin(tab, i);
+ if (oldVal != null)
+ return oldVal;
+ break;
+ }
+ }
+ }
+ addCount(1L, binCount);
+ return null;
}
/**
@@ -1051,8 +889,9 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* @param m mappings to be stored in this map
*/
public void putAll(Map<? extends K, ? extends V> m) {
+ tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
- put(e.getKey(), e.getValue());
+ putVal(e.getKey(), e.getValue(), false);
}
/**
@@ -1060,87 +899,147 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* This method does nothing if the key is not in the map.
*
* @param key the key that needs to be removed
- * @return the previous value associated with <tt>key</tt>, or
- * <tt>null</tt> if there was no mapping for <tt>key</tt>
+ * @return the previous value associated with {@code key}, or
+ * {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key is null
*/
public V remove(Object key) {
- int hash = hash(key.hashCode());
- Segment<K,V> s = segmentForHash(hash);
- return s == null ? null : s.remove(key, hash, null);
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NullPointerException if the specified key is null
- */
- public boolean remove(Object key, Object value) {
- int hash = hash(key.hashCode());
- Segment<K,V> s;
- return value != null && (s = segmentForHash(hash)) != null &&
- s.remove(key, hash, value) != null;
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NullPointerException if any of the arguments are null
- */
- public boolean replace(K key, V oldValue, V newValue) {
- int hash = hash(key.hashCode());
- if (oldValue == null || newValue == null)
- throw new NullPointerException();
- Segment<K,V> s = segmentForHash(hash);
- return s != null && s.replace(key, hash, oldValue, newValue);
+ return replaceNode(key, null, null);
}
/**
- * {@inheritDoc}
- *
- * @return the previous value associated with the specified key,
- * or <tt>null</tt> if there was no mapping for the key
- * @throws NullPointerException if the specified key or value is null
+ * Implementation for the four public remove/replace methods:
+ * Replaces node value with v, conditional upon match of cv if
+ * non-null. If resulting value is null, delete.
*/
- public V replace(K key, V value) {
- int hash = hash(key.hashCode());
- if (value == null)
- throw new NullPointerException();
- Segment<K,V> s = segmentForHash(hash);
- return s == null ? null : s.replace(key, hash, value);
+ final V replaceNode(Object key, V value, Object cv) {
+ int hash = spread(key.hashCode());
+ for (Node<K,V>[] tab = table;;) {
+ Node<K,V> f; int n, i, fh;
+ if (tab == null || (n = tab.length) == 0 ||
+ (f = tabAt(tab, i = (n - 1) & hash)) == null)
+ break;
+ else if ((fh = f.hash) == MOVED)
+ tab = helpTransfer(tab, f);
+ else {
+ V oldVal = null;
+ boolean validated = false;
+ synchronized (f) {
+ if (tabAt(tab, i) == f) {
+ if (fh >= 0) {
+ validated = true;
+ for (Node<K,V> e = f, pred = null;;) {
+ K ek;
+ if (e.hash == hash &&
+ ((ek = e.key) == key ||
+ (ek != null && key.equals(ek)))) {
+ V ev = e.val;
+ if (cv == null || cv == ev ||
+ (ev != null && cv.equals(ev))) {
+ oldVal = ev;
+ if (value != null)
+ e.val = value;
+ else if (pred != null)
+ pred.next = e.next;
+ else
+ setTabAt(tab, i, e.next);
+ }
+ break;
+ }
+ pred = e;
+ if ((e = e.next) == null)
+ break;
+ }
+ }
+ else if (f instanceof TreeBin) {
+ validated = true;
+ TreeBin<K,V> t = (TreeBin<K,V>)f;
+ TreeNode<K,V> r, p;
+ if ((r = t.root) != null &&
+ (p = r.findTreeNode(hash, key, null)) != null) {
+ V pv = p.val;
+ if (cv == null || cv == pv ||
+ (pv != null && cv.equals(pv))) {
+ oldVal = pv;
+ if (value != null)
+ p.val = value;
+ else if (t.removeTreeNode(p))
+ setTabAt(tab, i, untreeify(t.first));
+ }
+ }
+ }
+ }
+ }
+ if (validated) {
+ if (oldVal != null) {
+ if (value == null)
+ addCount(-1L, -1);
+ return oldVal;
+ }
+ break;
+ }
+ }
+ }
+ return null;
}
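
Because replaceNode gives remove(k,v) and replace(k,old,new) per-key compare-and-set semantics, callers can build optimistic update loops on top of it. A sketch of that idiom (class name illustrative; shown single-threaded for brevity):

    import java.util.concurrent.ConcurrentHashMap;

    public class ReplaceLoopDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>();
            counters.put("hits", 0);
            // Atomically increment by retrying the conditional replace; each
            // attempt succeeds only if no other thread changed the value first.
            Integer old;
            do {
                old = counters.get("hits");
            } while (!counters.replace("hits", old, old + 1));
            System.out.println(counters.get("hits")); // 1
        }
    }
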
/**
* Removes all of the mappings from this map.
*/
public void clear() {
- final Segment<K,V>[] segments = this.segments;
- for (int j = 0; j < segments.length; ++j) {
- Segment<K,V> s = segmentAt(segments, j);
- if (s != null)
- s.clear();
+ long delta = 0L; // negative number of deletions
+ int i = 0;
+ Node<K,V>[] tab = table;
+ while (tab != null && i < tab.length) {
+ int fh;
+ Node<K,V> f = tabAt(tab, i);
+ if (f == null)
+ ++i;
+ else if ((fh = f.hash) == MOVED) {
+ tab = helpTransfer(tab, f);
+ i = 0; // restart
+ }
+ else {
+ synchronized (f) {
+ if (tabAt(tab, i) == f) {
+ Node<K,V> p = (fh >= 0 ? f :
+ (f instanceof TreeBin) ?
+ ((TreeBin<K,V>)f).first : null);
+ while (p != null) {
+ --delta;
+ p = p.next;
+ }
+ setTabAt(tab, i++, null);
+ }
+ }
+ }
}
+ if (delta != 0L)
+ addCount(delta, -1);
}
/**
* Returns a {@link Set} view of the keys contained in this map.
* The set is backed by the map, so changes to the map are
- * reflected in the set, and vice-versa. The set supports element
+ * reflected in the set, and vice-versa. The set supports element
* removal, which removes the corresponding mapping from this map,
- * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
- * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
- * operations. It does not support the <tt>add</tt> or
- * <tt>addAll</tt> operations.
+ * via the {@code Iterator.remove}, {@code Set.remove},
+ * {@code removeAll}, {@code retainAll}, and {@code clear}
+ * operations. It does not support the {@code add} or
+ * {@code addAll} operations.
*
- * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
+ * <p>The view's {@code iterator} is a "weakly consistent" iterator
* that will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
+ *
+ * @return the set view
*/
public Set<K> keySet() {
- Set<K> ks = keySet;
- return (ks != null) ? ks : (keySet = new KeySet());
+ KeySetView<K,V> ks;
+ return (ks = keySet) != null ? ks : (keySet = new KeySetView<K,V>(this, null));
}
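
As the javadoc above states, removal through the view writes through to the map, and iteration is weakly consistent. A minimal sketch (class name illustrative):

    import java.util.Iterator;
    import java.util.concurrent.ConcurrentHashMap;

    public class KeySetDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("keep", 1);
            map.put("drop", 2);
            // Removing through the key-set view removes the mapping itself,
            // and the weakly consistent iterator never throws
            // ConcurrentModificationException.
            for (Iterator<String> it = map.keySet().iterator(); it.hasNext(); ) {
                if (it.next().equals("drop"))
                    it.remove();
            }
            System.out.println(map); // {keep=1}
        }
    }
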
/**
@@ -1148,20 +1047,22 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* The collection is backed by the map, so changes to the map are
* reflected in the collection, and vice-versa. The collection
* supports element removal, which removes the corresponding
- * mapping from this map, via the <tt>Iterator.remove</tt>,
- * <tt>Collection.remove</tt>, <tt>removeAll</tt>,
- * <tt>retainAll</tt>, and <tt>clear</tt> operations. It does not
- * support the <tt>add</tt> or <tt>addAll</tt> operations.
+ * mapping from this map, via the {@code Iterator.remove},
+ * {@code Collection.remove}, {@code removeAll},
+ * {@code retainAll}, and {@code clear} operations. It does not
+ * support the {@code add} or {@code addAll} operations.
*
- * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
+ * <p>The view's {@code iterator} is a "weakly consistent" iterator
* that will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
+ *
+ * @return the collection view
*/
public Collection<V> values() {
- Collection<V> vs = values;
- return (vs != null) ? vs : (values = new Values());
+ ValuesView<K,V> vs;
+ return (vs = values) != null ? vs : (values = new ValuesView<K,V>(this));
}
/**
@@ -1169,20 +1070,329 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* The set is backed by the map, so changes to the map are
* reflected in the set, and vice-versa. The set supports element
* removal, which removes the corresponding mapping from the map,
- * via the <tt>Iterator.remove</tt>, <tt>Set.remove</tt>,
- * <tt>removeAll</tt>, <tt>retainAll</tt>, and <tt>clear</tt>
- * operations. It does not support the <tt>add</tt> or
- * <tt>addAll</tt> operations.
+ * via the {@code Iterator.remove}, {@code Set.remove},
+ * {@code removeAll}, {@code retainAll}, and {@code clear}
+ * operations.
*
- * <p>The view's <tt>iterator</tt> is a "weakly consistent" iterator
+ * <p>The view's {@code iterator} is a "weakly consistent" iterator
* that will never throw {@link ConcurrentModificationException},
* and guarantees to traverse elements as they existed upon
* construction of the iterator, and may (but is not guaranteed to)
* reflect any modifications subsequent to construction.
+ *
+ * @return the set view
*/
public Set<Map.Entry<K,V>> entrySet() {
- Set<Map.Entry<K,V>> es = entrySet;
- return (es != null) ? es : (entrySet = new EntrySet());
+ EntrySetView<K,V> es;
+ return (es = entrySet) != null ? es : (entrySet = new EntrySetView<K,V>(this));
+ }
+
+ /**
+ * Returns the hash code value for this {@link Map}, i.e.,
+ * the sum of, for each key-value pair in the map,
+ * {@code key.hashCode() ^ value.hashCode()}.
+ *
+ * @return the hash code value for this map
+ */
+ public int hashCode() {
+ int h = 0;
+ Node<K,V>[] t;
+ if ((t = table) != null) {
+ Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
+ for (Node<K,V> p; (p = it.advance()) != null; )
+ h += p.key.hashCode() ^ p.val.hashCode();
+ }
+ return h;
+ }
+
+ /**
+ * Returns a string representation of this map. The string
+ * representation consists of a list of key-value mappings (in no
+ * particular order) enclosed in braces ("{@code {}}"). Adjacent
+ * mappings are separated by the characters {@code ", "} (comma
+ * and space). Each key-value mapping is rendered as the key
+ * followed by an equals sign ("{@code =}") followed by the
+ * associated value.
+ *
+ * @return a string representation of this map
+ */
+ public String toString() {
+ Node<K,V>[] t;
+ int f = (t = table) == null ? 0 : t.length;
+ Traverser<K,V> it = new Traverser<K,V>(t, f, 0, f);
+ StringBuilder sb = new StringBuilder();
+ sb.append('{');
+ Node<K,V> p;
+ if ((p = it.advance()) != null) {
+ for (;;) {
+ K k = p.key;
+ V v = p.val;
+ sb.append(k == this ? "(this Map)" : k);
+ sb.append('=');
+ sb.append(v == this ? "(this Map)" : v);
+ if ((p = it.advance()) == null)
+ break;
+ sb.append(',').append(' ');
+ }
+ }
+ return sb.append('}').toString();
+ }
+
+ /**
+ * Compares the specified object with this map for equality.
+ * Returns {@code true} if the given object is a map with the same
+ * mappings as this map. This operation may return misleading
+ * results if either map is concurrently modified during execution
+ * of this method.
+ *
+ * @param o object to be compared for equality with this map
+ * @return {@code true} if the specified object is equal to this map
+ */
+ public boolean equals(Object o) {
+ if (o != this) {
+ if (!(o instanceof Map))
+ return false;
+ Map<?,?> m = (Map<?,?>) o;
+ Node<K,V>[] t;
+ int f = (t = table) == null ? 0 : t.length;
+ Traverser<K,V> it = new Traverser<K,V>(t, f, 0, f);
+ for (Node<K,V> p; (p = it.advance()) != null; ) {
+ V val = p.val;
+ Object v = m.get(p.key);
+ if (v == null || (v != val && !v.equals(val)))
+ return false;
+ }
+ for (Map.Entry<?,?> e : m.entrySet()) {
+ Object mk, mv, v;
+ if ((mk = e.getKey()) == null ||
+ (mv = e.getValue()) == null ||
+ (v = get(mk)) == null ||
+ (mv != v && !mv.equals(v)))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Stripped-down version of helper class used in previous version,
+ * declared for the sake of serialization compatibility
+ */
+ static class Segment<K,V> extends ReentrantLock implements Serializable {
+ private static final long serialVersionUID = 2249069246763182397L;
+ final float loadFactor;
+ Segment(float lf) { this.loadFactor = lf; }
+ }
+
+ /**
+ * Saves the state of the {@code ConcurrentHashMap} instance to a
+ * stream (i.e., serializes it).
+ * @param s the stream
+ * @serialData
+ * the key (Object) and value (Object)
+ * for each key-value mapping, followed by a null pair.
+ * The key-value mappings are emitted in no particular order.
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+ // For serialization compatibility
+ // Emulate segment calculation from previous version of this class
+ int sshift = 0;
+ int ssize = 1;
+ while (ssize < DEFAULT_CONCURRENCY_LEVEL) {
+ ++sshift;
+ ssize <<= 1;
+ }
+ int segmentShift = 32 - sshift;
+ int segmentMask = ssize - 1;
+ @SuppressWarnings("unchecked") Segment<K,V>[] segments = (Segment<K,V>[])
+ new Segment<?,?>[DEFAULT_CONCURRENCY_LEVEL];
+ for (int i = 0; i < segments.length; ++i)
+ segments[i] = new Segment<K,V>(LOAD_FACTOR);
+ s.putFields().put("segments", segments);
+ s.putFields().put("segmentShift", segmentShift);
+ s.putFields().put("segmentMask", segmentMask);
+ s.writeFields();
+
+ Node<K,V>[] t;
+ if ((t = table) != null) {
+ Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
+ for (Node<K,V> p; (p = it.advance()) != null; ) {
+ s.writeObject(p.key);
+ s.writeObject(p.val);
+ }
+ }
+ s.writeObject(null);
+ s.writeObject(null);
+ segments = null; // throw away
+ }
+
+ /**
+ * Reconstitutes the instance from a stream (that is, deserializes it).
+ * @param s the stream
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ /*
+ * To improve performance in typical cases, we create nodes
+ * while reading, then place in table once size is known.
+ * However, we must also validate uniqueness and deal with
+ * overpopulated bins while doing so, which requires
+ * specialized versions of putVal mechanics.
+ */
+ sizeCtl = -1; // force exclusion for table construction
+ s.defaultReadObject();
+ long size = 0L;
+ Node<K,V> p = null;
+ for (;;) {
+ @SuppressWarnings("unchecked") K k = (K) s.readObject();
+ @SuppressWarnings("unchecked") V v = (V) s.readObject();
+ if (k != null && v != null) {
+ p = new Node<K,V>(spread(k.hashCode()), k, v, p);
+ ++size;
+ }
+ else
+ break;
+ }
+ if (size == 0L)
+ sizeCtl = 0;
+ else {
+ int n;
+ if (size >= (long)(MAXIMUM_CAPACITY >>> 1))
+ n = MAXIMUM_CAPACITY;
+ else {
+ int sz = (int)size;
+ n = tableSizeFor(sz + (sz >>> 1) + 1);
+ }
+ @SuppressWarnings({"rawtypes","unchecked"})
+ Node<K,V>[] tab = (Node<K,V>[])new Node[n];
+ int mask = n - 1;
+ long added = 0L;
+ while (p != null) {
+ boolean insertAtFront;
+ Node<K,V> next = p.next, first;
+ int h = p.hash, j = h & mask;
+ if ((first = tabAt(tab, j)) == null)
+ insertAtFront = true;
+ else {
+ K k = p.key;
+ if (first.hash < 0) {
+ TreeBin<K,V> t = (TreeBin<K,V>)first;
+ if (t.putTreeVal(h, k, p.val) == null)
+ ++added;
+ insertAtFront = false;
+ }
+ else {
+ int binCount = 0;
+ insertAtFront = true;
+ Node<K,V> q; K qk;
+ for (q = first; q != null; q = q.next) {
+ if (q.hash == h &&
+ ((qk = q.key) == k ||
+ (qk != null && k.equals(qk)))) {
+ insertAtFront = false;
+ break;
+ }
+ ++binCount;
+ }
+ if (insertAtFront && binCount >= TREEIFY_THRESHOLD) {
+ insertAtFront = false;
+ ++added;
+ p.next = first;
+ TreeNode<K,V> hd = null, tl = null;
+ for (q = p; q != null; q = q.next) {
+ TreeNode<K,V> t = new TreeNode<K,V>
+ (q.hash, q.key, q.val, null, null);
+ if ((t.prev = tl) == null)
+ hd = t;
+ else
+ tl.next = t;
+ tl = t;
+ }
+ setTabAt(tab, j, new TreeBin<K,V>(hd));
+ }
+ }
+ }
+ if (insertAtFront) {
+ ++added;
+ p.next = first;
+ setTabAt(tab, j, p);
+ }
+ p = next;
+ }
+ table = tab;
+ sizeCtl = n - (n >>> 2);
+ baseCount = added;
+ }
+ }
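
A round-trip sketch exercising the writeObject/readObject pair above through standard object streams (class name illustrative):

    import java.io.*;
    import java.util.concurrent.ConcurrentHashMap;

    public class SerialDemo {
        public static void main(String[] args) throws Exception {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("a", 1);
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(map); // writes dummy segments, then k/v pairs
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                ConcurrentHashMap<String, Integer> copy =
                    (ConcurrentHashMap<String, Integer>) in.readObject();
                System.out.println(copy.get("a")); // 1
            }
        }
    }
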
+
+ // ConcurrentMap methods
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the previous value associated with the specified key,
+ * or {@code null} if there was no mapping for the key
+ * @throws NullPointerException if the specified key or value is null
+ */
+ public V putIfAbsent(K key, V value) {
+ return putVal(key, value, true);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NullPointerException if the specified key is null
+ */
+ public boolean remove(Object key, Object value) {
+ if (key == null)
+ throw new NullPointerException();
+ return value != null && replaceNode(key, null, value) != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @throws NullPointerException if any of the arguments are null
+ */
+ public boolean replace(K key, V oldValue, V newValue) {
+ if (key == null || oldValue == null || newValue == null)
+ throw new NullPointerException();
+ return replaceNode(key, newValue, oldValue) != null;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return the previous value associated with the specified key,
+ * or {@code null} if there was no mapping for the key
+ * @throws NullPointerException if the specified key or value is null
+ */
+ public V replace(K key, V value) {
+ if (key == null || value == null)
+ throw new NullPointerException();
+ return replaceNode(key, value, null);
+ }
+ // Hashtable legacy methods
+
+ /**
+ * Legacy method testing if some key maps into the specified value
+ * in this table. This method is identical in functionality to
+ * {@link #containsValue(Object)}, and exists solely to ensure
+ * full compatibility with class {@link java.util.Hashtable}.
+ *
+ * @param value a value to search for
+ * @return {@code true} if and only if some key maps to the
+ * {@code value} argument in this table as
+ * determined by the {@code equals} method;
+ * {@code false} otherwise
+ * @throws NullPointerException if the specified value is null
+ */
+ public boolean contains(Object value) {
+ // BEGIN android-note
+ // removed deprecation
+ // END android-note
+ return containsValue(value);
}
/**
@@ -1192,7 +1402,9 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* @see #keySet()
*/
public Enumeration<K> keys() {
- return new KeyIterator();
+ Node<K,V>[] t;
+ int f = (t = table) == null ? 0 : t.length;
+ return new KeyIterator<K,V>(t, f, 0, f, this);
}
/**
@@ -1202,281 +1414,1787 @@ public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
* @see #values()
*/
public Enumeration<V> elements() {
- return new ValueIterator();
+ Node<K,V>[] t;
+ int f = (t = table) == null ? 0 : t.length;
+ return new ValueIterator<K,V>(t, f, 0, f, this);
}
- /* ---------------- Iterator Support -------------- */
+ // ConcurrentHashMap-only methods
- abstract class HashIterator {
- int nextSegmentIndex;
- int nextTableIndex;
- HashEntry<K,V>[] currentTable;
- HashEntry<K, V> nextEntry;
- HashEntry<K, V> lastReturned;
+ /**
+ * Returns the number of mappings. This method should be used
+ * instead of {@link #size} because a ConcurrentHashMap may
+ * contain more mappings than can be represented as an int. The
+ * value returned is an estimate; the actual count may differ if
+ * there are concurrent insertions or removals.
+ *
+ * @return the number of mappings
+ * @since 1.8
+ *
+ * @hide
+ */
+ public long mappingCount() {
+ long n = sumCount();
+ return (n < 0L) ? 0L : n; // ignore transient negative values
+ }
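
A sketch of mappingCount() versus size(); note the method is @hide in this tree, so this assumes a build where it is visible (class name illustrative):

    import java.util.concurrent.ConcurrentHashMap;

    public class CountDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<Long, Long> map = new ConcurrentHashMap<>();
            for (long i = 0; i < 3; i++) map.put(i, i);
            // mappingCount() returns the uncapped long estimate; size()
            // would clamp anything above Integer.MAX_VALUE.
            System.out.println(map.mappingCount()); // 3
        }
    }
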
- HashIterator() {
- nextSegmentIndex = segments.length - 1;
- nextTableIndex = -1;
- advance();
+ /**
+ * Creates a new {@link Set} backed by a ConcurrentHashMap
+ * from the given type to {@code Boolean.TRUE}.
+ *
+ * @return the new set
+ * @since 1.8
+ *
+ * @hide
+ */
+ public static <K> KeySetView<K,Boolean> newKeySet() {
+ return new KeySetView<K,Boolean>
+ (new ConcurrentHashMap<K,Boolean>(), Boolean.TRUE);
+ }
+
+ /**
+ * Creates a new {@link Set} backed by a ConcurrentHashMap
+ * from the given type to {@code Boolean.TRUE}.
+ *
+ * @param initialCapacity The implementation performs internal
+ * sizing to accommodate this many elements.
+ * @throws IllegalArgumentException if the initial capacity of
+ * elements is negative
+ * @return the new set
+ * @since 1.8
+ *
+ * @hide
+ */
+ public static <K> KeySetView<K,Boolean> newKeySet(int initialCapacity) {
+ return new KeySetView<K,Boolean>
+ (new ConcurrentHashMap<K,Boolean>(initialCapacity), Boolean.TRUE);
+ }
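
Both factories return a Set view over a map whose values are all Boolean.TRUE. A usage sketch, again assuming the @hide method is visible in the build (class name illustrative):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    public class SetDemo {
        public static void main(String[] args) {
            // Each element maps to Boolean.TRUE in the backing map.
            Set<String> set = ConcurrentHashMap.newKeySet();
            set.add("x");
            System.out.println(set.contains("x")); // true
            System.out.println(set.contains("y")); // false
        }
    }
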
+
+ /**
+ * Returns a {@link Set} view of the keys in this map, using the
+ * given common mapped value for any additions (i.e., {@link
+ * Collection#add} and {@link Collection#addAll(Collection)}).
+ * This is of course only appropriate if it is acceptable to use
+ * the same value for all additions from this view.
+ *
+ * @param mappedValue the mapped value to use for any additions
+ * @return the set view
+ * @throws NullPointerException if the mappedValue is null
+ *
+ * @hide
+ */
+ public Set<K> keySet(V mappedValue) {
+ if (mappedValue == null)
+ throw new NullPointerException();
+ return new KeySetView<K,V>(this, mappedValue);
+ }
+
+ /* ---------------- Special Nodes -------------- */
+
+ /**
+ * A node inserted at head of bins during transfer operations.
+ */
+ static final class ForwardingNode<K,V> extends Node<K,V> {
+ final Node<K,V>[] nextTable;
+ ForwardingNode(Node<K,V>[] tab) {
+ super(MOVED, null, null, null);
+ this.nextTable = tab;
+ }
+
+ Node<K,V> find(int h, Object k) {
+ Node<K,V> e; int n;
+ Node<K,V>[] tab = nextTable;
+ if (k != null && tab != null && (n = tab.length) > 0 &&
+ (e = tabAt(tab, (n - 1) & h)) != null) {
+ do {
+ int eh; K ek;
+ if ((eh = e.hash) == h &&
+ ((ek = e.key) == k || (ek != null && k.equals(ek))))
+ return e;
+ if (eh < 0)
+ return e.find(h, k);
+ } while ((e = e.next) != null);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * A place-holder node used in computeIfAbsent and compute
+ */
+ static final class ReservationNode<K,V> extends Node<K,V> {
+ ReservationNode() {
+ super(RESERVED, null, null, null);
+ }
+
+ Node<K,V> find(int h, Object k) {
+ return null;
+ }
+ }
+
+ /* ---------------- Table Initialization and Resizing -------------- */
+
+ /**
+ * Initializes table, using the size recorded in sizeCtl.
+ */
+ private final Node<K,V>[] initTable() {
+ Node<K,V>[] tab; int sc;
+ while ((tab = table) == null || tab.length == 0) {
+ if ((sc = sizeCtl) < 0)
+ Thread.yield(); // lost initialization race; just spin
+ else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
+ try {
+ if ((tab = table) == null || tab.length == 0) {
+ int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
+ @SuppressWarnings({"rawtypes","unchecked"})
+ Node<K,V>[] nt = (Node<K,V>[])new Node[n];
+ table = tab = nt;
+ sc = n - (n >>> 2);
+ }
+ } finally {
+ sizeCtl = sc;
+ }
+ break;
+ }
+ }
+ return tab;
+ }
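
The same CAS-guarded lazy-initialization idiom, sketched standalone with AtomicInteger standing in for the sizeCtl field (an assumption for illustration; the real code manipulates sizeCtl via Unsafe):

    import java.util.concurrent.atomic.AtomicInteger;

    public class LazyInit {
        private final AtomicInteger ctl = new AtomicInteger(0);
        private volatile Object[] table;

        Object[] initTable() {
            Object[] tab;
            while ((tab = table) == null) {
                int sc = ctl.get();
                if (sc < 0)
                    Thread.yield();            // lost the race; just spin
                else if (ctl.compareAndSet(sc, -1)) {
                    try {
                        if ((tab = table) == null)
                            table = tab = new Object[16];
                    } finally {
                        ctl.set(sc);           // restore the control word
                    }
                    break;
                }
            }
            return tab;
        }
    }
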
+
+ /**
+ * Adds to count, and if table is too small and not already
+ * resizing, initiates transfer. If already resizing, helps
+ * perform transfer if work is available. Rechecks occupancy
+ * after a transfer to see if another resize is already needed
+ * because resizings are lagging additions.
+ *
+ * @param x the count to add
+ * @param check if <0, don't check resize, if <= 1 only check if uncontended
+ */
+ private final void addCount(long x, int check) {
+ CounterCell[] as; long b, s;
+ if ((as = counterCells) != null ||
+ !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
+ CounterHashCode hc; CounterCell a; long v; int m;
+ boolean uncontended = true;
+ if ((hc = threadCounterHashCode.get()) == null ||
+ as == null || (m = as.length - 1) < 0 ||
+ (a = as[m & hc.code]) == null ||
+ !(uncontended =
+ U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
+ fullAddCount(x, hc, uncontended);
+ return;
+ }
+ if (check <= 1)
+ return;
+ s = sumCount();
+ }
+ if (check >= 0) {
+ Node<K,V>[] tab, nt; int sc;
+ while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
+ tab.length < MAXIMUM_CAPACITY) {
+ if (sc < 0) {
+ if (sc == -1 || transferIndex <= transferOrigin ||
+ (nt = nextTable) == null)
+ break;
+ if (U.compareAndSwapInt(this, SIZECTL, sc, sc - 1))
+ transfer(tab, nt);
+ }
+ else if (U.compareAndSwapInt(this, SIZECTL, sc, -2))
+ transfer(tab, null);
+ s = sumCount();
+ }
+ }
+ }
+
+ /**
+ * Helps transfer if a resize is in progress.
+ */
+ final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
+ Node<K,V>[] nextTab; int sc;
+ if ((f instanceof ForwardingNode) &&
+ (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
+ if (nextTab == nextTable && tab == table &&
+ transferIndex > transferOrigin && (sc = sizeCtl) < -1 &&
+ U.compareAndSwapInt(this, SIZECTL, sc, sc - 1))
+ transfer(tab, nextTab);
+ return nextTab;
+ }
+ return table;
+ }
+
+ /**
+ * Tries to presize table to accommodate the given number of elements.
+ *
+ * @param size number of elements (doesn't need to be perfectly accurate)
+ */
+ private final void tryPresize(int size) {
+ int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
+ tableSizeFor(size + (size >>> 1) + 1);
+ int sc;
+ while ((sc = sizeCtl) >= 0) {
+ Node<K,V>[] tab = table; int n;
+ if (tab == null || (n = tab.length) == 0) {
+ n = (sc > c) ? sc : c;
+ if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
+ try {
+ if (table == tab) {
+ @SuppressWarnings({"rawtypes","unchecked"})
+ Node<K,V>[] nt = (Node<K,V>[])new Node[n];
+ table = nt;
+ sc = n - (n >>> 2);
+ }
+ } finally {
+ sizeCtl = sc;
+ }
+ }
+ }
+ else if (c <= sc || n >= MAXIMUM_CAPACITY)
+ break;
+ else if (tab == table &&
+ U.compareAndSwapInt(this, SIZECTL, sc, -2))
+ transfer(tab, null);
+ }
+ }
+
+ /**
+ * Moves and/or copies the nodes in each bin to new table. See
+ * above for explanation.
+ */
+ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
+ int n = tab.length, stride;
+ if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
+ stride = MIN_TRANSFER_STRIDE; // subdivide range
+ if (nextTab == null) { // initiating
+ try {
+ @SuppressWarnings({"rawtypes","unchecked"})
+ Node<K,V>[] nt = (Node<K,V>[])new Node[n << 1];
+ nextTab = nt;
+ } catch (Throwable ex) { // try to cope with OOME
+ sizeCtl = Integer.MAX_VALUE;
+ return;
+ }
+ nextTable = nextTab;
+ transferOrigin = n;
+ transferIndex = n;
+ ForwardingNode<K,V> rev = new ForwardingNode<K,V>(tab);
+ for (int k = n; k > 0;) { // progressively reveal ready slots
+ int nextk = (k > stride) ? k - stride : 0;
+ for (int m = nextk; m < k; ++m)
+ nextTab[m] = rev;
+ for (int m = n + nextk; m < n + k; ++m)
+ nextTab[m] = rev;
+ U.putOrderedInt(this, TRANSFERORIGIN, k = nextk);
+ }
+ }
+ int nextn = nextTab.length;
+ ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
+ boolean advance = true;
+ for (int i = 0, bound = 0;;) {
+ int nextIndex, nextBound, fh; Node<K,V> f;
+ while (advance) {
+ if (--i >= bound)
+ advance = false;
+ else if ((nextIndex = transferIndex) <= transferOrigin) {
+ i = -1;
+ advance = false;
+ }
+ else if (U.compareAndSwapInt
+ (this, TRANSFERINDEX, nextIndex,
+ nextBound = (nextIndex > stride ?
+ nextIndex - stride : 0))) {
+ bound = nextBound;
+ i = nextIndex - 1;
+ advance = false;
+ }
+ }
+ if (i < 0 || i >= n || i + n >= nextn) {
+ for (int sc;;) {
+ if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, ++sc)) {
+ if (sc == -1) {
+ nextTable = null;
+ table = nextTab;
+ sizeCtl = (n << 1) - (n >>> 1);
+ }
+ return;
+ }
+ }
+ }
+ else if ((f = tabAt(tab, i)) == null) {
+ if (casTabAt(tab, i, null, fwd)) {
+ setTabAt(nextTab, i, null);
+ setTabAt(nextTab, i + n, null);
+ advance = true;
+ }
+ }
+ else if ((fh = f.hash) == MOVED)
+ advance = true; // already processed
+ else {
+ synchronized (f) {
+ if (tabAt(tab, i) == f) {
+ Node<K,V> ln, hn;
+ if (fh >= 0) {
+ int runBit = fh & n;
+ Node<K,V> lastRun = f;
+ for (Node<K,V> p = f.next; p != null; p = p.next) {
+ int b = p.hash & n;
+ if (b != runBit) {
+ runBit = b;
+ lastRun = p;
+ }
+ }
+ if (runBit == 0) {
+ ln = lastRun;
+ hn = null;
+ }
+ else {
+ hn = lastRun;
+ ln = null;
+ }
+ for (Node<K,V> p = f; p != lastRun; p = p.next) {
+ int ph = p.hash; K pk = p.key; V pv = p.val;
+ if ((ph & n) == 0)
+ ln = new Node<K,V>(ph, pk, pv, ln);
+ else
+ hn = new Node<K,V>(ph, pk, pv, hn);
+ }
+ }
+ else if (f instanceof TreeBin) {
+ TreeBin<K,V> t = (TreeBin<K,V>)f;
+ TreeNode<K,V> lo = null, loTail = null;
+ TreeNode<K,V> hi = null, hiTail = null;
+ int lc = 0, hc = 0;
+ for (Node<K,V> e = t.first; e != null; e = e.next) {
+ int h = e.hash;
+ TreeNode<K,V> p = new TreeNode<K,V>
+ (h, e.key, e.val, null, null);
+ if ((h & n) == 0) {
+ if ((p.prev = loTail) == null)
+ lo = p;
+ else
+ loTail.next = p;
+ loTail = p;
+ ++lc;
+ }
+ else {
+ if ((p.prev = hiTail) == null)
+ hi = p;
+ else
+ hiTail.next = p;
+ hiTail = p;
+ ++hc;
+ }
+ }
+ ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
+ (hc != 0) ? new TreeBin<K,V>(lo) : t;
+ hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
+ (lc != 0) ? new TreeBin<K,V>(hi) : t;
+ }
+ else
+ ln = hn = null;
+ setTabAt(nextTab, i, ln);
+ setTabAt(nextTab, i + n, hn);
+ setTabAt(tab, i, fwd);
+ advance = true;
+ }
+ }
+ }
+ }
+ }
+
+ /* ---------------- Conversion from/to TreeBins -------------- */
+
+ /**
+ * Replaces all linked nodes in bin at given index unless table is
+ * too small, in which case resizes instead.
+ */
+ private final void treeifyBin(Node<K,V>[] tab, int index) {
+ Node<K,V> b; int n, sc;
+ if (tab != null) {
+ if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
+ if (tab == table && (sc = sizeCtl) >= 0 &&
+ U.compareAndSwapInt(this, SIZECTL, sc, -2))
+ transfer(tab, null);
+ }
+ else if ((b = tabAt(tab, index)) != null) {
+ synchronized (b) {
+ if (tabAt(tab, index) == b) {
+ TreeNode<K,V> hd = null, tl = null;
+ for (Node<K,V> e = b; e != null; e = e.next) {
+ TreeNode<K,V> p =
+ new TreeNode<K,V>(e.hash, e.key, e.val,
+ null, null);
+ if ((p.prev = tl) == null)
+ hd = p;
+ else
+ tl.next = p;
+ tl = p;
+ }
+ setTabAt(tab, index, new TreeBin<K,V>(hd));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns a list of non-TreeNodes replacing those in the given list.
+ */
+ static <K,V> Node<K,V> untreeify(Node<K,V> b) {
+ Node<K,V> hd = null, tl = null;
+ for (Node<K,V> q = b; q != null; q = q.next) {
+ Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
+ if (tl == null)
+ hd = p;
+ else
+ tl.next = p;
+ tl = p;
+ }
+ return hd;
+ }
+
+ /* ---------------- TreeNodes -------------- */
+
+ /**
+ * Nodes for use in TreeBins
+ */
+ static final class TreeNode<K,V> extends Node<K,V> {
+ TreeNode<K,V> parent; // red-black tree links
+ TreeNode<K,V> left;
+ TreeNode<K,V> right;
+ TreeNode<K,V> prev; // needed to unlink next upon deletion
+ boolean red;
+
+ TreeNode(int hash, K key, V val, Node<K,V> next,
+ TreeNode<K,V> parent) {
+ super(hash, key, val, next);
+ this.parent = parent;
+ }
+
+ Node<K,V> find(int h, Object k) {
+ return findTreeNode(h, k, null);
}
/**
- * Sets nextEntry to first node of next non-empty table
- * (in backwards order, to simplify checks).
+ * Returns the TreeNode (or null if not found) for the given key
+ * starting at given root.
*/
- final void advance() {
- for (;;) {
- if (nextTableIndex >= 0) {
- if ((nextEntry = entryAt(currentTable,
- nextTableIndex--)) != null)
+ final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
+ if (k != null) {
+ TreeNode<K,V> p = this;
+ do {
+ int ph, dir; K pk; TreeNode<K,V> q;
+ TreeNode<K,V> pl = p.left, pr = p.right;
+ if ((ph = p.hash) > h)
+ p = pl;
+ else if (ph < h)
+ p = pr;
+ else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
+ return p;
+ else if (pl == null && pr == null)
break;
+ else if ((kc != null ||
+ (kc = comparableClassFor(k)) != null) &&
+ (dir = compareComparables(kc, k, pk)) != 0)
+ p = (dir < 0) ? pl : pr;
+ else if (pl == null)
+ p = pr;
+ else if (pr == null ||
+ (q = pr.findTreeNode(h, k, kc)) == null)
+ p = pl;
+ else
+ return q;
+ } while (p != null);
+ }
+ return null;
+ }
+ }
+
+ /* ---------------- TreeBins -------------- */
+
+ /**
+ * TreeNodes used at the heads of bins. TreeBins do not hold user
+ * keys or values, but instead point to a list of TreeNodes and
+ * their root. They also maintain a parasitic read-write lock
+ * forcing writers (who hold bin lock) to wait for readers (who do
+ * not) to complete before tree restructuring operations.
+ */
+ static final class TreeBin<K,V> extends Node<K,V> {
+ TreeNode<K,V> root;
+ volatile TreeNode<K,V> first;
+ volatile Thread waiter;
+ volatile int lockState;
+ // values for lockState
+ static final int WRITER = 1; // set while holding write lock
+ static final int WAITER = 2; // set when waiting for write lock
+ static final int READER = 4; // increment value for setting read lock
+
+ /**
+ * Creates bin with initial set of nodes headed by b.
+ */
+ TreeBin(TreeNode<K,V> b) {
+ super(TREEBIN, null, null, null);
+ this.first = b;
+ TreeNode<K,V> r = null;
+ for (TreeNode<K,V> x = b, next; x != null; x = next) {
+ next = (TreeNode<K,V>)x.next;
+ x.left = x.right = null;
+ if (r == null) {
+ x.parent = null;
+ x.red = false;
+ r = x;
}
- else if (nextSegmentIndex >= 0) {
- Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
- if (seg != null && (currentTable = seg.table) != null)
- nextTableIndex = currentTable.length - 1;
+ else {
+ Object key = x.key;
+ int hash = x.hash;
+ Class<?> kc = null;
+ for (TreeNode<K,V> p = r;;) {
+ int dir, ph;
+ if ((ph = p.hash) > hash)
+ dir = -1;
+ else if (ph < hash)
+ dir = 1;
+ else if ((kc != null ||
+ (kc = comparableClassFor(key)) != null))
+ dir = compareComparables(kc, key, p.key);
+ else
+ dir = 0;
+ TreeNode<K,V> xp = p;
+ if ((p = (dir <= 0) ? p.left : p.right) == null) {
+ x.parent = xp;
+ if (dir <= 0)
+ xp.left = x;
+ else
+ xp.right = x;
+ r = balanceInsertion(r, x);
+ break;
+ }
+ }
}
- else
+ }
+ this.root = r;
+ }
+
+ /**
+ * Acquires write lock for tree restructuring.
+ */
+ private final void lockRoot() {
+ if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
+ contendedLock(); // offload to separate method
+ }
+
+ /**
+ * Releases write lock for tree restructuring.
+ */
+ private final void unlockRoot() {
+ lockState = 0;
+ }
+
+ /**
+ * Possibly blocks awaiting root lock.
+ */
+ private final void contendedLock() {
+ boolean waiting = false;
+ for (int s;;) {
+ if (((s = lockState) & ~WAITER) == 0) { // no writer, no readers
+ if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
+ if (waiting)
+ waiter = null;
+ return;
+ }
+ }
+ else if ((s & WAITER) == 0) { // set waiter bit before parking
+ if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
+ waiting = true;
+ waiter = Thread.currentThread();
+ }
+ }
+ else if (waiting)
+ LockSupport.park(this);
+ }
+ }
+
+ /**
+ * Returns matching node or null if none. Tries to search
+ * using tree comparisons from root, but continues linear
+ * search when lock not available.
+ */
+ final Node<K,V> find(int h, Object k) {
+ if (k != null) {
+ for (Node<K,V> e = first; e != null; e = e.next) {
+ int s; K ek;
+ if (((s = lockState) & (WAITER|WRITER)) != 0) {
+ if (e.hash == h &&
+ ((ek = e.key) == k || (ek != null && k.equals(ek))))
+ return e;
+ }
+ else if (U.compareAndSwapInt(this, LOCKSTATE, s,
+ s + READER)) {
+ TreeNode<K,V> r, p;
+ try {
+ p = ((r = root) == null ? null :
+ r.findTreeNode(h, k, null));
+ } finally {
+ Thread w;
+ int ls;
+ do {} while (!U.compareAndSwapInt
+ (this, LOCKSTATE,
+ ls = lockState, ls - READER));
+ if (ls == (READER|WAITER) && (w = waiter) != null)
+ LockSupport.unpark(w);
+ }
+ return p;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Finds or adds a node.
+ * @return null if added
+ */
+ final TreeNode<K,V> putTreeVal(int h, K k, V v) {
+ Class<?> kc = null;
+ for (TreeNode<K,V> p = root;;) {
+ int dir, ph; K pk; TreeNode<K,V> q, pr;
+ if (p == null) {
+ first = root = new TreeNode<K,V>(h, k, v, null, null);
+ break;
+ }
+ else if ((ph = p.hash) > h)
+ dir = -1;
+ else if (ph < h)
+ dir = 1;
+ else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
+ return p;
+ else if ((kc == null &&
+ (kc = comparableClassFor(k)) == null) ||
+ (dir = compareComparables(kc, k, pk)) == 0) {
+ if (p.left == null)
+ dir = 1;
+ else if ((pr = p.right) == null ||
+ (q = pr.findTreeNode(h, k, kc)) == null)
+ dir = -1;
+ else
+ return q;
+ }
+ TreeNode<K,V> xp = p;
+ if ((p = (dir < 0) ? p.left : p.right) == null) {
+ TreeNode<K,V> x, f = first;
+ first = x = new TreeNode<K,V>(h, k, v, f, xp);
+ if (f != null)
+ f.prev = x;
+ if (dir < 0)
+ xp.left = x;
+ else
+ xp.right = x;
+ if (!xp.red)
+ x.red = true;
+ else {
+ lockRoot();
+ try {
+ root = balanceInsertion(root, x);
+ } finally {
+ unlockRoot();
+ }
+ }
break;
+ }
}
+ assert checkInvariants(root);
+ return null;
}
- final HashEntry<K,V> nextEntry() {
- HashEntry<K,V> e = nextEntry;
- if (e == null)
- throw new NoSuchElementException();
- lastReturned = e; // cannot assign until after null check
- if ((nextEntry = e.next) == null)
- advance();
- return e;
+ /**
+ * Removes the given node, which must be present before this
+ * call. This is messier than typical red-black deletion code
+ * because we cannot swap the contents of an interior node
+ * with a leaf successor that is pinned by "next" pointers
+ * that are accessible independently of lock. So instead we
+ * swap the tree linkages.
+ *
+ * @return true if now too small, so should be untreeified
+ */
+ final boolean removeTreeNode(TreeNode<K,V> p) {
+ TreeNode<K,V> next = (TreeNode<K,V>)p.next;
+ TreeNode<K,V> pred = p.prev; // unlink traversal pointers
+ TreeNode<K,V> r, rl;
+ if (pred == null)
+ first = next;
+ else
+ pred.next = next;
+ if (next != null)
+ next.prev = pred;
+ if (first == null) {
+ root = null;
+ return true;
+ }
+ if ((r = root) == null || r.right == null || // too small
+ (rl = r.left) == null || rl.left == null)
+ return true;
+ lockRoot();
+ try {
+ TreeNode<K,V> replacement;
+ TreeNode<K,V> pl = p.left;
+ TreeNode<K,V> pr = p.right;
+ if (pl != null && pr != null) {
+ TreeNode<K,V> s = pr, sl;
+ while ((sl = s.left) != null) // find successor
+ s = sl;
+ boolean c = s.red; s.red = p.red; p.red = c; // swap colors
+ TreeNode<K,V> sr = s.right;
+ TreeNode<K,V> pp = p.parent;
+ if (s == pr) { // p was s's direct parent
+ p.parent = s;
+ s.right = p;
+ }
+ else {
+ TreeNode<K,V> sp = s.parent;
+ if ((p.parent = sp) != null) {
+ if (s == sp.left)
+ sp.left = p;
+ else
+ sp.right = p;
+ }
+ if ((s.right = pr) != null)
+ pr.parent = s;
+ }
+ p.left = null;
+ if ((p.right = sr) != null)
+ sr.parent = p;
+ if ((s.left = pl) != null)
+ pl.parent = s;
+ if ((s.parent = pp) == null)
+ r = s;
+ else if (p == pp.left)
+ pp.left = s;
+ else
+ pp.right = s;
+ if (sr != null)
+ replacement = sr;
+ else
+ replacement = p;
+ }
+ else if (pl != null)
+ replacement = pl;
+ else if (pr != null)
+ replacement = pr;
+ else
+ replacement = p;
+ if (replacement != p) {
+ TreeNode<K,V> pp = replacement.parent = p.parent;
+ if (pp == null)
+ r = replacement;
+ else if (p == pp.left)
+ pp.left = replacement;
+ else
+ pp.right = replacement;
+ p.left = p.right = p.parent = null;
+ }
+
+ root = (p.red) ? r : balanceDeletion(r, replacement);
+
+ if (p == replacement) { // detach pointers
+ TreeNode<K,V> pp;
+ if ((pp = p.parent) != null) {
+ if (p == pp.left)
+ pp.left = null;
+ else if (p == pp.right)
+ pp.right = null;
+ p.parent = null;
+ }
+ }
+ } finally {
+ unlockRoot();
+ }
+ assert checkInvariants(root);
+ return false;
+ }
+
+ /* ------------------------------------------------------------ */
+ // Red-black tree methods, all adapted from CLR
+
+ static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root,
+ TreeNode<K,V> p) {
+ TreeNode<K,V> r, pp, rl;
+ if (p != null && (r = p.right) != null) {
+ if ((rl = p.right = r.left) != null)
+ rl.parent = p;
+ if ((pp = r.parent = p.parent) == null)
+ (root = r).red = false;
+ else if (pp.left == p)
+ pp.left = r;
+ else
+ pp.right = r;
+ r.left = p;
+ p.parent = r;
+ }
+ return root;
+ }
+
+ static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root,
+ TreeNode<K,V> p) {
+ TreeNode<K,V> l, pp, lr;
+ if (p != null && (l = p.left) != null) {
+ if ((lr = p.left = l.right) != null)
+ lr.parent = p;
+ if ((pp = l.parent = p.parent) == null)
+ (root = l).red = false;
+ else if (pp.right == p)
+ pp.right = l;
+ else
+ pp.left = l;
+ l.right = p;
+ p.parent = l;
+ }
+ return root;
}
- public final boolean hasNext() { return nextEntry != null; }
- public final boolean hasMoreElements() { return nextEntry != null; }
+ static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root,
+ TreeNode<K,V> x) {
+ x.red = true;
+ for (TreeNode<K,V> xp, xpp, xppl, xppr;;) {
+ if ((xp = x.parent) == null) {
+ x.red = false;
+ return x;
+ }
+ else if (!xp.red || (xpp = xp.parent) == null)
+ return root;
+ if (xp == (xppl = xpp.left)) {
+ if ((xppr = xpp.right) != null && xppr.red) {
+ xppr.red = false;
+ xp.red = false;
+ xpp.red = true;
+ x = xpp;
+ }
+ else {
+ if (x == xp.right) {
+ root = rotateLeft(root, x = xp);
+ xpp = (xp = x.parent) == null ? null : xp.parent;
+ }
+ if (xp != null) {
+ xp.red = false;
+ if (xpp != null) {
+ xpp.red = true;
+ root = rotateRight(root, xpp);
+ }
+ }
+ }
+ }
+ else {
+ if (xppl != null && xppl.red) {
+ xppl.red = false;
+ xp.red = false;
+ xpp.red = true;
+ x = xpp;
+ }
+ else {
+ if (x == xp.left) {
+ root = rotateRight(root, x = xp);
+ xpp = (xp = x.parent) == null ? null : xp.parent;
+ }
+ if (xp != null) {
+ xp.red = false;
+ if (xpp != null) {
+ xpp.red = true;
+ root = rotateLeft(root, xpp);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root,
+ TreeNode<K,V> x) {
+ for (TreeNode<K,V> xp, xpl, xpr;;) {
+ if (x == null || x == root)
+ return root;
+ else if ((xp = x.parent) == null) {
+ x.red = false;
+ return x;
+ }
+ else if (x.red) {
+ x.red = false;
+ return root;
+ }
+ else if ((xpl = xp.left) == x) {
+ if ((xpr = xp.right) != null && xpr.red) {
+ xpr.red = false;
+ xp.red = true;
+ root = rotateLeft(root, xp);
+ xpr = (xp = x.parent) == null ? null : xp.right;
+ }
+ if (xpr == null)
+ x = xp;
+ else {
+ TreeNode<K,V> sl = xpr.left, sr = xpr.right;
+ if ((sr == null || !sr.red) &&
+ (sl == null || !sl.red)) {
+ xpr.red = true;
+ x = xp;
+ }
+ else {
+ if (sr == null || !sr.red) {
+ if (sl != null)
+ sl.red = false;
+ xpr.red = true;
+ root = rotateRight(root, xpr);
+ xpr = (xp = x.parent) == null ?
+ null : xp.right;
+ }
+ if (xpr != null) {
+ xpr.red = (xp == null) ? false : xp.red;
+ if ((sr = xpr.right) != null)
+ sr.red = false;
+ }
+ if (xp != null) {
+ xp.red = false;
+ root = rotateLeft(root, xp);
+ }
+ x = root;
+ }
+ }
+ }
+ else { // symmetric
+ if (xpl != null && xpl.red) {
+ xpl.red = false;
+ xp.red = true;
+ root = rotateRight(root, xp);
+ xpl = (xp = x.parent) == null ? null : xp.left;
+ }
+ if (xpl == null)
+ x = xp;
+ else {
+ TreeNode<K,V> sl = xpl.left, sr = xpl.right;
+ if ((sl == null || !sl.red) &&
+ (sr == null || !sr.red)) {
+ xpl.red = true;
+ x = xp;
+ }
+ else {
+ if (sl == null || !sl.red) {
+ if (sr != null)
+ sr.red = false;
+ xpl.red = true;
+ root = rotateLeft(root, xpl);
+ xpl = (xp = x.parent) == null ?
+ null : xp.left;
+ }
+ if (xpl != null) {
+ xpl.red = (xp == null) ? false : xp.red;
+ if ((sl = xpl.left) != null)
+ sl.red = false;
+ }
+ if (xp != null) {
+ xp.red = false;
+ root = rotateRight(root, xp);
+ }
+ x = root;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Recursive invariant check
+ */
+ static <K,V> boolean checkInvariants(TreeNode<K,V> t) {
+ TreeNode<K,V> tp = t.parent, tl = t.left, tr = t.right,
+ tb = t.prev, tn = (TreeNode<K,V>)t.next;
+ if (tb != null && tb.next != t)
+ return false;
+ if (tn != null && tn.prev != t)
+ return false;
+ if (tp != null && t != tp.left && t != tp.right)
+ return false;
+ if (tl != null && (tl.parent != t || tl.hash > t.hash))
+ return false;
+ if (tr != null && (tr.parent != t || tr.hash < t.hash))
+ return false;
+ if (t.red && tl != null && tl.red && tr != null && tr.red)
+ return false;
+ if (tl != null && !checkInvariants(tl))
+ return false;
+ if (tr != null && !checkInvariants(tr))
+ return false;
+ return true;
+ }
+
+ private static final sun.misc.Unsafe U;
+ private static final long LOCKSTATE;
+ static {
+ try {
+ U = sun.misc.Unsafe.getUnsafe();
+ Class<?> k = TreeBin.class;
+ LOCKSTATE = U.objectFieldOffset
+ (k.getDeclaredField("lockState"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+ }
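
The lockState protocol above packs a writer bit, a waiter bit, and a reader count into one int. A standalone sketch of that encoding using AtomicInteger (names illustrative; the real TreeBin manipulates the field via Unsafe and parks contending writers as contendedLock does):

    import java.util.concurrent.atomic.AtomicInteger;

    public class ParasiticLock {
        static final int WRITER = 1, WAITER = 2, READER = 4;
        final AtomicInteger state = new AtomicInteger();

        boolean tryReadLock() {      // readers step aside for writers
            int s = state.get();
            return (s & (WRITER | WAITER)) == 0
                && state.compareAndSet(s, s + READER);
        }

        void readUnlock() { state.addAndGet(-READER); }

        boolean tryWriteLock() {     // writer needs exclusive access
            return state.compareAndSet(0, WRITER);
        }

        void writeUnlock() { state.set(0); }
    }
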
+
+ /* ---------------- Table Traversal -------------- */
+
+ /**
+ * Encapsulates traversal for methods such as containsValue; also
+ * serves as a base class for other iterators.
+ *
+ * Method advance visits once each still-valid node that was
+ * reachable upon iterator construction. It might miss some that
+ * were added to a bin after the bin was visited, which is OK wrt
+ * consistency guarantees. Maintaining this property in the face
+ * of possible ongoing resizes requires a fair amount of
+ * bookkeeping state that is difficult to optimize away amidst
+ * volatile accesses. Even so, traversal maintains reasonable
+ * throughput.
+ *
+ * Normally, iteration proceeds bin-by-bin traversing lists.
+ * However, if the table has been resized, then all future steps
+ * must traverse both the bin at the current index as well as at
+ * (index + baseSize); and so on for further resizings. To
+ * paranoically cope with potential sharing by users of iterators
+ * across threads, iteration terminates if a bounds check fails
+ * for a table read.
+ */
+ static class Traverser<K,V> {
+ Node<K,V>[] tab; // current table; updated if resized
+ Node<K,V> next; // the next entry to use
+ int index; // index of bin to use next
+ int baseIndex; // current index of initial table
+ int baseLimit; // index bound for initial table
+ final int baseSize; // initial table size
+
+ Traverser(Node<K,V>[] tab, int size, int index, int limit) {
+ this.tab = tab;
+ this.baseSize = size;
+ this.baseIndex = this.index = index;
+ this.baseLimit = limit;
+ this.next = null;
+ }
+
+ /**
+ * Advances if possible, returning next valid node, or null if none.
+ */
+ final Node<K,V> advance() {
+ Node<K,V> e;
+ if ((e = next) != null)
+ e = e.next;
+ for (;;) {
+ Node<K,V>[] t; int i, n; K ek; // must use locals in checks
+ if (e != null)
+ return next = e;
+ if (baseIndex >= baseLimit || (t = tab) == null ||
+ (n = t.length) <= (i = index) || i < 0)
+ return next = null;
+ if ((e = tabAt(t, i)) != null && e.hash < 0) {
+ if (e instanceof ForwardingNode) {
+ tab = ((ForwardingNode<K,V>)e).nextTable;
+ e = null;
+ continue;
+ }
+ else if (e instanceof TreeBin)
+ e = ((TreeBin<K,V>)e).first;
+ else
+ e = null;
+ }
+ if ((index += baseSize) >= n)
+ index = ++baseIndex; // visit upper slots if present
+ }
+ }
+ }
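
Externally, the Traverser's bookkeeping is what lets iteration survive a concurrent resize without ConcurrentModificationException. A small sketch (class name illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class TraverseDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>(2);
            for (int i = 0; i < 8; i++) map.put(i, i);
            // Iteration keeps working even if the puts below trigger a
            // resize; entries added during traversal may or may not be seen.
            for (Map.Entry<Integer, Integer> e : map.entrySet())
                map.putIfAbsent(e.getKey() + 100, e.getValue());
            System.out.println(map.size() >= 8); // true
        }
    }
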
+
+ /**
+ * Base of key, value, and entry Iterators. Adds fields to
+ * Traverser to support iterator.remove.
+ */
+ static class BaseIterator<K,V> extends Traverser<K,V> {
+ final ConcurrentHashMap<K,V> map;
+ Node<K,V> lastReturned;
+ BaseIterator(Node<K,V>[] tab, int size, int index, int limit,
+ ConcurrentHashMap<K,V> map) {
+ super(tab, size, index, limit);
+ this.map = map;
+ advance();
+ }
+
+ public final boolean hasNext() { return next != null; }
+ public final boolean hasMoreElements() { return next != null; }
public final void remove() {
- if (lastReturned == null)
+ Node<K,V> p;
+ if ((p = lastReturned) == null)
throw new IllegalStateException();
- ConcurrentHashMap.this.remove(lastReturned.key);
lastReturned = null;
+ map.replaceNode(p.key, null, null);
+ }
+ }
+
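Note that remove() above goes through map.replaceNode rather than a per-segment lock, so iterator removal is just an ordinary map update. For example (illustrative):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // iterator.remove() on any view deletes the mapping from the
    // backing map itself.
    public class IteratorRemoveDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("stale", 0);
            map.put("fresh", 1);
            for (Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
                 it.hasNext(); ) {
                if (it.next().getValue() == 0)
                    it.remove();           // writes through to the map
            }
            System.out.println(map);       // {fresh=1}
        }
    }
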
+ static final class KeyIterator<K,V> extends BaseIterator<K,V>
+ implements Iterator<K>, Enumeration<K> {
+ KeyIterator(Node<K,V>[] tab, int index, int size, int limit,
+ ConcurrentHashMap<K,V> map) {
+ super(tab, index, size, limit, map);
+ }
+
+ public final K next() {
+ Node<K,V> p;
+ if ((p = next) == null)
+ throw new NoSuchElementException();
+ K k = p.key;
+ lastReturned = p;
+ advance();
+ return k;
}
+
+ public final K nextElement() { return next(); }
}
- final class KeyIterator
- extends HashIterator
- implements Iterator<K>, Enumeration<K>
- {
- public final K next() { return super.nextEntry().key; }
- public final K nextElement() { return super.nextEntry().key; }
+ static final class ValueIterator<K,V> extends BaseIterator<K,V>
+ implements Iterator<V>, Enumeration<V> {
+ ValueIterator(Node<K,V>[] tab, int index, int size, int limit,
+ ConcurrentHashMap<K,V> map) {
+ super(tab, index, size, limit, map);
+ }
+
+ public final V next() {
+ Node<K,V> p;
+ if ((p = next) == null)
+ throw new NoSuchElementException();
+ V v = p.val;
+ lastReturned = p;
+ advance();
+ return v;
+ }
+
+ public final V nextElement() { return next(); }
}
- final class ValueIterator
- extends HashIterator
- implements Iterator<V>, Enumeration<V>
- {
- public final V next() { return super.nextEntry().value; }
- public final V nextElement() { return super.nextEntry().value; }
+ static final class EntryIterator<K,V> extends BaseIterator<K,V>
+ implements Iterator<Map.Entry<K,V>> {
+ EntryIterator(Node<K,V>[] tab, int index, int size, int limit,
+ ConcurrentHashMap<K,V> map) {
+ super(tab, index, size, limit, map);
+ }
+
+ public final Map.Entry<K,V> next() {
+ Node<K,V> p;
+ if ((p = next) == null)
+ throw new NoSuchElementException();
+ K k = p.key;
+ V v = p.val;
+ lastReturned = p;
+ advance();
+ return new MapEntry<K,V>(k, v, map);
+ }
}
/**
- * Custom Entry class used by EntryIterator.next(), that relays
- * setValue changes to the underlying map.
+ * Exported Entry for EntryIterator
*/
- final class WriteThroughEntry
- extends AbstractMap.SimpleEntry<K,V>
- {
- WriteThroughEntry(K k, V v) {
- super(k,v);
+ static final class MapEntry<K,V> implements Map.Entry<K,V> {
+ final K key; // non-null
+ V val; // non-null
+ final ConcurrentHashMap<K,V> map;
+ MapEntry(K key, V val, ConcurrentHashMap<K,V> map) {
+ this.key = key;
+ this.val = val;
+ this.map = map;
+ }
+ public K getKey() { return key; }
+ public V getValue() { return val; }
+ public int hashCode() { return key.hashCode() ^ val.hashCode(); }
+ public String toString() { return key + "=" + val; }
+
+ public boolean equals(Object o) {
+ Object k, v; Map.Entry<?,?> e;
+ return ((o instanceof Map.Entry) &&
+ (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
+ (v = e.getValue()) != null &&
+ (k == key || k.equals(key)) &&
+ (v == val || v.equals(val)));
}
/**
* Sets our entry's value and writes through to the map. The
- * value to return is somewhat arbitrary here. Since a
- * WriteThroughEntry does not necessarily track asynchronous
- * changes, the most recent "previous" value could be
- * different from what we return (or could even have been
- * removed in which case the put will re-establish). We do not
- * and cannot guarantee more.
+ * value to return is somewhat arbitrary here. Since we do not
+ * necessarily track asynchronous changes, the most recent
+ * "previous" value could be different from what we return (or
+ * could even have been removed, in which case the put will
+ * re-establish). We do not and cannot guarantee more.
*/
public V setValue(V value) {
if (value == null) throw new NullPointerException();
- V v = super.setValue(value);
- ConcurrentHashMap.this.put(getKey(), value);
+ V v = val;
+ val = value;
+ map.put(key, value);
return v;
}
}
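
The stale-previous-value caveat in the comment above is easy to miss: setValue updates both the detached entry and the map (via put), but the value it returns is only the entry's last-seen value, not necessarily the map's. A short sketch (illustrative):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // setValue on an entry handed out by the entry iterator writes
    // through to the map; the "previous" value it returns can be stale
    // under concurrent updates, exactly as documented above.
    public class WriteThroughDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("k", 1);
            for (Map.Entry<String, Integer> e : map.entrySet()) {
                Integer prev = e.setValue(2); // re-establishes k=2 via put
                System.out.println(prev);     // 1 here; may be stale under contention
            }
            System.out.println(map.get("k")); // 2
        }
    }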
- final class EntryIterator
- extends HashIterator
- implements Iterator<Entry<K,V>>
- {
- public Map.Entry<K,V> next() {
- HashEntry<K,V> e = super.nextEntry();
- return new WriteThroughEntry(e.key, e.value);
+ /* ---------------- Views -------------- */
+
+ /**
+ * Base class for views.
+ */
+ abstract static class CollectionView<K,V,E>
+ implements Collection<E>, java.io.Serializable {
+ private static final long serialVersionUID = 7249069246763182397L;
+ final ConcurrentHashMap<K,V> map;
+ CollectionView(ConcurrentHashMap<K,V> map) { this.map = map; }
+
+ /**
+ * Returns the map backing this view.
+ *
+ * @return the map backing this view
+ */
+ public ConcurrentHashMap<K,V> getMap() { return map; }
+
+ /**
+ * Removes all of the elements from this view, by removing all
+ * the mappings from the map backing this view.
+ */
+ public final void clear() { map.clear(); }
+ public final int size() { return map.size(); }
+ public final boolean isEmpty() { return map.isEmpty(); }
+
+ // implementations below rely on concrete classes supplying these
+ // abstract methods
+ /**
+ * Returns a "weakly consistent" iterator that will never
+ * throw {@link ConcurrentModificationException}, and
+ * guarantees to traverse elements as they existed upon
+ * construction of the iterator, and may (but is not
+ * guaranteed to) reflect any modifications subsequent to
+ * construction.
+ */
+ public abstract Iterator<E> iterator();
+ public abstract boolean contains(Object o);
+ public abstract boolean remove(Object o);
+
+ private static final String oomeMsg = "Required array size too large";
+
+ public final Object[] toArray() {
+ long sz = map.mappingCount();
+ if (sz > MAX_ARRAY_SIZE)
+ throw new OutOfMemoryError(oomeMsg);
+ int n = (int)sz;
+ Object[] r = new Object[n];
+ int i = 0;
+ for (E e : this) {
+ if (i == n) {
+ if (n >= MAX_ARRAY_SIZE)
+ throw new OutOfMemoryError(oomeMsg);
+ if (n >= MAX_ARRAY_SIZE - (MAX_ARRAY_SIZE >>> 1) - 1)
+ n = MAX_ARRAY_SIZE;
+ else
+ n += (n >>> 1) + 1;
+ r = Arrays.copyOf(r, n);
+ }
+ r[i++] = e;
+ }
+ return (i == n) ? r : Arrays.copyOf(r, i);
+ }
+
+ @SuppressWarnings("unchecked")
+ public final <T> T[] toArray(T[] a) {
+ long sz = map.mappingCount();
+ if (sz > MAX_ARRAY_SIZE)
+ throw new OutOfMemoryError(oomeMsg);
+ int m = (int)sz;
+ T[] r = (a.length >= m) ? a :
+ (T[])java.lang.reflect.Array
+ .newInstance(a.getClass().getComponentType(), m);
+ int n = r.length;
+ int i = 0;
+ for (E e : this) {
+ if (i == n) {
+ if (n >= MAX_ARRAY_SIZE)
+ throw new OutOfMemoryError(oomeMsg);
+ if (n >= MAX_ARRAY_SIZE - (MAX_ARRAY_SIZE >>> 1) - 1)
+ n = MAX_ARRAY_SIZE;
+ else
+ n += (n >>> 1) + 1;
+ r = Arrays.copyOf(r, n);
+ }
+ r[i++] = (T)e;
+ }
+ if (a == r && i < n) {
+ r[i] = null; // null-terminate
+ return r;
+ }
+ return (i == n) ? r : Arrays.copyOf(r, i);
+ }
+
+ /**
+ * Returns a string representation of this collection.
+ * The string representation consists of the string representations
+ * of the collection's elements in the order they are returned by
+ * its iterator, enclosed in square brackets ({@code "[]"}).
+ * Adjacent elements are separated by the characters {@code ", "}
+ * (comma and space). Elements are converted to strings as by
+ * {@link String#valueOf(Object)}.
+ *
+ * @return a string representation of this collection
+ */
+ public final String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ Iterator<E> it = iterator();
+ if (it.hasNext()) {
+ for (;;) {
+ Object e = it.next();
+ sb.append(e == this ? "(this Collection)" : e);
+ if (!it.hasNext())
+ break;
+ sb.append(',').append(' ');
+ }
+ }
+ return sb.append(']').toString();
}
+
+ public final boolean containsAll(Collection<?> c) {
+ if (c != this) {
+ for (Object e : c) {
+ if (e == null || !contains(e))
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public final boolean removeAll(Collection<?> c) {
+ boolean modified = false;
+ for (Iterator<E> it = iterator(); it.hasNext();) {
+ if (c.contains(it.next())) {
+ it.remove();
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
+ public final boolean retainAll(Collection<?> c) {
+ boolean modified = false;
+ for (Iterator<E> it = iterator(); it.hasNext();) {
+ if (!c.contains(it.next())) {
+ it.remove();
+ modified = true;
+ }
+ }
+ return modified;
+ }
+
}
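
Everything in CollectionView is live against the backing map: clear, removeAll and retainAll mutate the map, and toArray sizes its result from mappingCount and then grows defensively because the map can change underfoot. A quick sketch (illustrative):

    import java.util.Arrays;
    import java.util.concurrent.ConcurrentHashMap;

    // Collection views write through: bulk operations modify the
    // backing map, and toArray snapshots whatever the weakly
    // consistent iterator observes.
    public class ViewDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("a", 1);
            map.put("b", 2);
            map.put("c", 3);
            map.keySet().retainAll(Arrays.asList("a", "b")); // drops c=3
            System.out.println(map.size());                  // 2
            Object[] snapshot = map.values().toArray();      // sized from mappingCount
            System.out.println(snapshot.length);             // 2
            map.entrySet().clear();                          // empties the map
        }
    }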
- final class KeySet extends AbstractSet<K> {
- public Iterator<K> iterator() {
- return new KeyIterator();
+ /**
+ * A view of a ConcurrentHashMap as a {@link Set} of keys, in
+ * which additions may optionally be enabled by mapping to a
+ * common value. This class cannot be directly instantiated.
+ * See {@link #keySet() keySet()},
+ * {@link #keySet(Object) keySet(V)},
+ * {@link #newKeySet() newKeySet()},
+ * {@link #newKeySet(int) newKeySet(int)}.
+ *
+ * @since 1.8
+ *
+ * @hide
+ */
+ public static class KeySetView<K,V> extends CollectionView<K,V,K>
+ implements Set<K>, java.io.Serializable {
+ private static final long serialVersionUID = 7249069246763182397L;
+ private final V value;
+ KeySetView(ConcurrentHashMap<K,V> map, V value) { // non-public
+ super(map);
+ this.value = value;
}
- public int size() {
- return ConcurrentHashMap.this.size();
+
+ /**
+ * Returns the default mapped value for additions,
+ * or {@code null} if additions are not supported.
+ *
+ * @return the default mapped value for additions, or {@code null}
+ * if not supported
+ */
+ public V getMappedValue() { return value; }
+
+ /**
+ * {@inheritDoc}
+ * @throws NullPointerException if the specified key is null
+ */
+ public boolean contains(Object o) { return map.containsKey(o); }
+
+ /**
+ * Removes the key from this map view, by removing the key (and its
+ * corresponding value) from the backing map. This method does
+ * nothing if the key is not in the map.
+ *
+ * @param o the key to be removed from the backing map
+ * @return {@code true} if the backing map contained the specified key
+ * @throws NullPointerException if the specified key is null
+ */
+ public boolean remove(Object o) { return map.remove(o) != null; }
+
+ /**
+ * @return an iterator over the keys of the backing map
+ */
+ public Iterator<K> iterator() {
+ Node<K,V>[] t;
+ ConcurrentHashMap<K,V> m = map;
+ int f = (t = m.table) == null ? 0 : t.length;
+ return new KeyIterator<K,V>(t, f, 0, f, m);
}
- public boolean isEmpty() {
- return ConcurrentHashMap.this.isEmpty();
+
+ /**
+ * Adds the specified key to this set view by mapping the key to
+ * the default mapped value in the backing map, if defined.
+ *
+ * @param e key to be added
+ * @return {@code true} if this set changed as a result of the call
+ * @throws NullPointerException if the specified key is null
+ * @throws UnsupportedOperationException if no default mapped value
+ * for additions was provided
+ */
+ public boolean add(K e) {
+ V v;
+ if ((v = value) == null)
+ throw new UnsupportedOperationException();
+ return map.putVal(e, v, true) == null;
}
- public boolean contains(Object o) {
- return ConcurrentHashMap.this.containsKey(o);
+
+ /**
+ * Adds all of the elements in the specified collection to this set,
+ * as if by calling {@link #add} on each one.
+ *
+ * @param c the elements to be inserted into this set
+ * @return {@code true} if this set changed as a result of the call
+ * @throws NullPointerException if the collection or any of its
+ * elements are {@code null}
+ * @throws UnsupportedOperationException if no default mapped value
+ * for additions was provided
+ */
+ public boolean addAll(Collection<? extends K> c) {
+ boolean added = false;
+ V v;
+ if ((v = value) == null)
+ throw new UnsupportedOperationException();
+ for (K e : c) {
+ if (map.putVal(e, v, true) == null)
+ added = true;
+ }
+ return added;
}
- public boolean remove(Object o) {
- return ConcurrentHashMap.this.remove(o) != null;
+
+ public int hashCode() {
+ int h = 0;
+ for (K e : this)
+ h += e.hashCode();
+ return h;
}
- public void clear() {
- ConcurrentHashMap.this.clear();
+
+ public boolean equals(Object o) {
+ Set<?> c;
+ return ((o instanceof Set) &&
+ ((c = (Set<?>)o) == this ||
+ (containsAll(c) && c.containsAll(this))));
}
+
}
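
Assuming the 1.8-era entry points named in the javadoc (newKeySet, keySet(V) -- hidden on Android in this sync) are visible to the caller, the add-with-default-value behavior looks like this (illustrative):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    // KeySetView.add only works when a default mapped value exists
    // (keySet(V) or newKeySet()); the plain keySet() view throws
    // UnsupportedOperationException on add.
    public class KeySetViewDemo {
        public static void main(String[] args) {
            Set<String> set = ConcurrentHashMap.<String>newKeySet();
            set.add("x");                          // backed by a map internally

            ConcurrentHashMap<String, Boolean> map = new ConcurrentHashMap<>();
            map.keySet(Boolean.TRUE).add("flag");  // maps "flag" -> TRUE
            System.out.println(map.get("flag"));   // true

            try {
                map.keySet().add("nope");          // no default mapped value
            } catch (UnsupportedOperationException expected) {
                System.out.println("add unsupported on plain keySet()");
            }
        }
    }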
- final class Values extends AbstractCollection<V> {
- public Iterator<V> iterator() {
- return new ValueIterator();
+ /**
+ * A view of a ConcurrentHashMap as a {@link Collection} of
+ * values, in which additions are disabled. This class cannot be
+ * directly instantiated. See {@link #values()}.
+ */
+ static final class ValuesView<K,V> extends CollectionView<K,V,V>
+ implements Collection<V>, java.io.Serializable {
+ private static final long serialVersionUID = 2249069246763182397L;
+ ValuesView(ConcurrentHashMap<K,V> map) { super(map); }
+ public final boolean contains(Object o) {
+ return map.containsValue(o);
}
- public int size() {
- return ConcurrentHashMap.this.size();
+
+ public final boolean remove(Object o) {
+ if (o != null) {
+ for (Iterator<V> it = iterator(); it.hasNext();) {
+ if (o.equals(it.next())) {
+ it.remove();
+ return true;
+ }
+ }
+ }
+ return false;
}
- public boolean isEmpty() {
- return ConcurrentHashMap.this.isEmpty();
+
+ public final Iterator<V> iterator() {
+ ConcurrentHashMap<K,V> m = map;
+ Node<K,V>[] t;
+ int f = (t = m.table) == null ? 0 : t.length;
+ return new ValueIterator<K,V>(t, f, 0, f, m);
}
- public boolean contains(Object o) {
- return ConcurrentHashMap.this.containsValue(o);
+
+ public final boolean add(V e) {
+ throw new UnsupportedOperationException();
}
- public void clear() {
- ConcurrentHashMap.this.clear();
+ public final boolean addAll(Collection<? extends V> c) {
+ throw new UnsupportedOperationException();
}
+
}
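
ValuesView is the one view with no add path at all; removal works and deletes a single matching mapping via the iterator. For example (illustrative):

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;

    // The values view supports remove (deleting one matching mapping)
    // but rejects additions outright.
    public class ValuesViewDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            map.put("a", 1);
            Collection<Integer> vals = map.values();
            vals.remove(1);                        // removes the a=1 mapping
            System.out.println(map.isEmpty());     // true
            try {
                vals.add(2);                       // additions are disabled
            } catch (UnsupportedOperationException expected) { }
        }
    }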
- final class EntrySet extends AbstractSet<Map.Entry<K,V>> {
- public Iterator<Map.Entry<K,V>> iterator() {
- return new EntryIterator();
- }
+ /**
+ * A view of a ConcurrentHashMap as a {@link Set} of (key, value)
+ * entries. This class cannot be directly instantiated. See
+ * {@link #entrySet()}.
+ */
+ static final class EntrySetView<K,V> extends CollectionView<K,V,Map.Entry<K,V>>
+ implements Set<Map.Entry<K,V>>, java.io.Serializable {
+ private static final long serialVersionUID = 2249069246763182397L;
+ EntrySetView(ConcurrentHashMap<K,V> map) { super(map); }
+
public boolean contains(Object o) {
- if (!(o instanceof Map.Entry))
- return false;
- Map.Entry<?,?> e = (Map.Entry<?,?>)o;
- V v = ConcurrentHashMap.this.get(e.getKey());
- return v != null && v.equals(e.getValue());
+ Object k, v, r; Map.Entry<?,?> e;
+ return ((o instanceof Map.Entry) &&
+ (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
+ (r = map.get(k)) != null &&
+ (v = e.getValue()) != null &&
+ (v == r || v.equals(r)));
}
+
public boolean remove(Object o) {
- if (!(o instanceof Map.Entry))
- return false;
- Map.Entry<?,?> e = (Map.Entry<?,?>)o;
- return ConcurrentHashMap.this.remove(e.getKey(), e.getValue());
+ Object k, v; Map.Entry<?,?> e;
+ return ((o instanceof Map.Entry) &&
+ (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
+ (v = e.getValue()) != null &&
+ map.remove(k, v));
}
- public int size() {
- return ConcurrentHashMap.this.size();
+
+ /**
+ * @return an iterator over the entries of the backing map
+ */
+ public Iterator<Map.Entry<K,V>> iterator() {
+ ConcurrentHashMap<K,V> m = map;
+ Node<K,V>[] t;
+ int f = (t = m.table) == null ? 0 : t.length;
+ return new EntryIterator<K,V>(t, f, 0, f, m);
}
- public boolean isEmpty() {
- return ConcurrentHashMap.this.isEmpty();
+
+ public boolean add(Entry<K,V> e) {
+ return map.putVal(e.getKey(), e.getValue(), false) == null;
}
- public void clear() {
- ConcurrentHashMap.this.clear();
+
+ public boolean addAll(Collection<? extends Entry<K,V>> c) {
+ boolean added = false;
+ for (Entry<K,V> e : c) {
+ if (add(e))
+ added = true;
+ }
+ return added;
}
+
+ public final int hashCode() {
+ int h = 0;
+ Node<K,V>[] t;
+ if ((t = map.table) != null) {
+ Traverser<K,V> it = new Traverser<K,V>(t, t.length, 0, t.length);
+ for (Node<K,V> p; (p = it.advance()) != null; ) {
+ h += p.hashCode();
+ }
+ }
+ return h;
+ }
+
+ public final boolean equals(Object o) {
+ Set<?> c;
+ return ((o instanceof Set) &&
+ ((c = (Set<?>)o) == this ||
+ (containsAll(c) && c.containsAll(this))));
+ }
+
}
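
Unlike most Map implementations, this entry set also supports add and addAll, forwarding to putVal as shown above; contains and remove match on the full (key, value) pair. A short sketch (illustrative):

    import java.util.AbstractMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // The entry-set view matches on (key, value) pairs, and add
    // forwards to putVal, inserting a real mapping.
    public class EntrySetViewDemo {
        public static void main(String[] args) {
            ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
            Map.Entry<String, Integer> e = new AbstractMap.SimpleEntry<>("a", 1);
            map.entrySet().add(e);
            System.out.println(map.entrySet().contains(e));  // true
            map.entrySet().remove(e);   // removes only if still mapped to 1
            System.out.println(map.isEmpty());               // true
        }
    }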
- /* ---------------- Serialization Support -------------- */
+
+ /* ---------------- Counters -------------- */
+
+ // Adapted from LongAdder and Striped64.
+ // See their internal docs for explanation.
+
+ // A padded cell for distributing counts
+ static final class CounterCell {
+ volatile long p0, p1, p2, p3, p4, p5, p6;
+ volatile long value;
+ volatile long q0, q1, q2, q3, q4, q5, q6;
+ CounterCell(long x) { value = x; }
+ }
/**
- * Saves the state of the <tt>ConcurrentHashMap</tt> instance to a
- * stream (i.e., serializes it).
- * @param s the stream
- * @serialData
- * the key (Object) and value (Object)
- * for each key-value mapping, followed by a null pair.
- * The key-value mappings are emitted in no particular order.
+ * Holder for the thread-local hash code determining which
+ * CounterCell to use. The code is initialized via the
+ * counterHashCodeGenerator, but may be moved upon collisions.
*/
- private void writeObject(java.io.ObjectOutputStream s)
- throws java.io.IOException {
- // force all segments for serialization compatibility
- for (int k = 0; k < segments.length; ++k)
- ensureSegment(k);
- s.defaultWriteObject();
-
- final Segment<K,V>[] segments = this.segments;
- for (int k = 0; k < segments.length; ++k) {
- Segment<K,V> seg = segmentAt(segments, k);
- seg.lock();
- try {
- HashEntry<K,V>[] tab = seg.table;
- for (int i = 0; i < tab.length; ++i) {
- HashEntry<K,V> e;
- for (e = entryAt(tab, i); e != null; e = e.next) {
- s.writeObject(e.key);
- s.writeObject(e.value);
- }
- }
- } finally {
- seg.unlock();
- }
- }
- s.writeObject(null);
- s.writeObject(null);
+ static final class CounterHashCode {
+ int code;
}
/**
- * Reconstitutes the <tt>ConcurrentHashMap</tt> instance from a
- * stream (i.e., deserializes it).
- * @param s the stream
+ * Generates initial value for per-thread CounterHashCodes.
*/
- @SuppressWarnings("unchecked")
- private void readObject(java.io.ObjectInputStream s)
- throws java.io.IOException, ClassNotFoundException {
- s.defaultReadObject();
+ static final AtomicInteger counterHashCodeGenerator = new AtomicInteger();
- // Re-initialize segments to be minimally sized, and let grow.
- int cap = MIN_SEGMENT_TABLE_CAPACITY;
- final Segment<K,V>[] segments = this.segments;
- for (int k = 0; k < segments.length; ++k) {
- Segment<K,V> seg = segments[k];
- if (seg != null) {
- seg.threshold = (int)(cap * seg.loadFactor);
- seg.table = (HashEntry<K,V>[]) new HashEntry<?,?>[cap];
+ /**
+ * Increment for counterHashCodeGenerator. See class ThreadLocal
+ * for explanation.
+ */
+ static final int SEED_INCREMENT = 0x61c88647;
+
+ /**
+ * Per-thread counter hash codes. Shared across all instances.
+ */
+ static final ThreadLocal<CounterHashCode> threadCounterHashCode =
+ new ThreadLocal<CounterHashCode>();
+
+ final long sumCount() {
+ CounterCell[] as = counterCells; CounterCell a;
+ long sum = baseCount;
+ if (as != null) {
+ for (int i = 0; i < as.length; ++i) {
+ if ((a = as[i]) != null)
+ sum += a.value;
}
}
+ return sum;
+ }
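
The design above is LongAdder inlined: a base field absorbs uncontended updates, CAS failures spill into per-thread-hashed cells, and the size is the (moving) sum of all of them. A simplified, self-contained rendition of the same idea, with AtomicLong standing in for the Unsafe CAS plumbing and a fixed cell count (all names hypothetical):

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.atomic.AtomicLong;

    // Simplified striped counter: a base cell plus spill-over cells
    // chosen by a per-thread hash. The real code pads its cells and
    // grows the array under contention; this sketch does neither.
    class StripedCounter {
        private final AtomicLong base = new AtomicLong();
        private final AtomicLong[] cells = new AtomicLong[8];

        StripedCounter() {
            for (int i = 0; i < cells.length; i++)
                cells[i] = new AtomicLong();
        }

        void add(long x) {
            long b = base.get();
            if (!base.compareAndSet(b, b + x)) {   // contended: use a cell
                int h = ThreadLocalRandom.current().nextInt(cells.length);
                cells[h].addAndGet(x);
            }
        }

        long sum() {                // same shape as sumCount() above
            long s = base.get();
            for (AtomicLong c : cells)
                s += c.get();
            return s;
        }
    }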
- // Read the keys and values, and put the mappings in the table
+ // See LongAdder version for explanation
+ private final void fullAddCount(long x, CounterHashCode hc,
+ boolean wasUncontended) {
+ int h;
+ if (hc == null) {
+ hc = new CounterHashCode();
+ int s = counterHashCodeGenerator.addAndGet(SEED_INCREMENT);
+ h = hc.code = (s == 0) ? 1 : s; // Avoid zero
+ threadCounterHashCode.set(hc);
+ }
+ else
+ h = hc.code;
+ boolean collide = false; // True if last slot nonempty
for (;;) {
- K key = (K) s.readObject();
- V value = (V) s.readObject();
- if (key == null)
- break;
- put(key, value);
+ CounterCell[] as; CounterCell a; int n; long v;
+ if ((as = counterCells) != null && (n = as.length) > 0) {
+ if ((a = as[(n - 1) & h]) == null) {
+ if (cellsBusy == 0) { // Try to attach new Cell
+ CounterCell r = new CounterCell(x); // Optimistic create
+ if (cellsBusy == 0 &&
+ U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
+ boolean created = false;
+ try { // Recheck under lock
+ CounterCell[] rs; int m, j;
+ if ((rs = counterCells) != null &&
+ (m = rs.length) > 0 &&
+ rs[j = (m - 1) & h] == null) {
+ rs[j] = r;
+ created = true;
+ }
+ } finally {
+ cellsBusy = 0;
+ }
+ if (created)
+ break;
+ continue; // Slot is now non-empty
+ }
+ }
+ collide = false;
+ }
+ else if (!wasUncontended) // CAS already known to fail
+ wasUncontended = true; // Continue after rehash
+ else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
+ break;
+ else if (counterCells != as || n >= NCPU)
+ collide = false; // At max size or stale
+ else if (!collide)
+ collide = true;
+ else if (cellsBusy == 0 &&
+ U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
+ try {
+ if (counterCells == as) {// Expand table unless stale
+ CounterCell[] rs = new CounterCell[n << 1];
+ for (int i = 0; i < n; ++i)
+ rs[i] = as[i];
+ counterCells = rs;
+ }
+ } finally {
+ cellsBusy = 0;
+ }
+ collide = false;
+ continue; // Retry with expanded table
+ }
+ h ^= h << 13; // Rehash
+ h ^= h >>> 17;
+ h ^= h << 5;
+ }
+ else if (cellsBusy == 0 && counterCells == as &&
+ U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
+ boolean init = false;
+ try { // Initialize table
+ if (counterCells == as) {
+ CounterCell[] rs = new CounterCell[2];
+ rs[h & 1] = new CounterCell(x);
+ counterCells = rs;
+ init = true;
+ }
+ } finally {
+ cellsBusy = 0;
+ }
+ if (init)
+ break;
+ }
+ else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
+ break; // Fall back on using base
}
+ hc.code = h; // Record index for next time
}
// Unsafe mechanics
- private static final sun.misc.Unsafe UNSAFE;
- private static final long SBASE;
- private static final int SSHIFT;
- private static final long TBASE;
- private static final int TSHIFT;
+ private static final sun.misc.Unsafe U;
+ private static final long SIZECTL;
+ private static final long TRANSFERINDEX;
+ private static final long TRANSFERORIGIN;
+ private static final long BASECOUNT;
+ private static final long CELLSBUSY;
+ private static final long CELLVALUE;
+ private static final long ABASE;
+ private static final int ASHIFT;
static {
- int ss, ts;
try {
- UNSAFE = sun.misc.Unsafe.getUnsafe();
- Class<?> tc = HashEntry[].class;
- Class<?> sc = Segment[].class;
- TBASE = UNSAFE.arrayBaseOffset(tc);
- SBASE = UNSAFE.arrayBaseOffset(sc);
- ts = UNSAFE.arrayIndexScale(tc);
- ss = UNSAFE.arrayIndexScale(sc);
+ U = sun.misc.Unsafe.getUnsafe();
+ Class<?> k = ConcurrentHashMap.class;
+ SIZECTL = U.objectFieldOffset
+ (k.getDeclaredField("sizeCtl"));
+ TRANSFERINDEX = U.objectFieldOffset
+ (k.getDeclaredField("transferIndex"));
+ TRANSFERORIGIN = U.objectFieldOffset
+ (k.getDeclaredField("transferOrigin"));
+ BASECOUNT = U.objectFieldOffset
+ (k.getDeclaredField("baseCount"));
+ CELLSBUSY = U.objectFieldOffset
+ (k.getDeclaredField("cellsBusy"));
+ Class<?> ck = CounterCell.class;
+ CELLVALUE = U.objectFieldOffset
+ (ck.getDeclaredField("value"));
+ Class<?> ak = Node[].class;
+ ABASE = U.arrayBaseOffset(ak);
+ int scale = U.arrayIndexScale(ak);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
- if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0)
- throw new Error("data type scale not a power of two");
- SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);
- TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);
}
}
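
The static block resolves every hot field to a raw offset once, so the fast paths can CAS without reflection; ABASE/ASHIFT likewise turn an array index into a byte offset (ABASE + (i << ASHIFT)). The same pattern against a hypothetical field, for illustration -- note that Unsafe.getUnsafe() only succeeds on the boot classpath, so this sketch uses a reflective fallback that the patch itself never needs:

    import java.lang.reflect.Field;

    // Offset-resolution pattern, shown on a made-up 'state' field.
    public class UnsafeOffsetDemo {
        volatile int state;

        private static final sun.misc.Unsafe U;
        private static final long STATE;
        static {
            try {
                Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
                f.setAccessible(true);
                U = (sun.misc.Unsafe) f.get(null);   // reflective fallback
                STATE = U.objectFieldOffset(
                    UnsafeOffsetDemo.class.getDeclaredField("state"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }

        boolean casState(int expect, int update) {
            return U.compareAndSwapInt(this, STATE, expect, update);
        }

        public static void main(String[] args) {
            UnsafeOffsetDemo d = new UnsafeOffsetDemo();
            System.out.println(d.casState(0, 1));    // true
            System.out.println(d.state);             // 1
        }
    }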
diff --git a/luni/src/main/java/java/util/concurrent/ConcurrentLinkedQueue.java b/luni/src/main/java/java/util/concurrent/ConcurrentLinkedQueue.java
index 873f825..b39a533 100644
--- a/luni/src/main/java/java/util/concurrent/ConcurrentLinkedQueue.java
+++ b/luni/src/main/java/java/util/concurrent/ConcurrentLinkedQueue.java
@@ -31,7 +31,7 @@ import java.util.Queue;
* Like most other concurrent collection implementations, this class
* does not permit the use of {@code null} elements.
*
- * <p>This implementation employs an efficient &quot;wait-free&quot;
+ * <p>This implementation employs an efficient <em>non-blocking</em>
* algorithm based on one described in <a
* href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
* Fast, and Practical Non-Blocking and Blocking Concurrent Queue
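
The rewording is a real correction: an individual offer/poll can lose a CAS race and retry indefinitely, so the algorithm is lock-free (non-blocking) rather than wait-free. The cited Michael & Scott enqueue, stripped to its core with AtomicReference (an illustration of the paper's algorithm, not the library's actual code):

    import java.util.concurrent.atomic.AtomicReference;

    // Core of the Michael-Scott enqueue: CAS the new node onto the
    // last node's next pointer, helping to swing a lagging tail.
    class MsQueue<E> {
        static final class Node<E> {
            final E item;
            final AtomicReference<Node<E>> next = new AtomicReference<Node<E>>();
            Node(E item) { this.item = item; }
        }

        private final AtomicReference<Node<E>> tail =
            new AtomicReference<Node<E>>(new Node<E>(null)); // dummy node

        void enqueue(E e) {
            Node<E> n = new Node<E>(e);
            for (;;) {
                Node<E> t = tail.get();
                Node<E> s = t.next.get();
                if (s == null) {                      // t really is last
                    if (t.next.compareAndSet(null, n)) {
                        tail.compareAndSet(t, n);     // ok if another thread wins
                        return;
                    }
                } else {
                    tail.compareAndSet(t, s);         // help a lagging tail
                }
            }
        }
    }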
diff --git a/luni/src/main/java/java/util/concurrent/CountedCompleter.java b/luni/src/main/java/java/util/concurrent/CountedCompleter.java
index ffe7582..d5f794e 100644
--- a/luni/src/main/java/java/util/concurrent/CountedCompleter.java
+++ b/luni/src/main/java/java/util/concurrent/CountedCompleter.java
@@ -8,14 +8,15 @@ package java.util.concurrent;
/**
* A {@link ForkJoinTask} with a completion action performed when
- * triggered and there are no remaining pending
- * actions. CountedCompleters are in general more robust in the
+ * triggered and there are no remaining pending actions.
+ * CountedCompleters are in general more robust in the
* presence of subtask stalls and blockage than are other forms of
* ForkJoinTasks, but are less intuitive to program. Uses of
* CountedCompleter are similar to those of other completion based
* components (such as {@link java.nio.channels.CompletionHandler})
* except that multiple <em>pending</em> completions may be necessary
- * to trigger the completion action {@link #onCompletion}, not just one.
+ * to trigger the completion action {@link #onCompletion(CountedCompleter)},
+ * not just one.
* Unless initialized otherwise, the {@linkplain #getPendingCount pending
* count} starts at zero, but may be (atomically) changed using
* methods {@link #setPendingCount}, {@link #addToPendingCount}, and
@@ -40,9 +41,10 @@ package java.util.concurrent;
* <p>A concrete CountedCompleter class must define method {@link
* #compute}, that should in most cases (as illustrated below), invoke
* {@code tryComplete()} once before returning. The class may also
- * optionally override method {@link #onCompletion} to perform an
- * action upon normal completion, and method {@link
- * #onExceptionalCompletion} to perform an action upon any exception.
+ * optionally override method {@link #onCompletion(CountedCompleter)}
+ * to perform an action upon normal completion, and method
+ * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
+ * perform an action upon any exception.
*
* <p>CountedCompleters most often do not bear results, in which case
* they are normally declared as {@code CountedCompleter<Void>}, and
@@ -63,13 +65,14 @@ package java.util.concurrent;
* only as an internal helper for other computations, so its own task
* status (as reported in methods such as {@link ForkJoinTask#isDone})
* is arbitrary; this status changes only upon explicit invocations of
- * {@link #complete}, {@link ForkJoinTask#cancel}, {@link
- * ForkJoinTask#completeExceptionally} or upon exceptional completion
- * of method {@code compute}. Upon any exceptional completion, the
- * exception may be relayed to a task's completer (and its completer,
- * and so on), if one exists and it has not otherwise already
- * completed. Similarly, cancelling an internal CountedCompleter has
- * only a local effect on that completer, so is not often useful.
+ * {@link #complete}, {@link ForkJoinTask#cancel},
+ * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
+ * exceptional completion of method {@code compute}. Upon any
+ * exceptional completion, the exception may be relayed to a task's
+ * completer (and its completer, and so on), if one exists and it has
+ * not otherwise already completed. Similarly, cancelling an internal
+ * CountedCompleter has only a local effect on that completer, so is
+ * not often useful.
*
* <p><b>Sample Usages.</b>
*
@@ -96,8 +99,8 @@ package java.util.concurrent;
* improve load balancing. In the recursive case, the second of each
* pair of subtasks to finish triggers completion of its parent
* (because no result combination is performed, the default no-op
- * implementation of method {@code onCompletion} is not overridden). A
- * static utility method sets up the base task and invokes it
+ * implementation of method {@code onCompletion} is not overridden).
+ * A static utility method sets up the base task and invokes it
* (here, implicitly using the {@link ForkJoinPool#commonPool()}).
*
* <pre> {@code
@@ -152,12 +155,11 @@ package java.util.concurrent;
* }
* }</pre>
*
- * As a further improvement, notice that the left task need not even
- * exist. Instead of creating a new one, we can iterate using the
- * original task, and add a pending count for each fork. Additionally,
- * because no task in this tree implements an {@link #onCompletion}
- * method, {@code tryComplete()} can be replaced with {@link
- * #propagateCompletion}.
+ * As a further improvement, notice that the left task need not even exist.
+ * Instead of creating a new one, we can iterate using the original task,
+ * and add a pending count for each fork. Additionally, because no task
+ * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
+ * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
*
* <pre> {@code
* class ForEach<E> ...
@@ -235,7 +237,7 @@ package java.util.concurrent;
*
* <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
* results of multiple subtasks usually need to access these results
- * in method {@link #onCompletion}. As illustrated in the following
+ * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
* class (that performs a simplified form of map-reduce where mappings
* and reductions are all of type {@code E}), one way to do this in
* divide and conquer designs is to have each subtask record its
@@ -357,7 +359,7 @@ package java.util.concurrent;
*
* <p><b>Triggers.</b> Some CountedCompleters are themselves never
* forked, but instead serve as bits of plumbing in other designs;
- * including those in which the completion of one of more async tasks
+ * including those in which the completion of one or more async tasks
* triggers another async task. For example:
*
* <pre> {@code
@@ -438,20 +440,21 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
}
/**
- * Performs an action when method {@link #completeExceptionally}
- * is invoked or method {@link #compute} throws an exception, and
- * this task has not otherwise already completed normally. On
- * entry to this method, this task {@link
- * ForkJoinTask#isCompletedAbnormally}. The return value of this
- * method controls further propagation: If {@code true} and this
- * task has a completer, then this completer is also completed
- * exceptionally. The default implementation of this method does
- * nothing except return {@code true}.
+ * Performs an action when method {@link
+ * #completeExceptionally(Throwable)} is invoked or method {@link
+ * #compute} throws an exception, and this task has not already
+ * otherwise completed normally. On entry to this method, this task
+ * {@link ForkJoinTask#isCompletedAbnormally}. The return value
+ * of this method controls further propagation: If {@code true}
+ * and this task has a completer that has not completed, then that
+ * completer is also completed exceptionally, with the same
+ * exception as this completer. The default implementation of
+ * this method does nothing except return {@code true}.
*
* @param ex the exception
* @param caller the task invoking this method (which may
* be this task itself)
- * @return true if this exception should be propagated to this
+ * @return {@code true} if this exception should be propagated to this
* task's completer, if one exists
*/
public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
@@ -492,7 +495,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
* @param delta the value to add
*/
public final void addToPendingCount(int delta) {
- int c; // note: can replace with intrinsic in jdk8
+ int c;
do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
}
@@ -502,7 +505,7 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
*
* @param expected the expected value
* @param count the new value
- * @return true if successful
+ * @return {@code true} if successful
*/
public final boolean compareAndSetPendingCount(int expected, int count) {
return U.compareAndSwapInt(this, PENDING, expected, count);
@@ -536,9 +539,9 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/**
* If the pending count is nonzero, decrements the count;
- * otherwise invokes {@link #onCompletion} and then similarly
- * tries to complete this task's completer, if one exists,
- * else marks this task as complete.
+ * otherwise invokes {@link #onCompletion(CountedCompleter)}
+ * and then similarly tries to complete this task's completer,
+ * if one exists, else marks this task as complete.
*/
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
@@ -557,12 +560,12 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
/**
* Equivalent to {@link #tryComplete} but does not invoke {@link
- * #onCompletion} along the completion path: If the pending count
- * is nonzero, decrements the count; otherwise, similarly tries to
- * complete this task's completer, if one exists, else marks this
- * task as complete. This method may be useful in cases where
- * {@code onCompletion} should not, or need not, be invoked for
- * each completer in a computation.
+ * #onCompletion(CountedCompleter)} along the completion path:
+ * If the pending count is nonzero, decrements the count;
+ * otherwise, similarly tries to complete this task's completer, if
+ * one exists, else marks this task as complete. This method may be
+ * useful in cases where {@code onCompletion} should not, or need
+ * not, be invoked for each completer in a computation.
*/
public final void propagateCompletion() {
CountedCompleter<?> a = this, s = a;
@@ -579,13 +582,15 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
}
/**
- * Regardless of pending count, invokes {@link #onCompletion},
- * marks this task as complete and further triggers {@link
- * #tryComplete} on this task's completer, if one exists. The
- * given rawResult is used as an argument to {@link #setRawResult}
- * before invoking {@link #onCompletion} or marking this task as
- * complete; its value is meaningful only for classes overriding
- * {@code setRawResult}.
+ * Regardless of pending count, invokes
+ * {@link #onCompletion(CountedCompleter)}, marks this task as
+ * complete and further triggers {@link #tryComplete} on this
+ * task's completer, if one exists. The given rawResult is
+ * used as an argument to {@link #setRawResult} before invoking
+ * {@link #onCompletion(CountedCompleter)} or marking this task
+ * as complete; its value is meaningful only for classes
+ * overriding {@code setRawResult}. This method does not modify
+ * the pending count.
*
* <p>This method may be useful when forcing completion as soon as
* any one (versus all) of several subtask results are obtained.
@@ -604,7 +609,6 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
p.tryComplete();
}
-
/**
* If this task's pending count is zero, returns this task;
* otherwise decrements its pending count and returns {@code
@@ -668,8 +672,9 @@ public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
void internalPropagateException(Throwable ex) {
CountedCompleter<?> a = this, s = a;
while (a.onExceptionalCompletion(ex, s) &&
- (a = (s = a).completer) != null && a.status >= 0)
- a.recordExceptionalCompletion(ex);
+ (a = (s = a).completer) != null && a.status >= 0 &&
+ a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
+ ;
}
/**
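
Putting the updated javadoc together, the core discipline is: bump the pending count once per fork, and have every task end with exactly one tryComplete()/propagateCompletion(), so the last finisher completes the parent. A compact rendition of the iterate-in-place ForEach variant the docs describe (Op is a stand-in for the javadoc's elided operation interface):

    import java.util.concurrent.CountedCompleter;
    import java.util.concurrent.ForkJoinPool;

    // Iterate-in-place ForEach: fork the right half, add one pending
    // count per fork, keep the left half in this task, and finish with
    // propagateCompletion() since onCompletion is not overridden.
    class ForEach<E> extends CountedCompleter<Void> {
        interface Op<T> { void apply(T t); }

        final E[] array; final Op<E> op; final int lo, hi;
        ForEach(CountedCompleter<?> p, E[] array, Op<E> op, int lo, int hi) {
            super(p); this.array = array; this.op = op; this.lo = lo; this.hi = hi;
        }

        public void compute() {
            int l = lo, h = hi;
            while (h - l >= 2) {
                int mid = (l + h) >>> 1;
                addToPendingCount(1);
                new ForEach<E>(this, array, op, mid, h).fork(); // right half
                h = mid;                                        // loop on left
            }
            if (h > l)
                op.apply(array[l]);
            propagateCompletion();  // last finisher completes the root
        }

        static <E> void forEach(E[] array, Op<E> op) {
            ForkJoinPool.commonPool().invoke(
                new ForEach<E>(null, array, op, 0, array.length));
        }
    }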
diff --git a/luni/src/main/java/java/util/concurrent/ForkJoinPool.java b/luni/src/main/java/java/util/concurrent/ForkJoinPool.java
index 87ffff3..5ac01c8 100644
--- a/luni/src/main/java/java/util/concurrent/ForkJoinPool.java
+++ b/luni/src/main/java/java/util/concurrent/ForkJoinPool.java
@@ -6,6 +6,7 @@
package java.util.concurrent;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -17,6 +18,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
@@ -49,9 +51,9 @@ import java.util.concurrent.TimeUnit;
* level; by default, equal to the number of available processors. The
* pool attempts to maintain enough active (or available) threads by
* dynamically adding, suspending, or resuming internal worker
- * threads, even if some tasks are stalled waiting to join
- * others. However, no such adjustments are guaranteed in the face of
- * blocked I/O or other unmanaged synchronization. The nested {@link
+ * threads, even if some tasks are stalled waiting to join others.
+ * However, no such adjustments are guaranteed in the face of blocked
+ * I/O or other unmanaged synchronization. The nested {@link
* ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
@@ -75,38 +77,45 @@ import java.util.concurrent.TimeUnit;
* there is little difference among choice of methods.
*
* <table BORDER CELLPADDING=3 CELLSPACING=1>
+ * <caption>Summary of task execution methods</caption>
* <tr>
* <td></td>
* <td ALIGN=CENTER> <b>Call from non-fork/join clients</b></td>
* <td ALIGN=CENTER> <b>Call from within fork/join computations</b></td>
* </tr>
* <tr>
- * <td> <b>Arrange async execution</td>
+ * <td> <b>Arrange async execution</b></td>
* <td> {@link #execute(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork}</td>
* </tr>
* <tr>
- * <td> <b>Await and obtain result</td>
+ * <td> <b>Await and obtain result</b></td>
* <td> {@link #invoke(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#invoke}</td>
* </tr>
* <tr>
- * <td> <b>Arrange exec and obtain Future</td>
+ * <td> <b>Arrange exec and obtain Future</b></td>
* <td> {@link #submit(ForkJoinTask)}</td>
* <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
* </tr>
* </table>
*
* <p>The common pool is by default constructed with default
- * parameters, but these may be controlled by setting three {@link
- * System#getProperty system properties} with prefix {@code
- * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
- * an integer greater than zero, {@code threadFactory} -- the class
- * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
- * exceptionHandler} -- the class name of a {@link
- * java.lang.Thread.UncaughtExceptionHandler
- * Thread.UncaughtExceptionHandler}. Upon any error in establishing
- * these settings, default parameters are used.
+ * parameters, but these may be controlled by setting three
+ * {@linkplain System#getProperty system properties}:
+ * <ul>
+ * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
+ * - the parallelism level, a non-negative integer
+ * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
+ * - the class name of a {@link ForkJoinWorkerThreadFactory}
+ * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
+ * - the class name of a {@link UncaughtExceptionHandler}
+ * </ul>
+ * The system class loader is used to load these classes.
+ * Upon any error in establishing these settings, default parameters
+ * are used. It is possible to disable or limit the use of threads in
+ * the common pool by setting the parallelism property to zero, and/or
+ * using a factory that may return {@code null}.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
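
These properties are read once, when ForkJoinPool is first initialized, so they must be supplied as -D flags or set before anything touches the class. An illustrative check (the printed value assumes nothing else configured the pool first):

    import java.util.concurrent.ForkJoinPool;

    // Common-pool settings are snapshot at class initialization; set
    // them before first use or pass them on the command line, e.g.
    //   -Djava.util.concurrent.ForkJoinPool.common.parallelism=2
    public class CommonPoolConfig {
        public static void main(String[] args) {
            System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.parallelism", "2");
            System.out.println(ForkJoinPool.commonPool().getParallelism()); // 2
        }
    }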
@@ -153,32 +162,35 @@ public class ForkJoinPool extends AbstractExecutorService {
* (http://research.sun.com/scalable/pubs/index.html) and
* "Idempotent work stealing" by Michael, Saraswat, and Vechev,
* PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
- * The main differences ultimately stem from GC requirements that
- * we null out taken slots as soon as we can, to maintain as small
- * a footprint as possible even in programs generating huge
- * numbers of tasks. To accomplish this, we shift the CAS
- * arbitrating pop vs poll (steal) from being on the indices
- * ("base" and "top") to the slots themselves. So, both a
- * successful pop and poll mainly entail a CAS of a slot from
- * non-null to null. Because we rely on CASes of references, we
- * do not need tag bits on base or top. They are simple ints as
- * used in any circular array-based queue (see for example
- * ArrayDeque). Updates to the indices must still be ordered in a
- * way that guarantees that top == base means the queue is empty,
- * but otherwise may err on the side of possibly making the queue
- * appear nonempty when a push, pop, or poll have not fully
- * committed. Note that this means that the poll operation,
- * considered individually, is not wait-free. One thief cannot
- * successfully continue until another in-progress one (or, if
- * previously empty, a push) completes. However, in the
- * aggregate, we ensure at least probabilistic non-blockingness.
- * If an attempted steal fails, a thief always chooses a different
- * random victim target to try next. So, in order for one thief to
- * progress, it suffices for any in-progress poll or new push on
- * any empty queue to complete. (This is why we normally use
- * method pollAt and its variants that try once at the apparent
- * base index, else consider alternative actions, rather than
- * method poll.)
+ * See also "Correct and Efficient Work-Stealing for Weak Memory
+ * Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
+ * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
+ * analysis of memory ordering (atomic, volatile etc) issues. The
+ * main differences ultimately stem from GC requirements that we
+ * null out taken slots as soon as we can, to maintain as small a
+ * footprint as possible even in programs generating huge numbers
+ * of tasks. To accomplish this, we shift the CAS arbitrating pop
+ * vs poll (steal) from being on the indices ("base" and "top") to
+ * the slots themselves. So, both a successful pop and poll
+ * mainly entail a CAS of a slot from non-null to null. Because
+ * we rely on CASes of references, we do not need tag bits on base
+ * or top. They are simple ints as used in any circular
+ * array-based queue (see for example ArrayDeque). Updates to the
+ * indices must still be ordered in a way that guarantees that top
+ * == base means the queue is empty, but otherwise may err on the
+ * side of possibly making the queue appear nonempty when a push,
+ * pop, or poll has not fully committed. Note that this means
+ * that the poll operation, considered individually, is not
+ * wait-free. One thief cannot successfully continue until another
+ * in-progress one (or, if previously empty, a push) completes.
+ * However, in the aggregate, we ensure at least probabilistic
+ * non-blockingness. If an attempted steal fails, a thief always
+ * chooses a different random victim target to try next. So, in
+ * order for one thief to progress, it suffices for any
+ * in-progress poll or new push on any empty queue to
+ * complete. (This is why we normally use method pollAt and its
+ * variants that try once at the apparent base index, else
+ * consider alternative actions, rather than method poll.)
*
* This approach also enables support of a user mode in which local
* task processing is in FIFO, not LIFO order, simply by using
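
The crux of the comment above: pop (owner, at top) and poll/steal (thief, at base) race on the slot itself, and whoever CASes it from non-null to null owns the task, so base and top stay plain ints without tag bits. A much-simplified rendition with a fixed-size ring and none of the real code's memory-ordering finesse (names hypothetical):

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Slot-CAS work-stealing deque, minus resizing, padding and the
    // ordering subtleties the comment above discusses.
    class MiniWorkQueue<T> {
        final AtomicReferenceArray<T> slots = new AtomicReferenceArray<T>(64);
        volatile int base;    // next slot to steal from
        int top;              // next slot to push at; owner-only

        void push(T task) {   // owner only
            slots.set(top & 63, task);
            top++;
        }

        T pop() {             // owner takes from the top (LIFO)
            int s = top - 1;
            T t = slots.get(s & 63);
            if (t != null && slots.compareAndSet(s & 63, t, null)) {
                top = s;
                return t;
            }
            return null;
        }

        T poll() {            // thief steals from the base (FIFO)
            int b = base;
            T t = slots.get(b & 63);
            if (t != null && b == base && slots.compareAndSet(b & 63, t, null)) {
                base = b + 1;
                return t;
            }
            return null;      // lost the race; pick another victim
        }
    }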
@@ -197,18 +209,18 @@ public class ForkJoinPool extends AbstractExecutorService {
* for work-stealing (this would contaminate lifo/fifo
* processing). Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
- * ThreadLocal Submitter class contains a value initially used as
- * a hash code for choosing existing queues, but may be randomly
- * repositioned upon contention with other submitters. In
- * essence, submitters act like workers except that they are
- * restricted to executing local tasks that they submitted (or in
- * the case of CountedCompleters, others with the same root task).
- * However, because most shared/external queue operations are more
- * expensive than internal, and because, at steady state, external
- * submitters will compete for CPU with workers, ForkJoinTask.join
- * and related methods disable them from repeatedly helping to
- * process tasks if all workers are active. Insertion of tasks in
- * shared mode requires a lock (mainly to protect in the case of
+ * Submitter probe value serves as a hash code for
+ * choosing existing queues, and may be randomly repositioned upon
+ * contention with other submitters. In essence, submitters act
+ * like workers except that they are restricted to executing local
+ * tasks that they submitted (or in the case of CountedCompleters,
+ * others with the same root task). However, because most
+ * shared/external queue operations are more expensive than
+ * internal, and because, at steady state, external submitters
+ * will compete for CPU with workers, ForkJoinTask.join and
+ * related methods disable them from repeatedly helping to process
+ * tasks if all workers are active. Insertion of tasks in shared
+ * mode requires a lock (mainly to protect in the case of
* resizing) but we use only a simple spinlock (using bits in
* field qlock), because submitters encountering a busy queue move
* on to try or create other queues -- they block only when
@@ -298,37 +310,35 @@ public class ForkJoinPool extends AbstractExecutorService {
* has not yet entered the wait queue. We solve this by requiring
* a full sweep of all workers (via repeated calls to method
* scan()) both before and after a newly waiting worker is added
- * to the wait queue. During a rescan, the worker might release
- * some other queued worker rather than itself, which has the same
- * net effect. Because enqueued workers may actually be rescanning
- * rather than waiting, we set and clear the "parker" field of
- * WorkQueues to reduce unnecessary calls to unpark. (This
- * requires a secondary recheck to avoid missed signals.) Note
- * the unusual conventions about Thread.interrupts surrounding
- * parking and other blocking: Because interrupts are used solely
- * to alert threads to check termination, which is checked anyway
- * upon blocking, we clear status (using Thread.interrupted)
- * before any call to park, so that park does not immediately
- * return due to status being set via some other unrelated call to
- * interrupt in user code.
+ * to the wait queue. Because enqueued workers may actually be
+ * rescanning rather than waiting, we set and clear the "parker"
+ * field of WorkQueues to reduce unnecessary calls to unpark.
+ * (This requires a secondary recheck to avoid missed signals.)
+ * Note the unusual conventions about Thread.interrupts
+ * surrounding parking and other blocking: Because interrupts are
+ * used solely to alert threads to check termination, which is
+ * checked anyway upon blocking, we clear status (using
+ * Thread.interrupted) before any call to park, so that park does
+ * not immediately return due to status being set via some other
+ * unrelated call to interrupt in user code.
*
* Signalling. We create or wake up workers only when there
* appears to be at least one task they might be able to find and
- * execute. However, many other threads may notice the same task
- * and each signal to wake up a thread that might take it. So in
- * general, pools will be over-signalled. When a submission is
- * added or another worker adds a task to a queue that has fewer
- * than two tasks, they signal waiting workers (or trigger
- * creation of new ones if fewer than the given parallelism level
- * -- signalWork), and may leave a hint to the unparked worker to
- * help signal others upon wakeup). These primary signals are
- * buttressed by others (see method helpSignal) whenever other
- * threads scan for work or do not have a task to process. On
- * most platforms, signalling (unpark) overhead time is noticeably
+ * execute. When a submission is added or another worker adds a
+ * task to a queue that has fewer than two tasks, they signal
+ * waiting workers (or trigger creation of new ones if fewer than
+ * the given parallelism level -- signalWork). These primary
+ * signals are buttressed by others whenever other threads remove
+ * a task from a queue and notice that there are other tasks there
+ * as well. So in general, pools will be over-signalled. On most
+ * platforms, signalling (unpark) overhead time is noticeably
* long, and the time between signalling a thread and it actually
* making progress can be very noticeably long, so it is worth
* offloading these delays from critical paths as much as
- * possible.
+ * possible. Additionally, workers spin-down gradually, by staying
+ * alive so long as they see the ctl state changing. Similar
+ * stability-sensing techniques are also used before blocking in
+ * awaitJoin and helpComplete.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
@@ -441,7 +451,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* Common Pool
* ===========
*
- * The static commonPool always exists after static
+ * The static common pool always exists after static
* initialization. Since it (or any other created pool) need
* never be used, we minimize initial construction overhead and
* footprint to the setup of about a dozen fields, with no nested
@@ -449,8 +459,11 @@ public class ForkJoinPool extends AbstractExecutorService {
* fullExternalPush during the first submission to the pool.
*
* When external threads submit to the common pool, they can
- * perform some subtask processing (see externalHelpJoin and
- * related methods). We do not need to record whether these
+ * perform subtask processing (see externalHelpJoin and related
+ * methods). This caller-helps policy makes it sensible to set
+ * common pool parallelism level to one (or more) less than the
+ * total number of available cores, or even zero for pure
+ * caller-runs. We do not need to record whether external
* submissions are to the common pool -- if not, externalHelpJoin
* returns quickly (at the most helping to signal some common pool
* workers). These submitters would otherwise be blocked waiting
@@ -520,6 +533,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param pool the pool this thread works in
* @throws NullPointerException if the pool is null
+ * @return the new worker thread
*/
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
@@ -536,26 +550,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Per-thread records for threads that submit to pools. Currently
- * holds only pseudo-random seed / index that is used to choose
- * submission queues in method externalPush. In the future, this may
- * also incorporate a means to implement different task rejection
- * and resubmission policies.
- *
- * Seeds for submitters and workers/workQueues work in basically
- * the same way but are initialized and updated using slightly
- * different mechanics. Both are initialized using the same
- * approach as in class ThreadLocal, where successive values are
- * unlikely to collide with previous values. Seeds are then
- * randomly modified upon collisions using xorshifts, which
- * requires a non-zero seed.
- */
- static final class Submitter {
- int seed;
- Submitter(int s) { seed = s; }
- }
-
- /**
* Class for artificial tasks that are used to replace the target
* of local joins if they are removed from an interior queue slot
* in WorkQueue.tryRemoveAndExec. We don't need the proxy to
@@ -614,17 +608,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* do not want multiple WorkQueue instances or multiple queue
* arrays sharing cache lines. (It would be best for queue objects
* and their arrays to share, but there is nothing available to
- * help arrange that). Unfortunately, because they are recorded
- * in a common array, WorkQueue instances are often moved to be
- * adjacent by garbage collectors. To reduce impact, we use field
- * padding that works OK on common platforms; this effectively
- * trades off slightly slower average field access for the sake of
- * avoiding really bad worst-case access. (Until better JVM
- * support is in place, this padding is dependent on transient
- * properties of JVM field layout rules.) We also take care in
- * allocating, sizing and resizing the array. Non-shared queue
- * arrays are initialized by workers before use. Others are
- * allocated on first use.
+ * help arrange that). The @Contended annotation alerts JVMs to
+ * try to keep instances apart.
*/
static final class WorkQueue {
/**
@@ -650,13 +635,12 @@ public class ForkJoinPool extends AbstractExecutorService {
// Heuristic padding to ameliorate unfortunate memory placements
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
- int seed; // for random scanning; initialize nonzero
volatile int eventCount; // encoded inactivation count; < 0 if inactive
int nextWait; // encoded record of next event waiter
- int hint; // steal or signal hint (index)
- int poolIndex; // index of this queue in pool (or 0)
- final int mode; // 0: lifo, > 0: fifo, < 0: shared
int nsteals; // number of steals
+ int hint; // steal index hint
+ short poolIndex; // index of this queue in pool
+ final short mode; // 0: lifo, > 0: fifo, < 0: shared
volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
@@ -674,8 +658,8 @@ public class ForkJoinPool extends AbstractExecutorService {
int seed) {
this.pool = pool;
this.owner = owner;
- this.mode = mode;
- this.seed = seed;
+ this.mode = (short)mode;
+ this.hint = seed; // store initial seed for runWorker
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
@@ -688,7 +672,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return (n >= 0) ? 0 : -n; // ignore transient negative
}
- /**
+ /**
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
@@ -713,20 +697,18 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
- int s = top, m, n;
+ int s = top, n;
if ((a = array) != null) { // ignore if queue removed
- int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
- U.putOrderedObject(a, j, task);
- if ((n = (top = s + 1) - base) <= 2) {
- if ((p = pool) != null)
- p.signalWork(this);
- }
+ int m = a.length - 1;
+ U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
+ if ((n = (top = s + 1) - base) <= 2)
+ (p = pool).signalWork(p.workQueues, this);
else if (n >= m)
growArray();
}
}
- /**
+ /**
* Initializes or doubles the capacity of array. Call either
* by owner or with lock held -- it is OK for base, but not
* top, to move while resizings are in progress.
@@ -784,9 +766,8 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((a = array) != null) {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null &&
- base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
+ base == b && U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
@@ -802,9 +783,8 @@ public class ForkJoinPool extends AbstractExecutorService {
int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
if (t != null) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
return t;
}
}
@@ -861,46 +841,43 @@ public class ForkJoinPool extends AbstractExecutorService {
ForkJoinTask.cancelIgnoringExceptions(t);
}
- /**
- * Computes next value for random probes. Scans don't require
- * a very high quality generator, but also not a crummy one.
- * Marsaglia xor-shift is cheap and works well enough. Note:
- * This is manually inlined in its usages in ForkJoinPool to
- * avoid writes inside busy scan loops.
- */
- final int nextSeed() {
- int r = seed;
- r ^= r << 13;
- r ^= r >>> 17;
- return seed = r ^= r << 5;
- }
-
// Specialized execution methods
/**
- * Pops and runs tasks until empty.
+ * Polls and runs tasks until empty.
*/
- private void popAndExecAll() {
- // A bit faster than repeated pop calls
- ForkJoinTask<?>[] a; int m, s; long j; ForkJoinTask<?> t;
- while ((a = array) != null && (m = a.length - 1) >= 0 &&
- (s = top - 1) - base >= 0 &&
- (t = ((ForkJoinTask<?>)
- U.getObject(a, j = ((m & s) << ASHIFT) + ABASE)))
- != null) {
- if (U.compareAndSwapObject(a, j, t, null)) {
- top = s;
- t.doExec();
- }
- }
+ final void pollAndExecAll() {
+ for (ForkJoinTask<?> t; (t = poll()) != null;)
+ t.doExec();
}
/**
- * Polls and runs tasks until empty.
+ * Executes a top-level task and any local tasks remaining
+ * after execution.
*/
- private void pollAndExecAll() {
- for (ForkJoinTask<?> t; (t = poll()) != null;)
- t.doExec();
+ final void runTask(ForkJoinTask<?> task) {
+ if ((currentSteal = task) != null) {
+ task.doExec();
+ ForkJoinTask<?>[] a = array;
+ int md = mode;
+ ++nsteals;
+ currentSteal = null;
+ if (md != 0)
+ pollAndExecAll();
+ else if (a != null) {
+ int s, m = a.length - 1;
+ while ((s = top - 1) - base >= 0) {
+ long i = ((m & s) << ASHIFT) + ABASE;
+ ForkJoinTask<?> t = (ForkJoinTask<?>)U.getObject(a, i);
+ if (t == null)
+ break;
+ if (U.compareAndSwapObject(a, i, t, null)) {
+ top = s;
+ t.doExec();
+ }
+ }
+ }
+ }
}
/**
@@ -911,13 +888,15 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return false if no progress can be made, else true
*/
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
- boolean stat = true, removed = false, empty = true;
+ boolean stat;
ForkJoinTask<?>[] a; int m, s, b, n;
- if ((a = array) != null && (m = a.length - 1) >= 0 &&
+ if (task != null && (a = array) != null && (m = a.length - 1) >= 0 &&
(n = (s = top) - (b = base)) > 0) {
+ boolean removed = false, empty = true;
+ stat = true;
for (ForkJoinTask<?> t;;) { // traverse from s to b
- int j = ((--s & m) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
+ long j = ((--s & m) << ASHIFT) + ABASE;
+ t = (ForkJoinTask<?>)U.getObject(a, j);
if (t == null) // inconsistent length
break;
else if (t == task) {
@@ -945,68 +924,95 @@ public class ForkJoinPool extends AbstractExecutorService {
break;
}
}
+ if (removed)
+ task.doExec();
}
- if (removed)
- task.doExec();
+ else
+ stat = false;
return stat;
}
/**
- * Polls for and executes the given task or any other task in
- * its CountedCompleter computation.
+ * Tries to poll for and execute the given task or any other
+ * task in its CountedCompleter computation.
*/
- final boolean pollAndExecCC(ForkJoinTask<?> root) {
- ForkJoinTask<?>[] a; int b; Object o;
- outer: while ((b = base) - top < 0 && (a = array) != null) {
+ final boolean pollAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int b; Object o; CountedCompleter<?> t, r;
+ if ((b = base) - top < 0 && (a = array) != null) {
long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) == null ||
- !(o instanceof CountedCompleter))
- break;
- for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
- if (r == root) {
- if (base == b &&
- U.compareAndSwapObject(a, j, t, null)) {
- base = b + 1;
- t.doExec();
+ if ((o = U.getObjectVolatile(a, j)) == null)
+ return true; // retry
+ if (o instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (base == b &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ U.putOrderedInt(this, QBASE, b + 1);
+ t.doExec();
+ }
return true;
}
- else
- break; // restart
+ else if ((r = r.completer) == null)
+ break; // not part of root computation
}
- if ((r = r.completer) == null)
- break outer; // not part of root computation
}
}
return false;
}
/**
- * Executes a top-level task and any local tasks remaining
- * after execution.
+ * Tries to pop and execute the given task or any other task
+ * in its CountedCompleter computation.
*/
- final void runTask(ForkJoinTask<?> t) {
- if (t != null) {
- (currentSteal = t).doExec();
- currentSteal = null;
- ++nsteals;
- if (base - top < 0) { // process remaining local tasks
- if (mode == 0)
- popAndExecAll();
- else
- pollAndExecAll();
+ final boolean externalPopAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
+ if (base - (s = top) < 0 && (a = array) != null) {
+ long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (U.compareAndSwapInt(this, QLOCK, 0, 1)) {
+ if (top == s && array == a &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ top = s - 1;
+ qlock = 0;
+ t.doExec();
+ }
+ else
+ qlock = 0;
+ }
+ return true;
+ }
+ else if ((r = r.completer) == null)
+ break;
+ }
}
}
+ return false;
}
/**
- * Executes a non-top-level (stolen) task.
+     * Internal version of externalPopAndExecCC; called only by the
+     * queue's owner, so no locking is needed.
*/
- final void runSubtask(ForkJoinTask<?> t) {
- if (t != null) {
- ForkJoinTask<?> ps = currentSteal;
- (currentSteal = t).doExec();
- currentSteal = ps;
+ final boolean internalPopAndExecCC(CountedCompleter<?> root) {
+ ForkJoinTask<?>[] a; int s; Object o; CountedCompleter<?> t, r;
+ if (base - (s = top) < 0 && (a = array) != null) {
+ long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) instanceof CountedCompleter) {
+ for (t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (U.compareAndSwapObject(a, j, t, null)) {
+ top = s - 1;
+ t.doExec();
+ }
+ return true;
+ }
+ else if ((r = r.completer) == null)
+ break;
+ }
+ }
}
+ return false;
}
/**
@@ -1023,6 +1029,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// Unsafe mechanics
private static final sun.misc.Unsafe U;
+ private static final long QBASE;
private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
@@ -1031,6 +1038,8 @@ public class ForkJoinPool extends AbstractExecutorService {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
+ QBASE = U.objectFieldOffset
+ (k.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
@@ -1047,13 +1056,6 @@ public class ForkJoinPool extends AbstractExecutorService {
// static fields (initialized in static initializer below)
/**
- * Creates a new ForkJoinWorkerThread. This factory is used unless
- * overridden in ForkJoinPool constructors.
- */
- public static final ForkJoinWorkerThreadFactory
- defaultForkJoinWorkerThreadFactory;
-
- /**
* Per-thread submission bookkeeping. Shared across all pools
* to reduce ThreadLocal pollution and because random motion
* to avoid contention in one pool is likely to hold for others.
@@ -1063,6 +1065,13 @@ public class ForkJoinPool extends AbstractExecutorService {
static final ThreadLocal<Submitter> submitters;
/**
+ * Creates a new ForkJoinWorkerThread. This factory is used unless
+ * overridden in ForkJoinPool constructors.
+ */
+ public static final ForkJoinWorkerThreadFactory
+ defaultForkJoinWorkerThreadFactory;
+
+ /**
* Permission required for callers of methods that may start or
* kill threads.
*/
@@ -1074,12 +1083,15 @@ public class ForkJoinPool extends AbstractExecutorService {
* to paranoically avoid potential initialization circularities
* as well as to simplify generated code.
*/
- static final ForkJoinPool commonPool;
+ static final ForkJoinPool common;
/**
- * Common pool parallelism. Must equal commonPool.parallelism.
+ * Common pool parallelism. To allow simpler use and management
+ * when common pool threads are disabled, we allow the underlying
+ * common.parallelism field to be zero, but in that case still report
+ * parallelism as 1 to reflect resulting caller-runs mechanics.
*/
- static final int commonPoolParallelism;
+ static final int commonParallelism;
/**
* Sequence number for creating workerNamePrefix.
@@ -1114,7 +1126,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Tolerance for idle timeouts, to cope with timer undershoots
*/
- private static final long TIMEOUT_SLOP = 2000000L; // 20ms
+    private static final long TIMEOUT_SLOP = 2000000L; // 2ms
/**
* The maximum stolen->joining link depth allowed in method
@@ -1216,30 +1228,19 @@ public class ForkJoinPool extends AbstractExecutorService {
static final int FIFO_QUEUE = 1;
static final int SHARED_QUEUE = -1;
- // bounds for #steps in scan loop -- must be power 2 minus 1
- private static final int MIN_SCAN = 0x1ff; // cover estimation slop
- private static final int MAX_SCAN = 0x1ffff; // 4 * max workers
-
- // Instance fields
-
- /*
- * Field layout of this class tends to matter more than one would
- * like. Runtime layout order is only loosely related to
- * declaration order and may differ across JVMs, but the following
- * empirically works OK on current JVMs.
- */
-
// Heuristic padding to ameliorate unfortunate memory placements
volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
+ // Instance fields
volatile long stealCount; // collects worker counts
volatile long ctl; // main pool control
volatile int plock; // shutdown status and seqLock
volatile int indexSeed; // worker/submitter index seed
- final int config; // mode and parallelism level
+ final short parallelism; // parallelism level
+ final short mode; // LIFO/FIFO
WorkQueue[] workQueues; // main registry
final ForkJoinWorkerThreadFactory factory;
- final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
+ final UncaughtExceptionHandler ueh; // per-worker UEH
final String workerNamePrefix; // to create worker name string
volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
@@ -1254,24 +1255,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* a more conservative alternative to a pure spinlock.
*/
private int acquirePlock() {
- int spins = PL_SPINS, r = 0, ps, nps;
+ int spins = PL_SPINS, ps, nps;
for (;;) {
if (((ps = plock) & PL_LOCK) == 0 &&
U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
return nps;
- else if (r == 0) { // randomize spins if possible
- Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
- if ((t instanceof ForkJoinWorkerThread) &&
- (w = ((ForkJoinWorkerThread)t).workQueue) != null)
- r = w.seed;
- else if ((z = submitters.get()) != null)
- r = z.seed;
- else
- r = 1;
- }
else if (spins >= 0) {
- r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
- if (r >= 0)
+ if (ThreadLocalRandom.current().nextInt() >= 0)
--spins;
}
else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
@@ -1303,48 +1293,15 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Performs secondary initialization, called when plock is zero.
- * Creates workQueue array and sets plock to a valid value. The
- * lock body must be exception-free (so no try/finally) so we
- * optimistically allocate new array outside the lock and throw
- * away if (very rarely) not needed. (A similar tactic is used in
- * fullExternalPush.) Because the plock seq value can eventually
- * wrap around zero, this method harmlessly fails to reinitialize
- * if workQueues exists, while still advancing plock.
- *
- * Additionally tries to create the first worker.
- */
- private void initWorkers() {
- WorkQueue[] ws, nws; int ps;
- int p = config & SMASK; // find power of two table size
- int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
- n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
- n = (n + 1) << 1;
- if ((ws = workQueues) == null || ws.length == 0)
- nws = new WorkQueue[n];
- else
- nws = null;
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if (((ws = workQueues) == null || ws.length == 0) && nws != null)
- workQueues = nws;
- int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
- if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
- releasePlock(nps);
- tryAddWorker();
- }
-
- /**
* Tries to create and start one worker if fewer than target
* parallelism level exist. Adjusts counts etc on failure.
*/
private void tryAddWorker() {
- long c; int u;
+ long c; int u, e;
while ((u = (int)((c = ctl) >>> 32)) < 0 &&
- (u & SHORT_SIGN) != 0 && (int)c == 0) {
- long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
- ((u + UAC_UNIT) & UAC_MASK)) << 32;
+ (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) {
+ long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) |
+ ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e;
if (U.compareAndSwapLong(this, CTL, c, nc)) {
ForkJoinWorkerThreadFactory fac;
Throwable ex = null;
@@ -1355,8 +1312,8 @@ public class ForkJoinPool extends AbstractExecutorService {
wt.start();
break;
}
- } catch (Throwable e) {
- ex = e;
+ } catch (Throwable rex) {
+ ex = rex;
}
deregisterWorker(wt, ex);
break;
@@ -1377,14 +1334,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the worker's queue
*/
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
- Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
+ UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
wt.setDaemon(true);
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
s += SEED_INCREMENT) ||
s == 0); // skip 0
- WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
+ WorkQueue w = new WorkQueue(this, wt, mode, s);
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1404,14 +1361,15 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- w.eventCount = w.poolIndex = r; // volatile write orders
+ w.poolIndex = (short)r;
+ w.eventCount = r; // volatile write orders
ws[r] = w;
}
} finally {
if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
releasePlock(nps);
}
- wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
+ wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1)));
return w;
}
@@ -1421,17 +1379,17 @@ public class ForkJoinPool extends AbstractExecutorService {
* array, and adjusts counts. If pool is shutting down, tries to
* complete termination.
*
- * @param wt the worker thread or null if construction failed
+ * @param wt the worker thread, or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
- int ps;
+ int ps; long sc;
w.qlock = -1; // ensure set
- long ns = w.nsteals, sc; // collect steal count
do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
- sc = stealCount, sc + ns));
+ sc = stealCount,
+ sc + w.nsteals));
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1460,7 +1418,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (e > 0) { // activate or create replacement
if ((ws = workQueues) == null ||
(i = e & SMASK) >= ws.length ||
- (v = ws[i]) != null)
+ (v = ws[i]) == null)
break;
long nc = (((long)(v.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
@@ -1489,6 +1447,26 @@ public class ForkJoinPool extends AbstractExecutorService {
// Submissions
/**
+ * Per-thread records for threads that submit to pools. Currently
+     * holds only a pseudo-random seed / index that is used to choose
+ * submission queues in method externalPush. In the future, this may
+ * also incorporate a means to implement different task rejection
+ * and resubmission policies.
+ *
+ * Seeds for submitters and workers/workQueues work in basically
+ * the same way but are initialized and updated using slightly
+ * different mechanics. Both are initialized using the same
+ * approach as in class ThreadLocal, where successive values are
+ * unlikely to collide with previous values. Seeds are then
+ * randomly modified upon collisions using xorshifts, which
+ * requires a non-zero seed.
+ */
+ static final class Submitter {
+ int seed;
+ Submitter(int s) { seed = s; }
+ }
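As a hedged illustration of how such a seed selects an external submission slot: the mask and table size below are assumptions that mirror the indexing expression used in externalPush, not the pool's actual configuration.

    import java.util.concurrent.ThreadLocalRandom;

    public class SubmitterSlotDemo {
        static final int SQMASK = 0x007e; // illustrative: restrict to even slots

        // Same shape as externalPush's ws[m & z.seed & SQMASK] indexing
        static int slotFor(int seed, int tableLength) {
            int m = tableLength - 1;      // table sizes are powers of two
            return m & seed & SQMASK;
        }

        public static void main(String[] args) {
            int seed = ThreadLocalRandom.current().nextInt();
            System.out.println("slot = " + slotFor(seed, 16));
        }
    }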
+
+ /**
* Unless shutting down, adds the given task to a submission queue
* at submitter's current queue index (modulo submission
* range). Only the most common path is directly handled in this
@@ -1497,19 +1475,21 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param task the task. Caller must ensure non-null.
*/
final void externalPush(ForkJoinTask<?> task) {
- WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
- if ((z = submitters.get()) != null && plock > 0 &&
- (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
- (q = ws[m & z.seed & SQMASK]) != null &&
+ Submitter z = submitters.get();
+ WorkQueue q; int r, m, s, n, am; ForkJoinTask<?>[] a;
+ int ps = plock;
+ WorkQueue[] ws = workQueues;
+ if (z != null && ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 &&
+ (q = ws[m & (r = z.seed) & SQMASK]) != null && r != 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
- int b = q.base, s = q.top, n, an;
- if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
- int j = (((an - 1) & s) << ASHIFT) + ABASE;
+ if ((a = q.array) != null &&
+ (am = a.length - 1) > (n = (s = q.top) - q.base)) {
+ int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
q.top = s + 1; // push on to deque
q.qlock = 0;
- if (n <= 2)
- signalWork(q);
+ if (n <= 1)
+ signalWork(ws, q);
return;
}
q.qlock = 0;
@@ -1520,13 +1500,19 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Full version of externalPush. This method is called, among
* other times, upon the first submission of the first task to the
- * pool, so must perform secondary initialization (via
- * initWorkers). It also detects first submission by an external
- * thread by looking up its ThreadLocal, and creates a new shared
- * queue if the one at index if empty or contended. The plock lock
- * body must be exception-free (so no try/finally) so we
- * optimistically allocate new queues outside the lock and throw
- * them away if (very rarely) not needed.
+ * pool, so must perform secondary initialization. It also
+ * detects first submission by an external thread by looking up
+ * its ThreadLocal, and creates a new shared queue if the one at
+     * index is empty or contended. The plock lock body must be
+ * exception-free (so no try/finally) so we optimistically
+ * allocate new queues outside the lock and throw them away if
+ * (very rarely) not needed.
+ *
+ * Secondary initialization occurs when plock is zero, to create
+ * workQueue array and set plock to a valid value. This lock body
+ * must also be exception-free. Because the plock seq value can
+ * eventually wrap around zero, this method harmlessly fails to
+ * reinitialize if workQueues exists, while still advancing plock.
*/
private void fullExternalPush(ForkJoinTask<?> task) {
int r = 0; // random index seed
@@ -1537,17 +1523,31 @@ public class ForkJoinPool extends AbstractExecutorService {
r += SEED_INCREMENT) && r != 0)
submitters.set(z = new Submitter(r));
}
- else if (r == 0) { // move to a different index
+ else if (r == 0) { // move to a different index
r = z.seed;
- r ^= r << 13; // same xorshift as WorkQueues
+ r ^= r << 13; // same xorshift as WorkQueues
r ^= r >>> 17;
- z.seed = r ^ (r << 5);
+ z.seed = r ^= (r << 5);
}
- else if ((ps = plock) < 0)
+ if ((ps = plock) < 0)
throw new RejectedExecutionException();
else if (ps == 0 || (ws = workQueues) == null ||
- (m = ws.length - 1) < 0)
- initWorkers();
+ (m = ws.length - 1) < 0) { // initialize workQueues
+ int p = parallelism; // find power of two table size
+ int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
+ n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
+ n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
+ WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
+ new WorkQueue[n] : null);
+ if (((ps = plock) & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ if (((ws = workQueues) == null || ws.length == 0) && nws != null)
+ workQueues = nws;
+ int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
+ }
else if ((q = ws[k = r & m & SQMASK]) != null) {
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
@@ -1565,7 +1565,7 @@ public class ForkJoinPool extends AbstractExecutorService {
q.qlock = 0; // unlock
}
if (submitted) {
- signalWork(q);
+ signalWork(ws, q);
return;
}
}
@@ -1573,6 +1573,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
q = new WorkQueue(this, null, SHARED_QUEUE, r);
+ q.poolIndex = (short)k;
if (((ps = plock) & PL_LOCK) != 0 ||
!U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
ps = acquirePlock();
@@ -1583,7 +1584,7 @@ public class ForkJoinPool extends AbstractExecutorService {
releasePlock(nps);
}
else
- r = 0; // try elsewhere while lock held
+ r = 0;
}
}
@@ -1594,41 +1595,42 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void incrementActiveCount() {
long c;
- do {} while (!U.compareAndSwapLong(this, CTL, c = ctl, c + AC_UNIT));
+ do {} while (!U.compareAndSwapLong
+ (this, CTL, c = ctl, ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
}
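The masked CAS above increments only the active-count field of the packed ctl word, so a carry cannot spill into neighboring fields. A self-contained sketch of that idiom; the 16-bit field placement here is an assumption for illustration only:

    public class PackedCounterDemo {
        static final int  AC_SHIFT = 48;
        static final long AC_UNIT  = 1L << AC_SHIFT;
        static final long AC_MASK  = 0xffffL << AC_SHIFT;

        // Mirrors (c & ~AC_MASK) | ((c & AC_MASK) + AC_UNIT)
        static long incrementActive(long ctl) {
            return (ctl & ~AC_MASK) | ((ctl & AC_MASK) + AC_UNIT);
        }

        public static void main(String[] args) {
            long ctl = (0xfffdL << AC_SHIFT) | 0x1234L;  // AC = -3, low bits set
            ctl = incrementActive(ctl);
            System.out.println((short)(ctl >>> AC_SHIFT));       // prints -2
            System.out.println(Long.toHexString(ctl & 0xffffL)); // low bits intact: 1234
        }
    }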
/**
* Tries to create or activate a worker if too few are active.
*
- * @param q the (non-null) queue holding tasks to be signalled
+ * @param ws the worker array to use to find signallees
+ * @param q if non-null, the queue holding tasks to be processed
*/
- final void signalWork(WorkQueue q) {
- int hint = q.poolIndex;
- long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
- while ((u = (int)((c = ctl) >>> 32)) < 0) {
- if ((e = (int)c) > 0) {
- if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
- (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
- long nc = (((long)(w.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.hint = hint;
- w.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = w.parker) != null)
- U.unpark(p);
- break;
- }
- if (q.top - q.base <= 0)
- break;
- }
- else
- break;
- }
- else {
+ final void signalWork(WorkQueue[] ws, WorkQueue q) {
+ for (;;) {
+ long c; int e, u, i; WorkQueue w; Thread p;
+ if ((u = (int)((c = ctl) >>> 32)) >= 0)
+ break;
+ if ((e = (int)c) <= 0) {
if ((short)u < 0)
tryAddWorker();
break;
}
+ if (ws == null || ws.length <= (i = e & SMASK) ||
+ (w = ws[i]) == null)
+ break;
+ long nc = (((long)(w.nextWait & E_MASK)) |
+ ((long)(u + UAC_UNIT)) << 32);
+ int ne = (e + E_SEQ) & E_MASK;
+ if (w.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = ne;
+ if ((p = w.parker) != null)
+ U.unpark(p);
+ break;
+ }
+ if (q != null && q.base >= q.top)
+ break;
}
}
@@ -1639,214 +1641,154 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void runWorker(WorkQueue w) {
w.growArray(); // allocate queue
- do { w.runTask(scan(w)); } while (w.qlock >= 0);
+ for (int r = w.hint; scan(w, r) == 0; ) {
+ r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
+ }
}
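The three-shift sequence in runWorker is a Marsaglia xorshift generator, the same one documented by the deleted nextSeed method. A standalone sketch (class name ours, purely illustrative):

    public class XorShift32 {
        private int seed; // must start nonzero or the sequence sticks at 0

        XorShift32(int seed) { this.seed = (seed == 0) ? 1 : seed; }

        int next() {
            int r = seed;
            r ^= r << 13;   // same shift triple as runWorker's r updates
            r ^= r >>> 17;
            r ^= r << 5;
            return seed = r;
        }

        public static void main(String[] args) {
            XorShift32 g = new XorShift32(0x9e3779b9);
            for (int i = 0; i < 4; ++i)
                System.out.println(Integer.toHexString(g.next()));
        }
    }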
/**
- * Scans for and, if found, returns one task, else possibly
+ * Scans for and, if found, runs one task, else possibly
* inactivates the worker. This method operates on single reads of
* volatile state and is designed to be re-invoked continuously,
* in part because it returns upon detecting inconsistencies,
* contention, or state changes that indicate possible success on
* re-invocation.
*
- * The scan searches for tasks across queues (starting at a random
- * index, and relying on registerWorker to irregularly scatter
- * them within array to avoid bias), checking each at least twice.
- * The scan terminates upon either finding a non-empty queue, or
- * completing the sweep. If the worker is not inactivated, it
- * takes and returns a task from this queue. Otherwise, if not
- * activated, it signals workers (that may include itself) and
- * returns so caller can retry. Also returns for true if the
- * worker array may have changed during an empty scan. On failure
- * to find a task, we take one of the following actions, after
- * which the caller will retry calling this method unless
- * terminated.
- *
- * * If pool is terminating, terminate the worker.
- *
- * * If not already enqueued, try to inactivate and enqueue the
- * worker on wait queue. Or, if inactivating has caused the pool
- * to be quiescent, relay to idleAwaitWork to possibly shrink
- * pool.
- *
- * * If already enqueued and none of the above apply, possibly
- * park awaiting signal, else lingering to help scan and signal.
- *
- * * If a non-empty queue discovered or left as a hint,
- * help wake up other workers before return.
+ * The scan searches for tasks across queues starting at a random
+ * index, checking each at least twice. The scan terminates upon
+ * either finding a non-empty queue, or completing the sweep. If
+ * the worker is not inactivated, it takes and runs a task from
+ * this queue. Otherwise, if not activated, it tries to activate
+ * itself or some other worker by signalling. On failure to find a
+ * task, returns (for retry) if pool state may have changed during
+ * an empty scan, or tries to inactivate if active, else possibly
+ * blocks or terminates via method awaitWork.
*
* @param w the worker (via its WorkQueue)
- * @return a task or null if none found
+ * @param r a random seed
+     * @return the worker's qlock status if it would have waited, else 0
*/
- private final ForkJoinTask<?> scan(WorkQueue w) {
+ private final int scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
- int ps = plock; // read plock before ws
- if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
- int ec = w.eventCount; // ec is negative if inactive
- int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
- w.hint = -1; // update seed and clear hint
- int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
- do {
- WorkQueue q; ForkJoinTask<?>[] a; int b;
- if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
- (a = q.array) != null) { // probably nonempty
- int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- ForkJoinTask<?> t = (ForkJoinTask<?>)
- U.getObjectVolatile(a, i);
- if (q.base == b && ec >= 0 && t != null &&
- U.compareAndSwapObject(a, i, t, null)) {
- if ((q.base = b + 1) - q.top < 0)
- signalWork(q);
- return t; // taken
- }
- else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
- w.hint = (r + j) & m; // help signal below
- break; // cannot take
+ long c = ctl; // for consistency check
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 && w != null) {
+ for (int j = m + m + 1, ec = w.eventCount;;) {
+ WorkQueue q; int b, e; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
+ if ((q = ws[(r - j) & m]) != null &&
+ (b = q.base) - q.top < 0 && (a = q.array) != null) {
+ long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ if ((t = ((ForkJoinTask<?>)
+ U.getObjectVolatile(a, i))) != null) {
+ if (ec < 0)
+ helpRelease(c, ws, w, q, b);
+ else if (q.base == b &&
+ U.compareAndSwapObject(a, i, t, null)) {
+ U.putOrderedInt(q, QBASE, b + 1);
+ if ((b + 1) - q.top < 0)
+ signalWork(ws, q);
+ w.runTask(t);
+ }
}
+ break;
}
- } while (--j >= 0);
-
- int h, e, ns; long c, sc; WorkQueue q;
- if ((ns = w.nsteals) != 0) {
- if (U.compareAndSwapLong(this, STEALCOUNT,
- sc = stealCount, sc + ns))
- w.nsteals = 0; // collect steals and rescan
- }
- else if (plock != ps) // consistency check
- ; // skip
- else if ((e = (int)(c = ctl)) < 0)
- w.qlock = -1; // pool is terminating
- else {
- if ((h = w.hint) < 0) {
- if (ec >= 0) { // try to enqueue/inactivate
- long nc = (((long)ec |
- ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
- w.nextWait = e; // link and mark inactive
+ else if (--j < 0) {
+ if ((ec | (e = (int)c)) < 0) // inactive or terminating
+ return awaitWork(w, c, ec);
+ else if (ctl == c) { // try to inactivate and enqueue
+ long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
+ w.nextWait = e;
w.eventCount = ec | INT_SIGN;
- if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
- w.eventCount = ec; // unmark on CAS failure
- else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
- idleAwaitWork(w, nc, c);
- }
- else if (w.eventCount < 0 && !tryTerminate(false, false) &&
- ctl == c) { // block
- Thread wt = Thread.currentThread();
- Thread.interrupted(); // clear status
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt; // emulate LockSupport.park
- if (w.eventCount < 0) // recheck
- U.park(false, 0L);
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- }
- }
- if ((h >= 0 || (h = w.hint) >= 0) &&
- (ws = workQueues) != null && h < ws.length &&
- (q = ws[h]) != null) { // signal others before retry
- WorkQueue v; Thread p; int u, i, s;
- for (int n = (config & SMASK) >>> 1;;) {
- int idleCount = (w.eventCount < 0) ? 0 : -1;
- if (((s = idleCount - q.base + q.top) <= n &&
- (n = s) <= 0) ||
- (u = (int)((c = ctl) >>> 32)) >= 0 ||
- (e = (int)c) <= 0 || m < (i = e & SMASK) ||
- (v = ws[i]) == null)
- break;
- long nc = (((long)(v.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (v.eventCount != (e | INT_SIGN) ||
- !U.compareAndSwapLong(this, CTL, c, nc))
- break;
- v.hint = h;
- v.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = v.parker) != null)
- U.unpark(p);
- if (--n <= 0)
- break;
+ if (!U.compareAndSwapLong(this, CTL, c, nc))
+ w.eventCount = ec; // back out
}
+ break;
}
}
}
- return null;
+ return 0;
}
/**
- * If inactivating worker w has caused the pool to become
- * quiescent, checks for pool termination, and, so long as this is
- * not the only worker, waits for event for up to a given
- * duration. On timeout, if ctl has not changed, terminates the
- * worker, which will in turn wake up another worker to possibly
- * repeat this process.
+ * A continuation of scan(), possibly blocking or terminating
+ * worker w. Returns without blocking if pool state has apparently
+ * changed since last invocation. Also, if inactivating w has
+ * caused the pool to become quiescent, checks for pool
+ * termination, and, so long as this is not the only worker, waits
+ * for event for up to a given duration. On timeout, if ctl has
+ * not changed, terminates the worker, which will in turn wake up
+ * another worker to possibly repeat this process.
*
* @param w the calling worker
- * @param currentCtl the ctl value triggering possible quiescence
- * @param prevCtl the ctl value to restore if thread is terminated
- */
- private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
- if (w != null && w.eventCount < 0 &&
- !tryTerminate(false, false) && (int)prevCtl != 0) {
- int dc = -(short)(currentCtl >>> TC_SHIFT);
- long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
- long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
- Thread wt = Thread.currentThread();
- while (ctl == currentCtl) {
- Thread.interrupted(); // timed variant of version in scan()
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt;
- if (ctl == currentCtl)
- U.park(false, parkTime);
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- if (ctl != currentCtl)
- break;
- if (deadline - System.nanoTime() <= 0L &&
- U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
- w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
- w.qlock = -1; // shrink
- break;
+ * @param c the ctl value on entry to scan
+ * @param ec the worker's eventCount on entry to scan
+ */
+ private final int awaitWork(WorkQueue w, long c, int ec) {
+ int stat, ns; long parkTime, deadline;
+ if ((stat = w.qlock) >= 0 && w.eventCount == ec && ctl == c &&
+ !Thread.interrupted()) {
+ int e = (int)c;
+ int u = (int)(c >>> 32);
+ int d = (u >> UAC_SHIFT) + parallelism; // active count
+
+ if (e < 0 || (d <= 0 && tryTerminate(false, false)))
+ stat = w.qlock = -1; // pool is terminating
+ else if ((ns = w.nsteals) != 0) { // collect steals and retry
+ long sc;
+ w.nsteals = 0;
+ do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
+ sc = stealCount, sc + ns));
+ }
+ else {
+ long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L :
+ ((long)(w.nextWait & E_MASK)) | // ctl to restore
+ ((long)(u + UAC_UNIT)) << 32);
+ if (pc != 0L) { // timed wait if last waiter
+ int dc = -(short)(c >>> TC_SHIFT);
+                    parkTime = (dc < 0 ? FAST_IDLE_TIMEOUT :
+ (dc + 1) * IDLE_TIMEOUT);
+ deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
+ }
+ else
+ parkTime = deadline = 0L;
+ if (w.eventCount == ec && ctl == c) {
+ Thread wt = Thread.currentThread();
+ U.putObject(wt, PARKBLOCKER, this);
+ w.parker = wt; // emulate LockSupport.park
+ if (w.eventCount == ec && ctl == c)
+ U.park(false, parkTime); // must recheck before park
+ w.parker = null;
+ U.putObject(wt, PARKBLOCKER, null);
+ if (parkTime != 0L && ctl == c &&
+ deadline - System.nanoTime() <= 0L &&
+ U.compareAndSwapLong(this, CTL, c, pc))
+ stat = w.qlock = -1; // shrink pool
}
}
}
+ return stat;
}
/**
- * Scans through queues looking for work while joining a task; if
- * any present, signals. May return early if more signalling is
- * detectably unneeded.
- *
- * @param task return early if done
- * @param origin an index to start scan
- */
- private void helpSignal(ForkJoinTask<?> task, int origin) {
- WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
- if (task != null && task.status >= 0 &&
- (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
- (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
- outer: for (int k = origin, j = m; j >= 0; --j) {
- WorkQueue q = ws[k++ & m];
- for (int n = m;;) { // limit to at most m signals
- if (task.status < 0)
- break outer;
- if (q == null ||
- ((s = -q.base + q.top) <= n && (n = s) <= 0))
- break;
- if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
- (e = (int)c) <= 0 || m < (i = e & SMASK) ||
- (w = ws[i]) == null)
- break outer;
- long nc = (((long)(w.nextWait & E_MASK)) |
- ((long)(u + UAC_UNIT) << 32));
- if (w.eventCount != (e | INT_SIGN))
- break outer;
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = w.parker) != null)
- U.unpark(p);
- if (--n <= 0)
- break;
- }
- }
+ * Possibly releases (signals) a worker. Called only from scan()
+ * when a worker with apparently inactive status finds a non-empty
+ * queue. This requires revalidating all of the associated state
+     * passed in by the caller.
+ */
+ private final void helpRelease(long c, WorkQueue[] ws, WorkQueue w,
+ WorkQueue q, int b) {
+ WorkQueue v; int e, i; Thread p;
+ if (w != null && w.eventCount < 0 && (e = (int)c) > 0 &&
+ ws != null && ws.length > (i = e & SMASK) &&
+ (v = ws[i]) != null && ctl == c) {
+ long nc = (((long)(v.nextWait & E_MASK)) |
+ ((long)((int)(c >>> 32) + UAC_UNIT)) << 32);
+ int ne = (e + E_SEQ) & E_MASK;
+ if (q != null && q.base == b && w.eventCount < 0 &&
+ v.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ v.eventCount = ne;
+ if ((p = v.parker) != null)
+ U.unpark(p);
}
}
}
@@ -1871,7 +1813,8 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
private int tryHelpStealer(WorkQueue joiner, ForkJoinTask<?> task) {
int stat = 0, steps = 0; // bound to avoid cycles
- if (joiner != null && task != null) { // hoist null checks
+ if (task != null && joiner != null &&
+ joiner.base - joiner.top >= 0) { // hoist checks
restart: for (;;) {
ForkJoinTask<?> subtask = task; // current target
for (WorkQueue j = joiner, v;;) { // v is stealer of subtask
@@ -1898,7 +1841,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
for (;;) { // help stealer or descend to its stealer
- ForkJoinTask[] a; int b;
+ ForkJoinTask[] a; int b;
if (subtask.status < 0) // surround probes with
continue restart; // consistency checks
if ((b = v.base) - v.top < 0 && (a = v.array) != null) {
@@ -1909,13 +1852,23 @@ public class ForkJoinPool extends AbstractExecutorService {
v.currentSteal != subtask)
continue restart; // stale
stat = 1; // apparent progress
- if (t != null && v.base == b &&
- U.compareAndSwapObject(a, i, t, null)) {
- v.base = b + 1; // help stealer
- joiner.runSubtask(t);
+ if (v.base == b) {
+ if (t == null)
+ break restart;
+ if (U.compareAndSwapObject(a, i, t, null)) {
+ U.putOrderedInt(v, QBASE, b + 1);
+ ForkJoinTask<?> ps = joiner.currentSteal;
+ int jt = joiner.top;
+ do {
+ joiner.currentSteal = t;
+ t.doExec(); // clear local tasks too
+ } while (task.status >= 0 &&
+ joiner.top != jt &&
+ (t = joiner.pop()) != null);
+ joiner.currentSteal = ps;
+ break restart;
+ }
}
- else if (v.base == b && ++steps == MAX_HELP)
- break restart; // v apparently stalled
}
else { // empty -- try to descend
ForkJoinTask<?> next = v.currentJoin;
@@ -1942,27 +1895,33 @@ public class ForkJoinPool extends AbstractExecutorService {
* and run tasks within the target's computation.
*
* @param task the task to join
- * @param mode if shared, exit upon completing any task
- * if all workers are active
- */
- private int helpComplete(ForkJoinTask<?> task, int mode) {
- WorkQueue[] ws; WorkQueue q; int m, n, s, u;
- if (task != null && (ws = workQueues) != null &&
- (m = ws.length - 1) >= 0) {
- for (int j = 1, origin = j;;) {
+ */
+ private int helpComplete(WorkQueue joiner, CountedCompleter<?> task) {
+ WorkQueue[] ws; int m;
+ int s = 0;
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0 &&
+ joiner != null && task != null) {
+ int j = joiner.poolIndex;
+ int scans = m + m + 1;
+ long c = 0L; // for stability check
+ for (int k = scans; ; j += 2) {
+ WorkQueue q;
if ((s = task.status) < 0)
- return s;
- if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
- origin = j;
- if (mode == SHARED_QUEUE &&
- ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
+ break;
+ else if (joiner.internalPopAndExecCC(task))
+ k = scans;
+ else if ((s = task.status) < 0)
+ break;
+ else if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
+ k = scans;
+ else if (--k < 0) {
+ if (c == (c = ctl))
break;
+ k = scans;
}
- else if ((j = (j + 2) & m) == origin)
- break;
}
}
- return 0;
+ return s;
}
/**
@@ -1971,17 +1930,22 @@ public class ForkJoinPool extends AbstractExecutorService {
* for blocking. Fails on contention or termination. Otherwise,
* adds a new thread if no idle workers are available and pool
* may become starved.
+ *
+ * @param c the assumed ctl value
*/
- final boolean tryCompensate() {
- int pc = config & SMASK, e, i, tc; long c;
- WorkQueue[] ws; WorkQueue w; Thread p;
- if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
- if (e != 0 && (i = e & SMASK) < ws.length &&
- (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
+ final boolean tryCompensate(long c) {
+ WorkQueue[] ws = workQueues;
+ int pc = parallelism, e = (int)c, m, tc;
+ if (ws != null && (m = ws.length - 1) >= 0 && e >= 0 && ctl == c) {
+ WorkQueue w = ws[e & m];
+ if (e != 0 && w != null) {
+ Thread p;
long nc = ((long)(w.nextWait & E_MASK) |
(c & (AC_MASK|TC_MASK)));
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
+ int ne = (e + E_SEQ) & E_MASK;
+ if (w.eventCount == (e | INT_SIGN) &&
+ U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = ne;
if ((p = w.parker) != null)
U.unpark(p);
return true; // replace with idle worker
@@ -2024,23 +1988,20 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
int s = 0;
- if (joiner != null && task != null && (s = task.status) >= 0) {
+ if (task != null && (s = task.status) >= 0 && joiner != null) {
ForkJoinTask<?> prevJoin = joiner.currentJoin;
joiner.currentJoin = task;
- do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
- joiner.tryRemoveAndExec(task)); // process local tasks
- if (s >= 0 && (s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 &&
- (task instanceof CountedCompleter))
- s = helpComplete(task, LIFO_QUEUE);
- }
+ do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
+ (s = task.status) >= 0);
+ if (s >= 0 && (task instanceof CountedCompleter))
+ s = helpComplete(joiner, (CountedCompleter<?>)task);
+ long cc = 0; // for stability checks
while (s >= 0 && (s = task.status) >= 0) {
- if ((!joiner.isEmpty() || // try helping
- (s = tryHelpStealer(joiner, task)) == 0) &&
+ if ((s = tryHelpStealer(joiner, task)) == 0 &&
(s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 && tryCompensate()) {
+ if (!tryCompensate(cc))
+ cc = ctl;
+ else {
if (task.trySetSignal() && (s = task.status) >= 0) {
synchronized (task) {
if (task.status >= 0) {
@@ -2053,9 +2014,11 @@ public class ForkJoinPool extends AbstractExecutorService {
task.notifyAll();
}
}
- long c; // re-activate
+ long c; // reactivate
do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
+ (this, CTL, c = ctl,
+ ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
}
}
}
@@ -2077,15 +2040,11 @@ public class ForkJoinPool extends AbstractExecutorService {
if (joiner != null && task != null && (s = task.status) >= 0) {
ForkJoinTask<?> prevJoin = joiner.currentJoin;
joiner.currentJoin = task;
- do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
- joiner.tryRemoveAndExec(task));
- if (s >= 0 && (s = task.status) >= 0) {
- helpSignal(task, joiner.poolIndex);
- if ((s = task.status) >= 0 &&
- (task instanceof CountedCompleter))
- s = helpComplete(task, LIFO_QUEUE);
- }
- if (s >= 0 && joiner.isEmpty()) {
+ do {} while (joiner.tryRemoveAndExec(task) && // process local tasks
+ (s = task.status) >= 0);
+ if (s >= 0) {
+ if (task instanceof CountedCompleter)
+ helpComplete(joiner, (CountedCompleter<?>)task);
do {} while (task.status >= 0 &&
tryHelpStealer(joiner, task) > 0);
}
@@ -2095,29 +2054,22 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Returns a (probably) non-empty steal queue, if one is found
- * during a random, then cyclic scan, else null. This method must
- * be retried by caller if, by the time it tries to use the queue,
- * it is empty.
- * @param r a (random) seed for scanning
- */
- private WorkQueue findNonEmptyStealQueue(int r) {
- for (WorkQueue[] ws;;) {
- int ps = plock, m, n;
- if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
- return null;
- for (int j = (m + 1) << 2; ;) {
- WorkQueue q = ws[(((r + j) << 1) | 1) & m];
- if (q != null && (n = q.base - q.top) < 0) {
- if (n < -1)
- signalWork(q);
- return q;
- }
- else if (--j < 0) {
- if (plock == ps)
- return null;
- break;
+ * during a scan, else null. This method must be retried by
+ * caller if, by the time it tries to use the queue, it is empty.
+ */
+ private WorkQueue findNonEmptyStealQueue() {
+ int r = ThreadLocalRandom.current().nextInt();
+ for (;;) {
+ int ps = plock, m; WorkQueue[] ws; WorkQueue q;
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
+ for (int j = (m + 1) << 2; j >= 0; --j) {
+ if ((q = ws[(((r - j) << 1) | 1) & m]) != null &&
+ q.base - q.top < 0)
+ return q;
}
}
+ if (plock == ps)
+ return null;
}
}
@@ -2128,38 +2080,36 @@ public class ForkJoinPool extends AbstractExecutorService {
* find tasks either.
*/
final void helpQuiescePool(WorkQueue w) {
+ ForkJoinTask<?> ps = w.currentSteal;
for (boolean active = true;;) {
- ForkJoinTask<?> localTask; // exhaust local queue
- while ((localTask = w.nextLocalTask()) != null)
- localTask.doExec();
- // Similar to loop in scan(), but ignoring submissions
- WorkQueue q = findNonEmptyStealQueue(w.nextSeed());
- if (q != null) {
- ForkJoinTask<?> t; int b;
+ long c; WorkQueue q; ForkJoinTask<?> t; int b;
+ while ((t = w.nextLocalTask()) != null)
+ t.doExec();
+ if ((q = findNonEmptyStealQueue()) != null) {
if (!active) { // re-establish active count
- long c;
active = true;
do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
+ (this, CTL, c = ctl,
+ ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))));
+ }
+ if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
+ (w.currentSteal = t).doExec();
+ w.currentSteal = ps;
}
- if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
- w.runSubtask(t);
}
- else {
- long c;
- if (active) { // decrement active count without queuing
+ else if (active) { // decrement active count without queuing
+ long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT);
+ if ((int)(nc >> AC_SHIFT) + parallelism == 0)
+ break; // bypass decrement-then-increment
+ if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c -= AC_UNIT));
- }
- else
- c = ctl; // re-increment on exit
- if ((int)(c >> AC_SHIFT) + (config & SMASK) == 0) {
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
- break;
- }
}
+ else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 &&
+ U.compareAndSwapLong
+ (this, CTL, c, ((c & ~AC_MASK) |
+ ((c & AC_MASK) + AC_UNIT))))
+ break;
}
}
@@ -2173,7 +2123,7 @@ public class ForkJoinPool extends AbstractExecutorService {
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
- if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
+ if ((q = findNonEmptyStealQueue()) == null)
return null;
if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
return t;
@@ -2229,7 +2179,7 @@ public class ForkJoinPool extends AbstractExecutorService {
static int getSurplusQueuedTaskCount() {
Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
- int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
+ int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).parallelism;
int n = (q = wt.workQueue).top - q.base;
int a = (int)(pool.ctl >> AC_SHIFT) + p;
return n - (a > (p >>>= 1) ? 0 :
@@ -2258,45 +2208,47 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
- if (this == commonPool) // cannot shut down
+ int ps;
+ if (this == common) // cannot shut down
return false;
+ if ((ps = plock) >= 0) { // enable by setting plock
+ if (!enable)
+ return false;
+ if ((ps & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
+ }
for (long c;;) {
- if (((c = ctl) & STOP_BIT) != 0) { // already terminating
- if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
+ if (((c = ctl) & STOP_BIT) != 0) { // already terminating
+ if ((short)(c >>> TC_SHIFT) + parallelism <= 0) {
synchronized (this) {
- notifyAll(); // signal when 0 workers
+ notifyAll(); // signal when 0 workers
}
}
return true;
}
- if (plock >= 0) { // not yet enabled
- int ps;
- if (!enable)
- return false;
- if (((ps = plock) & PL_LOCK) != 0 ||
- !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
- ps = acquirePlock();
- if (!U.compareAndSwapInt(this, PLOCK, ps, SHUTDOWN))
- releasePlock(SHUTDOWN);
- }
- if (!now) { // check if idle & no tasks
- if ((int)(c >> AC_SHIFT) != -(config & SMASK) ||
- hasQueuedSubmissions())
+ if (!now) { // check if idle & no tasks
+ WorkQueue[] ws; WorkQueue w;
+ if ((int)(c >> AC_SHIFT) + parallelism > 0)
return false;
- // Check for unqueued inactive workers. One pass suffices.
- WorkQueue[] ws = workQueues; WorkQueue w;
- if (ws != null) {
- for (int i = 1; i < ws.length; i += 2) {
- if ((w = ws[i]) != null && w.eventCount >= 0)
+ if ((ws = workQueues) != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ if ((w = ws[i]) != null &&
+ (!w.isEmpty() ||
+ ((i & 1) != 0 && w.eventCount >= 0))) {
+ signalWork(ws, w);
return false;
+ }
}
}
}
if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
for (int pass = 0; pass < 3; ++pass) {
- WorkQueue[] ws = workQueues;
- if (ws != null) {
- WorkQueue w; Thread wt;
+ WorkQueue[] ws; WorkQueue w; Thread wt;
+ if ((ws = workQueues) != null) {
int n = ws.length;
for (int i = 0; i < n; ++i) {
if ((w = ws[i]) != null) {
@@ -2307,7 +2259,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (!wt.isInterrupted()) {
try {
wt.interrupt();
- } catch (SecurityException ignore) {
+ } catch (Throwable ignore) {
}
}
U.unpark(wt);
@@ -2318,7 +2270,7 @@ public class ForkJoinPool extends AbstractExecutorService {
// Wake up workers parked on event queue
int i, e; long cc; Thread p;
while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
- (i = e & SMASK) < n &&
+ (i = e & SMASK) < n && i >= 0 &&
(w = ws[i]) != null) {
long nc = ((long)(w.nextWait & E_MASK) |
((cc + AC_UNIT) & AC_MASK) |
@@ -2344,9 +2296,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* least one task.
*/
static WorkQueue commonSubmitterQueue() {
- ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
+ Submitter z; ForkJoinPool p; WorkQueue[] ws; int m, r;
return ((z = submitters.get()) != null &&
- (p = commonPool) != null &&
+ (p = common) != null &&
(ws = p.workQueues) != null &&
(m = ws.length - 1) >= 0) ?
ws[m & z.seed & SQMASK] : null;
@@ -2355,127 +2307,57 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Tries to pop the given task from submitter's queue in common pool.
*/
- static boolean tryExternalUnpush(ForkJoinTask<?> t) {
- ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
- ForkJoinTask<?>[] a; int m, s;
- if (t != null &&
- (z = submitters.get()) != null &&
- (p = commonPool) != null &&
- (ws = p.workQueues) != null &&
- (m = ws.length - 1) >= 0 &&
- (q = ws[m & z.seed & SQMASK]) != null &&
- (s = q.top) != q.base &&
- (a = q.array) != null) {
+ final boolean tryExternalUnpush(ForkJoinTask<?> task) {
+ WorkQueue joiner; ForkJoinTask<?>[] a; int m, s;
+ Submitter z = submitters.get();
+ WorkQueue[] ws = workQueues;
+ boolean popped = false;
+ if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
+ (joiner = ws[z.seed & m & SQMASK]) != null &&
+ joiner.base != (s = joiner.top) &&
+ (a = joiner.array) != null) {
long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
- if (U.getObject(a, j) == t &&
- U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s && // recheck
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- q.qlock = 0;
- return true;
+ if (U.getObject(a, j) == task &&
+ U.compareAndSwapInt(joiner, QLOCK, 0, 1)) {
+ if (joiner.top == s && joiner.array == a &&
+ U.compareAndSwapObject(a, j, task, null)) {
+ joiner.top = s - 1;
+ popped = true;
}
- q.qlock = 0;
+ joiner.qlock = 0;
}
}
- return false;
+ return popped;
}
- /**
- * Tries to pop and run local tasks within the same computation
- * as the given root. On failure, tries to help complete from
- * other queues via helpComplete.
- */
- private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
- ForkJoinTask<?>[] a; int m;
- if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
- root != null && root.status >= 0) {
- for (;;) {
- int s, u; Object o; CountedCompleter<?> task = null;
- if ((s = q.top) - q.base > 0) {
- long j = ((m & (s - 1)) << ASHIFT) + ABASE;
- if ((o = U.getObject(a, j)) != null &&
- (o instanceof CountedCompleter)) {
- CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
- do {
- if (r == root) {
- if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s &&
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- task = t;
- }
- q.qlock = 0;
- }
- break;
- }
- } while ((r = r.completer) != null);
- }
- }
- if (task != null)
- task.doExec();
- if (root.status < 0 ||
- (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
+ final int externalHelpComplete(CountedCompleter<?> task) {
+ WorkQueue joiner; int m, j;
+ Submitter z = submitters.get();
+ WorkQueue[] ws = workQueues;
+ int s = 0;
+ if (z != null && ws != null && (m = ws.length - 1) >= 0 &&
+ (joiner = ws[(j = z.seed) & m & SQMASK]) != null && task != null) {
+ int scans = m + m + 1;
+ long c = 0L; // for stability check
+ j |= 1; // poll odd queues
+ for (int k = scans; ; j += 2) {
+ WorkQueue q;
+ if ((s = task.status) < 0)
break;
- if (task == null) {
- helpSignal(root, q.poolIndex);
- if (root.status >= 0)
- helpComplete(root, SHARED_QUEUE);
+ else if (joiner.externalPopAndExecCC(task))
+ k = scans;
+ else if ((s = task.status) < 0)
break;
+ else if ((q = ws[j & m]) != null && q.pollAndExecCC(task))
+ k = scans;
+ else if (--k < 0) {
+ if (c == (c = ctl))
+ break;
+ k = scans;
}
}
}
- }
-
- /**
- * Tries to help execute or signal availability of the given task
- * from submitter's queue in common pool.
- */
- static void externalHelpJoin(ForkJoinTask<?> t) {
- // Some hard-to-avoid overlap with tryExternalUnpush
- ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
- ForkJoinTask<?>[] a; int m, s, n;
- if (t != null &&
- (z = submitters.get()) != null &&
- (p = commonPool) != null &&
- (ws = p.workQueues) != null &&
- (m = ws.length - 1) >= 0 &&
- (q = ws[m & z.seed & SQMASK]) != null &&
- (a = q.array) != null) {
- int am = a.length - 1;
- if ((s = q.top) != q.base) {
- long j = ((am & (s - 1)) << ASHIFT) + ABASE;
- if (U.getObject(a, j) == t &&
- U.compareAndSwapInt(q, QLOCK, 0, 1)) {
- if (q.array == a && q.top == s &&
- U.compareAndSwapObject(a, j, t, null)) {
- q.top = s - 1;
- q.qlock = 0;
- t.doExec();
- }
- else
- q.qlock = 0;
- }
- }
- if (t.status >= 0) {
- if (t instanceof CountedCompleter)
- p.externalHelpComplete(q, t);
- else
- p.helpSignal(t, q.poolIndex);
- }
- }
- }
-
- /**
- * Restricted version of helpQuiescePool for external callers
- */
- static void externalHelpQuiescePool() {
- ForkJoinPool p; ForkJoinTask<?> t; WorkQueue q; int b;
- if ((p = commonPool) != null &&
- (q = p.findNonEmptyStealQueue(1)) != null &&
- (b = q.base) - q.top < 0 &&
- (t = q.pollAt(b)) != null)
- t.doExec();
+ return s;
}
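Both helpComplete variants walk a task's completer chain (r = r.completer) to decide whether a queued task belongs to the computation being joined. A minimal CountedCompleter exhibiting such a chain, offered as a usage sketch only:

    import java.util.concurrent.CountedCompleter;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.atomic.AtomicInteger;

    // Each leaf bumps a shared counter; completions propagate up the
    // completer chain that pollAndExecCC and friends traverse.
    public class CounterCompleter extends CountedCompleter<Void> {
        final int lo, hi;
        final AtomicInteger sum;
        CounterCompleter(CountedCompleter<?> parent, int lo, int hi,
                         AtomicInteger sum) {
            super(parent);
            this.lo = lo; this.hi = hi; this.sum = sum;
        }
        public void compute() {
            if (hi - lo > 1) {
                int mid = (lo + hi) >>> 1;
                setPendingCount(1); // one forked child to wait for
                new CounterCompleter(this, mid, hi, sum).fork();
                new CounterCompleter(this, lo, mid, sum).compute();
            }
            else {
                sum.incrementAndGet();
                tryComplete(); // may complete ancestors transitively
            }
        }
        public static void main(String[] args) {
            AtomicInteger sum = new AtomicInteger();
            ForkJoinPool.commonPool().invoke(new CounterCompleter(null, 0, 8, sum));
            System.out.println(sum.get()); // prints 8
        }
    }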
// Exported methods
@@ -2529,49 +2411,65 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
- Thread.UncaughtExceptionHandler handler,
+ UncaughtExceptionHandler handler,
boolean asyncMode) {
+ this(checkParallelism(parallelism),
+ checkFactory(factory),
+ handler,
+ (asyncMode ? FIFO_QUEUE : LIFO_QUEUE),
+ "ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
- if (factory == null)
- throw new NullPointerException();
+ }
+
+ private static int checkParallelism(int parallelism) {
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
- this.factory = factory;
- this.ueh = handler;
- this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
- long np = (long)(-parallelism); // offset ctl counts
- this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
- int pn = nextPoolId();
- StringBuilder sb = new StringBuilder("ForkJoinPool-");
- sb.append(Integer.toString(pn));
- sb.append("-worker-");
- this.workerNamePrefix = sb.toString();
+ return parallelism;
+ }
+
+ private static ForkJoinWorkerThreadFactory checkFactory
+ (ForkJoinWorkerThreadFactory factory) {
+ if (factory == null)
+ throw new NullPointerException();
+ return factory;
}
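A brief usage sketch of the public constructor above; asyncMode = true selects FIFO_QUEUE, the mode intended for event-style tasks that are never joined. The pool size and timeout are arbitrary:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class AsyncPoolDemo {
        public static void main(String[] args) throws InterruptedException {
            ForkJoinPool pool = new ForkJoinPool(
                4, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null /* default handler */, true /* asyncMode */);
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("ran on " + Thread.currentThread().getName());
                }
            });
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        }
    }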
/**
- * Constructor for common pool, suitable only for static initialization.
- * Basically the same as above, but uses smallest possible initial footprint.
+ * Creates a {@code ForkJoinPool} with the given parameters, without
+ * any security checks or parameter validation. Invoked directly by
+ * makeCommonPool.
*/
- ForkJoinPool(int parallelism, long ctl,
- ForkJoinWorkerThreadFactory factory,
- Thread.UncaughtExceptionHandler handler) {
- this.config = parallelism;
- this.ctl = ctl;
+ private ForkJoinPool(int parallelism,
+ ForkJoinWorkerThreadFactory factory,
+ UncaughtExceptionHandler handler,
+ int mode,
+ String workerNamePrefix) {
+ this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
- this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
+ this.mode = (short)mode;
+ this.parallelism = (short)parallelism;
+ long np = (long)(-parallelism); // offset ctl counts
+ this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
/**
- * Returns the common pool instance.
+ * Returns the common pool instance. This pool is statically
+ * constructed; its run state is unaffected by attempts to {@link
+ * #shutdown} or {@link #shutdownNow}. However this pool and any
+ * ongoing processing are automatically terminated upon program
+ * {@link System#exit}. Any program that relies on asynchronous
+ * task processing to complete before program termination should
+ * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
+ * before exit.
*
* @return the common pool instance
* @since 1.8
* @hide
*/
public static ForkJoinPool commonPool() {
- // assert commonPool != null : "static init error";
- return commonPool;
+ // assert common != null : "static init error";
+ return common;
}
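
    // A sketch of the usage pattern the new javadoc above recommends.
    // Note the method is @hide in this tree, so this assumes a platform
    // where commonPool() is visible; the timeout is an arbitrary choice.
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class CommonPoolDemo {
        public static void main(String[] args) {
            ForkJoinPool.commonPool().execute(new Runnable() {
                public void run() { System.out.println("async work"); }
            });
            // shutdown()/shutdownNow() are no-ops on the common pool, so
            // wait for quiescence before letting the program exit.
            ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
        }
    }
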
// Execution methods
@@ -2627,7 +2525,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
- job = new ForkJoinTask.AdaptedRunnableAction(task);
+ job = new ForkJoinTask.RunnableExecuteAction(task);
externalPush(job);
}
@@ -2729,7 +2627,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @return the handler, or {@code null} if none
*/
- public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ public UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}
@@ -2739,7 +2637,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
- return config & SMASK;
+ int par;
+ return ((par = parallelism) > 0) ? par : 1;
}
/**
@@ -2750,7 +2649,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @hide
*/
public static int getCommonPoolParallelism() {
- return commonPoolParallelism;
+ return commonParallelism;
}
/**
@@ -2762,7 +2661,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of worker threads
*/
public int getPoolSize() {
- return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
+ return parallelism + (short)(ctl >>> TC_SHIFT);
}
/**
@@ -2772,7 +2671,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return (config >>> 16) == FIFO_QUEUE;
+ return mode == FIFO_QUEUE;
}
/**
@@ -2803,7 +2702,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
+ int r = parallelism + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2819,7 +2718,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
+ return parallelism + (int)(ctl >> AC_SHIFT) <= 0;
}
/**
@@ -2982,7 +2881,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- int pc = (config & SMASK);
+ int pc = parallelism;
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
@@ -3012,11 +2911,6 @@ public class ForkJoinPool extends AbstractExecutorService {
* already shut down. Tasks that are in the process of being
* submitted concurrently during the course of this method may or
* may not be rejected.
- *
- * @throws SecurityException if a security manager exists and
- * the caller is not permitted to modify threads
- * because it does not hold {@link
- * java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public void shutdown() {
checkPermission();
@@ -3051,7 +2945,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean isTerminated() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) == -(config & SMASK));
+ (short)(c >>> TC_SHIFT) + parallelism <= 0);
}
/**
@@ -3070,7 +2964,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean isTerminating() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) != -(config & SMASK));
+ (short)(c >>> TC_SHIFT) + parallelism > 0);
}
/**
@@ -3085,9 +2979,10 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Blocks until all tasks have completed execution after a
* shutdown request, or the timeout occurs, or the current thread
- * is interrupted, whichever happens first. Note that the {@link
- * #commonPool()} never terminates until program shutdown so
- * this method will always time out.
+ * is interrupted, whichever happens first. Because the {@link
+ * #commonPool()} never terminates until program shutdown, when
+ * applied to the common pool, this method is equivalent to {@link
+ * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
@@ -3097,6 +2992,12 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (this == common) {
+ awaitQuiescence(timeout, unit);
+ return false;
+ }
long nanos = unit.toNanos(timeout);
if (isTerminated())
return true;
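
    // To make the behavioral difference in the rewritten javadoc concrete,
    // a hedged sketch (pool size and timeouts are arbitrary):
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.TimeUnit;

    public class AwaitTerminationDemo {
        public static void main(String[] args) throws InterruptedException {
            ForkJoinPool pool = new ForkJoinPool(2);
            pool.shutdown();
            // An ordinary pool terminates once idle after shutdown().
            System.out.println(pool.awaitTermination(1, TimeUnit.SECONDS));

            // The common pool never terminates; per the new code path this
            // waits for quiescence and then always reports false.
            System.out.println(ForkJoinPool.commonPool()
                    .awaitTermination(1, TimeUnit.SECONDS));
        }
    }
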
@@ -3117,6 +3018,59 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
+ * If called by a ForkJoinTask operating in this pool, equivalent
+ * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
+ * waits and/or attempts to assist performing tasks until this
+ * pool {@link #isQuiescent} or the indicated timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return {@code true} if quiescent; {@code false} if the
+ * timeout elapsed.
+ */
+ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
+ long nanos = unit.toNanos(timeout);
+ ForkJoinWorkerThread wt;
+ Thread thread = Thread.currentThread();
+ if ((thread instanceof ForkJoinWorkerThread) &&
+ (wt = (ForkJoinWorkerThread)thread).pool == this) {
+ helpQuiescePool(wt.workQueue);
+ return true;
+ }
+ long startTime = System.nanoTime();
+ WorkQueue[] ws;
+ int r = 0, m;
+ boolean found = true;
+ while (!isQuiescent() && (ws = workQueues) != null &&
+ (m = ws.length - 1) >= 0) {
+ if (!found) {
+ if ((System.nanoTime() - startTime) > nanos)
+ return false;
+ Thread.yield(); // cannot block
+ }
+ found = false;
+ for (int j = (m + 1) << 2; j >= 0; --j) {
+ ForkJoinTask<?> t; WorkQueue q; int b;
+ if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
+ found = true;
+ if ((t = q.pollAt(b)) != null)
+ t.doExec();
+ break;
+ }
+ }
+ }
+ return true;
+ }
+
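+
+    // Unlike awaitTermination, awaitQuiescence needs no prior shutdown(),
+    // and the scan loop above has the caller run queued tasks itself.
+    // A small sketch under those assumptions:
+    //
+    //   ForkJoinPool pool = new ForkJoinPool(2);
+    //   for (int i = 0; i < 100; i++) {
+    //       pool.execute(new RecursiveAction() {
+    //           protected void compute() { /* simulated work */ }
+    //       });
+    //   }
+    //   // The caller polls and executes pending tasks while waiting.
+    //   System.out.println("quiescent: "
+    //           + pool.awaitQuiescence(10, TimeUnit.SECONDS));
+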
+ /**
+ * Waits and/or attempts to assist performing tasks indefinitely
+ * until the {@link #commonPool()} {@link #isQuiescent}.
+ */
+ static void quiesceCommonPool() {
+ common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ }
+
+ /**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
@@ -3125,9 +3079,9 @@ public class ForkJoinPool extends AbstractExecutorService {
* not necessary. Method {@code block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
- * thread invoking {@link ForkJoinPool#managedBlock}. The
- * unusual methods in this API accommodate synchronizers that may,
- * but don't usually, block for long periods. Similarly, they
+ * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
+ * The unusual methods in this API accommodate synchronizers that
+ * may, but don't usually, block for long periods. Similarly, they
* allow more efficient internal handling of cases in which
* additional workers may be, but usually are not, needed to
* ensure sufficient parallelism. Toward this end,
@@ -3185,6 +3139,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Returns {@code true} if blocking is unnecessary.
+ * @return {@code true} if blocking is unnecessary
*/
boolean isReleasable();
}
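
    // A minimal ManagedBlocker sketch in the spirit of the QueueTaker
    // example from the interface's full javadoc (elided from this hunk):
    // a blocking take() from a queue, where isReleasable() avoids the
    // block entirely when an element is already at hand.
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ForkJoinPool;

    class QueueTaker<E> implements ForkJoinPool.ManagedBlocker {
        final BlockingQueue<E> queue;
        volatile E item = null;
        QueueTaker(BlockingQueue<E> q) { this.queue = q; }
        public boolean block() throws InterruptedException {
            if (item == null)
                item = queue.take(); // may block; pool can add a spare worker
            return true;
        }
        public boolean isReleasable() {
            return item != null || (item = queue.poll()) != null;
        }
        public E getItem() { return item; }
    }

    // A worker would run it via
    // ForkJoinPool.managedBlock(new QueueTaker<String>(queue))
    // and then read getItem().
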
@@ -3214,21 +3169,8 @@ public class ForkJoinPool extends AbstractExecutorService {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
- while (!blocker.isReleasable()) { // variant of helpSignal
- WorkQueue[] ws; WorkQueue q; int m, u;
- if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
- for (int i = 0; i <= m; ++i) {
- if (blocker.isReleasable())
- return;
- if ((q = ws[i]) != null && q.base - q.top < 0) {
- p.signalWork(q);
- if ((u = (int)(p.ctl >>> 32)) >= 0 ||
- (u >> UAC_SHIFT) >= 0)
- break;
- }
- }
- }
- if (p.tryCompensate()) {
+ while (!blocker.isReleasable()) {
+ if (p.tryCompensate(p.ctl)) {
try {
do {} while (!blocker.isReleasable() &&
!blocker.block());
@@ -3266,6 +3208,7 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final long STEALCOUNT;
private static final long PLOCK;
private static final long INDEXSEED;
+ private static final long QBASE;
private static final long QLOCK;
static {
@@ -3285,6 +3228,8 @@ public class ForkJoinPool extends AbstractExecutorService {
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
Class<?> wk = WorkQueue.class;
+ QBASE = U.objectFieldOffset
+ (wk.getDeclaredField("base"));
QLOCK = U.objectFieldOffset
(wk.getDeclaredField("qlock"));
Class<?> ak = ForkJoinTask[].class;
@@ -3298,45 +3243,51 @@ public class ForkJoinPool extends AbstractExecutorService {
}
submitters = new ThreadLocal<Submitter>();
- ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
+ defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
- /*
- * Establish common pool parameters. For extra caution,
- * computations to set up common pool state are here; the
- * constructor just assigns these values to fields.
- */
+ common = java.security.AccessController.doPrivileged
+ (new java.security.PrivilegedAction<ForkJoinPool>() {
+ public ForkJoinPool run() { return makeCommonPool(); }});
+ int par = common.parallelism; // report 1 even if threads disabled
+ commonParallelism = par > 0 ? par : 1;
+ }
- int par = 0;
- Thread.UncaughtExceptionHandler handler = null;
- try { // TBD: limit or report ignored exceptions?
+ /**
+ * Creates and returns the common pool, respecting user settings
+ * specified via system properties.
+ */
+ private static ForkJoinPool makeCommonPool() {
+ int parallelism = -1;
+ ForkJoinWorkerThreadFactory factory
+ = defaultForkJoinWorkerThreadFactory;
+ UncaughtExceptionHandler handler = null;
+ try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
- String hp = System.getProperty
- ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
String fp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.threadFactory");
+ String hp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
+ if (pp != null)
+ parallelism = Integer.parseInt(pp);
if (fp != null)
- fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
- getSystemClassLoader().loadClass(fp).newInstance());
+ factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
+ getSystemClassLoader().loadClass(fp).newInstance());
if (hp != null)
- handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
+ handler = ((UncaughtExceptionHandler)ClassLoader.
getSystemClassLoader().loadClass(hp).newInstance());
- if (pp != null)
- par = Integer.parseInt(pp);
} catch (Exception ignore) {
}
- if (par <= 0)
- par = Runtime.getRuntime().availableProcessors();
- if (par > MAX_CAP)
- par = MAX_CAP;
- commonPoolParallelism = par;
- long np = (long)(-par); // precompute initial ctl value
- long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
-
- commonPool = new ForkJoinPool(par, ct, fac, handler);
+ if (parallelism < 0 && // default 1 less than #cores
+ (parallelism = Runtime.getRuntime().availableProcessors() - 1) < 0)
+ parallelism = 0;
+ if (parallelism > MAX_CAP)
+ parallelism = MAX_CAP;
+ return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
+ "ForkJoinPool.commonPool-worker-");
}
}
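
    // The three property names read by makeCommonPool above are the only
    // supported knobs for the common pool. A sketch of overriding the
    // parallelism from code; this must happen before ForkJoinPool is first
    // loaded, since makeCommonPool runs exactly once in the static
    // initializer (passing -D flags on the command line is the usual route).
    import java.util.concurrent.ForkJoinPool;

    public class CommonPoolConfig {
        public static void main(String[] args) {
            System.setProperty(
                    "java.util.concurrent.ForkJoinPool.common.parallelism",
                    "8");
            // First touch of ForkJoinPool triggers makeCommonPool().
            System.out.println(ForkJoinPool.getCommonPoolParallelism());
        }
    }
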
diff --git a/luni/src/main/java/java/util/concurrent/ForkJoinTask.java b/luni/src/main/java/java/util/concurrent/ForkJoinTask.java
index 818788e..6d25775 100644
--- a/luni/src/main/java/java/util/concurrent/ForkJoinTask.java
+++ b/luni/src/main/java/java/util/concurrent/ForkJoinTask.java
@@ -136,7 +136,7 @@ import java.lang.reflect.Constructor;
* supports other methods and techniques (for example the use of
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
- * are not statically structured as DAGs. To support such usages a
+ * are not statically structured as DAGs. To support such usages, a
* ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
* value using {@link #setForkJoinTaskTag} or {@link
* #compareAndSetForkJoinTaskTag} and checked using {@link
@@ -286,25 +286,35 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int externalAwaitDone() {
int s;
- ForkJoinPool.externalHelpJoin(this);
- boolean interrupted = false;
- while ((s = status) >= 0) {
- if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
- synchronized (this) {
- if (status >= 0) {
- try {
- wait();
- } catch (InterruptedException ie) {
- interrupted = true;
+ ForkJoinPool cp = ForkJoinPool.common;
+ if ((s = status) >= 0) {
+ if (cp != null) {
+ if (this instanceof CountedCompleter)
+ s = cp.externalHelpComplete((CountedCompleter<?>)this);
+ else if (cp.tryExternalUnpush(this))
+ s = doExec();
+ }
+ if (s >= 0 && (s = status) >= 0) {
+ boolean interrupted = false;
+ do {
+ if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
+ synchronized (this) {
+ if (status >= 0) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ }
+ else
+ notifyAll();
}
}
- else
- notifyAll();
- }
+ } while ((s = status) >= 0);
+ if (interrupted)
+ Thread.currentThread().interrupt();
}
}
- if (interrupted)
- Thread.currentThread().interrupt();
return s;
}
@@ -313,9 +323,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
+ ForkJoinPool cp = ForkJoinPool.common;
if (Thread.interrupted())
throw new InterruptedException();
- ForkJoinPool.externalHelpJoin(this);
+ if ((s = status) >= 0 && cp != null) {
+ if (this instanceof CountedCompleter)
+ cp.externalHelpComplete((CountedCompleter<?>)this);
+ else if (cp.tryExternalUnpush(this))
+ doExec();
+ }
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
@@ -329,7 +345,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return s;
}
-
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
@@ -601,14 +616,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* A version of "sneaky throw" to relay exceptions
*/
- static void rethrow(final Throwable ex) {
- if (ex != null) {
- if (ex instanceof Error)
- throw (Error)ex;
- if (ex instanceof RuntimeException)
- throw (RuntimeException)ex;
- throw uncheckedThrowable(ex, RuntimeException.class);
- }
+ static void rethrow(Throwable ex) {
+ if (ex != null)
+ ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
/**
@@ -617,8 +627,8 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* unchecked exceptions
*/
@SuppressWarnings("unchecked") static <T extends Throwable>
- T uncheckedThrowable(final Throwable t, final Class<T> c) {
- return (T)t; // rely on vacuous cast
+ void uncheckedThrow(Throwable t) throws T {
+ throw (T)t; // rely on vacuous cast
}
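
    // The rewritten rethrow/uncheckedThrow pair is the classic "sneaky
    // throw": because T is erased, the cast compiles away and a checked
    // exception escapes undeclared. A self-contained sketch of the trick:
    public class SneakyThrowDemo {
        @SuppressWarnings("unchecked")
        static <T extends Throwable> void sneakyThrow(Throwable t) throws T {
            throw (T) t; // cast is erased; nothing is checked at runtime
        }

        public static void main(String[] args) {
            try {
                // The explicit witness makes "throws T" read as unchecked.
                SneakyThrowDemo.<RuntimeException>sneakyThrow(
                        new java.io.IOException("never declared"));
            } catch (Throwable caught) {
                System.out.println(caught);
            }
        }
    }
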
/**
@@ -653,7 +663,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
- ForkJoinPool.commonPool.externalPush(this);
+ ForkJoinPool.common.externalPush(this);
return this;
}
@@ -774,8 +784,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @param tasks the collection of tasks
* @return the tasks argument, to simplify usage
* @throws NullPointerException if tasks or any element are null
-
- * @hide
*/
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
@@ -831,7 +839,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* <p>This method is designed to be invoked by <em>other</em>
* tasks. To terminate the current task, you can just return or
* throw an unchecked exception from its computation method, or
- * invoke {@link #completeExceptionally}.
+ * invoke {@link #completeExceptionally(Throwable)}.
*
* @param mayInterruptIfRunning this value has no effect in the
* default implementation because interrupts are not used to
@@ -984,6 +992,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// Messy in part because we measure in nanosecs, but wait in millisecs
int s; long ms;
long ns = unit.toNanos(timeout);
+ ForkJoinPool cp;
if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
@@ -995,8 +1004,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
w = wt.workQueue;
p.helpJoinOnce(w, this); // no retries on failure
}
- else
- ForkJoinPool.externalHelpJoin(this);
+ else if ((cp = ForkJoinPool.common) != null) {
+ if (this instanceof CountedCompleter)
+ cp.externalHelpComplete((CountedCompleter<?>)this);
+ else if (cp.tryExternalUnpush(this))
+ doExec();
+ }
boolean canBlock = false;
boolean interrupted = false;
try {
@@ -1004,7 +1017,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this);
else if (!canBlock) {
- if (p == null || p.tryCompensate())
+ if (p == null || p.tryCompensate(p.ctl))
canBlock = true;
}
else {
@@ -1080,7 +1093,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
wt.pool.helpQuiescePool(wt.workQueue);
}
else
- ForkJoinPool.externalHelpQuiescePool();
+ ForkJoinPool.quiesceCommonPool();
}
/**
@@ -1145,7 +1158,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
Thread t;
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
- ForkJoinPool.tryExternalUnpush(this));
+ ForkJoinPool.common.tryExternalUnpush(this));
}
/**
@@ -1316,7 +1329,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*
* @param e the expected tag value
* @param tag the new tag value
- * @return true if successful; i.e., the current value was
+ * @return {@code true} if successful; i.e., the current value was
* equal to e and is now tag.
* @since 1.8
* @hide
@@ -1370,6 +1383,24 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
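
    // The tag methods documented above support the graph-processing idiom
    // the class javadoc alludes to: claiming a node exactly once via an
    // atomic tag flip. A hypothetical sketch (UNVISITED/VISITED are
    // illustrative names, not part of the API):
    import java.util.concurrent.RecursiveAction;

    public class TagDemo {
        static final short UNVISITED = 0, VISITED = 1;

        static class NodeTask extends RecursiveAction {
            protected void compute() {
                // Only the first worker to flip the tag does the work.
                if (compareAndSetForkJoinTaskTag(UNVISITED, VISITED)) {
                    // process this node
                }
            }
        }

        public static void main(String[] args) {
            NodeTask t = new NodeTask();
            t.invoke();
            System.out.println(t.getForkJoinTaskTag() == VISITED);
        }
    }
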
/**
+ * Adaptor for Runnables in which failure forces worker exception
+ */
+ static final class RunnableExecuteAction extends ForkJoinTask<Void> {
+ final Runnable runnable;
+ RunnableExecuteAction(Runnable runnable) {
+ if (runnable == null) throw new NullPointerException();
+ this.runnable = runnable;
+ }
+ public final Void getRawResult() { return null; }
+ public final void setRawResult(Void v) { }
+ public final boolean exec() { runnable.run(); return true; }
+ void internalPropagateException(Throwable ex) {
+ rethrow(ex); // rethrow outside exec() catches.
+ }
+ private static final long serialVersionUID = 5232453952276885070L;
+ }
+
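+
+    // The practical effect of this adaptor: a Runnable handed to execute()
+    // rethrows its failure on the worker (surfacing through the thread's
+    // uncaught-exception handling), whereas submit() still captures it in
+    // the returned Future. A hedged sketch of the contrast:
+    //
+    //   ForkJoinPool pool = new ForkJoinPool(1);
+    //   pool.execute(new Runnable() {      // wraps as RunnableExecuteAction
+    //       public void run() { throw new RuntimeException("boom"); }
+    //   });                                 // stack trace on the worker
+    //   try {
+    //       pool.submit(new Runnable() {    // wraps as AdaptedRunnableAction
+    //           public void run() { throw new RuntimeException("boom"); }
+    //       }).get();
+    //   } catch (ExecutionException e) {
+    //       System.out.println("captured: " + e.getCause());
+    //   }
+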
+ /**
* Adaptor for Callables
*/
static final class AdaptedCallable<T> extends ForkJoinTask<T>
@@ -1480,5 +1511,4 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
throw new Error(e);
}
}
-
}
diff --git a/luni/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java b/luni/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
index f31763c..5f2799b 100644
--- a/luni/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
+++ b/luni/src/main/java/java/util/concurrent/ForkJoinWorkerThread.java
@@ -14,8 +14,8 @@ package java.util.concurrent;
* scheduling or execution. However, you can override initialization
* and termination methods surrounding the main task processing loop.
* If you do create such a subclass, you will also need to supply a
- * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to use it
- * in a {@code ForkJoinPool}.
+ * custom {@link ForkJoinPool.ForkJoinWorkerThreadFactory} to
+ * {@linkplain ForkJoinPool#ForkJoinPool use it} in a {@code ForkJoinPool}.
*
* @since 1.7
* @hide
@@ -61,16 +61,17 @@ public class ForkJoinWorkerThread extends Thread {
}
/**
- * Returns the index number of this thread in its pool. The
- * returned value ranges from zero to the maximum number of
- * threads (minus one) that have ever been created in the pool.
- * This method may be useful for applications that track status or
- * collect results per-worker rather than per-task.
+ * Returns the unique index number of this thread in its pool.
+ * The returned value ranges from zero to the maximum number of
+ * threads (minus one) that may exist in the pool, and does not
+ * change during the lifetime of the thread. This method may be
+ * useful for applications that track status or collect results
+ * per-worker-thread rather than per-task.
*
* @return the index number
*/
public int getPoolIndex() {
- return workQueue.poolIndex;
+ return workQueue.poolIndex >>> 1; // ignore odd/even tag bit
}
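
    // A hypothetical worker subclass and factory using the now-stable index
    // for per-worker bookkeeping (all names here are illustrative):
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinWorkerThread;

    class IndexedWorker extends ForkJoinWorkerThread {
        IndexedWorker(ForkJoinPool pool) { super(pool); }
        protected void onStart() {
            super.onStart();
            // Unique within the pool and fixed for this thread's lifetime.
            System.out.println("worker index: " + getPoolIndex());
        }
    }

    class IndexedWorkerFactory
            implements ForkJoinPool.ForkJoinWorkerThreadFactory {
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new IndexedWorker(pool);
        }
    }

    // Plugged in via the four-argument ForkJoinPool constructor shown earlier.
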
/**