[net.sources] multibuffered 4.2bsd streaming /etc/dump

speck@cit-vlsi (Don Speck) (08/05/85)

: Shell archive of 4.2bsd triple-buffered dump.
: Extract with sh, not csh.
echo "x README"
sed 's/^X//' >README <<FGD135
X    This is a complete rewrite of the February posting of
Xmodifications to add triple-buffering to 4.2bsd /etc/dump.
XSynchronization is now done with flock() instead of pipes.
XUnder optimum conditions, it is now fast enough to stream
Xa TU80 in 100 ips mode.  CPU consumption is lower too.
X
X    Several bugs have gone away.  This version doesn't "run
Xon" after aborting, the way the old one did.  If interrupts
Xare being ignored, they now stay that way.  Tape usage is
Xnow accounted almost correctly.
X
X    An RDUMP optimization enclosed by "#ifdef not_sun" hits
Xsome bug in Suns, whether the rdump is to or from the Sun.
XThe optimization works okay when both hosts are vaxen, and
Xgives an additional 20% speedup.
X
X    The diff of dumptraverse is merely a code optimization;
Xit is independent of the dumptape.c changes.
FGD135

echo "x traverse.diff"
sed 's/^X//' >traverse.diff <<Dr.
X232,233c232,236
X< 	for(i = 0; i < sizeof(union u_spcl)/sizeof(int); i++)
X< 		s += *ip++;
X---
X> 	i = sizeof(union u_spcl) / (4*sizeof(int));
X> 	while (--i >= 0) {
X> 		s += *ip++; s += *ip++;
X> 		s += *ip++; s += *ip++;
X> 	}
Dr.

echo "x dumptape.c"
sed 's/^X//' >dumptape.c <<Strangelove
Xstatic char *sccsid = "@(#)dumptape.c 2.1 (Berkeley+Caltech mods) 4/7/85";
X#include "dump.h"
X
Xchar	(*tblock)[TP_BSIZE];	/* Pointer to malloc()ed buffer for tape; taprec() advances it one record per call and flusht() resets it */
Xint	writesize;		/* Size of malloc()ed buffer for tape: ntrec * TP_BSIZE bytes, set in alloctape() */
Xint	trecno = 0;		/* Count of tape records queued so far in the current batch; flusht() zeroes it */
Xextern int ntrec;		/* blocking factor on tape */
Xextern int cartridge;		/* When nonzero, alloctape() uses the largest per-write allowance (16) */
Xint	tenths; 		/* length of tape used per block written, in tenths of an inch (see alloctape()) */
X
X/* Concurrent dump mods (Caltech) - disk block reading and tape writing
X * are exported to several slave processes.  While one slave writes the
X * tape, the others read disk blocks; they pass control of the tape in
X * a ring via flock().	The parent process traverses the filesystem and
X * sends spclrec()'s and lists of daddr's to each slave via pipes.
X *
X * One batch sent down a pipe is the req[] array (ntrec packets) followed
X * by the special records that taprec() copied into the tape buffer; see
X * flusht() for the sending side and doslave() for the receiving side.
X */
Xstruct req {			/* instruction packets sent to slaves */
X	daddr_t dblk;		/* disk block to start reading at; 0 means a special record follows in the pipe */
X	int count;		/* number of TP_BSIZE records covered by this packet (must be 1 when dblk is 0) */
X} *req;
Xint reqsiz;			/* bytes in the packet array: ntrec * sizeof(struct req), set in alloctape() */
X
X#define SLAVES 3		/* 1 slave writing, 1 reading, 1 for slack */
Xint slavepid[SLAVES];		/* Pids returned by fork() in enslave(); killall() signals the positive ones */
Xint slavefd[SLAVES];		/* Pipes from master to each slave */
Xint rotor;			/* Current slave number */
Xint master;			/* Pid of master, for sending error signals; enslave() resets it to 0 in the master itself */
Xint trace=0;			/* Protocol trace; easily patchable with adb*/
X#define  tmsg	if (trace) msg	/* expands to a bare "if": never follow a tmsg(...) statement with "else" */
X
X/* Set up the tape buffer and the array of instruction packets as one
X * allocation.  The packets sit immediately in front of the tape
X * records so flusht() can ship both with a single write(), and the
X * tape buffer itself starts on a page boundary to speed up tape
X * write().  Also derives writesize, reqsiz and tenths from ntrec and
X * density.  Returns 1 on success, 0 if malloc() fails.
X */
Xalloctape()
X{
X	int pagemask;
X
X	pagemask = getpagesize() - 1;
X	writesize = ntrec * TP_BSIZE;
X	reqsiz = ntrec * sizeof(struct req);
X
X	/* 92185 NEEDS 0.4"; 92181 NEEDS 0.8" to start/stop (see TU80 manual) */
X	if (cartridge)
X		tenths = writesize/density + 16;
X	else if (density == 625)
X		tenths = writesize/density + 4;
X	else
X		tenths = writesize/density + 8;
X
X	/* The extra pagemask bytes leave room for the alignment below. */
X	req = (struct req *)malloc(reqsiz+writesize+pagemask);
X	if (req == NULL)
X		return(0);
X
X	/* Round the address just past ntrec packets up to a page boundary. */
X	tblock = (char (*)[TP_BSIZE]) (((long)&req[ntrec] + pagemask) &~ pagemask);
X
X	/* Cmd packets go in front of tape buffer */
X	req = (struct req *)tblock - ntrec;
X	return(1);
X}
X
X
Xtaprec(dp)
X	char *dp;
X{		/* make copy of spclrec, to later send to tape writer */
X	tmsg("taprec %d\n", trecno);
X	req[trecno].dblk = (daddr_t)0;
X	req[trecno].count = 1;
X	*(union u_spcl *)(*tblock++) = *(union u_spcl *)dp;	/* movc3 */
X	trecno++;
X	spcl.c_tapea++;
X	if(trecno >= ntrec)
X		flusht();
X}
X
X/* Queue size bytes of file system block blkno for dumping.  The data
X * is not read here; one packet per chunk is added to req[] and a
X * slave does the disk read later.  A chunk never crosses the end of
X * the current batch: when the batch fills it is flushed and the rest
X * of the block goes into the next one.
X */
Xdmpblk(blkno, size)
X	daddr_t blkno;
X	int size;
X{
X	int remaining, diskblk;
X	register int chunk;
X
X	diskblk = fsbtodb(sblock, blkno);
X	remaining = size / TP_BSIZE;
X	for (;;) {
X		chunk = MIN(remaining, ntrec - trecno);
X		if (chunk <= 0)
X			break;
X		tmsg("dmpblk %d\n", chunk);
X		req[trecno].dblk = diskblk;
X		req[trecno].count = chunk;
X		spcl.c_tapea += chunk;
X		trecno += chunk;
X		if (trecno >= ntrec)
X			flusht();
X		diskblk += chunk * (TP_BSIZE / DEV_BSIZE);
X		remaining -= chunk;
X	}
X}
X
Xint	nogripe = 0;	/* set by tperror() so that close_rewind() skips its "Change Tapes" notice */
X
Xtperror() {	/* Tape write failure; enslave() installs this as the master's SIGIOT handler.  Never returns. */
X	if (pipeout) {	/* otape() writes to descriptor 1 in this mode; no restart is offered */
X		msg("Tape write error on %s\n", tape);
X		msg("Cannot recover\n");
X		dumpabort();
X		/* NOTREACHED */
X	}
X	msg("Tape write error on tape %d\n", tapeno);
X	broadcast("TAPE ERROR!\n");
X	if (!query("Do you want to restart?"))
X		dumpabort();
X	msg("This tape will rewind.  After it is rewound,\n");
X	msg("replace the faulty tape with a new one;\n");
X	msg("this dump volume will be rewritten.\n");
X	killall();	/* slaves are killed before rewind() waits for them */
X	nogripe = 1;
X	close_rewind();
X	Exit(X_REWRITE);	/* the checkpoint parent in otape() forks again on this status */
X}
X
X#ifdef RDUMP
Xtflush(i) int i; {
X	for (i=0; i<ntrec; i++) spclrec();
X}
X#endif
X
Xflusht() {	/* Send the queued batch to the current slave, advance the ring, and account for one tape block */
X	int sig, siz = (char *)tblock - (char *)req;	/* packet array plus any special records taprec() copied in */
X
X	tmsg("flusht %d\n", siz);
X	sig = sigblock(1 << SIGINT-1);		/* Don't abort pipe write; the shift count parses as (SIGINT-1) */
X	if (write(slavefd[rotor], req, siz) != siz) {
X		perror("  DUMP: pipe write");
X		dumpabort();
X	}
X	sigsetmask(sig);	/* restore the signal mask saved above */
X	if (++rotor >= SLAVES) rotor = 0;	/* the next batch goes to the next slave in the ring */
X	tblock = (char (*)[TP_BSIZE]) &req[ntrec];	/* back to the first record, just past the packets */
X	trecno = 0;
X	asize += tenths;	/* tape length consumed so far on this volume */
X	blockswritten += ntrec;
X	if (!pipeout && asize > tsize) {	/* volume is estimated full: change tapes and start new slaves */
X		close_rewind();
X		otape();
X	}
X	timeest();
X}
X
Xrewind() {	/* Shut the slaves down, then close the tape and re-open/close it once; no-op when pipeout is set */
X	int f;
X
X	if (pipeout)
X		return;
X	for (f=0; f<SLAVES; f++)
X		close(slavefd[f]);	/* slaves see EOF on their command pipe and leave doslave()'s loop */
X	while (wait(NULL) >= 0)    ;	/* wait for any signals from slaves; loops until wait() reports failure */
X	msg("Tape rewinding\n");
X#ifdef RDUMP
X	rmtclose();
X	while (rmtopen(tape, 0) < 0)	/* retry every 10 seconds until the open succeeds */
X		sleep(10);
X	rmtclose();
X#else
X	close(to);
X	while ((f = open(tape, 0)) < 0)	/* presumably succeeds once the drive has finished rewinding -- confirm */
X		sleep (10);
X	close(f);
X#endif
X}
X
Xclose_rewind()
X{
X	rewind();
X	if (!nogripe){
X		msg("Change Tapes: Mount tape #%d\n", tapeno+1);
X		broadcast("CHANGE TAPES!\7\7\n");
X	}
X	while (!query("Is the new tape mounted and ready to go?"))
X		if (query("Do you want to abort?"))
X			dumpabort();
X}
X
X/*
X *	We implement taking and restoring checkpoints on
X *	the tape level.
X *	When each tape is opened, a new process is created by forking; this
X *	saves all of the necessary context in the parent.  The child
X *	continues the dump; the parent waits around, saving the context.
X *	If the child returns X_REWRITE, then it had problems writing that tape;
X *	this causes the parent to fork again, duplicating the context, and
X *	everything continues as if nothing had happened.
X *
X *	otape() therefore returns only in the child.  The parent never
X *	returns: it exits with X_FINOK or X_ABORT to match the child (any
X *	unrecognized status also becomes X_ABORT), or jumps back to fork
X *	again on X_REWRITE.  The child opens the tape, starts the slaves
X *	with enslave(), zeroes asize, bumps the volume counters and writes
X *	a TS_TAPE header record through spclrec().
X */
X
Xotape()
X{
X	int	parentpid;
X	int	childpid;
X	int	status;
X	int	waitpid;	/* pid returned by wait() */
X	int	(*interrupt)();	/* SIGINT disposition on entry, restored in the child */
X
X	parentpid = getpid();
X
X    restore_check_point:
X	interrupt = signal(SIGINT, SIG_IGN);
X	childpid = fork();
X	if (childpid < 0){
X		msg("Context save fork fails in parent %d\n", parentpid);
X		Exit(X_ABORT);
X	}
X	if (childpid != 0){
X		/*
X		 *	PARENT:
X		 *	save the context by waiting
X		 *	until the child doing all of the work returns.
X		 *	don't catch the interrupt
X		 */
X#ifdef TDEBUG
X		msg("Tape: %d; parent process: %d child process %d\n",
X			tapeno+1, parentpid, childpid);
X#endif TDEBUG
X		while ((waitpid=wait(&status)) != childpid)
X			msg("Parent %d waiting for child %d has another child %d return\n",
X				parentpid, childpid, waitpid);
X		if (status & 0xFF){	/* low-order byte of the wait status is nonzero: report it */
X			msg("Child %d returns LOB status %o\n",
X				childpid, status&0xFF);
X		}
X		status = (status >> 8) & 0xFF;	/* keep only the second byte, compared below against the X_* codes */
X#ifdef TDEBUG
X		switch(status){
X			case X_FINOK:
X				msg("Child %d finishes X_FINOK\n", childpid);
X				break;
X			case X_ABORT:
X				msg("Child %d finishes X_ABORT\n", childpid);
X				break;
X			case X_REWRITE:
X				msg("Child %d finishes X_REWRITE\n", childpid);
X				break;
X			default:
X				msg("Child %d finishes unknown %d\n", childpid,status);
X				break;
X		}
X#endif TDEBUG
X		switch(status){
X			case X_FINOK:
X				Exit(X_FINOK);
X			case X_ABORT:
X				Exit(X_ABORT);
X			case X_REWRITE:
X				goto restore_check_point;	/* fork a fresh child from the saved context */
X			default:
X				msg("Bad return code from dump: %d\n", status);
X				Exit(X_ABORT);
X		}
X		/*NOTREACHED*/
X	} else {	/* we are the child; just continue */
X#ifdef TDEBUG
X		sleep(4);	/* allow time for parent's message to get out */
X		msg("Child on Tape %d has parent %d, my pid = %d\n",
X			tapeno+1, parentpid, getpid());
X#endif
X		signal(SIGINT, interrupt);	/* put back whatever disposition was in force on entry */
X#ifdef RDUMP
X		while ((to = rmtopen(tape, 2)) < 0)
X#else
X		while ((to = pipeout ? 1 : creat(tape,0666)) < 0)	/* descriptor 1 is used when pipeout is set */
X#endif
X			if (!query("Cannot open tape.  Do you want to retry the open?"))
X				dumpabort();
X
X		enslave();  /* Share open tape file descriptor with slaves */
X
X		asize = 0;	/* nothing written on this volume yet */
X		tapeno++;		/* current tape sequence */
X		newtape++;		/* new tape signal */
X		spcl.c_volume++;
X		spcl.c_type = TS_TAPE;
X		spclrec();
X		if (tapeno > 1)
X			msg("Tape %d begins with blocks from ino %d\n",
X				tapeno, ino);
X	}
X}
X
Xdumpabort()
X{
X	if (master != 0 && master != getpid())
X		kill(master,SIGPIPE);
X	else {
X		killall();
X		msg("The ENTIRE dump is aborted.\n");
X	}
X	Exit(X_ABORT);
X}
X
XExit(status)
X{
X#ifdef TDEBUG
X	msg("pid = %d exits with status %d\n", getpid(), status);
X#endif TDEBUG
X	exit(status);
X}
X
Xlockfile(fd) int fd[2]; {	/* prefer pipe(), but flock() barfs on them */
X	char tmpname[20];
X	strcpy(tmpname, "/tmp/dumplockXXXXXX");
X	mktemp(tmpname);
X	if ((fd[1]=creat(tmpname,0400)) < 0)
X		return(fd[1]);
X	fd[0] = open(tmpname, 0);
X	unlink(tmpname);
X	return(fd[0] < 0 ? fd[0] : 0);
X}
X
X#include <sys/file.h>
X
Xenslave() {
X	int first[2], prev[2], next[2], cmd[2];     /* file descriptors */
X	register int i, j;
X
X	master = getpid();
X	signal(SIGPIPE,dumpabort);  /* Slave quit/died/killed -> abort */
X	signal(SIGIOT,tperror);     /* SIGIOT -> restart from checkpoint */
X	lockfile(first);
X	for (i=0; i<SLAVES; ++i) {
X		if (i == 0) {
X			prev[0] = first[1];
X			prev[1] = first[0];
X		} else {
X			prev[0] = next[0];
X			prev[1] = next[1];
X			flock(prev[1], LOCK_EX);
X		}
X		next[0] = first[0];
X		next[1] = first[1];	    /* Last slave loops back */
X		if ((i < SLAVES-1 && lockfile(next) < 0) || pipe(cmd) < 0
X				|| (slavepid[i]=fork()) < 0) {
X			perror("  DUMP: too many slaves (recompile smaller)");
X			dumpabort();
X		}
X		slavefd[i] = cmd[1];
X		if (slavepid[i] == 0) { 	    /* Slave starts up here */
X			for (j=0; j <= i; j++)
X				close(slavefd[j]);
X			signal(SIGINT,SIG_IGN);     /* Master handles these */
X			signal(SIGTERM,SIG_IGN);
X			doslave(i,cmd[0],prev,next);
X			Exit(X_FINOK);
X		}
X		close(cmd[0]);
X		if (i > 0) {
X			close(prev[0]);
X			close(prev[1]);
X		}
X	}
X	close(first[0]);
X	close(first[1]);
X	master = 0; rotor = 0;
X}
X
Xkillall() {
X	register int i;
X	for (i=0; i<SLAVES; i++)
X		if (slavepid[i] > 0) kill(slavepid[i], SIGKILL);
X}
X
X/* Synchronization - each process has a lockfile, and shares file
X * descriptors to the following process's lockfile.  When our write
X * completes, we release our lock on the following process's lock-
X * file, allowing the following process to lock it and proceed. We
X * get the lock back for the next cycle by swapping descriptors.
X *
X * doslave() is the body of one slave process.  mynum is the slave's
X * index, cmd is the read end of its command pipe from the master, and
X * prev[] / next[] are descriptor pairs on this slave's lockfile and on
X * the following slave's lockfile; toggle selects which member of each
X * pair is used on the current cycle.
X *
X * Each cycle reads one batch of struct req packets, fills tblock from
X * the disk (dblk != 0) or from the pipe (dblk == 0, a special record
X * sent by the master), waits for its turn at the tape, writes
X * writesize bytes and then lets the next slave go.  The loop ends when
X * the master closes the pipe.  On a tape write error the slave sends
X * SIGIOT to the master and pauses forever, waiting to be killed.
X */
X
Xdoslave(mynum,cmd,prev,next)
X	int mynum, cmd, prev[2], next[2];
X{
X	register int toggle = 0, firstdone = mynum;
X
X	tmsg("slave %d\n", mynum);
X	close(fi);
X	if ((fi=open(disk,0)) < 0) {		/* Don't share seek pointer */
X		perror("  DUMP: slave couldn't reopen disk");
X		kill(master, SIGPIPE);		/* dumpabort */
X		Exit(X_ABORT);
X	}
X	while (readpipe(cmd,req,reqsiz) > 0) {	/* Get list of blocks to dump */
X		register struct req *p = req;
X		/* A packet covering count records is followed by count-1
X		 * unused slots, so both indices advance by p->count.
X		 */
X		for (trecno=0; trecno < ntrec; trecno+=p->count, p+=p->count) {
X			if (p->dblk) {
X				tmsg("%d READS %d\n",mynum,p->count);
X				bread(p->dblk,tblock[trecno],p->count*TP_BSIZE);
X			} else {
X				tmsg("%d PIPEIN %d\n",mynum,p->count);
X				if (p->count != 1 || readpipe(cmd,
X				    tblock[trecno],TP_BSIZE) <= 0) {
X					msg("Master/slave protocol botched\n");
X					dumpabort();
X				}
X			}
X		}
X		flock(prev[toggle], LOCK_EX);	/* Wait our turn */
X		tmsg("%d WRITE\n",mynum);
X#ifdef RDUMP
X#ifdef not_sun	/* Defer checking first write until next one is started */
X		rmtwrite0(writesize);
X		rmtwrite1(tblock[0],writesize);
X		if (firstdone == 0) firstdone = -1;
X		else if (rmtwrite2() != writesize) {
X			rmtwrite2();		/* Don't care if another err */
X#else
X		/* Asynchronous writes can hang Suns; do it synchronously */
X		if (rmtwrite(tblock[0],writesize) != writesize) {
X#endif
X#else		/* Local tape drive */
X		if (write(to,tblock[0],writesize) != writesize) {
X			perror(tape);
X#endif
X			kill(master, SIGIOT);	/* restart from checkpoint */
X			for (;;) sigpause(0);
X		}
X		toggle ^= 1;
X		flock(next[toggle], LOCK_UN);	/* Next slave's turn */
X	}					/* Also jolts him awake */
X#ifdef RDUMP			/* One more time around, to check last write */
X#ifdef not_sun
X	flock(prev[toggle], LOCK_EX);
X	tmsg("%d LAST\n",mynum);
X	if (firstdone < 0 && rmtwrite2() != writesize) {
X		kill(master, SIGIOT);
X		for (;;) sigpause(0);
X	}
X	toggle ^= 1;
X	flock(next[toggle], LOCK_UN);
X#endif
X#endif
X}
X
Xreadpipe(fd,buf,count) int fd, count; char *buf; {
X	int i, n;
X	for (n=0; n<count; buf+=i, n+=i)
X		if ((i=read(fd,buf,count-n)) <= 0) {
X			if (i==0 && n==0)
X				return(0);		/* Normal EOF */
X			msg("short pipe read");
X			dumpabort();
X		}
X	return(n);
X}
Strangelove