Velocity Reviews - Computer Hardware Reviews

Velocity Reviews > Newsgroups > Programming > Ruby > Multi Threading

Reply
Thread Tools

Multi Threading

 
 
Sriram Varahan
Guest
Posts: n/a
 
      04-17-2009
Hello,

#*******************STARTCODE

start_time = Time.now
$count = 0
class Queue
def initialize *s # splat operator allows variable length argument
list
@mutex = Mutex.new
@queue = []
s.each { |e| @queue.push e }
end

def enq v
@queue.push v
end

def deq
@mutex.synchronize {item = @queue.shift}
end

def empty?
@mutex.synchronize{@queue.length == 0}
end

def count
@mutex.synchronize do
$count += 1
end
end
end


#*****Test

queue = Queue.new
500.times do |a|
queue.enq a
end
threads = []


# Create 5 threads which fetch values from the Q.
5.times do
threads << Thread.new {
until queue.empty?
queue.count
puts "Thread ID: #{Thread.current}.Job started"
puts "#{queue.deq}"
#sleep 0.0001
puts "Thread ID: #{Thread.current}.Job complete"
end
}
end


threads.each {|t| t.join }
puts "Count"
puts $count
puts "timeTaken:"
puts Time.now - start_time

# *************CODE ENDS******************


I have five threads which fetch values from a queue. The above code
works perfectly well in case of a single thread. But the issue arises
when there are more threads.

In case of 5 threads the number of times the block is executed is 503
where it should have been 500.

I know the reason why?
The "deq" and "empty?" methods are not synchronized.
So when the final item is removed from the thread, other threads access
the empty? method before the @queue.length becomes 0.

Hence the difference in count.

If the sleep is activated this problem is solved.

Any suggestion on how to get this working without the sleep?

Thanks.
--
Posted via http://www.ruby-forum.com/.

 
Reply With Quote
 
 
 
 
Michael Neumann
Guest
Posts: n/a
 
      04-17-2009
Sriram Varahan wrote:

> Hello,
>
> #*******************STARTCODE
>
> start_time = Time.now
> $count = 0
> class Queue
> def initialize *s # splat operator allows variable length argument
> list
> @mutex = Mutex.new
> @queue = []
> s.each { |e| @queue.push e }
> end
>
> def enq v
> @queue.push v
> end
>
> def deq
> @mutex.synchronize {item = @queue.shift}
> end
>
> def empty?
> @mutex.synchronize{@queue.length == 0}
> end
>
> def count
> @mutex.synchronize do
> $count += 1
> end
> end
> end
>
>
> #*****Test
>
> queue = Queue.new
> 500.times do |a|
> queue.enq a
> end
> threads = []
>
>
> # Create 5 threads which fetch values from the Q.
> 5.times do
> threads << Thread.new {
> until queue.empty?
> queue.count
> puts "Thread ID: #{Thread.current}.Job started"
> puts "#{queue.deq}"
> #sleep 0.0001
> puts "Thread ID: #{Thread.current}.Job complete"
> end
> }
> end
>
>
> threads.each {|t| t.join }
> puts "Count"
> puts $count
> puts "timeTaken:"
> puts Time.now - start_time
>
> # *************CODE ENDS******************
>
>
> I have five threads which fetch values from a queue. The above code
> works perfectly well in case of a single thread. But the issue arises
> when there are more threads.
>
> In case of 5 threads the number of times the block is executed is 503
> where it should have been 500.
>
> I know the reason why?
> The "deq" and "empty?" methods are not synchronized.
> So when the final item is removed from the thread, other threads access
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?


You should also synchronize the enque operation (Queue#enq). Btw, there is
an existing Queue class that does this thread-safe:

require 'thread'
q = Queue.new
q.push 1
x = q.pop
q.pop # => would block the thread until a new element is available

q2 = SizedQueue.new(10) # bounded queue, which blocks when size > 10

Regards,

Michael



 
Reply With Quote
 
 
 
 
Robert Klemme
Guest
Posts: n/a
 
      04-17-2009
2009/4/17 Sriram Varahan <(E-Mail Removed)>:
> Hello,
>
> #*******************STARTCODE
>
> start_time =3D Time.now
> $count =3D 0
> class Queue
> =A0def initialize *s # splat operator allows variable length argument
> list
> =A0 =A0@mutex =3D Mutex.new
> =A0 =A0@queue =3D []
> =A0 =A0s.each { |e| @queue.push e }
> =A0end
>
> =A0def enq v
> =A0 =A0 =(E-Mail Removed) v
> =A0end
>
> =A0def deq
> =A0 =(E-Mail Removed) {item =3D @queue.shift}
> =A0end
>
> =A0def empty?
> =A0 =(E-Mail Removed){@queue.length =3D=3D 0}
> =A0end
>
> =A0def count
> =A0 @mutex.synchronize do
> =A0 =A0 $count +=3D 1
> =A0 end
> =A0end
> end
>
>
> #*****Test
>
> queue =3D Queue.new
> 500.times do |a|
> =A0queue.enq a
> end
> threads =3D []
>
>
> # Create 5 threads which fetch values from the Q.
> =A05.times do
> =A0 =A0threads << Thread.new {
> =A0 =A0until queue.empty?
> =A0 =A0 =A0queue.count
> =A0 =A0 =A0puts "Thread ID: #{Thread.current}.Job =A0started"
> =A0 =A0 =A0puts "#{queue.deq}"
> =A0 =A0 =A0#sleep 0.0001
> =A0 =A0 =A0puts "Thread ID: #{Thread.current}.Job =A0complete"
> =A0 =A0end
> =A0 }
> =A0end
>
>
> threads.each {|t| t.join }
> puts "Count"
> puts $count
> puts "timeTaken:"
> puts Time.now - start_time
>
> # *************CODE ENDS******************
>
>
> I have five threads which fetch values from a queue. The above code
> works perfectly well in case of a single thread. But the issue arises
> when there are more threads.
>
> In case of 5 threads the number of times the block is executed is 503
> where it should have been 500.
>
> I know the reason why?
> The "deq" and "empty?" methods are not synchronized.
> So when the final item is removed from the thread, other threads =A0acces=

s
> the empty? method before the @queue.length becomes 0.
>
> Hence the difference in count.
>
> If the sleep is activated this problem is solved.
>
> Any suggestion on how to get this working without the sleep?


There are several options. You could use MonitorMixin instead of Mutex
and include it in initialize

def initialize *s
# @mutex =3D=3D self so you do not need to change sync code
@mutex =3D extend MonitorMixin
@queue =3D s.dup
end

Then you can do external synchronization, e.g.

queue.synchronize do
if queue.empty?
# finish
else
elm =3D deq
end
end

Much better though is this approach

require 'thread'

# use library class
queue =3D Queue.new

# _first_ start threads
# does not really matter but if filling
# the queue takes time work can
# begin immediately
threads =3D (1..5).map do
label =3D Thread.current.to_s.freeze

Thread.new queue do |q|
until ( job =3D q.deq ) =3D=3D :terminate
puts "Thread ID: #{label}.Job started"
puts job
puts "Thread ID: #{label}.Job complete"
end
end
end

# fill queues
500.times do |a|
queue.enq a
end

# "close" queues
threads.size.times { queue.enq :terminate }

# wait for termination
threads.each do |th|
th.join
end

Cheers

robert

--=20
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

 
Reply With Quote
 
Sean O'Halpin
Guest
Posts: n/a
 
      04-17-2009
On Fri, Apr 17, 2009 at 7:55 AM, Robert Klemme
<(E-Mail Removed)> wrote:
>
> Much better though is this approach
>
> require 'thread'
>
> # use library class
> queue =3D Queue.new
>
> # _first_ start threads
> # does not really matter but if filling
> # the queue takes time work can
> # begin immediately
> threads =3D (1..5).map do
> =A0label =3D Thread.current.to_s.freeze
>
> =A0Thread.new queue do |q|
> =A0 =A0until ( job =3D q.deq ) =3D=3D :terminate
> =A0 =A0 =A0puts "Thread ID: #{label}.Job =A0started"
> =A0 =A0 =A0puts job
> =A0 =A0 =A0puts "Thread ID: #{label}.Job =A0complete"
> =A0 =A0end
> =A0end
> end
>
> # fill queues
> 500.times do |a|
> =A0queue.enq a
> end
>
> # "close" queues
> threads.size.times { queue.enq :terminate }
>
> # wait for termination
> threads.each do |th|
> =A0th.join
> end
>
> Cheers
>
> robert
>
> --
> remember.guy do |as, often| as.you_can - without end
> http://blog.rubybestpractices.com/
>
>


Minor nitpick - these lines should be reversed:

> label =3D Thread.current.to_s.freeze
> Thread.new queue do |q|


i.e.

> Thread.new queue do |q|
> label =3D Thread.current.to_s.freeze


Regards,
Sean

 
Reply With Quote
 
Sriram Varahan
Guest
Posts: n/a
 
      04-17-2009
Hey Robert,

That was an amazing solution!Thanks a million


Thank you Michael and Sean for your time
--
Posted via http://www.ruby-forum.com/.

 
Reply With Quote
 
Robert Klemme
Guest
Posts: n/a
 
      04-17-2009
2009/4/17 Sean O'Halpin <(E-Mail Removed)>:

> Minor nitpick - these lines should be reversed:
>
>> =A0label =3D Thread.current.to_s.freeze
>> =A0Thread.new queue do |q|

>
> i.e.
>
>> =A0Thread.new queue do |q|
>> =A0 =A0label =3D Thread.current.to_s.freeze


Oh yes, absolutely! Apparently I moved the line too high. Thanks for
catching that gotcha, Sean!

Kind regards

robert

--=20
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/

 
Reply With Quote
 
 
 
Reply

Thread Tools

Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is On
HTML code is Off
Trackbacks are On
Pingbacks are On
Refbacks are Off


Similar Threads
Thread Thread Starter Forum Replies Last Post
Terminating a Python program that uses multi-process, multi-threading akineko Python 3 01-29-2009 02:26 PM
Re: threading in PyQt vs threading in standard library Steven Woody Python 0 01-09-2009 07:48 AM
threading in PyQt vs threading in standard library Steven Woody Python 0 01-09-2009 07:15 AM
Cooperative threading preemptive threading - a bit confused failure_to@yahoo.co.uk Java 9 12-29-2007 01:10 AM
multi threading in multi processor (computer) ajikoe@gmail.com Python 38 02-15-2005 05:01 PM



Advertisments