Velocity Reviews - Computer Hardware Reviews


How to make only one thread sleep?

 
 
Ralph Moritz
      09-13-2006
Hi,

I've written a little multi-threaded file monitoring program. When it
starts it creates a detached "factory" thread, which loops infinitely,
creating detached worker threads. In the main thread we enter a
"monitor" loop which scans a directory for new files at intervals and
inserts them into a queue for the factory thread to extract. The
factory thread will block if the maximum number of allowed worker
threads is already running or there is no data in the queue.
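The layout described above (a monitor feeding a queue, a factory taking items off it, and a semaphore capping the number of workers) can be reduced to a short stand-alone sketch. This is not Ralph's program: it assumes a perl built with ithreads, uses a fixed list of dummy item names instead of a directory scan, treats an undef item as a stop marker, and joins its threads instead of detaching them so that the demo terminates.

```perl
#!/usr/bin/perl
# Sketch of the monitor -> queue -> factory -> worker layout (dummy items only).
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;

my $max_workers = 2;                               # hypothetical limit
my $slots = Thread::Semaphore->new($max_workers);  # free worker slots
my $queue = Thread::Queue->new;                    # monitor -> factory
my @done  :shared;                                 # items the workers handled

# Factory: wait for a free slot, take the next item, start a worker for it.
my $factory = threads->create(sub {
    my @workers;
    while (1) {
        $slots->down;
        my $item = $queue->dequeue;                # blocks while the queue is empty
        last unless defined $item;                 # undef is our stop marker
        push @workers, threads->create(sub {
            { lock(@done); push @done, $item; }    # stand-in for real processing
            $slots->up;                            # release the slot
        });
    }
    $_->join for @workers;
});

# Monitor: hand work to the factory without waiting for it to be processed.
$queue->enqueue($_) for qw(a.txt b.txt c.txt d.txt);
$queue->enqueue(undef);                            # ask the factory to stop
$factory->join;

print join(',', sort @done), "\n";
```

Run as is, it should print a.txt,b.txt,c.txt,d.txt once all four dummy workers have finished.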

This design was chosen so that the monitor loop can continue running
even if the factory thread blocks. Some problems have appeared though:

* When I call sleep() in the main thread (monitor loop), it affects
the entire process, rather than just the main thread. How do I put
just one specific thread to sleep?

* There appears to be a memory leak somewhere. When I run the program
with perl -w I get warnings about leaked scalars. How do I track
this down?
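The first question can be tested in isolation. With ithreads, each Perl thread is its own interpreter running in a separate OS thread, so sleep() should suspend only the thread that calls it. The sketch below assumes a perl built with thread support; the five ticks and the 0.2 s pause are arbitrary. It lets a worker count while the main thread sleeps:

```perl
#!/usr/bin/perl
# Check whether sleep() in the main thread stalls a second ithread.
use strict;
use warnings;
use threads;
use threads::shared;

my $ticks :shared = 0;    # bumped by the worker while main is asleep

my $worker = threads->create(sub {
    for (1 .. 5) {
        { lock($ticks); $ticks++; }
        select(undef, undef, undef, 0.2);   # pause 0.2 s between ticks
    }
});

sleep(2);                 # suspends the calling (main) thread
my $seen = do { lock($ticks); $ticks };
$worker->join;

print "worker ticked $seen times while main slept\n";
```

If the worker reports 5 ticks, sleep() is suspending only the main thread on that build.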

My program is included below in its entirety. Any advice on the
design and the code will be much appreciated.

TIA,
Ralph

 
Ralph Moritz
      09-13-2006
I wrote:

> My program is included below in its entirety.


Sorry, I forgot to include the program :-] Here it is:

use strict;
use threads;
use threads::shared;
use Thread::Semaphore;
use Thread::Queue;
use File::Copy;
use Digest::MD5 qw(md5);

# ------ Fallback config values ------------------

# Path to application root directory
our $root_dir = '/home/ralph/Sources/QMSv2';

# Directory containing inbound files
our $data_dir = "$root_dir/inbound";

# Directory to put processed files
our $result_dir = "$root_dir/processed";

# Directory to put files containing errors
our $error_dir = "$root_dir/errors";

# Path to the log file
our $logfile = "$root_dir/logs/monit.log";

# Frequency to scan inbound directory (in seconds)
our $scan_interval = 60;

# File extensions to match against
our $file_ext = qr/.txt$/;

# Maximum number of worker threads to create
our $max_workers = 10;

# -----------------------------------------------

# HACK: hard-code path to config file
my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
read_config($config_file);

my $io_sem = Thread::Semaphore->new; # I/O semaphore
my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
my $queue = Thread::Queue->new; # Data queue
my %files :shared;

monitor_loop($data_dir);

sub monitor_loop {
    log_msg('Entering monitor loop');
    my $factory = threads->create(\&factory_loop);
    $factory->detach;

    while (1) {
        log_msg('Woke up. Scanning for files...');

        opendir(my $indir, $data_dir) or
            log_msg("Error: failed to open directory $data_dir: $!")
            && exit 1;
        my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
            readdir($indir);
        closedir($indir) or
            log_msg("Error: failed to close directory $data_dir: $!")
            && exit 1;

        foreach my $fn (@flist) {
            open(my $in, $fn) or
                log_msg("Error: failed to open file $fn: $!")
                && exit 1;
            my $digest = Digest::MD5->new->addfile($in)->hexdigest;
            close($in);

            # Ignore files which are still being processed.
            lock(%files);
            unless (exists $files{$digest}) {
                log_msg("Queueing file $fn...");
                $files{$digest} = $fn;
                $queue->enqueue($digest);
            }
        }

        log_msg("Sleeping...");

        # BUG: this puts the whole process to sleep, instead
        # of just the current thread.
        sleep($scan_interval);
    }
}

sub factory_loop {
    log_msg('Entering factory loop');
    while (1) {
        $th_sem->down;
        my $digest = $queue->dequeue;
        log_msg('Dispatching worker thread...');
        my $worker = threads->create(\&process_file, $digest);
        $worker->detach;
    }
}

sub process_file {
    my $digest = shift;
    my $file = $files{$digest};
    open(my $in, $file) or
        log_msg("Error: failed to open file $file: $!")
        && exit 1;
    log_msg("Processing file $file...");

    # ---- DUMMY PROCESSING CODE -----
    my $count = 0;
    $count++ while (my $line = <$in>);

    close $in;
    if ($count) {
        log_msg("OK:\t$file");
        move($file, $result_dir);
    } else {
        log_msg("ERR:\t$file");
        move($file, $error_dir);
    }
    # -------------------------------

    lock(%files);
    delete $files{$digest};
    $th_sem->up;
}

sub log_msg {
    $io_sem->down;
    my $msg = shift;
    open(my $logfile, ">> $logfile");
    print $logfile localtime() . " - $msg\n";
    close($logfile);
    $io_sem->up;
}

sub read_config {
    my $file = shift;
    open(my $cfg, $file) or
        warn "Warning: failed to open config file $file: $!. Using default values instead."
        && return;
    my $config = join('', <$cfg>);
    eval $config;
    die "Error: in config file $file: $@" if $@;
}

 
Ralph Moritz
      09-13-2006
A. Sinan Unur wrote:
> "Ralph Moritz" <(E-Mail Removed)> wrote in
> news:(E-Mail Removed) oups.com:
>
> >> My program is included below in its entirety.

>
> Your program has too many external dependencies for anyone else to try
> to debug it. You need to post the shortest possible program that still
> exhibits the problem.


There are no external dependencies. I posted the entire program.

> There are oddities such as defining a subroutine warn with the name of a
> builtin. Again, I don't know if this has anything to do with the problem
> or even if it is a problem but why create complications?


I don't know what you mean. There is no definition for a subroutine
called warn() in the code I posted. ??

> As far as I can see, your factory_loop does not limit the number of
> threads it is creating.


Not explicitly, but if you look closely you'll see that it's calling
$th_sem->down, which decrements the semaphore's count by one.
If the count would drop below zero, down blocks until the count
is raised again. This effectively makes factory_loop block
if the maximum number of worker threads is already running.
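The limiting behaviour described here can be checked with a small program. In this sketch (assumptions: a perl built with ithreads, a hypothetical limit of 3, ten dummy jobs, 0.1 s of simulated work each) the main thread calls down() before starting each worker and each worker calls up() when done, while a shared counter records the highest number of workers seen running at once:

```perl
#!/usr/bin/perl
# Cap concurrent workers with Thread::Semaphore and record the peak.
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $max_workers = 3;                                # hypothetical limit
my $slots   = Thread::Semaphore->new($max_workers); # count = free slots
my $running :shared = 0;                            # workers active right now
my $peak    :shared = 0;                            # highest $running observed

my @workers;
for my $job (1 .. 10) {
    $slots->down;                 # blocks while the count is zero
    push @workers, threads->create(sub {
        {
            lock($running);
            $running++;
            $peak = $running if $running > $peak;
        }
        select(undef, undef, undef, 0.1);           # simulated work
        { lock($running); $running--; }
        $slots->up;                                 # hand the slot back
    });
}
$_->join for @workers;

print "peak concurrency: $peak (limit $max_workers)\n";
```

The reported peak should never exceed the limit: a worker increments the counter only after its slot was taken with down(), and decrements it before giving the slot back with up().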

--
Ralph Moritz

 
A. Sinan Unur
      09-13-2006
"Ralph Moritz" <(E-Mail Removed)> wrote in
news:(E-Mail Removed) ups.com:

> A. Sinan Unur wrote:
>> "Ralph Moritz" <(E-Mail Removed)> wrote in
>> news:(E-Mail Removed) oups.com:
>>
>> >> My program is included below in its entirety.

>>
>> Your program has too many external dependencies for anyone else to try
>> to debug it. You need to post the shortest possible program that still
>> exhibits the problem.

>
> There are no external dependencies.


Yes, it does.

> I posted the entire program.


But one would need to create a config file, directories and files to be
able to run it. Reduce the program to something which still exhibits your
problem, and which others can run just by copying and pasting.


>> There are oddities such as defining a subroutine warn with the name of
>> a builtin. Again, I don't know if this has anything to do with the
>> problem or even if it is a problem but why create complications?

>
> I don't know what you mean. There is no definition for a subroutine
> called warn() in the code I posted. ?


I confused myself in an attempt to reduce your program to something that
can be run.

>> As far as I can see, your factory_loop does not limit the number of
>> threads it is creating.

>
> Not explicitly, but if you look closely you'll see that it's calling
> $th_sem->down, which decrements the semaphore's count by one.
> If the count would drop below zero, down blocks until the count
> is raised again. This effectively makes factory_loop block
> if the maximum number of worker threads is already running.


It looks like my efforts have been wasted. Did you actually run the
program I posted? Does it work on your system as it does on mine? If so,
we can rule out any problems with sleep.

Now, it is your turn to do some work: Extend that program to what you
need in a step-by-step approach: Add one thing, test, add another, test.
You will find the solution to your problem. If not, you will at least
have come up with a program which others can use to help you.

Sinan


> --
> Ralph Moritz


PS: Use the correct format for the signature separator. It is
"dash-dash-space-newline".

 
John W. Krahn
      09-13-2006
Ralph Moritz wrote:
> I wrote:
>
>>My program is included below in its entirety.

>
> Sorry, I forgot to include the program :-] Here it is:


use warnings;

> use strict;
> use threads;
> use threads::shared;
> use Thread::Semaphore;
> use Thread::Queue;
> use File::Copy;
> use Digest::MD5 qw(md5);
>
> # ------ Fallback config values ------------------
>
> # Path to application root directory
> our $root_dir = '/home/ralph/Sources/QMSv2';
>
> # Directory containing inbound files
> our $data_dir = "$root_dir/inbound";
>
> # Directory to put processed files
> our $result_dir = "$root_dir/processed";
>
> # Directory to put files containing errors
> our $error_dir = "$root_dir/errors";
>
> # Path to the log file
> our $logfile = "$root_dir/logs/monit.log";
>
> # Frequency to scan inbound directory (in seconds)
> our $scan_interval = 60;
>
> # File extensions to match against
> our $file_ext = qr/.txt$/;


our $file_ext = qr/\.txt$/;

The unescaped . matches any character; you need to escape it.
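A quick way to see the difference between the two patterns; the file names are made up for illustration:

```perl
#!/usr/bin/perl
use strict;
use warnings;

my $loose  = qr/.txt$/;    # '.' matches any single character
my $strict = qr/\.txt$/;   # '\.' matches only a literal dot

for my $name ('report.txt', 'report_txt', 'notes.txt.bak') {
    printf "%-14s loose:%-3s strict:%s\n", $name,
        ($name =~ $loose  ? 'yes' : 'no'),
        ($name =~ $strict ? 'yes' : 'no');
}
```

Here report_txt is accepted by the unescaped pattern but rejected by the escaped one, while both reject notes.txt.bak because of the $ anchor.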


> # Maximum number of worker threads to create
> our $max_workers = 10;
>
> # -----------------------------------------------
>
> # HACK: hard-code path to config file
> my $config_file = '/home/ralph/Sources/QMSv2/config.pl';
> read_config($config_file);
>
> my $io_sem = Thread::Semaphore->new; # I/O semaphore
> my $th_sem = Thread::Semaphore->new($max_workers); # Thread semaphore
> my $queue = Thread::Queue->new; # Data queue
> my %files :shared;
>
> monitor_loop($data_dir);
>
> sub monitor_loop {
> log_msg('Entering monitor loop');
> my $factory = threads->create(\&factory_loop);
> $factory->detach;
>
> while (1) {
> log_msg('Woke up. Scanning for files...');
>
> opendir(my $indir, $data_dir) or
> log_msg("Error: failed to open directory $data_dir: $!")
> && exit 1;
> my @flist = grep { $_ = "$data_dir/$_" if /$file_ext/ }
> readdir($indir);


You probably want either:

my @flist = map /$file_ext/ ? "$data_dir/$_" : (), readdir($indir);

Or:

my @flist = map "$data_dir/$_", grep /$file_ext/, readdir($indir);


$_ is an alias to the current list element, just as it is in for/foreach, so
modifying $_ inside grep also changes the original list/array.
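The aliasing can be demonstrated without a real directory by running both suggestions, and the original grep, over a fixed list. The directory name and entries are hypothetical stand-ins for $data_dir and readdir():

```perl
#!/usr/bin/perl
use strict;
use warnings;

my $data_dir = '/tmp/inbound';                         # hypothetical directory
my $file_ext = qr/\.txt$/;
my @entries  = ('.', '..', 'a.txt', 'b.log', 'c.txt'); # stand-in for readdir()

# John's two suggestions: both build new strings and leave @entries alone.
my @flist1 = map /$file_ext/ ? "$data_dir/$_" : (), @entries;
my @flist2 = map "$data_dir/$_", grep /$file_ext/, @entries;
print "ternary map: @flist1\n";
print "map + grep:  @flist2\n";

# The original grep assigns to $_, which is an alias, so it rewrites
# the matching elements of the array it is given.
my @copy    = @entries;
my @matched = grep { $_ = "$data_dir/$_" if /$file_ext/ } @copy;
print "after grep:  @copy\n";
```

With readdir() the list is a temporary, so the in-place rewrite goes unnoticed there; on a named array such as @copy, the original elements are changed.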




John
--
use Perl;
program
fulfillment
 

Similar Threads
Thread Thread Starter Forum Replies Last Post
Re: How include a large array? Edward A. Falk C Programming 1 04-04-2013 08:07 PM
one producer thread, one consumer thread: mutex needed? smith4894@excite.com C Programming 3 09-19-2005 05:03 PM
Passing result from one thread to another thread at the end of execution sayoyo Java 3 02-16-2004 03:39 PM
Can't make one thread waiting for another while running lonelyplanet999 Java 6 11-18-2003 07:32 PM
Writing something from one thread into BufferedReader of other Thread possible ??? Peter Blatt Java 0 08-11-2003 11:54 AM


