On Sep 18, 10:43 am, MenTaLguY <men...@rydia.net> wrote:
> On Wed, 19 Sep 2007 01:40:08 +0900, castillo.br...@gmail.com wrote:
> > It seems to me like there is something wrong with the latest stable
> > version of ruby.
>
> 1.8.6-p0 is broken, but it is not the most recent stable release.
>
> The latest stable release is 1.8.6-p36.
>
> -mental
Since 1.8.6-p0 does not work with Mutex/ConditionVariable, I changed the
code to use the Monitor class instead. With that change I no longer get
the error, even on 1.8.6-p0.
I found this post about the differences between Mutex and Monitor.
http://groups.google.com/group/comp....12d7952937abd8
I think I will stick to using Monitor for future code.
Here is the code that works on 1.8.6-p0:
require 'thread'
require 'monitor'
# A fixed-size pool of worker threads fed from a bounded FIFO queue.
#
# All pool state is guarded by one Monitor. A single condition variable is
# broadcast on every state change (task queued, task dequeued, task finished,
# pool stopped), so every waiter re-checks its own predicate after waking.
class ThreadPool
  # Raised by #push_task / #add_work once #shutdown has been called.
  # Derives from StandardError so that a plain `rescue` can handle it.
  class PoolStopped < StandardError; end

  # @param thread_size [Integer] number of worker threads to start
  # @param queue_size [Integer] maximum number of queued entries before
  #   producers block; a value <= 0 disables the bound
  def initialize(thread_size = 10, queue_size = 100)
    @mutex = Monitor.new
    @cv = @mutex.new_cond
    @queue = []
    @max_queue_size = queue_size
    @pending = 0 # tasks accepted by #push_task that have not finished yet
    @threads = []
    @stopped = false
    thread_size.times { @threads << Thread.new { start_worker } }
  end

  # Wraps the block in a Task and queues it.
  #
  # @param args [Array] arguments passed to the block when it runs
  # @return [Task] handle that can be joined and queried for the result
  # @raise [PoolStopped] if the pool has been shut down
  def add_work(*args, &callback)
    push_task(Task.new(*args, &callback))
  end

  # Queues a task, blocking while the queue is full.
  #
  # @param task [#execute] the unit of work to queue
  # @return [Object] the task that was passed in
  # @raise [PoolStopped] if the pool is stopped, including when it is
  #   stopped while this call is blocked waiting for queue space
  def push_task(task)
    @mutex.synchronize do
      raise PoolStopped if @stopped

      @cv.wait_while { !@stopped && queue_full? }
      # Re-check after waking: #shutdown may have queued the :stop sentinels
      # in the meantime, and anything queued behind them would never run.
      raise PoolStopped if @stopped

      @queue.push(task)
      @pending += 1
      @cv.broadcast
    end
    task
  end

  # Removes and returns the oldest queue entry, blocking while the queue is
  # empty.
  #
  # @return [Object] a task, or :stop when a worker should exit
  def pop_task
    @mutex.synchronize do
      @cv.wait_while { @queue.empty? }
      task = @queue.shift
      @cv.broadcast # wake producers waiting for queue space
      task
    end
  end

  # Stops accepting work, lets the workers drain what is already queued and
  # waits for every worker thread to exit. Calling it again only re-joins
  # the threads.
  def shutdown
    @mutex.synchronize do
      unless @stopped
        @stopped = true
        # One sentinel per worker, queued behind any remaining tasks.
        @threads.size.times { @queue.push(:stop) }
        @cv.broadcast
      end
    end
    @threads.each(&:join)
  end

  # Worker loop: executes queued tasks until a :stop sentinel is popped.
  def start_worker
    loop do
      task = pop_task
      break if task == :stop

      begin
        task.execute
      ensure
        # Always account for the task, even if it raised, so #sync cannot
        # wait forever on a task that is no longer running.
        @mutex.synchronize do
          @pending -= 1
          @cv.broadcast
        end
      end
    end
  end

  # Blocks until every task accepted so far has finished executing. This
  # covers tasks still in the queue as well as tasks a worker has already
  # picked up.
  #
  # @return [nil]
  def sync
    @mutex.synchronize { @cv.wait_until { @pending.zero? } }
    nil
  end

  private

  # @return [Boolean] true when a bound is configured and has been reached
  def queue_full?
    @max_queue_size > 0 && @queue.size >= @max_queue_size
  end

  # A unit of work: a block plus its arguments, with a completion latch.
  class Task
    # result: value returned by the block, nil until it has run or if it raised.
    # exception: error raised by the block, nil if it has not raised.
    attr_reader :result, :exception

    # @param args [Array] arguments handed to the block by #execute
    def initialize(*args, &callback)
      @args = args
      @callback = callback
      @done = false
      @result = nil
      @exception = nil
      @mutex = Monitor.new
      @cv = @mutex.new_cond
    end

    # Runs the block and records its result, or the error it raised, then
    # releases every thread blocked in #join.
    #
    # Ordinary failures (StandardError, ScriptError) are captured and logged
    # to STDERR. Process-level exceptions such as SystemExit are not
    # swallowed; they propagate after the task has been marked done.
    def execute
      @result = @callback.call(*@args)
    rescue StandardError, ScriptError => e
      @exception = e
      STDERR.puts "Error in thread #{Thread.current} - #{e}"
      Array(e.backtrace).each { |line| STDERR.puts(line) }
    ensure
      # Mark completion on every exit path so #join can never hang.
      @mutex.synchronize do
        @done = true
        @cv.broadcast
      end
    end

    # Blocks until #execute has finished.
    def join
      @mutex.synchronize { @cv.wait_until { @done } }
    end
  end
end
# Demo driver: queue 100 jobs on a 10-thread pool (queue bound 1000), each
# of which sleeps for five seconds, then shut the pool down.
tp = ThreadPool.new(10, 1000)
sleep(1) # NOTE(review): presumably gives the workers time to start - confirm

# Collect the handle returned by add_work for each job.
tasks = Array.new(100) do |id|
  STDERR.puts "adding work"
  tp.add_work do
    puts "Running #{id} #{Thread.current}"
    sleep 5
    puts "Ending #{id} #{Thread.current}"
  end
end

puts "Waiting for shutdown"
tp.shutdown
puts "done"