home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
DP Tool Club 9
/
CD_ASCQ_09_1193.iso
/
news
/
4441
/
mpegcode
/
src
/
parallel.c
< prev
next >
Wrap
C/C++ Source or Header
|
1993-09-27
|
38KB
|
1,400 lines
/*===========================================================================*
* parallel.c *
* *
* Procedures to make encoder run in parallel *
* *
* EXPORTED PROCEDURES: *
* SetIOConvert *
* SetRemoteShell *
* StartInputServer *
* StartOutputServer *
* WaitForOutputFile *
* SendRemoteFrame *
* NoteFrameDone *
* GetRemoteFrame *
* StartParallelServer *
* NotifyMasterDone *
* *
*===========================================================================*/
/*
* Copyright (c) 1993 The Regents of the University of California.
* All rights reserved.
*
* Permission to use, copy, modify, and distribute this software and its
* documentation for any purpose, without fee, and without written agreement is
* hereby granted, provided that the above copyright notice and the following
* two paragraphs appear in all copies of this software.
*
* IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
* DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
* OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
* CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
* AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
* ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
* PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
*/
/*
* $Header: /n/picasso/users/keving/encode/src/RCS/parallel.c,v 1.2 1993/07/22 22:23:43 keving Exp keving $
* $Log: parallel.c,v $
* Revision 1.2 1993/07/22 22:23:43 keving
* nothing
*
* Revision 1.1 1993/06/30 20:06:09 keving
* nothing
*
*/
/*==============*
* HEADER FILES *
*==============*/
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/times.h>
#include <netinet/in.h>
#include <netdb.h>
#include <errno.h>
#include "all.h"
#include "param.h"
#include "mpeg.h"
#include "prototypes.h"
#include "parallel.h"
#include "readframe.h"
#include "fsize.h"
#include "combine.h"
/*==================*
* STATIC VARIABLES *
*==================*/
/* elapsed wall-clock seconds of the whole run; computed in
 * StartParallelServer() as timeEnd - timeStart and printed in the summary */
static int32 diffTime;
/* set by SetIOConvert(); selects the chunked "raw file" transfer path in
 * StartInputServer()/GetRemoteFrame() instead of sending decoded YUV rows */
static boolean separateConversion;
/* name of the remote-shell program, set by SetRemoteShell(); used as the
 * first word of the command line that launches each slave */
static char rsh[256];
/* cached gethostbyname() result; every routine in this file looks the host
 * up only while this is still NULL and reuses the entry afterwards */
static struct hostent *hostEntry = NULL;
/* allocated in StartOutputServer() with one flag per input frame; a flag is
 * set in WaitForOutputFile() once that frame has been reported or received */
static boolean *frameDone;
/* listening socket created by StartOutputServer() and accepted on by
 * WaitForOutputFile() */
static int outputServerSocket;
/*==================*
* GLOBAL VARIABLES *
*==================*/
/* frame dimensions; used here as the row count and row length when frames
 * are sent and received plane by plane */
extern int yuvHeight, yuvWidth;
/* set by StartParallelServer() around the whole run */
extern time_t timeStart, timeEnd;
/* not referenced in this file */
extern char statFileName[256];
/* if non-NULL, the parallel summary is also written here and the stream is
 * then closed by StartParallelServer() */
extern FILE *statFile;
/* number of frames in the initial timing job given to every machine */
int parallelTestFrames = 10;
/* multiplied by a machine's average frames-per-second to size its next job */
int parallelTimeChunks = 60;
/* host that SendRemoteFrame(), NoteFrameDone() and GetRemoteFrame() resolve
 * when no host entry has been cached yet */
char *IOhostName;
/* port GetRemoteFrame() connects to (the input server) */
int IOportNumber;
/* port SendRemoteFrame() and NoteFrameDone() connect to (the output server) */
int outputPortNumber;
/* when true, StartParallelServer() puts "nice" in front of each slave's
 * executable on the remote command line */
boolean niceProcesses;
/*===============================*
* INTERNAL PROCEDURE prototypes *
*===============================*/
static void TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
int IOportNum));
static void EndIOServer _ANSI_ARGS_((void));
static int SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
/*=====================*
* EXPORTED PROCEDURES *
*=====================*/
/*=================*
* IO SERVER STUFF *
*=================*/
/*===========================================================================*
 *
 * SetIOConvert
 *
 *	record whether input conversion is split between the input server
 *	and the slave.  When the flag is set, StartInputServer() ships the
 *	converted file contents in chunks and GetRemoteFrame() finishes the
 *	conversion locally.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    overwrites the file-static separateConversion flag
 *
 *===========================================================================*/
void
SetIOConvert(separate)
    boolean separate;
{
    /* plain setter: the flag is only read by the input server and the slave */
    separateConversion = separate;
}
/*===========================================================================*
 *
 * SetRemoteShell
 *
 *	sets the remote shell program (usually rsh, but different on some
 *	machines).  The name is copied into a fixed 256-byte buffer; a name
 *	that does not fit is truncated rather than overrunning the buffer.
 *	A NULL argument leaves the current setting untouched.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    overwrites the file-static rsh buffer
 *
 *===========================================================================*/
void
SetRemoteShell(shell)
    char *shell;
{
    if ( shell == NULL ) {
	return;
    }

    /* strncpy does not terminate on truncation, so terminate explicitly */
    strncpy(rsh, shell, sizeof(rsh) - 1);
    rsh[sizeof(rsh) - 1] = '\0';
}
/*===========================================================================*
*
* StartInputServer
*
* start-up the InputServer with this process
* handles slave requests for frames, and exits when master tells it to
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
StartInputServer(numInputFiles, parallelHostName, portNum)
int numInputFiles;
char *parallelHostName;
int portNum;
{
char *hostName;
int IOportNum;
int serverSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
int otherSock, otherSize;
struct sockaddr otherSocket;
int32 buffer[8];
boolean done = FALSE;
int frameNumber;
MpegFrame *frame;
register int y;
int numBytes;
unsigned char *bigBuffer;
unsigned char smallBuffer[1000];
int bigBufferSize;
FILE *filePtr;
uint32 data;
char inputFileName[1024];
bigBufferSize = 0;
bigBuffer = NULL;
/* once we get IO port num, should transmit it to parallel server */
/* create a server socket */
hostName = getenv("HOST");
if ( hostName == NULL ) {
fprintf(stderr, "ERROR: Set HOST environment variable\n");
exit(1);
}
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(hostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Could not find host %s in database\n",
hostName);
exit(1);
}
}
serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( serverSocket == -1 ) {
fprintf(stderr, "ERROR: Call to socket() gave error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
#ifdef BLEAH
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
#endif
bzero((char *) &nameEntry, sizeof(nameEntry));
/* find a port number that isn't used */
IOportNum = 2048;
do {
IOportNum++;
tempShort = IOportNum;
nameEntry.sin_port = htons(tempShort);
result = bind(serverSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
}
while ( result == -1 );
/* would really like to wait for 1+numMachines machines, but this is max
* allowable, unfortunately
*/
result = listen(serverSocket, SOMAXCONN);
if ( result == -1 ) {
fprintf(stderr, "ERROR: call to listen() gave error %d\n", errno);
exit(1);
}
fprintf(stdout, "====I/O USING PORT %d\n", IOportNum);
TransmitPortNum(parallelHostName, portNum, IOportNum);
otherSize = sizeof(otherSocket);
SetFileType(); /* for reading */
/* now, wait until get done signal */
while ( ! done ) {
otherSock = accept(serverSocket, &otherSocket, &otherSize);
if ( otherSock == -1 ) {
fprintf(stderr, "ERROR: Input SERVER accept returned error %d\n", errno);
exit(1);
}
result = SafeRead(otherSock, (char *)buffer, 4);
frameNumber = ntohl(buffer[0]);
if ( frameNumber == -1 ) {
done = TRUE;
} else {
#ifdef BLEAH
fprintf(stdout, "Input SERVER GETTING FRAME %d\n", frameNumber);
fflush(stdout);
#endif
/* should read in frame, then write to socket */
frame = Frame_New(frameNumber, 'i');
if ( separateConversion ) {
GetNthInputFileName(inputFileName, frameNumber);
/* do conversion and send right to the socket */
filePtr = ReadIOConvert(inputFileName);
do {
numBytes = fread(smallBuffer, 1, 1000, filePtr);
if ( numBytes > 0 ) {
data = numBytes;
data = htonl(data);
write(otherSock, &data, 4);
write(otherSock, smallBuffer, numBytes);
}
}
while ( numBytes == 1000 );
if ( strcmp(ioConversion, "*") == 0 ) {
fclose(filePtr);
} else {
pclose(filePtr);
}
} else {
GetNthInputFileName(inputFileName, frameNumber);
ReadFrame(frame, inputFileName, inputConversion, TRUE);
/* should now transmit yuv values */
for (y = 0; y < yuvHeight; y++) { /* Y */
write(otherSock, frame->orig_y[y], yuvWidth);
}
for (y = 0; y < yuvHeight / 2; y++) { /* U */
write(otherSock, frame->orig_cb[y], yuvWidth / 2);
}
for (y = 0; y < yuvHeight / 2; y++) { /* V */
write(otherSock, frame->orig_cr[y], yuvWidth / 2);
}
/* now, make sure we don't leave until other processor read everything */
result = SafeRead(otherSock, (char *)buffer, 4);
/* should = 0 */
}
#ifdef BLEAH
fprintf(stdout, "====Input SERVER: READ FRAME %d\n",
frameNumber);
#endif
Frame_Free(frame);
}
close(otherSock);
}
close(serverSocket);
fprintf(stdout, "====Input SERVER: Shutting Down\n");
}
/*===========================================================================*
*
* SendRemoteFrame
*
* called by a slave to the Output server; sends an encoded frame
* to the server to be sent to disk
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
SendRemoteFrame(frameNumber, bb)
int frameNumber;
BitBucket *bb;
{
int clientSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
u_long data;
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(IOhostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Couldn't get host by name (%s)\n",
IOhostName);
exit(1);
}
}
clientSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( clientSocket == -1 ) {
fprintf(stderr, "ERROR: socket returned error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
tempShort = outputPortNumber;
nameEntry.sin_port = htons(tempShort);
result = connect(clientSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
if ( result == -1 ) {
fprintf(stderr, "ERROR: connect (SendRemoteFrame, port %d) returned error %d\n",
outputPortNumber, errno);
exit(1);
}
data = htonl(frameNumber);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (SendRemoteFrame) of frame number\n");
exit(1);
}
if ( frameNumber != -1 ) {
/* send number of bytes */
data = (bb->totalbits+7)/8;
data = htonl(data);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (SendRemoteFrame) of #bytes\n");
exit(1);
}
/* now send the bytes themselves */
Bitio_WriteToSocket(bb, clientSocket);
}
close(clientSocket);
}
/*===========================================================================*
*
* NoteFrameDone
*
* called by a slave to the Output server; tells it these frames are
* done
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
NoteFrameDone(frameStart, frameEnd)
int frameStart;
int frameEnd;
{
int clientSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
u_long data;
int negativeTwo = -2;
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(IOhostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Couldn't get host by name (%s)\n",
IOhostName);
exit(1);
}
}
clientSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( clientSocket == -1 ) {
fprintf(stderr, "ERROR: socket returned error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
tempShort = outputPortNumber;
nameEntry.sin_port = htons(tempShort);
result = connect(clientSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
if ( result == -1 ) {
fprintf(stderr, "ERROR: connect (NoteFrameDone, port %d) returned error %d\n",
outputPortNumber, errno);
exit(1);
}
data = negativeTwo;
data = htonl(negativeTwo);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (NoteFrameDone) of -2\n");
exit(1);
}
data = htonl(frameStart);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (NoteFrameDone) of frame start\n");
exit(1);
}
data = htonl(frameEnd);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (NoteFrameDone) of frame end\n");
exit(1);
}
close(clientSocket);
}
/*===========================================================================*
*
* GetRemoteFrame
*
* called by a slave; gets a remote frame from the Input server
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
GetRemoteFrame(frame, frameNumber)
MpegFrame *frame;
int frameNumber;
{
FILE *filePtr;
int clientSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
unsigned char smallBuffer[1000];
int result;
register int y;
int numBytes;
u_long data;
int32 tempStart, tempEnd;
Fsize_Note(frameNumber, yuvWidth, yuvHeight);
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(IOhostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Couldn't get host by name (%s)\n",
IOhostName);
exit(1);
}
}
clientSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( clientSocket == -1 ) {
fprintf(stderr, "ERROR: socket returned error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
tempShort = IOportNumber;
nameEntry.sin_port = htons(tempShort);
#ifdef BLEAH
fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
getenv("HOST"), frameNumber);
fflush(stdout);
#endif
time(&tempStart);
result = connect(clientSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
if ( result == -1 ) {
fprintf(stderr, "ERROR: connect (GetRemoteFrame, port %d) returned error %d\n",
IOportNumber, errno);
exit(1);
}
time(&tempEnd);
data = frameNumber;
data = htonl(data);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (GetRemoteFrame) of frame number\n");
exit(1);
}
if ( frameNumber != -1 ) {
if ( separateConversion ) {
filePtr = fopen( "/tmp/foobar", "w");
/* read in stuff, write to file, perform local conversion */
do {
result = SafeRead(clientSocket, (char *)&numBytes, 4);
numBytes = ntohl(numBytes);
SafeRead(clientSocket, (char *)smallBuffer, numBytes);
fwrite(smallBuffer, 1, numBytes, filePtr);
} while ( numBytes == 1000 );
fclose(filePtr);
/* now do slave conversion */
ReadFrame(frame, "/tmp/foobar", slaveConversion, FALSE);
} else {
Frame_AllocYCC(frame);
#ifdef BLEAH
fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
getenv("HOST"), frameNumber);
fflush(stdout);
#endif
/* should now read yuv values */
for (y = 0; y < yuvHeight; y++) { /* Y */
result = SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
if ( result != yuvWidth ) {
fprintf(stderr, "ERROR: read (GetRemoteFrame) of Y (got %d bytes)\n",
result);
exit(1);
}
}
for (y = 0; y < yuvHeight / 2; y++) { /* U */
result = SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth/2);
if ( result != yuvWidth/2 ) {
fprintf(stderr, "ERROR: read (GetRemoteFrame) of Y (got %d bytes)\n",
result);
exit(1);
}
}
for (y = 0; y < yuvHeight / 2; y++) { /* V */
result = SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth/2);
if ( result != yuvWidth/2 ) {
fprintf(stderr, "ERROR: read (GetRemoteFrame) of Y (got %d bytes)\n",
result);
exit(1);
}
}
}
}
data = 0;
data = htonl(data);
if ( write(clientSocket, &data, 4) != 4 ) {
fprintf(stderr, "ERROR: write (GetRemoteFrame) of EOT\n");
exit(1);
}
close(clientSocket);
#ifdef BLEAH
fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
getenv("HOST"), frameNumber);
fflush(stdout);
#endif
}
/*===========================================================================*
*
* StartOutputServer
*
* start-up the OutputServer with this process
* handles output and combination of frames, and exits
* when master tells it to
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
StartOutputServer(numInputFiles, outputFileName, parallelHostName, portNum)
int numInputFiles;
char *outputFileName;
char *parallelHostName;
int portNum;
{
char *hostName;
int outputPortNum;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
FILE *ofp;
/* once we get Output port num, should transmit it to parallel server */
/* create a server socket */
if ( hostEntry == NULL ) {
hostName = getenv("HOST");
if ( hostName == NULL ) {
fprintf(stderr, "ERROR: Set HOST environment variable\n");
exit(1);
}
hostEntry = gethostbyname(hostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Could not find host %s in database\n",
hostName);
exit(1);
}
}
outputServerSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( outputServerSocket == -1 ) {
fprintf(stderr, "ERROR: Call to socket() gave error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
#ifdef BLEAH
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
#endif
bzero((char *) &nameEntry, sizeof(nameEntry));
/* find a port number that isn't used */
outputPortNum = 2048;
do {
outputPortNum++;
tempShort = outputPortNum;
nameEntry.sin_port = htons(tempShort);
result = bind(outputServerSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
}
while ( result == -1 );
/* would really like to wait for 1+numMachines machines, but this is max
* allowable, unfortunately
*/
result = listen(outputServerSocket, SOMAXCONN);
if ( result == -1 ) {
fprintf(stderr, "ERROR: call to listen() gave error %d\n", errno);
exit(1);
}
fprintf(stdout, "====OUTPUT USING PORT %d\n", outputPortNum);
TransmitPortNum(parallelHostName, portNum, outputPortNum);
frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
bzero((char *)frameDone, numInputFiles*sizeof(boolean));
if ( (ofp = fopen(outputFileName, "w")) == NULL ) {
fprintf(stderr, "ERROR: Could not open output file!\n");
fflush(stderr);
exit(1);
}
FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
fprintf(stdout, "====OUTPUT SERVER: Shutting Down\n");
fflush(stdout);
/* tell Master server we are done */
TransmitPortNum(parallelHostName, portNum, outputPortNum);
close(outputServerSocket);
}
/*===========================================================================*
 *
 * WaitForOutputFile
 *
 *	keep handling output events until we get the specified frame
 *	number
 *
 *	Each connection on the output server socket carries either
 *	  - the marker -2 followed by an inclusive frame range that a
 *	    slave finished on its own, or
 *	  - a frame number, a byte count and that many bytes, which are
 *	    written to "<outputFileName>.frame.<number>"
 *	(all integers 4 bytes, network byte order).  Truncated or
 *	malformed messages are reported and dropped.
 *
 * RETURNS:	nothing
 *
 * SIDE EFFECTS:    sets entries of frameDone; writes frame files; keeps
 *		    a receive buffer that grows as needed and is never
 *		    freed; exits the process on fatal errors
 *
 *===========================================================================*/
void
WaitForOutputFile(number)
    int number;
{
    int	    otherSock;
    int	    otherSize;
    struct sockaddr otherSocket;
    int	    result;
    int	    frameNumber;
    int32   buffer[8];
    static unsigned char   *bigBuffer = NULL;
    static int bigBufferSize = 0;
    int	    numBytes;
    char    fileName[1024];
    FILE    *filePtr;
    int	    frameStart, frameEnd;

    while ( ! frameDone[number] ) {
	/* accept() overwrites the length, so reset it for every call */
	otherSize = sizeof(otherSocket);
	otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
	if ( otherSock == -1 ) {
	    fprintf(stderr, "ERROR:  Output SERVER accept returned error %d\n", errno);
	    exit(1);
	}

	result = SafeRead(otherSock, (char *)buffer, 4);
	if ( result != 4 ) {
	    fprintf(stderr, "ERROR:  Output SERVER got truncated message\n");
	    close(otherSock);
	    continue;
	}
	frameNumber = ntohl(buffer[0]);

	if ( frameNumber == -2 ) {
	    /* this is notification from non-remote process that a frame
	     * is done
	     */
	    result = SafeRead(otherSock, (char *)buffer, 8);
	    if ( result != 8 ) {
		fprintf(stderr, "ERROR:  Output SERVER got truncated message\n");
	    } else {
		frameStart = ntohl(buffer[0]);
		frameEnd = ntohl(buffer[1]);

		/* the values index frameDone; never go below its start.
		 * NOTE(review): the table size is not known here, so the
		 * upper end cannot be checked
		 */
		if ( frameStart < 0 ) {
		    frameStart = 0;
		}

		for ( frameNumber = frameStart; frameNumber <= frameEnd;
		      frameNumber++ ) {
		    frameDone[frameNumber] = TRUE;
		}
	    }
	} else if ( frameNumber < 0 ) {
	    /* would index frameDone out of bounds */
	    fprintf(stderr, "ERROR:  Output SERVER got bad frame number %d\n",
		    frameNumber);
	} else {
	    /* read in number of bytes */
	    result = SafeRead(otherSock, (char *)buffer, 4);
	    numBytes = ntohl(buffer[0]);

	    if ( result != 4 || numBytes < 0 ) {
		fprintf(stderr, "ERROR:  Output SERVER got bad size for frame %d\n",
			frameNumber);
		close(otherSock);
		continue;
	    }

	    /* make sure buffer is big enough for data */
	    if ( numBytes > bigBufferSize ) {
		if ( bigBuffer != NULL ) {
		    free(bigBuffer);
		}

		bigBuffer = (unsigned char *) malloc(numBytes*
						     sizeof(unsigned char));
		if ( bigBuffer == NULL ) {
		    fprintf(stderr, "ERROR:  Output SERVER out of memory\n");
		    exit(1);
		}
		bigBufferSize = numBytes;
	    }

	    /* now read in the bytes */
	    result = SafeRead(otherSock, (char *) bigBuffer, numBytes);
	    if ( result != numBytes ) {
		/* do not write a partial frame or mark it done */
		fprintf(stderr, "ERROR:  Output SERVER got truncated frame %d\n",
			frameNumber);
		close(otherSock);
		continue;
	    }

	    /* open file to output this stuff to; the margin of 32 covers
	     * ".frame.", the digits of the frame number and the
	     * terminating NUL
	     */
	    if ( strlen(outputFileName) + 32 > sizeof(fileName) ) {
		fprintf(stderr, "ERROR:  Output file name too long\n");
		exit(1);
	    }
	    sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
	    if ( (filePtr = fopen(fileName, "w")) == NULL ) {
		fprintf(stderr, "ERROR:  Could not open output file:  %s\n",
			fileName);
		exit(1);
	    }

	    /* now write the bytes here */
	    fwrite(bigBuffer, sizeof(char), numBytes, filePtr);

	    fclose(filePtr);

#ifdef BLEAH
	    fprintf(stdout, "====Output SERVER:  WROTE FRAME %d to disk\n",
		    frameNumber);
	    fflush(stdout);
#endif

	    frameDone[frameNumber] = TRUE;
	}

	close(otherSock);
    }

#ifdef BLEAH
    fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
    fflush(stdout);
#endif
}
/*=======================*
* PARALLEL SERVER STUFF *
*=======================*/
/*===========================================================================*
*
* StartParallelServer
*
* start the master server with this process
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
void
StartParallelServer(numInputFiles, paramFile, outputFileName)
int numInputFiles;
char *paramFile;
char *outputFileName;
{
FILE *filePtr;
register int index, index2;
int framesPerMachine;
char command[1024];
char *hostName;
int portNum;
int serverSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
boolean finished[MAX_MACHINES];
int numFinished;
int otherSock, otherSize;
struct sockaddr otherSocket;
int seconds;
int32 buffer[8];
int IOportNum;
int outputPortNum;
int nextFrame;
int numFrames[MAX_MACHINES];
int lastNumFrames[MAX_MACHINES];
int numSeconds[MAX_MACHINES];
float framesPerSecond;
float totalFPS, localFPS;
int framesDone;
float avgFPS;
char niceNess[256];
int32 startFrame, endFrame;
if ( niceProcesses ) {
sprintf(niceNess, "nice");
} else {
niceNess[0] = '\0';
}
time(&timeStart);
PrintStartStats(-1, 0);
/* create a server socket */
hostName = getenv("HOST");
if ( hostName == NULL ) {
fprintf(stderr, "ERROR: Set HOST environment variable\n");
exit(1);
}
hostEntry = gethostbyname(hostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Could not find host %s in database\n",
hostName);
exit(1);
}
hostName = hostEntry->h_name;
serverSocket = socket(AF_INET, SOCK_STREAM, 0);
if ( serverSocket == -1 ) {
fprintf(stderr, "ERROR: Call to socket() gave error %d\n", errno);
exit(1);
}
nameEntry.sin_family = AF_INET;
#ifdef BLEAH
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
#endif
bzero((char *) &nameEntry, sizeof(nameEntry));
/* find a port number that isn't used */
portNum = 2048;
do {
portNum++;
tempShort = portNum;
nameEntry.sin_port = htons(tempShort);
result = bind(serverSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
}
while ( result == -1 );
/* would really like to wait for 1+numMachines machines, but this is max
* allowable, unfortunately
*/
result = listen(serverSocket, SOMAXCONN);
if ( result == -1 ) {
fprintf(stderr, "ERROR: call to listen() gave error %d\n", errno);
exit(1);
}
fprintf(stdout, "---USING PORT %d\n", portNum);
sprintf(command, "mpeg_encode -max_machines %d -io_server %s %d %s &",
numMachines, hostName, portNum, paramFile);
system(command); /* should do an exec? */
/* should now listen for connection from IO server */
otherSize = sizeof(otherSocket);
otherSock = accept(serverSocket, &otherSocket, &otherSize);
if ( otherSock == -1 ) {
fprintf(stderr, "ERROR: PARALLEL SERVER accept returned error %d\n", errno);
exit(1);
}
result = SafeRead(otherSock, (char *)(&IOportNum), 4);
IOportNum = ntohl(IOportNum);
close(otherSock);
fprintf(stdout, "---PARALLEL SERVER: IO port number = %d\n", IOportNum);
/* START OUTPUT SERVER */
sprintf(command, "mpeg_encode -max_machines %d -output_server %s %d %d %s &",
numMachines, hostName, portNum, numInputFiles, paramFile);
system(command); /* should do an exec? */
/* should now listen for connection from Output server */
otherSize = sizeof(otherSocket);
otherSock = accept(serverSocket, &otherSocket, &otherSize);
if ( otherSock == -1 ) {
fprintf(stderr, "ERROR: PARALLEL SERVER accept returned error %d\n", errno);
exit(1);
}
result = SafeRead(otherSock, (char *)(&outputPortNum), 4);
outputPortNum = ntohl(outputPortNum);
close(otherSock);
fprintf(stdout, "---PARALLEL SERVER: Output port number = %d\n",
outputPortNum);
/* we are doing whole thing (if not, see above) */
framesPerMachine = numInputFiles/numMachines;
numFinished = 0;
/* DO INITIAL TIME TESTS */
nextFrame = 0;
for ( index = 0; index < numMachines; index++ ) {
finished[index] = FALSE;
numSeconds[index] = 0;
if ( remote[index] ) {
sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d -frames %d %d %s &",
rsh,
machineName[index], userName[index], niceNess,
executable[index],
hostName, portNum, IOportNum, outputPortNum, index,
remote[index],
nextFrame, nextFrame+parallelTestFrames-1,
remoteParamFile[index]);
} else {
sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d -frames %d %d %s &",
rsh,
machineName[index], userName[index], niceNess,
executable[index],
hostName, portNum, IOportNum, outputPortNum, index,
remote[index],
nextFrame, nextFrame+parallelTestFrames-1,
paramFile);
}
fprintf(stdout, "---%s: frames %d to %d\n",
machineName[index], nextFrame,
nextFrame+parallelTestFrames-1);
system(command); /* should do an exec? */
nextFrame += parallelTestFrames;
numFrames[index] = parallelTestFrames;
lastNumFrames[index] = parallelTestFrames;
}
framesDone = 0;
/* now, wait for other processes to finish and boss them around */
while ( numFinished != numMachines ) {
otherSize = sizeof(otherSocket);
otherSock = accept(serverSocket, &otherSocket, &otherSize);
if ( otherSock == -1 ) {
fprintf(stderr, "ERROR: PARALLEL SERVER 2 accept returned error %d\n", errno);
exit(1);
}
result = SafeRead(otherSock, (char *)buffer, 8);
index = ntohl(buffer[0]);
seconds = ntohl(buffer[1]);
#ifdef BLEAH
fprintf(stdout, "%d BYTES READ\n", result);
for ( index = 0; index < result/4; index++ ) {
fprintf(stdout, "buffer[%d] = %d\n", index, buffer[index]);
}
fprintf(stdout, "AND SO index = 0x%x, seconds = 0x%x\n",
index, seconds);
#endif
numSeconds[index] += seconds;
if ( seconds != 0 )
framesPerSecond = (float)lastNumFrames[index]/(float)seconds;
else
framesPerSecond = (float)lastNumFrames[index]*2.0;
framesDone += lastNumFrames[index];
if ( nextFrame >= numInputFiles ) {
buffer[0] = htonl(-1);
buffer[1] = htonl(0);
write(otherSock, (char *)buffer, 8);
numFinished++;
fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done): DONE\n",
machineName[index], framesPerSecond, numFinished,
numMachines);
} else {
avgFPS = (float)numFrames[index]/(float)numSeconds[index];
startFrame = nextFrame;
endFrame = nextFrame +
(int)((float)parallelTimeChunks*avgFPS) - 1;
if ( endFrame < startFrame ) { /* always give at least 1 frame */
endFrame = startFrame;
}
if ( endFrame >= numInputFiles ) {
endFrame = numInputFiles-1;
}
nextFrame = endFrame+1;
numFrames[index] += (endFrame-startFrame+1);
lastNumFrames[index] = (endFrame-startFrame+1);
fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done): next: %d to %d\n",
machineName[index], framesPerSecond, numFinished,
numMachines, startFrame, endFrame);
buffer[0] = htonl(startFrame);
buffer[1] = htonl(endFrame);
write(otherSock, (char *)buffer, 8);
}
close(otherSock);
fprintf(stdout, "---FRAMES DONE: %d\tFARMED OUT: %d\tLEFT: %d\n",
framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
}
IOhostName = hostName;
IOportNumber = IOportNum;
EndIOServer();
/* now wait for OutputServer to tell us it's done */
otherSize = sizeof(otherSocket);
otherSock = accept(serverSocket, &otherSocket, &otherSize);
if ( otherSock == -1 ) {
fprintf(stderr, "ERROR: PARALLEL SERVER accept returned error %d\n", errno);
exit(1);
}
result = SafeRead(otherSock, (char *)buffer, 4);
close(otherSock);
close(serverSocket);
time(&timeEnd);
diffTime = (int32)(timeEnd-timeStart);
for ( index2 = 0; index2 < 2; index2++ ) {
if ( index2 == 0 ) {
filePtr = stdout;
} else if ( statFile != NULL ) {
filePtr = statFile;
} else {
continue;
}
fprintf(filePtr, "\n\n");
fprintf(filePtr, "PARALLEL SUMMARY\n");
fprintf(filePtr, "----------------\n");
fprintf(filePtr, "\n");
fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
"MACHINE");
fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
totalFPS = 0.0;
for ( index = 0; index < numMachines; index++ ) {
localFPS = (float)numFrames[index]/(float)numSeconds[index];
fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
machineName[index], numFrames[index], numSeconds[index],
localFPS, (int)((float)numInputFiles/localFPS));
totalFPS += localFPS;
}
fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL",
(int)((float)numInputFiles/totalFPS),
totalFPS);
fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime,
(float)numInputFiles/(float)diffTime);
fprintf(filePtr, "\n\n");
}
if ( statFile != NULL ) {
fclose(statFile);
}
}
/*===========================================================================*
*
* NotifyMasterDone
*
* called by a slave process; tells the master process it is done
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
boolean
NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
frameEnd)
char *hostName;
int portNum;
int machineNumber;
int seconds;
int *frameStart;
int *frameEnd;
{
int clientSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
int32 buffer[8];
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(hostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: host (%s) could not be found in database\n",
hostName);
exit(1);
}
}
clientSocket = socket(AF_INET, SOCK_STREAM, 0);
nameEntry.sin_family = AF_INET;
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
tempShort = portNum;
nameEntry.sin_port = htons(tempShort);
#ifdef BLEAH
fprintf(stdout, "NOTIFY: Making connection\n");
#endif
result = connect(clientSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
if ( result == -1 ) {
fprintf(stderr, "ERROR: connect (NotifyMasterDone, port %d) returned error %d\n",
portNum, errno);
exit(1);
}
buffer[0] = htonl(machineNumber);
buffer[1] = htonl(seconds);
#ifdef BLEAH
fprintf(stdout, "WRITING: buffer[0] = 0x%x, buffer[1] = 0x%x\n",
buffer[0], buffer[1]);
fflush(stdout);
#endif
write(clientSocket, (char *)buffer, 8);
result = SafeRead(clientSocket, (char *)buffer, 8);
*frameStart = ntohl(buffer[0]);
*frameEnd = ntohl(buffer[1]);
close(clientSocket);
return ((*frameStart) >= 0);
}
/*=====================*
* INTERNAL PROCEDURES *
*=====================*/
/*===========================================================================*
*
* TransmitPortNum
*
* called by the I/O or Output server; transmits the appropriate
* port number to the master
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
static void
TransmitPortNum(hostName, portNum, newPortNum)
char *hostName;
int portNum;
int newPortNum;
{
int clientSocket;
struct sockaddr_in nameEntry;
u_short tempShort;
int result;
u_long data;
if ( hostEntry == NULL ) {
hostEntry = gethostbyname(hostName);
if ( hostEntry == NULL ) {
fprintf(stderr, "ERROR: Could not find host %s in database\n",
hostName);
exit(1);
}
}
clientSocket = socket(AF_INET, SOCK_STREAM, 0);
nameEntry.sin_family = AF_INET;
bzero((char *)nameEntry.sin_zero, 8);
bcopy((char *) hostEntry->h_addr_list[0],
(char *) &(nameEntry.sin_addr.s_addr),
(size_t) hostEntry->h_length);
tempShort = portNum;
nameEntry.sin_port = htons(tempShort);
result = connect(clientSocket, (struct sockaddr *) &nameEntry,
sizeof(struct sockaddr));
if ( result == -1 ) {
fprintf(stderr, "ERROR: connect (TransmitPortNum, port %d) returned error %d\n",
portNum, errno);
exit(1);
}
data = htonl(newPortNum);
write(clientSocket, (char *) &data, 4);
close(clientSocket);
}
/*===========================================================================*
 *
 * SafeRead
 *
 *	safely read from the given socket; the procedure keeps reading until
 *	it gets the number of bytes specified or the other side closes the
 *	connection.  A read interrupted by a signal is simply retried.
 *
 * RETURNS:	number of bytes stored in buf.  This equals nbyte unless
 *		end-of-file was reached first, in which case it is smaller
 *		(callers detect a dead peer by comparing against nbyte).
 *
 * SIDE EFFECTS:    prints a message and exits the process if read()
 *		    fails with anything other than EINTR
 *
 *===========================================================================*/
static int
SafeRead(fd, buf, nbyte)
    int fd;
    char *buf;
    int nbyte;
{
    int numRead;
    int result;

    numRead = 0;

    while ( numRead < nbyte ) {
	result = read(fd, &buf[numRead], nbyte-numRead);

	if ( result == -1 ) {
	    if ( errno == EINTR ) {
		continue;	    /* interrupted before any data; retry */
	    }

	    fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
		    nbyte-numRead, nbyte, errno);
	    exit(1);
	}

	if ( result == 0 ) {
	    /* end of file:  no more data will ever arrive, so looping
	     * again would spin forever
	     */
	    break;
	}

	numRead += result;
    }

    return numRead;
}
/*===========================================================================*
*
* EndIOServer
*
* called by the master process -- tells the I/O server to commit
* suicide
*
* RETURNS: nothing
*
* SIDE EFFECTS: none
*
*===========================================================================*/
static void
EndIOServer()
{
/* send signal to IO server: -1 as frame number */
GetRemoteFrame(NULL, -1);
}