producer/consumer remove problem

 
 
Jeff
      10-21-2004
To let a specific producer distribute to several consumers, I have a simple
extension of HashSet that holds the consumers. However, a problem sometimes
occurs while a consumer is removing itself from the HashSet.

The problem occurs when distribute() calls the .eventObserved() method of
the consumer that is trying to remove itself. The .eventObserved() method
never returns. I think that is because the consumer's thread is waiting in
ConsumerSet's remove(). The producing caller and consumer are always
different threads.

To solve the problem, I'm considering having remove() spawn a thread to do
the remove, so that the caller of remove() does not block waiting for the
synchronized lock. distribute() is called A LOT, but remove() is called rarely.

Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
Is there a better solution, or at least more Java-like?

import java.util.HashSet;
import java.util.Iterator;

public class ConsumerSet extends HashSet {
    public void distribute(Object message) {
        // The set's monitor is held while every consumer is called.
        synchronized (this) {
            for (Iterator i = iterator(); i.hasNext(); ) {
                ((Consumer) i.next()).eventObserved(message);
            }
        }
    }

    public void add(Consumer consumer) {
        synchronized (this) {
            super.add(consumer);
        }
    }

    public void remove(Consumer consumer) {
        // Blocks for as long as distribute() holds the monitor.
        synchronized (this) {
            super.remove(consumer);
        }
    }
} // ConsumerSet

// Implementations will usually be an extension of Thread
public interface Consumer {
    /**
     * Invoked when an event is observed.
     */
    public void eventObserved(Object toProcess);
}



--
Jeff


 
xarax
      10-21-2004
"Jeff" wrote in message
news:8tSdd.6713$Ug4.1382@trndny01...
> For a specific producer to distribute to several consumers , I have a simple
> extension of HashSet of consumers. However, a problem sometimes occurs
> while a consumer is removing himself from the HashSet.
>
> The problem occurs when distribute() calls the .eventObserved() method of
> the consumer that is trying to remove himself. The .eventObserved() method
> never returns. I think that is because the consumer's thread is waiting in
> ConsumerSet's remove(). The producing caller and consumer are always
> different threads.


That's your intent, but likely not what you implemented.

> To solve the problem, I'm considering having remove() spawn a thread to do
> the remove so that remove does not wait on synchronized. distribute() is
> called A LOT, but remove() is called rarely.
>
> Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
> Is there a better solution, or at least more Java-like?
>
> public class ConsumerSet extends HashSet {
> public void distribute(Object message) {
> synchronized( this ) {
> for (Iterator i = iterator(); i.hasNext(); )
> ((Consumer) i.next()).eventObserved(message);
> }
> }
>
> public void add( Consumer consumer ) {
> synchronized (this) {
> super.add( consumer );
> }
> }
>
> public void remove( Consumer consumer ) {
> synchronized ( this ) {
> super.remove( consumer );
> }
> }
> } // ConsumerSet
>
> // usually be an extension of Thread
> public interface Consumer {
> /**
> * Invoked when an event is observed
> */
> public void eventObserved( Object toProcess );
> }


This is likely your problem. You seem to have one
thread calling eventObserved(Object) on the Thread
instance of another thread. That does not make
eventObserved() run on the other thread; the method
runs on whichever thread calls it. For it to run on
the other thread, that thread must itself call
eventObserved().
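
A minimal sketch of the point above, for illustration only. The class names ThreadConsumer and CallerThreadDemo and the thread names are assumptions, not taken from the original code. It records which thread actually executes eventObserved() when a producer thread calls it on a consumer that extends Thread.

```java
// Hypothetical consumer that extends Thread, as in the design being discussed.
class ThreadConsumer extends Thread {
    // Name of the thread that actually executed eventObserved().
    private volatile String observedOn;

    ThreadConsumer() {
        super("consumer-thread");
    }

    public void eventObserved(Object toProcess) {
        observedOn = Thread.currentThread().getName();
    }

    public void run() {
        // The consumer's own work would go here; it never calls eventObserved().
    }

    String observedOn() {
        return observedOn;
    }
}

class CallerThreadDemo {
    // Returns the name of the thread on which eventObserved() ran.
    static String run() {
        final ThreadConsumer consumer = new ThreadConsumer();
        consumer.start();
        Thread producer = new Thread(new Runnable() {
            public void run() {
                consumer.eventObserved("message");
            }
        }, "producer-thread");
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumer.observedOn();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "producer-thread"
    }
}
```

The method body runs on the producer's thread even though it is declared on the consumer's Thread subclass.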

Your design is likely flawed.

If you have a producer thread and multiple consumer
threads, then you need something like an event queue.
The producer thread places a node onto the queue. Other
consumer threads are waiting for a node to appear on
the queue. One of the consumer threads will pull the
node off of the queue and process it.

The Observer/Observed pattern doesn't work for multiple
threads. You need a synchronized event queue where the
consumer threads will wait() until the queue is non-empty,
and the producer thread will notifyAll() when it puts a
new node onto the queue.
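
A minimal sketch of such a queue, built only on synchronized, wait() and notifyAll(). The names EventQueue and EventQueueDemo are hypothetical, and generics are used for readability (they can be replaced by Object and casts on a pre-5.0 JDK). Each node is handed to exactly one of the waiting consumers.

```java
import java.util.LinkedList;

// Hypothetical name; a sketch of a wait()/notifyAll() event queue.
class EventQueue<T> {
    private final LinkedList<T> nodes = new LinkedList<T>();

    // Producer side: append a node and wake any waiting consumers.
    public synchronized void put(T node) {
        nodes.addLast(node);
        notifyAll();
    }

    // Consumer side: block until a node is available, then remove and return it.
    public synchronized T take() throws InterruptedException {
        while (nodes.isEmpty()) { // loop guards against spurious wakeups
            wait();
        }
        return nodes.removeFirst();
    }
}

class EventQueueDemo {
    // One consumer thread takes three nodes that the calling thread produces.
    static String run() {
        final EventQueue<String> queue = new EventQueue<String>();
        final StringBuffer seen = new StringBuffer();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 3; i++) {
                        seen.append(queue.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        queue.put("a");
        queue.put("b");
        queue.put("c");
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "abc"
    }
}
```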

If you need some mutex classes, you can download the
source at http://mindprod.com/products.html, look for
the Mutex download.

Also, J2SE 5.0 has new interfaces and classes (in the
java.util.concurrent package) that are very similar to
the mutex download (see above).

Hope this helps.

--
----------------------------
Jeffrey D. Smith



 
John C. Bollinger
      10-21-2004
Jeff wrote:

> For a specific producer to distribute to several consumers , I have a simple
> extension of HashSet of consumers. However, a problem sometimes occurs
> while a consumer is removing himself from the HashSet.


I don't see the point of extending HashSet here. I think you are
placing responsibilities on your extended version that more properly
belong on the producer object.

> The problem occurs when distribute() calls the .eventObserved() method of
> the consumer that is trying to remove himself. The .eventObserved() method
> never returns. I think that is because the consumer's thread is waiting in
> ConsumerSet's remove().


It is conceivable that you are getting deadlocks this way, but it would
depend on how the consumer's eventObserved() method was written. If I
interpret your code and comments rightly, then eventObserved() will be
executed by the producer's thread, whereas ConsumerSet.remove() will be
executed by the consumer's thread.

> The producing caller and consumer are always
> different threads.


This is causing you confusion. Do not put application logic into Thread
subclasses; use Runnables instead. This gives you a more consistent
object model in the first place, but more importantly, it tends to
reduce confusion about who can do what, when, and to whom.

> To solve the problem, I'm considering having remove() spawn a thread to do
> the remove so that remove does not wait on synchronized. distribute() is
> called A LOT, but remove() is called rarely.


No, don't. Throwing more threads at a synchronization problem just
makes for a messier synchronization problem.

> Because I use ConsumerSet extensively, I'd like to get some wiser opinions.
> Is there a better solution, or at least more Java-like?
>
> public class ConsumerSet extends HashSet {
> public void distribute(Object message) {
> synchronized( this ) {
> for (Iterator i = iterator(); i.hasNext(); )
> ((Consumer) i.next()).eventObserved(message);
> }
> }


The distribute() method belongs on your producer object, not on the set.
The set should not be exposed directly to consumer objects; instead
they should register themselves with the producer, which will add them
to the Set. This obviates any need for type-safe add() and remove()
methods, so this whole class becomes superfluous.

> public void add( Consumer consumer ) {
> synchronized (this) {
> super.add( consumer );
> }
> }
>
> public void remove( Consumer consumer ) {
> synchronized ( this ) {
> super.remove( consumer );
> }
> }
> } // ConsumerSet
>
> // usually be an extension of Thread


Should NOT be an extension of Thread. Probably should not even be an
implementation of Runnable. May be an object shared between the
producer thread and some other thread.

> public interface Consumer {
> /**
> * Invoked when an event is observed
> */
> public void eventObserved( Object toProcess );
> }


Your producer should look something like this:

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

public class Producer implements Runnable {

    private final Set consumerSet = new HashSet();

    public void registerConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.remove(c);
        }
    }

    protected void fireEvent(Object event) {
        // The lock is held for the whole dispatch, so registration
        // changes wait until every consumer has been called.
        synchronized (consumerSet) {
            for (Iterator it = consumerSet.iterator(); it.hasNext(); ) {
                ((Consumer) it.next()).eventObserved(event);
            }
        }
    }

    public void run() {
        // do stuff that ends up invoking fireEvent() periodically
    }
}

(Rather resembles your ConsumerSet, doesn't it?)

Encapsulating the consumer set in this way prevents any unexpected
synchronization on it that might cause deadlock. Your documentation for
the Consumer interface should remark that its eventObserved() method
must execute quickly and that the scope of its execution must not
include any attempt to unregister the consumer (which would not
deadlock, but might throw a ConcurrentModificationException).
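
A small sketch of that failure mode, for illustration only. SimpleProducer and SelfUnregisterDemo are hypothetical names, and the producer is a cut-down, generics-based version of the class above. Two consumers each unregister themselves from inside eventObserved(). The calling thread already owns the monitor, so the nested synchronized block is re-entered rather than blocked, but the set is modified in the middle of the iteration.

```java
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

interface Consumer {
    void eventObserved(Object toProcess);
}

// Hypothetical cut-down producer with the same locking as the one above.
class SimpleProducer {
    private final Set<Consumer> consumerSet = new HashSet<Consumer>();

    public void registerConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.remove(c);
        }
    }

    public void fireEvent(Object event) {
        synchronized (consumerSet) {
            for (Iterator<Consumer> it = consumerSet.iterator(); it.hasNext(); ) {
                it.next().eventObserved(event);
            }
        }
    }
}

class SelfUnregisterDemo {
    // Returns true if dispatch failed with ConcurrentModificationException.
    static boolean run() {
        final SimpleProducer producer = new SimpleProducer();
        for (int i = 0; i < 2; i++) {
            producer.registerConsumer(new Consumer() {
                public void eventObserved(Object toProcess) {
                    // Same thread, same monitor: no deadlock, but the set
                    // changes while fireEvent() is still iterating over it.
                    producer.unregisterConsumer(this);
                }
            });
        }
        try {
            producer.fireEvent("event");
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```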

You should also keep in mind that registering and unregistering event
listeners (err... consumers) can block on completion of fireEvent(), and
therefore no thread should invoke registerConsumer() or
unregisterConsumer() while holding the monitor for an object that the
relevant Consumer's eventObserved() method needs to lock. [That is very
likely what is causing your deadlock now.] In particular, it may not be
possible to prevent a Consumer from observing events after a request to
unregister it has been dispatched (but before the unregisterConsumer()
returns).
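
One common way to loosen both restrictions, which nobody in this thread proposed, is to copy the set while holding the lock and then call the consumers after releasing it. The sketch below assumes hypothetical names (SnapshotProducer, SnapshotDemo) and uses Java 5 syntax. The trade-off is that a consumer can still receive an event after unregisterConsumer() has returned, if it was in a snapshot taken earlier.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

interface Consumer {
    void eventObserved(Object toProcess);
}

// Hypothetical variant of the producer that dispatches from a snapshot.
class SnapshotProducer {
    private final Set<Consumer> consumerSet = new HashSet<Consumer>();

    public void registerConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.add(c);
        }
    }

    public void unregisterConsumer(Consumer c) {
        synchronized (consumerSet) {
            consumerSet.remove(c);
        }
    }

    public int consumerCount() {
        synchronized (consumerSet) {
            return consumerSet.size();
        }
    }

    public void fireEvent(Object event) {
        List<Consumer> snapshot;
        synchronized (consumerSet) {
            // Copy under the lock; the lock is released before any callback runs.
            snapshot = new ArrayList<Consumer>(consumerSet);
        }
        for (Consumer c : snapshot) {
            c.eventObserved(event);
        }
    }
}

class SnapshotDemo {
    // Returns {number of callbacks invoked, consumers left registered}.
    static int[] run() {
        final SnapshotProducer producer = new SnapshotProducer();
        final int[] calls = { 0 };
        for (int i = 0; i < 2; i++) {
            producer.registerConsumer(new Consumer() {
                public void eventObserved(Object toProcess) {
                    calls[0]++;
                    // Safe here: the iteration runs over the copy, not the set.
                    producer.unregisterConsumer(this);
                }
            });
        }
        producer.fireEvent("event");
        return new int[] { calls[0], producer.consumerCount() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println(result[0] + " callbacks, " + result[1] + " still registered");
    }
}
```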


John Bollinger
Jeff
      10-21-2004
Thanks for the thoughtful response. I need to clarify the problem.

ALL consumers must consume the object. It's really a network multicast
requirement in software. The queue implementation fails to achieve this
requirement because the first consumer dequeues the object. The second
consumer also needs the object, but it's gone from the queue.

I considered the 1.5 enhancements, but I would rather not move to 1.5 yet.

xarax
      10-22-2004

Please don't top post.

It would seem that the consumable object must be posted
to each and every consumer queue via the distribute()
method. Then each consumer thread can process the consumable.
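
A rough sketch of that arrangement, under stated assumptions: QueueingConsumer, MulticastDemo, post() and the STOP sentinel are all hypothetical names, and Java 5 syntax is used for brevity. Each consumer owns its own queue and drains it on its own thread, and the distribute step simply posts every message to every consumer's queue, so all consumers see all messages.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical consumer that owns a queue and drains it on its own thread.
class QueueingConsumer implements Runnable {
    private static final Object STOP = new Object(); // sentinel that ends run()
    private final LinkedList<Object> queue = new LinkedList<Object>();
    private final List<Object> processed = new ArrayList<Object>();

    // Called by the producer thread: enqueue and return immediately.
    public void post(Object message) {
        synchronized (queue) {
            queue.addLast(message);
            queue.notifyAll();
        }
    }

    public void stop() {
        post(STOP);
    }

    // Runs on the consumer's own thread.
    public void run() {
        try {
            while (true) {
                Object message;
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        queue.wait();
                    }
                    message = queue.removeFirst();
                }
                if (message == STOP) {
                    return;
                }
                synchronized (processed) {
                    processed.add(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public int processedCount() {
        synchronized (processed) {
            return processed.size();
        }
    }
}

class MulticastDemo {
    // Posts three messages to two consumers; returns how many each processed.
    static int[] run() {
        QueueingConsumer[] consumers = { new QueueingConsumer(), new QueueingConsumer() };
        Thread[] threads = new Thread[consumers.length];
        for (int i = 0; i < consumers.length; i++) {
            threads[i] = new Thread(consumers[i]);
            threads[i].start();
        }
        // The distribute step: every message goes onto every consumer's queue.
        for (int m = 0; m < 3; m++) {
            for (QueueingConsumer c : consumers) {
                c.post("message-" + m);
            }
        }
        int[] counts = new int[consumers.length];
        try {
            for (int i = 0; i < consumers.length; i++) {
                consumers[i].stop();
                threads[i].join();
                counts[i] = consumers[i].processedCount();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " and " + counts[1]); // prints "3 and 3"
    }
}
```

Because the producer only enqueues, it never waits for a consumer to finish processing, and a consumer that wants to leave can stop draining its queue without touching any lock the producer holds during dispatch.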

Of course, by now you understand that extending Thread
to add the eventObserved() method is worse than useless,
because it clouds the semantics of your design, doesn't
offer any functionality to the Thread class, and can
confuse the casual reader into thinking that somehow
the target thread is processing the method call.


 