home *** CD-ROM | disk | FTP | other *** search
/ Usenet 1994 January / usenetsourcesnewsgroupsinfomagicjanuary1994.iso / sources / unix / volume11 / musbus / part02 / big.c next >
Encoding:
C/C++ Source or Header  |  1987-09-16  |  12.8 KB  |  570 lines

  1. /*
  2.  *  dummy code for execl test [ old version of makework.c ]
  3.  *
  4.  *  makework [ -r rate ] [ -c copyfile ] nusers
  5.  *
  6.  *  job streams are specified on standard input with lines of the form
  7.  *  full_path_name_for_command [ options ] [ <standard_input_file ]
  8.  *
  9.  *  "standard input" is send to all nuser instances of the commands in the
  10.  *  job streams at a rate not in excess of "rate" characters per second
  11.  *  per command
  12.  *
  13.  *  $Header: big.c,v 1.2 87/06/22 14:22:40 kjmcdonell Beta $
  14.  */
  15.  
  16. #include <stdio.h>
  17. #include <signal.h>
  18.  
  19. #define DEF_RATE    5.0
  20. #define GRANULE        5
  21. #define CHUNK        60
  22. #define MAXCHILD    12
  23. #define MAXWORK        10
  24.  
  25. float    thres;
  26. float    est_rate = DEF_RATE;
  27. int    nusers;        /* number of concurrent users to be simulated by
  28.              * this process */
  29. int    firstuser;    /* ordinal identification of first user for this
  30.              * process */
  31. int    nwork = 0;    /* number of job streams */
  32. int    exit_status = 0;    /* returned to parent */
  33. int    sigpipe;    /* pipe write error flag */
  34.  
  35. struct st_work {
  36.     char    *cmd;        /* name of command to run */
  37.     char    **av;        /* arguments to command */
  38.     char    *input;        /* standard input buffer */
  39.     int    inpsize;    /* size of standard input buffer */
  40.     char    *outf;        /* standard output (filename) */
  41. } work[MAXWORK];
  42.  
  43. struct {
  44.     int    xmit;    /* # characters sent */
  45.     char    *bp;    /* std input buffer pointer */
  46.     int    blen;    /* std input buffer length */
  47.     int    fd;    /* stdin to command */
  48.     int    pid;    /* child PID */
  49.     char    *line;    /* start of input line */ 
  50.     int    firstjob;    /* inital piece of work */
  51.     int    thisjob;    /* current piece of work */
  52. } child[MAXCHILD], *cp;
  53.  
  54. main(argc, argv)
  55. int    argc;
  56. char    *argv[];
  57. {
  58.     int        i;
  59.     int        l;
  60.     int        fcopy = 0;    /* fd for copy output */
  61.     int        master = 1;    /* the REAL master, == 0 for clones */
  62.     int        nchild;        /* no. of children for a clone to run */
  63.     int        done;        /* count of children finished */
  64.     int        output;        /* aggregate output char count for all
  65.                    children */
  66.     int        c;
  67.     int        thiswork = 0;    /* next job stream to allocate */
  68.     int        nch;        /* # characters to write */
  69.     int        written;    /* # characters actully written */
  70.     char    logname[15];    /* name of the log file(s) */
  71.     int        onalarm();
  72.     int        pipeerr();
  73.     int        wrapup();
  74.     int        grunt();
  75.     int        pvec[2];    /* for pipes */
  76.     char    *p;
  77.     char    *prog;        /* my name */
  78.  
  79. #if ! debug
  80.     freopen("masterlog.00", "a", stderr);
  81. #endif
  82.     fprintf(stderr, "*** New Run ***  ");
  83.     prog = argv[0];
  84.     while (argc > 1 && argv[1][0] == '-')  {
  85.     p = &argv[1][1];
  86.     argc--;
  87.     argv++;
  88.     while (*p) {
  89.         switch (*p) {
  90.         case 'r':
  91.             est_rate = atoi(argv[1]);
  92.             sscanf(argv[1], "%f", &est_rate);
  93.             if (est_rate <= 0) {
  94.                 fprintf(stderr, "%s: bad rate, reset to %.2f chars/sec\n", prog, DEF_RATE);
  95.                 est_rate = DEF_RATE;
  96.             }
  97.             argc--;
  98.             argv++;
  99.             break;
  100.  
  101.         case 'c':
  102.             fcopy = open(argv[1], 1);
  103.             if (fcopy < 0)
  104.                 fcopy = creat(argv[1], 0600);
  105.             if (fcopy < 0) {
  106.                 fprintf(stderr, "%s: cannot open copy file '%s'\n",
  107.                 prog, argv[1]);
  108.                 exit(2);
  109.             }
  110.             lseek(fcopy, 0L, 2);    /* append at end of file */
  111.             argc--;
  112.             argv++;
  113.             break;
  114.  
  115.         default:
  116.         fprintf(stderr, "%s: bad flag '%c'\n", prog, *p);
  117.             exit(4);
  118.         }
  119.         p++;
  120.     }
  121.     }
  122.     
  123.     if (argc < 2) {
  124.     fprintf(stderr, "%s: missing nusers\n", prog);
  125.     exit(4);
  126.     }
  127.  
  128.     nusers = atoi(argv[1]);
  129.     if (nusers < 1) {
  130.     fprintf(stderr, "%s: impossible nusers (%d<-%s)\n", prog, nusers, argv[1]);
  131.     exit(4);
  132.     }
  133.     fprintf(stderr, "%d Users\n", nusers);
  134.     argc--;
  135.     argv++;
  136.  
  137.     /* build job streams */
  138.     getwork();
  139. #if debug
  140.     dumpwork();
  141. #endif
  142.  
  143.     /* clone copies of myself to run up to MAXCHILD jobs each */
  144.     firstuser = MAXCHILD;
  145.     fprintf(stderr, "master pid %d\n", getpid());
  146.     fflush(stderr);
  147.     while (nusers > MAXCHILD) {
  148.     fflush(stderr);
  149.     if (nusers >= 2*MAXCHILD)
  150.         /* the next clone must run MAXCHILD jobs */
  151.         nchild = MAXCHILD;
  152.     else
  153.         /* the next clone must run the leftover jobs */
  154.         nchild = nusers - MAXCHILD;
  155.     if ((l = fork()) == -1) {
  156.         /* fork failed */
  157.         fatal("** clone fork failed **\n");
  158.         goto bepatient;
  159.     } else if (l > 0) {
  160.         fprintf(stderr, "master clone pid %d\n", l);
  161.         /* I am the master with nchild fewer jobs to run */
  162.         nusers -= nchild;
  163.         firstuser += MAXCHILD;
  164.         continue;
  165.     } else {
  166.         /* I am a clone, run MAXCHILD jobs */
  167. #if ! debug
  168.         sprintf(logname, "masterlog.%02d", firstuser/MAXCHILD);
  169.         freopen(logname, "w", stderr);
  170. #endif
  171.         master = 0;
  172.         nusers = nchild;
  173.         break;
  174.     }
  175.     }
  176.     if (master)
  177.     firstuser = 0;
  178.  
  179.     close(0);
  180.     for (i = 0; i < nusers; i++ ) {
  181.     fprintf(stderr, "user %d job %d ", firstuser+i, thiswork);
  182.     if (pipe(pvec) == -1) {
  183.         /* this is fatal */
  184.         fatal("** pipe failed **\n");
  185.         goto bepatient;
  186.     }
  187.     fflush(stderr);
  188.     if ((child[i].pid = fork()) == 0) {
  189.         int    fd;
  190.         /* the command */
  191.         if (pvec[0] != 0) {
  192.         close(0);
  193.         dup(pvec[0]);
  194.         }
  195. #if ! debug
  196.         sprintf(logname, "userlog.%02d", firstuser+i);
  197.         freopen(logname, "w", stderr);
  198. #endif
  199.         for (fd = 3; fd < 24; fd++)
  200.         close(fd);
  201.         if (work[thiswork].outf[0] != '\0') {
  202.         /* redirect std output */
  203.         char    *q;
  204.         for (q = work[thiswork].outf; *q != '\n'; q++) ;
  205.         *q = '\0';
  206.         if (freopen(work[thiswork].outf, "w", stdout) == NULL) {
  207.             fprintf(stderr, "makework: cannot open %s for std output\n",
  208.             work[thiswork].outf);
  209.             fflush(stderr);
  210.         }
  211.         *q = '\n';
  212.         }
  213.         execv(work[thiswork].cmd, work[thiswork].av);
  214.         /* don't expect to get here! */
  215.         fatal("** exec failed **\n");
  216.         goto bepatient;
  217.     }
  218.     else if (child[i].pid == -1) {
  219.         fatal("** fork failed **\n");
  220.         goto bepatient;
  221.     }
  222.     else {
  223.         close(pvec[0]);
  224.         child[i].fd = pvec[1];
  225.         child[i].line = child[i].bp = work[thiswork].input;
  226.         child[i].blen = work[thiswork].inpsize;
  227.         child[i].thisjob = thiswork;
  228.         child[i].firstjob = thiswork;
  229.         fprintf(stderr, "pid %d pipe fd %d", child[i].pid, child[i].fd);
  230.         if (work[thiswork].outf[0] != '\0') {
  231.         char *q;
  232.         fprintf(stderr, " > ");
  233.         for (q=work[thiswork].outf; *q != '\n'; q++)
  234.             fputc(*q, stderr);
  235.         }
  236.         fputc('\n', stderr);
  237.         thiswork++;
  238.         if (thiswork >= nwork)
  239.         thiswork = 0;
  240.     }
  241.     }
  242.     fflush(stderr);
  243.  
  244.     srand(time(0));
  245.     thres = 0;
  246.     done = output = 0;
  247.     for (i = 0; i < nusers; i++) {
  248.     if (child[i].blen == 0)
  249.         done++;
  250.     else
  251.         thres += est_rate * GRANULE;
  252.     }
  253.     est_rate = thres;
  254.  
  255.     signal(SIGALRM, onalarm);
  256.     signal(SIGPIPE, pipeerr);
  257.     alarm(GRANULE);
  258.     while (done < nusers) {
  259.     for (i = 0; i < nusers; i++) {
  260.         cp = &child[i];
  261.         if (cp->xmit >= cp->blen) continue;
  262.         l = rand() % CHUNK + 1;    /* 1-CHUNK chars */
  263.         if (l == 0) continue;
  264.         if (cp->xmit + l > cp->blen)
  265.         l = cp->blen - cp->xmit;
  266.         p = cp->bp;
  267.         cp->bp += l;
  268.         cp->xmit += l;
  269. #if debug
  270.         fprintf(stderr, "child %d, %d processed, %d to go\n", i, cp->xmit, cp->blen - cp->xmit);
  271. #endif
  272.         while (p < cp->bp) {
  273.         if (*p == '\n' || (p == &cp->bp[-1] && cp->xmit >= cp->blen)) {
  274.             /* write it out */
  275.             nch = p - cp->line + 1;
  276.             if ((written = write(cp->fd, cp->line, nch)) != nch) {
  277.             /* argh! */
  278.             cp->line[nch] = '\0';
  279.             fprintf(stderr, "user %d job %d cmd %s ",
  280.                 firstuser+i, cp->thisjob, cp->line);
  281.              fprintf(stderr, "write(,,%d) returns %d\n", nch, written);
  282.             if (sigpipe)
  283.                 fatal("** SIGPIPE error **\n");
  284.             else
  285.                 fatal("** write error **\n");
  286.             goto bepatient;
  287.  
  288.             }
  289.             if (fcopy)
  290.             write(fcopy, cp->line, p - cp->line + 1);
  291. #if debug
  292.             fprintf(stderr, "child %d gets \"", i);
  293.             {
  294.             char *q = cp->line;
  295.             while (q <= p) {
  296.                 if (*q >= ' ' && *q <= '~')
  297.                     fputc(*q, stderr);
  298.                 else
  299.                     fprintf(stderr, "\\%03o", *q);
  300.                 q++;
  301.             }
  302.             }
  303.             fputc('"', stderr);
  304. #endif
  305.             cp->line = &p[1];
  306.         }
  307.         p++;
  308.         }
  309.         if (cp->xmit >= cp->blen) {
  310.         done++;
  311.         close(cp->fd);
  312. #if debug
  313.     fprintf(stderr, "child %d, close std input\n", i);
  314. #endif
  315.         }
  316.         output += l;
  317.     }
  318.     while (output > thres) {
  319.         pause();
  320. #if debug
  321.         fprintf(stderr, "after pause: output, thres, done %d %.2f %d\n", output, thres, done);
  322. #endif
  323.     }
  324.     }
  325.  
  326. bepatient:
  327.     alarm(0);
  328. /****
  329.  *  If everything is going OK, we should simply be able to keep
  330.  *  looping unitil 'wait' fails, however some descendent process may
  331.  *  be in a state from which it can never exit, and so a timeout
  332.  *  is used.
  333.  *  5 minutes should be ample, since the time to run all jobs is of
  334.  *  the order of 5-10 minutes, however some machines are painfully slow,
  335.  *  so the timeout has been set at 20 minutes (1200 seconds).
  336.  ****/
  337.     signal(SIGALRM, grunt);
  338.     alarm(1200);
  339.     while ((c = wait(&l)) != -1) {
  340.         for (i = 0; i < nusers; i++) {
  341.         if (c == child[i].pid) {
  342.         fprintf(stderr, "user %d job %d pid %d done", firstuser+i, child[i].thisjob, c);
  343.         if (l != 0) {
  344.             if (l & 0x7f)
  345.             fprintf(stderr, " status %d", l & 0x7f);
  346.             if (l & 0xff00)
  347.             fprintf(stderr, " exit code %d", (l>>8) & 0xff);
  348.             exit_status = 4;
  349.         }
  350.         fputc('\n', stderr);
  351.         c = child[i].pid = -1;
  352.         break;
  353.         }
  354.     }
  355.     if (c != -1) {
  356.         fprintf(stderr, "master clone done, pid %d ", c);
  357.         if (l != 0) {
  358.         if (l & 0x7f)
  359.             fprintf(stderr, " status %d", l & 0x7f);
  360.         if (l & 0xff00)
  361.             fprintf(stderr, " exit code %d", (l>>8) & 0xff);
  362.         exit_status = 4;
  363.         }
  364.         fputc('\n', stderr);
  365.     }
  366.     }
  367.     alarm(0);
  368.     wrapup("Finished waiting ...");
  369.  
  370.  
  371. }
  372.  
  373. onalarm()
  374. {
  375.     thres += est_rate;
  376.     signal(SIGALRM, onalarm);
  377.     alarm(GRANULE);
  378. }
  379.  
  380. grunt()
  381. {
  382.     /* timeout after label "bepatient" in main */
  383.     exit_status = 4;
  384.     wrapup("Timed out waiting for jobs to finish ...");
  385. }
  386.  
  387. pipeerr()
  388. {
  389.     sigpipe++;
  390. }
  391.  
  392. wrapup(reason)
  393. char    *reason;
  394. {
  395.     int i;
  396.     int killed = 0;
  397.     fflush(stderr);
  398.     for (i = 0; i < nusers; i++) {
  399.     if (child[i].pid > 0 && kill(child[i].pid, SIGKILL) != -1) {
  400.         if (!killed) {
  401.         killed++;
  402.         fprintf(stderr, "%s\n", reason);
  403.         fflush(stderr);
  404.         }
  405.         fprintf(stderr, "user %d job %d pid %d killed off\n", firstuser+i, child[i].thisjob, child[i].pid);
  406.     fflush(stderr);
  407.     }
  408.     }
  409.     exit(exit_status);
  410. }
  411.  
  412. getwork()
  413. {
  414.     int            i;
  415.     int            f;
  416.     int            ac;
  417.     char        *lp;
  418.     char        *q;
  419.     struct st_work    *w;
  420.     char        line[512];
  421.     char        c;
  422.     char        *malloc(), *realloc();
  423.  
  424.     while (gets(line) != NULL) {
  425.     if (nwork >= MAXWORK) {
  426.         fprintf(stderr, stderr, "Too many jobs specified, .. increase MAXWORK\n");
  427.         exit(4);
  428.     }
  429.     w = &work[nwork];
  430.     lp = line;
  431.     i = 1;
  432.     while (*lp && *lp != ' ') {
  433.         i++;
  434.         lp++;
  435.     }
  436.     w->cmd = (char *)malloc(i);
  437.     strncpy(w->cmd, line, i-1);
  438.     w->cmd[i-1] = '\0';
  439.     w->inpsize = 0;
  440.     w->input = "";
  441.     /* start to build arg list */
  442.     ac = 2;
  443.     w->av = (char **)malloc(2*sizeof(char *));
  444.     q = w->cmd;
  445.     while (*q) q++;
  446.     q--;
  447.     while (q >= w->cmd) {
  448.         if (*q == '/') {
  449.         q++;
  450.         break;
  451.         }
  452.         q--;
  453.     }
  454.     w->av[0] = q;
  455.     while (*lp) {
  456.         if (*lp == ' ') {
  457.         /* space */
  458.         lp++;
  459.         continue;
  460.         }
  461.         else if (*lp == '<') {
  462.         /* standard input for this job */
  463.         q = ++lp;
  464.         while (*lp && *lp != ' ') lp++;
  465.         c = *lp;
  466.         *lp = '\0';
  467.         if ((f = open(q, 0)) == -1) {
  468.             fprintf(stderr, "cannot open input file (%s) for job %d\n",
  469.                 q, nwork);
  470.             exit(4);
  471.         }
  472.         /* gobble input */
  473.         w->input = (char *)malloc(512);
  474.         while ((i = read(f, &w->input[w->inpsize], 512)) > 0) {
  475.             w->inpsize += i;
  476.             w->input = (char *)realloc(w->input, w->inpsize+512);
  477.         }
  478.         w->input = (char *)realloc(w->input, w->inpsize);
  479.         close(f);
  480.         /* extract stdout file name from line beginning "C=" */
  481.         w->outf = "";
  482.         for (q = w->input; q < &w->input[w->inpsize-10]; q++) {
  483.             if (*q == '\n' && strncmp(&q[1], "C=", 2) == 0) {
  484.             w->outf = &q[3];
  485.             break;
  486.             }
  487.         }
  488. #if debug
  489.         if (*w->outf) {
  490.             fprintf(stderr, "stdout->");
  491.             for (q=w->outf; *q != '\n'; q++)
  492.             fputc(*q, stderr);
  493.             fputc('\n', stderr);
  494.         }
  495. #endif
  496.         }
  497.         else {
  498.         /* a command option */
  499.         ac++;
  500.         w->av = (char **)realloc(w->av, ac*sizeof(char *));
  501.         q = lp;
  502.         i = 1;
  503.         while (*lp && *lp != ' ') {
  504.             lp++;
  505.             i++;
  506.         }
  507.         w->av[ac-2] = (char *)malloc(i);
  508.         strncpy(w->av[ac-2], q, i-1);
  509.         w->av[ac-2][i-1] = '\0';
  510.         }
  511.     }
  512.     w->av[ac-1] = (char *)0;
  513.     nwork++;
  514.     }
  515. }
  516.  
  517. #if debug
  518. dumpwork()
  519. {
  520.     int        i;
  521.     int        j;
  522.  
  523.     for (i = 0; i < nwork; i++) {
  524.     fprintf(stderr, "job %d: cmd: %s\n", i, work[i].cmd);
  525.     j = 0;
  526.     while (work[i].av[j]) {
  527.         fprintf(stderr, "argv[%d]: %s\n", j, work[i].av[j]);
  528.         j++;
  529.     }
  530.     fprintf(stderr, "input: %d chars text: ", work[i].inpsize);
  531.     if (work[i].input == (char *)0)
  532.         fprintf(stderr, "<NULL>\n");
  533.     else {
  534.             register char    *pend;
  535.             char        *p;
  536.         char        c;
  537.         p = work[i].input;
  538.         while (*p) {
  539.             pend = p;
  540.             while (*pend && *pend != '\n')
  541.                 pend++;
  542.             c = *pend;
  543.             *pend = '\0';
  544.             fprintf(stderr, "%s\n", p);
  545.             *pend = c;
  546.             p = &pend[1];
  547.         }
  548.     }
  549.     }
  550. }
  551. #endif
  552.  
  553. fatal(s)
  554. char *s;
  555. {
  556.     int    i;
  557.     fprintf(stderr, s);
  558.     fflush(stderr);
  559.     perror("Reason?");
  560.     fflush(stderr);
  561.     for (i = 0; i < nusers; i++) {
  562.     if (child[i].pid > 0 && kill(child[i].pid, SIGKILL) != -1) {
  563.         fprintf(stderr, "pid %d killed off\n", child[i].pid);
  564.         fflush(stderr);
  565.     }
  566.     }
  567.     exit_status = 4;
  568.     return;
  569. }
  570.