diff options
Diffstat (limited to 'src/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java')
-rw-r--r-- | src/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java | 698 |
1 files changed, 698 insertions, 0 deletions
diff --git a/src/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java b/src/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java new file mode 100644 index 0000000..cf59129 --- /dev/null +++ b/src/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java @@ -0,0 +1,698 @@ +/* + * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $ + * $Revision: 677240 $ + * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $ + * + * ==================================================================== + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. + * + */ + +package org.apache.http.impl.conn.tsccm; + +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Queue; +import java.util.LinkedList; +import java.util.Map; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.http.conn.routing.HttpRoute; +import org.apache.http.conn.ClientConnectionOperator; +import org.apache.http.conn.ConnectionPoolTimeoutException; +import org.apache.http.conn.params.ConnPerRoute; +import org.apache.http.conn.params.ConnManagerParams; +import org.apache.http.params.HttpParams; + + +/** + * A connection pool that maintains connections by route. + * This class is derived from <code>MultiThreadedHttpConnectionManager</code> + * in HttpClient 3.x, see there for original authors. It implements the same + * algorithm for connection re-use and connection-per-host enforcement: + * <ul> + * <li>connections are re-used only for the exact same route</li> + * <li>connection limits are enforced per route rather than per host</li> + * </ul> + * Note that access to the pool datastructures is synchronized via the + * {@link AbstractConnPool#poolLock poolLock} in the base class, + * not via <code>synchronized</code> methods. + * + * @author <a href="mailto:rolandw at apache.org">Roland Weber</a> + * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a> + * @author and others + */ +public class ConnPoolByRoute extends AbstractConnPool { + + private final Log log = LogFactory.getLog(getClass()); + + /** Connection operator for this pool */ + protected final ClientConnectionOperator operator; + + /** The list of free connections */ + protected Queue<BasicPoolEntry> freeConnections; + + /** The list of WaitingThreads waiting for a connection */ + protected Queue<WaitingThread> waitingThreads; + + /** + * A map of route-specific pools. + * Keys are of class {@link HttpRoute}, + * values of class {@link RouteSpecificPool}. + */ + protected final Map<HttpRoute, RouteSpecificPool> routeToPool; + + protected final int maxTotalConnections; + + private final ConnPerRoute connPerRoute; + + /** + * Creates a new connection pool, managed by route. + */ + public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) { + super(); + if (operator == null) { + throw new IllegalArgumentException("Connection operator may not be null"); + } + this.operator = operator; + + freeConnections = createFreeConnQueue(); + waitingThreads = createWaitingThreadQueue(); + routeToPool = createRouteToPoolMap(); + maxTotalConnections = ConnManagerParams + .getMaxTotalConnections(params); + connPerRoute = ConnManagerParams + .getMaxConnectionsPerRoute(params); + } + + + /** + * Creates the queue for {@link #freeConnections}. + * Called once by the constructor. + * + * @return a queue + */ + protected Queue<BasicPoolEntry> createFreeConnQueue() { + return new LinkedList<BasicPoolEntry>(); + } + + /** + * Creates the queue for {@link #waitingThreads}. + * Called once by the constructor. + * + * @return a queue + */ + protected Queue<WaitingThread> createWaitingThreadQueue() { + return new LinkedList<WaitingThread>(); + } + + /** + * Creates the map for {@link #routeToPool}. + * Called once by the constructor. + * + * @return a map + */ + protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() { + return new HashMap<HttpRoute, RouteSpecificPool>(); + } + + + /** + * Creates a new route-specific pool. + * Called by {@link #getRoutePool} when necessary. + * + * @param route the route + * + * @return the new pool + */ + protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { + return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route)); + } + + + /** + * Creates a new waiting thread. + * Called by {@link #getRoutePool} when necessary. + * + * @param cond the condition to wait for + * @param rospl the route specific pool, or <code>null</code> + * + * @return a waiting thread representation + */ + protected WaitingThread newWaitingThread(Condition cond, + RouteSpecificPool rospl) { + return new WaitingThread(cond, rospl); + } + + + /** + * Get a route-specific pool of available connections. + * + * @param route the route + * @param create whether to create the pool if it doesn't exist + * + * @return the pool for the argument route, + * never <code>null</code> if <code>create</code> is <code>true</code> + */ + protected RouteSpecificPool getRoutePool(HttpRoute route, + boolean create) { + RouteSpecificPool rospl = null; + poolLock.lock(); + try { + + rospl = routeToPool.get(route); + if ((rospl == null) && create) { + // no pool for this route yet (or anymore) + rospl = newRouteSpecificPool(route); + routeToPool.put(route, rospl); + } + + } finally { + poolLock.unlock(); + } + + return rospl; + } + + + //@@@ consider alternatives for gathering statistics + public int getConnectionsInPool(HttpRoute route) { + + poolLock.lock(); + try { + // don't allow a pool to be created here! + RouteSpecificPool rospl = getRoutePool(route, false); + return (rospl != null) ? rospl.getEntryCount() : 0; + + } finally { + poolLock.unlock(); + } + } + + @Override + public PoolEntryRequest requestPoolEntry( + final HttpRoute route, + final Object state) { + + final WaitingThreadAborter aborter = new WaitingThreadAborter(); + + return new PoolEntryRequest() { + + public void abortRequest() { + poolLock.lock(); + try { + aborter.abort(); + } finally { + poolLock.unlock(); + } + } + + public BasicPoolEntry getPoolEntry( + long timeout, + TimeUnit tunit) + throws InterruptedException, ConnectionPoolTimeoutException { + return getEntryBlocking(route, state, timeout, tunit, aborter); + } + + }; + } + + /** + * Obtains a pool entry with a connection within the given timeout. + * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} + * must be called before blocking, to allow the thread to be interrupted. + * + * @param route the route for which to get the connection + * @param timeout the timeout, 0 or negative for no timeout + * @param tunit the unit for the <code>timeout</code>, + * may be <code>null</code> only if there is no timeout + * @param aborter an object which can abort a {@link WaitingThread}. + * + * @return pool entry holding a connection for the route + * + * @throws ConnectionPoolTimeoutException + * if the timeout expired + * @throws InterruptedException + * if the calling thread was interrupted + */ + protected BasicPoolEntry getEntryBlocking( + HttpRoute route, Object state, + long timeout, TimeUnit tunit, + WaitingThreadAborter aborter) + throws ConnectionPoolTimeoutException, InterruptedException { + + Date deadline = null; + if (timeout > 0) { + deadline = new Date + (System.currentTimeMillis() + tunit.toMillis(timeout)); + } + + BasicPoolEntry entry = null; + poolLock.lock(); + try { + + RouteSpecificPool rospl = getRoutePool(route, true); + WaitingThread waitingThread = null; + + while (entry == null) { + + if (isShutDown) { + throw new IllegalStateException + ("Connection pool shut down."); + } + + if (log.isDebugEnabled()) { + log.debug("Total connections kept alive: " + freeConnections.size()); + log.debug("Total issued connections: " + issuedConnections.size()); + log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections); + } + + // the cases to check for: + // - have a free connection for that route + // - allowed to create a free connection for that route + // - can delete and replace a free connection for another route + // - need to wait for one of the things above to come true + + entry = getFreeEntry(rospl, state); + if (entry != null) { + break; + } + + boolean hasCapacity = rospl.getCapacity() > 0; + + if (log.isDebugEnabled()) { + log.debug("Available capacity: " + rospl.getCapacity() + + " out of " + rospl.getMaxEntries() + + " [" + route + "][" + state + "]"); + } + + if (hasCapacity && numConnections < maxTotalConnections) { + + entry = createEntry(rospl, operator); + + } else if (hasCapacity && !freeConnections.isEmpty()) { + + deleteLeastUsedEntry(); + entry = createEntry(rospl, operator); + + } else { + + if (log.isDebugEnabled()) { + log.debug("Need to wait for connection" + + " [" + route + "][" + state + "]"); + } + + if (waitingThread == null) { + waitingThread = + newWaitingThread(poolLock.newCondition(), rospl); + aborter.setWaitingThread(waitingThread); + } + + boolean success = false; + try { + rospl.queueThread(waitingThread); + waitingThreads.add(waitingThread); + success = waitingThread.await(deadline); + + } finally { + // In case of 'success', we were woken up by the + // connection pool and should now have a connection + // waiting for us, or else we're shutting down. + // Just continue in the loop, both cases are checked. + rospl.removeThread(waitingThread); + waitingThreads.remove(waitingThread); + } + + // check for spurious wakeup vs. timeout + if (!success && (deadline != null) && + (deadline.getTime() <= System.currentTimeMillis())) { + throw new ConnectionPoolTimeoutException + ("Timeout waiting for connection"); + } + } + } // while no entry + + } finally { + poolLock.unlock(); + } + + return entry; + + } // getEntry + + + // non-javadoc, see base class AbstractConnPool + @Override + public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) { + + HttpRoute route = entry.getPlannedRoute(); + if (log.isDebugEnabled()) { + log.debug("Freeing connection" + + " [" + route + "][" + entry.getState() + "]"); + } + + poolLock.lock(); + try { + if (isShutDown) { + // the pool is shut down, release the + // connection's resources and get out of here + closeConnection(entry.getConnection()); + return; + } + + // no longer issued, we keep a hard reference now + issuedConnections.remove(entry.getWeakRef()); + + RouteSpecificPool rospl = getRoutePool(route, true); + + if (reusable) { + rospl.freeEntry(entry); + freeConnections.add(entry); + idleConnHandler.add(entry.getConnection(), validDuration, timeUnit); + } else { + rospl.dropEntry(); + numConnections--; + } + + notifyWaitingThread(rospl); + + } finally { + poolLock.unlock(); + } + + } // freeEntry + + + + /** + * If available, get a free pool entry for a route. + * + * @param rospl the route-specific pool from which to get an entry + * + * @return an available pool entry for the given route, or + * <code>null</code> if none is available + */ + protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) { + + BasicPoolEntry entry = null; + poolLock.lock(); + try { + boolean done = false; + while(!done) { + + entry = rospl.allocEntry(state); + + if (entry != null) { + if (log.isDebugEnabled()) { + log.debug("Getting free connection" + + " [" + rospl.getRoute() + "][" + state + "]"); + + } + freeConnections.remove(entry); + boolean valid = idleConnHandler.remove(entry.getConnection()); + if(!valid) { + // If the free entry isn't valid anymore, get rid of it + // and loop to find another one that might be valid. + if(log.isDebugEnabled()) + log.debug("Closing expired free connection" + + " [" + rospl.getRoute() + "][" + state + "]"); + closeConnection(entry.getConnection()); + // We use dropEntry instead of deleteEntry because the entry + // is no longer "free" (we just allocated it), and deleteEntry + // can only be used to delete free entries. + rospl.dropEntry(); + numConnections--; + } else { + issuedConnections.add(entry.getWeakRef()); + done = true; + } + + } else { + done = true; + if (log.isDebugEnabled()) { + log.debug("No free connections" + + " [" + rospl.getRoute() + "][" + state + "]"); + } + } + } + } finally { + poolLock.unlock(); + } + + return entry; + } + + + /** + * Creates a new pool entry. + * This method assumes that the new connection will be handed + * out immediately. + * + * @param rospl the route-specific pool for which to create the entry + * @param op the operator for creating a connection + * + * @return the new pool entry for a new connection + */ + protected BasicPoolEntry createEntry(RouteSpecificPool rospl, + ClientConnectionOperator op) { + + if (log.isDebugEnabled()) { + log.debug("Creating new connection [" + rospl.getRoute() + "]"); + } + + // the entry will create the connection when needed + BasicPoolEntry entry = + new BasicPoolEntry(op, rospl.getRoute(), refQueue); + + poolLock.lock(); + try { + + rospl.createdEntry(entry); + numConnections++; + + issuedConnections.add(entry.getWeakRef()); + + } finally { + poolLock.unlock(); + } + + return entry; + } + + + /** + * Deletes a given pool entry. + * This closes the pooled connection and removes all references, + * so that it can be GCed. + * + * <p><b>Note:</b> Does not remove the entry from the freeConnections list. + * It is assumed that the caller has already handled this step.</p> + * <!-- @@@ is that a good idea? or rather fix it? --> + * + * @param entry the pool entry for the connection to delete + */ + protected void deleteEntry(BasicPoolEntry entry) { + + HttpRoute route = entry.getPlannedRoute(); + + if (log.isDebugEnabled()) { + log.debug("Deleting connection" + + " [" + route + "][" + entry.getState() + "]"); + } + + poolLock.lock(); + try { + + closeConnection(entry.getConnection()); + + RouteSpecificPool rospl = getRoutePool(route, true); + rospl.deleteEntry(entry); + numConnections--; + if (rospl.isUnused()) { + routeToPool.remove(route); + } + + idleConnHandler.remove(entry.getConnection());// not idle, but dead + + } finally { + poolLock.unlock(); + } + } + + + /** + * Delete an old, free pool entry to make room for a new one. + * Used to replace pool entries with ones for a different route. + */ + protected void deleteLeastUsedEntry() { + + try { + poolLock.lock(); + + //@@@ with get() instead of remove, we could + //@@@ leave the removing to deleteEntry() + BasicPoolEntry entry = freeConnections.remove(); + + if (entry != null) { + deleteEntry(entry); + } else if (log.isDebugEnabled()) { + log.debug("No free connection to delete."); + } + + } finally { + poolLock.unlock(); + } + } + + + // non-javadoc, see base class AbstractConnPool + @Override + protected void handleLostEntry(HttpRoute route) { + + poolLock.lock(); + try { + + RouteSpecificPool rospl = getRoutePool(route, true); + rospl.dropEntry(); + if (rospl.isUnused()) { + routeToPool.remove(route); + } + + numConnections--; + notifyWaitingThread(rospl); + + } finally { + poolLock.unlock(); + } + } + + + /** + * Notifies a waiting thread that a connection is available. + * This will wake a thread waiting in the specific route pool, + * if there is one. + * Otherwise, a thread in the connection pool will be notified. + * + * @param rospl the pool in which to notify, or <code>null</code> + */ + protected void notifyWaitingThread(RouteSpecificPool rospl) { + + //@@@ while this strategy provides for best connection re-use, + //@@@ is it fair? only do this if the connection is open? + // Find the thread we are going to notify. We want to ensure that + // each waiting thread is only interrupted once, so we will remove + // it from all wait queues before interrupting. + WaitingThread waitingThread = null; + + poolLock.lock(); + try { + + if ((rospl != null) && rospl.hasThread()) { + if (log.isDebugEnabled()) { + log.debug("Notifying thread waiting on pool" + + " [" + rospl.getRoute() + "]"); + } + waitingThread = rospl.nextThread(); + } else if (!waitingThreads.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("Notifying thread waiting on any pool"); + } + waitingThread = waitingThreads.remove(); + } else if (log.isDebugEnabled()) { + log.debug("Notifying no-one, there are no waiting threads"); + } + + if (waitingThread != null) { + waitingThread.wakeup(); + } + + } finally { + poolLock.unlock(); + } + } + + + //@@@ revise this cleanup stuff + //@@@ move method to base class when deleteEntry() is fixed + // non-javadoc, see base class AbstractConnPool + @Override + public void deleteClosedConnections() { + + poolLock.lock(); + try { + + Iterator<BasicPoolEntry> iter = freeConnections.iterator(); + while (iter.hasNext()) { + BasicPoolEntry entry = iter.next(); + if (!entry.getConnection().isOpen()) { + iter.remove(); + deleteEntry(entry); + } + } + + } finally { + poolLock.unlock(); + } + } + + + // non-javadoc, see base class AbstractConnPool + @Override + public void shutdown() { + + poolLock.lock(); + try { + + super.shutdown(); + + // close all free connections + //@@@ move this to base class? + Iterator<BasicPoolEntry> ibpe = freeConnections.iterator(); + while (ibpe.hasNext()) { + BasicPoolEntry entry = ibpe.next(); + ibpe.remove(); + closeConnection(entry.getConnection()); + } + + // wake up all waiting threads + Iterator<WaitingThread> iwth = waitingThreads.iterator(); + while (iwth.hasNext()) { + WaitingThread waiter = iwth.next(); + iwth.remove(); + waiter.wakeup(); + } + + routeToPool.clear(); + + } finally { + poolLock.unlock(); + } + } + + +} // class ConnPoolByRoute + |