home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Usenet 1994 October
/
usenetsourcesnewsgroupsinfomagicoctober1994disk2.iso
/
misc
/
volume25
/
QBATCH
/
part06
< prev
next >
Wrap
Text File
|
1991-11-04
|
19KB
|
580 lines
Newsgroups: comp.sources.misc
From: alan@tharr.UUCP (Alan Saunders)
Subject: v25i025: QBATCH - a queued batch processing system for UNIX, Part06/06
Message-ID: <1991Nov5.034922.5231@sparky.imd.sterling.com>
X-Md4-Signature: b01840a9111880e2bf66690532ec6ffc
Date: Tue, 5 Nov 1991 03:49:22 GMT
Approved: kent@sparky.imd.sterling.com
Submitted-by: alan@tharr.UUCP (Alan Saunders)
Posting-number: Volume 25, Issue 25
Archive-name: QBATCH/part06
Environment: UNIX
#! /bin/sh
# This is a shell archive. Remove anything before this line, then unpack
# it by saving it into a file and typing "sh file". To overwrite existing
# files, type "sh file -c". You can also feed this as standard input via
# unshar, or by typing, e.g., "sh <file". If this archive is complete, you
# will see the following message at the end:
# "End of archive 6 (of 6)."
# Contents: src/qp.c
# Wrapped by root@vfib_d on Thu Oct 31 16:36:22 1991
PATH=/bin:/usr/bin:/usr/ucb ; export PATH
if test -f 'src/qp.c' -a "${1}" != "-c" ; then
echo shar: Will not clobber existing file \"'src/qp.c'\"
else
echo shar: Extracting \"'src/qp.c'\" \(16216 characters\)
sed "s/^X//" >'src/qp.c' <<'END_OF_FILE'
X/************************************************************************/
X/* */
X/* qp .. queue process. start a process engine for a given queue */
X/* */
X/* usage: qp [-r] qname */
X/* */
X/* Copyright (c) Vita Services 1990 */
X/* (c) Vita Fibres 1990 1991 */
X/* */
X/************************************************************************/
X
X#include "qbatch.h"
X#include <malloc.h>
/*
 * File-scope state shared by handle_flags() and main().
 */

/* Descriptors: queue file, queue lock file, and the scratch descriptor
   used by handle_flags().  'result' holds system-call return values in
   main(); 'interrupt' is not referenced in this file. */
int fpq = 0,
    fplck = 0,
    fptemp = 0,
    result,
    interrupt = 0;

/* Streams: the job's jcl file (child side) and the monitor log. */
FILE *fpin,
     *mon;

struct tm *jtimes;              /* broken-down time for monitor banners */
char exit_message[64];          /* filled in by q_wait() */
char envflg[5], envvar[255];    /* tokens of a "#ENV <var>" jcl line */
char temp[16],
     *jcl,
     *jcwd = NULL;
int i,
    j,
    k,
    rflag = 0;
pid_t ppid;                     /* pid of the background engine */
extern int had_usersig, lastflags;
int seen_usersig;
extern pid_t childid;           /* pid of the running job, 0 when idle */
char queue[128];                /* path of the queue file */
char buff[128];
char buff1[255];
char path[128];                 /* path of the current job's jcl file */

/* Environment strings handed to the job.  Sized for the longest value
   the format can produce: "QBC=%u" needs 4 + 10 + 1 bytes and
   "QENTRY=%d" needs 7 + 11 + 1 bytes with a 32-bit int. */
char qbc[16];
unsigned long q_queued, q_real, q_user, q_system;
char qentry[24];

char *args[10],                 /* argv for the job's interpreter */
     *envs[50],                 /* envp for the job */
     *queuename;
X
/*
 * print_time .. format a duration given in hundredths of a second.
 *
 *   val      duration in 1/100 s
 *   returns  pointer to a static buffer holding one of
 *              "S.CC"        (under a minute)
 *              "M:SS.CC"     (under an hour)
 *              "H:MM:SS.CC"  (an hour or more)
 *            The buffer is overwritten by the next call.
 *
 * The hour count is kept as unsigned long and formatted straight into
 * the result, so a large value cannot be narrowed to int or overrun an
 * intermediate buffer.  The longest result for a 64-bit unsigned long
 * is 14 + 9 characters, which fits tval.
 */
char * print_time(val)
unsigned long val;
{
    static char tval[32];
    unsigned long hours;
    unsigned int mins, secs, hund;

    hours = val / 360000;
    val %= 360000;
    mins = (unsigned int) (val / 6000);
    val %= 6000;
    secs = (unsigned int) (val / 100);
    hund = (unsigned int) (val % 100);

    if (hours)
        sprintf (tval, "%lu:%02u:%02u.%02u", hours, mins, secs, hund);
    else if (mins)
        sprintf (tval, "%u:%02u.%02u", mins, secs, hund);
    else
        sprintf (tval, "%u.%02u", secs, hund);
    return(tval);
}
void handle_flags ()
X{
X int oldaction, newaction;
X had_usersig ++;
X fptemp = open (queue, O_RDONLY);
X read (fptemp, &head, sizeof (head)); /* Get header */
X close (fptemp);
X if ((lastflags&qh_action) == (head.qh_flags&qh_action))
X {
X lastflags=head.qh_flags;
X return;
X }
X oldaction = lastflags & qh_action;
X newaction = head.qh_flags & qh_action;
X if ((newaction & qh_kill) != 0)
X {
X if (childid)
X {
X kill_child_group();
X }
X head.qh_flags -= qh_kill;
X }
X if ((newaction & qh_halt) != (oldaction & qh_halt))
X {
X if (childid)
X {
X toggle_halt_status(newaction);
X }
X }
X lastflags = head.qh_flags;
X}
X
main (argc, argv, envp)
int argc;
char *argv[],
X *envp[];
X
X{
X if (getuid() != 0)
X {
X fprintf (stderr, "Must be root to process a queue\n");
X exit (-1);
X }
X
X if (argc < 2)
X {
X fprintf (stderr, "Usage qp <qname>\n");
X qb_term (-1);
X }
X if (strcmp(argv[1], "-v") == 0) q_version();
X queuename = argv[1];
X strcpy (queue, QUEUEPATH);
X strcat (queue, queuename);
X fpq = open (queue, O_RDWR);
X if (fpq == -1)
X {
X perror ("cannot access queue ");
X qb_term (-1);
X }
X qb_setterm();
X q_lock(fpq);
X read (fpq, &head, sizeof (head));
X if (bad_queue()) qb_exit(-1);
X if (head.qh_pid != 0)
X {
X strcpy(buff, QUEUEPATH);
X strcat(buff, ".");
X strcat(buff, queuename);
X strcat(buff, ".lck");
X fplck = open (buff, O_RDWR+O_CREAT, 0644);
X if (fplck == -1)
X {
X fprintf (stderr, "Cannot open queue lockfile!\n");
X qb_exit(-1);
X }
X if (q_islock(fplck))
X {
X /* queue lock is active, let's just check the pid */
X read (fplck, &childid, sizeof(childid));
X if (childid != head.qh_pid)
X {
X fprintf (stderr, "You have a problem!!\n");
X fprintf (stderr, "A lock is held on the queue lock file : %s ", buff);
X fprintf(stderr, "by an activity (%d)\n", childid);
X fprintf (stderr, "Which does not match that of the qp engine in ");
X fprintf (stderr, "the queue header (%d)\n", head.qh_pid);
X qb_exit(-1);
X }
X printf ("%s process activity already running .. pid = %d\n", queuename, head.qh_pid);
X if (head.qh_flags & qh_stop)
X {
X printf ("But stop flag is set .. do you wish to clear it (y/n)?");
X i = getchar ();
X if (i == 'y' || i == 'Y')
X {
X head.qh_flags ^= qh_stop;
X lseek (fpq, 0, SEEK_SET);
X write (fpq, &head, sizeof (head));
X }
X }
X q_unlock(fpq);
X close (fpq);
X fpq = 0;
X qb_resetterm();
X exit (0);
X }
X else q_unlock(fplck);
X *buff = 0;
X close (fplck);
X fplck = 0;
X }
X q_unlock(fpq);
X close (fpq);
X fpq = 0;
X/*
X We are in a position to actually process the queue, so to free the
X controlling vdu, we'll fork into the background, and terminate the
X foreground
X*/
X ppid = fork ();
X if (ppid == -1)
X {
X perror ("Cannot create process activity! ");
X qb_resetterm();
X exit (-1);
X }
X if (ppid > 0)
X {
X printf ("%s process activity running .. pid = %d\n", queuename, ppid);
X qb_resetterm();
X exit (0);
X }
X
X/* Now we're in the background .. we can get to work! */
X
X qb_setterm();
X fpq = open (queue, O_RDWR);
X if (fpq == -1) perror ("can't open queue");
X q_lock(fpq);
X result = read (fpq, &head, sizeof (head));
X if (result == -1) perror ("can't read queue");
X ppid = getpid();
X /* set up system queue lock */
X strcpy(buff, QUEUEPATH);
X strcat(buff, ".");
X strcat(buff, queuename);
X strcat(buff, ".lck");
X fplck = open (buff, O_RDWR + O_CREAT, 0644);
X q_lock(fplck);
X write (fplck, &ppid, sizeof(ppid));
X fsync(fplck);
X head.qh_pid = ppid; /* set up header data .. */
X head.qh_flags &= (qh_enabled+qh_fixed); /* ,..flags */
X head.qh_proc = time ((time_t *) 0); /* set up start time */
X head.qh_queued = 0;
X head.qh_real = 0;
X head.qh_user = 0;
X head.qh_system = 0;
X head.qh_jobcount = 0;
X result = lseek (fpq, 0, SEEK_SET);
X if (result == -1) perror ("can't lseek queue");
X result = write (fpq, &head, sizeof (head)); /* and write it away */
X if (result == -1) perror ("can't write queue");
X q_unlock(fpq);
X result = close (fpq);
X if (result == -1) perror ("can't close queue");
X fpq = 0;
X qb_setuser(handle_flags);
X while (1)
X { /* and enter main loop */
X/*
X don't bother locking here, we're only checking status
X*/
X fpq = open (queue, O_RDWR);
X read (fpq, &head, sizeof (head));
X if (head.qh_noentries == 0)
X { /* nothing to do? */
X while ((head.qh_noentries == 0)
X ||(head.qh_flags&qh_halt != 0))
X { /* wait until there is */
X close (fpq);
X fpq = 0;
X if (! had_usersig) qb_pause(); /* wait for a signal from a support program */
X else had_usersig --;
X if (lastflags & qh_stop) /* stop flag set */
X {
X printf ("%s process activity Stopped .. pid was %d\n", queuename, ppid);
X fpq = open (queue, O_RDWR);
X q_lock(fpq);
X read (fpq, &head, sizeof (head));
X lseek (fpq, 0, SEEK_SET);
X head.qh_pid = 0;
X write (fpq, &head, sizeof (head));
X q_unlock(fpq);
X close (fpq); fpq = 0;
X q_unlock(fplck);
X close(fplck);
X qb_term (0);
X }
X fpq = open (queue, O_RDWR);
X read (fpq, &head, sizeof (head));
X }
X }
X close (fpq);
X fpq = 0;
X/*
X we should only get here if there's a job in the queue
X*/
X fpq = open (queue, O_RDWR);
X q_lock(fpq);
X read (fpq, &head, sizeof (head));
X/*
X just in case a cancel managed to get in while queue unlocked.
X*/
X if (head.qh_noentries == 0)
X {
X q_unlock(fpq);
X close (fpq);
X fpq = 0;
X }
X else
X { /* get entry and fork it */
X read (fpq, &entry, sizeof (entry));
X if (entry.qe_monitor[0] != 0)
X {
X mon = fopen (entry.qe_monitor, "a");
X }
X else
X {
X mon = fopen (head.qh_defmon, "a");
X }
X head.qh_start = time ((time_t *) 0); /* set up start time */
X q_queued = head.qh_start - entry.qe_submitted;
X jtimes = (struct tm *)localtime (&head.qh_start);
X fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
X ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
X fprintf (mon, "## Started %s##\n" ,asctime (jtimes));
X fclose (mon);
X entry.qe_repcount++; /* increment count */
X/*
X set up path to jcl
X*/
X strcpy (path, head.qh_spool);
X if (path[strlen (path) - 1] != '/')
X strcat (path, "/");
X strcat (path, entry.qe_jcl);
X if (access (path, F_OK) == -1)
X {
X perror ("Can't access jcl ");
X if (head.qh_flags &qh_repeat != 0)
X head.qh_flags -= qh_repeat;
X }
X else
X {
X qb_set_timer();
X childid = fork ();
X if (childid == 0)
X {
X/*
X fork returned 0 .. must be the child
X*/
X qb_setpgrp (); /* create new process
X group for child */
X fpin = fopen(path, "r");
X fgets (buff, 128, fpin);
X if ((buff[0] == '#') && (buff[1] == '!') && (buff[2] == 'L'))
X {
X /* job is linked, jcl is actually the file pointed to */
X fclose (fpin);
X strcpy (path, &buff[3]);
X fpin = fopen (path, "r");
X if (fpin == NULL)
X fprintf (stderr, "Cannot access linked jcl\n");
X exit (-1);
X }
X i = 2, j = 0;
X while (1) /* parse strings in interpreter line */
X {
X args[j] = buff+i;
X for (;((buff[i] != ' ') && (buff[i] != '\n')); i++){};
X k = buff[i];
X buff[i] = 0;
X i++;
X j++;
X if (k == '\n') break;
X }
X args[j++] = path;
X args[j] = (char *) NULL;
X strcpy (queue, "QUEUE=");
X strcat (queue, queuename);
X for (j=0; j< 50; j++) envs[j] = (char *) NULL;
X j = 0;
X while (1) /* transfer environment variables (up to 46) */
X {
X fgets (buff1, 255, fpin);
X sscanf (buff1, "%s %s", envflg, envvar);
X if (strcmp (envflg, "#ENV")) break; /* no more */
X envs [j] = malloc(strlen(envvar)+1);
X strcpy (envs[j++], envvar);
X if (j > 45) break; /* can't fit any more in table */
X }
X fclose (fpin);
X envs[j++] = queue;
X sprintf (qbc, "QBC=%u", entry.qe_repcount);
X envs[j++] = qbc;
X sprintf (qentry, "QENTRY=%d", entry.qe_jobno);
X envs[j++] = qentry;
X j = 0;
X jtimes = (struct tm *)localtime (&head.qh_start);
X if (entry.qe_monitor[0] != 0)
X {
X mon = freopen (entry.qe_monitor, "a", stdout);
X mon = freopen (entry.qe_monitor, "a", stderr);
X }
X else
X {
X mon = freopen (head.qh_defmon, "a", stdout);
X mon = freopen (head.qh_defmon, "a", stderr);
X }
X if (setuid (entry.qe_uid) != 0)
X {
X perror ("cannot change uid!!");
X exit (-1);
X }
X setgid (entry.qe_gid);
X nice (head.qh_priority); /* set priority */
X if (execve (args[0], args, envs) == -1)
X {
X perror ("cannot execute job! ");
X if (head.qh_flags &qh_repeat != 0)
X head.qh_flags -= qh_repeat;
X exit (-1);
X }
X }
X if (childid == -1)
X {
X perror ("cannot execute job! ");
X if (head.qh_flags &qh_repeat != 0)
X head.qh_flags -= qh_repeat;
X }
X/*
X set child on track, now update queue and wait for it to finish
X*/
X head.qh_flags &= (qh_enabled+qh_fixed); /* ,..flags */
X entry.qe_status = childid; /* and entry status */
X lseek (fpq, 0, SEEK_SET);
X head.qh_pid = ppid;
X write (fpq, &head, sizeof (head)); /* and write them away */
X write (fpq, &entry, sizeof (entry));
X q_unlock(fpq);
X close (fpq);
X fpq = 0;
X
X q_wait(exit_message);
X qb_get_timer(&q_real, &q_user, &q_system);
X childid = 0;
X if (entry.qe_monitor[0] != 0)
X {
X mon = fopen (entry.qe_monitor, "a");
X }
X else
X {
X mon = fopen (head.qh_defmon, "a");
X }
X head.qh_start = time ((time_t *) 0); /* set up end time */
X jtimes = (struct tm *)localtime (&head.qh_start);
X fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
X ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
X fprintf (mon, "## stopped %s" ,asctime (jtimes));
X fprintf(mon, "## Times:- Queued: %15s", print_time(q_queued));
X fprintf(mon, " Real: %15s\n", print_time(q_real));
X fprintf(mon, "## User: %15s", print_time(q_user));
X fprintf(mon, " System: %15s\n", print_time(q_system));
X if (exit_message != NULL)
X fprintf (mon, "## (%s)\n", exit_message);
X fprintf (mon,"##\n");
X fclose (mon);
X }
X if (entry.qe_notify > 0)
X {
X /* notify user */
X sprintf (buff, "/usr/bin/bv/jobdone %s %d %d %s %s \"%s\"", queuename, entry.qe_jobno, entry.qe_notify, entry.qe_uname, entry.qe_tty, entry.qe_jobname);
X system (buff);
X }
X fpq = open (queue, O_RDWR);
X q_lock(fpq);
X read (fpq, &head, sizeof(head));
X head.qh_start = 0;
X head.qh_queued += (q_queued*100);
X head.qh_real += q_real;
X head.qh_user += q_user;
X head.qh_system += q_system;
X head.qh_jobcount ++;
X if ((head.qh_flags & qh_repeat) == 0)
X {
X head.qh_noentries--;
X/*
X get rid of first entry in queue
X*/
X lseek (fpq, 0, SEEK_SET);
X head.qh_pid = ppid;
X write (fpq, &head, sizeof (head));
X if (head.qh_noentries > 0)
X {
X for (i = 0; i < head.qh_noentries; i++)
X {
X lseek (fpq, (sizeof (head) + ((i + 1) * sizeof (entry))), SEEK_SET);
X read (fpq, &entry, sizeof (entry));
X lseek (fpq, (sizeof (head) + (i * sizeof (entry))), SEEK_SET);
X write (fpq, &entry, sizeof (entry));
X }
X }
X ftruncate (fpq, sizeof (head) + head.qh_noentries * sizeof (entry));
X/*
X get rid of jcl file
X*/
X chmod (path, 0755);
X unlink (path);
X }
X else
X {
X read (fpq, &entry, sizeof(entry));
X lseek (fpq, 0, SEEK_SET);
X head.qh_pid = ppid;
X write (fpq, &head, sizeof (head));
X entry.qe_status = 0;
X write (fpq, &entry, sizeof(entry));
X }
X if (head.qh_flags & qh_stop)
X {
X printf ("%s process activity stopped .. pid was %d\n", queuename, ppid);
X lseek (fpq, 0, SEEK_SET);
X head.qh_pid = 0;
X write (fpq, &head, sizeof (head));
X qb_term (0);
X }
X q_unlock(fpq);
X close (fpq);
X fpq = 0;
X }
X }
X}
END_OF_FILE
if test 16216 -ne `wc -c <'src/qp.c'`; then
echo shar: \"'src/qp.c'\" unpacked with wrong size!
fi
# end of 'src/qp.c'
fi
# Announce that this part has been unpacked and create its marker file,
# which the loop below looks for.
echo shar: End of archive 6 \(of 6\).
cp /dev/null ark6isdone
# Collect the numbers of the parts (1 to 6) whose marker file is absent.
MISSING=""
for I in 1 2 3 4 5 6 ; do
if test ! -f ark${I}isdone ; then
MISSING="${MISSING} ${I}"
fi
done
# If no marker is missing, report success and remove the marker files;
# otherwise list the parts that still have to be unpacked.
if test "${MISSING}" = "" ; then
echo You have unpacked all 6 archives.
rm -f ark[1-9]isdone
else
echo You still need to unpack the following archives:
echo " " ${MISSING}
fi
## End of shell archive.
# Stop here so the shell never tries to execute the text (the poster's
# signature) that follows the archive.
exit 0
exit 0 # Just in case...
--
Kent Landfield INTERNET: kent@sparky.IMD.Sterling.COM
Sterling Software, IMD UUCP: uunet!sparky!kent
Phone: (402) 291-8300 FAX: (402) 291-4362
Please send comp.sources.misc-related mail to kent@uunet.uu.net.