home *** CD-ROM | disk | FTP | other *** search
- /*
- * makework -- emulate a series of terminal users
- *
- * makework nusers
- *
- * job streams are specified on standard input with lines of the form
- * home_dir cmd_path_name [ options ] [ <stdin_file ] [ >stdout_file ]
- *
- * Input is send to all nuser instances of the commands in the
- * job streams at a rate not in excess of "rate" characters per second
- * per command
- *
- * environment variables $rate and $tty control typing rate (characters
- * per second) and destination of echoed output.
- *
- * $Header: makework.c,v 3.9 87/09/17 05:55:13 kenj Exp $
- */
-
- #include "makework.h"
-
- #define DEF_RATE 5.0
- #define GRANULE 5
- #define CHUNK 60
-
- float thres;
- float est_rate = DEF_RATE;
- int firstuser; /* ordinal identification of first user for this
- * process */
- int exit_status = 0;
- int sigpipe; /* pipe write error flag */
- int nstream;
-
- static stream *cp;
-
- main(argc, argv)
- int argc;
- char *argv[];
- {
- int i;
- int l;
- int fcopy = 0; /* fd for copy output */
- int master = 1; /* the REAL master, == 0 for clones */
- int cmseq = 0; /* clone/master seq number */
- int done; /* count of children finished */
- int output; /* aggregate output char count for all
- children */
- int c;
- int nch; /* # characters to write */
- int written; /* # characters actully written */
- char logname[20]; /* name of the log file(s) */
- int nusers;
- int onalarm();
- int pipeerr();
- int wrapup();
- int grunt();
- int pvec[2]; /* for pipes */
- char *p;
- char *prog; /* my name */
- char *getenv();
-
- #ifndef DEBUG
- freopen("Tmp/masterlog.00", "a", stderr);
- #endif
- fprintf(stderr, "*** New Run *** ");
- prog = argv[0];
- if ((p = getenv("rate")) != (char *)0) {
- sscanf(p, "%f", &est_rate);
- if (est_rate <= 0) {
- fprintf(stderr, "%s: bad rate, reset to %.2f chars/sec\n", prog, DEF_RATE);
- est_rate = DEF_RATE;
- }
- #ifdef DEBUG
- else
- fprintf(stderr, "%s: typing rate reset to %.2f chars/sec\n", prog, est_rate);
- #endif
- }
- if ((p = getenv("tty")) != (char *)0) {
- fcopy = open(p, 1);
- if (fcopy < 0)
- fcopy = creat(p, 0600);
- if (fcopy < 0) {
- fprintf(stderr, "%s: cannot open copy file '%s'\n", prog, p);
- fflush(stderr);
- exit(2);
- }
- lseek(fcopy, 0L, 2); /* append at end of file */
- }
-
- if (argc < 2) {
- fprintf(stderr, "%s: missing nusers\n", prog);
- fflush(stderr);
- exit(4);
- }
-
- nusers = atoi(argv[1]);
- if (nusers < 1) {
- fprintf(stderr, "%s: impossible nusers (%d<-%s)\n", prog, nusers, argv[1]);
- fflush(stderr);
- exit(4);
- }
- fprintf(stderr, "%d Users\n", nusers);
- argc--;
- argv++;
-
- /* clone copies of myself to run up to MAXSTREAM jobs each */
- firstuser = MAXSTREAM;
- fprintf(stderr, "master pid %d\n", getpid());
- fflush(stderr);
- while (nusers > MAXSTREAM) {
- fflush(stderr);
- if (nusers >= 2*MAXSTREAM)
- /* the next clone must run MAXSTREAM jobs */
- nstream = MAXSTREAM;
- else
- /* the next clone must run the leftover jobs */
- nstream = nusers - MAXSTREAM;
- /* build job streams for the clone */
- getwork(nstream);
- #ifdef DEBUG
- dumpwork();
- #endif
- cmseq = firstuser/MAXSTREAM;
- if ((l = fork()) == -1) {
- /* fork failed */
- fatal("** clone fork failed **\n");
- goto bepatient;
- } else if (l > 0) {
- fprintf(stderr, "clone %d pid %d\n", cmseq, l);
- /* I am the master with nstream fewer jobs to run */
- nusers -= nstream;
- firstuser += MAXSTREAM;
- continue;
- } else {
- /* I am a clone, run MAXSTREAM jobs */
- master = 0;
- nusers = nstream;
- #ifndef DEBUG
- sprintf(logname, "Tmp/masterlog.%02d", cmseq);
- freopen(logname, "w", stderr);
- #endif
- break;
- }
- }
- if (master) {
- firstuser = 0;
- cmseq = 0;
- nstream = nusers;
- /* build job streams for the master */
- getwork(nstream);
- #ifdef DEBUG
- dumpwork();
- #endif
- }
-
- close(0);
-
- for (i = 0; i < nstream; i++ ) {
- if (master)
- fprintf(stderr, "user %d master stream %d ", firstuser+i, i);
- else
- fprintf(stderr, "user %d clone %d stream %d ", firstuser+i, cmseq, i);
- if (pipe(pvec) == -1) {
- /* this is fatal */
- fatal("** pipe failed **\n");
- goto bepatient;
- }
- fflush(stderr);
- if ((work[i].pid = fork()) == 0) {
- int fd;
- /* the command */
- if (pvec[0] != 0) {
- close(0);
- dup(pvec[0]);
- }
- #ifndef DEBUG
- sprintf(logname, "Tmp/userlog.%02d", firstuser+i);
- freopen(logname, "w", stderr);
- #endif
- for (fd = 3; fd < 24; fd++)
- close(fd);
- if (work[i].tty[0] != '\0') {
- /* redirect std output */
- if (freopen(work[i].tty, "w", stdout) == NULL) {
- fprintf(stderr, "makework: cannot open %s for std output\n",
- work[i].tty);
- fflush(stderr);
- goto bepatient;
- }
- }
- if (chdir(work[i].home) == -1) {
- fprintf(stderr, "makework: chdir to \"%s\" failed!\n",
- work[i].home);
- fflush(stderr);
- goto bepatient;
- }
- sprintf(logname, "USER.%02d", firstuser+i);
- if (close(creat(logname, 0600)) == -1) {
- fprintf(stderr, "makework: creat \"%s\" failed!\n", logname);
- fflush(stderr);
- goto bepatient;
- }
-
- execv(work[i].cmd, work[i].av);
- /* don't expect to get here! */
- fatal("** exec failed **\n");
- goto bepatient;
- }
- else if (work[i].pid == -1) {
- fatal("** fork failed **\n");
- goto bepatient;
- }
- else {
- close(pvec[0]);
- work[i].fd = pvec[1];
- work[i].line = work[i].bp = work[i].buf;
- fprintf(stderr, "pid %d pipe fd %d", work[i].pid, work[i].fd);
- if (work[i].tty[0] != '\0')
- fprintf(stderr, " > %s", work[i].tty);
- fputc('\n', stderr);
- }
- }
- fflush(stderr);
-
- srand(time(0));
- thres = 0;
- done = output = 0;
- for (i = 0; i < nstream; i++) {
- if (work[i].blen == 0)
- done++;
- else
- thres += est_rate * GRANULE;
- }
- est_rate = thres;
-
- signal(SIGALRM, onalarm);
- signal(SIGPIPE, pipeerr);
- alarm(GRANULE);
- while (done < nstream) {
- for (i = 0; i < nstream; i++) {
- cp = &work[i];
- if (cp->xmit >= cp->blen) continue;
- l = rand() % CHUNK + 1; /* 1-CHUNK chars */
- if (l == 0) continue;
- if (cp->xmit + l > cp->blen)
- l = cp->blen - cp->xmit;
- p = cp->bp;
- cp->bp += l;
- cp->xmit += l;
- #ifdef DEBUG
- fprintf(stderr, "child %d, %d processed, %d to go\n", i, cp->xmit, cp->blen - cp->xmit);
- #endif
- while (p < cp->bp) {
- if (*p == '\n' || (p == &cp->bp[-1] && cp->xmit >= cp->blen)) {
- /* write it out */
- nch = p - cp->line + 1;
- if ((written = write(cp->fd, cp->line, nch)) != nch) {
- /* argh! */
- cp->line[nch] = '\0';
- fprintf(stderr, "user %d cmd %s ",
- firstuser+i, cp->line);
- fprintf(stderr, "write(,,%d) returns %d\n", nch, written);
- if (sigpipe)
- fatal("** SIGPIPE error **\n");
- else
- fatal("** write error **\n");
- goto bepatient;
-
- }
- if (fcopy)
- write(fcopy, cp->line, p - cp->line + 1);
- #ifdef DEBUG
- fprintf(stderr, "child %d gets \"", i);
- {
- char *q = cp->line;
- while (q <= p) {
- if (*q >= ' ' && *q <= '~')
- fputc(*q, stderr);
- else
- fprintf(stderr, "\\%03o", *q);
- q++;
- }
- }
- fputc('"', stderr);
- #endif
- cp->line = &p[1];
- }
- p++;
- }
- if (cp->xmit >= cp->blen) {
- done++;
- close(cp->fd);
- #ifdef DEBUG
- fprintf(stderr, "child %d, close std input\n", i);
- #endif
- }
- output += l;
- }
- while (output > thres) {
- pause();
- #ifdef DEBUG
- fprintf(stderr, "after pause: output, thres, done %d %.2f %d\n", output, thres, done);
- #endif
- }
- }
-
- bepatient:
- alarm(0);
- /****
- * If everything is going OK, we should simply be able to keep
- * looping unitil 'wait' fails, however some descendent process may
- * be in a state from which it can never exit, and so a timeout
- * is used.
- * 5 minutes should be ample, since the time to run all jobs is of
- * the order of 5-10 minutes, however some machines are painfully slow,
- * so the timeout has been set at 20 minutes (1200 seconds).
- ****/
- signal(SIGALRM, grunt);
- alarm(1200);
- while ((c = wait(&l)) != -1) {
- for (i = 0; i < nstream; i++) {
- if (c == work[i].pid) {
- fprintf(stderr, "user %d pid %d done", firstuser+i, c);
- if (l != 0) {
- if (l & 0x7f)
- fprintf(stderr, " status %d", l & 0x7f);
- if (l & 0xff00)
- fprintf(stderr, " exit code %d", (l>>8) & 0xff);
- exit_status = 4;
- }
- fputc('\n', stderr);
- c = work[i].pid = -1;
- break;
- }
- }
- if (c != -1) {
- fprintf(stderr, "clone %d done, pid %d ", cmseq, c);
- if (l != 0) {
- if (l & 0x7f)
- fprintf(stderr, " status %d", l & 0x7f);
- if (l & 0xff00)
- fprintf(stderr, " exit code %d", (l>>8) & 0xff);
- exit_status = 4;
- }
- fputc('\n', stderr);
- }
- }
- alarm(0);
- wrapup("Finished waiting ...");
-
-
- }
-
- onalarm()
- {
- thres += est_rate;
- signal(SIGALRM, onalarm);
- alarm(GRANULE);
- }
-
- grunt()
- {
- /* timeout after label "bepatient" in main */
- exit_status = 4;
- wrapup("Timed out waiting for jobs to finish ...");
- }
-
- pipeerr()
- {
- sigpipe++;
- }
-
- wrapup(reason)
- char *reason;
- {
- int i;
- int killed = 0;
- fflush(stderr);
- for (i = 0; i < nstream; i++) {
- if (work[i].pid > 0 && kill(work[i].pid, SIGKILL) != -1) {
- if (!killed) {
- killed++;
- fprintf(stderr, "%s\n", reason);
- fflush(stderr);
- }
- fprintf(stderr, "user %d pid %d killed off\n", firstuser+i, work[i].pid);
- fflush(stderr);
- }
- }
- exit(exit_status);
- }
-