pcg@aber-cs.UUCP (Piercarlo Grandi) (03/17/89)
This program copies its standard input to its standard output. It does this in a pipelined fashion, permitting reading to overlap with writing. Other similar programs are ddd (posted in comp.sources.unix) and strm (standard with Microport).

It uses a ring of pipes connecting a ring of processes; the number of processes can be chosen at command startup time. Each process waits for a can-read token from its input pipe, then fills a buffer (whose size can also be chosen at command startup), after which it passes the can-read token down its output pipe. It then waits for a can-write token from its input pipe, writes the buffer, and passes the can-write token along. The net effect is an asynchronous copy through a ring of any size with buffers of any size (within the limits checked at startup: 2 to 16 processes, and 64 to 65535 bytes per buffer). It helps a lot with backups, and lets a streaming tape drive actually stream.

One of its greatest virtues is that it should run unchanged on virtually any UNIX version I can think of, as it uses only pipes and other V7 features. On the other hand, this approach has two drawbacks: higher CPU consumption, and dependence on UNIX (it relies on all processes forked from the same parent sharing the file pointer of their inherited standard input and output). It is reliable; I have not found any bugs for a while.

Finally, a bit of history. This program has already been posted to BIX. It has not been changed since then (if I remember correctly).

Even more finally :-), a disclaimer: the University College of Wales, my employer, has in no way supported or helped the development of this program. It has been developed entirely on my own time with my own resources, is an adjunct to my own personal research, and has nothing to do with the research of the University College of Wales. I thank the University College of Wales for allowing me to post this program through the use of their computer resources.

-------------------------- cut here -----------------------
#! /bin/sh
# This is a shell archive, meaning:
# 1. Remove everything above the #! /bin/sh line.
# 2. Save the resulting text in a file.
# 3. Execute the file with /bin/sh (not csh) to create the files:
#       team.1
#       team.c
# This archive created: Thu Mar 16 15:29:56 1989
export PATH; PATH=/bin:$PATH
if test -f 'team.1'
then
    echo shar: will not over-write existing file "'team.1'"
else
cat << \SHAR_EOF > 'team.1'
.TH TEAM 1 (pg)
.ad b
.SH NAME
team \- parallel "pipe", allows asynchronous I/O
.SH SYNOPSIS
.B team
[blocksize[\fBb\fP|\fBk\fP] [processes]]
.SH DESCRIPTION
.I Team
just copies its standard input to its standard output.
It does so, however, by forking a team of independent
.I processes
(default is 8), arranged in a ring, with reads overlapped with writes.
.LP
Each process will wait for the end of the read phase of the previous process,
will then read
.I blocksize
bytes (or 512-byte blocks if suffixed with
.IR b ,
or kilobytes if suffixed with
.IR k ;
the default is 10240 bytes) from its standard input,
start the read phase of the next process,
wait for the end of the write phase of the previous process,
then write to its standard output,
and start the write phase of the next process.
.LP
.I Team
consumes system time synchronizing and switching among its processes;
it is also best run on a quiescent system, since other load slows it down.
.LP
This program is most useful for output to a device, especially where a
streaming tape is involved. It may be used to advantage with disc to disc
and disc to tape copies.
.SH EXAMPLES
find dir -print | cpio -oBc | team 20k 8 >/dev/rmt0
.br
team 20k 8 </dev/rmt0 | cpio -iBcdmu
.SH ADVICE
You are advised to experiment with different combinations of block size
and number of processes; each program used with
.I team
works best with certain parameters, and performance depends even more
strongly on the output device, so experiment with parameters for that too
(it seems that the blocking factor of the process that feeds
.I team
ought to be smaller than the one given to it, and possibly smaller than
the limit on the size of a pipe for your version of the system).
.I Team
ought to be adaptive, and adjust both parameters dynamically, in order to
reach a state where there is no pause between the stages of the ring.
This is too difficult to achieve under UNIX.
.LP
Notice also that this program will read and write blocks all of the same
size as prescribed, except the last, even when reading from pipes;
if a read from its input supplies fewer bytes than the prescribed block size,
this program will read again until its buffer is full or the input
finishes.
.LP
A final note: it is usually advantageous to give
.I team
a block size that is a multiple of the block size produced by the program
before it in a pipeline.
Notice that in many cases, such as the tape archival programs,
the output will not be directly recognizable to the tape archiver when read
back, but will have to be reblocked to the block size expected by the tape
archiver, either by way of
.I dd
or by reapplying
.IR team ,
which is of course much faster.
.SH BUGS
.I Team
will emit a number of messages comprehensible only to the author
in case of errors. Please note them and report them to the author.
.SH SEE ALSO
.IR volcopy (8)
.br
.IR cpio (1)
.br
.IR tar (1)
.br
.IR dump (8)
.SH AUTHOR
Piercarlo Grandi, Milano.
SHAR_EOF
fi # end of overwriting check
if test -f 'team.c'
then
    echo shar: will not over-write existing file "'team.c'"
else
cat << \SHAR_EOF > 'team.c'
/* $Header$ */

static char Notice[] =
    "Copyright (C) 1987,1989 Piercarlo Grandi. All rights reserved.";

/*
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 1, or (at your option)
    any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You may have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#undef DEBUG

/*
    Unix programs normally do synchronous read and write, that is, you
    read and then you write; no overlap is possible. This is especially
    catastrophic for device to device copies, where it is important to
    minimize elapsed time by overlapping activity on one device with
    activity on the other.

    To obtain this, a multiprocess structure is necessary under Unix.
    This program is functionally equivalent to a pipe, in that it copies
    its input (fd 0) to its output (fd 1) link.

    This program is executed as a Team of N processes, called Guys, all
    of which share the same input and output links; the first reads a
    chunk of input, awakens the second, and writes the chunk to its
    output; the second does the same, and the last awakens the first.

    Since this process is essentially cyclic, we use a ring of pipes to
    synchronize the Guys. Each guy has an input pipe from the upstream
    guy and an output pipe to the downstream guy.

    Whenever a guy receives a READ command from upstream, it first reads
    a block and then passes the READ command on downstream; it then waits
    for a WRITE command from upstream, writes the block, and after that
    passes the WRITE command downstream.

    Two other commands are used: STOP, which is sent downstream by the
    guy that detects the end of file on the input, after which that guy
    exits; and ABORT, which is sent downstream by a guy that detects
    trouble, either its own or reported by the guy upstream of it, and
    which has much the same effect.
*/

#include <errno.h>
extern int errno;
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/file.h>

#ifdef DEBUG
# define Mesg(list) mesg list
#else
# define Mesg(list)
#endif

/*VARARGS1*/
mesg(a,b,c,d,e,f,g,h,i)
    char *a;
{
# if (defined(LOCK_EX))
    int fd = open("/dev/tty",1);
    flock(fd,LOCK_EX);
# endif
    fprintf(stderr,"%u: ",getpid());
    fprintf(stderr,a,b,c,d,e,f,g,h,i);
# if (defined(LOCK_EX))
    flock(fd,LOCK_UN);
    close(fd);
# endif
}

#ifndef PCG
# define PCG 0
#endif

#if (PCG)
# include "Extend.h"
# if (defined(vax) && defined(unix))
#  include "Here42BSD.h"
# endif
# include "Sizes.h"
# include "Types.h"
#else
# define were if
# define fast register
# define public /* extern */
# define private static
# define void int
# define boolean int
# define true 1
# define false 0
# define NIL ((pointer) 0)
# define scalar int
typedef char *pointer;
# if (defined(SMALL_M))
typedef unsigned address;
# else
typedef long address;
# endif
# define when break; case
# define otherwise break; default
#endif

/*
    The regular Unix read and write calls are not guaranteed to process
    all the bytes requested. These procedures guarantee that if the
    request is for N bytes, all of them are read or written unless there
    is an error or end of file.
*/

#define FdCLOSED    0
#define FdOPEN      1
#define FdEOF       2
#define FdERROR     3

struct Fd
{
    int FFd;
    short FStatus;
};

struct Fd FdIn,FdOut;

public boolean FdOpen(fd,ffd)
    fast struct Fd *fd;
    int ffd;
{
    fd->FStatus = (ffd >= 0) ? FdOPEN : FdCLOSED;
    fd->FFd = ffd;

    Mesg(("FdOpen fd %d\n",ffd));

    return ffd >= 0;
}

public boolean FdClose(fd)
    fast struct Fd *fd;
{
    int ffd;

    ffd = fd->FFd;
    Mesg(("FdClose fd %d\n",fd->FFd));

    fd->FStatus = FdCLOSED;
    fd->FFd = -1;

    return close(ffd) >= 0;
}

public boolean FdCopy(to,from)
    fast struct Fd *to,*from;
{
    to->FStatus = from->FStatus;
    to->FFd = dup(from->FFd);

    Mesg(("FdCopy of %d is %d\n",from->FFd,to->FFd));

    return to->FFd >= 0;
}

public void FdSet(to,from)
    fast struct Fd *to,*from;
{
    if (from->FFd < 0)
        fprintf(stderr,"team: set an invalid fd\n");

    to->FStatus = from->FStatus;
    to->FFd = from->FFd;
}

public address FdRead(fd,buffer,goal)
    fast struct Fd *fd;
    pointer buffer;
    fast address goal;
{
    fast int nread;
    fast address total;

    switch (fd->FStatus)
    {
    when FdEOF:
        return 0;

    when FdERROR:
        return -1;

    when FdCLOSED:
        return -1;

    when FdOPEN:
        for
        (
            total = 0;
            total < goal && (nread = read(fd->FFd,buffer+total,
                (unsigned) (goal-total))) > 0;
            total += nread
        );

        if (nread == 0) fd->FStatus = FdEOF;
        if (nread < 0) fd->FStatus = FdERROR;

        Mesg(("FdRead %d reads %d last %d\n",fd->FFd,total,nread));

        return (total == 0) ? nread : total;
    }
    /*NOTREACHED*/
}

public address FdWrite(fd,buffer,goal)
    fast struct Fd *fd;
    pointer buffer;
    fast address goal;
{
    fast int nwritten;
    fast address total;

    switch (fd->FStatus)
    {
    when FdEOF:
        return 0;

    when FdERROR:
        return -1;

    when FdCLOSED:
        return -1;

    when FdOPEN:
        for
        (
            total = 0;
            total < goal && (nwritten = write(fd->FFd,buffer+total,
                (unsigned) (goal-total))) > 0;
            total += nwritten
        );

        Mesg(("FdWrite %d writes %d last %d\n",fd->FFd,total,nwritten));

        if (nwritten == 0) fd->FStatus = FdEOF;
        if (nwritten < 0) fd->FStatus = FdERROR;

        return (total == 0) ? nwritten : total;
    }
    /*NOTREACHED*/
}

/*
    A Token is a scalar value representing a command.
*/

typedef short scalar Token;

#define TokenREAD   0
#define TokenWRITE  1
#define TokenSTOP   2
#define TokenABORT  -1

/*
*/

public boolean StreamPipe(downstream,upstream)
    fast struct Fd *downstream;
    fast struct Fd *upstream;
{
    int links[2];

    if (pipe(links) < 0)
    {
        perror("team: opening links");
        return false;
    }

    Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));

    return FdOpen(downstream,links[1]) && FdOpen(upstream,links[0]);
}

struct StreamMsg
{
    Token SmToken;
    short SmStatus;
};

public boolean StreamSend(fd,token,status)
    fast struct Fd *fd;
    Token token;
    short status;
{
    fast int n;
    struct StreamMsg message;

    message.SmToken = token;
    message.SmStatus = status;

    n = FdWrite(fd,(pointer) &message,(address) sizeof message);
    Mesg(("StreamSend fd %u n %d token %d\n",fd->FFd,n,token));

    return n == sizeof message;
}

public boolean StreamReceive(fd,tokenp,statusp)
    fast struct Fd *fd;
    Token *tokenp;
    short *statusp;
{
    fast int n;
    struct StreamMsg message;

    n = FdRead(fd,(pointer) &message,(address) sizeof message);
    *tokenp = message.SmToken;
    *statusp = message.SmStatus;

    Mesg(("StreamReceive fd %u n %d token %d\n",fd->FFd,n,*tokenp));

    return n == sizeof message;
}

/*
    A guy is an instance of the input to output copier. It is attached to
    a relay station, with an upstream link, from which commands arrive,
    and a downstream link, to which they are relayed once they are
    executed.
*/

struct Guy
{
    int GPid;
    struct Fd GUpStream;
    struct Fd GDownStream;
};

public boolean GuyOpen(guy,pid,upstream,downstream)
    fast struct Guy *guy;
    int pid;
    struct Fd *upstream,*downstream;
{
    Mesg(("GuyOpen pid %u upstream %u downstream %u\n",
        pid,upstream->FFd,downstream->FFd));

    guy->GPid = pid;
    FdSet(&guy->GUpStream,upstream);
    FdSet(&guy->GDownStream,downstream);

    return true;
}

#define GuySEND(guy,token,status) \
    StreamSend(&guy->GDownStream,token,status)

#define GuyRECEIVE(guy,tokenp,statusp) \
    StreamReceive(&guy->GUpStream,tokenp,statusp)

public boolean GuyStart(guy,bufsize)
    fast struct Guy *guy;
    address bufsize;
{
    fast char *buffer;
    Token token;
    short status;
    boolean received;
    static int nread,nwritten;
    extern char *malloc();

    Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize));

    buffer = (pointer) malloc((unsigned) bufsize);
    if (buffer == NIL)
    {
        fprintf(stderr,"team: guy %d cannot allocate %lu bytes\n",
            guy->GPid,(unsigned long) bufsize);
        return false;
    }

    while ((received = GuyRECEIVE(guy,&token,&status)) && token != TokenSTOP)
        switch (token)
        {
        when TokenREAD:
            FdIn.FStatus = status;

            Mesg(("GuyStart reading %d chars\n",bufsize));
            nread = FdRead(&FdIn,(pointer) buffer,bufsize);
            Mesg(("GuyStart reads %d chars\n",nread));

            if (nread == 0) GuyStop(guy,NIL);
            if (nread < 0) GuyStop(guy,"error on guy read");

            if (!GuySEND(guy,TokenREAD,FdIn.FStatus))
                GuyStop(guy,"guy cannot send READ");

        when TokenWRITE:
            FdOut.FStatus = status;

            Mesg(("GuyStart writing %d chars\n",nread));
            nwritten = FdWrite(&FdOut,(pointer) buffer,(address) nread);
            Mesg(("GuyStart writes %d chars\n",nwritten));

            if (nwritten == 0) GuyStop(guy,"eof on guy write");
            if (nwritten < 0) GuyStop(guy,"error on guy write");

            if (!GuySEND(guy,TokenWRITE,FdOut.FStatus))
                GuyStop(guy,"guy cannot send WRITE");

        when TokenABORT:
            GuyStop(guy,"guy was aborted");

        otherwise:
            GuyStop(guy,"impossible token on ring");
        }

    free((char *) buffer);
    GuyStop(guy,(received) ? NIL : "error on upstream receive");
    /*NOTREACHED*/
    /*return true;*/
}

public boolean GuyStop(guy,errormsg)
    fast struct Guy *guy;
    char *errormsg;
{
    Mesg(("GuyStop guy %#o\n",guy));

    if (errormsg != NIL)
    {
        fprintf(stderr,"team: guy pid %u: %s\n",guy->GPid,errormsg);
        (void) GuySEND(guy,TokenABORT,FdERROR);
        exit(1);
        /*NOTREACHED*/
    }

    if (!GuySEND(guy,TokenSTOP,FdEOF))
    {
        exit(1);
        /*NOTREACHED*/
    }

    exit(0);
    /*NOTREACHED*/
}

public boolean GuyClose(guy)
    fast struct Guy *guy;
{
    return FdClose(&guy->GUpStream) && FdClose(&guy->GDownStream);
}

/*
    A team is made up of a ring of guys; each guy copies a block from its
    input to its output, and is driven by tokens sent to it by the
    previous guy on a pipe.
*/

struct Team
{
    struct Guy *TGuys;
    short unsigned TSize;
    short unsigned TActive;
};

public boolean TeamOpen(team,nominalsize)
    struct Team *team;
    short unsigned nominalsize;
{
    extern char *calloc();

    Mesg(("TeamOpen nominalsize %u\n",nominalsize));

    team->TSize = 0;
    team->TActive = 0;
    team->TGuys = (struct Guy *) calloc(sizeof (struct Guy),nominalsize);

    for (team->TSize = 0; team->TSize < nominalsize; team->TSize++);

    were (team->TGuys == (struct Guy *) NIL)
        return false;

    return true;
}

public boolean TeamStart(team,bufsize)
    fast struct Team *team;
    address bufsize;
{
    /*
        When generating each guy, we pass it an upstream link that is the
        downstream of the previous guy, and create a new downstream link
        that will be the next upstream. At each turn we obviously close
        the old downstream once it has been passed to the forked guy.

        A special case are the first and last guys; the upstream of the
        first guy shall be the downstream of the last. This goes against
        the grain of our main logic, where the upstream is expected to
        already exist and the downstream must be created. This means that
        the last and first guys are created in a special way.

        When creating the first guy we shall create its upstream link as
        well as its downstream, and we shall save the writing end of that
        upstream link in a special variable, last_downstream.
        This we shall use as the downstream of the last guy. We shall also
        keep it open in the team manager (parent process) because we shall
        use it to do the initial send of the read and write tokens that
        will circulate in the relay ring, activating the guys. Of course,
        because of this each guy will inherit this link as well as its
        upstream and downstream, but shall dutifully close it.
    */

    struct Fd last_downstream;
    struct Fd this_upstream;
    struct Fd this_downstream;
    struct Fd next_upstream;

    Mesg(("TeamStart team %#o size %u bufsize %u\n",
        team,team->TSize,bufsize));

    (void) FdOpen(&FdIn,0);
    (void) FdOpen(&FdOut,1);

    for (team->TActive = 0; team->TActive < team->TSize; team->TActive++)
    {
        fast struct Guy *guy;
        fast int pid;

        guy = team->TGuys+team->TActive;

        if (team->TActive == 0)
        {
            if (!StreamPipe(&last_downstream,&this_upstream))
            {
                perror("cannot open first link");
                return false;
            }

            if (!StreamPipe(&this_downstream,&next_upstream))
            {
                perror("cannot open link");
                return false;
            }
        }
        else if (team->TActive < (team->TSize-1))
        {
            if (!StreamPipe(&this_downstream,&next_upstream))
            {
                perror("cannot open link");
                return false;
            }
        }
        else /*if (team->TActive == team->TSize-1)*/
        {
            FdSet(&this_downstream,&last_downstream);
            if (!FdCopy(&last_downstream,&this_downstream))
                perror("team: cannot copy last downstream");
        }

        Mesg(("TeamStart going to fork for guy %#o\n",guy));

        pid = fork();

        if (pid > 0)
        {
            Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid));
            guy->GPid = pid;

            if (!FdClose(&this_upstream))
                perror("cannot close this upstream link");
            if (!FdClose(&this_downstream))
                perror("cannot close this downstream link");

            FdSet(&this_upstream,&next_upstream);
        }
        else if (pid == 0)
        {
            pid = getpid();

            if (!FdClose(&last_downstream))
                perror("cannot close inherited first link");

            if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
                GuyStop(guy,"cannot open guy");

            if (!GuyStart(guy,bufsize))
                GuyStop(guy,"cannot start guy");

            if (!GuyClose(guy))
                perror("cannot close guy");

            /*NOTREACHED*/
        }
        else if (pid < 0)
        {
            perror("team: forking a guy");
            return false;
        }
    }

    if (!StreamSend(&last_downstream,TokenREAD,FdOPEN))
    {
        perror("cannot send first READ token");
        return false;
    }

    if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN))
    {
        perror("cannot send first WRITE token");
        return false;
    }

    if (!FdClose(&last_downstream))
        perror("cannot close first link");

    return true;
}

public boolean TeamWait(team)
    fast struct Team *team;
{
    while (team->TActive != 0)
    {
        int guypid;
        int status;

        guypid = wait(&status);

        if (guypid >= 0)
        {
            fast short unsigned guyno;

            for (guyno = 0; guyno < team->TSize; guyno++)
                if (guypid == team->TGuys[guyno].GPid)
                {
                    team->TGuys[guyno].GPid = -1;
                    break;
                }
        }
        else
        {
            fprintf(stderr,"team: no guys, believed %u left\n",
                team->TActive);
            return true;
        }

        --team->TActive;

        if (status != 0 && team->TActive != 0)
            return false;
    }

    return true;
}

public boolean TeamStop(team)
    fast struct Team *team;
{
    fast short unsigned guyno;

    Mesg(("TeamStop team %#o\n",team));

    for (guyno = 0; guyno < team->TSize; guyno++)
    {
        fast struct Guy *guy;

        guy = team->TGuys+guyno;

        if (guy->GPid >= 0)
        {
            /*kill(guy->GPid,SIGKILL);*/
            --team->TActive;
        }
    }

    return team->TActive == 0;
}

public boolean TeamClose(team)
    fast struct Team *team;
{
    for (team->TSize; team->TSize != 0; --team->TSize)
        continue;

    free(team->TGuys);

    return true;
}

public void usage()
{
    fputs("\
syntax: team [size[bk] [#ofprocesses]]\n\
copies standard input to output\n\
as a # of (default 8) pipelined processes\n\
writing blocks of size (default 10240) bytes,\n\
or 512 byte blocks or kilobytes\n",stderr);
    exit(1);
}

public void main(argc,argv)
    int argc;
    char *(argv[]);
{
    struct Team team;
    short unsigned teamsize;
    address bufsize;

    if (argc <= 1)
        bufsize = 10*1024;
    else
    {
        fast char *cipher;

        /* bufsize must start at zero before the digits are accumulated */
        for
        (
            bufsize = 0, cipher = argv[1];
            *cipher >= '0' && *cipher <= '9';
            cipher++
        )
            bufsize = bufsize*10 + *cipher-'0';

        if (*cipher == 'b') bufsize *= 512;
        if (*cipher == 'k') bufsize *= 1024;

        if (bufsize < 64L || bufsize > (64L*1024L-1))
        {
            fprintf(stderr,"team: invalid block size %ld\n",
                (long) bufsize);
            usage();
        }
    }

    if (argc <= 2)
        teamsize = 8;
    else
    {
        teamsize = atoi(argv[2]);
        if (teamsize < 2 || teamsize > 16)
        {
            fprintf(stderr,"team: invalid # of processes %d\n",
                teamsize);
            usage();
        }
    }

    if (argc > 3)
        usage();

    if (!TeamOpen(&team,teamsize))
    {
        fprintf(stderr,"team: cannot setup the team with %u guys\n",
            teamsize);
        exit(1);
    }

    if (!TeamStart(&team,bufsize))
    {
        fprintf(stderr,"team: cannot start the team\n");
        exit(1);
    }

    if (!TeamWait(&team))
    {
        fprintf(stderr,"team: stop remaining %u guys\n",team.TActive);
        if (!TeamStop(&team))
        {
            fprintf(stderr,"team: cannot stop the team\n");
            exit(1);
        }
    }

    if (!TeamClose(&team))
    {
        fprintf(stderr,"team: cannot close the team\n");
        exit(1);
    }

    exit(0);
}
SHAR_EOF
fi # end of overwriting check
# End of shell archive
exit 0
-- 
Piercarlo "Peter" Grandi            | ARPA: pcg%cs.aber.ac.uk@nss.cs.ucl.ac.uk
Dept of CS, UCW Aberystwyth         | UUCP: ...!mcvax!ukc!aber-cs!pcg
Penglais, Aberystwyth SY23 3BZ, UK  | INET: pcg@cs.aber.ac.uk
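
The READ/WRITE token ring that team.c implements can be sketched in modern POSIX C. This is a minimal, hypothetical re-implementation, not the author's code. The names team_copy, guy, fill, drain and TOK_* are invented for illustration. Tokens are plain ints with no status word, and the ABORT token is omitted. Cleanup after a failed pipe() or fork() is also left out. Each guy ignores SIGPIPE, so relaying a token to a neighbour that has already exited fails with EPIPE instead of killing the sender, and that failure is treated as the end of the run.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

enum { TOK_READ, TOK_WRITE, TOK_STOP };   /* hypothetical token values */

/* Read until `goal` bytes or end of input, like FdRead; -1 on error. */
static ssize_t fill(int fd, char *buf, size_t goal)
{
    size_t total = 0;
    ssize_t n = 0;

    while (total < goal && (n = read(fd, buf + total, goal - total)) > 0)
        total += (size_t) n;
    return n < 0 ? -1 : (ssize_t) total;
}

/* Write all `len` bytes, like FdWrite; 0 on success, -1 on error. */
static int drain(int fd, const char *buf, size_t len)
{
    ssize_t n;

    for (; len > 0; buf += n, len -= (size_t) n)
        if ((n = write(fd, buf, len)) <= 0) return -1;
    return 0;
}

/* One guy: obey each token from `up`, then relay it to `down`. */
static void guy(int up, int down, int in, int out, size_t bufsize)
{
    char *buf = malloc(bufsize);
    ssize_t n = 0;
    int tok;

    if (buf == NULL) _exit(1);
    while (read(up, &tok, sizeof tok) == (ssize_t) sizeof tok) {
        if (tok == TOK_READ) {
            n = fill(in, buf, bufsize);   /* the offset of `in` is shared */
            if (n < 0) _exit(1);
            if (n == 0)
                tok = TOK_STOP;           /* end of input: relay STOP */
        } else if (tok == TOK_WRITE) {
            if (drain(out, buf, (size_t) n) < 0) _exit(1);
        }
        /* A failed relay means the downstream guy has already exited. */
        if (write(down, &tok, sizeof tok) != (ssize_t) sizeof tok
            || tok == TOK_STOP)
            _exit(0);
    }
    _exit(0);   /* upstream closed: the ring is winding down */
}

/* Copy `in` to `out` through a ring of `nguys` processes; 0 on success. */
int team_copy(int in, int out, size_t bufsize, int nguys)
{
    int p[16][2];                        /* p[i] carries tokens to guy i */
    int prime[2] = { TOK_READ, TOK_WRITE };
    int i, j, status, failed = 0;

    if (nguys < 2 || nguys > 16)
        return -1;
    for (i = 0; i < nguys; i++)
        if (pipe(p[i]) < 0)
            return -1;
    for (i = 0; i < nguys; i++) {
        pid_t pid = fork();
        if (pid < 0)
            return -1;
        if (pid == 0) {
            signal(SIGPIPE, SIG_IGN);
            for (j = 0; j < nguys; j++) { /* keep only our two ring ends */
                if (j != i) close(p[j][0]);
                if (j != (i + 1) % nguys) close(p[j][1]);
            }
            guy(p[i][0], p[(i + 1) % nguys][1], in, out, bufsize);
        }
    }
    /* Prime the ring with READ then WRITE, as TeamStart does. */
    if (write(p[0][1], prime, sizeof prime) != (ssize_t) sizeof prime)
        failed = 1;
    for (i = 0; i < nguys; i++) {
        close(p[i][0]);
        close(p[i][1]);
    }
    for (i = 0; i < nguys; i++) {
        if (wait(&status) < 0)
            return -1;
        if (!WIFEXITED(status) || WEXITSTATUS(status) != 0)
            failed = 1;
    }
    return failed ? -1 : 0;
}
```

Because every pipe is FIFO and each guy sends its WRITE before it handles the next READ, a STOP can never overtake a pending write. Every block read before end of file is therefore written before the ring shuts down. Calling team_copy(0, 1, 10240, 8) would mirror team's defaults.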
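
The post says team relies on every guy sharing the file pointer of the standard input and output it inherited. That property can be checked directly. After fork(), parent and child refer to the same open file description, so a read in the child advances the offset the parent sees. The helper below, shared_offset_demo, is a hypothetical illustration and not part of team. The temporary file name pattern is likewise an assumption.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical demonstration: a child reads 5 bytes through a file
 * descriptor inherited from its parent; the parent, which never reads,
 * then asks for its own offset on that descriptor. Because both share
 * one open file description, the parent sees offset 5.
 * Returns the parent's offset, or -1 on any failure.
 */
off_t shared_offset_demo(void)
{
    char path[] = "/tmp/team_offset_XXXXXX";
    char buf[5];
    off_t where = -1;
    int status;
    int fd = mkstemp(path);
    pid_t pid;

    if (fd < 0)
        return -1;
    unlink(path);                       /* file lives until fd is closed */
    if (write(fd, "abcdefgh", 8) == 8 && lseek(fd, 0, SEEK_SET) == 0
        && (pid = fork()) >= 0) {
        if (pid == 0)                   /* child: read via inherited fd */
            _exit(read(fd, buf, sizeof buf) == (ssize_t) sizeof buf ? 0 : 1);
        if (waitpid(pid, &status, 0) == pid && WIFEXITED(status)
            && WEXITSTATUS(status) == 0)
            where = lseek(fd, 0, SEEK_CUR);
    }
    close(fd);
    return where;
}
```

This is why the guys in team.c can read and write in turn without ever seeking: whichever guy holds the token continues exactly where the previous one stopped.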
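
The block-size argument described in the manual page is parsed in main. It is a decimal count, optionally suffixed with b for 512-byte blocks or k for kilobytes. A standalone sketch of the same rule follows. It starts the accumulator explicitly at zero and enforces main's 64 to 65535 byte range. The function name parse_blocksize is hypothetical. So is the choice to reject any characters after the suffix, which main simply ignores.

```c
#include <stddef.h>

/*
 * Hypothetical helper: parse "N", "Nb" (N * 512) or "Nk" (N * 1024)
 * and return the size in bytes, or -1 if the text is malformed or the
 * size falls outside 64..65535, the range that team's main() accepts.
 */
long parse_blocksize(const char *arg)
{
    long size = 0;                      /* start from zero before digits */
    const char *p = arg;

    if (p == NULL || *p < '0' || *p > '9')
        return -1;
    for (; *p >= '0' && *p <= '9'; p++) {
        size = size * 10 + (*p - '0');
        if (size > 65535L)              /* already too big; avoid overflow */
            return -1;
    }
    if (*p == 'b') {
        size *= 512;
        p++;
    } else if (*p == 'k') {
        size *= 1024;
        p++;
    }
    if (*p != '\0')                     /* reject trailing junk */
        return -1;
    return (size < 64 || size > 65535L) ? -1 : size;
}
```

For example, parse_blocksize("20k") yields 20480, the size used in the EXAMPLES section, while "64k" (65536 bytes) is just over the limit and is rejected.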