home *** CD-ROM | disk | FTP | other *** search
/ Usenet 1994 October / usenetsourcesnewsgroupsinfomagicoctober1994disk2.iso / unix / volume22 / queuer / part03 / enqueue.c next >
C/C++ Source or Header  |  1990-06-07  |  12KB  |  555 lines

  1. /* Copyright 1990  The President and Fellows of Harvard University
  2.  
  3. Permission to use, copy, modify, and distribute this program for any
  4. purpose and without fee is hereby granted, provided that this
  5. copyright and permission notice appear on all copies and supporting
  6. documentation, the name of Harvard University not be used in advertising
  7. or publicity pertaining to distribution of the program, or to results
  8. derived from its use, without specific prior written permission, and notice
  9. be given in supporting documentation that copying and distribution is by
  10. permission of Harvard University.  Harvard University makes no
  11. representations about the suitability of this software for any purpose.
  12. It is provided "as is" without express or implied warranty.    */
  13.  
  14.  
  15. /* enqueue.c - Dan Lanciani '85 */
  16.  
  17. #include <sys/param.h>
  18. #include <sys/stat.h>
  19. #include <sys/socket.h>
  20. #include <netinet/in.h>
  21. #include <sys/times.h>
  22. #include <signal.h>
  23. #include <netdb.h>
  24. #include <pwd.h>
  25. #include <grp.h>
  26. #include <errno.h>
  27. #include <stdio.h>
  28.  
  29. #include "queue.h"
  30.  
  31. extern int errno;
  32. extern char **environ;
  33. int queuegroup, flat, pid, nofiles, nifiles, byebye();
  34. time_t starttime, time();
  35. char **ofiles, **ifiles, **lfiles;
  36. long unid;
  37.  
  38. enqueue(s, host, sin, local, recover)
  39. char *host;
  40. struct sockaddr_in *sin;
  41. {
  42.     int se = -1, sargc, i, uid, gid, ng, nenviron;
  43.     register long l;
  44.     gid_t groups[NGROUPS];
  45.     char **sargv, buf[BUFSIZ], *user, *group, *grl[XGROUPS];
  46.     char *cwd, *xcwd, **envp, name1[BUFSIZ], name2[BUFSIZ];
  47.     register struct passwd *pw;
  48.     register struct group *gr;
  49.     struct stat statb;
  50.     struct hostent *hp;
  51.     struct servent *sp;
  52.     struct sockaddr_in sin2;
  53.     register FILE *n, *m, *ck;
  54.     FILE *popen();
  55.  
  56.     if(gr = getgrnam(QUEUEGROUP))
  57.         queuegroup = gr->gr_gid;
  58.     else
  59.         queuegroup = NOBODY;
  60.     getstr(s, buf);
  61.     if(recover) {
  62.         host = newstring(buf);
  63.         gethostname(buf, sizeof(buf));
  64.         local = !strcmp(host, buf);
  65.     }
  66.     else {
  67.         i = (u_short)atoi(buf);
  68.         if(i) {
  69.             if(i >= IPPORT_RESERVED)
  70.                 exit(1);
  71.             ng = IPPORT_RESERVED - 1;
  72.             if((se = rresvport(&ng)) < 0)
  73.                 exit(1);
  74.             sin->sin_port = htons((u_short)i);
  75.             if(connect(se, sin, sizeof(struct sockaddr_in)))
  76.                 exit(1);
  77.         }
  78.     }
  79.     flat = local || infile(FLATFILE, host);
  80.     sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  81.     if(!(ck = fopen(buf, "w")))
  82.         exit(1);
  83.     xfputs(host, ck);
  84.     getstr(s, buf);
  85.     unid = atol(buf);
  86.     sprintf(buf, "%s/%ld", SPOOLDIR, unid);
  87.     if(unid)
  88.         unlink(buf);
  89.     if(recover)
  90.         unid = 0;
  91.     fprintf(ck, "%ld", unid);
  92.     putc('\0', ck);
  93.     if(unid) {
  94.         if(!(n = fopen(buf, "w")))
  95.             exit(1);
  96.         fprintf(n, "%d", getpid());
  97.         putc('\0', n);
  98.         fclose(n);
  99.     }
  100.     getstr(s, buf);
  101.     xfputs(buf, ck);
  102.     sargc = atoi(buf);
  103.     sargv = (char **)malloc((sargc+1) * sizeof(char *));
  104.     for(i = 0; i < sargc; i++) {
  105.         sargv[i] = newstring(getstr(s, buf));
  106.         xfputs(buf, ck);
  107.     }
  108.     sargv[i] = NULL;
  109.     user = newstring(getstr(s, buf));
  110.     xfputs(buf, ck);
  111.     if(flat) {
  112.         if(!(pw = getpwnam(user)))
  113.             exit(1);
  114.         uid = pw->pw_uid;
  115.     }
  116.     else
  117.         uid = NOBODY;
  118.     group = newstring(getstr(s, buf));
  119.     xfputs(buf, ck);
  120.     if(flat) {
  121.         if(!(gr = getgrnam(group)))
  122.             exit(1);
  123.         gid = gr->gr_gid;
  124.     }
  125.     else
  126.         gid = NOBODY;
  127.     getstr(s, buf);
  128.     xfputs(buf, ck);
  129.     if((ng = atoi(buf)) > XGROUPS)
  130.         exit(1);
  131.     for(i = 0; i < ng; i++) {
  132.         grl[i] = newstring(getstr(s, buf));
  133.         xfputs(buf, ck);
  134.         if(flat) {
  135.             if(!(gr = getgrnam(grl[i])))
  136.                 exit(1);
  137.             groups[i] = gr->gr_gid;
  138.         }
  139.         else
  140.             groups[i] = NOGROUP;
  141.     }
  142.     xcwd = cwd = newstring(getstr(s, buf));
  143.     xfputs(buf, ck);
  144.     getstr(s, buf);
  145.     nenviron = atoi(buf);
  146.     xfputs(buf, ck);
  147.     envp = (char **)malloc((nenviron+1) * sizeof(char *));
  148.     for(i = 0; i < nenviron; i++) {
  149.         envp[i] = newstring(getstr(s, buf));
  150.         xfputs(buf, ck);
  151.     }
  152.     envp[i] = NULL;
  153.     environ = envp;
  154.     if(readconf(sargv[0]))
  155.         exit(1);
  156.     if(recover) {
  157.         if(mode == QM_INTERACTIVE || !restart) {
  158.             fclose(ck);
  159.             sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  160.             unlink(buf);
  161.             sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
  162.             if(!access(buf, 0)) {
  163.                 if(!vfork()) {
  164.                     execl("/bin/rm", "rm", "-rf", buf, 0);
  165.                     exit(1);
  166.                 }
  167.                 wait(0);
  168.             }
  169.             exit(0);
  170.         }
  171.         mode = QM_BATCH;
  172.     }
  173.  
  174.     if(!local) {
  175.         if(!recover) {
  176.             sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
  177.             if(!access(buf, 0)) {
  178.                 if(!vfork()) {
  179.                     execl("/bin/rm", "rm", "-rf", buf, 0);
  180.                     exit(1);
  181.                 }
  182.                 wait(0);
  183.             }
  184.             mkdir(buf, 0700);
  185.         }
  186.         chown(buf, uid, gid);
  187.         xcwd = newstring(buf);
  188.     }
  189.     if(priv)
  190.         setregid(gid, queuegroup);
  191.     else
  192.         setgid(gid);
  193.     setgroups(ng, groups);
  194.     setreuid(uid, 0);
  195.     chdir(xcwd);
  196.     nofiles = nifiles = 0;
  197.     while(1) {
  198.         getstr(s, buf);
  199.         xfputs(buf, ck);
  200.         if(!strcmp(buf, "done"))
  201.             break;
  202.         if(!strcmp(buf, "copyout")) {
  203.             getstr(s, buf);
  204.             xfputs(buf, ck);
  205.             nofiles = atoi(buf);
  206.             ofiles = (char **)malloc(nofiles * sizeof(char *));
  207.             for(i = 0; i < nofiles; i++) {
  208.                 getstr(s, buf);
  209.                 xfputs(buf, ck);
  210.                 ofiles[i] = newstring(buf);
  211.             }
  212.             continue;
  213.         }
  214.         if(!strcmp(buf, "efs")) {
  215.             getstr(s, buf);
  216.             xfputs(buf, ck);
  217.             nifiles = atoi(buf);
  218.             ifiles = (char **)malloc(nifiles * sizeof(char *));
  219.             lfiles = (char **)malloc(nifiles * sizeof(char *));
  220.             for(i = 0; i < nifiles; i++) {
  221.                 getstr(s, buf);
  222.                 xfputs(buf, ck);
  223.                 ifiles[i] = newstring(buf);
  224. #ifdef    SANEEFS
  225.                 if(ifiles[i][0] == '/') {
  226.                     sprintf(buf,"/r/%s%s",host,ifiles[i]);
  227.                     lfiles[i] = newstring(buf);
  228.                 }
  229.                 else
  230.                     lfiles[i] = ifiles[i];
  231. #else
  232.                 getstr(s, buf);
  233.                 xfputs(buf, ck);
  234.                 lfiles[i] = newstring(buf);
  235. #endif
  236.             }
  237.             if(xcwd != cwd)
  238.                 free(xcwd);
  239.             if(strncmp(cwd, "/r/", 3)) {
  240.                 sprintf(buf, "/r/%s%s", host, cwd);
  241.                 xcwd = newstring(buf);
  242.             }
  243.             else
  244.                 xcwd = cwd;
  245.             chdir(xcwd);
  246.             continue;
  247.         }
  248.         if(!strcmp(buf, "copyin")) {
  249.             getstr(s, buf);
  250.             xfputs(buf, ck);
  251.             nifiles = atoi(buf);
  252.             ifiles = (char **)malloc(nifiles * sizeof(char *));
  253.             lfiles = (char **)malloc(nifiles * sizeof(char *));
  254.             for(i = 0; i < nifiles; i++) {
  255.                 getstr(s, buf);
  256.                 xfputs(buf, ck);
  257.                 ifiles[i] = newstring(buf);
  258.                 if(rindex(ifiles[i], '/'))
  259.                     lfiles[i] = rindex(ifiles[i], '/') + 1;
  260.                 else
  261.                     lfiles[i] = ifiles[i];
  262.                 if(!recover) {
  263.                     if(!(n = fopen(lfiles[i], "w")))
  264.                         exit(1);
  265.                     chown(lfiles[i], uid, gid);
  266.                     getstr(s, buf);
  267.                     l = atol(buf);
  268.                     if(l < 0) {
  269.                         unlink(lfiles[i]);
  270.                         l = 0;
  271.                     }
  272.                     while(l) {
  273.                         ng = sizeof(buf);
  274.                         if(ng > l)
  275.                             ng = l;
  276.                         if((ng = read(s, buf, ng)) <= 0)
  277.                             exit(1);
  278.                         l -= ng;
  279.                         fwrite(buf, ng, 1, n);
  280.                     }
  281.                     fclose(n);
  282.                 }
  283.             }
  284.         continue;
  285.         }
  286.     }
  287.     fclose(ck);
  288.     for(i = 1; i < sargc; i++)
  289.         for(ng = 0; ng < nifiles; ng++)
  290.             if(!strcmp(sargv[i], ifiles[ng]))
  291.                 sargv[i] = lfiles[ng];
  292.  
  293.     if(mode == QM_BATCH) {
  294.         close(s);
  295.         if(se >= 0)
  296.             close(se);
  297.         sprintf(buf, "%s/%d.batch", SPOOLDIR, getpid());
  298.         s = creat(buf, 0600);
  299.         chown(buf, uid, gid);
  300.         sprintf(buf, "%s/%d.ebatch", SPOOLDIR, getpid());
  301.         se = creat(buf, 0600);
  302.         chown(buf, uid, gid);
  303.     }
  304.     for(i = 0; i < 3; i++)
  305.         dup2(s, i);
  306.     if(se >= 0)
  307.         dup2(se, 2);
  308.     close(s);
  309.     close(se);
  310.     if(mode != QM_INTERACTIVE) {
  311.         close(0);
  312.         open("/dev/null", 2);
  313.     }
  314.  
  315.     waitrun(queue);
  316.     if(minload)
  317.         while(getload() > minload)
  318.             sleep(60);
  319.     sprintf(buf, "%s/%d", SPOOLDIR, getpid());
  320.     chmod(buf, 0755);
  321.     pid = 0;
  322.     starttime = time(0);
  323.     signal(SIGTERM, byebye);
  324.     pid = cspawn(prog, sargv);
  325.     pcontrol();
  326.     while((i = wait(0)) != pid)
  327.         if(i < 0 && errno != EINTR)
  328.             break;
  329.     signal(SIGALRM, SIG_IGN);
  330.     alarm(0);
  331.     signal(SIGTERM, SIG_IGN);
  332.     killpg(pid, SIGHUP);
  333.     killpg(pid, SIGCONT);
  334.     sleep(2);
  335.     killpg(pid, 9);
  336.     pid = getpid();
  337.     if(!access(QACCT, 0) && (n = fopen(QACCT, "a"))) {
  338.         struct tms tms;
  339.         times(&tms);
  340.         fprintf(n, "%ld\t%ld\t%s\t%s\t%s\t%s\n",
  341.             tms.tms_utime, tms.tms_stime,
  342.             sargv[0], user, group, host);
  343.         fclose(n);
  344.     }
  345.     if(!local) {
  346.         sprintf(buf, "%s/%d.dir", SPOOLDIR, pid);
  347.         if(!access(buf, 0)) {
  348.             if(!vfork()) {
  349.                 execl("/bin/rm", "rm", "-rf", buf, 0);
  350.                 exit(1);
  351.             }
  352.             wait(0);
  353.         }
  354.     }
  355.     if(unid) {
  356.         sprintf(buf, "%s/%ld", SPOOLDIR, unid);
  357.         unlink(buf);
  358.     }
  359.     sprintf(name1, "%s/%s", SPOOLDIR, queue);
  360.     lock(name1);
  361.     n = fopen(name1, "r");
  362.     strcpy(name2, name1);
  363.     strcat(name2, ".tmp");
  364.     m = fopen(name2, "w");
  365.     i = 0;
  366.     while(fgets(buf, sizeof(buf), n))
  367.         if(atoi(buf) != pid) {
  368.             if(i++ < maxrun)
  369.                 kill(atoi(buf), SIGALRM);
  370.             fputs(buf, m);
  371.         }
  372.     fclose(m);
  373.     fclose(n);
  374.     unlink(name1);
  375.     link(name2, name1);
  376.     unlink(name2);
  377.     unlock(name1);
  378.     for(i = 0; i < 3; i++)
  379.         close(i);
  380.     open("/dev/null", 2);
  381.     dup(0);
  382.     dup(0);
  383.     if(*qm && (sp = getservbyname("qmaster", "udp")) &&
  384.         (hp = gethostbyname(qm)) &&
  385.         (i = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) {
  386.         *buf = 0;
  387.         if(unid)
  388.             sprintf(buf + 1, "%ld", unid);
  389.         else
  390.             sprintf(buf + 1, "%d", pid);
  391.         sin2.sin_family = hp->h_addrtype;
  392.         sin2.sin_port = sp->s_port;
  393.         bcopy(hp->h_addr, &sin2.sin_addr, hp->h_length);
  394.         sendto(i, buf, 2 + strlen(buf + 1), 0, &sin2, sizeof(sin2));
  395.         close(i);
  396.     }
  397.     if(mode == QM_BATCH) {
  398.         int sentsome = 0;
  399.         char *p;
  400.  
  401.         sprintf(name1, "%s@%s", user, host);
  402.         name2[0] = '\0';
  403.         for(i = 0; i < sargc - 1; i++) {
  404.             strcat(name2, sargv[i]);
  405.             strcat(name2, " ");
  406.         }
  407.         strcat(name2, sargv[sargc - 1]);
  408.         sprintf(buf, "%s/%d.batch", SPOOLDIR, pid);
  409.         close(0);
  410.         open(buf, 0);
  411.         unlink(buf);
  412.         if(!fstat(0, &statb) && statb.st_size) {
  413.             sprintf(buf, "batch job %d output (%s)", pid, name2);
  414.             if(!vfork()) {
  415.                 setuid(uid);
  416.                 execl("/usr/ucb/Mail", "Mail", "-s",
  417.                     buf, name1, 0);
  418.                 exit(1);
  419.             }
  420.             wait(0);
  421.             sentsome++;
  422.         }
  423.         sprintf(buf, "%s/%d.ebatch", SPOOLDIR, pid);
  424.         close(0);
  425.         open(buf, 0);
  426.         unlink(buf);
  427.         if(!fstat(0, &statb) && statb.st_size) {
  428.             sprintf(buf, "batch job %d errors (%s)", pid, name2);
  429.             if(!vfork()) {
  430.                 setuid(uid);
  431.                 execl("/usr/ucb/Mail", "Mail", "-s",
  432.                     buf, name1, 0);
  433.                 exit(1);
  434.             }
  435.             wait(0);
  436.             sentsome++;
  437.         }
  438.         close(0);
  439.         dup(1);
  440.         sprintf(buf, "%s/%d", SPOOLDIR, pid);
  441.         unlink(buf);
  442.         setuid(uid);
  443.         if(!(p = getenv("QNOTIFY")))
  444.             p = "mail";
  445.         if(!strcmp(p, "send") || !strcmp(p,"saml")||!strcmp(p,"soml")) {
  446.             sprintf(buf, "exec /usr/lib/sendmail -S -eq %s", name1);
  447.             i = 1;
  448.             if(n = popen(buf, "w")) {
  449.             fprintf(n, "Your batch job %d is finished.\n", pid);
  450.             fprintf(n, "\"%s\"\n", name2);
  451.             i = pclose(n);
  452.             }
  453.         }
  454.         if(!sentsome &&
  455. (!strcmp(p, "mail") || !strcmp(p, "saml") || (!strcmp(p, "soml") && i))) {
  456.             sprintf(buf, "exec /usr/ucb/Mail -s 'batch job' %s",
  457.                 name1);
  458.             if(n = popen(buf, "w")) {
  459.             fprintf(n, "Your batch job %d is finished.\n", pid);
  460.             fprintf(n, "\"%s\"\n", name2);
  461.             pclose(n);
  462.             }
  463.         }
  464.     }
  465.     else {
  466.         sprintf(buf, "%s/%d", SPOOLDIR, pid);
  467.         unlink(buf);
  468.     }
  469. }
  470.  
  471. byebye()
  472. {    
  473.     if(pid) {
  474.         killpg(pid, SIGHUP);
  475.         killpg(pid, SIGCONT);
  476.         kill(pid, SIGCONT);
  477.         killpg(pid, 9);
  478.         kill(pid, 9);
  479.     }
  480.     signal(SIGTERM, byebye);
  481. }
  482.  
  483. catch()
  484. {
  485.     signal(SIGALRM, catch);
  486. }
  487.  
  488. int running = 1;
  489.  
  490. pcontrol()
  491. {
  492.     int load;
  493.  
  494.     if(maxtime && time(0) - starttime > maxtime)
  495.         byebye();
  496.     if(minload != maxload) {
  497.         load = getload();
  498.         if(running) {
  499.             if(load >= maxload) {
  500.                 running = 0;
  501.                 killpg(pid, SIGSTOP);
  502.             }
  503.         }
  504.         else {
  505.             if(load <= minload) {
  506.                 running = 1;
  507.                 killpg(pid, SIGCONT);
  508.             }
  509.         }
  510.     }
  511.     signal(SIGALRM, pcontrol);
  512.     alarm(60);
  513. }
  514.  
  515. waitrun(q)
  516. char *q;
  517. {
  518.     int i, pid;
  519.     char buf[BUFSIZ], p[BUFSIZ];
  520.     FILE *n;
  521.  
  522.     signal(SIGALRM, catch);
  523.     sprintf(p, "%s/%s", SPOOLDIR, q);
  524.     pid = getpid();
  525.     lock(p);
  526.     if(!(n = fopen(p, "a"))) {
  527.         unlock(p);
  528.         exit(1);
  529.     }
  530.     fprintf(n, "%d\n", pid);
  531.     fclose(n);
  532.     unlock(p);
  533.     while(1) {
  534.         lock(p);
  535.         if(!(n = fopen(p, "r"))) {
  536.             unlock(p);
  537.             exit(1);
  538.         }
  539.         for(i = 0; i < maxrun; i++) {
  540.             fgets(buf, sizeof(buf), n);
  541.             buf[strlen(buf)-1] = '\0';
  542.             if(atoi(buf) == pid) {
  543.                 fclose(n);
  544.                 unlock(p);
  545.                 return;
  546.             }
  547.         }
  548.         fclose(n);
  549.         unlock(p);
  550.         alarm(300);
  551.         pause();
  552.         alarm(0);
  553.     }
  554. }
  555.