home *** CD-ROM | disk | FTP | other *** search
/ Usenet 1994 January / usenetsourcesnewsgroupsinfomagicjanuary1994.iso / sources / unix / volume11 / musbus / part03 / makework.c < prev    next >
Encoding:
C/C++ Source or Header  |  1987-09-16  |  9.2 KB  |  391 lines

  1. /*
  2.  *  makework  -- emulate a series of terminal users
  3.  *
  4.  *  makework nusers
  5.  *
  6.  *  job streams are specified on standard input with lines of the form
  7.  *  home_dir cmd_path_name [ options ] [ <stdin_file ] [ >stdout_file ]
  8.  *
  9.  *  Input is send to all nuser instances of the commands in the
  10.  *  job streams at a rate not in excess of "rate" characters per second
  11.  *  per command
  12.  *
  13.  *  environment variables $rate and $tty control typing rate (characters
  14.  *  per second) and destination of echoed output.
  15.  *
  16.  *  $Header: makework.c,v 3.9 87/09/17 05:55:13 kenj Exp $
  17.  */
  18.  
  19. #include "makework.h"
  20.  
  21. #define DEF_RATE    5.0
  22. #define GRANULE        5
  23. #define CHUNK        60
  24.  
  25. float    thres;
  26. float    est_rate = DEF_RATE;
  27. int    firstuser;    /* ordinal identification of first user for this
  28.              * process */
  29. int    exit_status = 0;
  30. int    sigpipe;    /* pipe write error flag */
  31. int    nstream;
  32.  
  33. static stream *cp;
  34.  
  35. main(argc, argv)
  36. int    argc;
  37. char    *argv[];
  38. {
  39.     int        i;
  40.     int        l;
  41.     int        fcopy = 0;    /* fd for copy output */
  42.     int        master = 1;    /* the REAL master, == 0 for clones */
  43.     int        cmseq = 0;    /* clone/master seq number */
  44.     int        done;        /* count of children finished */
  45.     int        output;        /* aggregate output char count for all
  46.                    children */
  47.     int        c;
  48.     int        nch;        /* # characters to write */
  49.     int        written;    /* # characters actully written */
  50.     char    logname[20];    /* name of the log file(s) */
  51.     int        nusers;
  52.     int        onalarm();
  53.     int        pipeerr();
  54.     int        wrapup();
  55.     int        grunt();
  56.     int        pvec[2];    /* for pipes */
  57.     char    *p;
  58.     char    *prog;        /* my name */
  59.     char    *getenv();
  60.  
  61. #ifndef DEBUG
  62.     freopen("Tmp/masterlog.00", "a", stderr);
  63. #endif
  64.     fprintf(stderr, "*** New Run ***  ");
  65.     prog = argv[0];
  66.     if ((p = getenv("rate")) != (char *)0) {
  67.     sscanf(p, "%f", &est_rate);
  68.     if (est_rate <= 0) {
  69.         fprintf(stderr, "%s: bad rate, reset to %.2f chars/sec\n", prog, DEF_RATE);
  70.         est_rate = DEF_RATE;
  71.     }
  72. #ifdef DEBUG
  73.     else
  74.         fprintf(stderr, "%s: typing rate reset to %.2f chars/sec\n", prog, est_rate);
  75. #endif
  76.     }
  77.     if ((p = getenv("tty")) != (char *)0) {
  78.     fcopy = open(p, 1);
  79.     if (fcopy < 0)
  80.         fcopy = creat(p, 0600);
  81.     if (fcopy < 0) {
  82.         fprintf(stderr, "%s: cannot open copy file '%s'\n", prog, p);
  83.         fflush(stderr);
  84.         exit(2);
  85.     }
  86.     lseek(fcopy, 0L, 2);    /* append at end of file */
  87.     }
  88.     
  89.     if (argc < 2) {
  90.     fprintf(stderr, "%s: missing nusers\n", prog);
  91.     fflush(stderr);
  92.     exit(4);
  93.     }
  94.  
  95.     nusers = atoi(argv[1]);
  96.     if (nusers < 1) {
  97.     fprintf(stderr, "%s: impossible nusers (%d<-%s)\n", prog, nusers, argv[1]);
  98.     fflush(stderr);
  99.     exit(4);
  100.     }
  101.     fprintf(stderr, "%d Users\n", nusers);
  102.     argc--;
  103.     argv++;
  104.  
  105.     /* clone copies of myself to run up to MAXSTREAM jobs each */
  106.     firstuser = MAXSTREAM;
  107.     fprintf(stderr, "master pid %d\n", getpid());
  108.     fflush(stderr);
  109.     while (nusers > MAXSTREAM) {
  110.     fflush(stderr);
  111.     if (nusers >= 2*MAXSTREAM)
  112.         /* the next clone must run MAXSTREAM jobs */
  113.         nstream = MAXSTREAM;
  114.     else
  115.         /* the next clone must run the leftover jobs */
  116.         nstream = nusers - MAXSTREAM;
  117.     /* build job streams for the clone */
  118.     getwork(nstream);
  119. #ifdef DEBUG
  120.     dumpwork();
  121. #endif
  122.     cmseq = firstuser/MAXSTREAM;
  123.     if ((l = fork()) == -1) {
  124.         /* fork failed */
  125.         fatal("** clone fork failed **\n");
  126.         goto bepatient;
  127.     } else if (l > 0) {
  128.         fprintf(stderr, "clone %d pid %d\n", cmseq, l);
  129.         /* I am the master with nstream fewer jobs to run */
  130.         nusers -= nstream;
  131.         firstuser += MAXSTREAM;
  132.         continue;
  133.     } else {
  134.         /* I am a clone, run MAXSTREAM jobs */
  135.         master = 0;
  136.         nusers = nstream;
  137. #ifndef DEBUG
  138.         sprintf(logname, "Tmp/masterlog.%02d", cmseq);
  139.         freopen(logname, "w", stderr);
  140. #endif
  141.         break;
  142.     }
  143.     }
  144.     if (master) {
  145.     firstuser = 0;
  146.     cmseq = 0;
  147.     nstream = nusers;
  148.     /* build job streams for the master */
  149.     getwork(nstream);
  150. #ifdef DEBUG
  151.     dumpwork();
  152. #endif
  153.     }
  154.  
  155.     close(0);
  156.  
  157.     for (i = 0; i < nstream; i++ ) {
  158.     if (master)
  159.         fprintf(stderr, "user %d master stream %d ", firstuser+i, i);
  160.     else
  161.         fprintf(stderr, "user %d clone %d stream %d ", firstuser+i, cmseq, i);
  162.     if (pipe(pvec) == -1) {
  163.         /* this is fatal */
  164.         fatal("** pipe failed **\n");
  165.         goto bepatient;
  166.     }
  167.     fflush(stderr);
  168.     if ((work[i].pid = fork()) == 0) {
  169.         int    fd;
  170.         /* the command */
  171.         if (pvec[0] != 0) {
  172.         close(0);
  173.         dup(pvec[0]);
  174.         }
  175. #ifndef DEBUG
  176.         sprintf(logname, "Tmp/userlog.%02d", firstuser+i);
  177.         freopen(logname, "w", stderr);
  178. #endif
  179.         for (fd = 3; fd < 24; fd++)
  180.         close(fd);
  181.         if (work[i].tty[0] != '\0') {
  182.         /* redirect std output */
  183.         if (freopen(work[i].tty, "w", stdout) == NULL) {
  184.             fprintf(stderr, "makework: cannot open %s for std output\n",
  185.             work[i].tty);
  186.             fflush(stderr);
  187.             goto bepatient;
  188.         }
  189.         }
  190.         if (chdir(work[i].home) == -1) {
  191.         fprintf(stderr, "makework: chdir to \"%s\" failed!\n",
  192.             work[i].home);
  193.         fflush(stderr);
  194.         goto bepatient;
  195.         }
  196.         sprintf(logname, "USER.%02d", firstuser+i);
  197.         if (close(creat(logname, 0600)) == -1) {
  198.         fprintf(stderr, "makework: creat \"%s\" failed!\n", logname);
  199.         fflush(stderr);
  200.         goto bepatient;
  201.         }
  202.  
  203.         execv(work[i].cmd, work[i].av);
  204.         /* don't expect to get here! */
  205.         fatal("** exec failed **\n");
  206.         goto bepatient;
  207.     }
  208.     else if (work[i].pid == -1) {
  209.         fatal("** fork failed **\n");
  210.         goto bepatient;
  211.     }
  212.     else {
  213.         close(pvec[0]);
  214.         work[i].fd = pvec[1];
  215.         work[i].line = work[i].bp = work[i].buf;
  216.         fprintf(stderr, "pid %d pipe fd %d", work[i].pid, work[i].fd);
  217.         if (work[i].tty[0] != '\0')
  218.         fprintf(stderr, " > %s", work[i].tty);
  219.         fputc('\n', stderr);
  220.     }
  221.     }
  222.     fflush(stderr);
  223.  
  224.     srand(time(0));
  225.     thres = 0;
  226.     done = output = 0;
  227.     for (i = 0; i < nstream; i++) {
  228.     if (work[i].blen == 0)
  229.         done++;
  230.     else
  231.         thres += est_rate * GRANULE;
  232.     }
  233.     est_rate = thres;
  234.  
  235.     signal(SIGALRM, onalarm);
  236.     signal(SIGPIPE, pipeerr);
  237.     alarm(GRANULE);
  238.     while (done < nstream) {
  239.     for (i = 0; i < nstream; i++) {
  240.         cp = &work[i];
  241.         if (cp->xmit >= cp->blen) continue;
  242.         l = rand() % CHUNK + 1;    /* 1-CHUNK chars */
  243.         if (l == 0) continue;
  244.         if (cp->xmit + l > cp->blen)
  245.         l = cp->blen - cp->xmit;
  246.         p = cp->bp;
  247.         cp->bp += l;
  248.         cp->xmit += l;
  249. #ifdef DEBUG
  250.         fprintf(stderr, "child %d, %d processed, %d to go\n", i, cp->xmit, cp->blen - cp->xmit);
  251. #endif
  252.         while (p < cp->bp) {
  253.         if (*p == '\n' || (p == &cp->bp[-1] && cp->xmit >= cp->blen)) {
  254.             /* write it out */
  255.             nch = p - cp->line + 1;
  256.             if ((written = write(cp->fd, cp->line, nch)) != nch) {
  257.             /* argh! */
  258.             cp->line[nch] = '\0';
  259.             fprintf(stderr, "user %d cmd %s ",
  260.                 firstuser+i, cp->line);
  261.              fprintf(stderr, "write(,,%d) returns %d\n", nch, written);
  262.             if (sigpipe)
  263.                 fatal("** SIGPIPE error **\n");
  264.             else
  265.                 fatal("** write error **\n");
  266.             goto bepatient;
  267.  
  268.             }
  269.             if (fcopy)
  270.             write(fcopy, cp->line, p - cp->line + 1);
  271. #ifdef DEBUG
  272.             fprintf(stderr, "child %d gets \"", i);
  273.             {
  274.             char *q = cp->line;
  275.             while (q <= p) {
  276.                 if (*q >= ' ' && *q <= '~')
  277.                     fputc(*q, stderr);
  278.                 else
  279.                     fprintf(stderr, "\\%03o", *q);
  280.                 q++;
  281.             }
  282.             }
  283.             fputc('"', stderr);
  284. #endif
  285.             cp->line = &p[1];
  286.         }
  287.         p++;
  288.         }
  289.         if (cp->xmit >= cp->blen) {
  290.         done++;
  291.         close(cp->fd);
  292. #ifdef DEBUG
  293.     fprintf(stderr, "child %d, close std input\n", i);
  294. #endif
  295.         }
  296.         output += l;
  297.     }
  298.     while (output > thres) {
  299.         pause();
  300. #ifdef DEBUG
  301.         fprintf(stderr, "after pause: output, thres, done %d %.2f %d\n", output, thres, done);
  302. #endif
  303.     }
  304.     }
  305.  
  306. bepatient:
  307.     alarm(0);
  308. /****
  309.  *  If everything is going OK, we should simply be able to keep
  310.  *  looping unitil 'wait' fails, however some descendent process may
  311.  *  be in a state from which it can never exit, and so a timeout
  312.  *  is used.
  313.  *  5 minutes should be ample, since the time to run all jobs is of
  314.  *  the order of 5-10 minutes, however some machines are painfully slow,
  315.  *  so the timeout has been set at 20 minutes (1200 seconds).
  316.  ****/
  317.     signal(SIGALRM, grunt);
  318.     alarm(1200);
  319.     while ((c = wait(&l)) != -1) {
  320.         for (i = 0; i < nstream; i++) {
  321.         if (c == work[i].pid) {
  322.         fprintf(stderr, "user %d pid %d done", firstuser+i, c);
  323.         if (l != 0) {
  324.             if (l & 0x7f)
  325.             fprintf(stderr, " status %d", l & 0x7f);
  326.             if (l & 0xff00)
  327.             fprintf(stderr, " exit code %d", (l>>8) & 0xff);
  328.             exit_status = 4;
  329.         }
  330.         fputc('\n', stderr);
  331.         c = work[i].pid = -1;
  332.         break;
  333.         }
  334.     }
  335.     if (c != -1) {
  336.         fprintf(stderr, "clone %d done, pid %d ", cmseq, c);
  337.         if (l != 0) {
  338.         if (l & 0x7f)
  339.             fprintf(stderr, " status %d", l & 0x7f);
  340.         if (l & 0xff00)
  341.             fprintf(stderr, " exit code %d", (l>>8) & 0xff);
  342.         exit_status = 4;
  343.         }
  344.         fputc('\n', stderr);
  345.     }
  346.     }
  347.     alarm(0);
  348.     wrapup("Finished waiting ...");
  349.  
  350.  
  351. }
  352.  
  353. onalarm()
  354. {
  355.     thres += est_rate;
  356.     signal(SIGALRM, onalarm);
  357.     alarm(GRANULE);
  358. }
  359.  
  360. grunt()
  361. {
  362.     /* timeout after label "bepatient" in main */
  363.     exit_status = 4;
  364.     wrapup("Timed out waiting for jobs to finish ...");
  365. }
  366.  
  367. pipeerr()
  368. {
  369.     sigpipe++;
  370. }
  371.  
  372. wrapup(reason)
  373. char    *reason;
  374. {
  375.     int i;
  376.     int killed = 0;
  377.     fflush(stderr);
  378.     for (i = 0; i < nstream; i++) {
  379.     if (work[i].pid > 0 && kill(work[i].pid, SIGKILL) != -1) {
  380.         if (!killed) {
  381.         killed++;
  382.         fprintf(stderr, "%s\n", reason);
  383.         fflush(stderr);
  384.         }
  385.         fprintf(stderr, "user %d pid %d killed off\n", firstuser+i, work[i].pid);
  386.     fflush(stderr);
  387.     }
  388.     }
  389.     exit(exit_status);
  390. }
  391.