[comp.lang.perl] parallel execution with serialization of output

thoth@reef.cis.ufl.edu (Gilligan) (12/10/90)

  Whenever my fileserver goes down I lose mail, so I hacked up a perl
script to execute commands in parallel and hand me the output in
sequence.

ypcat hosts | awk '/cis\.ufl\.edu/{print "rsh",$2,"from"}' | multicommand.perl

  Yes, the entire line could be a perl script.  Here's a first attempt
(don't expect it to work).

perl -e 'open(IN,"ypcat hosts|"); open(OUT,"|multicommand.perl"); while (<IN>) {if (/cis\.ufl\.edu/) { ($foo, $name) = split; print OUT "rsh $name from\n";} } ; '

  Harumph, two lines instead of one.  Well, there are certain things
for which the shell syntax is more compact.

  I'm soliciting comments and speed hacks from the experts.  I'm
mainly looking for advice about which sections of the code can be sped
up, improved, modularized, or bulletproofed.  Can I get rid of the
index-based for loops?  I also want a way to wait on a specific pid
(I'm not concerned about hanging here; the subprocess should exit).
If anyone has a better version I'll take it!
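
  On waiting for a specific pid: here is a minimal sketch, assuming a
newer Perl that provides waitpid and `my' (the perl whose version I
list below may not).  The helper names spawn and wait_for are made up
for illustration; they are not part of my script.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Fork a child that exits immediately with the given status.
# Returns the child's pid to the parent.
sub spawn {
    my ($status) = @_;
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;
    exit $status if $pid == 0;    # child
    return $pid;                  # parent
}

# Block until exactly this child exits and return its exit status.
sub wait_for {
    my ($pid) = @_;
    my $got = waitpid($pid, 0);   # flags 0 = blocking wait
    die "waitpid($pid) failed: $!" if $got != $pid;
    return $? >> 8;               # high byte of $? is the exit status
}

my $first  = spawn(3);
my $second = spawn(7);

# Reap out of order on purpose: the second child first.
my $s2 = wait_for($second);
my $s1 = wait_for($first);
print "second=$s2 first=$s1\n";   # prints "second=7 first=3"
```

  With something like this, reap could wait on $pids[$i] directly
instead of comparing whatever pid a plain wait happens to return.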

$Header: perly.c,v 3.0.1.7 90/08/13 22:22:22 lwall Locked $
Patch level: 28


#!/local/bin/perl

sub reap {

  while (1) {
    $nfound = select($rout=$rin, undef, undef, undef);
    if ($nfound == -1) {
	warn("select failed: $!");
	last;
    }
#    print $nfound, " file descriptors with input waiting", $timeleft, "\n";
#    print $nprocs, " processes left\n";
    for ($i=0; $i<$scale; $i++) {
	$fd = $INPUT[$i];
        if (vec($rout, fileno($fd), 1)) {
#	  print "input waiting on ", $fd, "\n";
	  $_ = <$fd>;
	  last unless $_;
          $BUFFER[$i] .= $_;
        }
    }
    last unless ($i >= $scale);
  }
#  print "pipe #$i $INPUT[$i] closed\n";

  $pid = wait;
  if ($pids[$i] != $pid) {
	# this does happen, particularly when suspended
	warn "pipe closed, but unexpected pid ",
		"($pid, not $pids[$i]) exited\n";
  } else {
#	print "pid ", $pids[$i], " done\n";
	$pids[$i] = 0;
  }

  vec($rin, fileno($INPUT[$i]), 1) = 0;
  close ($INPUT[$i]);
  print $BUFFER[$i], "\n", "---" x 5, "\n";
  $BUFFER[$i] = '';
  $nprocs--;
}

$CSH = "/bin/csh";

$| = 1;
$scale = 6;
while ($_ = shift @ARGV) {
  if ($_ eq "-n") { $scale = shift @ARGV; }
  elsif (/^-(\d+)$/) { $scale = $1; }
  else { last; }
}
if ($_) {unshift(@ARGV, $_);}

$filehandle = "FHANDLE";
for ($i=0; $i<$scale; $i++) {
   push(@INPUT, $filehandle++);
   push(@OUTPUT, $filehandle++);
}

$i = 0;
$nprocs = 0;
while ( $process = <> ) {

  pipe($INPUT[$i], $OUTPUT[$i]);
  vec($rin, fileno($INPUT[$i]), 1) = 1;
  $BUFFER[$i] = '';

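  $fork = fork;
  if (!defined($fork)) {
	# fork returns undef on failure; test this first, because undef
	# also compares equal to 0 and would be mistaken for the child.
	warn "fork failed: $!";
	vec($rin, fileno($INPUT[$i]), 1) = 0;
	close($INPUT[$i]);
	close($OUTPUT[$i]);
	next;
  } elsif ($fork == 0) {
	# child: read nothing, send stdout and stderr down the pipe
	close($INPUT[$i]);
	close(STDIN);
	open(STDIN, "</dev/null");
	open(STDOUT, ">&$OUTPUT[$i]");
	open(STDERR, ">&$OUTPUT[$i]");
	select(STDOUT);
	print $process;		# echo the command as a header for its output
	exec $CSH, "-c", $process;
	die "couldn't exec $CSH: $!";
  } else {
	# parent: remember the child and drop our copy of the write end
	$pids[$i] = $fork;
	close ($OUTPUT[$i]);
	$nprocs++;
  }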

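  if ($nprocs >= $scale) {
    # all slots busy: &reap blocks until one child finishes and leaves
    # the index of the freed slot in the global $i for the next command.
    &reap;
  } else {
    # this should only happen at the start.
    $i++;
  }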
}

while ($nprocs>0) {
  &reap;
}

print "all done\n";
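
  On the index for loops, and on mixing select with buffered <$fd>
reads in reap: here is a sketch of one alternative, assuming a modern
Perl 5 (lexical filehandles, three-argument open, sysread).  It is
not a drop-in replacement for the script above: it starts every
command at once (no -n limit), uses /bin/sh rather than csh, and
returns the outputs in command order.  The name run_parallel is made
up for illustration.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Run each command through /bin/sh in parallel and return the
# collected stdout+stderr of each one, in the order given.
sub run_parallel {
    my @cmds = @_;
    my (%fh, %slot);            # both keyed by file descriptor number
    my @buffer;
    my $rin = '';

    for my $n (0 .. $#cmds) {
        pipe(my $r, my $w) or die "pipe: $!";
        my $pid = fork;
        die "fork: $!" unless defined $pid;
        if ($pid == 0) {        # child
            close $r;
            open(STDIN,  '<',  '/dev/null') or die "stdin: $!";
            open(STDOUT, '>&', $w)          or die "stdout: $!";
            open(STDERR, '>&', $w)          or die "stderr: $!";
            exec('/bin/sh', '-c', $cmds[$n]) or die "exec: $!";
        }
        close $w;               # parent keeps only the read end
        my $fd = fileno $r;
        $fh{$fd}    = $r;
        $slot{$fd}  = $n;
        $buffer[$n] = '';
        vec($rin, $fd, 1) = 1;
    }

    while (%fh) {
        my $rout = $rin;
        my $nfound = select($rout, undef, undef, undef);
        die "select: $!" if $nfound < 0;
        foreach my $fd (keys %fh) {         # no index arithmetic
            next unless vec($rout, $fd, 1);
            # sysread bypasses stdio buffering, so it never blocks
            # waiting for the rest of a line after select says "ready".
            my $got = sysread($fh{$fd}, my $chunk, 4096);
            if ($got) {
                $buffer[$slot{$fd}] .= $chunk;
                next;
            }
            # EOF (or read error): stop watching this pipe.
            vec($rin, $fd, 1) = 0;
            my $done = delete $fh{$fd};
            close $done;
        }
    }
    1 while wait() != -1;       # reap every child
    return @buffer;
}

my @out = run_parallel('echo one', 'echo two; echo more', 'echo three');
print "--- command $_ ---\n$out[$_]" for 0 .. $#out;
```

  Keying the bookkeeping by file descriptor means the select loop
only walks pipes that are still open, at the cost of a couple of
hashes.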

--
"Until it's on my desk, it's vaporware"  (`it' is the NeXT)
"Those who would trade freedom for security soon will have neither" B. Franklin
"SHIT! I'm outta acid" (intense XTank game)