wrote:
> wrote:
> > I thought I could use a ThreadPoolExecutor for a producer/consumer
> > relationship. I wanted to have a fixed queue size for the pool, which
> > blocked on the producer side if the queue was full, until a slot in the
> > queue was open. I can see that a RejectedExecutionHandler is called
> > when the queue is full and there are some pre-existing handlers, to
> > drop the Runnable or to run the Runnable in the current thread, but no
> > support for waiting until a slot is empty. I thought that running the
> > Runnable in the current thread is pretty close, but if multiple slots
> > open up, while the current thread is busy with a Runnable, it can't
> > give more tasks to waiting threads.
> >
> > So I wrote this simple class to block until a slot is empty. Does this
> > seem reasonable? Does something like this already exist in the JDK that
> > I missed?
> >
> >
> >
> > import java.util.concurrent.ArrayBlockingQueue;
> > import java.util.concurrent.RejectedExecutionException;
> > import java.util.concurrent.RejectedExecutionHandler;
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> >
> >
> > public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
> >
> > private static class BlockingQueuePut implements
> > RejectedExecutionHandler {
> > public void rejectedExecution(Runnable r, ThreadPoolExecutor
> > executor) {
> > try {
> > executor.getQueue().put(r);
> > } catch (InterruptedException ie) {
> > throw new RejectedExecutionException(ie);
> > }
> > }
> > }
> >
> > public BlockingThreadPoolExecutor(int coreThreadSize, int
> > maxThreadSize, int queueSize) {
> > super(
> > coreThreadSize,
> > maxThreadSize,
> > 5,
> > TimeUnit.SECONDS,
> > new ArrayBlockingQueue<Runnable>(queueSize),
> > new BlockingQueuePut());
> > }
> >
> > }
>
> Whats wrong with this?:
>
> BlockingQueue<Runnable> fixedSizeQueue = new
> ArrayBlockingQueue<Runnable>(size);
> Executor executor = new ThreadPoolExecutor(........., fixedSizeQueue);
>
> Just add tasks to the fixedSizeQueue, which will block if the queue
> overflows?
>
> Seems much simpler to me. Would doesn't this solve your problem?
No, by default ThreadPoolExecutor does not block when the queue is
full. It throws a RejectedExecutionException.
If you run the code below you will see that happen.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestExecutorService {
public static void runTest(ExecutorService executor, final long
sleepTime, int itemsToRun)
throws InterruptedException
{
System.err.println("Starting test.");
for (int i=0; i<itemsToRun; i++) {
final int id = i+1;
System.err.println("enqueing item " + id + ".");
executor.execute(new Runnable() {
public void run() {
System.err.println("Running " + id);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {}
System.err.println("Finished " + id);
}
});
}
System.err.println("Waiting for shutdown.");
executor.shutdown();
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
; // do nothing
}
}
public static void main(String[] args) {
try {
ExecutorService executor = new ThreadPoolExecutor(1, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
//ExecutorService executor = new BlockingThreadPoolExecutor(1, 10,
5);
runTest(executor, 1000, 50);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}