home *** CD-ROM | disk | FTP | other *** search
/ Usenet 1994 October / usenetsourcesnewsgroupsinfomagicoctober1994disk2.iso / misc / volume25 / QBATCH / part06 < prev    next >
Text File  |  1991-11-04  |  19KB  |  580 lines

  1. Newsgroups: comp.sources.misc
  2. From: alan@tharr.UUCP (Alan Saunders)
  3. Subject:  v25i025:  QBATCH - a queued batch processing system for UNIX, Part06/06
  4. Message-ID: <1991Nov5.034922.5231@sparky.imd.sterling.com>
  5. X-Md4-Signature: b01840a9111880e2bf66690532ec6ffc
  6. Date: Tue, 5 Nov 1991 03:49:22 GMT
  7. Approved: kent@sparky.imd.sterling.com
  8.  
  9. Submitted-by: alan@tharr.UUCP (Alan Saunders)
  10. Posting-number: Volume 25, Issue 25
  11. Archive-name: QBATCH/part06
  12. Environment: UNIX
  13.  
  14. #! /bin/sh
  15. # This is a shell archive.  Remove anything before this line, then unpack
  16. # it by saving it into a file and typing "sh file".  To overwrite existing
  17. # files, type "sh file -c".  You can also feed this as standard input via
  18. # unshar, or by typing "sh <file", e.g..  If this archive is complete, you
  19. # will see the following message at the end:
  20. #        "End of archive 6 (of 6)."
  21. # Contents:  src/qp.c
  22. # Wrapped by root@vfib_d on Thu Oct 31 16:36:22 1991
  23. PATH=/bin:/usr/bin:/usr/ucb ; export PATH
  24. if test -f 'src/qp.c' -a "${1}" != "-c" ; then 
  25.   echo shar: Will not clobber existing file \"'src/qp.c'\"
  26. else
  27. echo shar: Extracting \"'src/qp.c'\" \(16216 characters\)
  28. sed "s/^X//" >'src/qp.c' <<'END_OF_FILE'
  29. X/************************************************************************/
  30. X/*                                                                      */
  31. X/* qp .. queue process. start a process engine for a given queue        */
  32. X/*                                                                      */
  33. X/* usage: qp [-r] qname                                                 */
  34. X/*                                                                      */
  35. X/*   Copyright (c) Vita Services 1990                                   */
  36. X/*             (c) Vita Fibres   1990 1991                              */
  37. X/*                                                                      */
  38. X/************************************************************************/
  39. X
  40. X#include "qbatch.h"
  41. X#include <malloc.h>
  42. int fpq = 0,
  43. X    fplck = 0,
  44. X    fptemp = 0;
  45. X    result,
  46. X    interrupt = 0;
  47. XFILE *fpin,
  48. X     *mon;
  49. struct tm *jtimes;
  50. char exit_message[64];
  51. char envflg[5], envvar[255];
  52. char temp[16],
  53. X   *jcl,
  54. X   *jcwd = NULL;
  55. int i,
  56. X    j,
  57. X    k,
  58. X    rflag = 0;
  59. pid_t ppid;
  60. extern int had_usersig, lastflags;
  61. int seen_usersig;
  62. extern pid_t childid;
  63. char queue[128];
  64. char buff[128];
  65. char buff1[255];
  66. char path[128];
  67. char qbc[10];
  68. unsigned long q_queued, q_real, q_user, q_system;
  69. char qentry[12];
  70. char *args[10],
  71. X   *envs[50],
  72. X   *queuename;
  73. X
  74. char * print_time(val)
  75. unsigned long val;
  76. X{
  77. X    int temp;
  78. X    char sval[8];
  79. X    static char tval[32];
  80. X    *tval = 0;
  81. X    temp = val/360000;
  82. X    if (temp)
  83. X    {
  84. X        sprintf(sval, "%d", temp);
  85. X        strcpy (tval, sval);
  86. X        strcat(tval, ":");
  87. X        val%=360000;
  88. X    }
  89. X    temp = val/6000;
  90. X    if ((temp) || (*tval))
  91. X    {
  92. X        if (*tval) sprintf(sval, "%02d", temp);
  93. X        else       sprintf(sval, "%d",   temp);
  94. X        strcat     (tval, sval);
  95. X        strcat       (tval, ":");
  96. X        val%=6000;
  97. X    }
  98. X    temp = val/100;
  99. X    if (*tval) sprintf(sval, "%02d", temp);
  100. X    else       sprintf(sval, "%d",   temp);
  101. X    strcat     (tval, sval);
  102. X    strcat       (tval, ".");
  103. X    temp = val%100;
  104. X    sprintf(sval, "%02d", temp);
  105. X    strcat     (tval, sval);
  106. X    return(tval);
  107. X}
  108. void handle_flags ()
  109. X{
  110. X    int oldaction, newaction;
  111. X    had_usersig ++;
  112. X    fptemp = open (queue, O_RDONLY);
  113. X    read (fptemp, &head, sizeof (head));        /* Get header */
  114. X    close (fptemp);
  115. X    if ((lastflags&qh_action) == (head.qh_flags&qh_action))
  116. X    {
  117. X    lastflags=head.qh_flags;
  118. X        return;  
  119. X    }
  120. X    oldaction = lastflags & qh_action;
  121. X    newaction = head.qh_flags & qh_action;
  122. X    if ((newaction & qh_kill) != 0)
  123. X    {
  124. X        if (childid)
  125. X        {
  126. X            kill_child_group();
  127. X        }
  128. X        head.qh_flags -= qh_kill;
  129. X    }
  130. X    if ((newaction & qh_halt) != (oldaction & qh_halt))
  131. X    {
  132. X        if (childid)
  133. X        {
  134. X            toggle_halt_status(newaction);
  135. X        }
  136. X    }
  137. X    lastflags = head.qh_flags;
  138. X}
  139. X
  140. main (argc, argv, envp)
  141. int argc;
  142. char *argv[],
  143. X   *envp[];
  144. X
  145. X{
  146. X    if (getuid() != 0)
  147. X    {
  148. X        fprintf (stderr, "Must be root to process a queue\n");
  149. X        exit (-1);
  150. X    }
  151. X
  152. X    if (argc < 2)
  153. X    {
  154. X        fprintf (stderr, "Usage qp <qname>\n");
  155. X        qb_term (-1);
  156. X    }
  157. X    if (strcmp(argv[1], "-v") == 0) q_version();
  158. X    queuename = argv[1];
  159. X    strcpy (queue, QUEUEPATH);
  160. X    strcat (queue, queuename);
  161. X    fpq = open (queue, O_RDWR);
  162. X    if (fpq == -1)
  163. X    {
  164. X        perror ("cannot access queue ");
  165. X        qb_term (-1);
  166. X    }
  167. X    qb_setterm();
  168. X    q_lock(fpq);
  169. X    read (fpq, &head, sizeof (head));
  170. X    if (bad_queue()) qb_exit(-1);
  171. X    if (head.qh_pid != 0)
  172. X    {
  173. X    strcpy(buff, QUEUEPATH);
  174. X    strcat(buff, ".");
  175. X    strcat(buff, queuename);
  176. X    strcat(buff, ".lck");
  177. X    fplck = open (buff, O_RDWR+O_CREAT, 0644);
  178. X    if (fplck == -1)
  179. X    {
  180. X        fprintf (stderr, "Cannot open queue lockfile!\n");
  181. X        qb_exit(-1);
  182. X    }
  183. X    if (q_islock(fplck))
  184. X    {
  185. X    /* queue lock is active, let's just check the pid */
  186. X        read (fplck, &childid, sizeof(childid));
  187. X        if (childid != head.qh_pid)
  188. X        {
  189. X        fprintf (stderr, "You have a problem!!\n");
  190. X        fprintf (stderr, "A lock is held on the queue lock file : %s ", buff);
  191. X        fprintf(stderr, "by an activity (%d)\n", childid);
  192. X        fprintf (stderr, "Which does not match that of the qp engine in ");
  193. X        fprintf (stderr, "the queue header (%d)\n", head.qh_pid);
  194. X        qb_exit(-1);
  195. X        }
  196. X        printf ("%s process activity already running .. pid = %d\n", queuename, head.qh_pid);
  197. X        if (head.qh_flags & qh_stop)
  198. X        {
  199. X        printf ("But stop flag is set .. do you wish to clear it (y/n)?");
  200. X        i = getchar ();
  201. X        if (i == 'y' || i == 'Y')
  202. X        {
  203. X        head.qh_flags ^= qh_stop;
  204. X        lseek (fpq, 0, SEEK_SET);
  205. X        write (fpq, &head, sizeof (head));
  206. X        }
  207. X        }
  208. X        q_unlock(fpq);
  209. X        close (fpq);
  210. X        fpq = 0;
  211. X        qb_resetterm();
  212. X        exit (0);
  213. X    }
  214. X    else q_unlock(fplck);
  215. X    *buff = 0;
  216. X    close (fplck);
  217. X    fplck = 0;
  218. X    }
  219. X    q_unlock(fpq);
  220. X    close (fpq);
  221. X    fpq = 0;
  222. X/*
  223. X    We are in a position to actually process the queue, so to free the
  224. X    controlling vdu, we'll fork into the background, and terminate the
  225. X    foreground
  226. X*/
  227. X    ppid = fork ();
  228. X    if (ppid == -1)
  229. X    {
  230. X        perror ("Cannot create process activity! ");
  231. X        qb_resetterm();
  232. X        exit (-1);
  233. X    }
  234. X    if (ppid > 0)
  235. X    {
  236. X        printf ("%s process activity running .. pid = %d\n", queuename, ppid);
  237. X        qb_resetterm();
  238. X        exit (0);
  239. X    }
  240. X
  241. X/* Now we're in the background .. we can get to work! */
  242. X
  243. X    qb_setterm();
  244. X    fpq = open (queue, O_RDWR);
  245. X    if (fpq == -1)  perror ("can't open queue");
  246. X    q_lock(fpq);
  247. X    result = read (fpq, &head, sizeof (head));
  248. X    if (result == -1) perror ("can't read queue");
  249. X    ppid = getpid();
  250. X    /* set up system queue lock */
  251. X    strcpy(buff, QUEUEPATH);
  252. X    strcat(buff, ".");
  253. X    strcat(buff, queuename);
  254. X    strcat(buff, ".lck");
  255. X    fplck = open (buff, O_RDWR + O_CREAT, 0644);
  256. X    q_lock(fplck);
  257. X    write (fplck, &ppid, sizeof(ppid));
  258. X    fsync(fplck);
  259. X    head.qh_pid = ppid; /* set up header data .. */
  260. X    head.qh_flags &= (qh_enabled+qh_fixed);     /* ,..flags */
  261. X    head.qh_proc = time ((time_t *) 0);        /* set up start time */
  262. X    head.qh_queued = 0;
  263. X    head.qh_real = 0;
  264. X    head.qh_user = 0;
  265. X    head.qh_system = 0;
  266. X    head.qh_jobcount = 0;
  267. X    result = lseek (fpq, 0, SEEK_SET);
  268. X    if (result == -1) perror ("can't lseek queue");
  269. X    result = write (fpq, &head, sizeof (head));  /* and write it away */
  270. X    if (result == -1) perror ("can't write queue");
  271. X    q_unlock(fpq);
  272. X    result = close (fpq);
  273. X    if (result == -1) perror ("can't close queue");
  274. X    fpq = 0;
  275. X    qb_setuser(handle_flags);
  276. X    while (1)
  277. X    {   /* and enter main loop */
  278. X/*
  279. X    don't bother locking here, we're only checking status
  280. X*/
  281. X        fpq = open (queue, O_RDWR);
  282. X        read (fpq, &head, sizeof (head));
  283. X        if (head.qh_noentries == 0)
  284. X        {       /* nothing to do? */
  285. X            while ((head.qh_noentries == 0)
  286. X         ||(head.qh_flags&qh_halt != 0))
  287. X            {   /* wait until there is */
  288. X                close (fpq);
  289. X                fpq = 0;
  290. X                if (! had_usersig) qb_pause();   /* wait for a signal from a support program */
  291. X        else had_usersig --;
  292. X                if (lastflags & qh_stop) /* stop flag set */
  293. X                {
  294. X                    printf ("%s process activity Stopped .. pid was %d\n", queuename, ppid);
  295. X                fpq = open (queue, O_RDWR);
  296. X                q_lock(fpq);
  297. X                read (fpq, &head, sizeof (head));
  298. X                    lseek (fpq, 0, SEEK_SET);
  299. X                 head.qh_pid = 0;
  300. X                    write (fpq, &head, sizeof (head));
  301. X            q_unlock(fpq);
  302. X            close (fpq); fpq = 0;
  303. X            q_unlock(fplck);
  304. X            close(fplck);
  305. X                    qb_term (0);
  306. X                }
  307. X                fpq = open (queue, O_RDWR);
  308. X                read (fpq, &head, sizeof (head));
  309. X            }
  310. X        }
  311. X        close (fpq);
  312. X        fpq = 0;
  313. X/*
  314. X        we should only get here if there's a job in the queue
  315. X*/
  316. X        fpq = open (queue, O_RDWR);
  317. X        q_lock(fpq);
  318. X        read (fpq, &head, sizeof (head));
  319. X/*
  320. X        just in case a cancel managed to get in while queue unlocked.
  321. X*/
  322. X        if (head.qh_noentries == 0)
  323. X        {
  324. X        q_unlock(fpq);
  325. X            close (fpq);
  326. X            fpq = 0;
  327. X        }
  328. X        else
  329. X        {       /* get entry and fork it */
  330. X            read (fpq, &entry, sizeof (entry));
  331. X            if (entry.qe_monitor[0] != 0)
  332. X            {
  333. X                mon = fopen (entry.qe_monitor, "a");
  334. X            }
  335. X            else
  336. X            {
  337. X                mon = fopen (head.qh_defmon, "a");
  338. X            }
  339. X            head.qh_start = time ((time_t *) 0);        /* set up start time */
  340. X        q_queued = head.qh_start - entry.qe_submitted;
  341. X            jtimes = (struct tm *)localtime (&head.qh_start);
  342. X        fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
  343. X                    ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
  344. X        fprintf (mon, "## Started %s##\n" ,asctime (jtimes));
  345. X            fclose (mon);
  346. X            entry.qe_repcount++;        /* increment count */
  347. X/*
  348. X             set up path to jcl
  349. X*/
  350. X            strcpy (path, head.qh_spool);
  351. X            if (path[strlen (path) - 1] != '/')
  352. X                strcat (path, "/");
  353. X            strcat (path, entry.qe_jcl);
  354. X            if (access (path, F_OK) == -1)
  355. X            {
  356. X                perror ("Can't access jcl ");
  357. X                if (head.qh_flags &qh_repeat != 0)
  358. X                    head.qh_flags -= qh_repeat;
  359. X            }
  360. X            else
  361. X            {
  362. X        qb_set_timer();
  363. X                childid = fork ();
  364. X                if (childid == 0)
  365. X                {
  366. X/*
  367. X                     fork returned 0 .. must be the child
  368. X*/
  369. X                    qb_setpgrp ();     /* create new process
  370. X                                   group for child */
  371. X            fpin = fopen(path, "r");
  372. X            fgets (buff, 128, fpin);
  373. X            if ((buff[0] == '#') && (buff[1] == '!') && (buff[2] == 'L'))
  374. X            {
  375. X            /* job is linked, jcl is actually the file pointed to */
  376. X            fclose (fpin);
  377. X            strcpy (path, &buff[3]);
  378. X            fpin = fopen (path, "r");
  379. X            if (fpin == NULL)
  380. X            fprintf (stderr, "Cannot access linked jcl\n");
  381. X            exit (-1);
  382. X            }
  383. X            i = 2, j = 0;
  384. X            while (1) /* parse strings in interpreter line */
  385. X            {
  386. X            args[j] = buff+i;
  387. X            for (;((buff[i] != ' ') && (buff[i] != '\n')); i++){};
  388. X            k = buff[i];
  389. X            buff[i] = 0;
  390. X            i++;
  391. X            j++;
  392. X            if (k == '\n') break;
  393. X            }
  394. X            args[j++] = path;
  395. X                    args[j] = (char *) NULL;
  396. X                    strcpy (queue, "QUEUE=");
  397. X                    strcat (queue, queuename);
  398. X            for (j=0; j< 50; j++) envs[j] = (char *) NULL;
  399. X            j = 0;
  400. X            while (1) /* transfer environment variables (up to 46) */
  401. X                {
  402. X                fgets (buff1, 255, fpin);
  403. X                sscanf (buff1, "%s %s", envflg, envvar);
  404. X                if (strcmp (envflg, "#ENV")) break; /* no more */
  405. X                envs [j] = malloc(strlen(envvar)+1);
  406. X                strcpy (envs[j++], envvar);
  407. X                if (j > 45) break; /* can't fit any more in table */
  408. X                }
  409. X            fclose (fpin);
  410. X                    envs[j++] = queue;
  411. X                    sprintf (qbc, "QBC=%u", entry.qe_repcount);
  412. X                    envs[j++] = qbc;
  413. X                    sprintf (qentry, "QENTRY=%d", entry.qe_jobno);
  414. X                    envs[j++] = qentry;
  415. X            j = 0;
  416. X                    jtimes = (struct tm *)localtime (&head.qh_start);
  417. X                    if (entry.qe_monitor[0] != 0)
  418. X                    {
  419. X                        mon = freopen (entry.qe_monitor, "a", stdout);
  420. X                        mon = freopen (entry.qe_monitor, "a", stderr);
  421. X                    }
  422. X                    else
  423. X                    {
  424. X                        mon = freopen (head.qh_defmon, "a", stdout);
  425. X                        mon = freopen (head.qh_defmon, "a", stderr);
  426. X                    }
  427. X                    if (setuid (entry.qe_uid) != 0)
  428. X            {
  429. X            perror ("cannot change uid!!");
  430. X            exit (-1);
  431. X            }
  432. X                    setgid (entry.qe_gid);
  433. X                    nice (head.qh_priority);    /* set priority */
  434. X                    if (execve (args[0], args, envs) == -1)
  435. X                    {
  436. X                        perror ("cannot execute job! ");
  437. X                        if (head.qh_flags &qh_repeat != 0)
  438. X                            head.qh_flags -= qh_repeat;
  439. X            exit (-1);
  440. X                    }
  441. X                }
  442. X                if (childid == -1)
  443. X                {
  444. X                    perror ("cannot execute job! ");
  445. X                    if (head.qh_flags &qh_repeat != 0)
  446. X                        head.qh_flags -= qh_repeat;
  447. X                }
  448. X/*
  449. X                 set child on track, now update queue and wait for it to finish
  450. X*/
  451. X            head.qh_flags &= (qh_enabled+qh_fixed);     /* ,..flags */
  452. X                entry.qe_status = childid;  /* and entry status */
  453. X                lseek (fpq, 0, SEEK_SET);
  454. X                head.qh_pid = ppid;
  455. X                write (fpq, &head, sizeof (head));   /* and write them away */
  456. X                write (fpq, &entry, sizeof (entry));
  457. X        q_unlock(fpq);
  458. X                close (fpq);
  459. X                fpq = 0;
  460. X
  461. X        q_wait(exit_message);
  462. X        qb_get_timer(&q_real, &q_user, &q_system);
  463. X                childid = 0;
  464. X                if (entry.qe_monitor[0] != 0)
  465. X                {
  466. X                    mon = fopen (entry.qe_monitor, "a");
  467. X                }
  468. X                else
  469. X                {
  470. X                    mon = fopen (head.qh_defmon, "a");
  471. X                }
  472. X                head.qh_start = time ((time_t *) 0);        /* set up end time */
  473. X                jtimes = (struct tm *)localtime (&head.qh_start);
  474. X            fprintf (mon, "##\n## Queue %s entry %d (uid = %d : %s) Job: %s\n"
  475. X                        ,queuename, entry.qe_jobno, entry.qe_uid, entry.qe_uname, entry.qe_jobname);
  476. X            fprintf (mon, "## stopped %s" ,asctime (jtimes));
  477. X        fprintf(mon, "## Times:- Queued: %15s", print_time(q_queued));
  478. X        fprintf(mon, "   Real: %15s\n", print_time(q_real));
  479. X        fprintf(mon, "##           User: %15s", print_time(q_user));
  480. X        fprintf(mon, " System: %15s\n", print_time(q_system));
  481. X        if (exit_message != NULL)
  482. X            fprintf (mon, "##  (%s)\n", exit_message);
  483. X        fprintf (mon,"##\n");
  484. X                fclose (mon);
  485. X            }
  486. X            if (entry.qe_notify > 0)
  487. X            {
  488. X                /* notify user */
  489. X                sprintf (buff, "/usr/bin/bv/jobdone %s %d %d %s %s \"%s\"", queuename, entry.qe_jobno, entry.qe_notify, entry.qe_uname, entry.qe_tty, entry.qe_jobname);
  490. X                system (buff);
  491. X            }
  492. X            fpq = open (queue, O_RDWR);
  493. X        q_lock(fpq);
  494. X            read (fpq, &head, sizeof(head));
  495. X            head.qh_start = 0;
  496. X        head.qh_queued += (q_queued*100);
  497. X        head.qh_real += q_real;
  498. X        head.qh_user += q_user;
  499. X        head.qh_system += q_system;
  500. X        head.qh_jobcount ++;
  501. X            if ((head.qh_flags & qh_repeat) == 0)
  502. X            {
  503. X                head.qh_noentries--;
  504. X/*  
  505. X                get rid of first entry in queue
  506. X*/  
  507. X                lseek (fpq, 0, SEEK_SET);
  508. X                head.qh_pid = ppid;
  509. X                write (fpq, &head, sizeof (head));
  510. X                if (head.qh_noentries > 0)
  511. X                {
  512. X                    for (i = 0; i < head.qh_noentries; i++)
  513. X                    {
  514. X                        lseek (fpq, (sizeof (head) + ((i + 1) * sizeof (entry))), SEEK_SET);
  515. X                        read (fpq, &entry, sizeof (entry));
  516. X                        lseek (fpq, (sizeof (head) + (i * sizeof (entry))), SEEK_SET);
  517. X                        write (fpq, &entry, sizeof (entry));
  518. X                    }
  519. X                }
  520. X                ftruncate (fpq, sizeof (head) + head.qh_noentries * sizeof (entry));
  521. X/*
  522. X                get rid of jcl file
  523. X*/
  524. X        chmod (path, 0755);
  525. X                unlink (path);
  526. X            }
  527. X            else
  528. X            {
  529. X                read (fpq, &entry, sizeof(entry));
  530. X                lseek (fpq, 0, SEEK_SET);
  531. X                head.qh_pid = ppid;
  532. X                write (fpq, &head, sizeof (head));
  533. X                entry.qe_status = 0;
  534. X                write (fpq, &entry, sizeof(entry));
  535. X            }
  536. X            if (head.qh_flags & qh_stop)
  537. X            {
  538. X                printf ("%s process activity stopped .. pid was %d\n", queuename, ppid);
  539. X                lseek (fpq, 0, SEEK_SET);
  540. X        head.qh_pid = 0;
  541. X                write (fpq, &head, sizeof (head));
  542. X                qb_term (0);
  543. X            }
  544. X        q_unlock(fpq);
  545. X            close (fpq);
  546. X            fpq = 0;
  547. X        }
  548. X    }
  549. X}
  550. END_OF_FILE
  551. if test 16216 -ne `wc -c <'src/qp.c'`; then
  552.     echo shar: \"'src/qp.c'\" unpacked with wrong size!
  553. fi
  554. # end of 'src/qp.c'
  555. fi
  556. echo shar: End of archive 6 \(of 6\).
  557. cp /dev/null ark6isdone
  558. MISSING=""
  559. for I in 1 2 3 4 5 6 ; do
  560.     if test ! -f ark${I}isdone ; then
  561.     MISSING="${MISSING} ${I}"
  562.     fi
  563. done
  564. if test "${MISSING}" = "" ; then
  565.     echo You have unpacked all 6 archives.
  566.     rm -f ark[1-9]isdone
  567. else
  568.     echo You still need to unpack the following archives:
  569.     echo "        " ${MISSING}
  570. fi
  571. ##  End of shell archive.
  572. exit 0
  573.  
  574. exit 0 # Just in case...
  575. -- 
  576. Kent Landfield                   INTERNET: kent@sparky.IMD.Sterling.COM
  577. Sterling Software, IMD           UUCP:     uunet!sparky!kent
  578. Phone:    (402) 291-8300         FAX:      (402) 291-4362
  579. Please send comp.sources.misc-related mail to kent@uunet.uu.net.
  580.