Velocity Reviews - Computer Hardware Reviews


Non-blocking communication between Ruby processes

 
 
Iñaki Baz Castillo
Guest
Posts: n/a
 
      01-07-2010
Hi, I run Unicorn, which is a Rack HTTP server using N forked worker processes.
I need the following:

- When a worker processes an HTTP request, it must send some data to another,
independent Ruby process XXX (separate from Unicorn).

- This communication must be non-blocking, that is, the Unicorn worker process
sends the notification and doesn't wait for a response from process XXX, so
the Unicorn worker can immediately generate the HTTP response, send it back
to the client, and be free to handle new HTTP requests.

- The Ruby process XXX should use some kind of queue system to store
notifications and handle them. In fact, it should take them periodically and
send them via TCP (but not HTTP) to another server.
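The last requirement (XXX buffers notifications and periodically forwards them over plain TCP) could look roughly like the following. This is a sketch, not something proposed in the thread: the helper names, the batch size and the newline-terminated wire format are all assumptions.

```ruby
require 'thread'
require 'stringio'

# Hypothetical helper: take up to max_batch messages from a Queue without
# blocking. Queue#pop(true) raises ThreadError when the queue is empty.
def drain_batch(queue, max_batch)
  batch = []
  while batch.size < max_batch
    begin
      batch << queue.pop(true)
    rescue ThreadError
      break # nothing more queued right now
    end
  end
  batch
end

# Hypothetical flush step: write one newline-terminated message per line to
# `io`, which in XXX would be a TCPSocket connected to the other server.
# Returns the number of messages sent.
def flush_once(queue, io, max_batch = 100)
  batch = drain_batch(queue, max_batch)
  io.write(batch.map { |m| "#{m}\n" }.join) unless batch.empty?
  batch.size
end
```

In XXX this could run in its own thread, e.g. `loop { flush_once(queue, socket); sleep 1 }` with `socket = TCPSocket.new(host, port)` for a hypothetical host and port, while the receiving side pushes incoming notifications onto `queue`.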


Which is the best approach to design such communication? Perhaps using
something like EventMachine for the XXX process and Unix/TCP socket
communication between the Unicorn processes and the XXX process? Any other
alternatives or suggestions?

Thanks a lot.

--
Iñaki Baz Castillo <(E-Mail Removed)>

 
Robert Klemme
      01-07-2010
On 01/07/2010 02:18 PM, Iñaki Baz Castillo wrote:
> Hi, I run Unicorn which is a Rack http server using N forked worker processes.
> I need the following:
>
> - When a worker processes a HTTP request it must notify some data to other
> independent Ruby process XXX (different than Unicorn).
>
> - This communication must be non-blocking, this is, the Unicorn worker process
> sends the notification and doesn't wait for response from the process XXX, so
> the Unicorn worker can, at the moment, generate the HTTP response and send
> back to the client, getting free to handle new HTTP requests.
>
> - The ruby process XXX should use some kind of queue system to store
> notifications and handle them. In fact, it should take them periodically and
> send via TCP (but not HTTP) to other server.
>
>
> Which is the best approach to design such communication? perhaps using
> something as EventMachine for the XXX process and Unix/TCP socket
> communication between Unicorn processes and XXX process? any other alternative
> or suggestion?
>
> Thanks a lot.


I would probably first try a simple setup: make process XXX publish a
Queue via DRb on a well-known port and have one or more threads fetch
from the queue and process the data. If you fear resource exhaustion,
you can limit the queue size. E.g.:

x.rb is the server,
c.rb the client:


robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

QUEUE_SIZE = 1024
THREAD_COUNT = 5
URI="druby://localhost:8787"

QUEUE = SizedQueue.new QUEUE_SIZE

threads = (1..THREAD_COUNT).map do
  Thread.new do
    while msg = QUEUE.deq
      p msg
    end
  end
end

DRb.start_service(URI, QUEUE)
DRb.thread.join

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

QUEUE = DRbObject.new_with_uri(SERVER_URI)

10.times do |i|
  puts Benchmark.times do
    QUEUE.enq(sprintf("msg %4d at %-20s", i, Time.now))
  end
end
robert@fussel:~$

Of course you could also use a named pipe for the communication, but
then demarcating message boundaries might be more difficult, etc.
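One common way to keep message boundaries on a pipe or socket is length-prefixed framing. The sketch below is an illustration, not code from the thread: the helper names are hypothetical, the 4-byte big-endian header is an assumed format, and `unpack1` needs Ruby 2.4 or later. Wrapping `Marshal.dump` inside a frame also lets you send typed Ruby objects rather than plain strings.

```ruby
# Length-prefixed framing: each message is sent as a 4-byte big-endian
# length followed by that many bytes, so the reader can find message
# boundaries in the byte stream of a pipe or socket.
def write_frame(io, payload)
  data = payload.b
  io.write([data.bytesize].pack('N') + data)
end

# Returns the next payload as a binary String, or nil at end of stream.
def read_frame(io)
  header = io.read(4)
  return nil if header.nil? || header.bytesize < 4
  io.read(header.unpack1('N'))
end
```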

Kind regards

robert


--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/
 
Iñaki Baz Castillo
      01-07-2010
On Thursday, 7 January 2010, Robert Klemme wrote:
> I would probably first try a simple setup: make process XXX publish a
> Queue via DRb on a well known port and have one or more threads fetching
> from the queue and processing data. If you fear resource exhaustion,
> you can make the queue size limited. E.g.:
> [...]


Thanks a lot.
Just one question: is DRb good enough performance-wise?


--
Iñaki Baz Castillo <(E-Mail Removed)>

 
Robert Klemme
      01-07-2010
On 01/07/2010 03:07 PM, Iñaki Baz Castillo wrote:
> On Thursday, 7 January 2010, Robert Klemme wrote:

>> I would probably first try a simple setup: make process XXX publish a
>> Queue via DRb on a well known port and have one or more threads fetching
>> from the queue and processing data. If you fear resource exhaustion,
>> you can make the queue size limited. E.g.:
>> [...]

>
> Really thanks a lot.
> just a question: is it DRb good enough for performance?


I don't know about your requirements. Just try it out - you can start
multiple clients and vary the number of threads and the queue size in
the server at will. To me it seemed pretty fast. I did

$ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done

and messages came through really fast. Also note that each client prints
timings so you can see how fast it is on your machine.

If you need more performance, I'm sure you'll find a Ruby binding to
one of the queuing frameworks like GNU Queue, NQS and whatnot. But I'd
start with the simple DRb-based solution. It's easily done: you have
everything you need and do not need to install extra software, not even
gems.
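Note that each `QUEUE.enq` over DRb is still a synchronous round trip to XXX (and it waits while a SizedQueue on the server is full), so by itself it is not quite fire-and-forget. One way to keep the Unicorn worker from waiting is a small local buffer plus one forwarding thread per worker. The sketch below is an assumption, not part of DRb, and the class name is hypothetical. Because threads do not survive `fork`, it would be created inside each worker, for example from Unicorn's `after_fork` hook.

```ruby
require 'thread'

# Hypothetical wrapper: callers push onto a local in-process queue and return
# immediately; one background thread forwards messages to `remote`
# (e.g. DRbObject.new_with_uri("druby://localhost:8787")), so a slow or
# absent DRb server never delays the HTTP response. Messages must not be nil.
class AsyncNotifier
  attr_reader :dropped

  def initialize(remote, max_pending: 10_000)
    @remote = remote
    @pending = SizedQueue.new(max_pending)
    @dropped = 0 # not synchronized; fine for a single-threaded Unicorn worker
    @thread = Thread.new { forward_loop }
  end

  # Never blocks: if the local buffer is full, the message is dropped.
  def notify(msg)
    @pending.push(msg, true)
    true
  rescue ThreadError # local buffer full
    @dropped += 1
    false
  end

  # Push a nil sentinel and wait for the forwarder to finish.
  def close
    @pending.push(nil)
    @thread.join
  end

  private

  def forward_loop
    while (msg = @pending.pop)
      begin
        @remote.enq(msg)
      rescue StandardError
        # Remote side unreachable (e.g. DRb::DRbConnError): drop this message.
      end
    end
  end
end
```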

I just noticed there was a bug in my code: I used Benchmark.times, which
prints timings of the current process. What I meant was
Benchmark.measure. I have changed the code a bit so you can easily
experiment with queue sizes, thread counts and message counts (see below).

With this command line

t=10; for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i" & done; for i in `seq 1 $t`; do wait; done; cat cl-*

I get pretty good timings of 7.6 ms/msg with an unlimited Queue size and
the default thread count (5) in this unrealistic test where the queue is
hammered.

Kind regards

robert

Modified code:

robert@fussel:~$ cat x.rb
#!/usr/local/bin/ruby19

require 'thread'
require 'drb'

THREAD_COUNT = (ARGV.shift || 5).to_i
QUEUE_SIZE = ARGV.shift

printf "%4d threads, queue size=%p\n", THREAD_COUNT, QUEUE_SIZE

URI="druby://localhost:8787"

Thread.abort_on_exception = true

QUEUE = QUEUE_SIZE ? SizedQueue.new(QUEUE_SIZE.to_i) : Queue.new
# QUEUE.extend DRb::DRbUndumped

threads = (1..THREAD_COUNT).map do |i|
  Thread.new i do |id|
    while msg = QUEUE.deq
      printf "thread %2d: %p\n", id, msg
    end
  end
end

DRb.start_service(URI, QUEUE)
puts 'Started'
DRb.thread.join
puts 'Returned'
threads.each {|th| th.join rescue nil}
puts 'Done'

robert@fussel:~$

robert@fussel:~$ cat c.rb
#!/usr/local/bin/ruby19

require 'drb/drb'
require 'benchmark'

SERVER_URI="druby://localhost:8787"

rep = (ARGV.shift || 20).to_i

QUEUE = DRb::DRbObject.new_with_uri(SERVER_URI)

QUEUE.enq "Started client"

Benchmark.bm 20 do |b|
  b.report "client %4d" % $$ do
    rep.times do |i|
      QUEUE.enq(sprintf("client %4d msg %4d at %-20s", $$, i, Time.now))
    end
  end
end

QUEUE.enq "Stopped client"

robert@fussel:~$
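In x.rb the worker loop `while msg = QUEUE.deq` only ends when a thread dequeues nil (or false), so the final `threads.each {|th| th.join rescue nil}` returns only if something like that gets enqueued. Below is a self-contained sketch of that shutdown pattern without DRb. The method name is hypothetical, and real messages must never be nil or false, because either would stop a consumer.

```ruby
require 'thread'

# Consumer pool that stops cleanly: push one nil per thread after the
# real messages; each `while (msg = queue.deq)` loop exits on its nil.
def run_pool(thread_count, messages)
  queue = Queue.new
  results = Queue.new
  threads = (1..thread_count).map do |id|
    Thread.new do
      while (msg = queue.deq)
        results << [id, msg] # stand-in for real processing
      end
    end
  end
  messages.each { |m| queue.enq(m) }
  thread_count.times { queue.enq(nil) } # one stop signal per consumer
  threads.each(&:join)
  Array.new(results.size) { results.pop }
end
```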


--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/
 
Iñaki Baz Castillo
      01-07-2010
On Thursday, 7 January 2010, Robert Klemme wrote:
> I don't know about your requirements. Just try it out - you can start
> multiple clients and vary the number of threads and the queue size in
> the server at will. To me it seemed pretty fast. I did
>
> $ for i in 1 2 3 4 5 6 7 8 9 10; do ./c.rb & done
>
> and message came really fast. Also note that each client prints timing
> so you can see how fast it is on your machine.
>
> If you need more performance then I'm sure you'll find a Ruby binding to
> any of the queuing framework like GNU Queue, NQS and whatnot. But I'd
> start with the simple DRb based solution. It's easily done, you have
> everything you need and do not need to install extra software, not even
> gems.


Thanks a lot. I've tried code similar to this:
http://www.idle-hacking.com/2007/11/...communication/

It uses a named pipe (FIFO), so of course there is no queue at all.

Well, when sending 100000 strings (in a loop), it takes 2-3 seconds to receive
and print all the received data.
However, with the DRb solution it just didn't finish (I had to interrupt the
process after 30 seconds due to CPU usage).

I'd like a simple solution. Using DRb could be nice. However, using a named
pipe seems simpler and faster. The doubt I have now is how safe a pipe is.
Could it leak memory if some process dies or the reader process is not fast
enough to handle the received data?





> I just notice, there was a bug in my code: I used Benchmark.times which
> prints timings of the current process. What I meant was
> Benchmark.measure. I have changed the code a bit so you can easy
> experiment with queue ssizes, thread counts and message counts (see below).
>
> With this command line
>
> t=10;for i in `seq 1 $t`; do ./c.rb 10000 >"cl-$i"& done; for i in `seq 1 $t`; do wait; done; cat cl-*
>
> I get pretty good timings of 7.6ms / msg with unlimited Queue size and
> default thread count (5) for this unrealistic test that the queue is
> hammered.

Thanks a lot, I'll try it.



--
Iñaki Baz Castillo <(E-Mail Removed)>

 
Iñaki Baz Castillo
      01-07-2010
On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:
> The doubt I have now is about how secure is a pipe.
> Could it leak memory if some process die or the reader process is not so
> fast to handle the received data?


Hmm, I have a reader process and a writer process.
The writer process writes into the pipe file.
If I kill the reader process, the writer process keeps writing to the
pipe and the data is stored (in the filesystem?).

So there is the leak problem... I must investigate it a bit more...

Thanks a lot.


--
Iñaki Baz Castillo <(E-Mail Removed)>

 
Phillip Gawlowski
      01-07-2010
On 07.01.2010 18:58, Iñaki Baz Castillo wrote:

> Hummm, I have a reader process and a writer process.
> The wirter process writes into the pipe file.
> If I kill the reader process then the writer process remains writting in the
> pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem... I must investigate it a bit more...


pipe.write unless pipe.full?

i.e. check whether your pipe hits a set size limit on disk, and raise an
exception if the pipe file reaches (or is close to reaching) that limit.

You could then buffer the data to be written until an additional (or
new) reading thread has started.
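A FIFO does not actually store data on disk (the writer blocks once the kernel buffer is full), but the idea of buffering locally until a reader catches up can be sketched with `IO#write_nonblock`. Bytes the pipe cannot take right now stay in a bounded in-process buffer and are retried later, and whole messages are dropped once that buffer is full. The class name and the 1 MB default limit are assumptions, and `exception: false` needs Ruby 2.3 or later.

```ruby
# Hypothetical non-blocking writer: never blocks the caller. Bytes that the
# pipe cannot take right now are kept in a bounded local buffer and retried
# on the next write or flush. Meant for a single writer per pipe, since one
# message may end up written in several pieces.
class BufferedPipeWriter
  def initialize(io, max_buffer: 1_000_000)
    @io = io
    @max_buffer = max_buffer
    @buffer = String.new(encoding: Encoding::BINARY)
  end

  def buffered_bytes
    @buffer.bytesize
  end

  # Returns false (message discarded) if the local buffer has no room left.
  def write(msg)
    data = msg.b
    return false if @buffer.bytesize + data.bytesize > @max_buffer
    @buffer << data
    flush
    true
  end

  # Pushes as much buffered data into the pipe as it accepts without blocking.
  # Errno::EPIPE (reader gone) is not handled here and propagates to the caller.
  def flush
    until @buffer.empty?
      written = @io.write_nonblock(@buffer, exception: false)
      break if written == :wait_writable # pipe is full, retry later
      @buffer = @buffer.byteslice(written..-1)
    end
  end
end
```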

--
Phillip Gawlowski

 
Robert Klemme
      01-07-2010
On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:
> On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:
>> The doubt I have now is about how secure is a pipe.
>> Could it leak memory if some process die or the reader process is not so
>> fast to handle the received data?

>
> Hummm, I have a reader process and a writer process.


I thought you had multiple writers. Didn't you mention multiple forked
Rack handlers?

> The wirter process writes into the pipe file.
> If I kill the reader process then the writer process remains writting in the
> pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem...


Not exactly: the writer is blocked. You can try this out:

robert@fussel:~$ mkfifo ff
robert@fussel:~$ ls -lF ff
prw-r--r-- 1 robert robert 0 2010-01-07 19:25 ff|
robert@fussel:~$ ruby19 -e 'puts("+"*10_000)' > ff
^Z
[1]+ Stopped ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ wc ff &
[2] 14036
robert@fussel:~$ %1
ruby19 -e 'puts("+"*10_000)' > ff
robert@fussel:~$ 1 1 10001 ff

[2]+ Done wc ff
robert@fussel:~$ jobs
robert@fussel:~$

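At the point where I pressed Ctrl-Z the writer was blocked because no process
had the FIFO open for reading yet: opening a FIFO for writing blocks until a
reader opens it. Once both ends are open, a writer also blocks whenever the
pipe buffer is full. (The buffer size is OS dependent; on Linux it has been
16 pages, i.e. 64 KiB with 4 KiB pages, since kernel 2.6.11, and writes of
up to PIPE_BUF = 4096 bytes are guaranteed to be atomic.)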
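Whether anything is reading a FIFO can be checked from Ruby without hanging. Per POSIX, opening a FIFO write-only with O_NONBLOCK fails with ENXIO when no process has it open for reading. A minimal sketch, with a hypothetical helper name; `File.mkfifo`, used in the check below, needs Ruby 2.3 or later on a POSIX system.

```ruby
require 'tmpdir'

# Hypothetical helper: try to open a FIFO for writing without blocking.
# open(2) with O_WRONLY | O_NONBLOCK fails with ENXIO when no process has
# the FIFO open for reading; Ruby surfaces that as Errno::ENXIO.
def open_fifo_writer(path)
  io = File.open(path, File::WRONLY | File::NONBLOCK)
  io.sync = true # hand each write straight to the kernel
  io
rescue Errno::ENXIO
  nil # nobody is reading yet: buffer, drop, or retry later
end
```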

> I must investigate it a bit more...


I'd personally prefer the DRb approach because then you can
actually send typed messages, i.e. whatever information you need. Also,
it was fun to play around with those small test programs. And you
can have the reader run on any machine in the network.

Whatever you do, you have to decide how to handle the situation when
the reader goes away, for whatever reason. You could write your
messages to a file and read them with an approach like the one "tail -f" uses.
But this has the nasty effect of clobbering the file system, plus if
the reader goes away the file might grow arbitrarily large. And you have
locking issues. Using an in-memory pipe (e.g. mkfifo or via DRb) is
preferable IMHO. Then you can still decide in the client what to do if
you cannot get rid of the message.
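For the mkfifo case, "deciding in the client" mostly means handling `Errno::EPIPE`. By default a Ruby process is not killed by SIGPIPE, so writing to a pipe whose reader has gone away raises `Errno::EPIPE` instead. Here is a minimal sketch of a drop-and-log policy; the helper name and the log array are assumptions.

```ruby
# Hypothetical policy: try to hand the message to the reader; if the reader
# has gone away, record that and drop the message instead of crashing.
def send_or_drop(io, msg, log)
  io.write(msg)
  io.flush # make sure the bytes actually reach the pipe now
  true
rescue Errno::EPIPE
  log << "reader gone, dropped #{msg.bytesize} bytes"
  false
end
```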

> Thanks a lot.


You're welcome.

Kind regards

robert

--
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/
 
Iñaki Baz Castillo
      01-07-2010
On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:
> On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:
> > The doubt I have now is about how secure is a pipe.
> > Could it leak memory if some process die or the reader process is not so
> > fast to handle the received data?
>
> Hummm, I have a reader process and a writer process.
> The wirter process writes into the pipe file.
> If I kill the reader process then the writer process remains writting in
> the pipe and the data is stored (in the filesystem?).
>
> So there is the leaking problem... I must investigate it a bit more...


OK, the FIFO keeps working at the OS level, so it can receive messages until
the OS buffer capacity is filled. Then the writer process blocks when trying
to "flush" the data.
Fortunately it only blocks at the Ruby thread level, so other threads can keep
working.

--
Iñaki Baz Castillo <(E-Mail Removed)>

 
Iñaki Baz Castillo
      01-07-2010
On Thursday, 7 January 2010, Robert Klemme wrote:
> On 01/07/2010 06:58 PM, Iñaki Baz Castillo wrote:
> > On Thursday, 7 January 2010, Iñaki Baz Castillo wrote:
> >> The doubt I have now is about how secure is a pipe.
> >> Could it leak memory if some process die or the reader process is not so
> >> fast to handle the received data?

> >
> > Hummm, I have a reader process and a writer process.

>
> I thought you have multiple writers. Didn't you mention multiple forked
> Rack handlers?


Yes, that's true. Surely I'll run into problems when writing to the FIFO
from several clients at the same time.
But for that I could create as many FIFOs as there are Rack workers...
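Several writers sharing one FIFO is less of a problem than it may seem, as long as each message is small. POSIX makes writes of up to PIPE_BUF bytes to a pipe atomic, so lines from different writers do not get mixed. The sketch below uses `fork` and an anonymous pipe in place of the named FIFO. The helper names and the 512-byte cap are assumptions, and `fork` is not available on Windows.

```ruby
# Writes of at most PIPE_BUF bytes to a pipe or FIFO are atomic (POSIX
# guarantees PIPE_BUF >= 512; Linux uses 4096), so several processes can
# share one pipe as long as every message goes out in one such write.
MAX_ATOMIC = 512 # conservative: the POSIX minimum for PIPE_BUF

def atomic_send(io, line)
  data = line.end_with?("\n") ? line : "#{line}\n"
  raise ArgumentError, "message too long for an atomic write" if data.bytesize > MAX_ATOMIC
  io.syswrite(data) # a single write(2), bypassing Ruby's IO buffering
end

# Demo: fork several writers that share one pipe and collect their lines.
def collect_from_forked_writers(writers, messages_each)
  r, w = IO.pipe
  pids = Array.new(writers) do |id|
    fork do
      r.close
      messages_each.times { |i| atomic_send(w, "writer #{id} msg #{i}") }
      exit!(0) # skip at_exit handlers inherited from the parent
    end
  end
  w.close # parent keeps only the read end, so EOF arrives when writers exit
  lines = r.each_line.map(&:chomp)
  r.close
  pids.each { |pid| Process.wait(pid) }
  lines
end
```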



> [...]
>
> Whatever you do, you have to decide how to go about the situation when
> the reader goes away - for whatever reasons.


It's real-time info, so if the reader dies it's not that important to recover
that information when it starts again. Well, it would be nice to be able to
recover up to 5-10 minutes' worth, but no more.
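Keeping only the last few minutes of data for a restarted reader amounts to a buffer that expires entries by age. A sketch with a hypothetical class name; the injectable clock exists only to make it testable. Memory is bounded by the time window rather than by a count, so a count cap may still be needed under heavy load.

```ruby
# Hypothetical buffer that keeps only messages younger than `window` seconds,
# so a restarted reader can catch up on recent data but nothing older.
class RecentBuffer
  Entry = Struct.new(:at, :msg)

  def initialize(window, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @window = window
    @clock = clock
    @entries = []
    @lock = Mutex.new
  end

  def add(msg)
    @lock.synchronize do
      @entries << Entry.new(@clock.call, msg)
      prune
    end
  end

  # Returns everything still inside the window and empties the buffer.
  def take_all
    @lock.synchronize do
      prune
      msgs = @entries.map(&:msg)
      @entries.clear
      msgs
    end
  end

  private

  # Entries are appended in time order, so expired ones sit at the front.
  def prune
    cutoff = @clock.call - @window
    @entries.shift while !@entries.empty? && @entries.first.at < cutoff
  end
end
```

For a 10-minute window this would be `RecentBuffer.new(600)`.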



> You could write your
> messages to a file and use an approach like "tail -f" uses to read them.
> But this has the nasty effect of clobbering the file system plus if
> the reader goes away the file might grow arbitrary large. And you have
> locking issues. Using any in memory pipe (e.g. mkfifo or via DRb) is
> preferrable IMHO. The you can still decide in the client what you do if
> you cannot get rid of the message.


Yes, I must think about it a bit.

Thanks a lot for your help.


--
Iñaki Baz Castillo <(E-Mail Removed)>

 

Similar Threads
- Controlling processes and what to "feed" other processes (Marc Heiler, Ruby, 1 reply, last post 05-24-2009 05:37 PM)
- Communication between 2 ruby programs (Julien Genestoux, Ruby, 7 replies, last post 07-11-2008 05:03 AM)
- Two processes with communication through a signal. (jumpz, VHDL, 5 replies, last post 06-03-2008 02:53 AM)
- communication between processes (john, VHDL, 10 replies, last post 11-30-2004 09:59 AM)
- How do I: Main thread spawn child threads, which child processes...control those child processes? (Jeff Rodriguez, C Programming, 23 replies, last post 12-09-2003 11:06 PM)


