[net.sources] multibuffered streaming tape copy

speck@cit-vlsi (Don Speck) (08/05/85)

: Shell archive of 4.2bsd streaming tape copy program.
: Extract with sh, not csh.
echo "x stream.c"
sed 's/^X//' >stream.c <<FGD135
X/*  4.2bsd multi-buffered screaming tape copy
X    with streamer as input, output, or both.
X    Does it all in user code via 2 concurrent processes,
X    synchronized with flock().
X    For streamer-to-streamer copies, NBUFS should be large (~10)
X*/
X
X#define NBUFS  2	/* Number of multi-buffers (hence processes) */
X
X#include <errno.h>
X#include <signal.h>
X#include <sys/file.h>
X#include <sys/wait.h>
X
X/* Lock-file descriptor pairs.  Each array holds the two descriptors of one
X * lock file as filled in by lockpipe(); the *rd pairs are used by copier()
X * to take turns reading, the *wr pairs to take turns writing.  "first" is
X * the pair created before any slave is forked, "prev"/"next" are the pairs
X * a slave shares with its predecessor and successor.
X */
Xint firstrd[2], prevrd[2], nextrd[2];	      /* File descriptors */
Xint firstwr[2], prevwr[2], nextwr[2];
Xint slavepid[NBUFS];	/* pid of each forked slave; main zeroes a slot once reaped */
Xint bufsiz = 10240;	/* copy buffer size in bytes; argv[1] may override it */
Xchar *buf, *sbrk();	/* copy buffer, obtained from sbrk() in main */
X
X/* Parse a buffer-size argument: a run of decimal digits optionally
X * followed by exactly one suffix, 'b' (multiply by 512) or 'k'
X * (multiply by 1024).  Returns the resulting value, or 0 if the string
X * has trailing characters, carries both suffixes, or the value does not
X * fit in an unsigned int.  main treats a 0 result as a usage error.
X */
Xunsigned atou(s) register char *s; {
X	register unsigned u = 0, digit;
X	register unsigned umax = ~(unsigned) 0;	/* largest unsigned value */
X
X	while (*s >= '0' && *s <= '9') {
X		digit = *s++ - '0';
X		if (u > (umax - digit) / 10)
X			return(0);		/* u*10 + digit would wrap */
X		u = u*10 + digit;
X	}
X	if (*s == 'b') {
X		if (u > umax / 512)
X			return(0);		/* scaled value would wrap */
X		u *= 512, s++;
X	} else if (*s == 'k') {
X		if (u > umax / 1024)
X			return(0);		/* scaled value would wrap */
X		u *= 1024, s++;
X	}
X	return(*s == '\0' ? u : 0);
X}
X
Xabort() {				/* Signal catchers */
X	killall();
X	_exit(EINTR);
X}
X
Xdone() {
X	_exit(0);
X}
X
Xmain(argc,argv) int argc; char *argv[]; {
X	register int i, pid;
X	static int wstat, children = NBUFS;
X
X	if (argc > 2 || argc == 2 && (bufsiz=atou(argv[1])) == 0) {
X		static char usage[] = "Usage: stream [bufsiz][b|k]\n";
X		write(2, usage, sizeof(usage)-1);
X		_exit(EINVAL);
X	}
X	buf = sbrk(bufsiz);
X	if (buf == (char *) -1) {
X		perror("sbrk");
X		_exit(ENOMEM);
X	}
X	if (signal(SIGINT, abort) == SIG_IGN)
X		signal(SIGINT, SIG_IGN);
X	if (signal(SIGTERM, abort) == SIG_IGN)
X		signal(SIGTERM, SIG_IGN);
X
X	lockpipe(firstrd);
X	lockpipe(firstwr);
X	for (i=0; i<NBUFS; ++i) {
X		if (i == 0) {
X			prevrd[0] = firstrd[1]; prevrd[1] = firstrd[0];
X			prevwr[0] = firstwr[1]; prevwr[1] = firstwr[0];
X		} else {
X			prevrd[0] = nextrd[0];	prevrd[1] = nextrd[1];
X			prevwr[0] = nextwr[0];	prevwr[1] = nextwr[1];
X		}
X		flock(prevrd[1], LOCK_EX);
X		flock(prevwr[1], LOCK_EX);
X		nextrd[0] = firstrd[0]; nextrd[1] = firstrd[1];
X		nextwr[0] = firstwr[0]; nextwr[1] = firstwr[1];
X		if ((i < NBUFS-1 && (lockpipe(nextrd)<0 || lockpipe(nextwr)<0))
X				|| (slavepid[i]=fork()) < 0) {
X			perror("stream: too many slaves (recompile smaller)");
X			killall();
X			_exit(EAGAIN);
X		}
X		if (slavepid[i] == 0) { 	    /* Slave starts up here */
X			signal(SIGINT,SIG_IGN);
X			signal(SIGTERM,done);	    /* exit cleanly */
X			copier();
X			_exit(0);
X		}
X		if (i > 0) {
X			close(prevrd[0]); close(prevrd[1]);
X			close(prevwr[0]); close(prevwr[1]);
X		}
X	}
X	flock(firstrd[0], LOCK_UN);
X	flock(firstwr[0], LOCK_UN);
X	close(firstrd[0]);  close(firstrd[1]);
X	close(firstwr[0]);  close(firstwr[1]);
X
X	while (children > 0 && (pid=wait(&wstat)) > 0)
X		for (i=0; i<NBUFS; i++)
X			if (pid == slavepid[i]) {
X				children--;
X				slavepid[i] = 0;
X				killall();
X				if (wstat != 0) _exit(EIO);
X			}
X	_exit(0);
X}
X
Xkillall() {
X	register int i;
X	for (i=0; i<NBUFS; i++)
X		if (slavepid[i] > 0) kill(slavepid[i], SIGTERM);
X}
X
Xlockpipe(fd) int fd[2]; {	/* prefer pipe(), but flock() barfs on them */
X	char tmpname[20];
X	strcpy(tmpname, "/tmp/lockpipeXXXXXX");
X	mktemp(tmpname);
X	if ((fd[1]=creat(tmpname,0400)) < 0)
X		return(fd[1]);
X	fd[0] = open(tmpname, 0);
X	unlink(tmpname);
X	return(fd[0] < 0 ? fd[0] : 0);
X}
X
X/* Synchronization - each process has a lockfile, and shares file
X * descriptors to the following process's lockfile.  When our write
X * completes, we release our lock on the following process's lock-
X * file, allowing the following process to lock it and proceed. We
X * get the lock back for the next cycle by swapping descriptors.
X * Similarly for reads.
X */
X
X#include <stdio.h>
X/* Body of each slave: repeatedly read up to bufsiz bytes from descriptor 0
X * into buf and write them to descriptor 1, using the flock() handshakes
X * described above so that reads, and separately writes, are taken in turn
X * with the neighbouring slaves.  `toggle' selects which descriptor of each
X * lock-file pair is used and flips after every write.  Returns once read()
X * reports end of file; calls _exit(1) after a read error or after a write
X * that did not transfer the whole buffer.
X */
Xcopier() {
X	register int nread, toggle = 0;
X
X	flock(prevrd[toggle], LOCK_EX);		/* Wait for our first turn to read */
X	while ((nread=read(0, buf, bufsiz)) > 0) {
X		flock(nextrd[toggle^1], LOCK_UN); /* Jolt awake next reader */
X		flock(prevwr[toggle], LOCK_EX);   /* Wait for previous write */
X		if (write(1, buf, nread) != nread) {
X			perror("stdout");
X			_exit(1);
X		}
X		toggle ^= 1;			  /* Swap descriptors for the next cycle */
X		flock(nextwr[toggle], LOCK_UN);   /* Jolt awake next writer */
X		flock(prevrd[toggle], LOCK_EX);   /* Now wait for the read */
X	}
X	/* Read returned 0 or -1: wait for the previous write lock before leaving */
X	flock(prevwr[toggle], LOCK_EX);
X	if (nread < 0) {
X		perror("stdin");
X		_exit(1);
X	}
X}
FGD135