#!/usr/local/bin/perl
.include Perl.pro

.revised 'threads.pro  2007-10-09  Mark Senn  http://engineering.purdue.edu/~mark'
.created 'threads.pro  2007-09-24  Mark Senn  http://engineering.purdue.edu/~mark'

.index run specified shell commands in threads

.synopsis
    threads -c command [-h] [-s a|c] [-t number_of_threads] argfile
.esynopsis

use Config;
use threads;
use Thread::Queue;



#
#  Status
#
#  Show current status of what is running.
#

sub Status
{
    #  Print one line per thread reported by threads->list(), showing
    #  the 1-based job number and either the argument alone or the whole
    #  command line, depending on $show ('a' or 'c').
    #
    #  Reads the globals $maxthreads, $len, $n, $show, @index, @arg and
    #  @command.  Takes no arguments and always returns 0.
    #
    #  The loop variables are lexical so that a call does not overwrite
    #  the caller's global $th / $tid.
    print "now running (maxthreads is $maxthreads):\n";
    foreach my $th (threads->list())
    {
        #  @index maps a thread id to the 0-based index of its job.
        my $job = $index[$th->tid()];
        printf "  %${len}d/$n  %s\n", $job + 1, ($show eq 'a') ? $arg[$job] : $command[$job];
    }
    return 0;
}
        


#
#  Sub
#
#  Run a command in a thread.
#

sub Sub
{
    #  Thread body: run job number $job and then announce completion.
    #
    #  Arguments:
    #      $job   0-based index into the global @command
    #      $done  queue on which this thread's id is posted once the
    #             command has returned
    #
    #  The result of system is not examined.  Always returns 0.
    my ($job, $done) = @_;

    my $cmdline = $command[$job];
    system($cmdline);

    #  Posting our own thread id lets the reader of the queue know
    #  which thread has finished.
    my $me = threads->self->tid;
    $done->enqueue($me);

    return 0;
}



.usage
    c:hs:t: 1 1
    usage: $0 -c command [-h] [-s a|c] [-t #] argfile
        -c         command (there is no default)
        -h         print help and abort
        -s [ac]    show arguments (a) only or entire command (c)
                     in status messages (default is a)
        -t #       define number of threads to use (default is 1)

    example: $0 -c command -t 4 argfile
.eusage

#  Refuse to run on a perl that was built without thread support.

$Config{usethreads}  or  die "No thread support in this perl, stopped";


#  Command.
#
#  The -c option is mandatory.  Its value becomes the prefix of every
#  command line that is run.
#  NOTE(review): $opt_c, $opt_s and $opt_t are presumably set by the
#  .usage expansion -- confirm against Perl.pro.

if (defined $opt_c)
{
    $command = $opt_c;
}
else
{
    die qq/No "-c command" option specified, stopped/;
}


#  Show arguments only or entire command.
#
#  $show is 'a' (argument only, the default) or 'c' (whole command).

$show = 'a';
if (defined $opt_s)
{
    if ($opt_s =~ /^[ac]$/)
    {
        $show = $opt_s;
    }
    else
    {
        #  This must be a qq// string: a bare /.../ here would be a
        #  pattern match against $_, not a message.
        die qq/The "-s $opt_s" option is not defined, stopped/;
    }
}


#  Maximum number of threads.
#
#  Must be a positive integer: with zero threads no job could ever be
#  started and the dispatch loop would wait forever for one to finish.

$maxthreads = 1;
if (defined $opt_t)
{
    if ($opt_t =~ /^\d+$/  &&  $opt_t > 0)
    {
        $maxthreads = $opt_t;
    }
    else
    {
        die qq/Number of threads "$opt_t" is not a positive integer, stopped/;
    }
}


#  Read every file named on the command line and collect its non-empty
#  lines in @arg, one job argument per line.
#
#  For each line: leading whitespace is removed, then a comment (an
#  unescaped "#" through end of line, together with any whitespace just
#  before it) is removed.  Lines that end up empty are skipped.
#
#  A "#" preceded by a backslash is not treated as a comment; the
#  backslash is left in place.

foreach my $fn (@ARGV)
{
    open my $fh, '<', $fn  or  die qq/Can't open "$fn" for input: $!, stopped/;

    while (my $line = <$fh>)
    {
        chomp $line;
        $line =~ s/^\s+//;
        #  The lookbehind rejects "\#" without consuming the character
        #  in front of an ordinary "#".
        $line =~ s/\s*(?<!\\)#.*$//;
        $line eq ''  and  next;
        push @arg, $line;
    }

    close $fh  or  die qq/Can't close "$fn" input file: $!, stopped/;
}

#  Build the complete command line for every argument (the command
#  prefix, a space, the argument) and list them, numbered from 1.
#
#  $n is the number of jobs and $len the width of that number in
#  characters, used to right-align the job numbers in all messages.

print "These commands will be done:\n";
$n = scalar @arg;
$len = length $n;
foreach my $job (0 .. $n - 1)
{
    $command[$job] = join ' ', $command, $arg[$job];
    printf "%${len}d/$n  %s\n", $job + 1, $command[$job];
}


#  Number of threads that have been started but not yet joined.
$nth = 0;

#  Sub puts its thread id on this thread-safe queue when its command
#  has returned.  The main program takes the ids off again to learn
#  which thread to join.
$q = Thread::Queue->new;

#  Return the two printf arguments that describe the job handled by the
#  thread with the given id: the 1-based job number and, depending on
#  $show, either the argument alone ('a') or the whole command line.
my $describe = sub
{
    my ($id) = @_;
    my $job = $index[$id];
    return ($job + 1, ($show eq 'a') ? $arg[$job] : $command[$job]);
};

print "Doing commands:\n";
for ($i = 0;  $i < $n;  $i++)
{
    #  Make threads.
    if ($nth < $maxthreads)
    {
        if ($th = threads->new(\&Sub, $i, $q))
        {
            $tid = $th->tid();
            #  Remember which job this thread id is working on.
            $index[$tid] = $i;
            printf "\nstarting %${len}d/$n  %s\n", $describe->($tid);
            $nth++;
            Status;
        }
        else
        {
            die qq/Couldn't start thread.\nStopped/;
        }
    }

    #  Join threads.  When all slots are in use this blocks in dequeue
    #  until a job finishes; otherwise it only collects the jobs that
    #  have already reported completion.
    while ($nth >= $maxthreads  ||  $q->pending)
    {
        $tid = $q->dequeue;
        $th = threads->object($tid);
        $th->join;
        printf "\nfinishing %${len}d/$n  %s\n", $describe->($tid);
        $nth--;
        Status;
    }

}

#  Join all remaining threads.  The job number comes from @index, as in
#  the loop above; the thread id itself is not a job number.
while ($nth)
{
    $tid = $q->dequeue;
    $th = threads->object($tid);
    $th->join;
    printf "finishing %${len}d/$n  %s\n", $describe->($tid);
    $nth--;
}

exit 0;
