jwb@cepmax.ncsu.edu (John W. Baugh Jr.) (02/20/91)
The recent requests for Linda on distributed workstations prompted my curiosity about how one could implement its (her?) primitives. Since most (?) folks have some kind of distributed file system, a toy implementation could simply use a (shared) file as tuple space (TS) with printf and scanf for output to and selective input from TS, with each tuple being separated by a newline. Workers can be invoked on remote machines using rsh, e.g., system("rsh ceap0 worker | rsh ceap1 worker..."); I spent a couple of hours playing around with this idea and came up with the following implementation in shell archive (shar) format. It's not very elegant or efficient, but it's fun to play with. I would expect that it might even be a reasonable implementation for coarse-grained parallelism. I've tried it on DECstations and Apollo workstations, and it works just fine (both with workers on a single machine, and with workers scattered across multiple machines). In addition to the 'play Linda' files, I've included an example of matrix multiplication. You'll need to edit matrix.h to set up proper paths for your machine. The matrix workers can also be made to take additional time (using sleep), which better distributes the work among processes when running on a single machine. For example, % matrix 4 2 uses 4 workers with 2 seconds added to the time it takes one matrix worker to complete a dot product. I'd be glad to receive any comments about what follows. Like I said, it's just a little something I hacked up, so it's likely that there are some bugs and portability problems, not to mention a lack of documentation. Oh yeah, this implementation allows multiple tuple spaces, e.g., OUT(TS, ("A 1 1 2 3\n")); outputs tuple "A 1 1 2 3" to tuple space TS. 
The tuple might be read in using READ or IN (which extracts the tuple upon
reading it), e.g.,

    READ(TS, ("A 1 %d %d %d", &a[0], &a[1], &a[2]));

The input macros use scanf to read in only tuples that match a certain form,
e.g., tuples beginning with "A 1". Of course, the format string can be
generated dynamically using sprintf as follows:

    sprintf(s, "A %d %%d %%d %%d", i);
    READ(TS, (s, &a[0], &a[1], &a[2]));

John Baugh
jwb@cepmax.ncsu.edu
-----------------------------------------------------------------------------
#
# type sh pl.shar to unpack this archive.
#
echo extracting pl.h...
cat >pl.h <<'!E!O!F!'
/* pl.h -- 'play Linda' header file */

#ifndef PL_H
#define PL_H

/*
 * A tuple space (TS) is a text file holding one tuple per line.  Every
 * macro takes the TS file name plus a parenthesised printf/scanf argument
 * list, e.g.
 *
 *     OUT(TS, ("A %d %d\n", 1, 2));
 *     READ(TS, ("A %d %d", &x, &y));
 *
 * OUT   locks TS, appends the printf output to it (stdout is redirected
 *       to the file) and unlocks it.
 * READ  locks TS and scans it line by line (stdin is redirected to the
 *       file) until scanf returns a positive count; when the end of the
 *       file is reached it unlocks, waits for the file to change and
 *       starts over.
 * IN    like READ, but the line on which scanf succeeded is removed
 *       from TS.
 *
 * Note that "success" is scanf() > 0, i.e. at least one conversion was
 * stored; a format without conversions can therefore never match.
 */
#define OUT(ts, etc)  { ts_open_write(ts); printf etc; ts_close_write(ts); }

#define READ(ts, etc) { ts_open_read(ts); \
                        while (scanf etc <= 0) ts_read_retry(ts); \
                        ts_close_read(ts); }

#define IN(ts, etc)   { int ts_line_no = 1; \
                        ts_open_in(ts); \
                        while (scanf etc <= 0) ts_in_retry(ts, &ts_line_no); \
                        ts_close_in(ts, ts_line_no); }

void ts_open_read(char *filename);
void ts_open_in(char *filename);
void ts_open_write(char *filename);

void ts_close_read(char *filename);
void ts_close_in(char *filename, int line_no);
void ts_close_write(char *filename);

void ts_read_retry(char *filename);
void ts_in_retry(char *filename, int *line_no);

#endif /* PL_H */
!E!O!F!
#
# type sh pl.shar to unpack this archive.
#
echo extracting pl.c...
cat >pl.c <<'!E!O!F!'
/* pl.c -- 'play Linda' */

#include <stdlib.h>
#include <stdio.h>
#include <stdarg.h>
#include <string.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/file.h>

#include "pl.h"

/* Size of the buffers that hold names derived from a tuple space name. */
#define TS_PATH_LEN 1024

/*
 * Print a printf-style diagnostic on stderr, prefixed with the host name
 * and process id of the caller.
 */
void message(char *fmt, ...)
{
  char hostname[80];
  va_list args;

  if (gethostname(hostname, sizeof hostname) != 0)
    strcpy(hostname, "unknown");
  /* gethostname() need not terminate the name when it is truncated. */
  hostname[sizeof hostname - 1] = '\0';

  va_start(args, fmt);
  fprintf(stderr, "Message -- host: %s, pid: %d\n ", hostname, (int) getpid());
  vfprintf(stderr, fmt, args);
  fprintf(stderr, "\n");
  va_end(args);
}

/*
 * Store base followed by suffix in dst (capacity size).
 * Returns 0 on success, -1 (dst untouched) when the result would not fit.
 */
static int ts_derived_name(char *dst, size_t size,
                           const char *base, const char *suffix)
{
  if (strlen(base) + strlen(suffix) + 1 > size)
    return -1;
  strcpy(dst, base);
  strcat(dst, suffix);
  return 0;
}

/*
 * Acquire the lock that guards 'pathname' by creating '<pathname>.LCK'.
 * Retries once per second until the lock file could be created.
 */
void lock(char *pathname)
{
  int tempfd;
  char lockfile[TS_PATH_LEN];

  if (ts_derived_name(lockfile, sizeof lockfile, pathname, ".LCK") != 0) {
    message("In lock: file name '%s' is too long", pathname);
    exit(-1);
  }

  /*
   * Try to create the lock file, using open() with both
   * O_CREAT (create file if it doesn't exist) and O_EXCL
   * (error if create and file already exists).
   * If this fails with EEXIST, some other process has the lock;
   * any other error is reported before retrying.
   */
  while ((tempfd = open(lockfile, O_RDWR|O_CREAT|O_EXCL, 0666)) < 0) {
    if (errno != EEXIST)
      message("In lock: error opening lock file '%s'", lockfile);
    sleep(1);
  }
  close(tempfd);
}

/* Release the lock on 'pathname' by removing '<pathname>.LCK'. */
void unlock(char *pathname)
{
  char lockfile[TS_PATH_LEN];

  if (ts_derived_name(lockfile, sizeof lockfile, pathname, ".LCK") != 0) {
    message("In unlock: file name '%s' is too long", pathname);
    return;
  }
  if (unlink(lockfile) < 0)
    message("In unlock: error removing lock file '%s'", lockfile);
}

/*
 * Give up while holding the lock on 'filename': release it first so the
 * other processes sharing the tuple space are not blocked forever.
 */
static void ts_fatal(char *filename)
{
  unlock(filename);
  exit(-1);
}

/*
 * Lock 'filename' and redirect the stream 'fp' to it with the given
 * fopen mode.  Exits (after releasing the lock) if the file cannot be
 * opened.
 */
void ts_attempt_open(char *filename, char *mode, FILE *fp)
{
  lock(filename);
  if (freopen(filename, mode, fp) == NULL) {
    message("In ts_attempt_open: cannot open file '%s'", filename);
    ts_fatal(filename);
  }
}

/* Lock the tuple space and attach it to stdin for reading. */
void ts_open_read(char *filename)  { ts_attempt_open(filename, "r", stdin);  }

/* Lock the tuple space and attach it to stdin for an IN operation. */
void ts_open_in(char *filename)    { ts_attempt_open(filename, "r+", stdin); }

/* Lock the tuple space and attach it to stdout in append mode. */
void ts_open_write(char *filename) { ts_attempt_open(filename, "a", stdout); }

/* Close 'fp'; report a failure and return -1, otherwise return 0. */
static int ts_try_close(char *filename, FILE *fp)
{
  if (fclose(fp) != 0) {
    message("In ts_attempt_close: cannot close file '%s'", filename);
    return -1;
  }
  return 0;
}

/* Close 'fp' (which was opened on 'filename'); exit if that fails. */
void ts_attempt_close(char *filename, FILE *fp)
{
  if (ts_try_close(filename, fp) != 0)
    exit(-1);
}

/* Finish a READ: close stdin and release the lock. */
void ts_close_read(char *filename)
{
  if (ts_try_close(filename, stdin) != 0)
    ts_fatal(filename);
  unlock(filename);
}

/* Finish an OUT: close stdout (flushing the tuple) and release the lock. */
void ts_close_write(char *filename)
{
  if (ts_try_close(filename, stdout) != 0)
    ts_fatal(filename);
  unlock(filename);
}

/*
 * Skip the rest of the current stdin line.
 * Returns 1 if a newline was consumed, 0 if end of file was reached.
 */
int advance_line(void)
{
  int c;

  while ((c = getchar()) != EOF && c != '\n')
    ;
  return c == '\n';
}

/* Non-zero when 'now' differs from 'before' in mtime, size or inode. */
static int ts_changed(const struct stat *before, const struct stat *now)
{
  return now->st_mtime != before->st_mtime
      || now->st_size  != before->st_size
      || now->st_ino   != before->st_ino;
}

/*
 * Poll 'filename' once per second until its status differs from
 * 'before'.  A failing stat() is treated as "not changed yet".
 */
static void ts_wait_for_change(char *filename, const struct stat *before)
{
  struct stat now;

  for (;;) {
    if (stat(filename, &now) == 0 && ts_changed(before, &now))
      return;
    sleep(1);
  }
}

/*
 * Record the status of the file currently attached to stdin.  Called
 * while the lock is still held, so a change made right after the lock
 * is released cannot be missed.  A zeroed snapshot is used if fstat()
 * fails.
 */
static void ts_snapshot(struct stat *snap)
{
  if (fstat(fileno(stdin), snap) != 0)
    memset(snap, 0, sizeof *snap);
}

/*
 * Block until 'filename' differs (mtime, size or inode) from its state
 * at the time of the call.  If the file cannot be examined at call time
 * the comparison starts from a zeroed status.
 */
void block_until_modified(char *filename)
{
  struct stat before;

  if (stat(filename, &before) != 0)
    memset(&before, 0, sizeof before);
  ts_wait_for_change(filename, &before);
}

/*
 * Called by READ when scanf did not succeed on the current line: move
 * to the next line, or, at end of file, release the tuple space, wait
 * for it to change and reopen it from the start.
 */
void ts_read_retry(char *filename)
{
  struct stat before;

  if (advance_line())
    return;

  ts_snapshot(&before);
  ts_close_read(filename);
  message("Waiting until %s is modified", filename);
  ts_wait_for_change(filename, &before);
  ts_open_read(filename);
}

/*
 * Same as ts_read_retry() for IN; additionally keeps *line_no equal to
 * the (1-based) number of the line scanf will look at next.
 */
void ts_in_retry(char *filename, int *line_no)
{
  struct stat before;

  if (advance_line()) {
    ++(*line_no);
    return;
  }

  ts_snapshot(&before);
  ts_close_read(filename);
  message("Waiting until %s is modified", filename);
  ts_wait_for_change(filename, &before);
  ts_open_in(filename);
  *line_no = 1;
}

/*
 * Copy one line, including its newline, from 'from' to 'to'.
 * Returns the last character read: '\n', or EOF at end of input.
 */
int cp_line(FILE *from, FILE *to)
{
  int c;

  while ((c = getc(from)) != EOF && c != '\n')
    putc(c, to);
  if (c == '\n')
    putc(c, to);
  return c;
}

/*
 * Finish an IN: rewrite the tuple space without line 'line_no' (via a
 * temporary file '<filename>.NEW<pid>' that is renamed over the
 * original), then release the lock.
 */
void ts_close_in(char *filename, int line_no)
{
  FILE *ts_new_p;
  char ts_new[TS_PATH_LEN];
  char suffix[32];
  int counter = 1;

  sprintf(suffix, ".NEW%d", (int) getpid());
  if (ts_derived_name(ts_new, sizeof ts_new, filename, suffix) != 0) {
    message("In ts_close_in: file name '%s' is too long", filename);
    ts_fatal(filename);
  }

  rewind(stdin);
  if ((ts_new_p = fopen(ts_new, "w+")) == NULL) {
    message("In ts_close_in: cannot open temporary file '%s'\n", ts_new);
    ts_fatal(filename);
  }

  /* 'counter' is the number of the next unread line; skip line_no. */
  do {
    if (counter++ == line_no) {
      advance_line();
      counter++;
    }
  } while (cp_line(stdin, ts_new_p) != EOF);

  if (ts_try_close(ts_new, ts_new_p) != 0)
    ts_fatal(filename);
  if (ts_try_close(filename, stdin) != 0)
    ts_fatal(filename);

  if (rename(ts_new, filename) != 0) {
    message("In ts_close_in: cannot rename file '%s' to be '%s'",
            ts_new, filename);
    ts_fatal(filename);
  }
  unlock(filename);
}
!E!O!F!
#
# type sh pl.shar to unpack this archive.
#
echo extracting matrix.h...
cat >matrix.h <<'!E!O!F!'
#define TS     "/usr/users/jwb/pl/matrix.ts"
#define WORKER "rsh cepmax /usr/users/jwb/pl/worker"
!E!O!F!
#
# type sh pl.shar to unpack this archive.
#
echo extracting matrix.c...
cat >matrix.c <<'!E!O!F!'
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "pl.h"
#include "matrix.h"

#define MAX_WORKERS 20

/*
 * Start n copies of 'command' with popen() and wait for all of them.
 * n is limited to MAX_WORKERS; workers that fail to start are reported
 * and skipped.
 */
void nworkers(int n, char *command)
{
  FILE *worker[MAX_WORKERS];
  int i;

  if (n > MAX_WORKERS) {
    fprintf(stderr, "Limiting %d workers to the maximum of %d\n",
            n, MAX_WORKERS);
    n = MAX_WORKERS;
  }

  for (i = 0; i < n; i++)
    if ((worker[i] = popen(command, "r")) == NULL)
      fprintf(stderr, "Cannot start worker %d: '%s'\n", i + 1, command);

  for (i = 0; i < n; i++)
    if (worker[i] != NULL)
      pclose(worker[i]);
}

/*
 * usage: matrix [workers [worktime]]
 *
 * Puts the rows of A, the columns of B and the work counter NEXT into
 * the tuple space, runs the workers and then collects and prints the
 * nine elements of C.  Results go to stderr because the OUT macro
 * redirects and closes stdout.
 */
int main(int argc, char **argv)
{
  char s[80];
  int i, j, value, n, worktime;

  n = ((argc > 1) ? atoi(argv[1]) : 2);
  worktime = ((argc > 2) ? atoi(argv[2]) : 0);

  /* Without at least one worker the IN below would wait forever. */
  if (n < 1 || n > MAX_WORKERS) {
    fprintf(stderr, "usage: %s [workers (1..%d) [worktime]]\n",
            argv[0], MAX_WORKERS);
    return 1;
  }

  fprintf(stderr, "Using %d workers, %ds additional worktime...\n",
          n, worktime);

  unlink(TS);

  OUT(TS, ("A 1 1 2 3\n"));
  OUT(TS, ("A 2 4 5 6\n"));
  OUT(TS, ("A 3 7 8 9\n"));

  OUT(TS, ("B 1 1 0 2\n"));
  OUT(TS, ("B 2 1 2 1\n"));
  OUT(TS, ("B 3 1 1 1\n"));

  OUT(TS, ("NEXT 1 %d\n", worktime));

  nworkers(n, WORKER);

  for (i = 1; i <= 3; i++)
    for (j = 1; j <= 3; j++) {
      sprintf(s, "C %d %d %%d", i, j);
      IN(TS, (s, &value));
      fprintf(stderr, "C[%d][%d] = %d\n", i, j, value);
    }

  return 0;
}
!E!O!F!
#
# type sh pl.shar to unpack this archive.
#
echo extracting worker.c...
cat >worker.c <<'!E!O!F!'
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include "pl.h"
#include "matrix.h"

/* Dot product of the first n elements of a and b. */
int inner_product(int a[], int b[], int n)
{
  int i, sum = 0;

  for (i = 0; i < n; i++)
    sum = sum + a[i] * b[i];
  return sum;
}

/*
 * Matrix worker: repeatedly take the NEXT counter from the tuple space,
 * put back its successor, and compute the element of C it designates
 * (elements are numbered 1..9 in row-major order).  Stops at the first
 * counter value beyond the last element.
 */
int main(void)
{
  char s[80];
  int element, worktime, i, j, a[3], b[3];

  while (1) {
    IN(TS, ("NEXT %d %d", &element, &worktime));
    /* Put the counter back first so the other workers can go on
       (and, past the last element, terminate as well). */
    OUT(TS, ("NEXT %d %d\n", element+1, worktime));
    if (element > 3 * 3)
      break;

    i = (element - 1) / 3 + 1;
    j = (element - 1) % 3 + 1;

    sprintf(s, "A %d %%d %%d %%d", i);
    READ(TS, (s, &a[0], &a[1], &a[2]));
    sprintf(s, "B %d %%d %%d %%d", j);
    READ(TS, (s, &b[0], &b[1], &b[2]));

    sleep(worktime);

    OUT(TS, ("C %d %d %d\n", i, j, inner_product(a, b, 3)));
    fprintf(stderr, "[%d] %d\n", element, (int) getpid());
  }
  return 0;
}
!E!O!F!
#
# type sh pl.shar to unpack this archive.
#
echo extracting Makefile...
cat >Makefile <<'!E!O!F!'
CC = gcc -Wall -ansi

all: matrix worker

matrix: matrix.o pl.o
	$(CC) matrix.o pl.o -o matrix

matrix.o: matrix.c matrix.h pl.h
	$(CC) -c matrix.c

worker: worker.o pl.o
	$(CC) worker.o pl.o -o worker

worker.o: worker.c matrix.h pl.h
	$(CC) -c worker.c

pl.o: pl.c pl.h
	$(CC) -c pl.c
!E!O!F!