[alt.sources] mx message queue utilities

dbin@norsat.UUCP (Dave Binette) (09/22/89)

Several weeks ago I posted requests for code fragments using message
queues, semaphores, and shared memory.

I was *very* happy to receive numerous replies.  In return I offer
some of the fruits of that generosity and my thanks.

This package (my first source submission) provides a message queue
utility to create and delete message Queues and to send and receive data.
The message Queues may be referred to by ID (QID) or by KEY.

I still have copies of the semaphore and shared memory code fragments
and will distribute as requested, including a *simple* low overhead
16-user chat program I wrote that uses shared memory.

--------------------------------------- cut here ----------------
# This is a shell archive.  Remove anything before this line,
# then unpack it by saving it in a file and typing "sh file".
#
# Wrapped by dbin on Thu Sep 21 21:58:23 PDT 1989
# Contents:  README makefile mx.1 mx.c
 
echo x - README
sed 's/^@//' > "README" <<'@//E*O*F README//'
Message Queue I/O
Released to PUBLIC DOMAIN by David J. Binette
in appreciation of the replies to my request for IPC code fragments
on the USEers NETwork.

I wrote this code to utilize message Q's (which can be slow but handy)
Pipes are faster!

A single executable called   "mx"   is compiled.
"mx" is then linked to "mr"  and  "ms".
mx -i    is equivalent to  ms     (Message Send)
mx -o    is equivalent to  mr     (Message Receive)

The only tunable parameters in the code are:

#define		MBUFSIZ	8192			/* maximum size of a message */
#define		NAPTIME	100L			/* for waiting on msgsend limit */

Here are some (awful) examples of use:

ls -al | ms -Q73,600 -w
#NOTE: ms will output the 'q' number of Q73   verify with ipcs if you desire
mr -Q73

The above example will send the output from the 'ls' command
into a message Q with a key of 73 and permission mode 0600.

The -Q73,600  causes the Q to be created with a key of 73 if it did not
exist. The q id number is reported to stderr.

Other ways to create message Q's are:
mr -Qkey,permission
mx -Qkey,permission


NOTE: -Qkey   is a Hexadecimal number to maintain compatibility with ipcs.
      -qId    is a Decimal #, again to maintain compatibility with ipcs.


The Permission flag is optional and unnecessary if the Q already exists.

You can delete queues with:
mx -d#ID
or
mx -D#KEY

When sending/receiving messages to/from the Q, a message "type" may be
specified with the -n option.

To prevent mx from using too many of the available Q slots, use the -l#
option to specify that the number of messages pending on the Q may not
exceed #.  The -w option will cause mx to wait until the Q drains to the
specified limit if the Q slots were occupied.


When receiving messages, the -w option means to wait until a message is
available, and the -c option means "continuous", i.e. don't exit after
reading the message; instead wait for another message (-w is implied).


The -t option indicates it is ok to truncate received messages.


The -v option generates verbose info about the Q.
Try mr -Q# -v -c    while feeding the Q from another terminal or process.


Finally the -help option generates too much text but may be informative.


Again, I'd like to thank all those from USENET who responded to my requests
for code fragments and help regarding message Q's

-
uucp:  {uunet,ubc-cs}!van-bc!norsat!dbin | 302-12886 78th Ave
bbs:   (604)597-4361     24/12/PEP/3     | Surrey BC CANADA
voice: (604)597-6298     (Dave Binette)  | V3W 8E7
@//E*O*F README//
chmod u=rw,g=r,o=r README
 
echo x - makefile
sed 's/^@//' > "makefile" <<'@//E*O*F makefile//'
# compilation for "mx" under SCO XENIX 2.3.1
# sysname=XENIX
# nodename=norsat
# release=2.3.1
# version=SysV
# machine=i80386
# origin=3
#
# The source code for mx.c, the executables mx, mr and ms
# as well as the accompanying documentation README and mx.1
# are placed in the public domain.
# Use it, sell it, do what you will.
# If you're nice, you'll leave my name on it.
# If you're kind, and you modify it, you'll put your name on it too!
# Author: uunet!norsat!dbin (David J. Binette)

mx: mx.c
	cc -M3e -W2 -Od mx.c -s -o mx -lx 
	rm -f mr ms
	ln mx mr
	ln mx ms
	@echo Done! "mx" created and linked to "mr" and "ms"
@//E*O*F makefile//
chmod u=rw,g=r,o=r makefile
 
echo x - mx.1
sed 's/^@//' > "mx.1" <<'@//E*O*F mx.1//'
@.TH MX 1 NORSAT
@.SH NAME
mx, mr, ms \- Message Q Utilities
@.SH SYNOPSIS
@.B mx
@.RB [\- "\ vqQcdDinostw" ]
@.br
@.B mr
@.RB [\- "\ vqQcdDnstw" ]
@.br
@.B ms
@.RB [\- "\ vqQdDlnw" ]
@.SH DESCRIPTION
This program sends and receives data via message Queues.

V1.0 Released to PUBLIC DOMAIN by David J. Binette in appreciation of the replies to a request for code fragments on USENET.

@.RE
The executable "mx" may be linked to "mr" and "ms".
@.RE
@.I mx -i
is equivalent to
@.I ms
(Message Send).
@.br
@.RE
@.I mx -o
is equivalent to
@.I mr
(Message Receive).
@.PP
The meanings of the available options are:
@.PP
@.PD 0

@.TP
@.B -v#Verbose
Sets the Verbosity level.
@.br
0=Quiet.
@.br
1=Normal.
@.br
2=Extended.
@.br
3=Verbosely describe the status of the message Queue before displaying the received message.
@.br

@.TP
@.B -d#ID
Delete the message Queue specified by Queue ID.
@.br

@.TP
@.B -D#KEY
Delete the message Queue specified by Queue KEY.
@.br

@.TP
@.B -q#ID
Specify the message Queue ID of an existing message Q.
@.br

@.TP
@.B -Q#KEY
Specify the message Queue KEY of an existing message Q.
The KEY is a hexadecimal number (digits 0-9, letters A-F).
@.br

@.TP
@.B -Q#KEY,#PERM
Specify the message Queue KEY of an existing message Q, or create it with the specified PERMissions.
@.br

@.TP
@.B -n#Message type
An optional message type (Decimal) may be specified to identify the message being sent, or to restrict the type of message(s) received.
@.br

@.TP
@.B -w
When Reading from the message Queue wait for a message to arrive.
@.br
When Writing to the message Q wait for the send to succeed instead of failing immediately when the Q is full.
@.br

@.TP
@.B -i
Used by "mx" to perform as "ms" (input to Q).
@.br

@.TP
@.B -o
Used by "mx" to perform as "mr" (output from Q).
@.br

@.TP
@.B -l#LIMIT
When sending messages to the message Q, wait until the number of pending messages on the Q is no greater than LIMIT.
@.br

@.TP
@.B -s#SIZE
Specify the maximum SIZE of a received message.
@.br

@.TP
@.B -t
Truncation of received messages is not to be considered an error.
@.br

@.TP
@.B -c
Receive messages continuously without exiting.
@.br

@.TP
@.B -help
Displays examples and usage help.
@.br

@.SH EXAMPLES
Create a message Queue with a Queue KEY of 27 (Hex).
@.br
mx -Q27,660
@.br

Send data to a message Queue.
@.br
cal | ms -Q27
@.br

Read data from the message Queue.
@.br
mr -Q27
@.br

Wait for Data to arrive on message Queue.
@.br
mr -Q27 -w
@.br

@.RE
@.SH SEE ALSO
ipcs(C),
ipcrm(C),
msgctl(S),
msgop(S),
msgget(S),
intro(S)

@.SH RETURN VALUE
Returns 0 for normal successful exit.
@.br
Returns 255 if help was requested.
@.br
Failed calls return with the errno of the failed function.


@.SH NOTES
No warranties expressed or implied, use at your own risk.

@.SH BUGS
The -v option must be the first option to be effective.
@//E*O*F mx.1//
chmod u=rw,g=r,o=r mx.1
 
echo x - mx.c
sed 's/^@//' > "mx.c" <<'@//E*O*F mx.c//'
/* mx.c
 *
 * Message Queue I/O
 * Released to public domain by David J. Binette
 * in appreciation of the replies to my request for
 * IPC code fragments on the USEers NETwork.
 *
 * link to   ms  for message send
 * link to   mr  for message receive
 * or invoke as mx -i (input to message Q)
 * or invoke as mx -o (output from Message Q)
 *
 * The source code for mx.c, the executables mx, mr and ms
 * as well as the accompanying documentation README and mx.1
 * are placed in the public domain.
 * Use it, sell it, do what you will.
 * If you're nice, you'll leave my name on it.
 * If you're kind, and you modify it, you'll put your name on it too!
*/

/*
 * Program Name    : mx
 * Associated files: mx.1 makefile (executable links to mr ms)
 * Author          : uunet!norsat!dbin (David J. Binette)
 * Version         : 1.0
 * Release Date    : Thu Sep 21 21:38:47 PDT 1989
 * Revision History:
 *                 :
 *                 :
 *                 :
 *                 :
*/


#include	<stdio.h>
#include	<string.h>
#include	<sys/types.h>
#include	<sys/ipc.h>
#include	<sys/msg.h>

#define		MBUFSIZ	8192		/* system dependent; you might change this */
#define		NAPTIME	100L		/* for waiting on msgsend limit */

#define		er(t)	fputs(t,stderr)

#define		UNSPEC	0			/* transit state for mx */
#define		RCVING	1			/* transit state for mr */
#define		XMTING	2			/* transit state for ms */

extern int errno;
char *	progname;
int		transit;
int		Verbose;
int		rval;

/* ----------------------------------------------------------------- */

void usage()
{
	fprintf(stderr,"usage: %s [-v#Verbose] -q#QID|-Q#KEY|-Q#CreateKEY,Perm [-n#MsgType]\n",progname);
	if(transit==UNSPEC)
		er("          -iInput|-oOutput\n");
	if(transit!=RCVING)
		er("          [-l#limit]\n");
	if(transit!=XMTING)
		er("          [-s#Size] [-tTruncateOk] [-cContinuous]\n");
	er("          [-wWait]\n");
	fprintf(stderr,"Type %s -help for more help\n",progname);

	exit(255);
}

/* ----------------------------------------------------------------- */

void help()
{
	er("\n\n\n\n\nMessage Queue I/O\n\n");
	er("Released to public domain by David J. Binette       uunet!van-bc!norsat!dbin\n");
	er("in appreciation of the replies to my request\n");
	er("for IPC code fragments on the USEers NETwork.\n\n\n");
	er("link to       ms      for message send\n");
	er("link to       mr      for message receive\n");
	er("or invoke as  mx -i  (input to message Q)\n");
	er("or invoke as  mx -o  (output from Message Q)\n\n");
	er("Lowercase 'q' refers to the \"QID\"\n");
	er("UPPERCASE 'Q' refers to the \"KEY\"\n\n");
	er("usage: mx -q#QID|-Q#KEY|-Q#CreateKEY,Perm [-n#MsgType] [-wWait]\n");
	er("          -iInput|-oOutput\n");
	er("          [-l#limit]\n");
	er("          [-s#Size] [-tTruncateOk] [-cContinuous] [-v#Verbose]\n\n");
	er("Verbose may be 0=quiet, 1=default, 2=extended, 3=Detailed\n");
	er("\n");
	er("Type mx -help for this help\n\n\n");

	er("--More--[Press ENTER]--"); getchar();
	er("\n\nNew message Q's may be created with the desired 'KEY' (ie 7) via:\n");
	er("mx -Q7,660           where 7 is the KEY and 660 is the permissions.\n");
	er("mr -Q7,600           If a new Q is created for the KEY, the QID of\n");
	er("ms -Q7,644           the newly created will be sent to stderr.\nor\n");
	er("mx -Q0,666           Make a \"PRIVATE\" Q accessible by all\n\n\n");
	er("--More--[Press ENTER]--"); getchar();
	er("\n\nSend data to the Q via 'ms' or 'mx -i'\n");
	er("Use lowercase q to send to the QID number          : cal | ms -q7\n");
	er("Use capital Q to send to the 'KEY' number          : cal | ms -Q7\n");
	er("The message \"type\" may be specified via -n         : cal | mx -i -Q7 -n3\n");
	er("Wait until the # of messages < \"limit\" via -l      : cal | mx -i -Q7 -n3 -w -l10\n\n");
	er("Receive data from the Q using 'mr' or 'mx -o'\n");
	er("Use lowercase q to receive from the QID number     : mx -o -q3410\n");
	er("Use capital Q to receive from the 'KEY' number     : mr -Q7\n");
	er("Wait on a Q, output data as it arrives             : mr -Q8579309 -c\n");
	er("Wait for a single message to arrive via -w         : mx -o -Q8579309 -w\n");
	er("Wait for a specific message 'type' via -n          : mr -Q7 -n3\n\n");
	exit(255);
}

/* ----------------------------------------------------------------- */

void showerr(s,r)
char *s;
int	r;
{
	if(Verbose)
		perror(progname);
	if(Verbose>1)
		fprintf(stderr,"%s: (%d)\n",s,r);
}

/* ----------------------------------------------------------------- */

char * basename(s)
char * s;
{
	char * p;

	p=strrchr(s,'/');
	return( p ? ++p : s );
}

/* ----------------------------------------------------------------- */

int	findq(key,perm)
key_t	key;
int		perm;
{
	int	qid;
	
	if((qid=msgget(key,perm))<0)
	{
		rval=errno;
		showerr("msgget",rval);
		exit(rval);
	}

	if(perm & IPC_CREAT)
		fprintf(stderr, "%d\n", qid);

	return(qid);
}

/* ----------------------------------------------------------------- */

void deleteq(qid)
int	qid;
{
    if(msgctl(qid,IPC_RMID,NULL)== -1)
	{
		rval=errno;
		showerr("msgctl rmid",rval);
		exit(rval);
	}
}

/* ----------------------------------------------------------------- */

void main(argc, argv)
int argc;
char *argv[];
{
	int		infinite=0;
	int		arg=0;
	int		qperm=0;
	int		c;
	int		msgsz=MBUFSIZ;
	int		msgqid=IPC_PRIVATE;
	int		msgflg=IPC_NOWAIT;
	long	msgtyp=0L;
	key_t	msgkey=IPC_PRIVATE;
	int		limit= -1;
	struct msqid_ds qbuf;

	struct
	{
		long	mtype;
		char	mtext[MBUFSIZ];
	} msgbuf;

	msgbuf.mtype=1L;
	progname=basename(argv[0]);
	transit=UNSPEC;
	Verbose=1;					/* normal error messages */
	rval=0;

	if(!strcmp(progname,"ms"))		/* linked to ms for sending */
		transit=XMTING;
	else
		if(!strcmp(progname,"mr"))	/* linked to mr for receiving */
			transit=RCVING;

	if(argc<2)
		usage();

	while(++arg<argc)
	{
		if(argv[arg][0]!='-')
			usage();

		switch(argv[arg][1])
		{
		case 'h':
					help();
					break;
		case 'i':
					if(transit==UNSPEC)
						transit=XMTING;
					else
						usage();
					break;
		case 'o':
					if(transit==UNSPEC)
						transit=RCVING;
					else
						usage();
					break;
		case 'n':
					msgtyp=atol(&argv[arg][2]);
					msgbuf.mtype=msgtyp;
					break;
		case 'w':
					if(infinite==0)
						msgflg&=~IPC_NOWAIT;
					break;
		case 's':
					if(transit!=RCVING)
						usage();
					msgsz=atoi(&argv[arg][2]);
					break;
		case 'v':
					Verbose=atoi(&argv[arg][2]);
					break;
		case 't':
					if(transit!=RCVING)
						usage();
					msgflg|=MSG_NOERROR;
					break;
		case 'c':
					if(transit!=RCVING)
						usage();
					infinite=1;
					msgflg&=~IPC_NOWAIT;
					break;
		case 'l':
					limit=atoi(&argv[arg][2]);
					break;
		case 'd':
					msgqid=atoi(&argv[arg][2]);
					deleteq(msgqid);
					break;
		case 'D':
					if(sscanf(&argv[arg][2],"%lx",&msgkey)!=1)
						usage();
					msgqid = findq( msgkey, 0 );
					deleteq(msgqid);
					break;
		case 'q':
					msgqid=atoi(&argv[arg][2]);
					break;
		case 'Q':
					qperm=0;
					switch(sscanf(&argv[arg][2],"%lx,%o",&msgkey,&qperm))
					{
					case 1:
							qperm=0;
							break;
					case 2:
							if(qperm==0)
								qperm=0600;
							qperm |= IPC_CREAT;
							break;
					default:
							usage();
							break;
					}
					msgqid = findq( msgkey, qperm );
					break;
		default:	usage();
		}
	}
	
	if(transit==XMTING)					/* sending messages */
	{
		fclose(stdout);					/* not needed */
		msgsz=0;
		while((msgsz < MBUFSIZ) && ((c=getchar()) !=EOF))
			msgbuf.mtext[msgsz++] = (char)c;
		
		if(limit>=0)					/* if enforcing limit */
		{
			for( ; ; )
			{
				if(msgctl(msgqid, IPC_STAT, &qbuf)<0)
				{
					rval=errno;
					showerr("msgctl stat",rval);
					exit(rval);
				}
				if(qbuf.msg_qnum <= (ushort)limit)
					break;
				nap(NAPTIME);
			}
		}

		if((rval=msgsnd(msgqid, &msgbuf, msgsz, msgflg)) == -1)
		{
			rval=errno;
			showerr("msgsnd",rval);
		}
	}
	else
	if(transit==RCVING)						/* receiving messages */
	{
		fclose(stdin);						/* not needed */
		do
		{
			if((arg=msgrcv(msgqid, &msgbuf, msgsz, msgtyp, msgflg))== -1)
			{
				rval=errno;
				showerr("msgrcv",rval);
				infinite=0;
			}
			else
			{
				if(Verbose>2)
				{
					printf("------------------- Verbose Msgctl Stats -------------------\n");
					if((rval=msgctl(msgqid, IPC_STAT, &qbuf))<0)
					{
						rval=errno;
						showerr("msgctl stat",rval);
					}
					else
					{
						printf("uid=%-7d    gid=%-7d    cuid=%-7d   cgid=%-7d\n",
								qbuf.msg_perm.uid,
								qbuf.msg_perm.gid,
								qbuf.msg_perm.cuid,
								qbuf.msg_perm.cgid);
						printf("mode=O%-7o  seq=%-7d    qnum=%-7d   key=%ld\n",
								qbuf.msg_perm.mode,
								qbuf.msg_perm.seq,
								qbuf.msg_qnum,
								(long)qbuf.msg_perm.key);
						printf("cbytes=%-14d qbytes=%-14d  rbytes=%-14d\n",
								qbuf.msg_cbytes,
								qbuf.msg_qbytes,
								arg);
						printf("rtime=%-14ld  stime=%-14ld   ctime=%-14ld\n",
								qbuf.msg_rtime,
								qbuf.msg_stime,
								qbuf.msg_ctime);
						printf("lrpid=%-14d  lspid=%-14d\n",
								qbuf.msg_lrpid,
								qbuf.msg_lspid);
						printf("type   = %ld\n",msgbuf.mtype);
					}
					printf("------------------------------------------------------------\n");
				}
				c=0;
				while(c<arg)
					putchar(msgbuf.mtext[c++]);
				putchar('\n');			/* you might not want this */
				arg=0;
			}
		}
		while(infinite);
	}
	exit(rval);
}

/* end */
@//E*O*F mx.c//
chmod u=rw,g=r,o=r mx.c
 
echo Inspecting for damage in transit...
temp=/tmp/shar$$; dtemp=/tmp/.shar$$
trap "rm -f $temp $dtemp; exit" 0 1 2 3 15
cat > $temp <<\!!!
     77    428   2486 README
     22    121    621 makefile
    165    497   2728 mx.1
    389   1037   9385 mx.c
    653   2083  15220 total
!!!
wc  README makefile mx.1 mx.c | sed 's=[^ ]*/==' | diff -b $temp - >$dtemp
if [ -s $dtemp ]
then echo "Ouch [diff of wc output]:" ; cat $dtemp
else echo "No problems found."
fi
exit 0
---
"If I was smarter than I was bad, I wouldn't get in any trouble"
(Laura my 4 yr. old daughter)
uucp:  {uunet,ubc-cs}!van-bc!norsat!dbin | 302-12886 78th Ave
bbs:   (604)597-4361     24/12/PEP/3     | Surrey BC CANADA
voice: (604)597-6298     (Dave Binette)  | V3W 8E7