speck@cit-vlsi (Don Speck) (08/05/85)
: Shell archive of 4.2bsd streaming tape copy program. : Extract with sh, not csh. echo "x stream.c" sed 's/^X//' >stream.c <<FGD135 X/* 4.2bsd multi-buffered screaming tape copy X with streamer as input, output, or both. X Does it all in user code via 2 concurrent processes, X synchronized with flock(). X For streamer-to-streamer copies, NBUFS should be large (~10) X*/ X X#define NBUFS 2 /* Number of multi-buffers (hence processes) */ X X#include <errno.h> X#include <signal.h> X#include <sys/file.h> X#include <sys/wait.h> X Xint firstrd[2], prevrd[2], nextrd[2]; /* File descriptors */ Xint firstwr[2], prevwr[2], nextwr[2]; Xint slavepid[NBUFS]; Xint bufsiz = 10240; Xchar *buf, *sbrk(); X Xunsigned atou(s) register char *s; { /* Parse digit string to unsigned int */ X register unsigned u = 0; X while (*s >= '0' && *s <= '9') X u = u*10 + (*s++ - '0'); X if (*s == 'b') u *= 512, s++; X if (*s == 'k') u *= 1024, s++; X return(*s == '\0' ? u : 0); X} X Xabort() { /* Signal catchers */ X killall(); X _exit(EINTR); X} X Xdone() { X _exit(0); X} X Xmain(argc,argv) int argc; char *argv[]; { X register int i, pid; X static int wstat, children = NBUFS; X X if (argc > 2 || argc == 2 && (bufsiz=atou(argv[1])) == 0) { X static char usage[] = "Usage: stream [bufsiz][b|k]\n"; X write(2, usage, sizeof(usage)-1); X _exit(EINVAL); X } X buf = sbrk(bufsiz); X if (buf == (char *) -1) { X perror("sbrk"); X _exit(ENOMEM); X } X if (signal(SIGINT, abort) == SIG_IGN) X signal(SIGINT, SIG_IGN); X if (signal(SIGTERM, abort) == SIG_IGN) X signal(SIGTERM, SIG_IGN); X X lockpipe(firstrd); X lockpipe(firstwr); X for (i=0; i<NBUFS; ++i) { X if (i == 0) { X prevrd[0] = firstrd[1]; prevrd[1] = firstrd[0]; X prevwr[0] = firstwr[1]; prevwr[1] = firstwr[0]; X } else { X prevrd[0] = nextrd[0]; prevrd[1] = nextrd[1]; X prevwr[0] = nextwr[0]; prevwr[1] = nextwr[1]; X } X flock(prevrd[1], LOCK_EX); X flock(prevwr[1], LOCK_EX); X nextrd[0] = firstrd[0]; nextrd[1] = firstrd[1]; X nextwr[0] = firstwr[0]; nextwr[1] = firstwr[1]; X if ((i < NBUFS-1 && (lockpipe(nextrd)<0 || lockpipe(nextwr)<0)) X || (slavepid[i]=fork()) < 0) { X perror("stream: too many slaves (recompile smaller)"); X killall(); X _exit(EAGAIN); X } X if (slavepid[i] == 0) { /* Slave starts up here */ X signal(SIGINT,SIG_IGN); X signal(SIGTERM,done); /* exit cleanly */ X copier(); X _exit(0); X } X if (i > 0) { X close(prevrd[0]); close(prevrd[1]); X close(prevwr[0]); close(prevwr[1]); X } X } X flock(firstrd[0], LOCK_UN); X flock(firstwr[0], LOCK_UN); X close(firstrd[0]); close(firstrd[1]); X close(firstwr[0]); close(firstwr[1]); X X while (children > 0 && (pid=wait(&wstat)) > 0) X for (i=0; i<NBUFS; i++) X if (pid == slavepid[i]) { X children--; X slavepid[i] = 0; X killall(); X if (wstat != 0) _exit(EIO); X } X _exit(0); X} X Xkillall() { X register int i; X for (i=0; i<NBUFS; i++) X if (slavepid[i] > 0) kill(slavepid[i], SIGTERM); X} X Xlockpipe(fd) int fd[2]; { /* prefer pipe(), but flock() barfs on them */ X char tmpname[20]; X strcpy(tmpname, "/tmp/lockpipeXXXXXX"); X mktemp(tmpname); X if ((fd[1]=creat(tmpname,0400)) < 0) X return(fd[1]); X fd[0] = open(tmpname, 0); X unlink(tmpname); X return(fd[0] < 0 ? fd[0] : 0); X} X X/* Synchronization - each process has a lockfile, and shares file X * descriptors to the following process's lockfile. When our write X * completes, we release our lock on the following process's lock- X * file, allowing the following process to lock it and proceed. We X * get the lock back for the next cycle by swapping descriptors. X * Similarly for reads. X */ X X#include <stdio.h> Xcopier() { X register int nread, toggle = 0; X X flock(prevrd[toggle], LOCK_EX); X while ((nread=read(0, buf, bufsiz)) > 0) { X flock(nextrd[toggle^1], LOCK_UN); /* Jolt awake next reader */ X flock(prevwr[toggle], LOCK_EX); /* Wait for previous write */ X if (write(1, buf, nread) != nread) { X perror("stdout"); X _exit(1); X } X toggle ^= 1; X flock(nextwr[toggle], LOCK_UN); /* Jolt awake next writer */ X flock(prevrd[toggle], LOCK_EX); /* Now wait for the read */ X } X flock(prevwr[toggle], LOCK_EX); X if (nread < 0) { X perror("stdin"); X _exit(1); X } X} FGD135