[alt.sources] Buffer: helps write remote tapes *fast*

lmjm@doc.ic.ac.uk (Lee McLoughlin) (07/20/90)

This is a program designed to speed up writing tapes on remote tape
drives.  It runs on SunOS so long as you have shared memory and locks. 

The program splits itself into two processes.  The first process reads
(and reblocks) from stdin into a shared memory buffer.  The second
writes from the shared memory buffer to stdout.  Doing it this way
means that the writing side effectively sits in a tight write loop.

I run an archive and need to write large chunks out to tape regularly
with an ethernet in the way.  Using 'buffer' in a command like:

	tar cvf - stuff | rsh somebox "buffer > /dev/rst8"

is a factor of 5 faster than the best alternative, gnu tar with its
remote tape option:

	tar cvf somebox:/dev/rst8 stuff

It also speeds up remote dumps!

Lee McLoughlin, Imperial College, 1990

Janet: lmjm@uk.ac.ic.doc	Uucp:  lmjm@icdoc.UUCP (or ..!ukc!icdoc!lmjm)
DARPA: lmjm@doc.ic.ac.uk (or lmjm%uk.ac.ic.doc@nsfnet-relay.ac.uk)



#! /bin/sh
# This is a shell archive, meaning:
# 1. Remove everything above the #! /bin/sh line.
# 2. Save the resulting text in a file.
# 3. Execute the file with /bin/sh (not csh) to create the files:
#	README
#	buffer.man
#	makefile
#	buffer.c
#	sem.c
# This archive created: Thu Jul 19 23:43:35 1990
export PATH; PATH=/bin:$PATH
if test -f 'README'
then
	echo shar: will not over-write existing file "'README'"
else
cat << \SHAR_EOF > 'README'
This is a program designed to speed up writing tapes on remote tape
drives.  It runs on SunOS so long as you have shared memory and locks. 

The program splits itself into two processes.  The first process reads
(and reblocks) from stdin into a shared memory buffer.  The second
writes from the shared memory buffer to stdout.  Doing it this way
means that the writing side effectively sits in a tight write loop.

I run an archive and need to write large chunks out to tape regularly
with an ethernet in the way.  Using 'buffer' in a command like:

	tar cvf - stuff | rsh somebox "buffer > /dev/rst8"

is a factor of 5 faster than the best alternative, gnu tar with its
remote tape option:

	tar cvf somebox:/dev/rst8 stuff

It also speeds up remote dumps!

Lee McLoughlin,
Imperial College, 1990

Janet: lmjm@uk.ac.ic.doc	Uucp:  lmjm@icdoc.UUCP (or ..!ukc!icdoc!lmjm)
DARPA: lmjm@doc.ic.ac.uk (or lmjm%uk.ac.ic.doc@nsfnet-relay.ac.uk)


SHAR_EOF
fi # end of overwriting check
if test -f 'buffer.man'
then
	echo shar: will not over-write existing file "'buffer.man'"
else
cat << \SHAR_EOF > 'buffer.man'
.TH BUFFER 1 "14 May 1990"
.SH NAME
buffer \- very fast reblocking program
.SH SYNTAX
.B buffer
[\fB\-S size\fR] [\fB\-b blocks\fR] [\fB\-s size\fR] [\fB\-m size\fR]
.SH OPTIONS
.TP 5
.B \-S size
Approximately every size bytes show how much has been written so far.
.TP
.B \-m size
maximum size of the shared memory chunk to allocate for the circular
queue. Defaults to one megabyte.
.TP
.B \-b blocks
Number of blocks to allocate to shared memory circular buffer.
Defaults to the number required to fill up the shared memory requested.
.TP
.B \-s size
size in bytes of each block.
The default blocksize is 10k to match the normal output of the
.I tar(1)
program.
.PP
Sizes are a number with an optional trailing character.   A 'b' 
multiplies the size by 512, a 'k' by 1024 and an 'm' by a meg.
.SH DESCRIPTION
.I Buffer
reads from standard input reblocking to the given blocksize and writes
each block to standard output.
.PP
Internally
.I buffer
is a pair of processes communicating via a large circular queue held
in shared memory.  The reader process only has to block when the queue
is full and the writer process when the queue is empty.
.I Buffer
is designed to try and keep the writer side continuously busy so that
it can stream on suitable tape drives.  In particular, when writing
tapes across an intervening network, using
.I buffer
to reblock can result in a considerable performance increase.
.SH EXAMPLES
.br
$ \fBbuffer < /etc/termcap > /dev/rst8\fR
.br
.sp
$ \fBtar cf - | rsh somehost 'buffer > /dev/rst8'\fR
.br
.sp
$ \fBdump fu - | rsh somehost 'buffer -s 16k > /dev/nrst8'\fR
.SH SEE ALSO
dd(1), tar(1), rsh(1)
SHAR_EOF
fi # end of overwriting check
if test -f 'makefile'
then
	echo shar: will not over-write existing file "'makefile'"
else
cat << \SHAR_EOF > 'makefile'
# Make the buffer program
#
# Targets:
#   buffer   (default) link buffer.o and sem.o into the executable
#   clean    remove object files, core, the executable and .merrs
#   install  copy the executable and the manual page into place
#   shar     rebuild the buffer.shar distribution archive
RM=/bin/rm
# MAKE is set here but is not referenced by any rule in this makefile
MAKE=/bin/make
# Where to install buffer and its manual pages
INSTBIN=/usr/local/bin
INSTMAN=/usr/man/manl
# The manual page section (normally l or 1); used as the suffix of the
# installed manual page name (buffer.$S)
S=l

# Link step; the objects are built by make's implicit .c -> .o rule
buffer: buffer.o sem.o
	$(CC) -o buffer $(CFLAGS) buffer.o sem.o

# Remove everything that can be regenerated
clean:
	$(RM) -f *.o core buffer .merrs

# Mode 111 leaves the binary execute-only; mode 444 leaves the manual
# page read-only
install: buffer
	cp buffer $(INSTBIN)
	chmod 111 $(INSTBIN)/buffer
	cp buffer.man $(INSTMAN)/buffer.$S
	chmod 444 $(INSTMAN)/buffer.$S

# Package the distribution files into a shell archive
shar:
	/bin/rm -f buffer.shar
	shar README buffer.man makefile buffer.c sem.c > buffer.shar
SHAR_EOF
fi # end of overwriting check
if test -f 'buffer.c'
then
	echo shar: will not over-write existing file "'buffer.c'"
else
cat << \SHAR_EOF > 'buffer.c'
/* This is a reblocking process, designed to try and read from stdin
 * and write to stdout - but to always try and keep the writing side
 * busy.  It is meant to try and stream tape writes.
 *
 * This program runs in two parts.  The reader and the writer.  They
 * communicate using shared memory with semaphores locking the access.
 * The shared memory implements a circular list of blocks of data.
 *
 * L.McLoughlin, Imperial College, 1990
 *
 * $Log:	buffer.c,v $
 * Revision 1.3  90/05/15  23:27:46  lmjm
 * Added -S option (show how much has been writen).
 * Added -m option to specify how much shared memory to grab.
 * Now tries to fill this with blocks.
 * reader waits for writer to terminate and then frees the shared mem and sems.
 * 
 * Revision 1.2  90/01/20  21:37:59  lmjm
 * Reset default number of  blocks and blocksize for best thruput of
 * standard tar 10K blocks.
 * Allow number of blocks to be changed.
 * Don't need a hole in the circular queue since the semaphores prevent block
 * clash.
 * 
 * Revision 1.1  90/01/17  11:30:23  lmjm
 * Initial revision
 * 
 */
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>

#ifndef lint
static char *rcsid = "$Header: /home/gould/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.3 90/05/15 23:27:46 lmjm Exp Locker: lmjm $";
#endif

/* shmat() is declared by hand; its result is cast to struct buffer *
 * where it is used */
extern char *shmat();

/* General macros */
#define TRUE 1
#define FALSE 0
/* K is a postfix multiplier: "10 K" expands to "10 *1024", and
 * "1 K K" to "1 *1024 *1024".  The expansion is not parenthesised, so
 * wrap uses in parentheses when they appear inside larger expressions. */
#define K *1024

/* Some forward declarations */
void byee();
void start_reader_and_writer();

/* When showing progress, print a note every this many bytes written.
 * Zero means no progress output. */
int showevery = 0;
/* Interval used when -S is given a size that is not positive */
#define PRINT_EVERY 10 K

/* This is the inter-process buffer - it implements a circular list
 * of blocks. */

/* Default and largest permitted size, in bytes, of one block */
#define DEF_BLOCKSIZE (10 K)
#define MAX_BLOCKSIZE (64 K)
int blocksize = DEF_BLOCKSIZE;

/* Number of blocks in the queue.  MAX_BLOCKS fixes the size of the
 * block table in struct buffer; "blocks" is how many entries are
 * actually in use. */
#define MAX_BLOCKS 2048
int blocks = 1;
/* Circular increment of a buffer index: wraps to 0 after blocks-1.
 * The argument is evaluated more than once. */
#define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))

/* Max amount of shared memory you can allocate - can't see a way to look
 * this up.
 */
#define DEF_SHMEM (1 K K)
int max_shmem = DEF_SHMEM;

/* Just a flag to show unfilled */
#define NONE (-1)

/* The shared memory id of the buffer, or NONE before it is created */
int buffer_id = NONE;
/* One entry in the circular queue.  curr_block is the entry the current
 * process is working on. */
struct block {
	/* Number of valid bytes in data; zero is treated as end of input */
	int bytes;
	/* Start of this block's bytes inside data_space; filled in by
	 * get_buffer() each time the segment is attached.
	 * NOTE(review): the pointer lives in the shared segment, so it is
	 * only meaningful to both processes if they attach the segment at
	 * the same address - confirm. */
	char *data;
} *curr_block;

/* Value of pbuffer while no segment is attached; also the value that
 * the result of shmat() is compared against to detect failure */
#define NO_BUFFER ((struct buffer *)-1)
struct buffer {
	/* writer will hang trying to lock this till reader fills in a block */
	int blocks_used_lock;
	/* reader will hang trying to lock this till writer empties a block */
	int blocks_free_lock;

	/* Index of the next block the reader will fill */
	int next_block_in;
	/* Index of the next block the writer will empty */
	int next_block_out;

	struct block block[ MAX_BLOCKS ];

	/* The actual space for the blocks is here - the array extends
	 * past 1; buffer_allocate() sizes the segment to hold
	 * blocks * blocksize bytes starting at this member */
	char data_space[ 1 ];
} *pbuffer = NO_BUFFER;
/* Size in bytes of the shared memory segment */
int buffer_size;

/* Zero before the fork and in the writer (child) process; holds the
 * result of fork() in the reader (parent) */
int writer_pid = 0;
/* Non-zero when -d was given */
int debug = 0;
/* Name printed in the usage message */
char *progname = "buffer";

/* Program entry point.  Runs the start-up steps in order and then
 * leaves through byee() so that clean-up is shared with the error
 * paths. */
main( argc, argv )
	int argc; char **argv;
{
	/* Read the options into the global tuning variables */
	parse_args( argc, argv );
	/* Install the termination signal handlers */
	set_handlers();
	/* Create the shared memory queue and its semaphores */
	buffer_allocate();
	/* Fork; returns in both processes once their work is done */
	start_reader_and_writer();
	/* Normal exit for whichever process gets here */
	byee( 0 );
}

/* Parse the command line and fill in the global tuning variables
 * (showevery, debug, max_shmem, blocks, blocksize).
 *
 * Options:
 *   -S size    report progress roughly every size bytes; a size that is
 *              not positive selects PRINT_EVERY
 *   -d         turn on debugging and unbuffer stdout and stderr
 *   -m size    upper limit on the shared memory segment
 *   -b blocks  number of blocks in the queue (1..MAX_BLOCKS)
 *   -s size    size of one block (1..MAX_BLOCKSIZE)
 *
 * An invalid value or an unknown option prints a message and leaves
 * through byee( -1 ).  The -m check uses blocksize and blocks as they
 * stand when -m is reached.  When -b is not given the block count is
 * derived from max_shmem and blocksize once all options are read.
 */
parse_args( argc, argv )
	int argc;
	char **argv;
{
	int c;
	extern char *optarg;
	char blocks_given = FALSE;
	/* Keep the header size as a signed int: comparing an int against
	 * a sizeof expression converts the int to unsigned, which made a
	 * negative -m value look enormous and slip through the check. */
	int header_size = (int)sizeof( struct buffer );

	while( (c = getopt( argc, argv, "S:dm:s:b:" )) != -1 ){
		switch( c ){
		case 'S':
			/* Show every once in a while how much is printed */
			showevery = do_size( optarg );
			if( showevery <= 0 )
				showevery = PRINT_EVERY;
			break;
		case 'd':	/* debug */
			debug = 1;
			setbuf( stdout, NULL );
			setbuf( stderr, NULL );
			fprintf( stderr, "debugging turned on\n" );
			break;
		case 'm':
			/* Max size of shared memory lump */
			max_shmem = do_size( optarg );

			/* Signed comparison: rejects negative and zero sizes
			 * as well as ones too small for header plus data */
			if( max_shmem < header_size + (blocksize * blocks) ){
				fprintf( stderr, "max_shmem %d too low\n", max_shmem );
				byee( -1 );
			}
			break;
		case 'b':
			/* Number of blocks */
			blocks_given = TRUE;
			blocks = atoi( optarg );
			if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
				fprintf( stderr, "blocks %d out of range\n", blocks );
				byee( -1 );
			}
			break;
		case 's':	/* Size of a block */
			blocksize = do_size( optarg );

			if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
				fprintf( stderr, "blocksize %d out of range\n", blocksize );
				byee( -1 );
			}
			break;
		default:
			fprintf( stderr, "Usage: %s [-S size] [-m memsize] [-b blocks] [-s blocksize]\n",
				progname );
			fprintf( stderr, "-S = show ammount writen every size bytes\n" );
			fprintf( stderr, "-m = size of shared mem chunk to grab\n" );
			fprintf( stderr, "-b = number of blocks in queue\n" );
			fprintf( stderr, "-s = size of a block\n" );
			byee( -1 );
		}
	}

	/* If -b was not given try and work out the max buffer size */
	if( !blocks_given ){
		/* Signed arithmetic, so a memory limit smaller than the
		 * header yields a value the check below catches */
		blocks = (max_shmem - header_size) / blocksize;
		if( blocks <= 0 ){
			fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
			byee( -1 );
		}
		if( MAX_BLOCKS < blocks  ){
			fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
			byee( -1 );
		}
	}
}

/* The interrupt handler.  In the reader (the parent, where writer_pid
 * holds the child's pid) the signal is first passed on to the writer;
 * then the process leaves through byee(), which removes the shared
 * buffer when called in the parent.
 *
 * The kill() used to sit in set_handlers(), which is only ever called
 * while writer_pid is still zero, so it could never run.
 */
shutdown()
{
	/* "> 0" rather than "!= 0": writer_pid is -1 after a failed
	 * fork, and kill( -1, ... ) would signal every process we are
	 * allowed to signal. */
	if( writer_pid > 0 ){
		/* This is the reader - propagate the signal to the writer */
		kill( writer_pid, SIGTERM );
	}

	byee( -1 );
}

/* Route the usual termination signals to shutdown().  Called once
 * before the fork and again in the writer after it. */
set_handlers()
{
	signal( SIGHUP, shutdown );
	signal( SIGINT, shutdown );
	signal( SIGQUIT, shutdown );
	signal( SIGTERM, shutdown );
}

/* Create the shared memory segment holding the circular queue, create
 * and prime the two semaphores that guard it, then detach the segment
 * again.  The reader and the writer each re-attach it with get_buffer()
 * after the fork.  Any failure ends the program.
 */
buffer_allocate()
{
	/* Allow for the data space: struct buffer already contains one
	 * byte of data_space, hence the subtraction */
	buffer_size = sizeof( struct buffer ) +
		((blocks * blocksize) - sizeof( char ));

	/* Create the space for the buffer */
	buffer_id = shmget( IPC_PRIVATE,
			   buffer_size,
			   IPC_CREAT|S_IREAD|S_IWRITE );
	if( buffer_id < 0 ){
		perror( "couldn't create shared memory segment" );
		byee( -1 );
	}

	get_buffer();

	/* Print the address through an unsigned long: handing a pointer
	 * to %x is wrong wherever pointers are wider than int */
	if( debug )
		fprintf( stderr, "pbuffer is 0x%08lx, buffer_size is %d [%d x %d]\n",
			(unsigned long)pbuffer, buffer_size, blocks, blocksize );

	bzero( (char *)pbuffer, buffer_size );
	/* Mark both semaphores as not yet created so that buffer_remove()
	 * skips them if we have to bail out part way through */
	pbuffer->blocks_used_lock = -1;
	pbuffer->blocks_free_lock = -1;

	pbuffer->blocks_used_lock = new_sem();
	/* Start it off locked - it is unlocked when a buffer gets filled in */
	lock( pbuffer->blocks_used_lock );

	pbuffer->blocks_free_lock = new_sem();
	/* Start this off at the number of blocks so lock() can be called
	 * on it once for each block till all the blocks are used up.
	 * The writer only unlocks it after it has finished writing a
	 * block, so the reader can never be handed a block that is still
	 * in use.  (It used to start at blocks - 1, which left one block
	 * permanently idle and made "-b 1" hang: the reader waited for a
	 * free block that never came while the writer waited for data.) */
	sem_set( pbuffer->blocks_free_lock, blocks );

	/* Detach the shared memory so the fork doesn't do anything odd */
	shmdt( (char *)pbuffer );
	pbuffer = NO_BUFFER;
}

/* Destroy the two semaphores and the shared memory segment.  Only the
 * first call does anything; byee() calls this in the parent process.
 */
buffer_remove()
{
	static char already_called = FALSE;
	int rm_result;

	/* get_buffer() below can end up back in here through byee(), so
	 * make every call after the first a no-op */
	if( already_called )
		return;
	already_called = TRUE;

	/* Nothing to do unless the segment has been created */
	if( buffer_id != NONE ){
		/* The segment exists but is not attached: we are between
		 * the detach in buffer_allocate() and the re-attach after
		 * the fork.  Attach it to get at the semaphore ids. */
		if( pbuffer == NO_BUFFER )
			get_buffer();

		if( debug )
			fprintf( stderr, "removing semaphores and buffer\n" );

		remove_sem( pbuffer->blocks_used_lock );
		remove_sem( pbuffer->blocks_free_lock );

		rm_result = shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 );
		if( rm_result == -1 )
			perror( "failed to remove shared memory buffer" );
	}
}

get_buffer()
{
	int b;

	/* Grab the buffer space */
	pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
	if( pbuffer == NO_BUFFER ){
		perror( "failed to attach shared memory" );
		byee( -1 );
	}

	/* Setup the data space pointers */
	for( b = 0; b < blocks; b++ )
		pbuffer->block[ b ].data =
			&pbuffer->data_space[ b * blocksize ];

}

/* Split into the two co-operating processes.  The child becomes the
 * writer; the parent becomes the reader and, once its input is
 * exhausted, waits for the writer to finish.  Both processes return
 * from here when their work is done.
 */
void
start_reader_and_writer()
{
	int status;
	int deadpid;

	/* Flush both streams before the fork */
	fflush( stdout );
	fflush( stderr );

	writer_pid = fork();
	switch( writer_pid ){
	case -1:
		perror( "unable to fork" );
		byee( -1 );
		break;

	case 0:
		/* Never trust fork() to propagate signals - reset them */
		set_handlers();

		writer();
		break;

	default:
		reader();

		/* Now wait for the writer to finish; give up if wait()
		 * itself reports an error */
		do {
			deadpid = wait( &status );
		} while( deadpid != writer_pid && deadpid != -1 );
		break;
	}
}

/* Reader side: attach the shared buffer and keep filling blocks from
 * stdin until fill_block() reports an empty block (end of input). */
reader()
{
	if( debug )
		fprintf( stderr, "Entering reader\n" );

	get_buffer();

	/* Claim a free block, fill it, and stop once a fill comes back
	 * empty */
	do {
		get_next_free_block();
	} while( fill_block() );

	if( debug )
		fprintf( stderr, "Exiting reader\n" );
}

/* Wait until the queue has a free block, make it the current block
 * and advance the reader's position in the circular queue. */
get_next_free_block()
{
	int slot;

	/* Maybe wait till there is room in the buffer */
	lock( pbuffer->blocks_free_lock );

	slot = pbuffer->next_block_in;
	curr_block = pbuffer->block + slot;
	pbuffer->next_block_in = INC( slot );
}

/* Fill the current block from stdin and hand it to the writer.
 * Keeps reading until the block is full or read() reports end of
 * input, which is what reblocks the input.  A read error ends the
 * program.  Returns the number of bytes placed in the block; zero
 * means end of input.
 */
fill_block()
{
	char *dest;
	int remaining;
	int got;
	int filled;

	dest = curr_block->data;
	remaining = blocksize;
	got = 0;

	/* Fill the block with input.  This reblocks the input. */
	while( remaining != 0 ){
		got = read( 0, dest, remaining );
		if( got <= 0 )
			break;
		dest += got;
		remaining -= got;
	}

	if( got < 0 ){
		perror( "failed to read input" );
		byee( -1 );
	}

	/* number of bytes available. Zero will be taken as eof */
	filled = blocksize - remaining;
	curr_block->bytes = filled;

	if( debug )
		fprintf( stderr, "got %d bytes\n", filled );

	/* Let the writer have this block */
	unlock( pbuffer->blocks_used_lock );

	return filled;
}


/* Writer side: attach the shared buffer and copy filled blocks to
 * stdout until an empty block (end of input) turns up. */
writer()
{
	if( debug )
		fprintf( stderr, "\tEntering writer\n" );

	get_buffer();

	/* Fetch a block, stop if it is empty, otherwise write it out and
	 * fetch the next one */
	for( get_next_filled_block(); data_to_write(); get_next_filled_block() )
		write_block_to_stdout();

	if( debug )
		fprintf( stderr, "\tExiting writer\n" );
}

/* Wait until the reader has filled a block, make it the current block
 * and advance the writer's position in the circular queue. */
get_next_filled_block()
{
	int slot;

	/* Hang till some data is available */
	lock( pbuffer->blocks_used_lock );

	slot = pbuffer->next_block_out;
	curr_block = pbuffer->block + slot;
	pbuffer->next_block_out = INC( slot );
}

/* Number of bytes held in the current block; the writer treats zero
 * as the signal to stop. */
data_to_write()
{
	int pending;

	pending = curr_block->bytes;
	return pending;
}

/* Write the current block to stdout in a single write(), optionally
 * report progress on stderr, then give the block back to the reader.
 * A failed or short write ends the program.
 */
write_block_to_stdout()
{
	static int total_written = 0;
	static int next_report;
	int len;
	int done;

	len = curr_block->bytes;
	done = write( 1, curr_block->data, len );
	if( done != len ){
		perror( "write of data failed" );
		byee( -1 );
	}

	if( showevery != 0 ){
		total_written += len;
		/* At most one progress line per block */
		if( next_report < total_written ){
			fprintf( stderr, "% 8dK\r", total_written / 1024 );
			next_report += showevery;
		}
	}

	/* The block is free for the reader to reuse */
	unlock( pbuffer->blocks_free_lock );
}


/* Common exit path.  exit_val is passed straight to exit().
 * A process whose writer_pid is non-zero removes the shared buffer and
 * semaphores first; one whose writer_pid is zero only finishes off the
 * progress line when progress reporting is on.
 */
void
byee( exit_val )
	int exit_val;
{
	if( writer_pid == 0 ){
		/* Move off the line that the "\r" progress reports were
		 * being drawn on */
		if( showevery )
			fprintf( stderr, "\n" );
	}
	else {
		/* Only the parent (reader) should zap the buffer */
		buffer_remove();
	}

	exit( exit_val );
}

/* Given a string of <num>[<suff>] returns a num
 * suff =
 *   m/M for 1meg
 *   k/K for 1k
 *   b/B for 512
 * Only the first character of the suffix is looked at; any other
 * suffix leaves the number unscaled.  A string that does not start
 * with a number gives 0, which every caller treats as "not a usable
 * size".
 */
int
do_size( arg )
	char *arg;
{
	char suffix[ 20 ];
	/* Start from 0: sscanf() leaves this untouched when there is no
	 * leading number, and it used to be returned uninitialised */
	int ret = 0;

	suffix[ 0 ] = '\0';
	/* The field width keeps an over-long suffix from running off the
	 * end of suffix[] (19 characters plus the terminating NUL) */
	sscanf( arg, "%d%19s", &ret, suffix );

	switch( suffix[ 0 ] ){
	case 'm':
	case 'M':
		ret = ret * 1024 * 1024;
		break;
	case 'k':
	case 'K':
		ret = ret * 1024;
		break;
	case 'b':
	case 'B':
		ret *= 512;
		break;
	}

	return ret;
}
SHAR_EOF
fi # end of overwriting check
if test -f 'sem.c'
then
	echo shar: will not over-write existing file "'sem.c'"
else
cat << \SHAR_EOF > 'sem.c'
/* Some simple semaphore routines (used as locks and, via sem_set, as counters) */
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/sem.h>

/* Set a semaphore to a particular value - meant to be used before
 * first lock/unlock.  Ends the program if the value cannot be set.
 *
 * Failure is detected from semctl()'s return value rather than by
 * clearing errno beforehand and testing it afterwards: only the
 * return value says whether the call failed.
 */
void
sem_set( sem_id, val )
	int sem_id;
	int val;
{
	union semun arg;

	arg.val = val;

	if( semctl( sem_id, 0, SETVAL, arg ) == -1 ){
		perror( "internal error, sem_set" );
		exit( -1 );
	}
}
	
/* Create a private set holding a single semaphore, give it the value
 * 1 and return its id.  Ends the program if it cannot be created. */
int
new_sem()
{
	int id;

	id = semget( IPC_PRIVATE, 1, IPC_CREAT|S_IREAD|S_IWRITE );
	if( id >= 0 ){
		/* Starts out unlocked */
		sem_set( id, 1 );
		return id;
	}

	perror( "internal error, couldn't create semaphore" );
	exit( -1 );
	/* NOTREACHED */
}

/* Take one unit from the semaphore, waiting for as long as its value
 * is zero.  Ends the program if the operation fails. */
void
lock( sem_id )
	int sem_id;
{
	struct sembuf take;
	int rc;

	take.sem_flg = 0;
	take.sem_op = -1;
	take.sem_num = 0;

	rc = semop( sem_id, &take, 1 );
	if( rc != -1 )
		return;

	fprintf( stderr, "internal error, lock id %d\n", sem_id );
	perror( "lock error" );
	exit( -1 );
}

/* Give one unit back to the semaphore, releasing a process waiting in
 * lock() if there is one.  Ends the program if the operation fails. */
void
unlock( sem_id )
	int sem_id;
{
	struct sembuf give;
	int rc;

	give.sem_flg = 0;
	give.sem_op = 1;
	give.sem_num = 0;

	rc = semop( sem_id, &give, 1 );
	if( rc != -1 )
		return;

	fprintf( stderr, "internal error, lock id %d\n", sem_id );
	perror( "unlock error" );
	exit( -1 );
}

/* Remove a semaphore from the system.  An id of -1 means "never
 * created" and is ignored; a failed removal is reported but is not
 * fatal. */
void
remove_sem( sem_id )
	int sem_id;
{
	if( sem_id != -1 && semctl( sem_id, 0, IPC_RMID, NULL ) == -1 )
		perror( "internal error, failed to remove semaphore" );
}
SHAR_EOF
fi # end of overwriting check
#	End of shell archive
exit 0