Without delving too deeply into your problem, I think you could give
my attached module PreforkAgent.pm a try.
It was made for very much the same task - stress testing a website, in
fact.
You could probably use the module in your script without any
adaptations. Just follow the POD instructions on how to set up your
callback routines.
In contrast to your current concept, the specified number of children
is created only once, before the actual work starts, and each returning
child is immediately given the next task as long as there are any
left.
Under Linux the module works like a charm with up to 500 children.
Because the overhead of forking is all paid up front, it's
possible to create a heavy load on a target web (or any other) server
with only moderate local means.
If you want to put a cap on the load, you will need to do so in your
wrapper script.
Because I have never tried it on Windows, I'd be delighted to hear how
you fare.
Cheers, Steffen
#
# PreforkAgent
#
# Allows execution of many jobs in parallel.
# All parent / child communication is implemented with pipes.
# The parent installs a SIGINT handler for cleanup purposes and sets
# SIGCHLD to 'IGNORE' so that finished children do not become zombies.
#
# Steffen Heinrich - Jun 2007
#
package PreforkAgent;
use strict;
use warnings;
# 'our' rather than 'my': a lexical is invisible to PreforkAgent->VERSION,
# to "use PreforkAgent 0.03" and to packaging tools.
our $VERSION = '0.03';
#################################################
# libs and class vars
use IO::Select;
# Terminator appended to every message sent through a pipe; message
# payloads therefore must not contain this byte.
my $EOF_MSG_SEQ = "\x1F"; # ASCII control character US (Unit Separator)
############################
# constructor
sub new {
    # Creates a new agent with no callbacks beyond a default
    # child_prepare and no children yet.  Any arguments after the class
    # name are ignored.
    my ($class) = @_;

    # Fresh containers are built on every call, so two agents never
    # share a listener or bookkeeping hash.
    my %state = (
        debug_out     => 0,                   # print job counts per child after assign()
        parent        => $$,                  # PID of the creating process, checked by cleanup()
        listener      => IO::Select->new(),   # read ends of the child -> parent pipes
        kids_to_spawn => 0,
        kids          => 0,
        living        => {},                  # child id => name of its parent -> child write handle
        pids          => {},                  # child id => PID
        jobs          => {},                  # child id => number of jobs handed out (debug only)
        child_prepare => sub { 1 },           # default initializer: always succeeds
    );

    return bless \%state, $class;
}
############################
# methods
sub register {
    # Registers one or more callback routines with the agent:
    #
    #   $agent->register(name => \&code, ...);
    #
    # Accepted names: child_prepare, fetch_next_task, process_job and
    # process_response.  All pairs are validated.  Every valid pair is
    # stored, even when other pairs are rejected.  All problems are then
    # reported together in one die() message, one line per problem.
    # Returns the agent, which allows chained calls, or nothing when
    # called without an object or without any name/code pairs.
    my $self = shift() or return;
    my %subs = @_ or return;

    # A lookup table instead of a regex alternation, which is easy to
    # break when the source gets re-wrapped.
    my %known = map { $_ => 1 }
        qw(child_prepare fetch_next_task process_job process_response);

    require Scalar::Util;
    my @errors;
    # Sorted so that combined error messages come out in a stable order.
    for my $name (sort keys %subs) {
        my $code = $subs{$name};
        if (!$known{$name}) {
            push @errors, "'$name' is not a known sub";
        }
        # reftype() also accepts blessed code refs, which ref() reports
        # by their class name.
        elsif (defined $code
            && (Scalar::Util::reftype($code) || '') eq 'CODE') {
            $self->{$name} = $code;
        }
        else {
            push @errors, "'$name' does not reference a sub";
        }
    }
    die join("\n", @errors) . "\n" if @errors;
    return $self;
}
sub spawn {
    # Forks the given number of children once, up front, and connects
    # each of them to the parent with two pipes:
    #
    #   parent reads  R<n>  <--  WH   child writes   (answers)
    #   parent writes W<n>  -->  RH   child reads    (jobs)
    #
    # R<n> and W<n> are package filehandles named after the child id n
    # (1 .. $kids_to_spawn).  assign() derives the child id from the
    # name of the R<n> handle, so this naming is a contract between the
    # two methods.
    #
    # Each child calls child_prepare($n), announces 'READY' and then
    # answers every job with process_job($job, $n) until it is sent
    # 'QUIT'.
    #
    # Returns the number of children spawned, or nothing when called
    # without an object or with a false count.  Dies if process_job has
    # not been registered, if a pipe cannot be created, or if not every
    # requested child could be forked.
    my $self = shift() or return;
    my $kids_to_spawn = shift() or return;
    $self->{kids_to_spawn} = $kids_to_spawn;
    my $process_job = $self->{process_job}
        or die "process_job() must have been registered with PreforkAgent"
             . " before a call to spawn()!\n";
    my $sel = $self->{listener};

    # prevent zombies since we won't wait()
    $SIG{CHLD} = 'IGNORE';

    my $fork_error;
    CHILD:
    for my $child (1 .. $kids_to_spawn) {
        my $whdl = 'W' . $child;
        my $rhdl = 'R' . $child;
        {
            no strict 'refs';
            # open bidirectional communication
            pipe $rhdl, WH or die "pipe1: $!";    # parent <- child
            pipe RH, $whdl or die "pipe2: $!";    # child <- parent
            # register the read handle with the ones to listen to
            $sel->add(\*$rhdl);
            # autoflush both write ends
            select((select(WH), $| = 1)[0]);
            select((select($whdl), $| = 1)[0]);
        }
        # remember which write handle belongs to this child
        $self->{living}{$child} = $whdl;

        my $pid = fork();
        if (!defined $pid) {
            # fork() failed.  Without this check the parent would take the
            # child branch below.  Undo this iteration's setup and stop
            # spawning; the check after the loop reports the shortfall.
            $fork_error = "$!";
            {
                no strict 'refs';
                $sel->remove(\*$rhdl);
                close $rhdl;
                close $whdl;
            }
            close WH;
            close RH;
            delete $self->{living}{$child};
            last CHILD;
        }

        if ($pid == 0) {
            # ---- child process ----
            {
                no strict 'refs';
                # the parent's ends of our own pipes are not needed here
                close $rhdl;
                close $whdl;
                # Neither are the parent's ends of the pipes to earlier
                # siblings, inherited through fork().  Keeping them would
                # waste descriptors and hold a sibling's job pipe open.
                for my $sibling (keys %{ $self->{living} }) {
                    next if $sibling == $child;
                    my $sibling_whdl = $self->{living}{$sibling};
                    my $sibling_rhdl = 'R' . $sibling;
                    close $sibling_whdl;
                    close $sibling_rhdl;
                }
            }
            # Execute individual initialization.  Only an undefined result
            # counts as failure; a false but defined value is accepted.
            my $init = $self->{child_prepare};
            defined($init->($child))
                or die "child_prepare() failed in child $child\n";
            # listen on the one handle that carries jobs from the parent
            my $child_sel = IO::Select->new(\*RH);
            # signal readiness
            _write_into_pipe(\*WH, 'READY');
            while ($child_sel->can_read) {
                my $job = _read_from_pipe(\*RH);
                last if $job eq 'QUIT';
                # do something (scalar context, as before)
                my $answer = $process_job->($job, $child);
                _write_into_pipe(\*WH, $answer);
            }
            # Child is done: unload and quit.  Note that exit() also runs
            # any END blocks inherited from the parent program.
            $child_sel->remove(\*RH);
            close WH;
            close RH;
            exit 0;
        }

        # ---- parent process ----
        # the child's ends of the pipes are not needed here
        close WH;
        close RH;
        # register the child
        $self->{pids}{$child} = $pid;
        # the parent catches SIGINT; installed together with the first child
        $SIG{INT} = sub { $self->cleanup() }
            unless $self->{kids}++;
    }

    if ($self->{kids} != $self->{kids_to_spawn}) {
        my $reason = defined $fork_error ? $fork_error : $!;
        die "Could only spawn $self->{kids} kids of "
          . "$self->{kids_to_spawn}: $reason";
    }
    return $self->{kids};
}
sub assign {
    # Sends jobs to children that are ready to listen and passes their
    # answers to the registered process_response callback.  As long as
    # there are more jobs, each returning child is immediately given the
    # next one.
    #
    # Every message from a child is either 'READY' (sent once after
    # start-up) or the answer to a previous job.  Answers are passed to
    # process_response($response, $child).  The replying child is then
    # given fetch_next_task($child), as long as that is defined.  Once
    # fetch_next_task has returned undef, it is not called again.  From
    # then on, every child that reports back is told to 'QUIT', is
    # unregistered and has its handles closed.
    # Returns the result of cleanup() once no children are left.
    #
    # NOTE: an answer that is literally 'READY' cannot be told apart from
    # the start-up message and is not passed to process_response.
    my $self = shift() or return;
    my $fetch_next_task  = $self->{fetch_next_task};
    my $process_response = $self->{process_response};
    ($fetch_next_task && $process_response)
        or die "fetch_next_task() and process_response() must have been"
             . " registered with PreforkAgent before a call to assign()!\n";
    $self->{kids} > 0
        or die "You need to call spawn(kids) before assigning jobs!\n";

    my $sel = $self->{listener};
    # Declared outside both loops: fetch_next_task must never be called
    # again after returning undef, even if can_read() returns early
    # (e.g. interrupted by a signal) and the outer loop starts over.
    my $not_finished = 1;

    # work loop
    while ($self->{kids}) {
        while (my @ready = $sel->can_read) {
            for my $rhdl (@ready) {
                # The handles are the globs *PreforkAgent::R<n> created by
                # spawn(); the child id is the number in the glob's name.
                my $child = '';
                {
                    no strict 'refs';
                    my $glob_name = *{$rhdl}{NAME};
                    $child = $1
                        if defined $glob_name && $glob_name =~ /^R(\d+)$/;
                }
                my $whdl     = $self->{living}{$child};
                my $response = _read_from_pipe($rhdl);
                $process_response->($response, $child)
                    unless $response eq 'READY';

                # assign next task
                my $task;
                $not_finished &&= defined($task = $fetch_next_task->($child));
                if ($child && $whdl && $not_finished) {
                    _write_into_pipe($whdl, $task);
                    $self->{jobs}{$child}++ if $self->{debug_out};
                }
                else {
                    # tell the child to exit, if we know how to reach it
                    _write_into_pipe($whdl, 'QUIT') if $whdl;
                    # unregister the child
                    $sel->remove($rhdl);
                    delete $self->{living}{$child};
                    # close handles to the child
                    close $rhdl;
                    if ($whdl) {
                        no strict 'refs';
                        close $whdl;
                    }
                    $self->{kids}--;
                }
            }
        }
    }
    # since all children exited and we set SIGCHLD = IGNORE
    # we don't have to wait()
    if ($self->{debug_out}) {
        my $job_cnt = $self->{jobs};
        for my $child (sort { $job_cnt->{$b} <=> $job_cnt->{$a} }
                       keys %$job_cnt) {
            printf "%4s: %5d\n", $child, $job_cnt->{$child};
        }
    }
    return $self->cleanup();
}
############################
# subroutines
sub _read_from_pipe {
    # Reads one message from $fh and returns it without its terminator
    # $EOF_MSG_SEQ, which _write_into_pipe appends to every message.
    # Reading stops at the terminator or at end-of-file.  At EOF, whatever
    # was read so far (possibly '') is returned.  Interrupted reads (EINTR)
    # are retried; any other read error dies.
    # NOTE: relies on at most one message being in flight on $fh, which
    # the strict job/answer alternation in this module ensures.  Payloads
    # must not contain the terminator byte.
    my ($fh) = @_;
    my $blksize = (stat $fh)[11] || 16384;
    my $buf = '';
    while (1) {
        # Appends at the end of $buf.  The old "while (my $len = sysread...)"
        # left the loop on undef, so its error branch could never run.
        my $len = sysread($fh, $buf, $blksize, length $buf);
        if (!defined $len) {
            next if $!{EINTR};
            die "System Read Error: $!\n";
        }
        last if $len == 0;                            # end-of-file
        last if $buf =~ s/\Q$EOF_MSG_SEQ\E\z//;      # complete message
    }
    return $buf;
}
sub _write_into_pipe {
    # Writes $msg followed by the terminator $EOF_MSG_SEQ to $fh, looping
    # until everything has been written.  An undefined $msg is sent as an
    # empty message.  Interrupted writes (EINTR) are retried; any other
    # write error dies.  Returns the number of bytes written, including
    # the terminator.
    my ($fh, $msg) = @_;
    $msg = '' unless defined $msg;
    $msg .= $EOF_MSG_SEQ;
    my $remaining = length $msg;
    my $blksize = (stat $fh)[11] || 16384;
    my $offset = 0;
    while ($remaining) {
        my $len = syswrite($fh, $msg, $blksize, $offset);
        if (!defined $len) {
            next if $!{EINTR};
            die "System Write Error: $!\n";
        }
        $remaining -= $len;
        $offset    += $len;
    }
    return $offset;
}
sub cleanup {
    # Terminates children that may still be running.  Does nothing
    # outside the process that created the agent, because children
    # inherit the object (and the SIGINT handler) through fork().
    # For every recorded child PID, `ps` is consulted.  SIGTERM is sent
    # only if the ps output still mentions this program's name ($0); this
    # guards against PIDs that were meanwhile reused by unrelated
    # processes.  Every PID is forgotten afterwards.
    my $self = shift() or return;
    # only for parent
    return unless $self->{parent} == $$;
    # Debugging aids:
    # print "\$kids = $self->{kids}\n";
    # print "\%living count = ", scalar(keys %{$self->{living}}), "\n";
    # print "select bitmap = '", defined $self->{listener}->bits()
    #     ? (unpack 'b*', $self->{listener}->bits()) : '', "'\n";

    # $0 may contain regex metacharacters such as '.' or '+'.
    my $program = quotemeta $0;
    # Iterate over a snapshot of the keys, so deleting entries cannot
    # disturb the iteration, even if cleanup() is re-entered from SIGINT.
    for my $kid (keys %{ $self->{pids} }) {
        my $pid = delete $self->{pids}{$kid};
        my $ps  = `ps $pid`;    # $pid is the numeric value from fork()
        if ($ps =~ /$program\b/s) {
            print STDERR "killing $pid\n";
            # built-in kill with SIGTERM, the default signal of kill(1),
            # instead of starting a shell
            kill 'TERM', $pid;
        }
    }
    return;
}
1   # a module file must end with a true value for use/require to succeed
__END__
=pod

=head1 NAME

PreforkAgent - A dispatch wrapper for simultaneous tasks.

=head1 PURPOSE

Running any large number of similar tasks in parallel with
utmost throughput.

First, a given number of children is spawned. Then each of them is
handed the next task from a common queue in succession as it returns
with a response.

=head1 SYNOPSIS

    use PreforkAgent;

    my $pfa = PreforkAgent->new or die;
    $pfa->register(
        child_prepare    => \&individual_init,  # this sub is optional
        fetch_next_task  => \&next_job,
        process_job      => \&dispatch,
        process_response => \&collect_response
    );
    my $GLOBAL_VAR = "fancy value";
    my $kid_count = $pfa->spawn(5) or die;
    $pfa->assign();
    exit;

    sub individual_init {
        # the child will die if undef is returned
        # (a false but defined value counts as success)
        # this sub is optional
        my $child_id = shift() or return;
        # Child context:
        # can read $GLOBAL_VAR as it was at the time of spawn(), but
        # changes made here are not visible to the parent
        # any initialization to be done by each child goes here
        ...
        return $success;
    }

    sub next_job {
        # must return a string, which will subsequently be passed to
        # dispatch() in one of the children,
        # and MUST return undef when there are no more jobs.
        # The string must not contain the character \x1F (used as the
        # message terminator) and must not be 'QUIT'.
        # The child id lets the main program tie a certain job to the
        # response that child later returns in collect_response().
        my $child_id = shift;
        # Parent context:
        # can read AND write $GLOBAL_VAR
        ...
        return $str_job;
    }

    sub dispatch {
        # defines the parallel task for the children:
        # processes the job and returns a serialized response.
        # The response must not contain the character \x1F and should
        # not be the string 'READY' (such a response is not passed on
        # to collect_response()).
        my $str_job  = shift() or return;
        my $child_id = shift() or return;
        # Child context:
        # can read $GLOBAL_VAR as it was at the time of spawn(), but
        # changes made here are not visible to the parent
        ...
        return $str_response;
    }

    sub collect_response {
        # allows the main program to evaluate each of the children's responses
        my $response = shift() or return;
        my $child_id = shift() or return;
        # Parent context:
        # can read AND write $GLOBAL_VAR
        ...
    }

=head1 SEE ALSO

ParallelUserAgent by Marc Langheinrich

=head1 VERSION

This document describes version 0.03.

=head1 LICENSE

Copyright (C) 2007, Steffen Heinrich. All rights reserved.

This module is free software;
you can redistribute it and/or modify it under the same terms as Perl
itself.

=cut