draining pipes simultaneously

 
 
Dmitry Teslenko
      03-05-2008
Hello!
Here's my implementation of a function that executes some command and
drains stdout/stderr invoking other functions for every line of
command output:

import os
import Queue
import subprocess
import threading
import time

def __execute2_drain_pipe(queue, pipe):
    # Runs in a worker thread: copy every line of the pipe into the queue.
    for line in pipe:
        queue.put(line)
    return

def execute2(command, out_filter = None, err_filter = None):
    p = subprocess.Popen(command , shell=True, stdin = subprocess.PIPE, \
        stdout = subprocess.PIPE, stderr = subprocess.PIPE, \
        env = os.environ)

    qo = Queue.Queue()
    qe = Queue.Queue()

    # One reader thread per pipe.
    to = threading.Thread(target = __execute2_drain_pipe, \
        args = (qo, p.stdout))
    to.start()
    time.sleep(0)
    te = threading.Thread(target = __execute2_drain_pipe, \
        args = (qe, p.stderr))
    te.start()

    # Pass queued lines to the filters while either reader is running.
    while to.isAlive() or te.isAlive():
        try:
            line = qo.get()
            if out_filter:
                out_filter(line)
            qo.task_done()
        except Queue.Empty:
            pass

        try:
            line = qe.get()
            if err_filter:
                err_filter(line)
            qe.task_done()
        except Queue.Empty:
            pass

    to.join()
    te.join()
    return p.wait()

The problem is that my implementation is buggy: the function hangs when
stdout or stderr is empty. Can I have your feedback?
 
bockman@virgilio.it
      03-05-2008
On 5 Mar, 10:33, "Dmitry Teslenko" <(E-Mail Removed)> wrote:
> The problem is that my implementation is buggy: the function hangs when
> stdout or stderr is empty. Can I have your feedback?


The Queue.get method blocks by default. The documentation is not
100% clear about that (it should probably show the full Python
signature of the method, which would make the default value
self-evident), but if you run help(Queue.Queue) in a Python shell
you will see it.

So try using a timeout or a non-blocking get. With a non-blocking
get you should also add a delay in the loop; otherwise you will poll
the queues at maximum speed and may keep the other threads from
accessing them.
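
To illustrate the three ways of calling get, here is a minimal sketch. It assumes Python 3, where the module is named queue rather than Queue as in the code above, and try_get is a hypothetical helper invented for this example, not part of the library:

```python
import queue  # this module is called Queue in Python 2

q = queue.Queue()

def try_get(q, **kwargs):
    """Hypothetical helper: return the next item, or None if Empty is raised."""
    try:
        return q.get(**kwargs)
    except queue.Empty:
        return None

# A plain q.get() would block forever here, because nothing is ever put.
nonblocking = try_get(q, block=False)  # Empty is raised immediately
timed = try_get(q, timeout=0.1)        # waits up to 0.1 s, then Empty is raised

q.put("line\n")
ready = try_get(q, timeout=0.1)        # an item is waiting, so it returns at once
```

The except Queue.Empty branches in the original loop can only be reached with one of the last two forms, since a plain get() never raises Empty.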

Ciao
-----
FB

 
Dmitry Teslenko
      03-05-2008
On Wed, Mar 5, 2008 at 1:34 PM, <(E-Mail Removed)> wrote:
> The Queue.get method by default is blocking. The documentation is not
> 100% clear about that (maybe it should report
> the full python definition of the function parameters, which makes
> self-evident the default value) but if you do
> help(Queue.Queue) in a python shell you will see it.


> Hence, try using a timeout or a non-blocking get (but in case of a
> non-blocking get you should add a delay in the
> loop, or you will poll the queues at maximum speed and maybe prevent
> the other threads from accessing them).


Thanks for the advice! I finally came up with the following loop:

while to.isAlive() or te.isAlive():
    try:
        # Non-blocking get: drain everything queued so far, then move on.
        while True:
            line = qo.get(False)
            if out_filter:
                out_filter(line)
    except Queue.Empty:
        pass

    try:
        while True:
            line = qe.get(False)
            if err_filter:
                err_filter(line)
    except Queue.Empty:
        pass

Inserting a delay at the beginning of the loop makes the command feel
slow to start, and a delay at the end of the loop may cause data loss
if both threads finish during the delay.
 
bockman@virgilio.it
      03-05-2008

> Inserting a delay at the beginning of the loop makes the command feel
> slow to start, and a delay at the end of the loop may cause data loss
> if both threads finish during the delay.


time.sleep() pauses only the thread that executes it, not the
others. And Queue objects can hold a large amount of data (if you
have the RAM), so unless your subprocess is outputting data very
fast, you should not have data loss.
Anyway, if it works for you ...
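
A small sketch of that point, assuming Python 3 (module queue instead of Queue). The producer function and its five lines are made up to stand in for a pipe-reading thread:

```python
import queue
import threading
import time

q = queue.Queue()  # maxsize defaults to 0, meaning no size limit

def producer():
    # Hypothetical stand-in for a thread that reads lines from a pipe.
    for i in range(5):
        q.put("line %d\n" % i)

t = threading.Thread(target=producer)
t.start()
time.sleep(0.2)  # only the main thread pauses; the producer keeps running
t.join()         # make sure the producer is finished before collecting

# Everything put while the main thread slept is still in the queue.
lines = []
while not q.empty():
    lines.append(q.get_nowait())
```

Nothing is dropped while the main thread sleeps; the lines simply wait in the queue until they are fetched.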

Ciao
-----
FB
 
Dmitry Teslenko
      03-05-2008
On Wed, Mar 5, 2008 at 3:39 PM, <(E-Mail Removed)> wrote:
> time.sleep() pauses only the thread that executes it, not the
> others. And Queue objects can hold a large amount of data (if you
> have the RAM), so unless your subprocess is outputting data very
> fast, you should not have data loss.
> Anyway, if it works for you ...


After some testing I agree: without time.sleep() in the main thread,
Python eats up all available processor time.
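
Putting the pieces of this thread together, one possible shape for the function is sketched below. It is not the code either poster ended up with, and it rests on several assumptions: Python 3.7 or later (queue, is_alive, text=True, subprocess.DEVNULL), the command passed as an argument list instead of a shell string, and the hypothetical names _reader, _drain and poll_interval. The extra drain after the loop is one way to address the worry about losing lines when both threads finish during the delay.

```python
import queue
import subprocess
import sys
import threading
import time

def _reader(pipe, q):
    # Worker thread: copy every line from the pipe into the queue until EOF.
    for line in pipe:
        q.put(line)
    pipe.close()

def _drain(q, handler):
    # Hand over everything queued right now, without ever blocking.
    while True:
        try:
            line = q.get_nowait()
        except queue.Empty:
            return
        if handler:
            handler(line)

def execute2(command, out_filter=None, err_filter=None, poll_interval=0.05):
    # Assumption: command is an argument list and the child needs no stdin.
    p = subprocess.Popen(command, stdin=subprocess.DEVNULL,
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         text=True)
    qo, qe = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=_reader, args=(p.stdout, qo)),
               threading.Thread(target=_reader, args=(p.stderr, qe))]
    for t in threads:
        t.start()

    while any(t.is_alive() for t in threads):
        _drain(qo, out_filter)
        _drain(qe, err_filter)
        time.sleep(poll_interval)  # pauses only this thread, avoids busy polling

    # Both readers have exited, so no new lines can arrive. Pick up whatever
    # was queued between the last drain and the end of the loop.
    _drain(qo, out_filter)
    _drain(qe, err_filter)
    return p.wait()

if __name__ == "__main__":
    # Example call: echo the child's stdout and ignore its stderr.
    code = execute2([sys.executable, "-c", "print('hello')"],
                    out_filter=sys.stdout.write)
```

Because nothing in the main loop blocks on a queue, a command that writes nothing to stdout or stderr returns as soon as both pipes reach end of file.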
 