A Throttling CompletionService
The CompletionService interface defines a service that allows the caller to submit tasks to be completed in the future. A commonly used implementation is the ExecutorCompletionService, which uses an Executor to run the tasks that have been submitted.
In most cases the Executor used will be the ThreadPoolExecutor which creates a pool of threads to use in executing submitted tasks. ThreadPoolExecutors are very expensive things to create. They should not be created on-the-fly because of this. ThreadPoolExecutors are excellent at limiting the total number of threads running a particular type of task and therefore provide a safeguard against thread leakage or other scenarios that may result in running out of threads. CompletionServices are, on the other hand, rather inexpensive to create.
In a recent use case I needed to create a varying number of tasks that had to run in parallel, but I did not want one particularly large request to starve out other requests and make them wait for an available thread from the pool to execute their tasks. That's when I came up with the idea: if I had a CompletionService that limited the number of tasks simultaneously submitted to the Executor, I could have one thread pool that is shared by all requests.
At first I thought I would extend the ExecutorCompletionService. As I wrote the code, I realized I was just delegating all the behavior to the methods defined in CompletionService. So I created the "ThrottledCompletionService" as a Decorator that takes another CompletionService. In this way I was able to add this functionality to any CompletionService, not just an ExecutorCompletionService.
1 package com.dhptech.utils.concurrent; 2 3 import java.util.LinkedList; 4 import java.util.Queue; 5 import java.util.concurrent.Callable; 6 import java.util.concurrent.CompletionService; 7 import java.util.concurrent.CountDownLatch; 8 import java.util.concurrent.ExecutionException; 9 import java.util.concurrent.Executor; 10 import java.util.concurrent.ExecutorCompletionService; 11 import java.util.concurrent.Future; 12 import java.util.concurrent.TimeUnit; 13 import java.util.concurrent.TimeoutException; 14 15 import com.dhptech.utils.concurrent.ConcurrentUtils; 16 17 /** 18 * This CompletionService decorates another CompleationService and throttles the 19 * number of concurrently submitted tasks that will be submitted and any given time. 20 * 21 * This can be used in conjunction with an Executor to complete tasks but keep 22 * larger batches of tasks from starving out smaller batches. Each "batch" whould 23 * get a new ThrottledCompletionService but all of them use the same central 24 * Executor. 25 * 26 * This object is lightweight enough to be created and destroyed for each batch 27 * of tasks that need to be submitted to a common Executor. 28 * 29 * @author danap 30 */ 31 public class ThrottledCompletionService<V> implements CompletionService<V> { 32 33 /** 34 * Acts as a base class to implement the Future interface for the Runnable 35 * and callable variants. 36 * 37 * @param <T> the return type of the task. 38 */ 39 private abstract class ThrottledTask<T> implements Future<T> { 40 private CountDownLatch submitted = new CountDownLatch(1); 41 private Future<T> delegateFuture = null; 42 private boolean cancelled = false; 43 44 /** 45 * Sets the delegateFuture and counts down the submitted latch. 46 * 47 * @param delegateFuture the future from the decorated completion service. 
48 */ 49 protected void setDelegateFuture(Future<T> delegateFuture) { 50 this.delegateFuture = delegateFuture; 51 submitted.countDown(); 52 } 53 54 /** 55 * Should submit the task to the completion service. 56 */ 57 public abstract void submit(); 58 59 /** 60 * @see Future#cancel(boolean) 61 */ 62 public boolean cancel(boolean mayInterruptIfRunning) { 63 if ( submitted.getCount() > 0 || delegateFuture == null ) { 64 cancelled = true; 65 return true; 66 } else { 67 return delegateFuture.cancel(mayInterruptIfRunning); 68 } 69 } 70 71 /** 72 * @see Future#isCancelled() 73 */ 74 public boolean isCancelled() { 75 if ( cancelled ) { 76 return true; 77 } 78 if ( submitted.getCount() == 0 ) { 79 return delegateFuture.isCancelled(); 80 } 81 return false; 82 } 83 84 /** 85 * @see Future#isDone() 86 */ 87 public boolean isDone() { 88 if ( submitted.getCount() == 0 ) { 89 return delegateFuture.isDone(); 90 } 91 return false; 92 } 93 94 /** 95 * @see Future#get() 96 */ 97 public T get() throws InterruptedException, ExecutionException { 98 submitted.await(); 99 return delegateFuture.get(); 100 } 101 102 /** 103 * Get the value from this future. 104 * 105 * @see Future#get(long, java.util.concurrent.TimeUnit) 106 * 107 * NOTE: because this operation will first wait for the task to be submitted and 108 * then wait for the future returned by the decorated completion service, the 109 * timeout is converted to nanoseconds and the time taken to wait for the 110 * task to be submitted is subtracted from it before passing it to the get 111 * method of the future returned by the decorated compleation service. 112 */ 113 public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { 114 // Convert the timeout unit to nanoseconds. 115 long timeoutNanos = ConcurrentUtils.convertTimeUnitToNanos(timeout, unit); 116 // record the time we started waiting. 
117 long start = System.nanoTime(); 118 submitted.await(timeoutNanos, TimeUnit.NANOSECONDS); 119 // remove the time we waited from the timeout. 120 long elapsed = System.nanoTime() - start; 121 timeoutNanos -= elapsed; 122 // wait the remainder of the timeout for the result. 123 return delegateFuture.get(timeoutNanos,TimeUnit.NANOSECONDS); 124 } 125 } 126 127 /** 128 * An implementation of ThrottledTask that handles Runnables. 129 */ 130 private class ThrottledRunnable extends ThrottledTask<V> implements Runnable { 131 private Runnable delegate; 132 private V value; 133 134 /** 135 * Contructs a new ThrottledRunnable. 136 * 137 * @param delegate the runnable that we will delegate to. 138 * 139 * @param value the value to return from this task. 140 */ 141 public ThrottledRunnable(Runnable delegate, V value) { 142 this.delegate = delegate; 143 this.value = value; 144 } 145 146 /** 147 * @see ThrottledTask#submit() 148 */ 149 public void submit() { 150 setDelegateFuture(mDelegate.submit(this,value)); 151 } 152 153 /** 154 * @see Runnable#run() 155 * 156 * calls {@link ThrottledCompletionService#submitNext()} when the task is done. 157 */ 158 public void run() { 159 try { 160 delegate.run(); 161 } finally { 162 submitNext(); 163 } 164 } 165 } 166 167 /** 168 * An implementation of ThrottledTask for Callables. 169 */ 170 private class ThrottledCallable extends ThrottledTask<V> implements Callable<V> { 171 private Callable<V> delegate; 172 173 /** 174 * Construct a ThrottledCallable. 175 * 176 * @param delegate the callable we will delegate to. 177 */ 178 public ThrottledCallable(Callable<V> delegate) { 179 this.delegate = delegate; 180 } 181 182 /** 183 * @see Callable#call() 184 * 185 * calls {@link ThrottledCompletionService#submitNext()} when the decorated 186 * task is complete. 187 * 188 * @return the return value of the callable. 189 * 190 * @throws java.lang.Exception an exception when something bad happens. 
191 */ 192 public V call() throws Exception { 193 try { 194 return delegate.call(); 195 } finally { 196 submitNext(); 197 } 198 } 199 200 /** 201 * @see ThrottledTask#submit() 202 */ 203 public void submit() { 204 setDelegateFuture(mDelegate.submit(this)); 205 } 206 } 207 208 private int mThrottle = 1; 209 private CompletionService<V> mDelegate; 210 211 private int mSubmitted = 0; 212 private Queue<ThrottledTask<V>> mTaskQueue = new LinkedList(); 213 214 /** 215 * Creates a new ThrottledCompletionService wrapping the given completion service 216 * as a delegate. 217 * 218 * @param delegate 219 */ 220 public ThrottledCompletionService(CompletionService<V> delegate) { 221 this.mDelegate = delegate; 222 } 223 224 /** 225 * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService 226 * that references the passed executor. 227 * 228 * @param executor the executor. 229 */ 230 public ThrottledCompletionService(Executor executor) { 231 this.mDelegate = new ExecutorCompletionService<V>(executor); 232 } 233 234 /** 235 * Creates a new ThrottledCompletionService wrapping the given completion service 236 * as a delegate. 237 * 238 * @param delegate 239 */ 240 public ThrottledCompletionService(CompletionService<V> delegate, final int throttle) { 241 this.mDelegate = delegate; 242 this.mThrottle = throttle; 243 } 244 245 /** 246 * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService 247 * that references the passed executor. 248 * 249 * @param executor the executor. 250 */ 251 public ThrottledCompletionService(Executor executor, final int throttle) { 252 this.mDelegate = new ExecutorCompletionService<V>(executor); 253 this.mThrottle = throttle; 254 } 255 256 /** 257 * Decrements the submitted count, attempts to submit the next task. Callled 258 * when a submitted task completes. If there are no tasks to submit, simply 259 * return. 
260 */ 261 private synchronized void submitNext() { 262 mSubmitted -= 1; 263 if ( mSubmitted < mThrottle ) { 264 while(true) { 265 ThrottledTask<V> task = mTaskQueue.poll(); 266 if ( task != null ) { 267 // skip any tasks that were cancelled before we submitted them. 268 if (task.isCancelled()) { 269 continue; 270 } 271 task.submit(); 272 mSubmitted += 1; 273 } 274 break; 275 } 276 } 277 } 278 279 /** 280 * @see CompletionService#submit(java.util.concurrent.Callable) 281 * 282 * @param task the task to submit 283 * 284 * @return a future for 285 */ 286 public synchronized Future<V> submit(Callable<V> task) { 287 ThrottledCallable ttask = new ThrottledCallable(task); 288 if ( mSubmitted < mThrottle ) { 289 mSubmitted += 1; 290 return mDelegate.submit(ttask); 291 } else { 292 mTaskQueue.offer(ttask); 293 return ttask; 294 } 295 } 296 297 /** 298 * @see CompletionService#submit(java.lang.Runnable, java.lang.Object) 299 * 300 * @param task the task to be run. 301 * @param result the result to return from the Future<V> object. 302 * @return a future object that can be used to access the result of the submitted task. 303 */ 304 public synchronized Future<V> submit(Runnable task, V result) { 305 ThrottledRunnable ttask = new ThrottledRunnable(task,result); 306 if ( mSubmitted < mThrottle ) { 307 mSubmitted += 1; 308 return mDelegate.submit(ttask,result); 309 } else { 310 mTaskQueue.offer(ttask); 311 return ttask; 312 } 313 } 314 315 /** 316 * @see CompletionService#take() 317 */ 318 public Future<V> take() throws InterruptedException { 319 return mDelegate.take(); 320 } 321 322 /** 323 * @see CompletionService#poll(long, java.util.concurrent.TimeUnit) 324 */ 325 public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException { 326 return mDelegate.poll(timeout, unit); 327 } 328 329 /** 330 * @see CompletionService#poll() 331 */ 332 public Future<V> poll() { 333 return mDelegate.poll(); 334 } 335 } 336 337
- danapsimer's blog
- 1281 reads


Post new comment