Pattern for queue + thread pool with in-order processing?

 
 
meselfo
03-22-2009
I'm writing a server that accepts connections from multiple network
clients. Messages from these clients are submitted as tasks to worker
threads through a bounded queue. Messages coming from the _SAME_
client need to be processed in the order they were received from that
client. How do I enforce this requirement?

I'm using ThreadPoolExecutor from the Java API, which has a built-in
queue for submitting tasks. If I stamp messages from the same client
with a sequence number and make worker threads aware of this number,
so that a worker pauses when the number indicates that another message
from the same client should finish processing first, then I'm
effectively blocking that worker thread - thus a client sending many
messages quickly could reduce the number of effective worker threads.
There must be some sort of pattern for processing some of the tasks
submitted to a pool of worker threads in order?

 
Giovanni Azua
03-22-2009
Hi there,

"meselfo" <(E-Mail Removed)> wrote in message
> I'm writing a server that accepts connections from multiple network
> clients. Messages from these clients are submitted as tasks to worker
> threads through a bounded queue. Messages coming from the _SAME_
> client need to be processed in the order they were received from that
> client. How do I enforce this requirement?
>
> [snip]
>

If you can ensure a finite number of different clients, or expect it to
be reasonably bounded, then I think a good solution would be to
dedicate a single queue per client. You would insert into the
appropriate queue depending on the client ID, or, in JMS terms, a
correlation ID. I think you have a very good use case for the
ArrayBlockingQueue implementation:
http://java.sun.com/javase/6/docs/ap...kingQueue.html

With this setup I think you will have minimal contention, since
multiple threads consume from multiple queues without having to worry
about breaking the per-client-ordering invariant. However, I would be
curious whether there are better performing alternatives.
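
Something along these lines is what I have in mind - only a rough,
untested sketch; Message, clientId() and the capacity of 1024 are
invented for illustration:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// One bounded queue per client: the IO thread puts, and whichever worker
// is responsible for a client takes from that client's queue in FIFO order.
class PerClientQueues {

    // Placeholder for your own message type.
    interface Message { String clientId(); }

    private final ConcurrentMap<String, BlockingQueue<Message>> queues =
            new ConcurrentHashMap<String, BlockingQueue<Message>>();

    BlockingQueue<Message> queueFor(String clientId) {
        BlockingQueue<Message> q = queues.get(clientId);
        if (q == null) {
            BlockingQueue<Message> fresh =
                    new ArrayBlockingQueue<Message>(1024);   // arbitrary bound
            q = queues.putIfAbsent(clientId, fresh);
            if (q == null) {
                q = fresh;                                   // we won the race
            }
        }
        return q;
    }

    // Called by the IO thread; blocks if that client's queue is full.
    void submit(Message m) throws InterruptedException {
        queueFor(m.clientId()).put(m);
    }
}

The catch is that you then need a consumer per queue, or some scheme for
assigning queues to workers, rather than one shared work queue for the
whole pool.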

HTH,
Best regards,
Giovanni


 
Mike Schilling
03-22-2009
meselfo wrote:
> I'm writing a server that accepts connections from multiple network
> clients. Messages from these clients are submitted as tasks to worker
> threads through a bounded queue. Messages coming from the _SAME_
> client need to be processed in the order they were received from that
> client. How do I enforce this requirement?
>
> I'm using ThreadPoolExecutor from the Java API, which has a built-in
> queue for submitting tasks. If I stamp messages from the same client
> with a sequence number and make worker threads aware of this number,
> so that a worker pauses when the number indicates that another message
> from the same client should finish processing first, then I'm
> effectively blocking that worker thread - thus a client sending many
> messages quickly could reduce the number of effective worker threads.


Right, you don't want to change how the worker threads execute. You
want to change the order in which messages are pulled off the queue.

> There must be some sort of pattern for processing some of the tasks
> submitted to a pool of worker threads in order?


There are two things you need to ensure:

1. No messages from client A get processed as long as any other
messages from client A are still being processed.
2. When it's safe to process another message from client A, process
the earliest one.

You get 2 for free, as it's how queues work. All you really need to
do is:

A. Keep data about which clients have messages currently being
processed. The simplest thing here is a Set into which clients are
added when a job starts and removed when it ends.
B. Create a queue implementation that skips messages whose clients are
in the set from part A. That is, if messages from C1 and C2, but not
C3, are currently being processed, and the queue looks like

C1.5, C1.6, C2.4, C3.6

the next item returned would be C3.6. Now, if the C1 processing
completes, the next item returned will be C1.5.

The tricky bit here, I think, will be implementing your queue's poll()
method, since a blocked call needs to re-check whether it can return an
item both when

i. a new item is added to the queue, and
ii. message processing completes (i.e. a client is removed from the
Set), which may make an existing queue item newly eligible to be
returned.
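
To make that concrete, here's a rough, untested sketch of the skipping
hand-off (not a full BlockingQueue implementation; Message and
clientId() are invented names). A single condition is signalled both
when an item arrives and when a client finishes, which covers i. and
ii. above:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SkippingQueue {

    interface Message { String clientId(); }

    private final Deque<Message> pending = new ArrayDeque<Message>();
    private final Set<String> inFlight = new HashSet<String>();  // clients being processed
    private final Lock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();

    // Called by the IO thread when a message arrives.
    void put(Message m) {
        lock.lock();
        try {
            pending.addLast(m);
            changed.signalAll();       // a waiting worker may now have work
        } finally {
            lock.unlock();
        }
    }

    // Called by a worker: returns the oldest message whose client has
    // nothing in flight, waiting until such a message exists.
    Message take() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                for (Iterator<Message> it = pending.iterator(); it.hasNext(); ) {
                    Message m = it.next();
                    if (!inFlight.contains(m.clientId())) {
                        it.remove();
                        inFlight.add(m.clientId());
                        return m;
                    }
                }
                changed.await();       // nothing eligible yet
            }
        } finally {
            lock.unlock();
        }
    }

    // Called by a worker when it has finished processing a message.
    void done(Message m) {
        lock.lock();
        try {
            inFlight.remove(m.clientId());
            changed.signalAll();       // a skipped message may now be eligible
        } finally {
            lock.unlock();
        }
    }
}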


 
Mike Schilling
03-22-2009
Mike Schilling wrote:
> meselfo wrote:
>> I'm writing a server that accepts connections from multiple network
>> clients. Messages from these clients are submitted as tasks to worker
>> threads through a bounded queue. Messages coming from the _SAME_
>> client need to be processed in the order they were received from that
>> client. How do I enforce this requirement?
>>
>> I'm using ThreadPoolExecutor from the Java API, which has a built-in
>> queue for submitting tasks. If I stamp messages from the same client
>> with a sequence number and make worker threads aware of this number,
>> so that a worker pauses when the number indicates that another message
>> from the same client should finish processing first, then I'm
>> effectively blocking that worker thread - thus a client sending many
>> messages quickly could reduce the number of effective worker threads.

>
> Right, you don't want to change how the worker threads execute. You
> want to change the order in which messages are pulled off the queue.
>
>> There must be some sort of pattern for processing some of the tasks
>> submitted to a pool of worker threads in order?

>
> There are two things you need to ensure:
>
> 1. No messages from client A get processed as long as any other
> messages from client A are still being processed.
> 2. When it's safe to process another message from client A, process
> the earliest one.
>
> You get 2 for free, as it's how queues work. All you really need to
> do is:


Here's a simpler version (since it doesn't require implementing a new
kind of BlockingQueue):
>
> A. Keep data about which clients have messages currently being
> processed. The simplest thing here is a Set into which clients are
> added when a job starts and removed when it ends.


This changes slightly:

A. Keep data about which clients have messages which have been queued
to the ThreadPoolExecutor but are not yet fully processed. The simplest
thing here is a Set into which clients are added when a message is
queued and removed when it is done being processed.

> B. Create a queue implementation that skips messages whose clients are
> in the set from part A. That is, if messages from C1 and C2, but not
> C3, are currently being processed, and the queue looks like
>
> C1.5, C1.6, C2.4, C3.6
>
> the next item returned would be C3.6. Now, if the C1 processing
> completes, the next item returned will be C1.5.


Instead of this:

B1. Use a standard BlockingQueue implementation for the
ThreadPoolExecutor, but instead of adding new messages to it directly:
B2. For each client, create a LinkedList of messages to be processed,
which will eventually be moved to the BlockingQueue, and do the
following:

WHEN a message is received from a client
    IF the client is currently in the Set described above
    THEN
        Append that message to that client's LinkedList
    ELSE
        Append it directly to the BlockingQueue
        Add the client to the Set

WHEN a client finishes processing a message
    IF the client's LinkedList is empty
    THEN
        Remove the client from the Set
    ELSE
        Move the oldest message from the client's LinkedList to
        the BlockingQueue
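
In Java the bookkeeping might look roughly like this - a sketch only,
not tested; Message, clientId(), process() and the pool size are
invented for illustration:

import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class OrderedDispatcher {

    // Placeholder for your own task type.
    interface Message { String clientId(); void process(); }

    private final ExecutorService pool = Executors.newFixedThreadPool(8);

    // Clients with a message queued to the pool or currently running.
    private final Set<String> busy = new HashSet<String>();
    // Held-back messages, per client, in arrival order.
    private final Map<String, Queue<Message>> backlog =
            new HashMap<String, Queue<Message>>();

    // WHEN a message is received from a client (called by the IO thread).
    synchronized void submit(Message m) {
        String c = m.clientId();
        if (busy.contains(c)) {
            Queue<Message> q = backlog.get(c);
            if (q == null) {
                q = new LinkedList<Message>();
                backlog.put(c, q);
            }
            q.add(m);                  // hold it back
        } else {
            busy.add(c);
            execute(m);                // hand it straight to the pool
        }
    }

    private void execute(final Message m) {
        pool.execute(new Runnable() {
            public void run() {
                try {
                    m.process();
                } finally {
                    finished(m.clientId());
                }
            }
        });
    }

    // WHEN a client finishes processing a message (runs on the worker).
    private synchronized void finished(String c) {
        Queue<Message> q = backlog.get(c);
        if (q == null || q.isEmpty()) {
            busy.remove(c);            // nothing pending for this client
        } else {
            execute(q.poll());         // release the next one, in order
        }
    }
}

Note that with this arrangement it's the finishing worker thread that
hands the client's next message to the pool, not the IO thread.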



 
meselfo
03-23-2009

> This changes slightly:
>
> A. Keep data about which clients have messages which have been queued
> to the ThreadPoolExecutor but are not yet fully processed. The
> simplest thing here is a Set into which clients are added when a
> message is queued and removed when it is done being processed.


I'm using NIO for non-blocking read() operations in the IO thread, so
one thread is handling all client accepts and reads. I'm using the
Selector API in the IO thread for accept/read readiness notification.

I was thinking of using the selection key's attach() method to store
data specific to the client, such as a flag that indicates whether a
message from that client has been submitted to the worker pool. The
selection key is passed along with the message to the worker thread.
The flag would then be cleared by the worker thread once it has
finished processing the message.


> Instead of this:
>
> B1. Use a standard BlockingQueue implementation for the
> ThreadPoolExecutor, but instead of adding new messages to it directly:
> B2. For each client, create a LinkedList of messages to be processed,
> which will eventually be moved to the BlockingQueue, and do the
> following:
>
> WHEN a message is received from a client
>     IF the client is currently in the Set described above
>     THEN
>         Append that message to that client's LinkedList
>     ELSE
>         Append it directly to the BlockingQueue
>         Add the client to the Set
>
> WHEN a client finishes processing a message
>     IF the client's LinkedList is empty
>     THEN
>         Remove the client from the Set
>     ELSE
>         Move the oldest message from the client's LinkedList to
>         the BlockingQueue


I've been playing with something like your suggestion too. I was
thinking I could store any excess messages in the selection key's
attachment until the flag mentioned above was cleared, indicating that
the client has no messages currently in the queue and no message being
processed by a worker thread. Like this:

WHEN a worker thread finishes processing a message
    IF more unsubmitted messages exist attached to the same
    client's selection key
    THEN
        Let the worker thread invoke wakeup() on the IO thread's
        selector so the IO thread can submit the next message

But wakeup() is giving me a headache. Basically, wakeup() interrupts a
blocked select() invocation in the IO thread, and if the IO thread is
not currently blocked then the next invocation of select() will return
right away. If a lot of worker threads are bombarding the IO thread
with wakeup(), the IO thread will be starved of time to do readiness
notification, and the queue feeding the worker pool will run dry. Thus
I would get a fluctuating pattern where reading is starved until the
worker threads have no more jobs and stop invoking wakeup(), at which
point the queue fills up again, the worker threads go back to work, and
the IO is starved again...

Maybe you were thinking of letting the worker threads submit the next
unprocessed message themselves if the same client has more messages in
the linked list? This would avoid the wakeup(), but I'm not too keen on
having anyone other than the IO thread dispatch jobs to the worker
pool. It complicates the design when different types of threads
dispatch jobs - but it would give me an undisturbed flow of messages to
the worker pool.

wakeup() is unfortunately also haunting me when I need to write data
using non-blocking writes through the Selector API. If a worker thread
wants to write data back to a client, I need to submit the return
message back to the IO thread so it can write the data in a
non-blocking fashion. If I let the worker thread write the data itself,
it would effectively block that thread, even if it uses a Selector,
since the worker thread can't pick another job from the queue while
doing readiness selection on a previous write operation. If the worker
thread calls wakeup() on the IO thread to get a write operation going,
then again I'm starving the readiness notification in the IO thread,
since a lot of worker threads will want to write data and the IO thread
would be bombarded with wakeup().

If I create a dedicated IO thread for write operations only, I wouldn't
starve the accept/read operations of the other IO thread, but using
wakeup() to signal the write thread to poll() the messages sent to it
for writing would still starve it of write-readiness notifications. I
could block the IO thread using select(some short interval) so no
wakeup() is needed, but I would hate to have that artificial time delay
on write operations.
 
Mike Schilling
03-23-2009
meselfo wrote:
>
> Maybe you were thinking of letting the worker threads submit the next
> unprocessed message themselves if the same client has more messages in
> the linked list? This would avoid the wakeup(), but I'm not too keen on
> having anyone other than the IO thread dispatch jobs to the worker
> pool. It complicates the design when different types of threads
> dispatch jobs - but it would give me an undisturbed flow of messages to
> the worker pool.


Why does it complicate the design? The BlockingQueue is designed to accept
items from many threads; just take a bit of care with the synchronization.
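
For instance, a per-client serializing wrapper along the lines of the
SerialExecutor example in the java.util.concurrent.Executor Javadoc
would do it: any thread can submit, and the worker that finishes a task
hands the next task for that client to the shared pool. Roughly
(untested):

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// One of these per client, all sharing the same backing pool. Any thread
// may call execute(); tasks submitted to the same SerialExecutor still run
// one at a time, in submission order.
class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    private final Executor pool;   // the shared ThreadPoolExecutor
    private Runnable active;

    SerialExecutor(Executor pool) {
        this.pool = pool;
    }

    public synchronized void execute(final Runnable r) {
        tasks.add(new Runnable() {
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();    // the finishing worker releases the next task
                }
            }
        });
        if (active == null) {
            scheduleNext();            // nothing in flight for this client: start one
        }
    }

    private synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            pool.execute(active);
        }
    }
}

You'd keep one SerialExecutor per client (say, in a map keyed by client
ID) and submit every message for that client through it.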

As to the output side, I don't have enough experience with NIO to make any
sensible suggestions.


 
Cupdoo
09-11-2011
Quite an interesting problem. I stumbled over this post when I was looking for a solution to exactly the same thing: dispatching several runnables to a thread pool, but forcing some of those runnables to be executed in strict order.

As suggested here, I tried to use a ThreadPoolExecutor with a custom blocking queue. However, it turned out that Sun's implementation bypasses the queue if there are idle threads in the pool. I ended up writing my own thread pool with a custom queue. Works like a charm.

For those who might find this thread as well: I've uploaded the source to https://github.com/EarlOfWenc/snippe...encyThreadPool
 