home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Usenet 1994 October
/
usenetsourcesnewsgroupsinfomagicoctober1994disk2.iso
/
unix
/
volume22
/
queuer
/
part03
/
enqueue.c
next >
Wrap
C/C++ Source or Header
|
1990-06-07
|
12KB
|
555 lines
/* Copyright 1990 The President and Fellows of Harvard University
Permission to use, copy, modify, and distribute this program for any
purpose and without fee is hereby granted, provided that this
copyright and permission notice appear on all copies and supporting
documentation, the name of Harvard University not be used in advertising
or publicity pertaining to distribution of the program, or to results
derived from its use, without specific prior written permission, and notice
be given in supporting documentation that copying and distribution is by
permission of Harvard University. Harvard University makes no
representations about the suitability of this software for any purpose.
It is provided "as is" without express or implied warranty. */
/* enqueue.c - Dan Lanciani '85 */
#include <sys/param.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/times.h>
#include <signal.h>
#include <netdb.h>
#include <pwd.h>
#include <grp.h>
#include <errno.h>
#include <stdio.h>
#include "queue.h"
/* Library globals declared by hand. */
extern int errno;
extern char **environ;

/* gid used as the effective group for privileged queues (set in enqueue). */
int queuegroup;
/* Nonzero when user and group names from the request are looked up locally. */
int flat;
/* Pid of the spawned job while it runs; 0 before it is started. */
int pid;
/* Number of entries in ofiles[] and in ifiles[]/lfiles[] respectively. */
int nofiles;
int nifiles;
/* Job teardown routine, also installed as the SIGTERM handler. */
int byebye();

/* Time at which the job was started; compared against maxtime in pcontrol. */
time_t starttime;
time_t time();

/* File name tables filled in from the "copyout", "efs" and "copyin" requests. */
char **ofiles;
char **ifiles;
char **lfiles;

/* Identifier read from the request with atol(); 0 means none. */
long unid;
/*
 * enqueue - read one job request from descriptor s, record it in a
 * checkpoint file under SPOOLDIR, wait for a slot in the queue, run the
 * job, and clean up / notify afterwards.
 *
 * s       descriptor the request is read from (via getstr() and read());
 *         for non-batch jobs it also becomes the job's stdin/stdout/stderr
 * host    name of the submitting host; replaced by the first string read
 *         from s when recover is set
 * sin     peer address; its port is overwritten and it is used to connect
 *         the optional second socket (se)
 * local   nonzero if the job comes from this host; recomputed by comparing
 *         host with gethostname() when recover is set
 * recover nonzero when a previously recorded request is being re-run
 *         (presumably s is then a saved checkpoint file - confirm with the
 *         caller)
 *
 * getstr(), xfputs(), newstring(), infile(), readconf(), cspawn(),
 * getload(), lock() and unlock() are defined elsewhere, as are mode,
 * restart, priv, queue, prog, qm, minload and maxrun.
 * Every failure while reading the request ends the process with exit(1).
 */
enqueue(s, host, sin, local, recover)
char *host;
struct sockaddr_in *sin;
{
int se = -1, sargc, i, uid, gid, ng, nenviron;
register long l;
gid_t groups[NGROUPS];
char **sargv, buf[BUFSIZ], *user, *group, *grl[XGROUPS];
char *cwd, *xcwd, **envp, name1[BUFSIZ], name2[BUFSIZ];
register struct passwd *pw;
register struct group *gr;
struct stat statb;
struct hostent *hp;
struct servent *sp;
struct sockaddr_in sin2;
register FILE *n, *m, *ck;
FILE *popen();
/* Resolve the gid of QUEUEGROUP, falling back to NOBODY if it is unknown. */
if(gr = getgrnam(QUEUEGROUP))
queuegroup = gr->gr_gid;
else
queuegroup = NOBODY;
/*
 * First string of the request.  When recovering it is the originating
 * host name.  Otherwise it is a port number: if nonzero it must be below
 * IPPORT_RESERVED, and a second socket (se) is connected to that port on
 * the peer from a reserved local port.  se later becomes the job's stderr.
 */
getstr(s, buf);
if(recover) {
host = newstring(buf);
gethostname(buf, sizeof(buf));
local = !strcmp(host, buf);
}
else {
i = (u_short)atoi(buf);
if(i) {
if(i >= IPPORT_RESERVED)
exit(1);
ng = IPPORT_RESERVED - 1;
if((se = rresvport(&ng)) < 0)
exit(1);
sin->sin_port = htons((u_short)i);
if(connect(se, sin, sizeof(struct sockaddr_in)))
exit(1);
}
}
/* Local jobs, and hosts for which infile(FLATFILE, host) is true, are "flat". */
flat = local || infile(FLATFILE, host);
/*
 * Open the checkpoint file SPOOLDIR/<our pid>.  The host name goes in
 * first; most strings read from s below are recorded in it as well.
 */
sprintf(buf, "%s/%d", SPOOLDIR, getpid());
if(!(ck = fopen(buf, "w")))
exit(1);
xfputs(host, ck);
/*
 * Next string: the unid.  Any existing SPOOLDIR/<unid> file is removed.
 * A recovered job forgets its unid.  The value is written to the
 * checkpoint followed by a NUL byte, and when nonzero SPOOLDIR/<unid> is
 * recreated holding our pid.
 */
getstr(s, buf);
unid = atol(buf);
sprintf(buf, "%s/%ld", SPOOLDIR, unid);
if(unid)
unlink(buf);
if(recover)
unid = 0;
fprintf(ck, "%ld", unid);
putc('\0', ck);
if(unid) {
if(!(n = fopen(buf, "w")))
exit(1);
fprintf(n, "%d", getpid());
putc('\0', n);
fclose(n);
}
/* Argument count followed by that many argument strings. */
getstr(s, buf);
xfputs(buf, ck);
sargc = atoi(buf);
sargv = (char **)malloc((sargc+1) * sizeof(char *));
for(i = 0; i < sargc; i++) {
sargv[i] = newstring(getstr(s, buf));
xfputs(buf, ck);
}
sargv[i] = NULL;
/* User name: must exist in the local passwd database when flat, else NOBODY is used. */
user = newstring(getstr(s, buf));
xfputs(buf, ck);
if(flat) {
if(!(pw = getpwnam(user)))
exit(1);
uid = pw->pw_uid;
}
else
uid = NOBODY;
/* Primary group name, handled the same way as the user name. */
group = newstring(getstr(s, buf));
xfputs(buf, ck);
if(flat) {
if(!(gr = getgrnam(group)))
exit(1);
gid = gr->gr_gid;
}
else
gid = NOBODY;
/*
 * Supplementary groups: a count followed by that many group names.
 * NOTE(review): the count is only checked against XGROUPS (the size of
 * grl[]), while groups[] has NGROUPS elements, and a negative count is
 * not rejected - confirm XGROUPS <= NGROUPS.
 */
getstr(s, buf);
xfputs(buf, ck);
if((ng = atoi(buf)) > XGROUPS)
exit(1);
for(i = 0; i < ng; i++) {
grl[i] = newstring(getstr(s, buf));
xfputs(buf, ck);
if(flat) {
if(!(gr = getgrnam(grl[i])))
exit(1);
groups[i] = gr->gr_gid;
}
else
groups[i] = NOGROUP;
}
/* Working directory of the submitter; xcwd is the directory actually used here. */
xcwd = cwd = newstring(getstr(s, buf));
xfputs(buf, ck);
/* Environment: a count followed by that many strings; it replaces our own environ. */
getstr(s, buf);
nenviron = atoi(buf);
xfputs(buf, ck);
envp = (char **)malloc((nenviron+1) * sizeof(char *));
for(i = 0; i < nenviron; i++) {
envp[i] = newstring(getstr(s, buf));
xfputs(buf, ck);
}
envp[i] = NULL;
environ = envp;
/* readconf() is keyed on argv[0]; a nonzero return rejects the job. */
if(readconf(sargv[0]))
exit(1);
/*
 * When recovering, interactive jobs and jobs whose restart flag is clear
 * are discarded: the checkpoint file and the job's spool directory are
 * removed and we exit.  Any other recovered job continues as a batch job.
 */
if(recover) {
if(mode == QM_INTERACTIVE || !restart) {
fclose(ck);
sprintf(buf, "%s/%d", SPOOLDIR, getpid());
unlink(buf);
sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
if(!access(buf, 0)) {
if(!vfork()) {
execl("/bin/rm", "rm", "-rf", buf, 0);
exit(1);
}
wait(0);
}
exit(0);
}
mode = QM_BATCH;
}
/*
 * Jobs from other hosts run in a private directory SPOOLDIR/<pid>.dir,
 * created fresh (any old one is removed with rm -rf) unless recovering,
 * and handed to the job's uid/gid.
 * NOTE(review): when recover is set, buf has not been filled with the
 * .dir path at this point, so chown() and xcwd appear to use whatever
 * string getstr() last left in buf - confirm.
 */
if(!local) {
if(!recover) {
sprintf(buf, "%s/%d.dir", SPOOLDIR, getpid());
if(!access(buf, 0)) {
if(!vfork()) {
execl("/bin/rm", "rm", "-rf", buf, 0);
exit(1);
}
wait(0);
}
mkdir(buf, 0700);
}
chown(buf, uid, gid);
xcwd = newstring(buf);
}
/*
 * Take on the job's identity.  With priv set the real gid becomes gid and
 * the effective gid queuegroup; otherwise setgid(gid) is used.  The real
 * uid becomes uid while the effective uid stays 0.
 */
if(priv)
setregid(gid, queuegroup);
else
setgid(gid);
setgroups(ng, groups);
setreuid(uid, 0);
chdir(xcwd);
nofiles = nifiles = 0;
/*
 * Optional file-handling requests, terminated by "done".  Each keyword
 * is recorded in the checkpoint; unknown keywords are ignored.
 * NOTE(review): ofiles/ifiles/lfiles are allocated from unchecked counts
 * and the malloc() results are not tested - confirm the peer is trusted.
 */
while(1) {
getstr(s, buf);
xfputs(buf, ck);
if(!strcmp(buf, "done"))
break;
/* "copyout": a count followed by that many file names, saved in ofiles[]. */
if(!strcmp(buf, "copyout")) {
getstr(s, buf);
xfputs(buf, ck);
nofiles = atoi(buf);
ofiles = (char **)malloc(nofiles * sizeof(char *));
for(i = 0; i < nofiles; i++) {
getstr(s, buf);
xfputs(buf, ck);
ofiles[i] = newstring(buf);
}
continue;
}
/*
 * "efs": a count followed by input file names (ifiles[]) and the names
 * they have on this machine (lfiles[]).  With SANEEFS an absolute name
 * maps to /r/<host><name> and a relative one is used as is; without it
 * the local name is read from s.  Afterwards the working directory is
 * switched to /r/<host><cwd>, unless cwd already starts with "/r/".
 */
if(!strcmp(buf, "efs")) {
getstr(s, buf);
xfputs(buf, ck);
nifiles = atoi(buf);
ifiles = (char **)malloc(nifiles * sizeof(char *));
lfiles = (char **)malloc(nifiles * sizeof(char *));
for(i = 0; i < nifiles; i++) {
getstr(s, buf);
xfputs(buf, ck);
ifiles[i] = newstring(buf);
#ifdef SANEEFS
if(ifiles[i][0] == '/') {
sprintf(buf,"/r/%s%s",host,ifiles[i]);
lfiles[i] = newstring(buf);
}
else
lfiles[i] = ifiles[i];
#else
getstr(s, buf);
xfputs(buf, ck);
lfiles[i] = newstring(buf);
#endif
}
if(xcwd != cwd)
free(xcwd);
if(strncmp(cwd, "/r/", 3)) {
sprintf(buf, "/r/%s%s", host, cwd);
xcwd = newstring(buf);
}
else
xcwd = cwd;
chdir(xcwd);
continue;
}
/*
 * "copyin": a count followed by file names.  The local name is the part
 * after the last '/' (lfiles[i] points into ifiles[i]).  Unless
 * recovering, the file is created in the current directory, given to
 * uid/gid, and filled with the number of bytes announced by the next
 * string; the length and the data are not recorded in the checkpoint.
 * A negative length removes the file again and copies nothing.
 */
if(!strcmp(buf, "copyin")) {
getstr(s, buf);
xfputs(buf, ck);
nifiles = atoi(buf);
ifiles = (char **)malloc(nifiles * sizeof(char *));
lfiles = (char **)malloc(nifiles * sizeof(char *));
for(i = 0; i < nifiles; i++) {
getstr(s, buf);
xfputs(buf, ck);
ifiles[i] = newstring(buf);
if(rindex(ifiles[i], '/'))
lfiles[i] = rindex(ifiles[i], '/') + 1;
else
lfiles[i] = ifiles[i];
if(!recover) {
if(!(n = fopen(lfiles[i], "w")))
exit(1);
chown(lfiles[i], uid, gid);
getstr(s, buf);
l = atol(buf);
if(l < 0) {
unlink(lfiles[i]);
l = 0;
}
/* Copy l bytes from s to the file, at most sizeof(buf) at a time. */
while(l) {
ng = sizeof(buf);
if(ng > l)
ng = l;
if((ng = read(s, buf, ng)) <= 0)
exit(1);
l -= ng;
fwrite(buf, ng, 1, n);
}
fclose(n);
}
}
continue;
}
}
/* The request has been read completely; the checkpoint is finished. */
fclose(ck);
/* Arguments (other than argv[0]) that exactly name an input file get its local name. */
for(i = 1; i < sargc; i++)
for(ng = 0; ng < nifiles; ng++)
if(!strcmp(sargv[i], ifiles[ng]))
sargv[i] = lfiles[ng];
/*
 * Batch jobs drop the connection(s); s and se are reopened as
 * SPOOLDIR/<pid>.batch and SPOOLDIR/<pid>.ebatch, owned by the job.
 */
if(mode == QM_BATCH) {
close(s);
if(se >= 0)
close(se);
sprintf(buf, "%s/%d.batch", SPOOLDIR, getpid());
s = creat(buf, 0600);
chown(buf, uid, gid);
sprintf(buf, "%s/%d.ebatch", SPOOLDIR, getpid());
se = creat(buf, 0600);
chown(buf, uid, gid);
}
/* s becomes descriptors 0, 1 and 2; se, if there is one, replaces descriptor 2. */
for(i = 0; i < 3; i++)
dup2(s, i);
if(se >= 0)
dup2(se, 2);
close(s);
close(se);
/* Anything but an interactive job gets /dev/null reopened on descriptor 0. */
if(mode != QM_INTERACTIVE) {
close(0);
open("/dev/null", 2);
}
/* Block until our pid is among the first maxrun entries of the queue file. */
waitrun(queue);
/* With a nonzero minload, poll once a minute until getload() is no longer above it. */
if(minload)
while(getload() > minload)
sleep(60);
/* Change the mode of the checkpoint file now that the job is about to start. */
sprintf(buf, "%s/%d", SPOOLDIR, getpid());
chmod(buf, 0755);
/*
 * Start the job.  pid is 0 until cspawn() returns so that byebye() does
 * nothing if SIGTERM arrives first.  pcontrol() re-arms itself through
 * SIGALRM every 60 seconds.
 */
pid = 0;
starttime = time(0);
signal(SIGTERM, byebye);
pid = cspawn(prog, sargv);
pcontrol();
/* Wait for the job itself; wait() failing with EINTR is retried. */
while((i = wait(0)) != pid)
if(i < 0 && errno != EINTR)
break;
/*
 * The job has ended: stop the periodic alarm, ignore SIGTERM, and get
 * rid of whatever is left in the job's process group (SIGHUP, SIGCONT,
 * then signal 9 two seconds later).
 */
signal(SIGALRM, SIG_IGN);
alarm(0);
signal(SIGTERM, SIG_IGN);
killpg(pid, SIGHUP);
killpg(pid, SIGCONT);
sleep(2);
killpg(pid, 9);
/* From here on pid is our own pid, which names all of the spool files. */
pid = getpid();
/* Accounting record, appended only if QACCT already exists. */
if(!access(QACCT, 0) && (n = fopen(QACCT, "a"))) {
struct tms tms;
times(&tms);
fprintf(n, "%ld\t%ld\t%s\t%s\t%s\t%s\n",
tms.tms_utime, tms.tms_stime,
sargv[0], user, group, host);
fclose(n);
}
/* Remove the private directory of a non-local job. */
if(!local) {
sprintf(buf, "%s/%d.dir", SPOOLDIR, pid);
if(!access(buf, 0)) {
if(!vfork()) {
execl("/bin/rm", "rm", "-rf", buf, 0);
exit(1);
}
wait(0);
}
}
/* Remove the SPOOLDIR/<unid> file written when the request was read. */
if(unid) {
sprintf(buf, "%s/%ld", SPOOLDIR, unid);
unlink(buf);
}
/*
 * Take ourselves out of the queue file: under the lock, copy every line
 * whose pid is not ours to <queue>.tmp and move that file into place.
 * The first maxrun remaining entries are sent SIGALRM on the way.
 */
sprintf(name1, "%s/%s", SPOOLDIR, queue);
lock(name1);
n = fopen(name1, "r");
strcpy(name2, name1);
strcat(name2, ".tmp");
m = fopen(name2, "w");
i = 0;
while(fgets(buf, sizeof(buf), n))
if(atoi(buf) != pid) {
if(i++ < maxrun)
kill(atoi(buf), SIGALRM);
fputs(buf, m);
}
fclose(m);
fclose(n);
unlink(name1);
link(name2, name1);
unlink(name2);
unlock(name1);
/* Reopen descriptors 0, 1 and 2 on /dev/null. */
for(i = 0; i < 3; i++)
close(i);
open("/dev/null", 2);
dup(0);
dup(0);
/*
 * If a queue master host is configured (qm is not empty), send it one
 * UDP datagram on the "qmaster" service: a zero byte followed by the
 * unid (or our pid when there is no unid) in decimal with its
 * terminating NUL.
 */
if(*qm && (sp = getservbyname("qmaster", "udp")) &&
(hp = gethostbyname(qm)) &&
(i = socket(AF_INET, SOCK_DGRAM, 0)) >= 0) {
*buf = 0;
if(unid)
sprintf(buf + 1, "%ld", unid);
else
sprintf(buf + 1, "%d", pid);
sin2.sin_family = hp->h_addrtype;
sin2.sin_port = sp->s_port;
bcopy(hp->h_addr, &sin2.sin_addr, hp->h_length);
sendto(i, buf, 2 + strlen(buf + 1), 0, &sin2, sizeof(sin2));
close(i);
}
/*
 * Batch jobs report back by mail.  name1 is the recipient (user@host)
 * and name2 the command line rebuilt from the argument vector.
 * NOTE(review): sargv[sargc - 1] is read even when sargc is 0, and
 * name2 is filled with unbounded strcat() calls - confirm the limits.
 */
if(mode == QM_BATCH) {
int sentsome = 0;
char *p;
sprintf(name1, "%s@%s", user, host);
name2[0] = '\0';
for(i = 0; i < sargc - 1; i++) {
strcat(name2, sargv[i]);
strcat(name2, " ");
}
strcat(name2, sargv[sargc - 1]);
/*
 * The collected output is opened on descriptor 0 and unlinked; if it is
 * not empty it is mailed to name1 by a child running as uid, which
 * inherits descriptor 0.
 */
sprintf(buf, "%s/%d.batch", SPOOLDIR, pid);
close(0);
open(buf, 0);
unlink(buf);
if(!fstat(0, &statb) && statb.st_size) {
sprintf(buf, "batch job %d output (%s)", pid, name2);
if(!vfork()) {
setuid(uid);
execl("/usr/ucb/Mail", "Mail", "-s",
buf, name1, 0);
exit(1);
}
wait(0);
sentsome++;
}
/* Same again for the collected error output. */
sprintf(buf, "%s/%d.ebatch", SPOOLDIR, pid);
close(0);
open(buf, 0);
unlink(buf);
if(!fstat(0, &statb) && statb.st_size) {
sprintf(buf, "batch job %d errors (%s)", pid, name2);
if(!vfork()) {
setuid(uid);
execl("/usr/ucb/Mail", "Mail", "-s",
buf, name1, 0);
exit(1);
}
wait(0);
sentsome++;
}
/* Descriptor 0 becomes a copy of descriptor 1; the checkpoint file is removed. */
close(0);
dup(1);
sprintf(buf, "%s/%d", SPOOLDIR, pid);
unlink(buf);
setuid(uid);
/*
 * Completion notice, selected by QNOTIFY in the job's environment
 * (default "mail").  "send", "saml" and "soml" pipe the notice through
 * sendmail; i keeps the pclose() status, or 1 if popen() failed.
 */
if(!(p = getenv("QNOTIFY")))
p = "mail";
if(!strcmp(p, "send") || !strcmp(p,"saml")||!strcmp(p,"soml")) {
sprintf(buf, "exec /usr/lib/sendmail -S -eq %s", name1);
i = 1;
if(n = popen(buf, "w")) {
fprintf(n, "Your batch job %d is finished.\n", pid);
fprintf(n, "\"%s\"\n", name2);
i = pclose(n);
}
}
/*
 * A mailed notice is sent only if no output was mailed above, and then
 * for "mail", for "saml", and for "soml" when the sendmail attempt left
 * a nonzero status in i.
 */
if(!sentsome &&
(!strcmp(p, "mail") || !strcmp(p, "saml") || (!strcmp(p, "soml") && i))) {
sprintf(buf, "exec /usr/ucb/Mail -s 'batch job' %s",
name1);
if(n = popen(buf, "w")) {
fprintf(n, "Your batch job %d is finished.\n", pid);
fprintf(n, "\"%s\"\n", name2);
pclose(n);
}
}
}
/* Other jobs only need their checkpoint file removed. */
else {
sprintf(buf, "%s/%d", SPOOLDIR, pid);
unlink(buf);
}
}
/*
 * byebye - tear down the running job.
 *
 * Installed as the SIGTERM handler and also called directly by pcontrol().
 * While pid is 0 no job has been spawned and nothing is signalled.
 * Otherwise the job's process group is hung up, the group and the process
 * itself are continued, and both are then sent signal 9.
 * The handler re-installs itself before returning.
 */
byebye()
{
	if(pid != 0) {
		/* Hang up the group, then continue it and the job process. */
		killpg(pid, SIGHUP);
		killpg(pid, SIGCONT);
		kill(pid, SIGCONT);

		/* Finally kill the group and the job process outright. */
		killpg(pid, 9);
		kill(pid, 9);
	}

	signal(SIGTERM, byebye);
}
/*
 * catch - SIGALRM handler used by waitrun().
 *
 * It does no work of its own; it only installs itself again, so that an
 * alarm merely interrupts the pause() in waitrun().
 */
catch()
{
	(void) signal(SIGALRM, catch);
}
/* Nonzero while the job is believed to be running (not stopped by pcontrol). */
int running = 1;

/*
 * pcontrol - periodic job control, driven by SIGALRM once a minute.
 *
 * Calls byebye() once the job has been running for more than maxtime
 * seconds (a maxtime of 0 disables the limit).  When minload and maxload
 * differ, the job's process group is stopped once getload() reaches
 * maxload and continued again once it has dropped to minload.
 * The routine installs itself for SIGALRM and arms the next alarm itself.
 */
pcontrol()
{
	int load;

	if(maxtime && time(0) - starttime > maxtime)
		byebye();

	if(minload != maxload) {
		load = getload();
		if(running && load >= maxload) {
			/* Too busy: suspend the job. */
			running = 0;
			killpg(pid, SIGSTOP);
		}
		else if(!running && load <= minload) {
			/* Quiet again: let the job go on. */
			running = 1;
			killpg(pid, SIGCONT);
		}
	}

	signal(SIGALRM, pcontrol);
	alarm(60);
}
/*
 * waitrun - queue up behind the jobs already waiting and block until it
 * is our turn.
 *
 * q   name of the queue; the queue file is SPOOLDIR/q
 *
 * Our pid is appended to the queue file, and the file is then re-read
 * until that pid shows up among its first maxrun lines.  Between scans
 * the process sleeps in pause(); it is woken by a SIGALRM (handled by
 * catch()) or by its own 300 second alarm.  All access to the queue
 * file happens between lock() and unlock().  Failure to open the queue
 * file ends the process with exit(1).
 */
waitrun(q)
char *q;
{
	int i, pid;
	char buf[BUFSIZ], p[BUFSIZ];
	FILE *n;

	signal(SIGALRM, catch);
	sprintf(p, "%s/%s", SPOOLDIR, q);
	pid = getpid();

	/* Add ourselves at the tail of the queue. */
	lock(p);
	if(!(n = fopen(p, "a"))) {
		unlock(p);
		exit(1);
	}
	fprintf(n, "%d\n", pid);
	fclose(n);
	unlock(p);

	while(1) {
		lock(p);
		if(!(n = fopen(p, "r"))) {
			unlock(p);
			exit(1);
		}
		for(i = 0; i < maxrun; i++) {
			/*
			 * Stop at end of file: without a fresh line buf is
			 * stale, or uninitialised on the very first read.
			 * atoi() ignores the trailing newline, so the line
			 * needs no trimming before it is parsed.
			 */
			if(!fgets(buf, sizeof(buf), n))
				break;
			if(atoi(buf) == pid) {
				fclose(n);
				unlock(p);
				return;
			}
		}
		fclose(n);
		unlock(p);

		/* Not our turn yet: sleep until signalled, five minutes at most. */
		alarm(300);
		pause();
		alarm(0);
	}
}