Multiprocessing problem with producer/consumer

 
 
Wu Zhe
      05-27-2009

I am writing a server program with one producer and multiple consumers.
What confuses me is that only the first task the producer puts into the
queue gets consumed; tasks enqueued after that are never consumed and
remain in the queue forever.

from multiprocessing import Process, Pool, Queue, cpu_count
from http import httpserv

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers
            w.start()

        httpserv(self.queue)

    def reload(self):
        print "RELOAD"

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESS):
            self.workers[i].join()
        queue.close()

Manager().start()

The producer is an HTTP server that puts a task in the queue whenever it
receives a request from the user. It seems that the consumer processes
stay blocked even when there are new tasks in the queue, which is weird.

P.S. Two more questions unrelated to the above. First, I am not sure
whether it is better to run the HTTP server in its own process rather
than in the main process; if so, how can I keep the main process running
until all child processes have ended? Second, what is the best way to
stop the HTTP server gracefully?
 
 
 
 
 
MRAB
      05-27-2009
Wu Zhe wrote:
> I am writing a server program with one producer and multiple consumers,
> what confuses me is only the first task producer put into the queue gets
> consumed, after which tasks enqueued no longer get consumed, they remain
> in the queue forever.
>
> from multiprocessing import Process, Pool, Queue, cpu_count
> from http import httpserv
>
> def work(queue):
>     while True:
>         task = queue.get()
>         if task is None:
>             break
>         time.sleep(5)


The 'time' module hasn't been imported, so the worker raises an
exception when it gets to this line and then terminates.

>         print "task done:", task
>     queue.put(None)
>
> class Manager:
>     def __init__(self):
>         self.queue = Queue()
>         self.NUMBER_OF_PROCESSES = cpu_count()
>
>     def start(self):
>         self.workers = [Process(target=work, args=(self.queue,))
>                         for i in xrange(self.NUMBER_OF_PROCESSES)]
>         for w in self.workers


Missing ":" on the end of the line.

>             w.start()
>
>         httpserv(self.queue)
>
>     def reload(self):
>         print "RELOAD"
>
>     def stop(self):
>         self.queue.put(None)
>         for i in range(self.NUMBER_OF_PROCESS):


Should be "self.NUMBER_OF_PROCESSES".

>             self.workers[i].join()
>         queue.close()
>
> Manager().start()
>
> The producer is a HTTP server which put a task in the queue once receive
> a request from the user. It seems that consumer processes are still
> blocked when there are new tasks in the queue, which is weird.
>
> P.S. Another two questions not relating to the above, I am not sure if
> it's better to put HTTP server in its own process other than the main
> process, if yes how can I make the main process keep running before all
> children processes end. Second question, what's the best way to stop the
> HTTP server gracefully?
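
Putting the three corrections together, the program might look roughly like the sketch below. It is not the poster's actual code: the source of httpserv is not shown, so start() takes a hypothetical `tasks` argument and enqueues those instead, and the `delay` and `number_of_processes` parameters are likewise assumptions added to keep the demonstration short. print is written as a function call so the sketch runs on both Python 2 and Python 3, and stop() closes `self.queue`, since a bare `queue` is not defined inside that method.

```python
import time  # the missing import: without it, time.sleep() raises NameError
from multiprocessing import Process, Queue, cpu_count


def work(queue, delay=0.1):
    """Consume tasks until the None sentinel arrives."""
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(delay)  # stands in for real work (5 seconds in the post)
        print("task done: %s" % task)
    # Pass the sentinel on so the next worker stops as well.
    queue.put(None)


class Manager(object):
    def __init__(self, number_of_processes=None):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = number_of_processes or cpu_count()
        self.workers = []

    def start(self, tasks):
        self.workers = [Process(target=work, args=(self.queue,))
                        for _ in range(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:  # the colon was missing here
            w.start()
        # Hypothetical stand-in for httpserv(self.queue).
        for task in tasks:
            self.queue.put(task)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):  # was NUMBER_OF_PROCESS
            self.workers[i].join()
        self.queue.close()


if __name__ == "__main__":
    manager = Manager(number_of_processes=2)
    manager.start(tasks=range(4))
    manager.stop()
```

A single None is enough to stop every worker here because each worker puts the sentinel back before it exits.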


 
 
 
 
 
Piet van Oostrum
      05-27-2009
>>>>> Wu Zhe (WZ) wrote:

>WZ> I am writing a server program with one producer and multiple consumers,
>WZ> what confuses me is only the first task producer put into the queue gets
>WZ> consumed, after which tasks enqueued no longer get consumed, they remain
>WZ> in the queue forever.


>WZ> from multiprocessing import Process, Pool, Queue, cpu_count
>WZ> from http import httpserv

>WZ> def work(queue):
>WZ>     while True:
>WZ>         task = queue.get()
>WZ>         if task is None:
>WZ>             break
>WZ>         time.sleep(5)
>WZ>         print "task done:", task
>WZ>     queue.put(None)

>WZ> class Manager:
>WZ>     def __init__(self):
>WZ>         self.queue = Queue()
>WZ>         self.NUMBER_OF_PROCESSES = cpu_count()

>WZ>     def start(self):
>WZ>         self.workers = [Process(target=work, args=(self.queue,))
>WZ>                         for i in xrange(self.NUMBER_OF_PROCESSES)]
>WZ>         for w in self.workers
>WZ>             w.start()

>WZ>         httpserv(self.queue)

>WZ>     def reload(self):
>WZ>         print "RELOAD"

>WZ>     def stop(self):
>WZ>         self.queue.put(None)
>WZ>         for i in range(self.NUMBER_OF_PROCESS):
>WZ>             self.workers[i].join()
>WZ>         queue.close()

>WZ> Manager().start()


>WZ> The producer is a HTTP server which put a task in the queue once receive
>WZ> a request from the user. It seems that consumer processes are still
>WZ> blocked when there are new tasks in the queue, which is weird.


How do you know there are still tasks in the queue?

When I replace your httpserv(self.queue) with:

for i in range(100):
    self.queue.put(i)

it just works. So it seems probable to me that the problem is in
httpserv. Maybe it stalls, or maybe it puts a None in the queue? You
could debug by logging the puts to the queue.
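
One way to log the puts, sketched here under assumptions, is to hand the producer a thin wrapper instead of the raw queue. `LoggingQueue` and the logger name "producer" are hypothetical names invented for this illustration, and the wrapper only covers put(), on the guess that this is the only queue method httpserv calls.

```python
import logging
from multiprocessing import Queue

log = logging.getLogger("producer")


class LoggingQueue(object):
    """Hypothetical wrapper that logs each item before enqueuing it."""

    def __init__(self, queue):
        self._queue = queue

    def put(self, item):
        if item is None:
            # A None makes a worker exit, just as if stop() had been called.
            log.warning("put(None): workers treat this as the stop signal")
        else:
            log.info("put(%r)", item)
        self._queue.put(item)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    queue = LoggingQueue(Queue())
    for i in range(3):  # same kind of stand-in producer as the loop above
        queue.put(i)
```

In the original program the wrapper would be passed as httpserv(LoggingQueue(self.queue)), while the workers keep using self.queue directly. If the log shows no further puts after the first request, the server is stalling; if it shows a None, the server is shutting the workers down.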

>WZ> P.S. Another two questions not relating to the above, I am not sure if
>WZ> it's better to put HTTP server in its own process other than the main
>WZ> process, if yes how can I make the main process keep running before all
>WZ> children processes end. Second question, what's the best way to stop the
>WZ> HTTP server gracefully?


I don't think it is useful to put the HTTP server in its own process as
the Manager process has hardly anything to do. But if you do you can
make it wait by doing the join of the worker processes at the end,
instead of inside the stop().
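
If the server does get its own process, the "join at the end" idea could look like the sketch below. The `producer` function is a hypothetical stand-in for httpserv (it enqueues a few tasks and then the None sentinel), and `main` and `number_of_workers` are names chosen for this illustration only.

```python
from multiprocessing import Process, Queue


def producer(queue, tasks):
    """Hypothetical stand-in for httpserv(queue): enqueue tasks, then stop."""
    for task in tasks:
        queue.put(task)
    queue.put(None)  # sentinel telling the workers to finish


def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        print("task done: %s" % task)
    queue.put(None)  # pass the sentinel on to the next worker


def main(number_of_workers=2):
    queue = Queue()
    workers = [Process(target=work, args=(queue,))
               for _ in range(number_of_workers)]
    server = Process(target=producer, args=(queue, list(range(4))))
    children = workers + [server]
    for p in children:
        p.start()
    # The main process has nothing else to do, so it simply blocks here
    # until every child process has ended.
    for p in children:
        p.join()


if __name__ == "__main__":
    main()
```

The joins keep the main process alive for as long as any child is running, which is the behaviour asked about in the P.S.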

Stopping the HTTP server: is it multithreaded? You can have a boolean
that indicates it should accept no new requests. Without more info about
the server it is hard to give a more detailed answer.
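
As an illustration of the boolean idea only, here is a sketch built on the standard library's HTTPServer rather than on the poster's httpserv, whose code is not shown. `StoppableHTTPServer`, `keep_running` and `serve_until_stopped` are hypothetical names; the `timeout` attribute and handle_request() are the standard server API.

```python
import threading

try:
    from http.server import HTTPServer, BaseHTTPRequestHandler  # Python 3
except ImportError:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler  # Python 2


class StoppableHTTPServer(HTTPServer):
    """Hypothetical subclass that serves until keep_running becomes False."""

    timeout = 0.5        # handle_request() gives up after this long when idle
    keep_running = True  # the boolean checked between requests

    def serve_until_stopped(self):
        while self.keep_running:
            self.handle_request()  # handles one request or times out
        self.server_close()        # stop accepting new connections


if __name__ == "__main__":
    # Port 0 asks the operating system for any free port.
    server = StoppableHTTPServer(("127.0.0.1", 0), BaseHTTPRequestHandler)
    thread = threading.Thread(target=server.serve_until_stopped)
    thread.start()
    server.keep_running = False  # e.g. set from Manager.stop()
    thread.join()
```

A request that is already being handled finishes before the flag is checked again, so the shutdown is graceful in that sense. A plain attribute like this is only visible within one process; if the server runs in a separate process, something like multiprocessing.Event would have to carry the signal instead. For a server started with serve_forever(), the standard shutdown() method, called from another thread, serves the same purpose.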
--
Piet van Oostrum
URL: http://pietvanoostrum.com [PGP 8DAE142BE17999C4]
 