Managing a fork pool to handle tasks

Abdul-rahman Advany
04-29-2008
Hey guys,

I am running some background processes in Ruby, and I would like to use
forks, but I can't figure out how to manage them.

- I want to create a pool of child processes (forks) running in the
background, limited to a number I specify
- The parent task creates them, but creating a new child process
should block until one of the child processes in the pool is done
- I want to limit how long a child process can run (so it should quit
after x seconds)
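
A minimal sketch of the pool described in the list above, using plain `fork`, `Process.wait2` and `Process.kill` (Unix-only, since `fork` is not available on Windows). The class name `ForkPool`, its `spawn`/`wait_all` methods and the 50 ms polling interval are illustrative assumptions, not an existing library: `spawn` blocks while the pool is full, and the parent sends SIGKILL to any child still running after `timeout` seconds.

```ruby
# Hypothetical ForkPool (illustrative, not a library class). Unix-only.
class ForkPool
  attr_reader :statuses

  def initialize(size, timeout)
    raise ArgumentError, "size must be positive" unless size > 0
    @size = size
    @timeout = timeout
    @children = {}   # pid => deadline on the monotonic clock
    @statuses = []   # Process::Status of every reaped child
  end

  # Forks a child that runs the block; blocks while `size` children are alive.
  def spawn(&block)
    reap while @children.size >= @size
    pid = fork do
      code = 1
      begin
        block.call
        code = 0
      ensure
        exit!(code)  # skip at_exit handlers inherited from the parent
      end
    end
    @children[pid] = now + @timeout
    pid
  end

  # Waits for every remaining child, killing any that overrun their deadline.
  def wait_all
    reap until @children.empty?
    @statuses
  end

  private

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  # One polling pass: kill overdue children, reap finished ones, then nap.
  def reap
    reaped = false
    @children.keys.each do |pid|
      begin
        Process.kill(:KILL, pid) if now > @children[pid]
      rescue Errno::ESRCH
        # already gone; wait2 below collects it
      end
      pid_and_status = Process.wait2(pid, Process::WNOHANG)
      next unless pid_and_status
      @children.delete(pid)
      @statuses << pid_and_status[1]
      reaped = true
    end
    sleep 0.05 unless reaped
  end
end
```

For example, with `ForkPool.new(2, 1.0)`, spawning three short jobs and one that sleeps for 10 seconds should leave three successful statuses and one child terminated by SIGKILL. Killing from the parent is a deliberate choice: a process can always be killed, whatever it is blocked on.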

I tried to use Threads, but managing a pool of threads somehow doesn't
work (they get stuck while doing stuff and don't die). I tried the code
below, but the pool fills up and I keep waiting for new threads to
become available...

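require 'timeout'

# ThreadPool is not part of Ruby's standard library; its definition isn't shown here
pool = ThreadPool.new(10) # up to 10 threads
pool.process do
  Timeout.timeout(4) do
    # fetch pages, parse stuff, etc.
  end
end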
--
Posted via http://www.ruby-forum.com/.
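
One common way a thread pool "fills up" is that a worker thread dies, or never comes back for more work, when its job raises; `Timeout.timeout` raises `Timeout::Error` when the limit expires. Below is a minimal sketch of a pool whose workers rescue errors per job, so each worker survives a timeout. `TimeoutThreadPool`, `process` and `shutdown` are hypothetical names. Note that `Timeout.timeout` works by raising into the running thread, so it may fail to interrupt code blocked inside a C extension, which is one reason a fork-and-kill approach can be more robust.

```ruby
require 'thread'
require 'timeout'

# Hypothetical TimeoutThreadPool (illustrative, not a library class).
class TimeoutThreadPool
  STOP = Object.new  # sentinel that tells a worker to exit

  attr_reader :errors

  def initialize(size, seconds)
    @jobs = SizedQueue.new(size)  # process() blocks while this is full
    @errors = Queue.new           # failures from jobs, including timeouts
    @workers = Array.new(size) do
      Thread.new do
        until (job = @jobs.pop).equal?(STOP)
          begin
            Timeout.timeout(seconds) { job.call }
          rescue StandardError, Timeout::Error => e
            @errors << e          # record it; the worker keeps going
          end
        end
      end
    end
  end

  def process(&block)
    @jobs << block
  end

  # Lets queued jobs finish, then stops and joins every worker.
  def shutdown
    @workers.size.times { @jobs << STOP }
    @workers.each(&:join)
  end
end
```

Because each failure is caught inside the worker loop, a job that times out costs one slot for at most `seconds`, and the pool keeps its full size afterwards.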

 
Roger Pack
04-29-2008
google ruby forkoff
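
forkoff is a third-party gem, and its API isn't quoted in this thread, so the following is not forkoff's interface. It is a minimal sketch of the same general idea, assuming results are small and Marshal-able: each item runs in its own child process, at most `processes` at a time, and the child sends its result back to the parent over an `IO.pipe`. The `fork_map` name and its batching strategy are hypothetical.

```ruby
# Hypothetical fork_map: a fork-per-item parallel map (Unix-only).
# Items are handled in batches of `processes`, so one slow item delays
# the start of the next batch; results come back in input order.
def fork_map(items, processes: 2)
  items.each_slice(processes).flat_map do |batch|
    children = batch.map do |item|
      reader, writer = IO.pipe
      reader.binmode
      writer.binmode
      pid = fork do
        reader.close
        begin
          writer.write(Marshal.dump(yield(item)))
          writer.close
        ensure
          exit!(0)  # skip at_exit handlers inherited from the parent
        end
      end
      writer.close  # the parent keeps only the read end
      [pid, reader]
    end
    children.map do |pid, reader|
      data = reader.read  # reads until the child closes its end
      reader.close
      Process.wait(pid)
      raise "child #{pid} returned no result" if data.empty?
      Marshal.load(data)
    end
  end
end
```

For example, `fork_map([1, 2, 3, 4, 5], processes: 2) { |x| x * x }` returns `[1, 4, 9, 16, 25]`, with each square computed in a separate child process.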

Abdul-rahman Advany
04-29-2008
Roger Pack wrote:
> google ruby forkoff

I don't have a fixed queue (I fill it through memcache, and other
processes take values from memcache). I don't think forkoff will work,
and I have a hard time understanding how it works: I only see one call
to fork... does that fork the thread?
Abdul-rahman Advany
04-29-2008
Sorry, I didn't know that calling fork from a thread creates a child
process.
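
`fork` creates a new child process with its own pid, and Ruby's documentation for `Process.fork` states that the thread calling `fork` is the only thread in the child; the parent's other threads are not copied. A small sketch that checks this by forking from inside a thread. The method name `fork_from_thread` is hypothetical, and the child reports back over a pipe.

```ruby
# Hypothetical demo: fork from a non-main thread and ask the child how
# many threads it has. Unix-only.
def fork_from_thread
  reader, writer = IO.pipe
  sleeper = Thread.new { sleep }  # an extra thread that exists only in the parent
  forker = Thread.new do
    fork do
      reader.close
      writer.puts "#{Process.pid} #{Thread.list.size}"
      writer.close
      exit!(0)  # skip at_exit handlers inherited from the parent
    end
  end
  child_pid = forker.value  # in the parent, fork returns the child's pid
  writer.close
  reported_pid, child_threads = reader.read.split.map(&:to_i)
  reader.close
  Process.wait(child_pid)
  sleeper.kill
  [child_pid, reported_pid, child_threads]
end
```

The child should report a pid different from the parent's and a thread count of 1, even though the parent had three threads alive at the moment of the fork.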
John Carter
04-29-2008
Try my nifty MultiThread class. It creates a pool of N worker threads
(for CPU-bound work there is little point in creating many more threads
than you have CPU cores anyway).

require 'thread'
Thread.abort_on_exception = true

# Raised in the calling thread; carries the queue of exceptions
# collected from the worker threads.
class MultiFail < Exception
  attr_reader :queue

  def initialize(_queue)
    @queue = _queue
  end
end

class MultiThread
  private

  # Worker loop: run jobs until a nil job arrives. Each job's failure is
  # recorded and the worker carries on, so the pool never loses threads
  # (a dead worker could leave run/join blocked on a full queue).
  def do_stuff
    while (job = @jobs.deq)
      begin
        job.call(Thread.current[:index])
      rescue Exception => failure
        @failed << failure
      end
    end
  end

  public

  # Spawns a pool of _jobs worker threads
  def initialize(_jobs = 1)
    raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0
    @failed = Queue.new # created before any worker can need it
    @jobs = SizedQueue.new(2 * _jobs)
    @threads = Array.new(_jobs) { |i| Thread.new { Thread.current[:index] = i; do_stuff } }
  end

  # Run block in one of the threads (blocks while the job queue is full)
  def run(&block)
    raise MultiFail.new(@failed) if @failed.size > 0
    @jobs.enq(block)
  end

  # Wait until all threads are finished doing whatever they're doing.
  def join
    @threads.each { @jobs.enq nil } # one nil per worker ends its loop
    @threads.each { |t| t.join }
    raise MultiFail.new(@failed) if @failed.size > 0
  end
end

if $0 == __FILE__ then
  require 'test/unit'

  class TC_MultiThread < Test::Unit::TestCase
    def setup
      @c = 0 # characters printed on the current output line
    end

    # Start a new output line after about 70 characters
    def wrap(s)
      @c += s
      if @c > 70
        puts
        @c = 0
      end
    end

    def dot(c)
      s = sprintf('%x< ', c)
      print s
      wrap s.size
    end

    def undot(c)
      s = sprintf('>%x ', c)
      print s
      wrap s.size
    end

    # Runs `loops` one-second jobs on `threads` workers and checks that
    # no more than `threads` jobs ever run at once.
    def try_for(loops, threads)
      puts "Trying [#{loops},#{threads}]"
      i = 0   # jobs currently running
      k = 0   # jobs completed
      max = 0 # peak concurrency seen
      mutex = Mutex.new
      multi_thread = MultiThread.new(threads)

      loops.times do
        multi_thread.run do |t|
          dot(t)
          mutex.synchronize do
            i += 1
          end
          sleep 1
          mutex.synchronize do
            assert(i <= threads)
            k += 1
            max = i if i > max
          end
          mutex.synchronize do
            i -= 1
          end
          undot(t)
        end
      end
      multi_thread.join
      assert_equal(0, i)
      assert(((threads <= 1) || (loops <= 1)) || max > 1)
      assert_equal(loops, k)
    end

    def test_multi
      assert_raises(RuntimeError) { try_for(0, 0) }
      try_for(0, 1)
      try_for(0, 2)
      try_for(1, 1)
      try_for(2, 1)
      try_for(2, 2)
      try_for(2, 100)
      try_for(3, 1)
      try_for(3, 2)
      try_for(3, 3)
      try_for(3, 100)
      try_for(100, 100)
    end

    def test_fail
      multi_thread = MultiThread.new(3)

      multi_thread.run do
        sleep 2
      end

      multi_thread.run do
        raise "This thread failed for test purposes"
      end

      sleep 0.5 # give a worker time to run the failing job and record it

      assert_raises(MultiFail) do
        multi_thread.run do
          sleep 2
        end
      end

      multi_fail = assert_raises(MultiFail) { multi_thread.join }
      assert_equal(RuntimeError, multi_fail.queue.pop.class)
    end
  end
end




John Carter Phone : (64)(3) 358 6639
Tait Electronics Fax : (64)(3) 359 4632
PO Box 1645 Christchurch Email : (E-Mail Removed)
New Zealand
Abdul-rahman Advany
04-29-2008
Your MultiThread class doesn't catch failures...
http://ruby-rails.pl/true-ruby-thread-pool
John Carter
04-29-2008
Contrariwise.

It does.

Of course, it's debatable what you want to do with a failure once you
have caught it.

Having an exception bubble up the call frames to the top level of a
generic pool worker thread is not very helpful.

Having all the tasks complete before you act on a failure is not what
I wanted either.

The gotcha is that two or more failures can happen before you start
handling them in the parent thread.

So what I do is catch failures and drop them in a queue, which I check
before every run / join.

If there have been any failures, I raise them all, bundled in one
exception, up to the parent thread.

That may not be what you want, but it makes sense to me.
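
For comparison, a minimal sketch of the alternative John says he didn't want: let every task finish, then act on the failures. Ruby's `Thread#value` (like `Thread#join`) re-raises a thread's unhandled exception in the thread that calls it, so failures can be gathered afterwards. `run_all` is a hypothetical name; the sketch assumes `Thread.abort_on_exception` is left at its default of false, and turns off `report_on_exception` (Ruby 2.4 and later) only so Ruby doesn't also print each failure as its thread dies.

```ruby
# Hypothetical run_all: start one thread per task, wait for all of them,
# and return [results, failures]. Assumes Thread.abort_on_exception is false.
def run_all(tasks)
  threads = tasks.map do |task|
    Thread.new do
      Thread.current.report_on_exception = false # failures are collected below
      task.call
    end
  end
  results = []
  failures = []
  threads.each do |thread|
    begin
      results << thread.value # re-raises the thread's exception, if any
    rescue StandardError => e
      failures << e
    end
  end
  [results, failures]
end
```

Here nothing is reported until every task has finished, which is the trade-off John describes; MultiThread instead surfaces failures at the next `run` or `join`.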


