Velocity Reviews - Computer Hardware Reviews

Velocity Reviews > Newsgroups > Programming > Java > ThreadPoolExecutor with blocking execute?

Reply
Thread Tools

ThreadPoolExecutor with blocking execute?

 
 
castillo.bryan@gmail.com
Guest
Posts: n/a
 
      12-19-2006

I thought I could use a ThreadPoolExecutor for a producer/consumer
relationship. I wanted to have a fixed queue size for the pool, which
blocked on the producer side if the queue was full, until a slot in the
queue was open. I can see that a RejectedExecutionHandler is called
when the queue is full and there are some pre-existing handlers, to
drop the Runnable or to run the Runnable in the current thread, but no
support for waiting until a slot is empty. I thought that running the
Runnable in the current thread is pretty close, but if multiple slots
open up, while the current thread is busy with a Runnable, it can't
give more tasks to waiting threads.

So I wrote this simple class to block until a slot is empty. Does this
seem reasonable? Does something like this already exist in the JDK that
I missed?



import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

private static class BlockingQueuePut implements
RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor
executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException ie) {
throw new RejectedExecutionException(ie);
}
}
}

public BlockingThreadPoolExecutor(int coreThreadSize, int
maxThreadSize, int queueSize) {
super(
coreThreadSize,
maxThreadSize,
5,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize),
new BlockingQueuePut());
}

}

 
Reply With Quote
 
 
 
 
wesley.hall@gmail.com
Guest
Posts: n/a
 
      12-19-2006

http://www.velocityreviews.com/forums/(E-Mail Removed) wrote:
> I thought I could use a ThreadPoolExecutor for a producer/consumer
> relationship. I wanted to have a fixed queue size for the pool, which
> blocked on the producer side if the queue was full, until a slot in the
> queue was open. I can see that a RejectedExecutionHandler is called
> when the queue is full and there are some pre-existing handlers, to
> drop the Runnable or to run the Runnable in the current thread, but no
> support for waiting until a slot is empty. I thought that running the
> Runnable in the current thread is pretty close, but if multiple slots
> open up, while the current thread is busy with a Runnable, it can't
> give more tasks to waiting threads.
>
> So I wrote this simple class to block until a slot is empty. Does this
> seem reasonable? Does something like this already exist in the JDK that
> I missed?
>
>
>
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.RejectedExecutionException;
> import java.util.concurrent.RejectedExecutionHandler;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
>
>
> public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
>
> private static class BlockingQueuePut implements
> RejectedExecutionHandler {
> public void rejectedExecution(Runnable r, ThreadPoolExecutor
> executor) {
> try {
> executor.getQueue().put(r);
> } catch (InterruptedException ie) {
> throw new RejectedExecutionException(ie);
> }
> }
> }
>
> public BlockingThreadPoolExecutor(int coreThreadSize, int
> maxThreadSize, int queueSize) {
> super(
> coreThreadSize,
> maxThreadSize,
> 5,
> TimeUnit.SECONDS,
> new ArrayBlockingQueue<Runnable>(queueSize),
> new BlockingQueuePut());
> }
>
> }


Whats wrong with this?:

BlockingQueue<Runnable> fixedSizeQueue = new
ArrayBlockingQueue<Runnable>(size);
Executor executor = new ThreadPoolExecutor(........., fixedSizeQueue);

Just add tasks to the fixedSizeQueue, which will block if the queue
overflows?

Seems much simpler to me. Would doesn't this solve your problem?

 
Reply With Quote
 
 
 
 
castillo.bryan@gmail.com
Guest
Posts: n/a
 
      12-19-2006

(E-Mail Removed) wrote:
> (E-Mail Removed) wrote:
> > I thought I could use a ThreadPoolExecutor for a producer/consumer
> > relationship. I wanted to have a fixed queue size for the pool, which
> > blocked on the producer side if the queue was full, until a slot in the
> > queue was open. I can see that a RejectedExecutionHandler is called
> > when the queue is full and there are some pre-existing handlers, to
> > drop the Runnable or to run the Runnable in the current thread, but no
> > support for waiting until a slot is empty. I thought that running the
> > Runnable in the current thread is pretty close, but if multiple slots
> > open up, while the current thread is busy with a Runnable, it can't
> > give more tasks to waiting threads.
> >
> > So I wrote this simple class to block until a slot is empty. Does this
> > seem reasonable? Does something like this already exist in the JDK that
> > I missed?
> >
> >
> >
> > import java.util.concurrent.ArrayBlockingQueue;
> > import java.util.concurrent.RejectedExecutionException;
> > import java.util.concurrent.RejectedExecutionHandler;
> > import java.util.concurrent.ThreadPoolExecutor;
> > import java.util.concurrent.TimeUnit;
> >
> >
> > public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
> >
> > private static class BlockingQueuePut implements
> > RejectedExecutionHandler {
> > public void rejectedExecution(Runnable r, ThreadPoolExecutor
> > executor) {
> > try {
> > executor.getQueue().put(r);
> > } catch (InterruptedException ie) {
> > throw new RejectedExecutionException(ie);
> > }
> > }
> > }
> >
> > public BlockingThreadPoolExecutor(int coreThreadSize, int
> > maxThreadSize, int queueSize) {
> > super(
> > coreThreadSize,
> > maxThreadSize,
> > 5,
> > TimeUnit.SECONDS,
> > new ArrayBlockingQueue<Runnable>(queueSize),
> > new BlockingQueuePut());
> > }
> >
> > }

>
> Whats wrong with this?:
>
> BlockingQueue<Runnable> fixedSizeQueue = new
> ArrayBlockingQueue<Runnable>(size);
> Executor executor = new ThreadPoolExecutor(........., fixedSizeQueue);
>
> Just add tasks to the fixedSizeQueue, which will block if the queue
> overflows?
>
> Seems much simpler to me. Would doesn't this solve your problem?


No, by default ThreadPoolExecutor does not block when the queue is
full. It throws a RejectedExecutionException.

If you run the code below you will see that happen.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class TestExecutorService {

public static void runTest(ExecutorService executor, final long
sleepTime, int itemsToRun)
throws InterruptedException
{
System.err.println("Starting test.");
for (int i=0; i<itemsToRun; i++) {
final int id = i+1;
System.err.println("enqueing item " + id + ".");
executor.execute(new Runnable() {
public void run() {
System.err.println("Running " + id);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {}
System.err.println("Finished " + id);
}
});
}
System.err.println("Waiting for shutdown.");
executor.shutdown();
while (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
; // do nothing
}
}

public static void main(String[] args) {
try {
ExecutorService executor = new ThreadPoolExecutor(1, 10, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
//ExecutorService executor = new BlockingThreadPoolExecutor(1, 10,
5);
runTest(executor, 1000, 50);
}
catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}

}

 
Reply With Quote
 
wesley.hall@gmail.com
Guest
Posts: n/a
 
      12-19-2006

> No, by default ThreadPoolExecutor does not block when the queue is
> full. It throws a RejectedExecutionException.
>
> If you run the code below you will see that happen.


<snip code>

I see what you mean. What you need to do is use a second queue to
manage your flow control. I wrote a quick example...

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPoolExecutorBlockTest
{
public static void main(String[] args)
{
final BlockingQueue<Runnable> queue = new
ArrayBlockingQueue<Runnable>(20, true);
final Executor executor = new ThreadPoolExecutor(10, 10, 1000,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

new Thread(new Runnable()
{
public void run()
{
while(true)
{
try
{
executor.execute(queue.take());
}
catch (InterruptedException e)
{
//Ignore and repeat loop
}
}
}
}).start();

for(int i = 0; i < 30; i++)
{
try
{
queue.put(new Printer(i));
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}

private static class Printer implements Runnable
{
private int number;

public Printer(int number)
{
this.number = number;
}

public void run()
{
System.out.println("Running task: " + number);
try
{
Thread.sleep(10000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
}

 
Reply With Quote
 
castillo.bryan@gmail.com
Guest
Posts: n/a
 
      12-19-2006

(E-Mail Removed) wrote:
> > No, by default ThreadPoolExecutor does not block when the queue is
> > full. It throws a RejectedExecutionException.
> >
> > If you run the code below you will see that happen.

>
> <snip code>
>
> I see what you mean. What you need to do is use a second queue to
> manage your flow control. I wrote a quick example...
>
> import java.util.concurrent.BlockingQueue;
> import java.util.concurrent.ArrayBlockingQueue;
> import java.util.concurrent.Executor;
> import java.util.concurrent.ThreadPoolExecutor;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.LinkedBlockingQueue;
>
> public class ThreadPoolExecutorBlockTest
> {
> public static void main(String[] args)
> {
> final BlockingQueue<Runnable> queue = new
> ArrayBlockingQueue<Runnable>(20, true);
> final Executor executor = new ThreadPoolExecutor(10, 10, 1000,
> TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
>
> new Thread(new Runnable()
> {
> public void run()
> {
> while(true)
> {
> try
> {
> executor.execute(queue.take());
> }
> catch (InterruptedException e)
> {
> //Ignore and repeat loop
> }
> }
> }
> }).start();
>
> for(int i = 0; i < 30; i++)
> {
> try
> {
> queue.put(new Printer(i));
> }
> catch (InterruptedException e)
> {
> e.printStackTrace();
> }
> }
> }
>
> private static class Printer implements Runnable
> {
> private int number;
>
> public Printer(int number)
> {
> this.number = number;
> }
>
> public void run()
> {
> System.out.println("Running task: " + number);
> try
> {
> Thread.sleep(10000);
> }
> catch (InterruptedException e)
> {
> e.printStackTrace();
> }
> }
> }
> }




Yeah, but that code will basically be in a busy wait loop. It will
constantly have an exception thrown, recaught and retried. By using a
blocking put on the queue (my first post), the thread can yield itself
until it can actually do something. I think your example would eat the
CPU and is more complex than the first example I had.

 
Reply With Quote
 
Daniel Dyer
Guest
Posts: n/a
 
      12-19-2006
On Tue, 19 Dec 2006 18:22:09 -0000, <(E-Mail Removed)> wrote:

> Yeah, but that code will basically be in a busy wait loop. It will
> constantly have an exception thrown, recaught and retried. By using a
> blocking put on the queue (my first post), the thread can yield itself
> until it can actually do something. I think your example would eat the
> CPU and is more complex than the first example I had.


You probably need to write your own BlockingQueue implementation to get
the behaviour you want. SynchronousQueue would be a good place to start,
it kind of does what you want, except it returns immediately from its
offer() method instead of blocking, and thus leads to the task being
rejected. The overloaded offer method that takes a timeout is closer
still. With a long enough timeout, it would effectively block.
Unfortunately, the ThreadPoolExecutor class does not seem to make use of
this offer method. However, it would be trivial to write a wrapper for
SynchronousQueue that implements offer(E) by delegating to the other offer
method with a suitably long timeout.

Dan.

--
Daniel Dyer
http://www.uncommons.org
 
Reply With Quote
 
castillo.bryan@gmail.com
Guest
Posts: n/a
 
      12-19-2006

Daniel Dyer wrote:
> On Tue, 19 Dec 2006 18:22:09 -0000, <(E-Mail Removed)> wrote:
>
> > Yeah, but that code will basically be in a busy wait loop. It will
> > constantly have an exception thrown, recaught and retried. By using a
> > blocking put on the queue (my first post), the thread can yield itself
> > until it can actually do something. I think your example would eat the
> > CPU and is more complex than the first example I had.

>
> You probably need to write your own BlockingQueue implementation to get
> the behaviour you want. SynchronousQueue would be a good place to start,
> it kind of does what you want, except it returns immediately from its
> offer() method instead of blocking, and thus leads to the task being
> rejected. The overloaded offer method that takes a timeout is closer
> still. With a long enough timeout, it would effectively block.
> Unfortunately, the ThreadPoolExecutor class does not seem to make use of
> this offer method. However, it would be trivial to write a wrapper for
> SynchronousQueue that implements offer(E) by delegating to the other offer
> method with a suitably long timeout.
>


The first example I had, in the first post has something that works.
But it does this by accesing the queue directly when an item is
rejected. So, I have something that works. What I'm wondering is, if
there is anything wrong with the way I'm doing it.

I thought about trying it the way you are talking about and overriding
ArrayBlockingQueue's offer method to actually call put. For some
reason that gave me shivers...... I know I could set it up in a way
that no other class could use that subclass though.



> Dan.
>
> --
> Daniel Dyer
> http://www.uncommons.org


 
Reply With Quote
 
Daniel Dyer
Guest
Posts: n/a
 
      12-19-2006
On Tue, 19 Dec 2006 19:39:13 -0000, <(E-Mail Removed)> wrote:

> The first example I had, in the first post has something that works.
> But it does this by accesing the queue directly when an item is
> rejected. So, I have something that works. What I'm wondering is, if
> there is anything wrong with the way I'm doing it.
> I thought about trying it the way you are talking about and overriding
> ArrayBlockingQueue's offer method to actually call put. For some
> reason that gave me shivers...... I know I could set it up in a way
> that no other class could use that subclass though.


I'll admit that my suggestion is not particularly elegant in terms of
implementation. However, I think there is something to be said for
encapsulating this behaviour within the queue implementation, particularly
if you want to re-use it, since you won't have to worry about setting a
RejectedExecutionHandler on every thread pool you create. So I'm
convinced that the custom queue implementation is the right approach, but
the question remains as to what form should that implementation take,
whether you should write something from scratch or adapt one of the
existing implementations (presumably SynchronousQueue or
ArrayBlockingQueue).

I'd overlooked the put method but, now that you mention it, I think this
has to be better than my suggestion of fudging the issue by calling offer
with a long timeout (unless, of course, you want a timeout). There is
nothing in the API documentation that suggests that a blocking
implementation of offer would be a bad thing.

Dan.

--
Daniel Dyer
http://www.uncommons.org
 
Reply With Quote
 
wesley.hall@gmail.com
Guest
Posts: n/a
 
      12-20-2006

> Yeah, but that code will basically be in a busy wait loop. It will
> constantly have an exception thrown, recaught and retried. By using a
> blocking put on the queue (my first post), the thread can yield itself
> until it can actually do something. I think your example would eat the
> CPU and is more complex than the first example I had.


Huh?

There is no exception thrown, recaught and retried. Not sure where you
got this idea from. My code uses the blocking methods of the
BlockingQueue (which the executor does not). The CPU will not run hot
and an exception is not thrown unless the thread is interupted (which
it is not under normal operation).

Personally, I don't the idea of managing rejected execution
retroactively (as per your example). If you are happy with this and
prefer this approach then great. It is fairly subjective after all.

Dan's solution is nice. I was under the (incorrect) impression that the
offer method was documented as failing when queue is full, but it seems
it isn't.

 
Reply With Quote
 
castillo.bryan@gmail.com
Guest
Posts: n/a
 
      12-20-2006

(E-Mail Removed) wrote:
> > Yeah, but that code will basically be in a busy wait loop. It will
> > constantly have an exception thrown, recaught and retried. By using a
> > blocking put on the queue (my first post), the thread can yield itself
> > until it can actually do something. I think your example would eat the
> > CPU and is more complex than the first example I had.

>
> Huh?
>
> There is no exception thrown, recaught and retried. Not sure where you
> got this idea from. My code uses the blocking methods of the
> BlockingQueue (which the executor does not). The CPU will not run hot
> and an exception is not thrown unless the thread is interupted (which
> it is not under normal operation).
>



Sorry, I misread the code, I thought one of the comments indicated that
"more code should be written..." (And I didn't mean to sound like a
jerk - sorry)

So I ran your code, but it still doesn't actually limit the main
producing thread. The queue you used for the ThreadPool (not the other
one) is unbounded, so that's why you didn't get any exceptions. So the
problem I had with an unbounded queue, is that I could fill up memory.
If you put a print statement right after your for loop which puts items
on, you will see it gets through the loop very fast. The extra thread
and ArrayBlockingQueue don't help in limiting throughput.


> Personally, I don't the idea of managing rejected execution
> retroactively (as per your example). If you are happy with this and
> prefer this approach then great. It is fairly subjective after all.
>


I don't know that I like it either, but it seems like that was the
model intended by the API.

> Dan's solution is nice. I was under the (incorrect) impression that the
> offer method was documented as failing when queue is full, but it seems
> it isn't.


It is documented:
http://java.sun.com/j2se/1.5.0/docs/...e.html#offer(E)

 
Reply With Quote
 
 
 
Reply

Thread Tools

Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is On
HTML code is Off
Trackbacks are On
Pingbacks are On
Refbacks are Off


Similar Threads
Thread Thread Starter Forum Replies Last Post
ThreadPoolExecutor backport Philipp Java 6 07-31-2008 07:32 PM
Holding threads after timeout (ThreadPoolExecutor) pksiazek Java 2 10-15-2007 02:41 PM
Suspending threads by ThreadPoolExecutor Maciej Java 1 10-27-2006 02:08 PM
looking for good guide to using ThreadPoolExecutor Marc E Java 0 04-29-2006 03:13 PM
ThreadPoolExecutor implementation question allen@rrsg.ee.uct.ac.za Java 0 10-04-2005 03:14 PM



Advertisments