home *** CD-ROM | disk | FTP | other *** search
/ DP Tool Club 9 / CD_ASCQ_09_1193.iso / news / 4441 / mpegcode / src / parallel.c < prev    next >
C/C++ Source or Header  |  1993-09-27  |  38KB  |  1,400 lines

  1. /*===========================================================================*
  2.  * parallel.c                                         *
  3.  *                                         *
  4.  *    Procedures to make encoder run in parallel                 *
  5.  *                                         *
  6.  * EXPORTED PROCEDURES:                                 *
  7.  *    StartInputServer                             *
  8.  *    StartOutputServer                             *
  9.  *    SendRemoteFrame                                 *
  10.  *    GetRemoteFrame                                 *
  11.  *    StartParallelServer                             *
  12.  *    NotifyMasterDone                             *
  13.  *                                         *
  14.  *===========================================================================*/
  15.  
  16. /*
  17.  * Copyright (c) 1993 The Regents of the University of California.
  18.  * All rights reserved.
  19.  *
  20.  * Permission to use, copy, modify, and distribute this software and its
  21.  * documentation for any purpose, without fee, and without written agreement is
  22.  * hereby granted, provided that the above copyright notice and the following
  23.  * two paragraphs appear in all copies of this software.
  24.  *
  25.  * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
  26.  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
  27.  * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
  28.  * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  29.  *
  30.  * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
  31.  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
  32.  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
  33.  * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
  34.  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
  35.  */
  36.  
  37. /*  
  38.  *  $Header: /n/picasso/users/keving/encode/src/RCS/parallel.c,v 1.2 1993/07/22 22:23:43 keving Exp keving $
  39.  *  $Log: parallel.c,v $
  40.  * Revision 1.2  1993/07/22  22:23:43  keving
  41.  * nothing
  42.  *
  43.  * Revision 1.1  1993/06/30  20:06:09  keving
  44.  * nothing
  45.  *
  46.  */
  47.  
  48.  
  49. /*==============*
  50.  * HEADER FILES *
  51.  *==============*/
  52.  
  53. #include <sys/types.h>
  54. #include <sys/socket.h>
  55. #include <sys/times.h>
  56. #include <netinet/in.h>
  57. #include <netdb.h>
  58. #include <errno.h>
  59. #include "all.h"
  60. #include "param.h"
  61. #include "mpeg.h"
  62. #include "prototypes.h"
  63. #include "parallel.h"
  64. #include "readframe.h"
  65. #include "fsize.h"
  66. #include "combine.h"
  67.  
  68.  
  69. /*==================*
  70.  * STATIC VARIABLES *
  71.  *==================*/
  72.  
  73. static int32   diffTime;
  74. static boolean    separateConversion;
  75. static char    rsh[256];
  76. static struct hostent *hostEntry = NULL;
  77. static boolean    *frameDone;
  78. static int    outputServerSocket;
  79.  
  80.  
  81. /*==================*
  82.  * GLOBAL VARIABLES *
  83.  *==================*/
  84.  
  85. extern int yuvHeight, yuvWidth;
  86. extern    time_t  timeStart, timeEnd;
  87. extern char    statFileName[256];
  88. extern FILE *statFile;
  89. int parallelTestFrames = 10;
  90. int parallelTimeChunks = 60;
  91. char *IOhostName;
  92. int IOportNumber;
  93. int outputPortNumber;
  94. boolean    niceProcesses;
  95.  
  96.  
  97. /*===============================*
  98.  * INTERNAL PROCEDURE prototypes *
  99.  *===============================*/
  100.  
  101. static void    TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
  102.                            int IOportNum));
  103. static void    EndIOServer _ANSI_ARGS_((void));
  104. static int SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
  105.  
  106.  
  107. /*=====================*
  108.  * EXPORTED PROCEDURES *
  109.  *=====================*/
  110.  
  111.             /*=================*
  112.              * IO SERVER STUFF *
  113.              *=================*/
  114.  
  115.  
  116. /*===========================================================================*
  117.  *
  118.  * SetIOConvert
  119.  *
  120.  *    sets the IO conversion to be separate or not.  If separate, then
  121.  *    some post-processing is done at slave end
  122.  *
  123.  * RETURNS:    nothing
  124.  *
  125.  * SIDE EFFECTS:    none
  126.  *
  127.  *===========================================================================*/
  128. void
  129. SetIOConvert(separate)
  130.     boolean separate;
  131. {
  132.     separateConversion = separate;
  133. }
  134.  
  135.  
  136. /*===========================================================================*
  137.  *
  138.  * SetRemoteShell
  139.  *
  140.  *    sets the remote shell program (usually rsh, but different on some
  141.  *    machines)
  142.  *
  143.  * RETURNS:    nothing
  144.  *
  145.  * SIDE EFFECTS:    none
  146.  *
  147.  *===========================================================================*/
  148. void
  149. SetRemoteShell(shell)
  150.     char *shell;
  151. {
  152.     strcpy(rsh, shell);
  153. }
  154.  
  155.  
  156. /*===========================================================================*
  157.  *
  158.  * StartInputServer
  159.  *
  160.  *    start-up the InputServer with this process
  161.  *    handles slave requests for frames, and exits when master tells it to
  162.  *
  163.  * RETURNS:    nothing
  164.  *
  165.  * SIDE EFFECTS:    none
  166.  *
  167.  *===========================================================================*/
  168. void
  169. StartInputServer(numInputFiles, parallelHostName, portNum)
  170.     int numInputFiles;
  171.     char *parallelHostName;
  172.     int portNum;
  173. {
  174.     char    *hostName;
  175.     int        IOportNum;
  176.     int        serverSocket;
  177.     struct sockaddr_in    nameEntry;
  178.     u_short tempShort;
  179.     int        result;
  180.     int        otherSock, otherSize;
  181.     struct sockaddr otherSocket;
  182.     int32   buffer[8];
  183.     boolean    done = FALSE;
  184.     int        frameNumber;
  185.     MpegFrame *frame;
  186.     register int y;
  187.     int        numBytes;
  188.     unsigned char   *bigBuffer;
  189.     unsigned char   smallBuffer[1000];
  190.     int        bigBufferSize;
  191.     FILE    *filePtr;
  192.     uint32  data;
  193.     char    inputFileName[1024];
  194.  
  195.     bigBufferSize = 0;
  196.     bigBuffer = NULL;
  197.  
  198. /* once we get IO port num, should transmit it to parallel server */
  199.  
  200.     /* create a server socket */
  201.     hostName = getenv("HOST");
  202.  
  203.     if ( hostName == NULL ) {
  204.     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
  205.     exit(1);
  206.     }
  207.  
  208.     if ( hostEntry == NULL ) {
  209.     hostEntry = gethostbyname(hostName);
  210.     if ( hostEntry == NULL ) {
  211.         fprintf(stderr, "ERROR:  Could not find host %s in database\n",
  212.             hostName);
  213.         exit(1);
  214.     }
  215.     }
  216.  
  217.     serverSocket = socket(AF_INET, SOCK_STREAM, 0);
  218.     if ( serverSocket == -1 ) {
  219.     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
  220.     exit(1);
  221.     }
  222.  
  223.     nameEntry.sin_family = AF_INET;
  224. #ifdef BLEAH
  225.     bzero((char *)nameEntry.sin_zero, 8);
  226.     bcopy((char *) hostEntry->h_addr_list[0],
  227.       (char *) &(nameEntry.sin_addr.s_addr),
  228.       (size_t) hostEntry->h_length);
  229. #endif
  230.     bzero((char *) &nameEntry, sizeof(nameEntry));
  231.  
  232.     /* find a port number that isn't used */
  233.     IOportNum = 2048;
  234.     do {
  235.     IOportNum++;
  236.     tempShort = IOportNum;
  237.     nameEntry.sin_port = htons(tempShort);
  238.     result = bind(serverSocket, (struct sockaddr *) &nameEntry,
  239.               sizeof(struct sockaddr));
  240.     }
  241.     while ( result == -1 );
  242.  
  243.     /* would really like to wait for 1+numMachines machines, but this is max
  244.      * allowable, unfortunately
  245.      */
  246.     result = listen(serverSocket, SOMAXCONN);
  247.  
  248.     if ( result == -1 ) {
  249.     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
  250.     exit(1);
  251.     }
  252.  
  253.     fprintf(stdout, "====I/O USING PORT %d\n", IOportNum);
  254.  
  255.     TransmitPortNum(parallelHostName, portNum, IOportNum);
  256.  
  257.     otherSize = sizeof(otherSocket);
  258.  
  259.     SetFileType();    /* for reading */
  260.  
  261.     /* now, wait until get done signal */
  262.     while ( ! done ) {
  263.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  264.     if ( otherSock == -1 ) {
  265.         fprintf(stderr, "ERROR:  Input SERVER accept returned error %d\n", errno);
  266.         exit(1);
  267.     }
  268.  
  269.     result = SafeRead(otherSock, (char *)buffer, 4);
  270.     frameNumber = ntohl(buffer[0]);
  271.  
  272.     if ( frameNumber == -1 ) {
  273.         done = TRUE;
  274.     } else {
  275.  
  276.  
  277. #ifdef BLEAH
  278. fprintf(stdout, "Input SERVER GETTING FRAME %d\n", frameNumber);
  279. fflush(stdout);
  280. #endif
  281.  
  282.         /* should read in frame, then write to socket */
  283.         frame = Frame_New(frameNumber, 'i');
  284.  
  285.         if ( separateConversion ) {
  286.         GetNthInputFileName(inputFileName, frameNumber);
  287.  
  288.         /* do conversion and send right to the socket */
  289.         filePtr = ReadIOConvert(inputFileName);
  290.             do {
  291.             numBytes = fread(smallBuffer, 1, 1000, filePtr);
  292.  
  293.             if ( numBytes > 0 ) {
  294.                 data = numBytes;
  295.                 data = htonl(data);
  296.                 write(otherSock, &data, 4);
  297.                 write(otherSock, smallBuffer, numBytes);
  298.             }
  299.             }
  300.             while ( numBytes == 1000 );
  301.  
  302.         if ( strcmp(ioConversion, "*") == 0 ) {
  303.             fclose(filePtr);
  304.         } else {
  305.             pclose(filePtr);
  306.         }
  307.         } else {
  308.         GetNthInputFileName(inputFileName, frameNumber);
  309.         ReadFrame(frame, inputFileName, inputConversion, TRUE);
  310.  
  311.         /* should now transmit yuv values */
  312.         for (y = 0; y < yuvHeight; y++) {            /* Y */
  313.             write(otherSock, frame->orig_y[y], yuvWidth);
  314.         }
  315.  
  316.         for (y = 0; y < yuvHeight / 2; y++) {            /* U */
  317.             write(otherSock, frame->orig_cb[y], yuvWidth / 2);
  318.         }
  319.  
  320.         for (y = 0; y < yuvHeight / 2; y++) {            /* V */
  321.             write(otherSock, frame->orig_cr[y], yuvWidth / 2);
  322.         }
  323.  
  324. /* now, make sure we don't leave until other processor read everything */
  325.  
  326.         result = SafeRead(otherSock, (char *)buffer, 4);
  327.         /* should = 0 */
  328.         }
  329.  
  330. #ifdef BLEAH
  331.         fprintf(stdout, "====Input SERVER:  READ FRAME %d\n",
  332.             frameNumber);
  333. #endif
  334.  
  335.         Frame_Free(frame);
  336.     }
  337.  
  338.     close(otherSock);
  339.     }
  340.  
  341.     close(serverSocket);
  342.  
  343.     fprintf(stdout, "====Input SERVER:  Shutting Down\n");
  344. }
  345.  
  346.  
  347. /*===========================================================================*
  348.  *
  349.  * SendRemoteFrame
  350.  *
  351.  *    called by a slave to the Output server; sends an encoded frame
  352.  *    to the server to be sent to disk
  353.  *
  354.  * RETURNS:    nothing
  355.  *
  356.  * SIDE EFFECTS:    none
  357.  *
  358.  *===========================================================================*/
  359. void
  360. SendRemoteFrame(frameNumber, bb)
  361.     int frameNumber;
  362.     BitBucket *bb;
  363. {
  364.     int    clientSocket;
  365.     struct sockaddr_in  nameEntry;
  366.     u_short        tempShort;
  367.     int        result;
  368.     u_long  data;
  369.  
  370.     if ( hostEntry == NULL ) {
  371.     hostEntry = gethostbyname(IOhostName);
  372.     if ( hostEntry == NULL ) {
  373.         fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
  374.             IOhostName);
  375.         exit(1);
  376.     }
  377.     }
  378.  
  379.     clientSocket = socket(AF_INET, SOCK_STREAM, 0);
  380.     if ( clientSocket == -1 ) {
  381.     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
  382.     exit(1);
  383.     }
  384.  
  385.     nameEntry.sin_family = AF_INET;
  386.     bzero((char *)nameEntry.sin_zero, 8);
  387.     bcopy((char *) hostEntry->h_addr_list[0],
  388.       (char *) &(nameEntry.sin_addr.s_addr),
  389.       (size_t) hostEntry->h_length);
  390.     tempShort = outputPortNumber;
  391.     nameEntry.sin_port = htons(tempShort);
  392.  
  393.     result = connect(clientSocket, (struct sockaddr *) &nameEntry,
  394.              sizeof(struct sockaddr));
  395.     if ( result == -1 ) {
  396.     fprintf(stderr, "ERROR:  connect (SendRemoteFrame, port %d) returned error %d\n",
  397.         outputPortNumber, errno);
  398.     exit(1);
  399.     }
  400.  
  401.     data = htonl(frameNumber);
  402.     if ( write(clientSocket, &data, 4) != 4 ) {
  403.     fprintf(stderr, "ERROR:  write (SendRemoteFrame) of frame number\n");
  404.     exit(1);
  405.     }
  406.  
  407.     if ( frameNumber != -1 ) {
  408.     /* send number of bytes */
  409.     data = (bb->totalbits+7)/8;
  410.     data = htonl(data);
  411.     if ( write(clientSocket, &data, 4) != 4 ) {
  412.         fprintf(stderr, "ERROR:  write (SendRemoteFrame) of #bytes\n");
  413.         exit(1);
  414.     }
  415.  
  416.     /* now send the bytes themselves */
  417.     Bitio_WriteToSocket(bb, clientSocket);
  418.     }
  419.  
  420.     close(clientSocket);
  421. }
  422.  
  423.  
  424.  
  425.  
  426. /*===========================================================================*
  427.  *
  428.  * NoteFrameDone
  429.  *
  430.  *    called by a slave to the Output server; tells it these frames are
  431.  *    done
  432.  *
  433.  * RETURNS:    nothing
  434.  *
  435.  * SIDE EFFECTS:    none
  436.  *
  437.  *===========================================================================*/
  438. void
  439. NoteFrameDone(frameStart, frameEnd)
  440.     int frameStart;
  441.     int frameEnd;
  442. {
  443.     int    clientSocket;
  444.     struct sockaddr_in  nameEntry;
  445.     u_short        tempShort;
  446.     int        result;
  447.     u_long  data;
  448.     int        negativeTwo = -2;
  449.  
  450.     if ( hostEntry == NULL ) {
  451.     hostEntry = gethostbyname(IOhostName);
  452.     if ( hostEntry == NULL ) {
  453.         fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
  454.             IOhostName);
  455.         exit(1);
  456.     }
  457.     }
  458.  
  459.     clientSocket = socket(AF_INET, SOCK_STREAM, 0);
  460.     if ( clientSocket == -1 ) {
  461.     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
  462.     exit(1);
  463.     }
  464.  
  465.     nameEntry.sin_family = AF_INET;
  466.     bzero((char *)nameEntry.sin_zero, 8);
  467.     bcopy((char *) hostEntry->h_addr_list[0],
  468.       (char *) &(nameEntry.sin_addr.s_addr),
  469.       (size_t) hostEntry->h_length);
  470.     tempShort = outputPortNumber;
  471.     nameEntry.sin_port = htons(tempShort);
  472.  
  473.     result = connect(clientSocket, (struct sockaddr *) &nameEntry,
  474.              sizeof(struct sockaddr));
  475.     if ( result == -1 ) {
  476.     fprintf(stderr, "ERROR:  connect (NoteFrameDone, port %d) returned error %d\n",
  477.         outputPortNumber, errno);
  478.     exit(1);
  479.     }
  480.  
  481.     data = negativeTwo;
  482.     data = htonl(negativeTwo);
  483.     if ( write(clientSocket, &data, 4) != 4 ) {
  484.     fprintf(stderr, "ERROR:  write (NoteFrameDone) of -2\n");
  485.     exit(1);
  486.     }
  487.  
  488.     data = htonl(frameStart);
  489.     if ( write(clientSocket, &data, 4) != 4 ) {
  490.     fprintf(stderr, "ERROR:  write (NoteFrameDone) of frame start\n");
  491.     exit(1);
  492.     }
  493.  
  494.     data = htonl(frameEnd);
  495.     if ( write(clientSocket, &data, 4) != 4 ) {
  496.     fprintf(stderr, "ERROR:  write (NoteFrameDone) of frame end\n");
  497.     exit(1);
  498.     }
  499.  
  500.     close(clientSocket);
  501. }
  502.  
  503.  
  504. /*===========================================================================*
  505.  *
  506.  * GetRemoteFrame
  507.  *
  508.  *    called by a slave; gets a remote frame from the Input server
  509.  *
  510.  * RETURNS:    nothing
  511.  *
  512.  * SIDE EFFECTS:    none
  513.  *
  514.  *===========================================================================*/
  515. void
  516. GetRemoteFrame(frame, frameNumber)
  517.     MpegFrame *frame;
  518.     int frameNumber;
  519. {
  520.     FILE    *filePtr;
  521.     int    clientSocket;
  522.     struct sockaddr_in  nameEntry;
  523.     u_short        tempShort;
  524.     unsigned char   smallBuffer[1000];
  525.     int        result;
  526.     register int y;
  527.     int        numBytes;
  528.     u_long  data;
  529.     int32   tempStart, tempEnd;
  530.  
  531.     Fsize_Note(frameNumber, yuvWidth, yuvHeight);
  532.  
  533.     if ( hostEntry == NULL ) {
  534.     hostEntry = gethostbyname(IOhostName);
  535.     if ( hostEntry == NULL ) {
  536.         fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
  537.             IOhostName);
  538.         exit(1);
  539.     }
  540.     }
  541.  
  542.     clientSocket = socket(AF_INET, SOCK_STREAM, 0);
  543.     if ( clientSocket == -1 ) {
  544.     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
  545.     exit(1);
  546.     }
  547.  
  548.     nameEntry.sin_family = AF_INET;
  549.     bzero((char *)nameEntry.sin_zero, 8);
  550.     bcopy((char *) hostEntry->h_addr_list[0],
  551.       (char *) &(nameEntry.sin_addr.s_addr),
  552.       (size_t) hostEntry->h_length);
  553.     tempShort = IOportNumber;
  554.     nameEntry.sin_port = htons(tempShort);
  555.  
  556. #ifdef BLEAH
  557. fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
  558.     getenv("HOST"), frameNumber);
  559. fflush(stdout);
  560. #endif
  561.  
  562. time(&tempStart);
  563.     result = connect(clientSocket, (struct sockaddr *) &nameEntry,
  564.              sizeof(struct sockaddr));
  565.     if ( result == -1 ) {
  566.     fprintf(stderr, "ERROR:  connect (GetRemoteFrame, port %d) returned error %d\n",
  567.         IOportNumber, errno);
  568.     exit(1);
  569.     }
  570. time(&tempEnd);
  571.  
  572.     data = frameNumber;
  573.     data = htonl(data);
  574.     if ( write(clientSocket, &data, 4) != 4 ) {
  575.     fprintf(stderr, "ERROR:  write (GetRemoteFrame) of frame number\n");
  576.     exit(1);
  577.     }
  578.  
  579.     if ( frameNumber != -1 ) {
  580.     if ( separateConversion ) {
  581.         filePtr = fopen( "/tmp/foobar", "w");
  582.  
  583.         /* read in stuff, write to file, perform local conversion */
  584.         do {
  585.         result = SafeRead(clientSocket, (char *)&numBytes, 4);
  586.         numBytes = ntohl(numBytes);
  587.  
  588.         SafeRead(clientSocket, (char *)smallBuffer, numBytes);
  589.  
  590.         fwrite(smallBuffer, 1, numBytes, filePtr);
  591.         } while ( numBytes == 1000 );
  592.  
  593.         fclose(filePtr);
  594.  
  595.         /* now do slave conversion */
  596.         ReadFrame(frame, "/tmp/foobar", slaveConversion, FALSE);
  597.     } else {
  598.         Frame_AllocYCC(frame);
  599.  
  600. #ifdef BLEAH
  601. fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
  602.     getenv("HOST"), frameNumber);
  603. fflush(stdout);
  604. #endif
  605.  
  606.         /* should now read yuv values */
  607.         for (y = 0; y < yuvHeight; y++) {            /* Y */
  608.         result = SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
  609.  
  610.         if ( result != yuvWidth ) {
  611.             fprintf(stderr, "ERROR:  read (GetRemoteFrame) of Y (got %d bytes)\n",
  612.                 result);
  613.             exit(1);
  614.         }
  615.         }
  616.  
  617.         for (y = 0; y < yuvHeight / 2; y++) {        /* U */
  618.         result = SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth/2);
  619.  
  620.         if ( result != yuvWidth/2 ) {
  621.             fprintf(stderr, "ERROR:  read (GetRemoteFrame) of Y (got %d bytes)\n",
  622.                 result);
  623.             exit(1);
  624.         }
  625.         }
  626.  
  627.         for (y = 0; y < yuvHeight / 2; y++) {        /* V */
  628.         result = SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth/2);
  629.  
  630.         if ( result != yuvWidth/2 ) {
  631.             fprintf(stderr, "ERROR:  read (GetRemoteFrame) of Y (got %d bytes)\n",
  632.                 result);
  633.             exit(1);
  634.         }
  635.         }
  636.     }
  637.     }
  638.  
  639.     data = 0;
  640.     data = htonl(data);
  641.     if ( write(clientSocket, &data, 4) != 4 ) {
  642.     fprintf(stderr, "ERROR:  write (GetRemoteFrame) of EOT\n");
  643.     exit(1);
  644.     }
  645.  
  646.     close(clientSocket);
  647.  
  648. #ifdef BLEAH
  649. fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
  650.     getenv("HOST"), frameNumber);
  651. fflush(stdout);
  652. #endif
  653.  
  654. }
  655.  
  656.  
  657. /*===========================================================================*
  658.  *
  659.  * StartOutputServer
  660.  *
  661.  *    start-up the OutputServer with this process
  662.  *    handles output and combination of frames, and exits
  663.  *    when master tells it to
  664.  *
  665.  * RETURNS:    nothing
  666.  *
  667.  * SIDE EFFECTS:    none
  668.  *
  669.  *===========================================================================*/
  670. void
  671. StartOutputServer(numInputFiles, outputFileName, parallelHostName, portNum)
  672.     int numInputFiles;
  673.     char *outputFileName;
  674.     char *parallelHostName;
  675.     int portNum;
  676. {
  677.     char    *hostName;
  678.     int        outputPortNum;
  679.     struct sockaddr_in    nameEntry;
  680.     u_short tempShort;
  681.     int        result;
  682.     FILE    *ofp;
  683.  
  684.     /* once we get Output port num, should transmit it to parallel server */
  685.  
  686.     /* create a server socket */
  687.  
  688.     if ( hostEntry == NULL ) {
  689.     hostName = getenv("HOST");
  690.  
  691.     if ( hostName == NULL ) {
  692.         fprintf(stderr, "ERROR:  Set HOST environment variable\n");
  693.         exit(1);
  694.     }
  695.  
  696.     hostEntry = gethostbyname(hostName);
  697.     if ( hostEntry == NULL ) {
  698.         fprintf(stderr, "ERROR:  Could not find host %s in database\n",
  699.             hostName);
  700.         exit(1);
  701.     }
  702.     }
  703.  
  704.     outputServerSocket = socket(AF_INET, SOCK_STREAM, 0);
  705.     if ( outputServerSocket == -1 ) {
  706.     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
  707.     exit(1);
  708.     }
  709.  
  710.     nameEntry.sin_family = AF_INET;
  711. #ifdef BLEAH
  712.     bzero((char *)nameEntry.sin_zero, 8);
  713.     bcopy((char *) hostEntry->h_addr_list[0],
  714.       (char *) &(nameEntry.sin_addr.s_addr),
  715.       (size_t) hostEntry->h_length);
  716. #endif
  717.     bzero((char *) &nameEntry, sizeof(nameEntry));
  718.  
  719.     /* find a port number that isn't used */
  720.     outputPortNum = 2048;
  721.     do {
  722.     outputPortNum++;
  723.     tempShort = outputPortNum;
  724.     nameEntry.sin_port = htons(tempShort);
  725.     result = bind(outputServerSocket, (struct sockaddr *) &nameEntry,
  726.               sizeof(struct sockaddr));
  727.     }
  728.     while ( result == -1 );
  729.  
  730.     /* would really like to wait for 1+numMachines machines, but this is max
  731.      * allowable, unfortunately
  732.      */
  733.     result = listen(outputServerSocket, SOMAXCONN);
  734.  
  735.     if ( result == -1 ) {
  736.     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
  737.     exit(1);
  738.     }
  739.  
  740.     fprintf(stdout, "====OUTPUT USING PORT %d\n", outputPortNum);
  741.  
  742.     TransmitPortNum(parallelHostName, portNum, outputPortNum);
  743.  
  744.     frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
  745.     bzero((char *)frameDone, numInputFiles*sizeof(boolean));
  746.  
  747.     if ( (ofp = fopen(outputFileName, "w")) == NULL ) {
  748.     fprintf(stderr, "ERROR:  Could not open output file!\n");
  749.     fflush(stderr);
  750.     exit(1);
  751.     }
  752.     FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
  753.  
  754.     fprintf(stdout, "====OUTPUT SERVER:  Shutting Down\n");
  755.     fflush(stdout);
  756.  
  757.     /* tell Master server we are done */
  758.     TransmitPortNum(parallelHostName, portNum, outputPortNum);
  759.  
  760.     close(outputServerSocket);
  761. }
  762.  
  763.  
  764. /*===========================================================================*
  765.  *
  766.  * WaitForOutputFile
  767.  *
  768.  *    keep handling output events until we get the specified frame
  769.  *    number
  770.  *
  771.  * RETURNS:    nothing
  772.  *
  773.  * SIDE EFFECTS:    none
  774.  *
  775.  *===========================================================================*/
  776. void
  777. WaitForOutputFile(number)
  778.     int number;
  779. {
  780.     int        otherSock;
  781.     static int otherSize = sizeof(struct sockaddr);
  782.     struct sockaddr otherSocket;
  783.     int        result;
  784.     int        frameNumber;
  785.     int32   buffer[8];
  786.     static unsigned char   *bigBuffer = NULL;
  787.     static int     bigBufferSize = 0;
  788.     int        numBytes;
  789.     char    fileName[1024];
  790.     FILE    *filePtr;
  791.     int frameStart, frameEnd;
  792.  
  793.     while ( ! frameDone[number] ) {
  794.     otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
  795.     if ( otherSock == -1 ) {
  796.         fprintf(stderr, "ERROR:  Output SERVER accept returned error %d\n", errno);
  797.         exit(1);
  798.     }
  799.  
  800.     result = SafeRead(otherSock, (char *)buffer, 4);
  801.     frameNumber = ntohl(buffer[0]);
  802.  
  803.     if ( frameNumber == -2 ) {
  804.         /* this is notification from non-remote process that a frame is done */
  805.  
  806.         result = SafeRead(otherSock, (char *)buffer, 8);
  807.         frameStart = buffer[0];
  808.         frameStart = ntohl(frameStart);
  809.         frameEnd = buffer[1];
  810.         frameEnd = ntohl(frameEnd);
  811.  
  812.         for ( frameNumber = frameStart; frameNumber <= frameEnd;
  813.           frameNumber++ ) {
  814.         frameDone[frameNumber] = TRUE;
  815.         }
  816.     } else {
  817.         /* read in number of bytes */
  818.         result = SafeRead(otherSock, (char *)buffer, 4);
  819.         numBytes = ntohl(buffer[0]);        
  820.  
  821.         /* make sure buffer is big enough for data */
  822.         if ( numBytes > bigBufferSize ) {
  823.         bigBufferSize = numBytes;
  824.         if ( bigBuffer != NULL ) {
  825.             free(bigBuffer);
  826.         }
  827.  
  828.         bigBuffer = (unsigned char *) malloc(bigBufferSize*
  829.                          sizeof(unsigned char));
  830.         }
  831.  
  832.         /* now read in the bytes */
  833.         result = SafeRead(otherSock, (char *) bigBuffer, numBytes);
  834.  
  835.         /* open file to output this stuff to */
  836.         sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
  837.         if ( (filePtr = fopen(fileName, "w")) == NULL ) {
  838.         fprintf(stderr, "ERROR:  Could not open output file:  %s\n",
  839.             fileName);
  840.         exit(1);
  841.         }
  842.  
  843.         /* now write the bytes here */
  844.         fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
  845.  
  846.         fclose(filePtr);
  847.  
  848. #ifdef BLEAH
  849.         fprintf(stdout, "====Output SERVER:  WROTE FRAME %d to disk\n",
  850.             frameNumber);
  851.         fflush(stdout);
  852. #endif
  853.  
  854.         frameDone[frameNumber] = TRUE;
  855.     }
  856.  
  857.     close(otherSock);
  858.     }
  859.  
  860. #ifdef BLEAH    
  861.     fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
  862.     fflush(stdout);
  863. #endif
  864. }
  865.  
  866.  
  867.             /*=======================*
  868.              * PARALLEL SERVER STUFF *
  869.              *=======================*/
  870.  
  871.  
  872. /*===========================================================================*
  873.  *
  874.  * StartParallelServer
  875.  *
  876.  *    start the master server with this process
  877.  *
  878.  * RETURNS:    nothing
  879.  *
  880.  * SIDE EFFECTS:    none
  881.  *
  882.  *===========================================================================*/
  883. void
  884. StartParallelServer(numInputFiles, paramFile, outputFileName)
  885.     int numInputFiles;
  886.     char *paramFile;
  887.     char *outputFileName;
  888. {
  889.     FILE    *filePtr;
  890.     register int index, index2;
  891.     int        framesPerMachine;
  892.     char    command[1024];
  893.     char    *hostName;
  894.     int        portNum;
  895.     int        serverSocket;
  896.     struct sockaddr_in    nameEntry;
  897.     u_short tempShort;
  898.     int        result;
  899.     boolean finished[MAX_MACHINES];
  900.     int        numFinished;
  901.     int        otherSock, otherSize;
  902.     struct sockaddr otherSocket;
  903.     int        seconds;
  904.     int32   buffer[8];
  905.     int IOportNum;
  906.     int        outputPortNum;
  907.     int        nextFrame;
  908.     int        numFrames[MAX_MACHINES];
  909.     int        lastNumFrames[MAX_MACHINES];
  910.     int        numSeconds[MAX_MACHINES];
  911.     float   framesPerSecond;
  912.     float   totalFPS, localFPS;
  913.     int        framesDone;
  914.     float   avgFPS;
  915.     char    niceNess[256];
  916.     int32   startFrame, endFrame;
  917.  
  918.     if ( niceProcesses ) {
  919.     sprintf(niceNess, "nice");
  920.     } else {
  921.     niceNess[0] = '\0';
  922.     }
  923.  
  924.     time(&timeStart);
  925.  
  926.     PrintStartStats(-1, 0);
  927.  
  928.     /* create a server socket */
  929.     hostName = getenv("HOST");
  930.  
  931.     if ( hostName == NULL ) {
  932.     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
  933.     exit(1);
  934.     }
  935.  
  936.     hostEntry = gethostbyname(hostName);
  937.     if ( hostEntry == NULL ) {
  938.     fprintf(stderr, "ERROR:  Could not find host %s in database\n",
  939.         hostName);
  940.     exit(1);
  941.     }
  942.  
  943.     hostName = hostEntry->h_name;
  944.  
  945.     serverSocket = socket(AF_INET, SOCK_STREAM, 0);
  946.     if ( serverSocket == -1 ) {
  947.     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
  948.     exit(1);
  949.     }
  950.  
  951.     nameEntry.sin_family = AF_INET;
  952. #ifdef BLEAH
  953.     bzero((char *)nameEntry.sin_zero, 8);
  954.     bcopy((char *) hostEntry->h_addr_list[0],
  955.       (char *) &(nameEntry.sin_addr.s_addr),
  956.       (size_t) hostEntry->h_length);
  957. #endif
  958.     bzero((char *) &nameEntry, sizeof(nameEntry));
  959.     
  960.     /* find a port number that isn't used */
  961.     portNum = 2048;
  962.     do {
  963.     portNum++;
  964.     tempShort = portNum;
  965.     nameEntry.sin_port = htons(tempShort);
  966.     result = bind(serverSocket, (struct sockaddr *) &nameEntry,
  967.               sizeof(struct sockaddr));
  968.     }
  969.     while ( result == -1 );
  970.  
  971.     /* would really like to wait for 1+numMachines machines, but this is max
  972.      * allowable, unfortunately
  973.      */
  974.     result = listen(serverSocket, SOMAXCONN);
  975.  
  976.     if ( result == -1 ) {
  977.     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
  978.     exit(1);
  979.     }
  980.  
  981.     fprintf(stdout, "---USING PORT %d\n", portNum);
  982.  
  983.     sprintf(command, "mpeg_encode -max_machines %d -io_server %s %d %s &",
  984.         numMachines, hostName, portNum, paramFile);
  985.     system(command);    /* should do an exec? */
  986.  
  987.     /* should now listen for connection from IO server */
  988.     otherSize = sizeof(otherSocket);
  989.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  990.     if ( otherSock == -1 ) {
  991.     fprintf(stderr, "ERROR:  PARALLEL SERVER accept returned error %d\n", errno);
  992.     exit(1);
  993.     }
  994.  
  995.     result = SafeRead(otherSock, (char *)(&IOportNum), 4);
  996.     IOportNum = ntohl(IOportNum);
  997.     close(otherSock);
  998.  
  999.     fprintf(stdout, "---PARALLEL SERVER:  IO port number = %d\n", IOportNum);
  1000.  
  1001.     /* START OUTPUT SERVER */
  1002.     sprintf(command, "mpeg_encode -max_machines %d -output_server %s %d %d %s &",
  1003.         numMachines, hostName, portNum, numInputFiles, paramFile);
  1004.     system(command);    /* should do an exec? */
  1005.  
  1006.     /* should now listen for connection from Output server */
  1007.     otherSize = sizeof(otherSocket);
  1008.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  1009.     if ( otherSock == -1 ) {
  1010.     fprintf(stderr, "ERROR:  PARALLEL SERVER accept returned error %d\n", errno);
  1011.     exit(1);
  1012.     }
  1013.  
  1014.     result = SafeRead(otherSock, (char *)(&outputPortNum), 4);
  1015.     outputPortNum = ntohl(outputPortNum);
  1016.     close(otherSock);
  1017.  
  1018.     fprintf(stdout, "---PARALLEL SERVER:  Output port number = %d\n",
  1019.         outputPortNum);
  1020.  
  1021.     /* we are doing whole thing (if not, see above) */
  1022.  
  1023.     framesPerMachine = numInputFiles/numMachines;
  1024.  
  1025.     numFinished = 0;
  1026.  
  1027.     /* DO INITIAL TIME TESTS */
  1028.     nextFrame = 0;
  1029.     for ( index = 0; index < numMachines; index++ ) {
  1030.     finished[index] = FALSE;
  1031.     numSeconds[index] = 0;
  1032.  
  1033.     if ( remote[index] ) {
  1034.         sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d -frames %d %d %s &",
  1035.             rsh,
  1036.             machineName[index], userName[index], niceNess,
  1037.             executable[index],
  1038.             hostName, portNum, IOportNum, outputPortNum, index,
  1039.             remote[index],
  1040.             nextFrame, nextFrame+parallelTestFrames-1,
  1041.             remoteParamFile[index]);
  1042.     } else {
  1043.         sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d -frames %d %d %s &",
  1044.             rsh,
  1045.             machineName[index], userName[index], niceNess,
  1046.             executable[index],
  1047.             hostName, portNum, IOportNum, outputPortNum, index,
  1048.             remote[index],
  1049.             nextFrame, nextFrame+parallelTestFrames-1,
  1050.             paramFile);
  1051.     }
  1052.  
  1053.     fprintf(stdout, "---%s:  frames %d to %d\n",
  1054.         machineName[index], nextFrame,
  1055.         nextFrame+parallelTestFrames-1);
  1056.     system(command);    /* should do an exec? */
  1057.  
  1058.     nextFrame += parallelTestFrames;
  1059.     numFrames[index] = parallelTestFrames;
  1060.     lastNumFrames[index] = parallelTestFrames;
  1061.     }
  1062.  
  1063.     framesDone = 0;
  1064.  
  1065.     /* now, wait for other processes to finish and boss them around */
  1066.     while ( numFinished != numMachines ) {
  1067.     otherSize = sizeof(otherSocket);
  1068.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  1069.     if ( otherSock == -1 ) {
  1070.         fprintf(stderr, "ERROR:  PARALLEL SERVER 2 accept returned error %d\n", errno);
  1071.         exit(1);
  1072.     }
  1073.  
  1074.     result = SafeRead(otherSock, (char *)buffer, 8);
  1075.  
  1076.     index = ntohl(buffer[0]);
  1077.     seconds = ntohl(buffer[1]);
  1078.  
  1079. #ifdef BLEAH
  1080.     fprintf(stdout, "%d BYTES READ\n", result);
  1081.     for ( index = 0; index < result/4; index++ ) {
  1082.         fprintf(stdout, "buffer[%d] = %d\n", index, buffer[index]);
  1083.     }
  1084.  
  1085. fprintf(stdout, "AND SO index = 0x%x, seconds = 0x%x\n",
  1086.     index, seconds);
  1087. #endif
  1088.  
  1089.     numSeconds[index] += seconds;
  1090.  
  1091.     if ( seconds != 0 )
  1092.         framesPerSecond = (float)lastNumFrames[index]/(float)seconds;
  1093.     else
  1094.         framesPerSecond = (float)lastNumFrames[index]*2.0;
  1095.  
  1096.     framesDone += lastNumFrames[index];
  1097.  
  1098.     if ( nextFrame >= numInputFiles ) {
  1099.         buffer[0] = htonl(-1);
  1100.         buffer[1] = htonl(0);
  1101.         write(otherSock, (char *)buffer, 8);
  1102.         numFinished++;
  1103.  
  1104.         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
  1105.             machineName[index], framesPerSecond, numFinished,
  1106.             numMachines);
  1107.     } else {
  1108.         avgFPS = (float)numFrames[index]/(float)numSeconds[index];
  1109.  
  1110.         startFrame = nextFrame;
  1111.         endFrame = nextFrame +
  1112.                 (int)((float)parallelTimeChunks*avgFPS) - 1;
  1113.  
  1114.         if ( endFrame < startFrame ) {   /* always give at least 1 frame */
  1115.         endFrame = startFrame;
  1116.         }
  1117.         if ( endFrame >= numInputFiles ) {
  1118.         endFrame = numInputFiles-1;
  1119.         }
  1120.  
  1121.         nextFrame = endFrame+1;
  1122.  
  1123.         numFrames[index] += (endFrame-startFrame+1);
  1124.         lastNumFrames[index] = (endFrame-startFrame+1);
  1125.  
  1126.         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
  1127.             machineName[index], framesPerSecond, numFinished,
  1128.             numMachines, startFrame, endFrame);
  1129.  
  1130.         buffer[0] = htonl(startFrame);
  1131.         buffer[1] = htonl(endFrame);
  1132.  
  1133.         write(otherSock, (char *)buffer, 8);
  1134.     }
  1135.  
  1136.     close(otherSock);
  1137.  
  1138.     fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
  1139.         framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
  1140.     }
  1141.  
  1142.     IOhostName = hostName;
  1143.     IOportNumber = IOportNum;
  1144.     EndIOServer();
  1145.  
  1146.     /* now wait for OutputServer to tell us it's done */
  1147.     otherSize = sizeof(otherSocket);
  1148.     otherSock = accept(serverSocket, &otherSocket, &otherSize);
  1149.     if ( otherSock == -1 ) {
  1150.     fprintf(stderr, "ERROR:  PARALLEL SERVER accept returned error %d\n", errno);
  1151.     exit(1);
  1152.     }
  1153.  
  1154.     result = SafeRead(otherSock, (char *)buffer, 4);
  1155.     close(otherSock);
  1156.     
  1157.     close(serverSocket);
  1158.  
  1159.     time(&timeEnd);
  1160.     diffTime = (int32)(timeEnd-timeStart);
  1161.  
  1162.     for ( index2 = 0; index2 < 2; index2++ ) {
  1163.     if ( index2 == 0 ) {
  1164.         filePtr = stdout;
  1165.     } else if ( statFile != NULL ) {
  1166.         filePtr = statFile;
  1167.     } else {
  1168.         continue;
  1169.     }
  1170.  
  1171.     fprintf(filePtr, "\n\n");
  1172.     fprintf(filePtr, "PARALLEL SUMMARY\n");
  1173.     fprintf(filePtr, "----------------\n");
  1174.     fprintf(filePtr, "\n");
  1175.     fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
  1176.         "MACHINE");
  1177.     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
  1178.     totalFPS = 0.0;
  1179.     for ( index = 0; index < numMachines; index++ ) {
  1180.         localFPS = (float)numFrames[index]/(float)numSeconds[index];
  1181.         fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
  1182.             machineName[index], numFrames[index], numSeconds[index],
  1183.             localFPS, (int)((float)numInputFiles/localFPS));
  1184.         totalFPS += localFPS;
  1185.     }
  1186.  
  1187.     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
  1188.  
  1189.     fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL", 
  1190.         (int)((float)numInputFiles/totalFPS),
  1191.         totalFPS);
  1192.     fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime, 
  1193.         (float)numInputFiles/(float)diffTime);
  1194.  
  1195.     fprintf(filePtr, "\n\n");
  1196.     }
  1197.  
  1198.     if ( statFile != NULL ) {
  1199.     fclose(statFile);
  1200.     }
  1201. }
  1202.  
  1203.  
  1204. /*===========================================================================*
  1205.  *
  1206.  * NotifyMasterDone
  1207.  *
  1208.  *    called by a slave process; tells the master process it is done
  1209.  *
  1210.  * RETURNS:    nothing
  1211.  *
  1212.  * SIDE EFFECTS:    none
  1213.  *
  1214.  *===========================================================================*/
  1215. boolean
  1216. NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
  1217.          frameEnd)
  1218.     char *hostName;
  1219.     int portNum;
  1220.     int machineNumber;
  1221.     int seconds;
  1222.     int *frameStart;
  1223.     int *frameEnd;
  1224. {
  1225.     int    clientSocket;
  1226.     struct sockaddr_in  nameEntry;
  1227.     u_short        tempShort;
  1228.     int        result;
  1229.     int32   buffer[8];
  1230.  
  1231.     if ( hostEntry == NULL ) {
  1232.     hostEntry = gethostbyname(hostName);
  1233.     if ( hostEntry == NULL ) {
  1234.         fprintf(stderr, "ERROR:  host (%s) could not be found in database\n",
  1235.             hostName);
  1236.         exit(1);
  1237.     }
  1238.     }
  1239.  
  1240.     clientSocket = socket(AF_INET, SOCK_STREAM, 0);
  1241.     nameEntry.sin_family = AF_INET;
  1242.     bzero((char *)nameEntry.sin_zero, 8);
  1243.     bcopy((char *) hostEntry->h_addr_list[0],
  1244.       (char *) &(nameEntry.sin_addr.s_addr),
  1245.       (size_t) hostEntry->h_length);
  1246.     tempShort = portNum;
  1247.     nameEntry.sin_port = htons(tempShort);
  1248.  
  1249. #ifdef BLEAH
  1250. fprintf(stdout, "NOTIFY:  Making connection\n");
  1251. #endif
  1252.  
  1253.     result = connect(clientSocket, (struct sockaddr *) &nameEntry,
  1254.              sizeof(struct sockaddr));
  1255.     if ( result == -1 ) {
  1256.     fprintf(stderr, "ERROR:  connect (NotifyMasterDone, port %d) returned error %d\n",
  1257.         portNum, errno);
  1258.     exit(1);
  1259.     }
  1260.  
  1261.     buffer[0] = htonl(machineNumber);
  1262.     buffer[1] = htonl(seconds);
  1263.  
  1264. #ifdef BLEAH
  1265. fprintf(stdout, "WRITING:  buffer[0] = 0x%x, buffer[1] = 0x%x\n",
  1266.     buffer[0], buffer[1]);
  1267. fflush(stdout);
  1268. #endif
  1269.  
  1270.     write(clientSocket, (char *)buffer, 8);
  1271.  
  1272.     result = SafeRead(clientSocket, (char *)buffer, 8);
  1273.     *frameStart = ntohl(buffer[0]);
  1274.     *frameEnd = ntohl(buffer[1]);
  1275.  
  1276.     close(clientSocket);
  1277.  
  1278.     return ((*frameStart) >= 0);
  1279. }
  1280.  
  1281.  
  1282. /*=====================*
  1283.  * INTERNAL PROCEDURES *
  1284.  *=====================*/
  1285.  
  1286.  
  1287. /*===========================================================================*
  1288.  *
  1289.  * TransmitPortNum
  1290.  *
  1291.  *    called by the I/O or Output server; transmits the appropriate
  1292.  *    port number to the master
  1293.  *
  1294.  * RETURNS:    nothing
  1295.  *
  1296.  * SIDE EFFECTS:    none
  1297.  *
  1298.  *===========================================================================*/
  1299. static void
  1300. TransmitPortNum(hostName, portNum, newPortNum)
  1301.     char *hostName;
  1302.     int portNum;
  1303.     int newPortNum;
  1304. {
  1305.     int    clientSocket;
  1306.     struct sockaddr_in  nameEntry;
  1307.     u_short        tempShort;
  1308.     int        result;
  1309.     u_long  data;
  1310.  
  1311.     if ( hostEntry == NULL ) {
  1312.     hostEntry = gethostbyname(hostName);
  1313.     if ( hostEntry == NULL ) {
  1314.         fprintf(stderr, "ERROR:  Could not find host %s in database\n",
  1315.             hostName);
  1316.         exit(1);
  1317.     }
  1318.     }
  1319.  
  1320.     clientSocket = socket(AF_INET, SOCK_STREAM, 0);
  1321.     nameEntry.sin_family = AF_INET;
  1322.     bzero((char *)nameEntry.sin_zero, 8);
  1323.     bcopy((char *) hostEntry->h_addr_list[0],
  1324.       (char *) &(nameEntry.sin_addr.s_addr),
  1325.       (size_t) hostEntry->h_length);
  1326.     tempShort = portNum;
  1327.     nameEntry.sin_port = htons(tempShort);
  1328.  
  1329.     result = connect(clientSocket, (struct sockaddr *) &nameEntry,
  1330.              sizeof(struct sockaddr));
  1331.     if ( result == -1 ) {
  1332.     fprintf(stderr, "ERROR:  connect (TransmitPortNum, port %d) returned error %d\n",
  1333.         portNum, errno);
  1334.     exit(1);
  1335.     }
  1336.  
  1337.     data = htonl(newPortNum);
  1338.     write(clientSocket, (char *) &data, 4);
  1339.  
  1340.     close(clientSocket);
  1341. }
  1342.  
  1343.  
  1344. /*===========================================================================*
  1345.  *
  1346.  * SafeRead
  1347.  *
  1348.  *    safely read from the given socket; the procedure keeps reading until
  1349.  *    it gets the number of bytes specified
  1350.  *
  1351.  * RETURNS:    nothing
  1352.  *
  1353.  * SIDE EFFECTS:    none
  1354.  *
  1355.  *===========================================================================*/
  1356. static int
  1357. SafeRead(fd, buf, nbyte)
  1358.     int fd;
  1359.     char *buf;
  1360.     int nbyte;
  1361. {
  1362.     int numRead;
  1363.     int result;
  1364.  
  1365.     numRead = 0;
  1366.  
  1367.     while ( numRead != nbyte ) {
  1368.         result = read(fd, &buf[numRead], nbyte-numRead);
  1369.  
  1370.     if ( result == -1 ) {
  1371.         fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
  1372.             nbyte-numRead, nbyte, errno);
  1373.         exit(1);
  1374.     }
  1375.     numRead += result;
  1376.     }
  1377.  
  1378.     return numRead;
  1379. }
  1380.  
  1381.  
  1382. /*===========================================================================*
  1383.  *
  1384.  * EndIOServer
  1385.  *
  1386.  *    called by the master process -- tells the I/O server to commit
  1387.  *    suicide
  1388.  *
  1389.  * RETURNS:    nothing
  1390.  *
  1391.  * SIDE EFFECTS:    none
  1392.  *
  1393.  *===========================================================================*/
  1394. static void
  1395. EndIOServer()
  1396. {
  1397.     /* send signal to IO server:  -1 as frame number */
  1398.     GetRemoteFrame(NULL, -1);
  1399. }
  1400.