Mercurial > hg > release > icedtea7-forest-2.0 > jdk
changeset 1532:eb27b5587e18
Merge
author | sherman |
---|---|
date | Wed, 29 Jul 2009 11:19:14 -0700 |
parents | eb5173d782ca (current diff) 49573ab3096a (diff) |
children | 61d174a58edf |
files | test/java/util/concurrent/ConcurrentLinkedQueue/ConcurrentQueueLoops.java test/java/util/concurrent/ConcurrentLinkedQueue/LoopHelpers.java |
diffstat | 32 files changed, 2247 insertions(+), 908 deletions(-) [+] |
line wrap: on
line diff
--- a/make/common/shared/Defs-control.gmk Fri Jul 24 11:22:29 2009 -0700 +++ b/make/common/shared/Defs-control.gmk Wed Jul 29 11:19:14 2009 -0700 @@ -92,9 +92,9 @@ dummy := $(shell $(MKDIR) -p $(TEMP_DIR)) # The language version we want for this jdk build -SOURCE_LANGUAGE_VERSION=5 +SOURCE_LANGUAGE_VERSION=7 # The class version we want for this jdk build -TARGET_CLASS_VERSION=5 +TARGET_CLASS_VERSION=7 # The MESSAGE, WARNING and ERROR files are used to store sanity check and # source check messages, warnings and errors.
--- a/make/common/shared/Defs-java.gmk Fri Jul 24 11:22:29 2009 -0700 +++ b/make/common/shared/Defs-java.gmk Wed Jul 29 11:19:14 2009 -0700 @@ -122,13 +122,13 @@ JAVACFLAGS += -Werror endif -# Add the source level (currently all source is 1.5, should this be 1.6?) -SOURCE_LANGUAGE_VERSION = 5 +# Add the source level +SOURCE_LANGUAGE_VERSION = 7 LANGUAGE_VERSION = -source $(SOURCE_LANGUAGE_VERSION) JAVACFLAGS += $(LANGUAGE_VERSION) -# Add the class version we want (currently this is 5, should it be 6 or even 7?) -TARGET_CLASS_VERSION = 5 +# Add the class version we want +TARGET_CLASS_VERSION = 7 CLASS_VERSION = -target $(TARGET_CLASS_VERSION) JAVACFLAGS += $(CLASS_VERSION) JAVACFLAGS += -encoding ascii
--- a/make/java/dyn/Makefile Fri Jul 24 11:22:29 2009 -0700 +++ b/make/java/dyn/Makefile Wed Jul 29 11:19:14 2009 -0700 @@ -33,8 +33,8 @@ # The sources built here use new language syntax to generate # method handle calls. Let's be sure we are using that format. -#LANGUAGE_VERSION = -source 7 -#CLASS_VERSION = -target 7 +LANGUAGE_VERSION = -source 7 +CLASS_VERSION = -target 7 # Actually, it will be less disruptive to compile with the same # -target option as the rest of the system, and just turn on
--- a/src/share/classes/com/sun/jndi/ldap/Filter.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/com/sun/jndi/ldap/Filter.java Wed Jul 29 11:19:14 2009 -0700 @@ -93,9 +93,7 @@ int filtOffset[] = new int[1]; - for (filtOffset[0] = filterStart; - filtOffset[0] < filterEnd; - filtOffset[0]++) { + for (filtOffset[0] = filterStart; filtOffset[0] < filterEnd;) { switch (filter[filtOffset[0]]) { case '(': filtOffset[0]++; @@ -104,18 +102,21 @@ case '&': encodeComplexFilter(ber, filter, LDAP_FILTER_AND, filtOffset, filterEnd); + // filtOffset[0] has pointed to char after right paren parens--; break; case '|': encodeComplexFilter(ber, filter, LDAP_FILTER_OR, filtOffset, filterEnd); + // filtOffset[0] has pointed to char after right paren parens--; break; case '!': encodeComplexFilter(ber, filter, LDAP_FILTER_NOT, filtOffset, filterEnd); + // filtOffset[0] has pointed to char after right paren parens--; break; @@ -143,8 +144,8 @@ encodeSimpleFilter(ber, filter, filtOffset[0], nextOffset); - // points to right parens; for loop will increment beyond parens - filtOffset[0] = nextOffset; + // points to the char after right paren. + filtOffset[0] = nextOffset + 1; parens--; break; @@ -170,9 +171,14 @@ filtOffset[0] = filterEnd; // force break from outer break; } + + if (parens < 0) { + throw new InvalidSearchFilterException( + "Unbalanced parenthesis"); + } } - if (parens > 0) { + if (parens != 0) { throw new InvalidSearchFilterException("Unbalanced parenthesis"); }
--- a/src/share/classes/java/lang/Character.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/java/lang/Character.java Wed Jul 29 11:19:14 2009 -0700 @@ -2784,8 +2784,13 @@ * @since 1.5 */ public static int toCodePoint(char high, char low) { - return ((high - MIN_HIGH_SURROGATE) << 10) - + (low - MIN_LOW_SURROGATE) + MIN_SUPPLEMENTARY_CODE_POINT; + // Optimized form of: + // return ((high - MIN_HIGH_SURROGATE) << 10) + // + (low - MIN_LOW_SURROGATE) + // + MIN_SUPPLEMENTARY_CODE_POINT; + return ((high << 10) + low) + (MIN_SUPPLEMENTARY_CODE_POINT + - (MIN_HIGH_SURROGATE << 10) + - MIN_LOW_SURROGATE); } /** @@ -3071,9 +3076,10 @@ } static void toSurrogates(int codePoint, char[] dst, int index) { - int offset = codePoint - MIN_SUPPLEMENTARY_CODE_POINT; - dst[index+1] = (char)((offset & 0x3ff) + MIN_LOW_SURROGATE); - dst[index] = (char)((offset >>> 10) + MIN_HIGH_SURROGATE); + // We write elements "backwards" to guarantee all-or-nothing + dst[index+1] = (char)((codePoint & 0x3ff) + MIN_LOW_SURROGATE); + dst[index] = (char)((codePoint >>> 10) + + (MIN_HIGH_SURROGATE - (MIN_SUPPLEMENTARY_CODE_POINT >>> 10))); } /**
--- a/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/java/util/concurrent/ConcurrentLinkedQueue.java Wed Jul 29 11:19:14 2009 -0700 @@ -34,9 +34,13 @@ */ package java.util.concurrent; -import java.util.*; -import java.util.concurrent.atomic.*; +import java.util.AbstractQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Queue; /** * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes. @@ -47,9 +51,9 @@ * queue the shortest time. New elements * are inserted at the tail of the queue, and the queue retrieval * operations obtain elements at the head of the queue. - * A <tt>ConcurrentLinkedQueue</tt> is an appropriate choice when + * A {@code ConcurrentLinkedQueue} is an appropriate choice when * many threads will share access to a common collection. - * This queue does not permit <tt>null</tt> elements. + * This queue does not permit {@code null} elements. * * <p>This implementation employs an efficient "wait-free" * algorithm based on one described in <a @@ -57,7 +61,7 @@ * Fast, and Practical Non-Blocking and Blocking Concurrent Queue * Algorithms</a> by Maged M. Michael and Michael L. Scott. * - * <p>Beware that, unlike in most collections, the <tt>size</tt> method + * <p>Beware that, unlike in most collections, the {@code size} method * is <em>NOT</em> a constant-time operation. Because of the * asynchronous nature of these queues, determining the current number * of elements requires a traversal of the elements. @@ -87,51 +91,102 @@ private static final long serialVersionUID = 196745693267521676L; /* - * This is a straight adaptation of Michael & Scott algorithm. - * For explanation, read the paper. The only (minor) algorithmic - * difference is that this version supports lazy deletion of - * internal nodes (method remove(Object)) -- remove CAS'es item - * fields to null. The normal queue operations unlink but then - * pass over nodes with null item fields. Similarly, iteration - * methods ignore those with nulls. + * This is a modification of the Michael & Scott algorithm, + * adapted for a garbage-collected environment, with support for + * interior node deletion (to support remove(Object)). For + * explanation, read the paper. * - * Also note that like most non-blocking algorithms in this - * package, this implementation relies on the fact that in garbage + * Note that like most non-blocking algorithms in this package, + * this implementation relies on the fact that in garbage * collected systems, there is no possibility of ABA problems due * to recycled nodes, so there is no need to use "counted * pointers" or related techniques seen in versions used in * non-GC'ed settings. + * + * The fundamental invariants are: + * - There is exactly one (last) Node with a null next reference, + * which is CASed when enqueueing. This last Node can be + * reached in O(1) time from tail, but tail is merely an + * optimization - it can always be reached in O(N) time from + * head as well. + * - The elements contained in the queue are the non-null items in + * Nodes that are reachable from head. CASing the item + * reference of a Node to null atomically removes it from the + * queue. Reachability of all elements from head must remain + * true even in the case of concurrent modifications that cause + * head to advance. A dequeued Node may remain in use + * indefinitely due to creation of an Iterator or simply a + * poll() that has lost its time slice. + * + * The above might appear to imply that all Nodes are GC-reachable + * from a predecessor dequeued Node. That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to advance to head. + * + * Both head and tail are permitted to lag. In fact, failing to + * update them every time one could is a significant optimization + * (fewer CASes). This is controlled by local "hops" variables + * that only trigger helping-CASes after experiencing multiple + * lags. + * + * Since head and tail are updated concurrently and independently, + * it is possible for tail to lag behind head (why not)? + * + * CASing a Node's item reference to null atomically removes the + * element from the queue. Iterators skip over Nodes with null + * items. Prior implementations of this class had a race between + * poll() and remove(Object) where the same element would appear + * to be successfully removed by two concurrent operations. The + * method remove(Object) also lazily unlinks deleted Nodes, but + * this is merely an optimization. + * + * When constructing a Node (before enqueuing it) we avoid paying + * for a volatile write to item by using lazySet instead of a + * normal write. This allows the cost of enqueue to be + * "one-and-a-half" CASes. + * + * Both head and tail may or may not point to a Node with a + * non-null item. If the queue is empty, all items must of course + * be null. Upon creation, both head and tail refer to a dummy + * Node with null item. Both head and tail are only updated using + * CAS, so they never regress, although again this is merely an + * optimization. */ private static class Node<E> { private volatile E item; private volatile Node<E> next; - private static final - AtomicReferenceFieldUpdater<Node, Node> - nextUpdater = - AtomicReferenceFieldUpdater.newUpdater - (Node.class, Node.class, "next"); - private static final - AtomicReferenceFieldUpdater<Node, Object> - itemUpdater = - AtomicReferenceFieldUpdater.newUpdater - (Node.class, Object.class, "item"); - - Node(E x) { item = x; } - - Node(E x, Node<E> n) { item = x; next = n; } + Node(E item) { + // Piggyback on imminent casNext() + lazySetItem(item); + } E getItem() { return item; } boolean casItem(E cmp, E val) { - return itemUpdater.compareAndSet(this, cmp, val); + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void setItem(E val) { - itemUpdater.set(this, val); + item = val; + } + + void lazySetItem(E val) { + UNSAFE.putOrderedObject(this, itemOffset, val); + } + + void lazySetNext(Node<E> val) { + UNSAFE.putOrderedObject(this, nextOffset, val); } Node<E> getNext() { @@ -139,52 +194,55 @@ } boolean casNext(Node<E> cmp, Node<E> val) { - return nextUpdater.compareAndSet(this, cmp, val); + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } - void setNext(Node<E> val) { - nextUpdater.set(this, val); - } + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE = + sun.misc.Unsafe.getUnsafe(); + private static final long nextOffset = + objectFieldOffset(UNSAFE, "next", Node.class); + private static final long itemOffset = + objectFieldOffset(UNSAFE, "item", Node.class); } - private static final - AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> - tailUpdater = - AtomicReferenceFieldUpdater.newUpdater - (ConcurrentLinkedQueue.class, Node.class, "tail"); - private static final - AtomicReferenceFieldUpdater<ConcurrentLinkedQueue, Node> - headUpdater = - AtomicReferenceFieldUpdater.newUpdater - (ConcurrentLinkedQueue.class, Node.class, "head"); - - private boolean casTail(Node<E> cmp, Node<E> val) { - return tailUpdater.compareAndSet(this, cmp, val); - } - - private boolean casHead(Node<E> cmp, Node<E> val) { - return headUpdater.compareAndSet(this, cmp, val); - } - + /** + * A node from which the first live (non-deleted) node (if any) + * can be reached in O(1) time. + * Invariants: + * - all live nodes are reachable from head via succ() + * - head != null + * - (tmp = head).next != tmp || tmp != head + * Non-invariants: + * - head.item may or may not be null. + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + */ + private transient volatile Node<E> head = new Node<E>(null); /** - * Pointer to header node, initialized to a dummy node. The first - * actual node is at head.getNext(). + * A node from which the last node on list (that is, the unique + * node with node.next == null) can be reached in O(1) time. + * Invariants: + * - the last node is always reachable from tail via succ() + * - tail != null + * Non-invariants: + * - tail.item may or may not be null. + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + * - tail.next may or may not be self-pointing to tail. */ - private transient volatile Node<E> head = new Node<E>(null, null); - - /** Pointer to last node on list **/ private transient volatile Node<E> tail = head; /** - * Creates a <tt>ConcurrentLinkedQueue</tt> that is initially empty. + * Creates a {@code ConcurrentLinkedQueue} that is initially empty. */ public ConcurrentLinkedQueue() {} /** - * Creates a <tt>ConcurrentLinkedQueue</tt> + * Creates a {@code ConcurrentLinkedQueue} * initially containing the elements of the given collection, * added in traversal order of the collection's iterator. * @param c the collection of elements to initially contain @@ -201,7 +259,7 @@ /** * Inserts the specified element at the tail of this queue. * - * @return <tt>true</tt> (as specified by {@link Collection#add}) + * @return {@code true} (as specified by {@link Collection#add}) * @throws NullPointerException if the specified element is null */ public boolean add(E e) { @@ -209,107 +267,135 @@ } /** + * We don't bother to update head or tail pointers if fewer than + * HOPS links from "true" location. We assume that volatile + * writes are significantly more expensive than volatile reads. + */ + private static final int HOPS = 1; + + /** + * Try to CAS head to p. If successful, repoint old head to itself + * as sentinel for succ(), below. + */ + final void updateHead(Node<E> h, Node<E> p) { + if (h != p && casHead(h, p)) + h.lazySetNext(h); + } + + /** + * Returns the successor of p, or the head node if p.next has been + * linked to self, which will only be true if traversing with a + * stale pointer that is now off the list. + */ + final Node<E> succ(Node<E> p) { + Node<E> next = p.getNext(); + return (p == next) ? head : next; + } + + /** * Inserts the specified element at the tail of this queue. * - * @return <tt>true</tt> (as specified by {@link Queue#offer}) + * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); - Node<E> n = new Node<E>(e, null); + Node<E> n = new Node<E>(e); + retry: for (;;) { Node<E> t = tail; - Node<E> s = t.getNext(); - if (t == tail) { - if (s == null) { - if (t.casNext(s, n)) { - casTail(t, n); - return true; - } + Node<E> p = t; + for (int hops = 0; ; hops++) { + Node<E> next = succ(p); + if (next != null) { + if (hops > HOPS && t != tail) + continue retry; + p = next; + } else if (p.casNext(null, n)) { + if (hops >= HOPS) + casTail(t, n); // Failure is OK. + return true; } else { - casTail(t, s); + p = succ(p); } } } } public E poll() { - for (;;) { - Node<E> h = head; - Node<E> t = tail; - Node<E> first = h.getNext(); - if (h == head) { - if (h == t) { - if (first == null) - return null; - else - casTail(t, first); - } else if (casHead(h, first)) { - E item = first.getItem(); - if (item != null) { - first.setItem(null); - return item; - } - // else skip over deleted item, continue loop, + Node<E> h = head; + Node<E> p = h; + for (int hops = 0; ; hops++) { + E item = p.getItem(); + + if (item != null && p.casItem(item, null)) { + if (hops >= HOPS) { + Node<E> q = p.getNext(); + updateHead(h, (q != null) ? q : p); } + return item; } + Node<E> next = succ(p); + if (next == null) { + updateHead(h, p); + break; + } + p = next; } + return null; } - public E peek() { // same as poll except don't remove item + public E peek() { + Node<E> h = head; + Node<E> p = h; + E item; for (;;) { - Node<E> h = head; - Node<E> t = tail; - Node<E> first = h.getNext(); - if (h == head) { - if (h == t) { - if (first == null) - return null; - else - casTail(t, first); - } else { - E item = first.getItem(); - if (item != null) - return item; - else // remove deleted node and continue - casHead(h, first); - } + item = p.getItem(); + if (item != null) + break; + Node<E> next = succ(p); + if (next == null) { + break; } + p = next; } + updateHead(h, p); + return item; } /** - * Returns the first actual (non-header) node on list. This is yet - * another variant of poll/peek; here returning out the first - * node, not element (so we cannot collapse with peek() without - * introducing race.) + * Returns the first live (non-deleted) node on list, or null if none. + * This is yet another variant of poll/peek; here returning the + * first node, not element. We could make peek() a wrapper around + * first(), but that would cost an extra volatile read of item, + * and the need to add a retry loop to deal with the possibility + * of losing a race to a concurrent poll(). */ Node<E> first() { + Node<E> h = head; + Node<E> p = h; + Node<E> result; for (;;) { - Node<E> h = head; - Node<E> t = tail; - Node<E> first = h.getNext(); - if (h == head) { - if (h == t) { - if (first == null) - return null; - else - casTail(t, first); - } else { - if (first.getItem() != null) - return first; - else // remove deleted node and continue - casHead(h, first); - } + E item = p.getItem(); + if (item != null) { + result = p; + break; } + Node<E> next = succ(p); + if (next == null) { + result = null; + break; + } + p = next; } + updateHead(h, p); + return result; } - /** - * Returns <tt>true</tt> if this queue contains no elements. + * Returns {@code true} if this queue contains no elements. * - * @return <tt>true</tt> if this queue contains no elements + * @return {@code true} if this queue contains no elements */ public boolean isEmpty() { return first() == null; @@ -317,8 +403,8 @@ /** * Returns the number of elements in this queue. If this queue - * contains more than <tt>Integer.MAX_VALUE</tt> elements, returns - * <tt>Integer.MAX_VALUE</tt>. + * contains more than {@code Integer.MAX_VALUE} elements, returns + * {@code Integer.MAX_VALUE}. * * <p>Beware that, unlike in most collections, this method is * <em>NOT</em> a constant-time operation. Because of the @@ -329,7 +415,7 @@ */ public int size() { int count = 0; - for (Node<E> p = first(); p != null; p = p.getNext()) { + for (Node<E> p = first(); p != null; p = succ(p)) { if (p.getItem() != null) { // Collections.size() spec says to max out if (++count == Integer.MAX_VALUE) @@ -340,16 +426,16 @@ } /** - * Returns <tt>true</tt> if this queue contains the specified element. - * More formally, returns <tt>true</tt> if and only if this queue contains - * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. + * Returns {@code true} if this queue contains the specified element. + * More formally, returns {@code true} if and only if this queue contains + * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this queue - * @return <tt>true</tt> if this queue contains the specified element + * @return {@code true} if this queue contains the specified element */ public boolean contains(Object o) { if (o == null) return false; - for (Node<E> p = first(); p != null; p = p.getNext()) { + for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null && o.equals(item)) @@ -360,23 +446,29 @@ /** * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element <tt>e</tt> such - * that <tt>o.equals(e)</tt>, if this queue contains one or more such + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such * elements. - * Returns <tt>true</tt> if this queue contained the specified element + * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present - * @return <tt>true</tt> if this queue changed as a result of the call + * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; - for (Node<E> p = first(); p != null; p = p.getNext()) { + Node<E> pred = null; + for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null && o.equals(item) && - p.casItem(item, null)) + p.casItem(item, null)) { + Node<E> next = succ(p); + if (pred != null && next != null) + pred.casNext(p, next); return true; + } + pred = p; } return false; } @@ -397,7 +489,7 @@ public Object[] toArray() { // Use ArrayList to deal with resizing. ArrayList<E> al = new ArrayList<E>(); - for (Node<E> p = first(); p != null; p = p.getNext()) { + for (Node<E> p = first(); p != null; p = succ(p)) { E item = p.getItem(); if (item != null) al.add(item); @@ -415,22 +507,22 @@ * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to - * <tt>null</tt>. + * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - * <p>Suppose <tt>x</tt> is a queue known to contain only strings. + * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly - * allocated array of <tt>String</tt>: + * allocated array of {@code String}: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * - * Note that <tt>toArray(new Object[0])</tt> is identical in function to - * <tt>toArray()</tt>. + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the @@ -441,11 +533,12 @@ * this queue * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { // try to use sent-in array int k = 0; Node<E> p; - for (p = first(); p != null && k < a.length; p = p.getNext()) { + for (p = first(); p != null && k < a.length; p = succ(p)) { E item = p.getItem(); if (item != null) a[k++] = (T)item; @@ -458,7 +551,7 @@ // If won't fit, use ArrayList version ArrayList<E> al = new ArrayList<E>(); - for (Node<E> q = first(); q != null; q = q.getNext()) { + for (Node<E> q = first(); q != null; q = succ(q)) { E item = q.getItem(); if (item != null) al.add(item); @@ -511,7 +604,15 @@ lastRet = nextNode; E x = nextItem; - Node<E> p = (nextNode == null)? first() : nextNode.getNext(); + Node<E> pred, p; + if (nextNode == null) { + p = first(); + pred = null; + } else { + pred = nextNode; + p = succ(nextNode); + } + for (;;) { if (p == null) { nextNode = null; @@ -523,8 +624,13 @@ nextNode = p; nextItem = item; return x; - } else // skip over nulls - p = p.getNext(); + } else { + // skip over nulls + Node<E> next = succ(p); + if (pred != null && next != null) + pred.casNext(p, next); + p = next; + } } } @@ -549,7 +655,7 @@ /** * Save the state to a stream (that is, serialize it). * - * @serialData All of the elements (each an <tt>E</tt>) in + * @serialData All of the elements (each an {@code E}) in * the proper order, followed by a null * @param s the stream */ @@ -560,7 +666,7 @@ s.defaultWriteObject(); // Write out all elements in the proper order. - for (Node<E> p = first(); p != null; p = p.getNext()) { + for (Node<E> p = first(); p != null; p = succ(p)) { Object item = p.getItem(); if (item != null) s.writeObject(item); @@ -579,10 +685,11 @@ throws java.io.IOException, ClassNotFoundException { // Read in capacity, and any hidden stuff s.defaultReadObject(); - head = new Node<E>(null, null); + head = new Node<E>(null); tail = head; // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break; @@ -591,4 +698,35 @@ } } + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); + private static final long headOffset = + objectFieldOffset(UNSAFE, "head", ConcurrentLinkedQueue.class); + private static final long tailOffset = + objectFieldOffset(UNSAFE, "tail", ConcurrentLinkedQueue.class); + + private boolean casTail(Node<E> cmp, Node<E> val) { + return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); + } + + private boolean casHead(Node<E> cmp, Node<E> val) { + return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); + } + + private void lazySetHead(Node<E> val) { + UNSAFE.putOrderedObject(this, headOffset, val); + } + + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class<?> klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } }
--- a/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/java/util/concurrent/LinkedBlockingDeque.java Wed Jul 29 11:19:14 2009 -0700 @@ -34,8 +34,13 @@ */ package java.util.concurrent; -import java.util.*; -import java.util.concurrent.locks.*; + +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; /** * An optionally-bounded {@linkplain BlockingDeque blocking deque} based on @@ -73,6 +78,20 @@ /* * Implemented as a simple doubly-linked list protected by a * single lock and using conditions to manage blocking. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to jump to "first" (for next links) + * or "last" (for prev links). */ /* @@ -86,9 +105,27 @@ /** Doubly-linked list node class */ static final class Node<E> { + /** + * The item, or null if this node has been removed. + */ E item; + + /** + * One of: + * - the real predecessor Node + * - this Node, meaning the predecessor is tail + * - null, meaning there is no predecessor + */ Node<E> prev; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head + * - null, meaning there is no successor + */ Node<E> next; + Node(E x, Node<E> p, Node<E> n) { item = x; prev = p; @@ -96,23 +133,37 @@ } } - /** Pointer to first node */ - private transient Node<E> first; - /** Pointer to last node */ - private transient Node<E> last; + /** + * Pointer to first node. + * Invariant: (first == null && last == null) || + * (first.prev == null && first.item != null) + */ + transient Node<E> first; + + /** + * Pointer to last node. + * Invariant: (first == null && last == null) || + * (last.next == null && last.item != null) + */ + transient Node<E> last; + /** Number of items in the deque */ private transient int count; + /** Maximum number of items in the deque */ private final int capacity; + /** Main lock guarding all access */ - private final ReentrantLock lock = new ReentrantLock(); + final ReentrantLock lock = new ReentrantLock(); + /** Condition for waiting takes */ private final Condition notEmpty = lock.newCondition(); + /** Condition for waiting puts */ private final Condition notFull = lock.newCondition(); /** - * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingDeque() { @@ -120,10 +171,10 @@ } /** - * Creates a <tt>LinkedBlockingDeque</tt> with the given (fixed) capacity. + * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity. * * @param capacity the capacity of this deque - * @throws IllegalArgumentException if <tt>capacity</tt> is less than 1 + * @throws IllegalArgumentException if {@code capacity} is less than 1 */ public LinkedBlockingDeque(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); @@ -131,7 +182,7 @@ } /** - * Creates a <tt>LinkedBlockingDeque</tt> with a capacity of + * Creates a {@code LinkedBlockingDeque} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of * the given collection, added in traversal order of the * collection's iterator. @@ -142,8 +193,18 @@ */ public LinkedBlockingDeque(Collection<? extends E> c) { this(Integer.MAX_VALUE); - for (E e : c) - add(e); + final ReentrantLock lock = this.lock; + lock.lock(); // Never contended, but necessary for visibility + try { + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (!linkLast(e)) + throw new IllegalStateException("Deque full"); + } + } finally { + lock.unlock(); + } } @@ -153,9 +214,9 @@ * Links e as first element, or returns false if full. */ private boolean linkFirst(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node<E> f = first; Node<E> x = new Node<E>(e, null, f); first = x; @@ -163,6 +224,7 @@ last = x; else f.prev = x; + ++count; notEmpty.signal(); return true; } @@ -171,9 +233,9 @@ * Links e as last element, or returns false if full. */ private boolean linkLast(E e) { + // assert lock.isHeldByCurrentThread(); if (count >= capacity) return false; - ++count; Node<E> l = last; Node<E> x = new Node<E>(e, l, null); last = x; @@ -181,6 +243,7 @@ first = x; else l.next = x; + ++count; notEmpty.signal(); return true; } @@ -189,10 +252,14 @@ * Removes and returns first element, or null if empty. */ private E unlinkFirst() { + // assert lock.isHeldByCurrentThread(); Node<E> f = first; if (f == null) return null; Node<E> n = f.next; + E item = f.item; + f.item = null; + f.next = f; // help GC first = n; if (n == null) last = null; @@ -200,17 +267,21 @@ n.prev = null; --count; notFull.signal(); - return f.item; + return item; } /** * Removes and returns last element, or null if empty. */ private E unlinkLast() { + // assert lock.isHeldByCurrentThread(); Node<E> l = last; if (l == null) return null; Node<E> p = l.prev; + E item = l.item; + l.item = null; + l.prev = l; // help GC last = p; if (p == null) first = null; @@ -218,31 +289,29 @@ p.next = null; --count; notFull.signal(); - return l.item; + return item; } /** - * Unlink e + * Unlinks x. */ - private void unlink(Node<E> x) { + void unlink(Node<E> x) { + // assert lock.isHeldByCurrentThread(); Node<E> p = x.prev; Node<E> n = x.next; if (p == null) { - if (n == null) - first = last = null; - else { - n.prev = null; - first = n; - } + unlinkFirst(); } else if (n == null) { - p.next = null; - last = p; + unlinkLast(); } else { p.next = n; n.prev = p; + x.item = null; + // Don't mess with x's links. They may still be in use by + // an iterator. + --count; + notFull.signal(); } - --count; - notFull.signalAll(); } // BlockingDeque methods @@ -270,6 +339,7 @@ */ public boolean offerFirst(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkFirst(e); @@ -283,6 +353,7 @@ */ public boolean offerLast(E e) { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { return linkLast(e); @@ -297,6 +368,7 @@ */ public void putFirst(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkFirst(e)) @@ -312,6 +384,7 @@ */ public void putLast(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); + final ReentrantLock lock = this.lock; lock.lock(); try { while (!linkLast(e)) @@ -329,15 +402,15 @@ throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkFirst(e)) - return true; + while (!linkFirst(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -351,15 +424,15 @@ throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - if (linkLast(e)) - return true; + while (!linkLast(e)) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } + return true; } finally { lock.unlock(); } @@ -384,6 +457,7 @@ } public E pollFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkFirst(); @@ -393,6 +467,7 @@ } public E pollLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return unlinkLast(); @@ -402,6 +477,7 @@ } public E takeFirst() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -414,6 +490,7 @@ } public E takeLast() throws InterruptedException { + final ReentrantLock lock = this.lock; lock.lock(); try { E x; @@ -428,16 +505,16 @@ public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkFirst(); - if (x != null) - return x; + E x; + while ( (x = unlinkFirst()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -446,16 +523,16 @@ public E pollLast(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { - for (;;) { - E x = unlinkLast(); - if (x != null) - return x; + E x; + while ( (x = unlinkLast()) == null) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } + return x; } finally { lock.unlock(); } @@ -480,6 +557,7 @@ } public E peekFirst() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (first == null) ? null : first.item; @@ -489,6 +567,7 @@ } public E peekLast() { + final ReentrantLock lock = this.lock; lock.lock(); try { return (last == null) ? null : last.item; @@ -499,6 +578,7 @@ public boolean removeFirstOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) { @@ -515,6 +595,7 @@ public boolean removeLastOccurrence(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = last; p != null; p = p.prev) { @@ -619,14 +700,15 @@ * Returns the number of additional elements that this deque can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this deque - * less the current <tt>size</tt> of this deque. + * less the current {@code size} of this deque. * * <p>Note that you <em>cannot</em> always tell if an attempt to insert - * an element will succeed by inspecting <tt>remainingCapacity</tt> + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ public int remainingCapacity() { + final ReentrantLock lock = this.lock; lock.lock(); try { return capacity - count; @@ -642,22 +724,7 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - lock.lock(); - try { - for (Node<E> p = first; p != null; p = p.next) - c.add(p.item); - int n = count; - count = 0; - first = last = null; - notFull.signalAll(); - return n; - } finally { - lock.unlock(); - } + return drainTo(c, Integer.MAX_VALUE); } /** @@ -671,19 +738,14 @@ throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; lock.lock(); try { - int n = 0; - while (n < maxElements && first != null) { - c.add(first.item); - first.prev = null; - first = first.next; - --count; - ++n; + int n = Math.min(maxElements, count); + for (int i = 0; i < n; i++) { + c.add(first.item); // In this order, in case add() throws. + unlinkFirst(); } - if (first == null) - last = null; - notFull.signalAll(); return n; } finally { lock.unlock(); @@ -712,16 +774,16 @@ /** * Removes the first occurrence of the specified element from this deque. * If the deque does not contain the element, it is unchanged. - * More formally, removes the first element <tt>e</tt> such that - * <tt>o.equals(e)</tt> (if such an element exists). - * Returns <tt>true</tt> if this deque contained the specified element + * More formally, removes the first element {@code e} such that + * {@code o.equals(e)} (if such an element exists). + * Returns {@code true} if this deque contained the specified element * (or equivalently, if this deque changed as a result of the call). * * <p>This method is equivalent to * {@link #removeFirstOccurrence(Object) removeFirstOccurrence}. * * @param o element to be removed from this deque, if present - * @return <tt>true</tt> if this deque changed as a result of the call + * @return {@code true} if this deque changed as a result of the call */ public boolean remove(Object o) { return removeFirstOccurrence(o); @@ -733,6 +795,7 @@ * @return the number of elements in this deque */ public int size() { + final ReentrantLock lock = this.lock; lock.lock(); try { return count; @@ -742,15 +805,16 @@ } /** - * Returns <tt>true</tt> if this deque contains the specified element. - * More formally, returns <tt>true</tt> if and only if this deque contains - * at least one element <tt>e</tt> such that <tt>o.equals(e)</tt>. + * Returns {@code true} if this deque contains the specified element. + * More formally, returns {@code true} if and only if this deque contains + * at least one element {@code e} such that {@code o.equals(e)}. * * @param o object to be checked for containment in this deque - * @return <tt>true</tt> if this deque contains the specified element + * @return {@code true} if this deque contains the specified element */ public boolean contains(Object o) { if (o == null) return false; + final ReentrantLock lock = this.lock; lock.lock(); try { for (Node<E> p = first; p != null; p = p.next) @@ -762,24 +826,46 @@ } } - /** - * Variant of removeFirstOccurrence needed by iterator.remove. - * Searches for the node, not its contents. + /* + * TODO: Add support for more efficient bulk operations. + * + * We don't want to acquire the lock for every iteration, but we + * also want other threads a chance to interact with the + * collection, especially when count is close to capacity. */ - boolean removeNode(Node<E> e) { - lock.lock(); - try { - for (Node<E> p = first; p != null; p = p.next) { - if (p == e) { - unlink(p); - return true; - } - } - return false; - } finally { - lock.unlock(); - } - } + +// /** +// * Adds all of the elements in the specified collection to this +// * queue. Attempts to addAll of a queue to itself result in +// * {@code IllegalArgumentException}. Further, the behavior of +// * this operation is undefined if the specified collection is +// * modified while the operation is in progress. +// * +// * @param c collection containing elements to be added to this queue +// * @return {@code true} if this queue changed as a result of the call +// * @throws ClassCastException {@inheritDoc} +// * @throws NullPointerException {@inheritDoc} +// * @throws IllegalArgumentException {@inheritDoc} +// * @throws IllegalStateException {@inheritDoc} +// * @see #add(Object) +// */ +// public boolean addAll(Collection<? extends E> c) { +// if (c == null) +// throw new NullPointerException(); +// if (c == this) +// throw new IllegalArgumentException(); +// final ReentrantLock lock = this.lock; +// lock.lock(); +// try { +// boolean modified = false; +// for (E e : c) +// if (linkLast(e)) +// modified = true; +// return modified; +// } finally { +// lock.unlock(); +// } +// } /** * Returns an array containing all of the elements in this deque, in @@ -794,7 +880,9 @@ * * @return an array containing all of the elements in this deque */ + @SuppressWarnings("unchecked") public Object[] toArray() { + final ReentrantLock lock = this.lock; lock.lock(); try { Object[] a = new Object[count]; @@ -817,22 +905,22 @@ * <p>If this deque fits in the specified array with room to spare * (i.e., the array has more elements than this deque), the element in * the array immediately following the end of the deque is set to - * <tt>null</tt>. + * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - * <p>Suppose <tt>x</tt> is a deque known to contain only strings. + * <p>Suppose {@code x} is a deque known to contain only strings. * The following code can be used to dump the deque into a newly - * allocated array of <tt>String</tt>: + * allocated array of {@code String}: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * - * Note that <tt>toArray(new Object[0])</tt> is identical in function to - * <tt>toArray()</tt>. + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the deque are to * be stored, if it is big enough; otherwise, a new array of the @@ -843,14 +931,14 @@ * this deque * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { + final ReentrantLock lock = this.lock; lock.lock(); try { if (a.length < count) - a = (T[])java.lang.reflect.Array.newInstance( - a.getClass().getComponentType(), - count - ); + a = (T[])java.lang.reflect.Array.newInstance + (a.getClass().getComponentType(), count); int k = 0; for (Node<E> p = first; p != null; p = p.next) @@ -864,6 +952,7 @@ } public String toString() { + final ReentrantLock lock = this.lock; lock.lock(); try { return super.toString(); @@ -877,8 +966,16 @@ * The deque will be empty after this call returns. */ public void clear() { + final ReentrantLock lock = this.lock; lock.lock(); try { + for (Node<E> f = first; f != null; ) { + f.item = null; + Node<E> n = f.next; + f.prev = null; + f.next = null; + f = n; + } first = last = null; count = 0; notFull.signalAll(); @@ -890,7 +987,7 @@ /** * Returns an iterator over the elements in this deque in proper sequence. * The elements will be returned in order from first (head) to last (tail). - * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -906,7 +1003,7 @@ * Returns an iterator over the elements in this deque in reverse * sequential order. The elements will be returned in order from * last (tail) to first (head). - * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -921,7 +1018,7 @@ */ private abstract class AbstractItr implements Iterator<E> { /** - * The next node to return in next + * The next node to return in next() */ Node<E> next; @@ -939,15 +1036,44 @@ */ private Node<E> lastRet; + abstract Node<E> firstNode(); + abstract Node<E> nextNode(Node<E> n); + AbstractItr() { - advance(); // set to initial position + // set to initial position + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + next = firstNode(); + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } } /** - * Advances next, or if not yet initialized, sets to first node. - * Implemented to move forward vs backward in the two subclasses. + * Advances next. */ - abstract void advance(); + void advance() { + final ReentrantLock lock = LinkedBlockingDeque.this.lock; + lock.lock(); + try { + // assert next != null; + Node<E> s = nextNode(next); + if (s == next) { + next = firstNode(); + } else { + // Skip over removed nodes. + // May be necessary if multiple interior Nodes are removed. + while (s != null && s.item == null) + s = nextNode(s); + next = s; + } + nextItem = (next == null) ? null : next.item; + } finally { + lock.unlock(); + } + } public boolean hasNext() { return next != null; @@ -967,52 +1093,39 @@ if (n == null) throw new IllegalStateException(); lastRet = null; - // Note: removeNode rescans looking for this node to make - // sure it was not already removed. Otherwise, trying to - // re-remove could corrupt list. - removeNode(n); - } - } - - /** Forward iterator */ - private class Itr extends AbstractItr { - void advance() { final ReentrantLock lock = LinkedBlockingDeque.this.lock; lock.lock(); try { - next = (next == null)? first : next.next; - nextItem = (next == null)? null : next.item; + if (n.item != null) + unlink(n); } finally { lock.unlock(); } } } - /** - * Descending iterator for LinkedBlockingDeque - */ + /** Forward iterator */ + private class Itr extends AbstractItr { + Node<E> firstNode() { return first; } + Node<E> nextNode(Node<E> n) { return n.next; } + } + + /** Descending iterator */ private class DescendingItr extends AbstractItr { - void advance() { - final ReentrantLock lock = LinkedBlockingDeque.this.lock; - lock.lock(); - try { - next = (next == null)? last : next.prev; - nextItem = (next == null)? null : next.item; - } finally { - lock.unlock(); - } - } + Node<E> firstNode() { return last; } + Node<E> nextNode(Node<E> n) { return n.prev; } } /** * Save the state of this deque to a stream (that is, serialize it). * * @serialData The capacity (int), followed by elements (each an - * <tt>Object</tt>) in the proper order, followed by a null + * {@code Object}) in the proper order, followed by a null * @param s the stream */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + final ReentrantLock lock = this.lock; lock.lock(); try { // Write out capacity and any hidden stuff @@ -1040,6 +1153,7 @@ last = null; // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break;
--- a/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/java/util/concurrent/LinkedBlockingQueue.java Wed Jul 29 11:19:14 2009 -0700 @@ -34,9 +34,14 @@ */ package java.util.concurrent; -import java.util.concurrent.atomic.*; -import java.util.concurrent.locks.*; -import java.util.*; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.AbstractQueue; +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; /** * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on @@ -86,15 +91,43 @@ * items have been entered since the signal. And symmetrically for * takes signalling puts. Operations such as remove(Object) and * iterators acquire both locks. + * + * Visibility between writers and readers is provided as follows: + * + * Whenever an element is enqueued, the putLock is acquired and + * count updated. A subsequent reader guarantees visibility to the + * enqueued Node by either acquiring the putLock (via fullyLock) + * or by acquiring the takeLock, and then reading n = count.get(); + * this gives visibility to the first n items. + * + * To implement weakly consistent iterators, it appears we need to + * keep all Nodes GC-reachable from a predecessor dequeued Node. + * That would cause two problems: + * - allow a rogue Iterator to cause unbounded memory retention + * - cause cross-generational linking of old Nodes to new Nodes if + * a Node was tenured while live, which generational GCs have a + * hard time dealing with, causing repeated major collections. + * However, only non-deleted Nodes need to be reachable from + * dequeued Nodes, and reachability does not necessarily have to + * be of the kind understood by the GC. We use the trick of + * linking a Node that has just been dequeued to itself. Such a + * self-link implicitly means to advance to head.next. */ /** * Linked list node class */ static class Node<E> { - /** The item, volatile to ensure barrier separating write and read */ - volatile E item; + E item; + + /** + * One of: + * - the real successor Node + * - this Node, meaning the successor is head.next + * - null, meaning there is no successor (this is the last node) + */ Node<E> next; + Node(E x) { item = x; } } @@ -104,10 +137,16 @@ /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(0); - /** Head of linked list */ + /** + * Head of linked list. + * Invariant: head.item == null + */ private transient Node<E> head; - /** Tail of linked list */ + /** + * Tail of linked list. + * Invariant: last.next == null + */ private transient Node<E> last; /** Lock held by take, poll, etc */ @@ -151,18 +190,26 @@ /** * Creates a node and links it at end of queue. + * * @param x the item */ - private void insert(E x) { + private void enqueue(E x) { + // assert putLock.isHeldByCurrentThread(); + // assert last.next == null; last = last.next = new Node<E>(x); } /** - * Removes a node from head of queue, + * Removes a node from head of queue. + * * @return the node */ - private E extract() { - Node<E> first = head.next; + private E dequeue() { + // assert takeLock.isHeldByCurrentThread(); + // assert head.item == null; + Node<E> h = head; + Node<E> first = h.next; + h.next = h; // help GC head = first; E x = first.item; first.item = null; @@ -172,7 +219,7 @@ /** * Lock to prevent both puts and takes. */ - private void fullyLock() { + void fullyLock() { putLock.lock(); takeLock.lock(); } @@ -180,14 +227,21 @@ /** * Unlock to allow both puts and takes. */ - private void fullyUnlock() { + void fullyUnlock() { takeLock.unlock(); putLock.unlock(); } +// /** +// * Tells whether both locks are held by current thread. +// */ +// boolean isFullyLocked() { +// return (putLock.isHeldByCurrentThread() && +// takeLock.isHeldByCurrentThread()); +// } /** - * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}. */ public LinkedBlockingQueue() { @@ -195,10 +249,10 @@ } /** - * Creates a <tt>LinkedBlockingQueue</tt> with the given (fixed) capacity. + * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity. * * @param capacity the capacity of this queue - * @throws IllegalArgumentException if <tt>capacity</tt> is not greater + * @throws IllegalArgumentException if {@code capacity} is not greater * than zero */ public LinkedBlockingQueue(int capacity) { @@ -208,7 +262,7 @@ } /** - * Creates a <tt>LinkedBlockingQueue</tt> with a capacity of + * Creates a {@code LinkedBlockingQueue} with a capacity of * {@link Integer#MAX_VALUE}, initially containing the elements of the * given collection, * added in traversal order of the collection's iterator. @@ -219,8 +273,22 @@ */ public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); - for (E e : c) - add(e); + final ReentrantLock putLock = this.putLock; + putLock.lock(); // Never contended, but necessary for visibility + try { + int n = 0; + for (E e : c) { + if (e == null) + throw new NullPointerException(); + if (n == capacity) + throw new IllegalStateException("Queue full"); + enqueue(e); + ++n; + } + count.set(n); + } finally { + putLock.unlock(); + } } @@ -241,10 +309,10 @@ * Returns the number of additional elements that this queue can ideally * (in the absence of memory or resource constraints) accept without * blocking. This is always equal to the initial capacity of this queue - * less the current <tt>size</tt> of this queue. + * less the current {@code size} of this queue. * * <p>Note that you <em>cannot</em> always tell if an attempt to insert - * an element will succeed by inspecting <tt>remainingCapacity</tt> + * an element will succeed by inspecting {@code remainingCapacity} * because it may be the case that another thread is about to * insert or remove an element. */ @@ -261,8 +329,8 @@ */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); - // Note: convention in all put/take/etc is to preset - // local var holding count negative to indicate failure unless set. + // Note: convention in all put/take/etc is to preset local var + // holding count negative to indicate failure unless set. int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; @@ -273,18 +341,13 @@ * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are - * signalled if it ever changes from - * capacity. Similarly for all other uses of count in - * other wait guards. + * signalled if it ever changes from capacity. Similarly + * for all other uses of count in other wait guards. */ - try { - while (count.get() == capacity) - notFull.await(); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to a non-interrupted thread - throw ie; + while (count.get() == capacity) { + notFull.await(); } - insert(e); + enqueue(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); @@ -299,7 +362,7 @@ * Inserts the specified element at the tail of this queue, waiting if * necessary up to the specified wait time for space to become available. * - * @return <tt>true</tt> if successful, or <tt>false</tt> if + * @return {@code true} if successful, or {@code false} if * the specified waiting time elapses before space is available. * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} @@ -314,23 +377,15 @@ final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { - for (;;) { - if (count.get() < capacity) { - insert(e); - c = count.getAndIncrement(); - if (c + 1 < capacity) - notFull.signal(); - break; - } + while (count.get() == capacity) { if (nanos <= 0) return false; - try { - nanos = notFull.awaitNanos(nanos); - } catch (InterruptedException ie) { - notFull.signal(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notFull.awaitNanos(nanos); } + enqueue(e); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); } finally { putLock.unlock(); } @@ -342,7 +397,7 @@ /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue's capacity, - * returning <tt>true</tt> upon success and <tt>false</tt> if this queue + * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to @@ -360,7 +415,7 @@ putLock.lock(); try { if (count.get() < capacity) { - insert(e); + enqueue(e); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); @@ -381,15 +436,10 @@ final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { - try { - while (count.get() == 0) - notEmpty.await(); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; + while (count.get() == 0) { + notEmpty.await(); } - - x = extract(); + x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); @@ -409,23 +459,15 @@ final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { - for (;;) { - if (count.get() > 0) { - x = extract(); - c = count.getAndDecrement(); - if (c > 1) - notEmpty.signal(); - break; - } + while (count.get() == 0) { if (nanos <= 0) return null; - try { - nanos = notEmpty.awaitNanos(nanos); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; - } + nanos = notEmpty.awaitNanos(nanos); } + x = dequeue(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); } finally { takeLock.unlock(); } @@ -444,7 +486,7 @@ takeLock.lock(); try { if (count.get() > 0) { - x = extract(); + x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); @@ -457,7 +499,6 @@ return x; } - public E peek() { if (count.get() == 0) return null; @@ -475,43 +516,47 @@ } /** + * Unlinks interior Node p with predecessor trail. + */ + void unlink(Node<E> p, Node<E> trail) { + // assert isFullyLocked(); + // p.next is not changed, to allow iterators that are + // traversing p to maintain their weak-consistency guarantee. + p.item = null; + trail.next = p.next; + if (last == p) + last = trail; + if (count.getAndDecrement() == capacity) + notFull.signal(); + } + + /** * Removes a single instance of the specified element from this queue, - * if it is present. More formally, removes an element <tt>e</tt> such - * that <tt>o.equals(e)</tt>, if this queue contains one or more such + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such * elements. - * Returns <tt>true</tt> if this queue contained the specified element + * Returns {@code true} if this queue contained the specified element * (or equivalently, if this queue changed as a result of the call). * * @param o element to be removed from this queue, if present - * @return <tt>true</tt> if this queue changed as a result of the call + * @return {@code true} if this queue changed as a result of the call */ public boolean remove(Object o) { if (o == null) return false; - boolean removed = false; fullyLock(); try { - Node<E> trail = head; - Node<E> p = head.next; - while (p != null) { + for (Node<E> trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { if (o.equals(p.item)) { - removed = true; - break; + unlink(p, trail); + return true; } - trail = p; - p = p.next; } - if (removed) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - if (count.getAndDecrement() == capacity) - notFull.signalAll(); - } + return false; } finally { fullyUnlock(); } - return removed; } /** @@ -551,22 +596,22 @@ * <p>If this queue fits in the specified array with room to spare * (i.e., the array has more elements than this queue), the element in * the array immediately following the end of the queue is set to - * <tt>null</tt>. + * {@code null}. * * <p>Like the {@link #toArray()} method, this method acts as bridge between * array-based and collection-based APIs. Further, this method allows * precise control over the runtime type of the output array, and may, * under certain circumstances, be used to save allocation costs. * - * <p>Suppose <tt>x</tt> is a queue known to contain only strings. + * <p>Suppose {@code x} is a queue known to contain only strings. * The following code can be used to dump the queue into a newly - * allocated array of <tt>String</tt>: + * allocated array of {@code String}: * * <pre> * String[] y = x.toArray(new String[0]);</pre> * - * Note that <tt>toArray(new Object[0])</tt> is identical in function to - * <tt>toArray()</tt>. + * Note that {@code toArray(new Object[0])} is identical in function to + * {@code toArray()}. * * @param a the array into which the elements of the queue are to * be stored, if it is big enough; otherwise, a new array of the @@ -577,6 +622,7 @@ * this queue * @throws NullPointerException if the specified array is null */ + @SuppressWarnings("unchecked") public <T> T[] toArray(T[] a) { fullyLock(); try { @@ -586,7 +632,7 @@ (a.getClass().getComponentType(), size); int k = 0; - for (Node p = head.next; p != null; p = p.next) + for (Node<E> p = head.next; p != null; p = p.next) a[k++] = (T)p.item; if (a.length > k) a[k] = null; @@ -612,11 +658,14 @@ public void clear() { fullyLock(); try { - head.next = null; - assert head.item == null; - last = head; + for (Node<E> p, h = head; (p = h.next) != null; h = p) { + h.next = h; + p.item = null; + } + head = last; + // assert head.item == null && head.next == null; if (count.getAndSet(0) == capacity) - notFull.signalAll(); + notFull.signal(); } finally { fullyUnlock(); } @@ -629,30 +678,7 @@ * @throws IllegalArgumentException {@inheritDoc} */ public int drainTo(Collection<? super E> c) { - if (c == null) - throw new NullPointerException(); - if (c == this) - throw new IllegalArgumentException(); - Node<E> first; - fullyLock(); - try { - first = head.next; - head.next = null; - assert head.item == null; - last = head; - if (count.getAndSet(0) == capacity) - notFull.signalAll(); - } finally { - fullyUnlock(); - } - // Transfer the elements outside of locks - int n = 0; - for (Node<E> p = first; p != null; p = p.next) { - c.add(p.item); - p.item = null; - ++n; - } - return n; + return drainTo(c, Integer.MAX_VALUE); } /** @@ -666,33 +692,42 @@ throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); - fullyLock(); + boolean signalNotFull = false; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); try { - int n = 0; - Node<E> p = head.next; - while (p != null && n < maxElements) { - c.add(p.item); - p.item = null; - p = p.next; - ++n; + int n = Math.min(maxElements, count.get()); + // count.get provides visibility to first n Nodes + Node<E> h = head; + int i = 0; + try { + while (i < n) { + Node<E> p = h.next; + c.add(p.item); + p.item = null; + h.next = h; + h = p; + ++i; + } + return n; + } finally { + // Restore invariants even if c.add() threw + if (i > 0) { + // assert h.item == null; + head = h; + signalNotFull = (count.getAndAdd(-i) == capacity); + } } - if (n != 0) { - head.next = p; - assert head.item == null; - if (p == null) - last = head; - if (count.getAndAdd(-n) == capacity) - notFull.signalAll(); - } - return n; } finally { - fullyUnlock(); + takeLock.unlock(); + if (signalNotFull) + signalNotFull(); } } /** * Returns an iterator over the elements in this queue in proper sequence. - * The returned <tt>Iterator</tt> is a "weakly consistent" iterator that + * The returned {@code Iterator} is a "weakly consistent" iterator that * will never throw {@link ConcurrentModificationException}, * and guarantees to traverse elements as they existed upon * construction of the iterator, and may (but is not guaranteed to) @@ -706,7 +741,7 @@ private class Itr implements Iterator<E> { /* - * Basic weak-consistent iterator. At all times hold the next + * Basic weakly-consistent iterator. At all times hold the next * item to hand out so that if hasNext() reports true, we will * still have it to return even if lost race with a take etc. */ @@ -715,17 +750,13 @@ private E currentElement; Itr() { - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { current = head.next; if (current != null) currentElement = current.item; } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } @@ -733,54 +764,54 @@ return current != null; } + /** + * Unlike other traversal methods, iterators need to handle: + * - dequeued nodes (p.next == p) + * - interior removed nodes (p.item == null) + */ + private Node<E> nextNode(Node<E> p) { + Node<E> s = p.next; + if (p == s) + return head.next; + // Skip over removed nodes. + // May be necessary if multiple interior Nodes are removed. + while (s != null && s.item == null) + s = s.next; + return s; + } + public E next() { - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { if (current == null) throw new NoSuchElementException(); E x = currentElement; lastRet = current; - current = current.next; - if (current != null) - currentElement = current.item; + current = nextNode(current); + currentElement = (current == null) ? null : current.item; return x; } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } public void remove() { if (lastRet == null) throw new IllegalStateException(); - final ReentrantLock putLock = LinkedBlockingQueue.this.putLock; - final ReentrantLock takeLock = LinkedBlockingQueue.this.takeLock; - putLock.lock(); - takeLock.lock(); + fullyLock(); try { Node<E> node = lastRet; lastRet = null; - Node<E> trail = head; - Node<E> p = head.next; - while (p != null && p != node) { - trail = p; - p = p.next; - } - if (p == node) { - p.item = null; - trail.next = p.next; - if (last == p) - last = trail; - int c = count.getAndDecrement(); - if (c == capacity) - notFull.signalAll(); + for (Node<E> trail = head, p = trail.next; + p != null; + trail = p, p = p.next) { + if (p == node) { + unlink(p, trail); + break; + } } } finally { - takeLock.unlock(); - putLock.unlock(); + fullyUnlock(); } } } @@ -789,7 +820,7 @@ * Save the state to a stream (that is, serialize it). * * @serialData The capacity is emitted (int), followed by all of - * its elements (each an <tt>Object</tt>) in the proper order, + * its elements (each an {@code Object}) in the proper order, * followed by a null * @param s the stream */ @@ -815,6 +846,7 @@ /** * Reconstitute this queue instance from a stream (that is, * deserialize it). + * * @param s the stream */ private void readObject(java.io.ObjectInputStream s) @@ -827,6 +859,7 @@ // Read in all elements and place in queue for (;;) { + @SuppressWarnings("unchecked") E item = (E)s.readObject(); if (item == null) break;
--- a/src/share/classes/sun/nio/cs/Surrogate.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/share/classes/sun/nio/cs/Surrogate.java Wed Jul 29 11:19:14 2009 -0700 @@ -30,7 +30,6 @@ import java.nio.charset.MalformedInputException; import java.nio.charset.UnmappableCharacterException; - /** * Utility class for dealing with surrogates. * @@ -41,19 +40,15 @@ private Surrogate() { } - // UTF-16 surrogate-character ranges - // - public static final char MIN_HIGH = '\uD800'; - public static final char MAX_HIGH = '\uDBFF'; - public static final char MIN_LOW = '\uDC00'; - public static final char MAX_LOW = '\uDFFF'; - public static final char MIN = MIN_HIGH; - public static final char MAX = MAX_LOW; - - // Range of UCS-4 values that need surrogates in UTF-16 - // - public static final int UCS4_MIN = 0x10000; - public static final int UCS4_MAX = (1 << 20) + UCS4_MIN - 1; + // TODO: Deprecate/remove the following redundant definitions + public static final char MIN_HIGH = Character.MIN_HIGH_SURROGATE; + public static final char MAX_HIGH = Character.MAX_HIGH_SURROGATE; + public static final char MIN_LOW = Character.MIN_LOW_SURROGATE; + public static final char MAX_LOW = Character.MAX_LOW_SURROGATE; + public static final char MIN = Character.MIN_SURROGATE; + public static final char MAX = Character.MAX_SURROGATE; + public static final int UCS4_MIN = Character.MIN_SUPPLEMENTARY_CODE_POINT; + public static final int UCS4_MAX = Character.MAX_CODE_POINT; /** * Tells whether or not the given UTF-16 value is a high surrogate. @@ -77,35 +72,45 @@ } /** + * Tells whether or not the given UCS-4 character is in the Basic + * Multilingual Plane, and can be represented using a single char. + */ + public static boolean isBMP(int uc) { + return (int) (char) uc == uc; + } + + /** * Tells whether or not the given UCS-4 character must be represented as a * surrogate pair in UTF-16. */ public static boolean neededFor(int uc) { - return (uc >= UCS4_MIN) && (uc <= UCS4_MAX); + return Character.isSupplementaryCodePoint(uc); } /** * Returns the high UTF-16 surrogate for the given UCS-4 character. */ public static char high(int uc) { - assert neededFor(uc); - return (char)(0xd800 | (((uc - UCS4_MIN) >> 10) & 0x3ff)); + assert Character.isSupplementaryCodePoint(uc); + return (char)((uc >> 10) + + (Character.MIN_HIGH_SURROGATE + - (Character.MIN_SUPPLEMENTARY_CODE_POINT >> 10))); } /** * Returns the low UTF-16 surrogate for the given UCS-4 character. */ public static char low(int uc) { - assert neededFor(uc); - return (char)(0xdc00 | ((uc - UCS4_MIN) & 0x3ff)); + assert Character.isSupplementaryCodePoint(uc); + return (char)((uc & 0x3ff) + Character.MIN_LOW_SURROGATE); } /** * Converts the given surrogate pair into a 32-bit UCS-4 character. */ public static int toUCS4(char c, char d) { - assert isHigh(c) && isLow(d); - return (((c & 0x3ff) << 10) | (d & 0x3ff)) + 0x10000; + assert Character.isHighSurrogate(c) && Character.isLowSurrogate(d); + return Character.toCodePoint(c, d); } /** @@ -178,14 +183,14 @@ * object */ public int parse(char c, CharBuffer in) { - if (Surrogate.isHigh(c)) { + if (Character.isHighSurrogate(c)) { if (!in.hasRemaining()) { error = CoderResult.UNDERFLOW; return -1; } char d = in.get(); - if (Surrogate.isLow(d)) { - character = toUCS4(c, d); + if (Character.isLowSurrogate(d)) { + character = Character.toCodePoint(c, d); isPair = true; error = null; return character; @@ -193,7 +198,7 @@ error = CoderResult.malformedForLength(1); return -1; } - if (Surrogate.isLow(c)) { + if (Character.isLowSurrogate(c)) { error = CoderResult.malformedForLength(1); return -1; } @@ -220,14 +225,14 @@ */ public int parse(char c, char[] ia, int ip, int il) { assert (ia[ip] == c); - if (Surrogate.isHigh(c)) { + if (Character.isHighSurrogate(c)) { if (il - ip < 2) { error = CoderResult.UNDERFLOW; return -1; } char d = ia[ip + 1]; - if (Surrogate.isLow(d)) { - character = toUCS4(c, d); + if (Character.isLowSurrogate(d)) { + character = Character.toCodePoint(c, d); isPair = true; error = null; return character; @@ -235,7 +240,7 @@ error = CoderResult.malformedForLength(1); return -1; } - if (Surrogate.isLow(c)) { + if (Character.isLowSurrogate(c)) { error = CoderResult.malformedForLength(1); return -1; } @@ -282,7 +287,7 @@ * error() will return a descriptive result object */ public int generate(int uc, int len, CharBuffer dst) { - if (uc <= 0xffff) { + if (Surrogate.isBMP(uc)) { if (Surrogate.is(uc)) { error = CoderResult.malformedForLength(len); return -1; @@ -294,12 +299,7 @@ dst.put((char)uc); error = null; return 1; - } - if (uc < Surrogate.UCS4_MIN) { - error = CoderResult.malformedForLength(len); - return -1; - } - if (uc <= Surrogate.UCS4_MAX) { + } else if (Character.isSupplementaryCodePoint(uc)) { if (dst.remaining() < 2) { error = CoderResult.OVERFLOW; return -1; @@ -308,9 +308,10 @@ dst.put(Surrogate.low(uc)); error = null; return 2; + } else { + error = CoderResult.unmappableForLength(len); + return -1; } - error = CoderResult.unmappableForLength(len); - return -1; } /** @@ -330,7 +331,7 @@ * error() will return a descriptive result object */ public int generate(int uc, int len, char[] da, int dp, int dl) { - if (uc <= 0xffff) { + if (Surrogate.isBMP(uc)) { if (Surrogate.is(uc)) { error = CoderResult.malformedForLength(len); return -1; @@ -342,12 +343,7 @@ da[dp] = (char)uc; error = null; return 1; - } - if (uc < Surrogate.UCS4_MIN) { - error = CoderResult.malformedForLength(len); - return -1; - } - if (uc <= Surrogate.UCS4_MAX) { + } else if (Character.isSupplementaryCodePoint(uc)) { if (dl - dp < 2) { error = CoderResult.OVERFLOW; return -1; @@ -356,11 +352,11 @@ da[dp + 1] = Surrogate.low(uc); error = null; return 2; + } else { + error = CoderResult.unmappableForLength(len); + return -1; } - error = CoderResult.unmappableForLength(len); - return -1; } - } }
--- a/src/solaris/native/sun/nio/fs/UnixNativeDispatcher.c Fri Jul 24 11:22:29 2009 -0700 +++ b/src/solaris/native/sun/nio/fs/UnixNativeDispatcher.c Wed Jul 29 11:19:14 2009 -0700 @@ -85,19 +85,21 @@ static jfieldID entry_dev; /** - * System calls that may not be available at build time. + * System calls that may not be available at run time. */ typedef int openat64_func(int, const char *, int, ...); typedef int fstatat64_func(int, const char *, struct stat64 *, int); typedef int unlinkat_func(int, const char*, int); typedef int renameat_func(int, const char*, int, const char*); typedef int futimesat_func(int, const char *, const struct timeval *); +typedef DIR* fdopendir_func(int); static openat64_func* my_openat64_func = NULL; static fstatat64_func* my_fstatat64_func = NULL; static unlinkat_func* my_unlinkat_func = NULL; static renameat_func* my_renameat_func = NULL; static futimesat_func* my_futimesat_func = NULL; +static fdopendir_func* my_fdopendir_func = NULL; /** * fstatat missing from glibc on Linux. Temporary workaround @@ -183,7 +185,7 @@ entry_options = (*env)->GetFieldID(env, clazz, "opts", "[B"); entry_dev = (*env)->GetFieldID(env, clazz, "dev", "J"); - /* system calls that might not be available at build time */ + /* system calls that might not be available at run time */ #if defined(__solaris__) && defined(_LP64) /* Solaris 64-bit does not have openat64/fstatat64 */ @@ -196,6 +198,7 @@ my_unlinkat_func = (unlinkat_func*) dlsym(RTLD_DEFAULT, "unlinkat"); my_renameat_func = (renameat_func*) dlsym(RTLD_DEFAULT, "renameat"); my_futimesat_func = (futimesat_func*) dlsym(RTLD_DEFAULT, "futimesat"); + my_fdopendir_func = (fdopendir_func*) dlsym(RTLD_DEFAULT, "fdopendir"); #if defined(FSTATAT64_SYSCALL_AVAILABLE) /* fstatat64 missing from glibc */ @@ -205,7 +208,7 @@ if (my_openat64_func != NULL && my_fstatat64_func != NULL && my_unlinkat_func != NULL && my_renameat_func != NULL && - my_futimesat_func != NULL) + my_futimesat_func != NULL && my_fdopendir_func != NULL) { flags |= sun_nio_fs_UnixNativeDispatcher_HAS_AT_SYSCALLS; } @@ -565,8 +568,13 @@ Java_sun_nio_fs_UnixNativeDispatcher_fdopendir(JNIEnv* env, jclass this, int dfd) { DIR* dir; + if (my_fdopendir_func == NULL) { + JNU_ThrowInternalError(env, "should not reach here"); + return (jlong)-1; + } + /* EINTR not listed as a possible error */ - dir = fdopendir((int)dfd); + dir = (*my_fdopendir_func)((int)dfd); if (dir == NULL) { throwUnixException(env, errno); }
--- a/src/windows/classes/sun/nio/fs/WindowsPath.java Fri Jul 24 11:22:29 2009 -0700 +++ b/src/windows/classes/sun/nio/fs/WindowsPath.java Wed Jul 29 11:19:14 2009 -0700 @@ -1177,14 +1177,20 @@ /* * Windows treates symbolic links to directories differently than it - * does to other file types. For that reason we check if the exists and - * is a directory. + * does to other file types. For that reason we need to check if the + * target is a directory (or a directory junction). */ + WindowsPath resolvedTarget; + if (target.type == WindowsPathType.RELATIVE) { + WindowsPath parent = getParent(); + resolvedTarget = (parent == null) ? target : parent.resolve(target); + } else { + resolvedTarget = resolve(target); + } int flags = 0; - WindowsPath resolvedTarget = - WindowsPath.createFromNormalizedPath(getFileSystem(), resolve(target).path); try { - if (WindowsFileAttributes.get(resolvedTarget, true).isDirectory()) + WindowsFileAttributes wattrs = WindowsFileAttributes.get(resolvedTarget, false); + if (wattrs.isDirectory() || wattrs.isDirectoryLink()) flags |= SYMBOLIC_LINK_FLAG_DIRECTORY; } catch (WindowsException x) { // unable to access target so assume target is not a directory
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/com/sun/jndi/ldap/BalancedParentheses.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,259 @@ +/* + * Copyright 2009 Sun Microsystems, Inc. All Rights Reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/** + * @test + * @bug 6449574 + * @summary Invalid ldap filter is accepted and processed + */ + +import java.io.*; +import javax.naming.*; +import javax.naming.directory.*; +import java.util.Properties; +import java.util.Hashtable; + +import java.net.Socket; +import java.net.ServerSocket; + +public class BalancedParentheses { + // Should we run the client or server in a separate thread? + // + // Both sides can throw exceptions, but do you have a preference + // as to which side should be the main thread. + static boolean separateServerThread = true; + + // use any free port by default + volatile int serverPort = 0; + + // Is the server ready to serve? + volatile static boolean serverReady = false; + + // Define the server side of the test. + // + // If the server prematurely exits, serverReady will be set to true + // to avoid infinite hangs. + void doServerSide() throws Exception { + ServerSocket serverSock = new ServerSocket(serverPort); + + // signal client, it's ready to accecpt connection + serverPort = serverSock.getLocalPort(); + serverReady = true; + + // accept a connection + Socket socket = serverSock.accept(); + System.out.println("Server: Connection accepted"); + + InputStream is = socket.getInputStream(); + OutputStream os = socket.getOutputStream(); + + // read the bindRequest + while (is.read() != -1) { + // ignore + is.skip(is.available()); + break; + } + + byte[] bindResponse = {0x30, 0x0C, 0x02, 0x01, 0x01, 0x61, 0x07, 0x0A, + 0x01, 0x00, 0x04, 0x00, 0x04, 0x00}; + // write bindResponse + os.write(bindResponse); + os.flush(); + + // ignore any more request. + while (is.read() != -1) { + // ignore + is.skip(is.available()); + } + + is.close(); + os.close(); + socket.close(); + serverSock.close(); + } + + // Define the client side of the test. + // + // If the server prematurely exits, serverReady will be set to true + // to avoid infinite hangs. + void doClientSide() throws Exception { + // Wait for server to get started. + while (!serverReady) { + Thread.sleep(50); + } + + // set up the environment for creating the initial context + Hashtable<Object, Object> env = new Hashtable<Object, Object>(); + env.put(Context.INITIAL_CONTEXT_FACTORY, + "com.sun.jndi.ldap.LdapCtxFactory"); + env.put(Context.PROVIDER_URL, "ldap://localhost:" + serverPort); + env.put("com.sun.jndi.ldap.read.timeout", "1000"); + + // env.put(Context.SECURITY_AUTHENTICATION, "simple"); + // env.put(Context.SECURITY_PRINCIPAL,"cn=root"); + // env.put(Context.SECURITY_CREDENTIALS,"root"); + + // create initial context + DirContext context = new InitialDirContext(env); + + // searching + SearchControls scs = new SearchControls(); + scs.setSearchScope(SearchControls.SUBTREE_SCOPE); + + try { + NamingEnumeration answer = context.search( + "o=sun,c=us", "(&(cn=Bob)))", scs); + } catch (InvalidSearchFilterException isfe) { + // ignore, it is the expected filter exception. + System.out.println("Expected exception: " + isfe.getMessage()); + } catch (NamingException ne) { + // maybe a read timeout exception, as the server does not response. + throw new Exception("Expect a InvalidSearchFilterException", ne); + } + + try { + NamingEnumeration answer = context.search( + "o=sun,c=us", ")(&(cn=Bob)", scs); + } catch (InvalidSearchFilterException isfe) { + // ignore, it is the expected filter exception. + System.out.println("Expected exception: " + isfe.getMessage()); + } catch (NamingException ne) { + // maybe a read timeout exception, as the server does not response. + throw new Exception("Expect a InvalidSearchFilterException", ne); + } + + try { + NamingEnumeration answer = context.search( + "o=sun,c=us", "(&(cn=Bob))", scs); + } catch (InvalidSearchFilterException isfe) { + // ignore, it is the expected filter exception. + throw new Exception("Unexpected ISFE", isfe); + } catch (NamingException ne) { + // maybe a read timeout exception, as the server does not response. + System.out.println("Expected exception: " + ne.getMessage()); + } + + context.close(); + } + + /* + * ============================================================ + * The remainder is just support stuff + */ + + // client and server thread + Thread clientThread = null; + Thread serverThread = null; + + // client and server exceptions + volatile Exception serverException = null; + volatile Exception clientException = null; + + void startServer(boolean newThread) throws Exception { + if (newThread) { + serverThread = new Thread() { + public void run() { + try { + doServerSide(); + } catch (Exception e) { + /* + * Our server thread just died. + * + * Release the client, if not active already... + */ + System.err.println("Server died..."); + System.err.println(e); + serverReady = true; + serverException = e; + } + } + }; + serverThread.start(); + } else { + doServerSide(); + } + } + + void startClient(boolean newThread) throws Exception { + if (newThread) { + clientThread = new Thread() { + public void run() { + try { + doClientSide(); + } catch (Exception e) { + /* + * Our client thread just died. + */ + System.err.println("Client died..."); + clientException = e; + } + } + }; + clientThread.start(); + } else { + doClientSide(); + } + } + + // Primary constructor, used to drive remainder of the test. + BalancedParentheses() throws Exception { + if (separateServerThread) { + startServer(true); + startClient(false); + } else { + startClient(true); + startServer(false); + } + + /* + * Wait for other side to close down. + */ + if (separateServerThread) { + serverThread.join(); + } else { + clientThread.join(); + } + + /* + * When we get here, the test is pretty much over. + * + * If the main thread excepted, that propagates back + * immediately. If the other thread threw an exception, we + * should report back. + */ + if (serverException != null) { + System.out.print("Server Exception:"); + throw serverException; + } + if (clientException != null) { + System.out.print("Client Exception:"); + throw clientException; + } + } + + public static void main(String[] args) throws Exception { + // start the test + new BalancedParentheses(); + } + +}
--- a/test/java/nio/channels/AsynchronousChannelGroup/GroupOfOne.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousChannelGroup/GroupOfOne.java Wed Jul 29 11:19:14 2009 -0700 @@ -44,9 +44,9 @@ final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open() .bind(new InetSocketAddress(0)); - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { + listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { public void completed(AsynchronousSocketChannel ch, Void att) { - listener.accept(null, this); + listener.accept((Void)null, this); } public void failed(Throwable exc, Void att) { } @@ -81,13 +81,13 @@ // 2. the close/shutdown completes final CountDownLatch latch = new CountDownLatch(2); - ch.connect(sa, null, new CompletionHandler<Void,Void>() { + ch.connect(sa, (Void)null, new CompletionHandler<Void,Void>() { public void completed(Void result, Void att) { System.out.println("Connected"); // initiate I/O operation that does not complete (successfully) ByteBuffer buf = ByteBuffer.allocate(100); - ch.read(buf, null, new CompletionHandler<Integer,Void>() { + ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { throw new RuntimeException(); }
--- a/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousChannelGroup/Identity.java Wed Jul 29 11:19:14 2009 -0700 @@ -78,15 +78,15 @@ final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open() .bind(new InetSocketAddress(0)); - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { + listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { public void completed(final AsynchronousSocketChannel ch, Void att) { - listener.accept(null, this); + listener.accept((Void)null, this); final ByteBuffer buf = ByteBuffer.allocate(100); - ch.read(buf, null, new CompletionHandler<Integer,Void>() { + ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { buf.clear(); - ch.read(buf, null, this); + ch.read(buf, (Void)null, this); } public void failed(Throwable exc, Void att) { }
--- a/test/java/nio/channels/AsynchronousChannelGroup/Restart.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousChannelGroup/Restart.java Wed Jul 29 11:19:14 2009 -0700 @@ -94,7 +94,7 @@ for (int i=0; i<count; i++) { final CountDownLatch latch = new CountDownLatch(1); - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { + listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { public void completed(AsynchronousSocketChannel ch, Void att) { try { ch.close();
--- a/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousChannelGroup/Unbounded.java Wed Jul 29 11:19:14 2009 -0700 @@ -45,10 +45,10 @@ final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open() .bind(new InetSocketAddress(0)); - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { + listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { public void completed(AsynchronousSocketChannel ch, Void att) { queue.add(ch); - listener.accept(null, this); + listener.accept((Void)null, this); } public void failed(Throwable exc, Void att) { }
--- a/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousDatagramChannel/Basic.java Wed Jul 29 11:19:14 2009 -0700 @@ -66,7 +66,7 @@ // Test: datagram packet not received immediately dst.clear(); final CountDownLatch latch = new CountDownLatch(1); - ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() { + ch.receive(dst, (Void)null, new CompletionHandler<SocketAddress,Void>() { public void completed(SocketAddress source, Void att) { latch.countDown(); } @@ -82,7 +82,7 @@ // Test: timeout dst.clear(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); - ch.receive(dst, 2, TimeUnit.SECONDS, null, new CompletionHandler<SocketAddress,Void>() { + ch.receive(dst, 2, TimeUnit.SECONDS, (Void)null, new CompletionHandler<SocketAddress,Void>() { public void completed(SocketAddress source, Void att) { } public void failed (Throwable exc, Void att) { @@ -101,7 +101,7 @@ // AsynchronousCloseException dst = ByteBuffer.allocateDirect(100); exception.set(null); - ch.receive(dst, null, new CompletionHandler<SocketAddress,Void>() { + ch.receive(dst, (Void)null, new CompletionHandler<SocketAddress,Void>() { public void completed(SocketAddress source, Void att) { } public void failed (Throwable exc, Void att) { @@ -156,7 +156,7 @@ // Test: datagram packet not received immediately dst.clear(); final CountDownLatch l1 = new CountDownLatch(1); - ch.read(dst, null, new CompletionHandler<Integer,Void>() { + ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { l1.countDown(); } @@ -172,7 +172,7 @@ // Test: timeout dst.clear(); final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); - ch.read(dst, 2, TimeUnit.SECONDS, null, new CompletionHandler<Integer,Void>() { + ch.read(dst, 2, TimeUnit.SECONDS, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { } public void failed (Throwable exc, Void att) { @@ -191,7 +191,7 @@ // AsynchronousCloseException dst.clear(); exception.set(null); - ch.read(dst, null, new CompletionHandler<Integer,Void>() { + ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { } public void failed (Throwable exc, Void att) { @@ -238,7 +238,7 @@ // Test: send datagram packet to reader and check completion handler // is invoked final CountDownLatch l2 = new CountDownLatch(1); - ch.send(ByteBuffer.wrap(msg), sa, null, new CompletionHandler<Integer,Void>() { + ch.send(ByteBuffer.wrap(msg), sa, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesSent, Void att) { if (bytesSent != msg.length) throw new RuntimeException("Unexpected number of bytes received"); @@ -261,7 +261,7 @@ // Test: check that failed method is invoked ch.close(); final CountDownLatch l3 = new CountDownLatch(1); - ch.send(ByteBuffer.wrap(msg), sa, null, new CompletionHandler<Integer,Void>() { + ch.send(ByteBuffer.wrap(msg), sa, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesSent, Void att) { throw new RuntimeException("completed method invoked"); } @@ -315,7 +315,7 @@ // Test: write datagram and check completion handler is invoked final CountDownLatch l2 = new CountDownLatch(1); - ch.write(ByteBuffer.wrap(msg), null, new CompletionHandler<Integer,Void>() { + ch.write(ByteBuffer.wrap(msg), (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesSent, Void att) { if (bytesSent != msg.length) throw new RuntimeException("Unexpected number of bytes received"); @@ -372,7 +372,7 @@ final CountDownLatch latch = new CountDownLatch(1); long timeout = (i == 0) ? 0L : 60L; Future<SocketAddress> remote = ch - .receive(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, null, + .receive(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, (Void)null, new CompletionHandler<SocketAddress,Void>() { public void completed(SocketAddress source, Void att) { } @@ -395,7 +395,7 @@ final CountDownLatch latch = new CountDownLatch(1); long timeout = (i == 0) ? 0L : 60L; Future<Integer> result = ch - .read(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, null, + .read(ByteBuffer.allocate(100), timeout, TimeUnit.SECONDS, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer bytesRead, Void att) { }
--- a/test/java/nio/channels/AsynchronousFileChannel/Basic.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousFileChannel/Basic.java Wed Jul 29 11:19:14 2009 -0700 @@ -190,7 +190,7 @@ if (fl == null) throw new RuntimeException("Unable to acquire lock"); try { - ch.lock(null, new CompletionHandler<FileLock,Void> () { + ch.lock((Void)null, new CompletionHandler<FileLock,Void> () { public void completed(FileLock result, Void att) { } public void failed(Throwable exc, Void att) { @@ -217,7 +217,7 @@ ByteBuffer buf = ByteBuffer.allocateDirect(100); final CountDownLatch latch = new CountDownLatch(1); - ch.read(buf, 0L, null, new CompletionHandler<Integer,Void>() { + ch.read(buf, 0L, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { try { Thread.currentThread().interrupt(); @@ -311,7 +311,7 @@ final AtomicReference<Thread> invoker = new AtomicReference<Thread>(); final CountDownLatch latch = new CountDownLatch(1); - ch.write(genBuffer(), 0L, null, new CompletionHandler<Integer,Void>() { + ch.write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { invoker.set(Thread.currentThread()); latch.countDown(); @@ -410,7 +410,7 @@ // start write operation final CountDownLatch latch = new CountDownLatch(1); - Future<Integer> res = ch.write(genBuffer(), 0L, null, + Future<Integer> res = ch.write(genBuffer(), 0L, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { }
--- a/test/java/nio/channels/AsynchronousServerSocketChannel/Basic.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousServerSocketChannel/Basic.java Wed Jul 29 11:19:14 2009 -0700 @@ -95,7 +95,7 @@ final AtomicReference<Throwable> exception = new AtomicReference<Throwable>(); // start accepting - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() { + listener.accept((Void)null, new CompletionHandler<AsynchronousSocketChannel,Void>() { public void completed(AsynchronousSocketChannel ch, Void att) { try { ch.close();
--- a/test/java/nio/channels/AsynchronousSocketChannel/Basic.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousSocketChannel/Basic.java Wed Jul 29 11:19:14 2009 -0700 @@ -181,7 +181,7 @@ } final AtomicReference<Throwable> connectException = new AtomicReference<Throwable>(); - ch.connect(server.address(), null, new CompletionHandler<Void,Void>() { + ch.connect(server.address(), (Void)null, new CompletionHandler<Void,Void>() { public void completed(Void result, Void att) { } public void failed(Throwable exc, Void att) { @@ -332,7 +332,7 @@ // start read operation final CountDownLatch latch = new CountDownLatch(1); ByteBuffer buf = ByteBuffer.allocate(1); - Future<Integer> res = ch.read(buf, null, + Future<Integer> res = ch.read(buf, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { } @@ -397,11 +397,11 @@ // reads should complete immediately final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity() + 100); final CountDownLatch latch = new CountDownLatch(1); - ch.read(dst, null, new CompletionHandler<Integer,Void>() { + ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { int n = result; if (n > 0) { - ch.read(dst, null, this); + ch.read(dst, (Void)null, this); } else { latch.countDown(); } @@ -450,10 +450,10 @@ // read until the buffer is full final ByteBuffer dst = ByteBuffer.allocateDirect(src.capacity()); final CountDownLatch latch = new CountDownLatch(1); - ch.read(dst, null, new CompletionHandler<Integer,Void>() { + ch.read(dst, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { if (dst.hasRemaining()) { - ch.read(dst, null, this); + ch.read(dst, (Void)null, this); } else { latch.countDown(); } @@ -508,7 +508,7 @@ // scattering read that completes ascynhronously final CountDownLatch latch = new CountDownLatch(1); - ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, null, + ch.read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, new CompletionHandler<Long,Void>() { public void completed(Long result, Void att) { long n = result; @@ -536,7 +536,7 @@ dsts[i].rewind(); } long n = ch - .read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, null, null).get(); + .read(dsts, 0, dsts.length, 0L, TimeUnit.SECONDS, (Void)null, null).get(); if (n <= 0) throw new RuntimeException("No bytes read"); @@ -562,10 +562,10 @@ // write all bytes and close connection when done final ByteBuffer src = genBuffer(); - ch.write(src, null, new CompletionHandler<Integer,Void>() { + ch.write(src, (Void)null, new CompletionHandler<Integer,Void>() { public void completed(Integer result, Void att) { if (src.hasRemaining()) { - ch.write(src, null, this); + ch.write(src, (Void)null, this); } else { try { ch.close(); @@ -616,7 +616,7 @@ // write buffers (should complete immediately) ByteBuffer[] srcs = genBuffers(1); long n = ch - .write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, null, null).get(); + .write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, null).get(); if (n <= 0) throw new RuntimeException("No bytes written"); @@ -629,7 +629,7 @@ // write until socket buffer is full so as to create the conditions // for when a write does not complete immediately srcs = genBuffers(1); - ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, null, + ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, (Void)null, new CompletionHandler<Long,Void>() { public void completed(Long result, Void att) { long n = result; @@ -639,7 +639,7 @@ if (continueWriting.get()) { ByteBuffer[] srcs = genBuffers(8); ch.write(srcs, 0, srcs.length, 0L, TimeUnit.SECONDS, - null, this); + (Void)null, this); } } public void failed(Throwable exc, Void att) { @@ -717,7 +717,7 @@ // this read should timeout ByteBuffer dst = ByteBuffer.allocate(512); try { - ch.read(dst, 3, TimeUnit.SECONDS, null, null).get(); + ch.read(dst, 3, TimeUnit.SECONDS, (Void)null, null).get(); throw new RuntimeException("Read did not timeout"); } catch (ExecutionException x) { if (!(x.getCause() instanceof InterruptedByTimeoutException))
--- a/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Wed Jul 29 11:19:14 2009 -0700 @@ -99,7 +99,7 @@ void start() { sentBuffer.position(0); sentBuffer.limit(sentBuffer.capacity()); - channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () { + channel.write(sentBuffer, (Void)null, new CompletionHandler<Integer,Void> () { public void completed(Integer nwrote, Void att) { bytesSent += nwrote; if (finished) { @@ -107,7 +107,7 @@ } else { sentBuffer.position(0); sentBuffer.limit(sentBuffer.capacity()); - channel.write(sentBuffer, null, this); + channel.write(sentBuffer, (Void)null, this); } } public void failed(Throwable exc, Void att) { @@ -142,14 +142,14 @@ } void start() { - channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () { + channel.read(readBuffer, (Void)null, new CompletionHandler<Integer,Void> () { public void completed(Integer nread, Void att) { if (nread < 0) { closeUnchecked(channel); } else { bytesRead += nread; readBuffer.clear(); - channel.read(readBuffer, null, this); + channel.read(readBuffer, (Void)null, this); } } public void failed(Throwable exc, Void att) {
--- a/test/java/nio/file/Path/Links.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/nio/file/Path/Links.java Wed Jul 29 11:19:14 2009 -0700 @@ -22,7 +22,7 @@ */ /* @test - * @bug 4313887 6838333 + * @bug 4313887 6838333 6863864 * @summary Unit test for java.nio.file.Path createSymbolicLink, * readSymbolicLink, and createLink methods * @library .. @@ -31,7 +31,6 @@ import java.nio.file.*; import java.nio.file.attribute.*; import java.io.*; -import java.util.*; public class Links { @@ -47,7 +46,7 @@ * Exercise createSymbolicLink and readLink methods */ static void testSymLinks(Path dir) throws IOException { - Path link = dir.resolve("link"); + final Path link = dir.resolve("link"); // Check if sym links are supported try { @@ -76,6 +75,63 @@ link.delete(); } } + + // Test links to directory + Path mydir = dir.resolve("mydir"); + Path myfile = mydir.resolve("myfile"); + try { + mydir.createDirectory(); + myfile.createFile(); + + // link -> "mydir" + link.createSymbolicLink(mydir.getName()); + assertTrue(link.readSymbolicLink().equals(mydir.getName())); + + // Test access to directory via link + DirectoryStream<Path> stream = link.newDirectoryStream(); + try { + boolean found = false; + for (Path entry: stream) { + if (entry.getName().equals(myfile.getName())) { + found = true; + break; + } + } + assertTrue(found); + } finally { + stream.close(); + } + + // Test link2 -> link -> mydir + final Path link2 = dir.resolve("link2"); + Path target2 = link.getName(); + link2.createSymbolicLink(target2); + try { + assertTrue(link2.readSymbolicLink().equals(target2)); + link2.newDirectoryStream().close(); + } finally { + link2.delete(); + } + + // Remove mydir and re-create link2 before re-creating mydir + // (This is a useful test on Windows to ensure that creating a + // sym link to a directory sym link creates the right type of link). + myfile.delete(); + mydir.delete(); + link2.createSymbolicLink(target2); + try { + assertTrue(link2.readSymbolicLink().equals(target2)); + mydir.createDirectory(); + link2.newDirectoryStream().close(); + } finally { + link2.delete(); + } + + } finally { + myfile.deleteIfExists(); + mydir.deleteIfExists(); + link.deleteIfExists(); + } } /**
--- a/test/java/util/Collection/MOAT.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/util/Collection/MOAT.java Wed Jul 29 11:19:14 2009 -0700 @@ -426,6 +426,36 @@ q.poll(); equal(q.size(), 4); checkFunctionalInvariants(q); + if ((q instanceof LinkedBlockingQueue) || + (q instanceof LinkedBlockingDeque) || + (q instanceof ConcurrentLinkedQueue)) { + testQueueIteratorRemove(q); + } + } + + private static void testQueueIteratorRemove(Queue<Integer> q) { + System.err.printf("testQueueIteratorRemove %s%n", + q.getClass().getSimpleName()); + q.clear(); + for (int i = 0; i < 5; i++) + q.add(i); + Iterator<Integer> it = q.iterator(); + check(it.hasNext()); + for (int i = 3; i >= 0; i--) + q.remove(i); + equal(it.next(), 0); + equal(it.next(), 4); + + q.clear(); + for (int i = 0; i < 5; i++) + q.add(i); + it = q.iterator(); + equal(it.next(), 0); + check(it.hasNext()); + for (int i = 1; i < 4; i++) + q.remove(i); + equal(it.next(), 1); + equal(it.next(), 4); } private static void testList(final List<Integer> l) { @@ -451,6 +481,11 @@ } private static void testCollection(Collection<Integer> c) { + try { testCollection1(c); } + catch (Throwable t) { unexpected(t); } + } + + private static void testCollection1(Collection<Integer> c) { System.out.println("\n==> " + c.getClass().getName());
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/BlockingQueue/OfferDrainToLoops.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,130 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/* + * @test + * @bug 6805775 6815766 + * @summary Test concurrent offer vs. drainTo + */ + +import java.util.*; +import java.util.concurrent.*; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class OfferDrainToLoops { + void checkNotContainsNull(Iterable it) { + for (Object x : it) + check(x != null); + } + + abstract class CheckedThread extends Thread { + abstract protected void realRun(); + public void run() { + try { realRun(); } catch (Throwable t) { unexpected(t); } + } + { + setDaemon(true); + start(); + } + } + + void test(String[] args) throws Throwable { + test(new LinkedBlockingQueue()); + test(new LinkedBlockingQueue(2000)); + test(new LinkedBlockingDeque()); + test(new LinkedBlockingDeque(2000)); + test(new ArrayBlockingQueue(2000)); + } + + void test(final BlockingQueue q) throws Throwable { + System.out.println(q.getClass().getSimpleName()); + final long testDurationSeconds = 1L; + final long testDurationMillis = testDurationSeconds * 1000L; + final long quittingTimeNanos + = System.nanoTime() + testDurationSeconds * 1000L * 1000L * 1000L; + + Thread offerer = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if ((i % 1024) == 0 && + System.nanoTime() - quittingTimeNanos > 0) + break; + while (! q.offer(i)) + Thread.yield(); + }}}; + + Thread drainer = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if (System.nanoTime() - quittingTimeNanos > 0) + break; + List list = new ArrayList(); + int n = q.drainTo(list); + equal(list.size(), n); + for (int j = 0; j < n - 1; j++) + equal((Long) list.get(j) + 1L, list.get(j + 1)); + Thread.yield(); + }}}; + + Thread scanner = new CheckedThread() { + protected void realRun() { + for (long i = 0; ; i++) { + if (System.nanoTime() - quittingTimeNanos > 0) + break; + checkNotContainsNull(q); + Thread.yield(); + }}}; + + offerer.join(10 * testDurationMillis); + drainer.join(10 * testDurationMillis); + check(! offerer.isAlive()); + check(! drainer.isAlive()); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new OfferDrainToLoops().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +}
--- a/test/java/util/concurrent/ConcurrentLinkedQueue/ConcurrentQueueLoops.java Fri Jul 24 11:22:29 2009 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,146 +0,0 @@ -/* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - */ - -/* - * This file is available under and governed by the GNU General Public - * License version 2 only, as published by the Free Software Foundation. - * However, the following notice accompanied the original version of this - * file: - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -/* - * @test - * @bug 4486658 - * @compile -source 1.5 ConcurrentQueueLoops.java - * @run main/timeout=230 ConcurrentQueueLoops - * @summary Checks that a set of threads can repeatedly get and modify items - */ - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -public class ConcurrentQueueLoops { - static final ExecutorService pool = Executors.newCachedThreadPool(); - static AtomicInteger totalItems; - static boolean print = false; - - public static void main(String[] args) throws Exception { - int maxStages = 8; - int items = 100000; - - if (args.length > 0) - maxStages = Integer.parseInt(args[0]); - - print = false; - System.out.println("Warmup..."); - oneRun(1, items); - Thread.sleep(100); - oneRun(1, items); - Thread.sleep(100); - print = true; - - for (int i = 1; i <= maxStages; i += (i+1) >>> 1) { - oneRun(i, items); - } - pool.shutdown(); - if (! pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) - throw new Error(); - } - - static class Stage implements Callable<Integer> { - final Queue<Integer> queue; - final CyclicBarrier barrier; - int items; - Stage (Queue<Integer> q, CyclicBarrier b, int items) { - queue = q; - barrier = b; - this.items = items; - } - - public Integer call() { - // Repeatedly take something from queue if possible, - // transform it, and put back in. - try { - barrier.await(); - int l = 4321; - int takes = 0; - for (;;) { - Integer item = queue.poll(); - if (item != null) { - ++takes; - l = LoopHelpers.compute2(item.intValue()); - } - else if (takes != 0) { - totalItems.getAndAdd(-takes); - takes = 0; - } - else if (totalItems.get() <= 0) - break; - l = LoopHelpers.compute1(l); - if (items > 0) { - --items; - queue.offer(new Integer(l)); - } - else if ( (l & (3 << 5)) == 0) // spinwait - Thread.sleep(1); - } - return new Integer(l); - } - catch (Exception ie) { - ie.printStackTrace(); - throw new Error("Call loop failed"); - } - } - } - - static void oneRun(int n, int items) throws Exception { - Queue<Integer> q = new ConcurrentLinkedQueue<Integer>(); - LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); - CyclicBarrier barrier = new CyclicBarrier(n + 1, timer); - totalItems = new AtomicInteger(n * items); - ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(n); - for (int i = 0; i < n; ++i) - results.add(pool.submit(new Stage(q, barrier, items))); - - if (print) - System.out.print("Threads: " + n + "\t:"); - barrier.await(); - int total = 0; - for (int i = 0; i < n; ++i) { - Future<Integer> f = results.get(i); - Integer r = f.get(); - total += r.intValue(); - } - long endTime = System.nanoTime(); - long time = endTime - timer.startTime; - if (print) - System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item"); - if (total == 0) // avoid overoptimization - System.out.println("useless result: " + total); - - } -}
--- a/test/java/util/concurrent/ConcurrentLinkedQueue/LoopHelpers.java Fri Jul 24 11:22:29 2009 -0700 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,129 +0,0 @@ -/* - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 only, as - * published by the Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. - */ - -/* - * This file is available under and governed by the GNU General Public - * License version 2 only, as published by the Free Software Foundation. - * However, the following notice accompanied the original version of this - * file: - * - * Written by Doug Lea with assistance from members of JCP JSR-166 - * Expert Group and released to the public domain, as explained at - * http://creativecommons.org/licenses/publicdomain - */ - -/** - * Misc utilities in JSR166 performance tests - */ - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -class LoopHelpers { - - // Some mindless computation to do between synchronizations... - - /** - * generates 32 bit pseudo-random numbers. - * Adapted from http://www.snippets.org - */ - public static int compute1(int x) { - int lo = 16807 * (x & 0xFFFF); - int hi = 16807 * (x >>> 16); - lo += (hi & 0x7FFF) << 16; - if ((lo & 0x80000000) != 0) { - lo &= 0x7fffffff; - ++lo; - } - lo += hi >>> 15; - if (lo == 0 || (lo & 0x80000000) != 0) { - lo &= 0x7fffffff; - ++lo; - } - return lo; - } - - /** - * Computes a linear congruential random number a random number - * of times. - */ - public static int compute2(int x) { - int loops = (x >>> 4) & 7; - while (loops-- > 0) { - x = (x * 2147483647) % 16807; - } - return x; - } - - /** - * An actually useful random number generator, but unsynchronized. - * Basically same as java.util.Random. - */ - public static class SimpleRandom { - private final static long multiplier = 0x5DEECE66DL; - private final static long addend = 0xBL; - private final static long mask = (1L << 48) - 1; - static final AtomicLong seq = new AtomicLong(1); - private long seed = System.nanoTime() + seq.getAndIncrement(); - - public void setSeed(long s) { - seed = s; - } - - public int next() { - long nextseed = (seed * multiplier + addend) & mask; - seed = nextseed; - return ((int)(nextseed >>> 17)) & 0x7FFFFFFF; - } - } - - public static class BarrierTimer implements Runnable { - public volatile long startTime; - public volatile long endTime; - public void run() { - long t = System.nanoTime(); - if (startTime == 0) - startTime = t; - else - endTime = t; - } - public void clear() { - startTime = 0; - endTime = 0; - } - public long getTime() { - return endTime - startTime; - } - } - - public static String rightJustify(long n) { - // There's probably a better way to do this... - String field = " "; - String num = Long.toString(n); - if (num.length() >= field.length()) - return num; - StringBuffer b = new StringBuffer(field); - b.replace(b.length()-num.length(), b.length(), num); - return b.toString(); - } - -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/ConcurrentQueues/ConcurrentQueueLoops.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,198 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/* + * @test + * @bug 4486658 6785442 + * @run main ConcurrentQueueLoops 8 123456 + * @summary Checks that a set of threads can repeatedly get and modify items + */ + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +public class ConcurrentQueueLoops { + ExecutorService pool; + AtomicInteger totalItems; + boolean print; + + // Suitable for benchmarking. Overriden by args[0] for testing. + int maxStages = 20; + + // Suitable for benchmarking. Overriden by args[1] for testing. + int items = 1024 * 1024; + + Collection<Queue<Integer>> concurrentQueues() { + List<Queue<Integer>> queues = new ArrayList<Queue<Integer>>(); + queues.add(new ConcurrentLinkedQueue<Integer>()); + queues.add(new ArrayBlockingQueue<Integer>(items, false)); + //queues.add(new ArrayBlockingQueue<Integer>(count, true)); + queues.add(new LinkedBlockingQueue<Integer>()); + queues.add(new LinkedBlockingDeque<Integer>()); + + try { + queues.add((Queue<Integer>) + Class.forName("java.util.concurrent.LinkedTransferQueue") + .newInstance()); + } catch (IllegalAccessException e) { + } catch (InstantiationException e) { + } catch (ClassNotFoundException e) { + // OK; not yet added to JDK + } + + // Following additional implementations are available from: + // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html + // queues.add(new LinkedTransferQueue<Integer>()); + // queues.add(new SynchronizedLinkedListQueue<Integer>()); + + // Avoid "first fast, second slow" benchmark effect. + Collections.shuffle(queues); + return queues; + } + + void test(String[] args) throws Throwable { + if (args.length > 0) + maxStages = Integer.parseInt(args[0]); + if (args.length > 1) + items = Integer.parseInt(args[1]); + + for (Queue<Integer> queue : concurrentQueues()) + test(queue); + } + + void test(final Queue<Integer> q) throws Throwable { + System.out.println(q.getClass().getSimpleName()); + pool = Executors.newCachedThreadPool(); + print = false; + + print = false; + System.out.println("Warmup..."); + oneRun(1, items, q); + //Thread.sleep(100); + oneRun(3, items, q); + Thread.sleep(100); + print = true; + + for (int i = 1; i <= maxStages; i += (i+1) >>> 1) { + oneRun(i, items, q); + } + pool.shutdown(); + check(pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)); + } + + class Stage implements Callable<Integer> { + final Queue<Integer> queue; + final CyclicBarrier barrier; + int items; + Stage (Queue<Integer> q, CyclicBarrier b, int items) { + queue = q; + barrier = b; + this.items = items; + } + + public Integer call() { + // Repeatedly take something from queue if possible, + // transform it, and put back in. + try { + barrier.await(); + int l = 4321; + int takes = 0; + for (;;) { + Integer item = queue.poll(); + if (item != null) { + ++takes; + l = LoopHelpers.compute2(item.intValue()); + } + else if (takes != 0) { + totalItems.getAndAdd(-takes); + takes = 0; + } + else if (totalItems.get() <= 0) + break; + l = LoopHelpers.compute1(l); + if (items > 0) { + --items; + queue.offer(new Integer(l)); + } + else if ( (l & (3 << 5)) == 0) // spinwait + Thread.sleep(1); + } + return new Integer(l); + } + catch (Throwable t) { unexpected(t); return null; } + } + } + + void oneRun(int n, int items, final Queue<Integer> q) throws Exception { + LoopHelpers.BarrierTimer timer = new LoopHelpers.BarrierTimer(); + CyclicBarrier barrier = new CyclicBarrier(n + 1, timer); + totalItems = new AtomicInteger(n * items); + ArrayList<Future<Integer>> results = new ArrayList<Future<Integer>>(n); + for (int i = 0; i < n; ++i) + results.add(pool.submit(new Stage(q, barrier, items))); + + if (print) + System.out.print("Threads: " + n + "\t:"); + barrier.await(); + int total = 0; + for (int i = 0; i < n; ++i) { + Future<Integer> f = results.get(i); + Integer r = f.get(); + total += r.intValue(); + } + long endTime = System.nanoTime(); + long time = endTime - timer.startTime; + if (print) + System.out.println(LoopHelpers.rightJustify(time / (items * n)) + " ns per item"); + if (total == 0) // avoid overoptimization + System.out.println("useless result: " + total); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new ConcurrentQueueLoops().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/ConcurrentQueues/GCRetention.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,165 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +/* + * @test + * @bug 6785442 + * @summary Benchmark that tries to GC-tenure head, followed by + * many add/remove operations. + * @run main GCRetention 12345 + */ + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.LinkedList; +import java.util.PriorityQueue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.Map; + +public class GCRetention { + // Suitable for benchmarking. Overriden by args[0] for testing. + int count = 1024 * 1024; + + final Map<String,String> results = new ConcurrentHashMap<String,String>(); + + Collection<Queue<Boolean>> queues() { + List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>(); + queues.add(new ConcurrentLinkedQueue<Boolean>()); + queues.add(new ArrayBlockingQueue<Boolean>(count, false)); + queues.add(new ArrayBlockingQueue<Boolean>(count, true)); + queues.add(new LinkedBlockingQueue<Boolean>()); + queues.add(new LinkedBlockingDeque<Boolean>()); + queues.add(new PriorityBlockingQueue<Boolean>()); + queues.add(new PriorityQueue<Boolean>()); + queues.add(new LinkedList<Boolean>()); + + try { + queues.add((Queue<Boolean>) + Class.forName("java.util.concurrent.LinkedTransferQueue") + .newInstance()); + } catch (IllegalAccessException e) { + } catch (InstantiationException e) { + } catch (ClassNotFoundException e) { + // OK; not yet added to JDK + } + + // Following additional implementations are available from: + // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html + // queues.add(new LinkedTransferQueue<Boolean>()); + // queues.add(new SynchronizedLinkedListQueue<Boolean>()); + + // Avoid "first fast, second slow" benchmark effect. + Collections.shuffle(queues); + return queues; + } + + void prettyPrintResults() { + List<String> classNames = new ArrayList<String>(results.keySet()); + Collections.sort(classNames); + int maxClassNameLength = 0; + int maxNanosLength = 0; + for (String name : classNames) { + if (maxClassNameLength < name.length()) + maxClassNameLength = name.length(); + if (maxNanosLength < results.get(name).length()) + maxNanosLength = results.get(name).length(); + } + String format = String.format("%%%ds %%%ds nanos/item%%n", + maxClassNameLength, maxNanosLength); + for (String name : classNames) + System.out.printf(format, name, results.get(name)); + } + + void test(String[] args) { + if (args.length > 0) + count = Integer.valueOf(args[0]); + // Warmup + for (Queue<Boolean> queue : queues()) + test(queue); + results.clear(); + for (Queue<Boolean> queue : queues()) + test(queue); + + prettyPrintResults(); + } + + void test(Queue<Boolean> q) { + long t0 = System.nanoTime(); + for (int i = 0; i < count; i++) + check(q.add(Boolean.TRUE)); + System.gc(); + System.gc(); + Boolean x; + while ((x = q.poll()) != null) + equal(x, Boolean.TRUE); + check(q.isEmpty()); + + for (int i = 0; i < 10 * count; i++) { + for (int k = 0; k < 3; k++) + check(q.add(Boolean.TRUE)); + for (int k = 0; k < 3; k++) + if (q.poll() != Boolean.TRUE) + fail(); + } + check(q.isEmpty()); + + String className = q.getClass().getSimpleName(); + long elapsed = System.nanoTime() - t0; + int nanos = (int) ((double) elapsed / (10 * 3 * count)); + results.put(className, String.valueOf(nanos)); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new GCRetention().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/ConcurrentQueues/IteratorWeakConsistency.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,93 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +import java.util.*; +import java.util.concurrent.*; + +/* + * @test + * @bug 6805775 6815766 + * @summary Check weak consistency of concurrent queue iterators + */ + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class IteratorWeakConsistency { + + void test(String[] args) throws Throwable { + test(new LinkedBlockingQueue()); + test(new LinkedBlockingQueue(20)); + test(new LinkedBlockingDeque()); + test(new LinkedBlockingDeque(20)); + test(new ConcurrentLinkedQueue()); + // Other concurrent queues (e.g. ArrayBlockingQueue) do not + // currently have weakly consistent iterators. + // test(new ArrayBlockingQueue(20)); + } + + void test(Queue q) throws Throwable { + // TODO: make this more general + for (int i = 0; i < 10; i++) + q.add(i); + Iterator it = q.iterator(); + q.poll(); + q.poll(); + q.poll(); + q.remove(7); + List list = new ArrayList(); + while (it.hasNext()) + list.add(it.next()); + equal(list, Arrays.asList(0, 3, 4, 5, 6, 8, 9)); + check(! list.contains(null)); + System.out.printf("%s: %s%n", + q.getClass().getSimpleName(), + list); + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + static Class<?> thisClass = new Object(){}.getClass().getEnclosingClass(); + public static void main(String[] args) throws Throwable { + new IteratorWeakConsistency().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/ConcurrentQueues/LoopHelpers.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,129 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/** + * Misc utilities in JSR166 performance tests + */ + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +class LoopHelpers { + + // Some mindless computation to do between synchronizations... + + /** + * generates 32 bit pseudo-random numbers. + * Adapted from http://www.snippets.org + */ + public static int compute1(int x) { + int lo = 16807 * (x & 0xFFFF); + int hi = 16807 * (x >>> 16); + lo += (hi & 0x7FFF) << 16; + if ((lo & 0x80000000) != 0) { + lo &= 0x7fffffff; + ++lo; + } + lo += hi >>> 15; + if (lo == 0 || (lo & 0x80000000) != 0) { + lo &= 0x7fffffff; + ++lo; + } + return lo; + } + + /** + * Computes a linear congruential random number a random number + * of times. + */ + public static int compute2(int x) { + int loops = (x >>> 4) & 7; + while (loops-- > 0) { + x = (x * 2147483647) % 16807; + } + return x; + } + + /** + * An actually useful random number generator, but unsynchronized. + * Basically same as java.util.Random. + */ + public static class SimpleRandom { + private final static long multiplier = 0x5DEECE66DL; + private final static long addend = 0xBL; + private final static long mask = (1L << 48) - 1; + static final AtomicLong seq = new AtomicLong(1); + private long seed = System.nanoTime() + seq.getAndIncrement(); + + public void setSeed(long s) { + seed = s; + } + + public int next() { + long nextseed = (seed * multiplier + addend) & mask; + seed = nextseed; + return ((int)(nextseed >>> 17)) & 0x7FFFFFFF; + } + } + + public static class BarrierTimer implements Runnable { + public volatile long startTime; + public volatile long endTime; + public void run() { + long t = System.nanoTime(); + if (startTime == 0) + startTime = t; + else + endTime = t; + } + public void clear() { + startTime = 0; + endTime = 0; + } + public long getTime() { + return endTime - startTime; + } + } + + public static String rightJustify(long n) { + // There's probably a better way to do this... + String field = " "; + String num = Long.toString(n); + if (num.length() >= field.length()) + return num; + StringBuffer b = new StringBuffer(field); + b.replace(b.length()-num.length(), b.length(), num); + return b.toString(); + } + +}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/test/java/util/concurrent/ConcurrentQueues/RemovePollRace.java Wed Jul 29 11:19:14 2009 -0700 @@ -0,0 +1,230 @@ +/* + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + */ + +/* + * This file is available under and governed by the GNU General Public + * License version 2 only, as published by the Free Software Foundation. + * However, the following notice accompanied the original version of this + * file: + * + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ + +/* + * @test + * @bug 6785442 + * @summary Checks race between poll and remove(Object), while + * occasionally moonlighting as a microbenchmark. + * @run main RemovePollRace 12345 + */ + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import java.util.Map; + +public class RemovePollRace { + // Suitable for benchmarking. Overriden by args[0] for testing. + int count = 1024 * 1024; + + final Map<String,String> results = new ConcurrentHashMap<String,String>(); + + Collection<Queue<Boolean>> concurrentQueues() { + List<Queue<Boolean>> queues = new ArrayList<Queue<Boolean>>(); + queues.add(new ConcurrentLinkedQueue<Boolean>()); + queues.add(new ArrayBlockingQueue<Boolean>(count, false)); + queues.add(new ArrayBlockingQueue<Boolean>(count, true)); + queues.add(new LinkedBlockingQueue<Boolean>()); + queues.add(new LinkedBlockingDeque<Boolean>()); + + try { + queues.add((Queue<Boolean>) + Class.forName("java.util.concurrent.LinkedTransferQueue") + .newInstance()); + } catch (IllegalAccessException e) { + } catch (InstantiationException e) { + } catch (ClassNotFoundException e) { + // OK; not yet added to JDK + } + + // Following additional implementations are available from: + // http://gee.cs.oswego.edu/dl/concurrency-interest/index.html + // queues.add(new LinkedTransferQueue<Boolean>()); + // queues.add(new SynchronizedLinkedListQueue<Boolean>()); + + // Avoid "first fast, second slow" benchmark effect. + Collections.shuffle(queues); + return queues; + } + + void prettyPrintResults() { + List<String> classNames = new ArrayList<String>(results.keySet()); + Collections.sort(classNames); + int maxClassNameLength = 0; + int maxNanosLength = 0; + for (String name : classNames) { + if (maxClassNameLength < name.length()) + maxClassNameLength = name.length(); + if (maxNanosLength < results.get(name).length()) + maxNanosLength = results.get(name).length(); + } + String format = String.format("%%%ds %%%ds nanos/item%%n", + maxClassNameLength, maxNanosLength); + for (String name : classNames) + System.out.printf(format, name, results.get(name)); + } + + void test(String[] args) throws Throwable { + if (args.length > 0) + count = Integer.valueOf(args[0]); + // Warmup + for (Queue<Boolean> queue : concurrentQueues()) + test(queue); + results.clear(); + for (Queue<Boolean> queue : concurrentQueues()) + test(queue); + + prettyPrintResults(); + } + + void await(CountDownLatch latch) { + try { latch.await(); } + catch (InterruptedException e) { unexpected(e); } + } + + void test(final Queue<Boolean> q) throws Throwable { + long t0 = System.nanoTime(); + final int SPINS = 5; + final AtomicLong removes = new AtomicLong(0); + final AtomicLong polls = new AtomicLong(0); + final int adderCount = + Math.max(1, Runtime.getRuntime().availableProcessors() / 4); + final int removerCount = + Math.max(1, Runtime.getRuntime().availableProcessors() / 4); + final int pollerCount = removerCount; + final int threadCount = adderCount + removerCount + pollerCount; + final CountDownLatch startingGate = new CountDownLatch(1); + final CountDownLatch addersDone = new CountDownLatch(adderCount); + final Runnable remover = new Runnable() { + public void run() { + await(startingGate); + int spins = 0; + for (;;) { + boolean quittingTime = (addersDone.getCount() == 0); + if (q.remove(Boolean.TRUE)) + removes.getAndIncrement(); + else if (quittingTime) + break; + else if (++spins > SPINS) { + Thread.yield(); + spins = 0; + }}}}; + final Runnable poller = new Runnable() { + public void run() { + await(startingGate); + int spins = 0; + for (;;) { + boolean quittingTime = (addersDone.getCount() == 0); + if (q.poll() == Boolean.TRUE) + polls.getAndIncrement(); + else if (quittingTime) + break; + else if (++spins > SPINS) { + Thread.yield(); + spins = 0; + }}}}; + final Runnable adder = new Runnable() { + public void run() { + await(startingGate); + for (int i = 0; i < count; i++) { + for (;;) { + try { q.add(Boolean.TRUE); break; } + catch (IllegalStateException e) { Thread.yield(); } + } + } + addersDone.countDown(); + }}; + + final List<Thread> adders = new ArrayList<Thread>(); + final List<Thread> removers = new ArrayList<Thread>(); + final List<Thread> pollers = new ArrayList<Thread>(); + for (int i = 0; i < adderCount; i++) + adders.add(checkedThread(adder)); + for (int i = 0; i < removerCount; i++) + removers.add(checkedThread(remover)); + for (int i = 0; i < pollerCount; i++) + pollers.add(checkedThread(poller)); + + final List<Thread> allThreads = new ArrayList<Thread>(); + allThreads.addAll(removers); + allThreads.addAll(pollers); + allThreads.addAll(adders); + + for (Thread t : allThreads) + t.start(); + startingGate.countDown(); + for (Thread t : allThreads) + t.join(); + + String className = q.getClass().getSimpleName(); + long elapsed = System.nanoTime() - t0; + int nanos = (int) ((double) elapsed / (adderCount * count)); + results.put(className, String.valueOf(nanos)); + if (removes.get() + polls.get() != adderCount * count) { + String msg = String.format + ("class=%s removes=%s polls=%d count=%d", + className, removes.get(), polls.get(), count); + fail(msg); + } + } + + //--------------------- Infrastructure --------------------------- + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { + if (x == null ? y == null : x.equals(y)) pass(); + else fail(x + " not equal to " + y);} + public static void main(String[] args) throws Throwable { + new RemovePollRace().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} + System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); + if (failed > 0) throw new AssertionError("Some tests failed");} + Thread checkedThread(final Runnable r) { + return new Thread() {public void run() { + try {r.run();} catch (Throwable t) {unexpected(t);}}};} +}
--- a/test/java/util/concurrent/LinkedBlockingQueue/OfferRemoveLoops.java Fri Jul 24 11:22:29 2009 -0700 +++ b/test/java/util/concurrent/LinkedBlockingQueue/OfferRemoveLoops.java Wed Jul 29 11:19:14 2009 -0700 @@ -28,62 +28,74 @@ * @author Martin Buchholz */ +import java.util.*; import java.util.concurrent.*; public class OfferRemoveLoops { - private static void realMain(String[] args) throws Throwable { + void test(String[] args) throws Throwable { testQueue(new LinkedBlockingQueue<String>(10)); testQueue(new LinkedBlockingQueue<String>()); testQueue(new LinkedBlockingDeque<String>(10)); testQueue(new LinkedBlockingDeque<String>()); testQueue(new ArrayBlockingQueue<String>(10)); testQueue(new PriorityBlockingQueue<String>(10)); + testQueue(new ConcurrentLinkedQueue<String>()); } - private abstract static class ControlledThread extends Thread { + abstract class CheckedThread extends Thread { abstract protected void realRun(); public void run() { try { realRun(); } catch (Throwable t) { unexpected(t); } } } - private static void testQueue(final BlockingQueue<String> q) throws Throwable { - System.out.println(q.getClass()); - final int count = 10000; - final long quittingTime = System.nanoTime() + 1L * 1000L * 1000L * 1000L; - Thread t1 = new ControlledThread() { - protected void realRun() { - for (int i = 0, j = 0; i < count; i++) - while (! q.remove(String.valueOf(i)) - && System.nanoTime() - quittingTime < 0) - Thread.yield();}}; - Thread t2 = new ControlledThread() { - protected void realRun() { - for (int i = 0, j = 0; i < count; i++) - while (! q.offer(String.valueOf(i)) - && System.nanoTime() - quittingTime < 0) - Thread.yield();}}; + void testQueue(final Queue<String> q) throws Throwable { + System.out.println(q.getClass().getSimpleName()); + final int count = 1000 * 1000; + final long testDurationSeconds = 1L; + final long testDurationMillis = testDurationSeconds * 1000L; + final long quittingTimeNanos + = System.nanoTime() + testDurationSeconds * 1000L * 1000L * 1000L; + Thread t1 = new CheckedThread() { + protected void realRun() { + for (int i = 0; i < count; i++) { + if ((i % 1024) == 0 && + System.nanoTime() - quittingTimeNanos > 0) + return; + while (! q.remove(String.valueOf(i))) + Thread.yield(); + }}}; + Thread t2 = new CheckedThread() { + protected void realRun() { + for (int i = 0; i < count; i++) { + if ((i % 1024) == 0 && + System.nanoTime() - quittingTimeNanos > 0) + return; + while (! q.offer(String.valueOf(i))) + Thread.yield(); + }}}; t1.setDaemon(true); t2.setDaemon(true); t1.start(); t2.start(); - t1.join(10000); t2.join(10000); + t1.join(10 * testDurationMillis); + t2.join(10 * testDurationMillis); check(! t1.isAlive()); check(! t2.isAlive()); } //--------------------- Infrastructure --------------------------- - static volatile int passed = 0, failed = 0; - static void pass() { passed++; } - static void fail() { failed++; Thread.dumpStack(); } - static void unexpected(Throwable t) { failed++; t.printStackTrace(); } - static void check(boolean cond) { if (cond) pass(); else fail(); } - static void equal(Object x, Object y) { + volatile int passed = 0, failed = 0; + void pass() {passed++;} + void fail() {failed++; Thread.dumpStack();} + void fail(String msg) {System.err.println(msg); fail();} + void unexpected(Throwable t) {failed++; t.printStackTrace();} + void check(boolean cond) {if (cond) pass(); else fail();} + void equal(Object x, Object y) { if (x == null ? y == null : x.equals(y)) pass(); - else {System.out.println(x + " not equal to " + y); fail(); }} - + else fail(x + " not equal to " + y);} public static void main(String[] args) throws Throwable { - try { realMain(args); } catch (Throwable t) { unexpected(t); } - + new OfferRemoveLoops().instanceMain(args);} + public void instanceMain(String[] args) throws Throwable { + try {test(args);} catch (Throwable t) {unexpected(t);} System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); - if (failed > 0) throw new Exception("Some tests failed"); - } + if (failed > 0) throw new AssertionError("Some tests failed");} }