[comp.parallel] Linda, a toy implementation

jwb@cepmax.ncsu.edu (John W. Baugh Jr.) (02/20/91)

The recent requests for Linda on distributed workstations prompted my
curiosity about how one could implement its (her?) primitives.  Since
most (?) folks have some kind of distributed file system, a toy
implementation could simply use a (shared) file as tuple space (TS)
with printf and scanf for output to and selective input from TS, with
each tuple being separated by a newline.  Workers can be invoked on
remote machines using rsh, e.g.,

   system("rsh ceap0 worker | rsh ceap1 worker...");

I spent a couple of hours playing around with this idea and came up
with the following implementation in shell archive (shar) format.
It's not very elegant or efficient, but it's fun to play with.  I
would expect that it might even be a reasonable implementation for
coarse-grained parallelism.  I've tried it on DECstations and Apollo
workstations, and it works just fine (both with workers on a single
machine, and with workers scattered across multiple machines).

In addition to the 'play Linda' files, I've included an example of
matrix multiplication.  You'll need to edit matrix.h to set up proper
paths for your machine.  The matrix workers can also be made to take
additional time (using sleep), which better distributes the work among
processes when running on a single machine.  For example,

   % matrix 4 2

uses 4 workers with 2 seconds added to the time it takes one matrix
worker to complete a dot product.

I'd be glad to receive any comments about what follows.  Like I said,
it's just a little something I hacked up, so it's likely that there
are some bugs and portability problems, not to mention a lack of
documentation.  Oh yeah, this implementation allows multiple tuple
spaces, e.g.,

      OUT(TS, ("A 1 1 2 3\n"));

outputs tuple "A 1 1 2 3" to tuple space TS.  The tuple might be read
in using READ or IN (which extracts the tuple upon reading it), e.g.,

      READ(TS, ("A 1 %d %d %d", &a[0], &a[1], &a[2]));

The input macros use scanf to read in only tuples that match a certain
form, e.g., tuples beginning with "A 1".  Of course, the format string
can be generated dynamically using sprintf as follows:

      sprintf(s, "A %d %%d %%d %%d", i);
      READ(TS, (s, &a[0], &a[1], &a[2]));

John Baugh
jwb@cepmax.ncsu.edu

-----------------------------------------------------------------------------
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting pl.h...
cat >pl.h <<'!E!O!F!'
/* pl.h -- 'play Linda' header file */

#ifndef PL_H
#define PL_H

/*
 * Tuple-space primitives.
 *
 *   ts   path name of the file that holds the tuple space
 *   etc  a parenthesized printf/scanf argument list, e.g.
 *
 *            OUT(TS, ("A 1 1 2 3\n"));
 *            READ(TS, ("A 1 %d %d %d", &a[0], &a[1], &a[2]));
 *
 * Each macro expands to one statement (do { ... } while (0)), so it must
 * be followed by a semicolon and can be used as the body of an unbraced
 * if/else.  `ts` is evaluated more than once; pass an expression without
 * side effects.
 *
 * READ and IN accept a tuple as soon as scanf reports at least one
 * converted item, so the format must contain at least one conversion.
 */

/* OUT: append one tuple; printf writes through the redirected stdout. */
#define OUT(ts, etc) \
  do { \
    ts_open_write(ts); \
    printf etc; \
    ts_close_write(ts); \
  } while (0)

/* READ: scan line by line for a matching tuple and leave it in place. */
#define READ(ts, etc) \
  do { \
    ts_open_read(ts); \
    while (scanf etc <= 0) \
      ts_read_retry(ts); \
    ts_close_read(ts); \
  } while (0)

/*
 * IN: like READ, but the matching tuple is removed.  ts_line_no tracks
 * the 1-based line being examined so ts_close_in() knows which to drop.
 */
#define IN(ts, etc) \
  do { \
    int ts_line_no = 1; \
    ts_open_in(ts); \
    while (scanf etc <= 0) \
      ts_in_retry(ts, &ts_line_no); \
    ts_close_in(ts, ts_line_no); \
  } while (0)

/* Lock the tuple space and attach it to stdin ("r"), stdin ("r+"),
   or stdout ("a") respectively. */
void ts_open_read(char *filename);
void ts_open_in(char *filename);
void ts_open_write(char *filename);

/* Close the stream used by the matching ts_open_* call and release the
   lock.  ts_close_in() first rewrites the file without line `line_no`. */
void ts_close_read(char *filename);
void ts_close_in(char *filename, int line_no);
void ts_close_write(char *filename);

/* Called after a failed match: move to the next line, or, at end of
   file, release the tuple space, wait for it to change, and start over
   from the first line. */
void ts_read_retry(char *filename);
void ts_in_retry(char *filename, int *line_no);

#endif /* PL_H */
!E!O!F!
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting pl.c...
cat >pl.c <<'!E!O!F!'
/* pl.c -- 'play Linda' */

#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>
#include <sys/errno.h>
extern int errno;

/*
 * message -- print a diagnostic on stderr, prefixed with the host name
 * and process id so output from several workers can be told apart.
 *
 *   fmt, ...   printf-style format and arguments; a newline is appended.
 */
void
message(char *fmt, ...)
{
  char hostname[80];
  char *host = hostname;
  va_list args;

  /* gethostname() can fail, and need not NUL-terminate a truncated name;
     never hand an unterminated or uninitialized buffer to fprintf. */
  if (gethostname(hostname, sizeof hostname) != 0)
    host = "unknown";
  else
    hostname[sizeof hostname - 1] = '\0';

  va_start(args, fmt);
  fprintf(stderr, "Message -- host: %s, pid: %d\n  ", host, (int) getpid());
  vfprintf(stderr, fmt, args);
  fprintf(stderr, "\n");
  va_end(args);
}
    
/*
 * lock -- acquire the lock that guards `pathname`.
 *
 * The lock is a file named "<pathname>.LCK".  The call returns once this
 * process has created that file; until then it retries once per second.
 * The name buffer is sized from the path, so long path names are safe.
 */
void
lock(char *pathname)
{
  static const char suffix[] = ".LCK";
  char *lockfile;
  int tempfd;

  /* sizeof suffix already counts the terminating NUL. */
  lockfile = malloc(strlen(pathname) + sizeof suffix);
  if (lockfile == NULL) {
    message("In lock: out of memory building lock file name for '%s'",
	    pathname);
    exit(-1);
  }
  sprintf(lockfile, "%s%s", pathname, suffix);

  /*
   * Try to create the lock file, using open() with both
   * O_CREAT (create file if it doesn't exist) and O_EXCL
   * (error if create and file already exists).
   * If this fails with EEXIST, some other process has the lock;
   * any other error is reported and the attempt is repeated.
   *
   * NOTE(review): exclusive create is not atomic on every network
   * file system -- confirm for the file system holding the tuple space.
   */
  while ( (tempfd = open(lockfile, O_RDWR|O_CREAT|O_EXCL, 0666)) < 0) {
    if (errno != EEXIST)
      message("In lock: error opening lock file '%s'", lockfile);
    sleep(1);
  }
  close(tempfd);
  free(lockfile);
}

/*
 * unlock -- release the lock on `pathname` by removing "<pathname>.LCK".
 *
 * A failure to remove the file is reported but is not fatal.  The name
 * buffer is sized from the path, so long path names are safe.
 */
void
unlock(char *pathname)
{
  static const char suffix[] = ".LCK";
  char *lockfile;

  /* sizeof suffix already counts the terminating NUL. */
  lockfile = malloc(strlen(pathname) + sizeof suffix);
  if (lockfile == NULL) {
    message("In unlock: out of memory building lock file name for '%s'",
	    pathname);
    exit(-1);
  }
  sprintf(lockfile, "%s%s", pathname, suffix);

  if (unlink(lockfile) < 0)
    message("In unlock: error removing lock file '%s'", lockfile);
  free(lockfile);
}

/*
 * ts_attempt_open -- lock the tuple space and attach it to a stream.
 *
 *   filename  path of the tuple-space file (also names the lock)
 *   mode      fopen-style mode string passed to freopen()
 *   fp        stream to redirect (callers pass stdin or stdout)
 *
 * On success the lock is left held; the matching ts_close_* call
 * releases it.  If the file cannot be opened the process exits, but the
 * lock is released first so other processes are not blocked forever.
 */
void
ts_attempt_open(char *filename, char *mode, FILE *fp)
{
  lock(filename);
  if (freopen(filename, mode, fp) == NULL) {
    message("In ts_attempt_open: cannot open file '%s'", filename);
    unlock(filename);
    exit(-1);
  }
}

/*
 * ts_open_read -- lock tuple space `filename` and redirect stdin to it,
 * read-only, so that scanf can examine its tuples.
 */
void
ts_open_read(char *filename)
{
  FILE *stream = stdin;

  ts_attempt_open(filename, "r", stream);
}

/*
 * ts_open_in -- lock tuple space `filename` and redirect stdin to it in
 * update mode ("r+"), as the first step of an IN operation.
 */
void
ts_open_in(char *filename)
{
  FILE *stream = stdin;

  ts_attempt_open(filename, "r+", stream);
}

/*
 * ts_open_write -- lock tuple space `filename` and redirect stdout to it
 * in append mode, so that printf adds a tuple at the end of the file.
 */
void
ts_open_write(char *filename)
{
  FILE *stream = stdout;

  ts_attempt_open(filename, "a", stream);
}

/*
 * ts_attempt_close -- close stream `fp`; on failure report `filename`
 * and terminate the process with exit(-1).
 *
 * NOTE(review): callers close stdin/stdout here and ts_attempt_open()
 * later reattaches them with freopen().  ISO C leaves a FILE pointer
 * indeterminate after fclose() -- confirm the target C library tolerates
 * this.
 */
void
ts_attempt_close(char *filename, FILE *fp)
{
  int status = fclose(fp);

  if (status == 0)
    return;

  message("In ts_attempt_close: cannot close file '%s'", filename);
  exit(-1);
}

/*
 * ts_close_read -- finish a read of tuple space `filename`: close stdin
 * first, then give up the lock.
 */
void
ts_close_read(char *filename)
{
  FILE *stream = stdin;

  ts_attempt_close(filename, stream);
  unlock(filename);
}

/*
 * ts_close_write -- finish a write to tuple space `filename`: close
 * stdout (which flushes the new tuple), then give up the lock.
 */
void
ts_close_write(char *filename)
{
  FILE *stream = stdout;

  ts_attempt_close(filename, stream);
  unlock(filename);
}

/*
 * advance_line -- discard the rest of the current line on stdin.
 *
 * Returns 1 if a newline was consumed (another line may follow), or 0 if
 * end of file was reached first.
 */
int
advance_line()
{
  for (;;) {
    int ch = getchar();

    if (ch == '\n')
      return 1;
    if (ch == EOF)
      return 0;
  }
}

/*
 * block_until_modified -- wait for `filename` to change.
 *
 * Polls the file's modification time once per second and returns when
 *   - the modification time differs from the one seen on entry, or
 *   - the file cannot be examined with stat(), or
 *   - MAX_POLLS polls have passed without a visible change.
 *
 * The bounded wait is deliberate.  The callers in this file release the
 * lock before calling here, so a writer may update the file before the
 * first stat(); that update would never be noticed.  Returning after a
 * while lets the caller rescan the tuple space instead of hanging.
 * Callers must therefore treat a return as "look again", not as proof
 * that the file changed.
 */
void
block_until_modified(char *filename)
{
  enum { MAX_POLLS = 10 };
  struct stat buf;
  time_t mtime;
  int polls;

  /* Without a baseline there is nothing to compare against; let the
     caller's re-open report the problem. */
  if (stat(filename, &buf) != 0)
    return;
  mtime = buf.st_mtime;

  for (polls = 0; polls < MAX_POLLS; polls++) {
    sleep(1);
    if (stat(filename, &buf) != 0 || buf.st_mtime != mtime)
      return;
  }
}

/*
 * ts_read_retry -- called by READ when the current line did not match.
 *
 * Skips to the next line if there is one.  At end of file it closes and
 * unlocks the tuple space, waits for the file to be modified, then
 * re-opens it so scanning restarts from the first line.
 */
void
ts_read_retry(char *filename)
{
  if (advance_line())
    return;			/* more lines left: let scanf try the next */

  ts_close_read(filename);
  message("Waiting until %s is modified", filename);
  block_until_modified(filename);
  ts_open_read(filename);
}

/*
 * ts_in_retry -- called by IN when the current line did not match.
 *
 *   line_no   in/out: 1-based number of the line about to be examined
 *
 * Skips to the next line and bumps *line_no if there is one.  At end of
 * file it closes and unlocks the tuple space without removing anything,
 * waits for the file to be modified, re-opens it for IN and resets
 * *line_no to 1.
 */
void
ts_in_retry(char *filename, int *line_no)
{
  if (advance_line()) {
    *line_no += 1;
    return;
  }

  ts_close_read(filename);
  message("Waiting until %s is modified", filename);
  block_until_modified(filename);
  ts_open_in(filename);
  *line_no = 1;
}

/*
 * cp_line -- copy one line, including its newline if present, from
 * stream `from` to stream `to`.
 *
 * Returns the last value read: '\n' when a complete line was copied, or
 * EOF when the input ended (any partial last line has still been copied).
 */
int
cp_line(FILE *from, FILE* to)
{
  int ch;

  for (;;) {
    ch = getc(from);
    if (ch == EOF)
      break;
    putc(ch, to);
    if (ch == '\n')
      break;
  }
  return ch;
}

/*
 * ts_close_in -- complete an IN: remove one tuple and release the space.
 *
 *   filename  path of the tuple-space file, currently open on stdin and
 *             locked by this process
 *   line_no   1-based number of the line (tuple) to remove
 *
 * Every line except `line_no` is copied to "<filename>.NEW<pid>", which
 * is then renamed over the original.  On any failure the temporary file
 * is removed, the lock is released so other processes are not blocked,
 * and the process exits with the original tuple space left in place.
 */
void
ts_close_in(char *filename, int line_no)
{
  FILE *ts_new_p;
  char *ts_new;
  int current = 1;		/* number of the line about to be handled */
  int more = 1;
  int write_failed;

  /* filename + ".NEW" (sizeof counts the NUL) + pid digits: three
     decimal digits per byte is an upper bound, plus one for a sign. */
  ts_new = malloc(strlen(filename) + sizeof ".NEW" + 3 * sizeof(long) + 1);
  if (ts_new == NULL) {
    message("In ts_close_in: out of memory building temporary name for '%s'",
	    filename);
    goto fail;
  }
  sprintf(ts_new, "%s.NEW%ld", filename, (long) getpid());

  rewind(stdin);
  if ((ts_new_p = fopen(ts_new, "w+")) == NULL) {
    message("In ts_close_in: cannot open temporary file '%s'", ts_new);
    goto fail;
  }

  /* Copy line by line, dropping the one tuple that was consumed. */
  while (more) {
    if (current == line_no)
      more = advance_line();
    else
      more = (cp_line(stdin, ts_new_p) != EOF);
    current++;
  }

  /* Never replace the tuple space with a copy that was not fully written. */
  write_failed = ferror(ts_new_p);
  if (fclose(ts_new_p) != 0 || write_failed) {
    message("In ts_close_in: cannot write temporary file '%s'", ts_new);
    goto fail_remove;
  }
  if (fclose(stdin) != 0) {
    message("In ts_close_in: cannot close file '%s'", filename);
    goto fail_remove;
  }
  if (rename(ts_new, filename) != 0) {
    message("In ts_close_in: cannot rename file '%s' to be '%s'",
	  ts_new, filename);
    goto fail_remove;
  }

  free(ts_new);
  unlock(filename);
  return;

fail_remove:
  remove(ts_new);
fail:
  unlock(filename);
  exit(-1);
}
!E!O!F!
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting matrix.h...
cat >matrix.h <<'!E!O!F!'
/* Path name of the shared file used as the tuple space.  Site-specific:
   edit for your machine. */
#define TS "/usr/users/jwb/pl/matrix.ts"
/* Shell command that starts one worker process (here through rsh on
   host cepmax).  Site-specific: edit for your machine. */
#define WORKER "rsh cepmax /usr/users/jwb/pl/worker"
!E!O!F!
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting matrix.c...
cat >matrix.c <<'!E!O!F!'
#include <stdlib.h>
#include <stdio.h>
#include "pl.h"
#include "matrix.h"

#define MAX_WORKERS 20

/*
 * nworkers -- start `n` copies of shell command `command` with popen()
 * and return after all of them have been closed with pclose().
 *
 * At most MAX_WORKERS processes are started; a larger request is
 * reported and clamped rather than overrunning the handle table.  A
 * command that cannot be started is reported and skipped.
 */
void
nworkers(int n, char *command)
{
  FILE *worker[MAX_WORKERS];
  int started = 0;
  int i;

  if (n > MAX_WORKERS) {
    fprintf(stderr, "nworkers: %d workers requested, limiting to %d\n",
	    n, MAX_WORKERS);
    n = MAX_WORKERS;
  }

  for (i = 0; i < n; i++) {
    FILE *pipe = popen(command, "r");

    if (pipe == NULL) {
      fprintf(stderr, "nworkers: cannot start '%s'\n", command);
      continue;
    }
    worker[started++] = pipe;
  }

  /* Only handles that were actually opened may be passed to pclose(). */
  for (i = 0; i < started; i++)
    pclose(worker[i]);
}
  
main(int argc, char **argv)
{
  char s[80];
  int i, j, value, n, worktime;

  n = ((argc > 1) ? atoi(argv[1]) : 2);
  worktime = ((argc > 2) ? atoi(argv[2]) : 0);
  fprintf(stderr, "Using %d workers, %ds additional worktime...\n",
	  n, worktime);

  unlink(TS);

  OUT(TS, ("A 1 1 2 3\n"));
  OUT(TS, ("A 2 4 5 6\n"));
  OUT(TS, ("A 3 7 8 9\n"));
  OUT(TS, ("B 1 1 0 2\n"));
  OUT(TS, ("B 2 1 2 1\n"));
  OUT(TS, ("B 3 1 1 1\n"));

  OUT(TS, ("NEXT 1 %d\n", worktime));

  nworkers(n, WORKER);

  for (i = 1; i <= 3; i++)
    for (j = 1; j <= 3; j++) {
      sprintf(s, "C %d %d %%d", i, j);
      IN(TS, (s, &value));
      fprintf(stderr, "C[%d][%d] = %d\n", i, j, value);
    }

  return 0;
}
!E!O!F!
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting worker.c...
cat >worker.c <<'!E!O!F!'
#include <stdlib.h>
#include <stdio.h>
#include "pl.h"
#include "matrix.h"

/*
 * inner_product -- dot product of the first `n` elements of `a` and `b`.
 * Returns 0 when n is not positive.
 */
int
inner_product(int a[], int b[], int n)
{
  const int *pa = a;
  const int *pb = b;
  int total = 0;

  while (n-- > 0)
    total += *pa++ * *pb++;

  return total;
}
    
/*
 * worker -- compute elements of the 3x3 product until none are left.
 *
 * Each pass takes the "NEXT element worktime" tuple out of the tuple
 * space and puts it back with the element number incremented, so other
 * workers can claim the following element.  Once the number exceeds 9
 * the worker stops.  Otherwise it reads tuples "A i" and "B j", waits
 * `worktime` seconds, and writes the result as "C i j value".
 */
int
main(void)
{
  char s[80];
  int element, worktime, i, j, a[3], b[3];

  while (1) {

    IN(TS, ("NEXT %d %d", &element, &worktime));
    OUT(TS, ("NEXT %d %d\n", element+1, worktime));

    if (element > 3 * 3)
      break;

    /* Map element number 1..9 onto 1-based indices (i, j). */
    i = (element - 1) / 3 + 1;
    j = (element - 1) % 3 + 1;

    sprintf(s, "A %d %%d %%d %%d", i);
    READ(TS, (s, &a[0], &a[1], &a[2]));

    sprintf(s, "B %d %%d %%d %%d", j);
    READ(TS, (s, &b[0], &b[1], &b[2]));

    /* sleep() takes an unsigned count: a negative worktime read from the
       tuple would turn into an enormous delay, so ignore it. */
    if (worktime > 0)
      sleep((unsigned) worktime);

    OUT(TS, ("C %d %d %d\n", i, j, inner_product(a, b, 3)));

    fprintf(stderr, "[%d] %d\n", element, (int) getpid());
  }

  return 0;
}
!E!O!F!
#
# type    sh pl.shar   to unpack this archive.
#
echo extracting Makefile...
cat >Makefile <<'!E!O!F!'
# Compiler and flags used for every compile and link step below.
CC =		gcc -Wall -ansi

# Default target: build the matrix driver and the worker program.
all:		matrix worker

# The driver links its own object with the 'play Linda' library object.
matrix:		matrix.o pl.o
		$(CC) matrix.o pl.o -o matrix

matrix.o:	matrix.c matrix.h pl.h
		$(CC) -c matrix.c

# The worker links against the same pl.o.
worker:		worker.o pl.o
		$(CC) worker.o pl.o -o worker

worker.o:	worker.c matrix.h pl.h
		$(CC) -c worker.c

# pl.c does not include pl.h, so it depends on its own source only.
pl.o:		pl.c
		$(CC) -c pl.c

!E!O!F!