Reply to comment

A Throttling CompletionService

The CompletionService interface defines a service that allows the caller to submit tasks to be completed in the future. A commonly used implementation is the ExecutorCompletionService, which uses an Executor to run the tasks that have been submitted.

In most cases the Executor used will be the ThreadPoolExecutor which creates a pool of threads to use in executing submitted tasks. ThreadPoolExecutors are very expensive things to create. They should not be created on-the-fly because of this. ThreadPoolExecutors are excellent at limiting the total number of threads running a particular type of task and therefore provide a safeguard against thread leakage or other scenarios that may result in running out of threads. CompletionServices are, on the other hand, rather inexpensive to create.

In a recent use case I was presented with I realized that I needed to create a varying number of tasks that needed to run in parallel but I did not want one particularly large request to starve out other requests and make them wait for an available thread from the pool to execute their tasks. That's when I came up with the idea that if I could have a CompletionService that would limit the number of tasks that would simultaneously be submitted to the Executor, I could have one thread pool that is shared by all requests.

At first I thought I would extend the ExecutorCompletionService. As I wrote the code I realized I was just delegating all the behavior to the methods defined in CompletionService. So, I created the "ThrottledCompletionService" as a Decorator which takes another CompletionService. In this way I was able to add this functionality to any CompletionService not just a ExecutorCompletionService

  1 package com.dhptech.utils.concurrent;
  2 
  3 import java.util.LinkedList;
  4 import java.util.Queue;
  5 import java.util.concurrent.Callable;
  6 import java.util.concurrent.CompletionService;
  7 import java.util.concurrent.CountDownLatch;
  8 import java.util.concurrent.ExecutionException;
  9 import java.util.concurrent.Executor;
 10 import java.util.concurrent.ExecutorCompletionService;
 11 import java.util.concurrent.Future;
 12 import java.util.concurrent.TimeUnit;
 13 import java.util.concurrent.TimeoutException;
 14 
 15 import com.dhptech.utils.concurrent.ConcurrentUtils;
 16 
 17 /**
 18  * This CompletionService decorates another CompleationService and throttles the
 19  * number of concurrently submitted tasks that will be submitted and any given time.
 20  *
 21  * This can be used in conjunction with an Executor to complete tasks but keep
 22  * larger batches of tasks from starving out smaller batches.  Each "batch" whould
 23  * get a new ThrottledCompletionService but all of them use the same central
 24  * Executor.
 25  *
 26  * This object is lightweight enough to be created and destroyed for each batch
 27  * of tasks that need to be submitted to a common Executor.
 28  *
 29  * @author danap
 30  */
 31 public class ThrottledCompletionService<V> implements CompletionService<V> {
 32 
 33   /**
 34    * Acts as a base class to implement the Future interface for the Runnable
 35    * and callable variants.
 36    *
 37    * @param <T> the return type of the task.
 38    */
 39   private abstract class ThrottledTask<T> implements Future<T> {
 40     private CountDownLatch submitted = new CountDownLatch(1);
 41     private Future<T> delegateFuture = null;
 42     private boolean cancelled = false;
 43 
 44     /**
 45      * Sets the delegateFuture and counts down the submitted latch.
 46      *
 47      * @param delegateFuture the future from the decorated completion service.
 48      */
 49     protected void setDelegateFuture(Future<T> delegateFuture) {
 50       this.delegateFuture = delegateFuture;
 51       submitted.countDown();
 52     }
 53 
 54     /**
 55      * Should submit the task to the completion service.
 56      */
 57     public abstract void submit();
 58 
 59     /**
 60      * @see Future#cancel(boolean)
 61      */
 62     public boolean cancel(boolean mayInterruptIfRunning) {
 63       if ( submitted.getCount() > 0 || delegateFuture == null ) {
 64         cancelled = true;
 65         return true;
 66       } else {
 67         return delegateFuture.cancel(mayInterruptIfRunning);
 68       }
 69     }
 70 
 71     /**
 72      * @see Future#isCancelled()
 73      */
 74     public boolean isCancelled() {
 75       if ( cancelled ) {
 76         return true;
 77       }
 78       if ( submitted.getCount() == 0 ) {
 79         return delegateFuture.isCancelled();
 80       }
 81       return false;
 82     }
 83 
 84     /**
 85      * @see Future#isDone()
 86      */
 87     public boolean isDone() {
 88       if ( submitted.getCount() == 0 ) {
 89         return delegateFuture.isDone();
 90       }
 91       return false;
 92     }
 93 
 94     /**
 95      * @see Future#get()
 96      */
 97     public T get() throws InterruptedException, ExecutionException {
 98       submitted.await();
 99       return delegateFuture.get();
100     }
101 
102     /**
103      * Get the value from this future.
104      *
105      * @see Future#get(long, java.util.concurrent.TimeUnit)
106      *
107      * NOTE: because this operation will first wait for the task to be submitted and
108      * then wait for the future returned by the decorated completion service, the
109      * timeout is converted to nanoseconds and the time taken to wait for the
110      * task to be submitted is subtracted from it before passing it to the get
111      * method of the future returned by the decorated compleation service.
112      */
113     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
114       // Convert the timeout unit to nanoseconds.
115       long timeoutNanos = ConcurrentUtils.convertTimeUnitToNanos(timeout, unit);
116       // record the time we started waiting.
117       long start = System.nanoTime();
118       submitted.await(timeoutNanos, TimeUnit.NANOSECONDS);
119       // remove the time we waited from the timeout.
120       long elapsed = System.nanoTime() - start;
121       timeoutNanos -= elapsed;
122       // wait the remainder of the timeout for the result.
123       return delegateFuture.get(timeoutNanos,TimeUnit.NANOSECONDS);
124     }
125   }
126 
127   /**
128    * An implementation of ThrottledTask that handles Runnables.
129    */
130   private class ThrottledRunnable extends ThrottledTask<V> implements Runnable {
131     private Runnable delegate;
132     private V value;
133 
134     /**
135      * Contructs a new ThrottledRunnable.
136      *
137      * @param delegate the runnable that we will delegate to.
138      *
139      * @param value the value to return from this task.
140      */
141     public ThrottledRunnable(Runnable delegate, V value) {
142       this.delegate = delegate;
143       this.value = value;
144     }
145 
146     /**
147      * @see ThrottledTask#submit()
148      */
149     public void submit() {
150       setDelegateFuture(mDelegate.submit(this,value));
151     }
152 
153     /**
154      * @see Runnable#run()
155      *
156      * calls {@link ThrottledCompletionService#submitNext()} when the task is done.
157      */
158     public void run() {
159       try {
160         delegate.run();
161       } finally {
162         submitNext();
163       }
164     }
165   }
166 
167   /**
168    * An implementation of ThrottledTask for Callables.
169    */
170   private class ThrottledCallable extends ThrottledTask<V> implements Callable<V> {
171     private Callable<V> delegate;
172 
173     /**
174      * Construct a ThrottledCallable.
175      *
176      * @param delegate the callable we will delegate to.
177      */
178     public ThrottledCallable(Callable<V> delegate) {
179       this.delegate = delegate;
180     }
181 
182     /**
183      * @see Callable#call()
184      *
185      * calls {@link ThrottledCompletionService#submitNext()} when the decorated
186      * task is complete.
187      *
188      * @return the return value of the callable.
189      *
190      * @throws java.lang.Exception an exception when something bad happens.
191      */
192     public V call() throws Exception {
193       try {
194         return delegate.call();
195       } finally {
196         submitNext();
197       }
198     }
199 
200     /**
201      * @see ThrottledTask#submit()
202      */
203     public void submit() {
204       setDelegateFuture(mDelegate.submit(this));
205     }
206   }
207 
208   private int mThrottle = 1;
209   private CompletionService<V> mDelegate;
210 
211   private int mSubmitted = 0;
212   private Queue<ThrottledTask<V>> mTaskQueue = new LinkedList();
213 
214   /**
215    * Creates a new ThrottledCompletionService wrapping the given completion service
216    * as a delegate.
217    *
218    * @param delegate
219    */
220   public ThrottledCompletionService(CompletionService<V> delegate) {
221     this.mDelegate = delegate;
222   }
223 
224   /**
225    * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService
226    * that references the passed executor.
227    *
228    * @param executor the executor.
229    */
230   public ThrottledCompletionService(Executor executor) {
231     this.mDelegate = new ExecutorCompletionService<V>(executor);
232   }
233 
234   /**
235    * Creates a new ThrottledCompletionService wrapping the given completion service
236    * as a delegate.
237    *
238    * @param delegate
239    */
240   public ThrottledCompletionService(CompletionService<V> delegate, final int throttle) {
241     this.mDelegate = delegate;
242     this.mThrottle = throttle;
243   }
244 
245   /**
246    * Creates a new ThrottledCompletionServices wrapping a new ExecutorCompletionService
247    * that references the passed executor.
248    *
249    * @param executor the executor.
250    */
251   public ThrottledCompletionService(Executor executor, final int throttle) {
252     this.mDelegate = new ExecutorCompletionService<V>(executor);
253     this.mThrottle = throttle;
254   }
255 
256   /**
257    * Decrements the submitted count, attempts to submit the next task.  Callled
258    * when a submitted task completes.  If there are no tasks to submit, simply
259    * return.
260    */
261   private synchronized void submitNext() {
262     mSubmitted -= 1;
263     if ( mSubmitted < mThrottle ) {
264       while(true) {
265         ThrottledTask<V> task = mTaskQueue.poll();
266         if ( task != null ) {
267           // skip any tasks that were cancelled before we submitted them.
268           if (task.isCancelled()) {
269             continue;
270           }
271           task.submit();
272           mSubmitted += 1;
273         }
274         break;
275       }
276     }
277   }
278 
279   /**
280    * @see CompletionService#submit(java.util.concurrent.Callable)
281    *
282    * @param task the task to submit
283    *
284    * @return a future for
285    */
286   public synchronized Future<V> submit(Callable<V> task) {
287     ThrottledCallable ttask = new ThrottledCallable(task);
288     if ( mSubmitted < mThrottle ) {
289       mSubmitted += 1;
290       return mDelegate.submit(ttask);
291     } else {
292       mTaskQueue.offer(ttask);
293       return ttask;
294     }
295   }
296 
297   /**
298    * @see CompletionService#submit(java.lang.Runnable, java.lang.Object)
299    *
300    * @param task the task to be run.
301    * @param result the result to return from the Future<V> object.
302    * @return a future object that can be used to access the result of the submitted task.
303    */
304   public synchronized Future<V> submit(Runnable task, V result) {
305     ThrottledRunnable ttask = new ThrottledRunnable(task,result);
306     if ( mSubmitted < mThrottle ) {
307       mSubmitted += 1;
308       return mDelegate.submit(ttask,result);
309     } else {
310       mTaskQueue.offer(ttask);
311       return ttask;
312     }
313   }
314 
315   /**
316    * @see CompletionService#take()
317    */
318   public Future<V> take() throws InterruptedException {
319     return mDelegate.take();
320   }
321 
322   /**
323    * @see CompletionService#poll(long, java.util.concurrent.TimeUnit)
324    */
325   public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
326     return mDelegate.poll(timeout, unit);
327   }
328 
329   /**
330    * @see CompletionService#poll()
331    */
332   public Future<V> poll() {
333     return mDelegate.poll();
334   }
335 }
336 
337 

Reply

The content of this field is kept private and will not be shown publicly.
  • Allowed HTML tags: <a> <em> <strong> <cite> <code> <ul> <ol> <li> <dl> <dt> <dd>
  • Lines and paragraphs break automatically.

More information about formatting options

CAPTCHA
This question is for testing whether you are a human visitor and to prevent automated spam submissions.