lmjm@doc.ic.ac.uk (Lee McLoughlin) (07/20/90)
This is a program designed to speed up writing tapes on remote tape drives. It runs on SunOS so long as you have shared memory and locks. The program splits itself into two processes. The first process reads (and reblocks) from stdin into a shared memory buffer. The second writes from the shared memory buffer to stdout. Doing it this way means that the writing side effectly sits in a tight write loop. I run an archive and need to write large chunks out to tape regularly with an ethernet in the way. Using 'buffer' in a command like: tar cvf - stuff | rsh somebox "buffer > /dev/rst8" is a factor of 5 faster than the best alternative, gnu tar with its remote tape option: tar cvf somebox:/dev/rst8 stuff It also speeds up remote dumps! Lee McLoughlin, Imperial College, 1990 Janet: lmjm@uk.ac.ic.doc Uucp: lmjm@icdoc.UUCP (or ..!ukc!icdoc!lmjm) DARPA: lmjm@doc.ic.ac.uk (or lmjm%uk.ac.ic.doc@nsfnet-relay.ac.uk) #! /bin/sh # This is a shell archive, meaning: # 1. Remove everything above the #! /bin/sh line. # 2. Save the resulting text in a file. # 3. Execute the file with /bin/sh (not csh) to create the files: # README # buffer.man # makefile # buffer.c # sem.c # This archive created: Thu Jul 19 23:43:35 1990 export PATH; PATH=/bin:$PATH if test -f 'README' then echo shar: will not over-write existing file "'README'" else cat << \SHAR_EOF > 'README' This is a program designed to speed up writing tapes on remote tape drives. It runs on SunOS so long as you have shared memory and locks. The program splits itself into two processes. The first process reads (and reblocks) from stdin into a shared memory buffer. The second writes from the shared memory buffer to stdout. Doing it this way means that the writing side effectly sits in a tight write loop. I run an archive and need to write large chunks out to tape regularly with an ethernet in the way. Using 'buffer' in a command like: tar cvf - stuff | rsh somebox "buffer > /dev/rst8" is a factor of 5 faster than the best alternative, gnu tar with its remote tape option: tar cvf somebox:/dev/rst8 stuff It also speeds up remote dumps! Lee McLoughlin, Imperial College, 1990 Janet: lmjm@uk.ac.ic.doc Uucp: lmjm@icdoc.UUCP (or ..!ukc!icdoc!lmjm) DARPA: lmjm@doc.ic.ac.uk (or lmjm%uk.ac.ic.doc@nsfnet-relay.ac.uk) SHAR_EOF fi # end of overwriting check if test -f 'buffer.man' then echo shar: will not over-write existing file "'buffer.man'" else cat << \SHAR_EOF > 'buffer.man' .TH BUFFER 1 "14 May 1990" .SH NAME buffer \- very fast reblocking program .SH SYNTAX .B buffer [\fB\-S size\fR] [\fB\-b blocks\fR] [\fB\-s size\fR] [\fB\-m size\fR] .SH OPTIONS .TP 5 .B \-S size Approxomately every size bytes show how much has been writen so far. .TP .B \-m size maximum size of the shared memory chunk to allocate for the circular queue. Defaults to one megabyte. .TP .B \-b blocks Number of blocks to allocate to shared memory circular buffer. Defaults the number required to fill up the shared memory requested. .TP .B \-s size size in bytes of each block. The default blocksize is 10k to match the normal output of the .I tar(1) program. .PP Sizes are a number with an optional trailing character. A 'b' multiplies the size by 512, a 'k' by 1024 and an 'm' by a meg. .SH DESCRIPTION .I Buffer reads from standard input reblocking to the given blocksize and writes each block to standard output. .PP Internally .I buffer is a pair of processes communicating via a large circular queue held in shared memory. The reader process only has to block when the queue is full and the writer process when the queue is empty. .I Buffer is designed to try and keep the writer side continuously busy so that it can stream on suitable tape drives. In particular when used to write tapes with an intervening network using .I buffer to reblock can result in a considerable performance increase. .SH EXAMPLES .br $ \fBbuffer < /etc/termcap > /dev/rst8\fR .br .sp $ \fBtar cf - | rsh somehost 'buffer > /dev/rst8'\fR .br .sp $ \fBdump fu - | rsh somehost 'buffer -s 16k > /dev/nrst8'\fR .SH SEE ALSO dd(1), tar(1), rsh(1) SHAR_EOF fi # end of overwriting check if test -f 'makefile' then echo shar: will not over-write existing file "'makefile'" else cat << \SHAR_EOF > 'makefile' # Make the buffer program RM=/bin/rm MAKE=/bin/make # Where to install buffer and its manual pages INSTBIN=/usr/local/bin INSTMAN=/usr/man/manl # The manual page section (normally l or 1) S=l buffer: buffer.o sem.o $(CC) -o buffer $(CFLAGS) buffer.o sem.o clean: $(RM) -f *.o core buffer .merrs install: buffer cp buffer $(INSTBIN) chmod 111 $(INSTBIN)/buffer cp buffer.man $(INSTMAN)/buffer.$S chmod 444 $(INSTMAN)/buffer.$S shar: /bin/rm -f buffer.shar shar README buffer.man makefile buffer.c sem.c > buffer.shar SHAR_EOF fi # end of overwriting check if test -f 'buffer.c' then echo shar: will not over-write existing file "'buffer.c'" else cat << \SHAR_EOF > 'buffer.c' /* This is a reblocking process, designed to try and read from stdin * and write to stdout - but to always try and keep the writing side * busy. It is meant to try and stream tape writes. * * This program runs in two parts. The reader and the writer. They * communicate using shared memory with semaphores locking the access. * The shared memory implements a circular list of blocks of data. * * L.McLoughlin, Imperial College, 1990 * * $Log: buffer.c,v $ * Revision 1.3 90/05/15 23:27:46 lmjm * Added -S option (show how much has been writen). * Added -m option to specify how much shared memory to grab. * Now tries to fill this with blocks. * reader waits for writer to terminate and then frees the shared mem and sems. * * Revision 1.2 90/01/20 21:37:59 lmjm * Reset default number of blocks and blocksize for best thruput of * standard tar 10K blocks. * Allow number of blocks to be changed. * Don't need a hole in the circular queue since the semaphores prevent block * clash. * * Revision 1.1 90/01/17 11:30:23 lmjm * Initial revision * */ #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/sem.h> #ifndef lint static char *rcsid = "$Header: /home/gould/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.3 90/05/15 23:27:46 lmjm Exp Locker: lmjm $"; #endif extern char *shmat(); /* General macros */ #define TRUE 1 #define FALSE 0 #define K *1024 /* Some forward declarations */ void byee(); void start_reader_and_writer(); /* When showing print a note every this many bytes writen */ int showevery = 0; #define PRINT_EVERY 10 K /* This is the inter-process buffer - it implements a circular list * of blocks. */ #define DEF_BLOCKSIZE (10 K) #define MAX_BLOCKSIZE (64 K) int blocksize = DEF_BLOCKSIZE; /* Numbers of blocks in the queue. */ #define MAX_BLOCKS 2048 int blocks = 1; /* Circular increment of a buffer index */ #define INC(i) (((i)+1) == blocks ? 0 : ((i)+1)) /* Max ammount of shared memory you can allocate - can't see a way to look * this up. */ #define DEF_SHMEM (1 K K) int max_shmem = DEF_SHMEM; /* Just a flag to show unfilled */ #define NONE (-1) /* the shared memory id of the buffer */ int buffer_id = NONE; struct block { int bytes; char *data; } *curr_block; #define NO_BUFFER ((struct buffer *)-1) struct buffer { /* writer will hang trying to lock this till reader fills in a block */ int blocks_used_lock; /* reader will hang trying to lock this till writer empties a block */ int blocks_free_lock; int next_block_in; int next_block_out; struct block block[ MAX_BLOCKS ]; /* These actual space for the blocks is here - the array extends * pass 1 */ char data_space[ 1 ]; } *pbuffer = NO_BUFFER; int buffer_size; int writer_pid = 0; int debug = 0; char *progname = "buffer"; main( argc, argv ) int argc; char **argv; { parse_args( argc, argv ); set_handlers(); buffer_allocate(); start_reader_and_writer(); byee( 0 ); } parse_args( argc, argv ) int argc; char **argv; { int c; extern char *optarg; extern int optind; char blocks_given = FALSE; while( (c = getopt( argc, argv, "S:dm:s:b:" )) != -1 ){ switch( c ){ case 'S': /* Show every once in a while how much is printed */ showevery = do_size( optarg ); if( showevery <= 0 ) showevery = PRINT_EVERY; break; case 'd': /* debug */ debug = 1; setbuf( stdout, NULL ); setbuf( stderr, NULL ); fprintf( stderr, "debugging turned on\n" ); break; case 'm': /* Max size of shared memory lump */ max_shmem = do_size( optarg ); if( max_shmem < (sizeof( struct buffer ) + (blocksize * blocks)) ){ fprintf( stderr, "max_shmem %d too low\n", max_shmem ); byee( -1 ); } break; case 'b': /* Number of blocks */ blocks_given = TRUE; blocks = atoi( optarg ); if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){ fprintf( stderr, "blocks %d out of range\n", blocks ); byee( -1 ); } break; case 's': /* Size of a block */ blocksize = do_size( optarg ); if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){ fprintf( stderr, "blocksize %d out of range\n", blocksize ); byee( -1 ); } break; default: fprintf( stderr, "Usage: %s [-S size] [-m memsize] [-b blocks] [-s blocksize]\n", progname ); fprintf( stderr, "-S = show ammount writen every size bytes\n" ); fprintf( stderr, "-m = size of shared mem chunk to grab\n" ); fprintf( stderr, "-b = number of blocks in queue\n" ); fprintf( stderr, "-s = size of a block\n" ); byee( -1 ); } } /* If -b was not given try and work out the max buffer size */ if( !blocks_given ){ blocks = (max_shmem - sizeof( struct buffer )) / blocksize; if( blocks <= 0 ){ fprintf( stderr, "Cannot handle blocks that big, aborting!\n" ); byee( -1 ); } if( MAX_BLOCKS < blocks ){ fprintf( stderr, "Cannot handle that many blocks, aborting!\n" ); byee( -1 ); } } } /* The interrupt handler */ shutdown() { byee( -1 ); } set_handlers() { signal( SIGHUP, shutdown ); signal( SIGINT, shutdown ); signal( SIGQUIT, shutdown ); signal( SIGTERM, shutdown ); if( writer_pid ){ /* This is the reader - propogate the signal to the writer */ kill( writer_pid, SIGTERM ); } } buffer_allocate() { int i; /* Allow for the data space */ buffer_size = sizeof( struct buffer ) + ((blocks * blocksize) - sizeof( char )); /* Create the space for the buffer */ buffer_id = shmget( IPC_PRIVATE, buffer_size, IPC_CREAT|S_IREAD|S_IWRITE ); if( buffer_id < 0 ){ perror( "couldn't create shared memory segment" ); byee( -1 ); } get_buffer(); if( debug ) fprintf( stderr, "pbuffer is 0x%08x, buffer_size is %d [%d x %d]\n", (char *)pbuffer, buffer_size, blocks, blocksize ); bzero( (char *)pbuffer, buffer_size ); pbuffer->blocks_used_lock = -1; pbuffer->blocks_free_lock = -1; pbuffer->blocks_used_lock = new_sem(); /* Start it off locked - it is unlocked when a buffer gets filled in */ lock( pbuffer->blocks_used_lock ); pbuffer->blocks_free_lock = new_sem(); /* start this off so lock() can be called on it for each block * till all the blocks are used up */ sem_set( pbuffer->blocks_free_lock, blocks - 1 ); /* Detattach the shared memory so the fork doesnt do anything odd */ shmdt( (char *)pbuffer ); pbuffer = NO_BUFFER; } buffer_remove() { static char removing = FALSE; int i; /* Avoid accidental recursion */ if( removing ) return; removing = TRUE; /* Buffer not yet created */ if( buffer_id == NONE ) return; /* There should be a buffer so this must be after its detached it * but before the fork picks it up */ if( pbuffer == NO_BUFFER ) get_buffer(); if( debug ) fprintf( stderr, "removing semaphores and buffer\n" ); remove_sem( pbuffer->blocks_used_lock ); remove_sem( pbuffer->blocks_free_lock ); if( shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 ) == -1 ) perror( "failed to remove shared memory buffer" ); } get_buffer() { int b; /* Grab the buffer space */ pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 ); if( pbuffer == NO_BUFFER ){ perror( "failed to attach shared memory" ); byee( -1 ); } /* Setup the data space pointers */ for( b = 0; b < blocks; b++ ) pbuffer->block[ b ].data = &pbuffer->data_space[ b * blocksize ]; } void start_reader_and_writer() { int status, deadpid; fflush( stdout ); fflush( stderr ); if( (writer_pid = fork()) == -1 ){ perror( "unable to fork" ); byee( -1 ); } else if( writer_pid == 0 ){ /* Never trust fork() to propogate signals - reset them */ set_handlers(); writer(); } else { reader(); /* Now wait for the writer to finish */ while( ((deadpid = wait( &status )) != writer_pid) && deadpid != -1 ) ; } } /* Read from stdin into the buffer */ reader() { if( debug ) fprintf( stderr, "Entering reader\n" ); get_buffer(); while( 1 ){ get_next_free_block(); if( ! fill_block() ) break; } if( debug ) fprintf( stderr, "Exiting reader\n" ); } get_next_free_block() { /* Maybe wait till there is room in the buffer */ lock( pbuffer->blocks_free_lock ); curr_block = &pbuffer->block[ pbuffer->next_block_in ]; pbuffer->next_block_in = INC( pbuffer->next_block_in ); } fill_block() { int bytes; char *start; int toread; start = curr_block->data; toread = blocksize; /* Fill the block with input. This reblocks the input. */ while( toread != 0 && (bytes = read( 0, start, toread )) > 0 ){ start += bytes; toread -= bytes; } if( bytes < 0 ){ perror( "failed to read input" ); byee( -1 ); } /* number of bytes available. Zero will be taken as eof */ curr_block->bytes = blocksize - toread; if( debug ) fprintf( stderr, "got %d bytes\n", curr_block->bytes ); unlock( pbuffer->blocks_used_lock ); return curr_block->bytes; } /* Write the buffer to stdout */ writer() { if( debug ) fprintf( stderr, "\tEntering writer\n" ); get_buffer(); while( 1 ){ get_next_filled_block(); if( !data_to_write() ) break; write_block_to_stdout(); } if( debug ) fprintf( stderr, "\tExiting writer\n" ); } get_next_filled_block() { /* Hang till some data is available */ lock( pbuffer->blocks_used_lock ); curr_block = &pbuffer->block[ pbuffer->next_block_out ]; pbuffer->next_block_out = INC( pbuffer->next_block_out ); } data_to_write() { return curr_block->bytes; } write_block_to_stdout() { static int out = 0; static int next_k; if( write( 1, curr_block->data, curr_block->bytes ) != curr_block->bytes ){ perror( "write of data failed" ); byee( -1 ); } if( showevery ){ out += curr_block->bytes; if( out > next_k ){ fprintf( stderr, "% 8dK\r", out / 1024 ); next_k += showevery; } } unlock( pbuffer->blocks_free_lock ); } void byee( exit_val ) int exit_val; { /* Only the parent (reader) should zap the buffer */ if( writer_pid != 0 ) buffer_remove(); else if( showevery ) fprintf( stderr, "\n" ); exit( exit_val ); } /* Given a string of <num>[<suff>] returns a num * suff = * m/M for 1meg * k/K for 1k * b/B for 512 */ do_size( arg ) char *arg; { char format[ 20 ]; int ret; *format = '\0'; sscanf( arg, "%d%s", &ret, format ); switch( *format ){ case 'm': case 'M': ret = ret K K; break; case 'k': case 'K': ret = ret K; break; case 'b': case 'B': ret *= 512; break; } return ret; } SHAR_EOF fi # end of overwriting check if test -f 'sem.c' then echo shar: will not over-write existing file "'sem.c'" else cat << \SHAR_EOF > 'sem.c' /* Some simple binary semaphore routines */ #include <stdio.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/ipc.h> #include <sys/sem.h> /* Set a semaphore to a particular value - meant to be used before * first lock/unlock */ void sem_set( sem_id, val ) int sem_id; int val; { union semun arg; extern int errno; arg.val = val; errno = 0; semctl( sem_id, 0, SETVAL, arg ); if( errno != 0 ){ perror( "internal error, sem_set" ); exit( -1 ); } } int new_sem() { int sem; sem = semget( IPC_PRIVATE, 1, IPC_CREAT|S_IREAD|S_IWRITE ); if( sem < 0 ){ perror( "internal error, couldn't create semaphore" ); exit( -1 ); } sem_set( sem, 1 ); return sem; } void lock( sem_id ) int sem_id; { struct sembuf sembuf; sembuf.sem_num = 0; sembuf.sem_op = -1; sembuf.sem_flg = 0; if( semop( sem_id, &sembuf, 1 ) == -1 ){ fprintf( stderr, "internal error, lock id %d\n", sem_id ); perror( "lock error" ); exit( -1 ); } } void unlock( sem_id ) int sem_id; { struct sembuf sembuf; sembuf.sem_num = 0; sembuf.sem_op = 1; sembuf.sem_flg = 0; if( semop( sem_id, &sembuf, 1 ) == -1 ){ fprintf( stderr, "internal error, lock id %d\n", sem_id ); perror( "unlock error" ); exit( -1 ); } } void remove_sem( sem_id ) int sem_id; { if( sem_id == -1 ) return; if( semctl( sem_id, 0, IPC_RMID, NULL ) == -1 ) perror( "internal error, failed to remove semaphore" ); } SHAR_EOF fi # end of overwriting check # End of shell archive exit 0