home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Source Code 1992 March
/
Source_Code_CD-ROM_Walnut_Creek_March_1992.iso
/
usenet
/
altsrcs
/
1
/
1607
/
buffer.c
next >
Wrap
C/C++ Source or Header
|
1990-12-28
|
10KB
|
491 lines
/* This is a reblocking process, designed to try and read from stdin
* and write to stdout - but to always try and keep the writing side
* busy. It is meant to try and stream tape writes.
*
* This program runs in two parts. The reader and the writer. They
* communicate using shared memory with semaphores locking the access.
* The shared memory implements a circular list of blocks of data.
*
* L.McLoughlin, Imperial College, 1990
*
* $Log: buffer.c,v $
* Revision 1.3 90/05/15 23:27:46 lmjm
* Added -S option (show how much has been written).
* Added -m option to specify how much shared memory to grab.
* Now tries to fill this with blocks.
* reader waits for writer to terminate and then frees the shared mem and sems.
*
* Revision 1.2 90/01/20 21:37:59 lmjm
* Reset default number of blocks and blocksize for best thruput of
* standard tar 10K blocks.
* Allow number of blocks to be changed.
* Don't need a hole in the circular queue since the semaphores prevent block
* clash.
*
* Revision 1.1 90/01/17 11:30:23 lmjm
* Initial revision
*
*/
#include <limits.h>
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#ifndef lint
/* RCS identification string; left out when the file is run through lint */
static char *rcsid = "$Header: /home/gould/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.3 90/05/15 23:27:46 lmjm Exp Locker: lmjm $";
#endif
/* Hand-written declaration of shmat() as returning char *.
 * NOTE(review): where <sys/shm.h> already declares shmat() with a
 * different return type this declaration may clash - confirm on the
 * target system. */
extern char *shmat();
/* General macros */
#define TRUE 1
#define FALSE 0
/* Postfix multiplier: "10 K" expands to "10 *1024".  The expansion is
 * not parenthesised, so put brackets round a use wherever operator
 * precedence could matter. */
#define K *1024
/* Some forward declarations */
void byee();
void start_reader_and_writer();
/* When showing progress, print a note every this many bytes written.
 * Zero (the default) means no progress is shown. */
int showevery = 0;
/* Interval used when -S is given a size that is not positive */
#define PRINT_EVERY 10 K
/* This is the inter-process buffer - it implements a circular list
 * of blocks. */
#define DEF_BLOCKSIZE (10 K)
#define MAX_BLOCKSIZE (64 K)
/* Size of one block in bytes; changed with -s */
int blocksize = DEF_BLOCKSIZE;
/* Number of blocks in the queue.  Set with -b, otherwise worked out
 * from max_shmem and blocksize once the options have been parsed.
 */
#define MAX_BLOCKS 2048
int blocks = 1;
/* Circular increment of a buffer index: wraps to 0 on reaching blocks.
 * The argument is evaluated more than once. */
#define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))
/* Max amount of shared memory you can allocate - can't see a way to look
 * this up.  Changed with -m.
 */
#define DEF_SHMEM (1 K K)
int max_shmem = DEF_SHMEM;
/* Just a flag to show unfilled */
#define NONE (-1)
/* the shared memory id of the buffer; NONE until shmget() has been called */
int buffer_id = NONE;
/* One slot of the circular queue. */
struct block {
/* Number of valid bytes held in data; zero is taken as end of input */
int bytes;
/* Start of this block's storage inside the buffer's data_space.
 * Filled in by get_buffer() each time the segment is attached. */
char *data;
/* curr_block is the slot this process is working on: the one being
 * filled in the reader, the one being written out in the writer. */
} *curr_block;
/* Value of pbuffer while no segment is attached.  get_buffer() also
 * treats this value coming back from shmat() as a failure. */
#define NO_BUFFER ((struct buffer *)-1)
/* Layout of the shared memory segment used by the reader and the writer. */
struct buffer {
/* writer will hang trying to lock this till reader fills in a block */
int blocks_used_lock;
/* reader will hang trying to lock this till writer empties a block */
int blocks_free_lock;
/* Index of the next block the reader will fill */
int next_block_in;
/* Index of the next block the writer will write out */
int next_block_out;
/* Per-block bookkeeping; only the first `blocks' entries are used */
struct block block[ MAX_BLOCKS ];
/* The actual space for the blocks is here - the array extends
 * past 1 element; buffer_allocate() sizes the segment to hold
 * blocks * blocksize bytes of it. */
char data_space[ 1 ];
} *pbuffer = NO_BUFFER;
/* Size in bytes of the shared memory segment; computed in buffer_allocate() */
int buffer_size;
/* Result of fork(): stays 0 in the writer (and in the parent until it has
 * forked), holds the child's pid in the reader, -1 if the fork failed. */
int writer_pid = 0;
/* Set to 1 by -d; turns on diagnostic messages on stderr */
int debug = 0;
/* Name printed in the usage message */
char *progname = "buffer";
/* Program entry point: parse the options, install the signal handlers,
 * create the shared buffer, then run the reader and the writer.
 * The process leaves through byee( 0 ), which calls exit().
 */
int
main( argc, argv )
int argc;
char **argv;
{
	parse_args( argc, argv );

	set_handlers();

	buffer_allocate();

	start_reader_and_writer();

	byee( 0 );

	/* Not reached - byee() exits - but keeps main's result well defined */
	return 0;
}
/* Parse the command line, filling in showevery, debug, max_shmem, blocks
 * and blocksize.  A bad value or an unknown option prints a message and
 * leaves through byee( -1 ).
 *
 *	-S size		show progress every size bytes written
 *	-d		turn on debugging output
 *	-m size		maximum amount of shared memory to grab
 *	-b blocks	number of blocks in the queue
 *	-s size		size of one block
 *
 * Sizes go through do_size().  When -b is not given the number of blocks
 * is worked out from max_shmem and blocksize after all the options have
 * been read.  Always returns 0.
 */
int
parse_args( argc, argv )
int argc;
char **argv;
{
	int c;
	extern char *optarg;
	extern int optind;
	char blocks_given = FALSE;
	/* Held as a signed value so it can be compared with, and subtracted
	 * from, max_shmem without the arithmetic turning unsigned. */
	long header = (long)sizeof( struct buffer );

	while( (c = getopt( argc, argv, "S:dm:s:b:" )) != -1 ){
		switch( c ){
		case 'S':
			/* Show every once in a while how much is printed */
			showevery = do_size( optarg );
			if( showevery <= 0 )
				showevery = PRINT_EVERY;
			break;
		case 'd':	/* debug */
			debug = 1;
			setbuf( stdout, NULL );
			setbuf( stderr, NULL );
			fprintf( stderr, "debugging turned on\n" );
			break;
		case 'm':
			/* Max size of shared memory lump */
			max_shmem = do_size( optarg );
			/* Signed comparison: against sizeof's unsigned result a
			 * negative max_shmem would look enormous and slip
			 * through.  Note this uses blocksize and blocks as
			 * they stand at this point on the command line. */
			if( (long)max_shmem < header + ((long)blocksize * (long)blocks) ){
				fprintf( stderr, "max_shmem %d too low\n", max_shmem );
				byee( -1 );
			}
			break;
		case 'b':
			/* Number of blocks */
			blocks_given = TRUE;
			blocks = atoi( optarg );
			if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
				fprintf( stderr, "blocks %d out of range\n", blocks );
				byee( -1 );
			}
			break;
		case 's':	/* Size of a block */
			blocksize = do_size( optarg );
			if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
				fprintf( stderr, "blocksize %d out of range\n", blocksize );
				byee( -1 );
			}
			break;
		default:
			fprintf( stderr, "Usage: %s [-S size] [-m memsize] [-b blocks] [-s blocksize]\n",
				progname );
			fprintf( stderr, "-S = show ammount writen every size bytes\n" );
			fprintf( stderr, "-m = size of shared mem chunk to grab\n" );
			fprintf( stderr, "-b = number of blocks in queue\n" );
			fprintf( stderr, "-s = size of a block\n" );
			byee( -1 );
		}
	}

	/* If -b was not given try and work out the max buffer size */
	if( !blocks_given ){
		/* Signed arithmetic, so a max_shmem smaller than the buffer
		 * header gives a count <= 0 rather than wrapping round to a
		 * huge number of blocks. */
		blocks = (int)(((long)max_shmem - header) / (long)blocksize);
		if( blocks <= 0 ){
			fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
			byee( -1 );
		}
		if( MAX_BLOCKS < blocks ){
			fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
			byee( -1 );
		}
	}

	return 0;
}
/* The interrupt handler.
 * In the reader (writer_pid holds the child's pid) the signal is first
 * passed on to the writer as a SIGTERM; every process then leaves through
 * byee( -1 ).  The test is "> 0" so that neither the writer (0) nor a
 * failed fork (-1, which kill() would treat as "every process") sends
 * anything.
 */
void
shutdown( sig )
int sig;
{
	(void)sig;

	if( writer_pid > 0 ){
		/* This is the reader - propagate the signal to the writer */
		kill( writer_pid, SIGTERM );
	}

	byee( -1 );
}

/* Route SIGHUP, SIGINT, SIGQUIT and SIGTERM to shutdown().
 * Called once before the fork and again by the writer after it.
 * Always returns 0.
 */
int
set_handlers()
{
	signal( SIGHUP, shutdown );
	signal( SIGINT, shutdown );
	signal( SIGQUIT, shutdown );
	signal( SIGTERM, shutdown );

	return 0;
}
/* Create the shared memory segment, zero it, and set up its two
 * semaphores.  On return the segment is detached again and pbuffer is
 * back to NO_BUFFER; the reader and writer attach it for themselves.
 * Failure to create the segment leaves through byee( -1 ).
 * Always returns 0.
 */
int
buffer_allocate()
{
	/* Allow for the data space */
	buffer_size = sizeof( struct buffer ) +
		((blocks * blocksize) - sizeof( char ));

	/* Create the space for the buffer */
	buffer_id = shmget( IPC_PRIVATE,
			   buffer_size,
			   IPC_CREAT|S_IREAD|S_IWRITE );
	if( buffer_id < 0 ){
		perror( "couldn't create shared memory segment" );
		byee( -1 );
	}

	get_buffer();

	/* A pointer must not be handed to %x; print it as an unsigned long
	 * so the whole address shows on machines with wide pointers. */
	if( debug )
		fprintf( stderr, "pbuffer is 0x%08lx, buffer_size is %d [%d x %d]\n",
			(unsigned long)pbuffer, buffer_size, blocks, blocksize );

	/* This also wipes the data pointers get_buffer() has just set;
	 * they are set again by every later get_buffer(). */
	bzero( (char *)pbuffer, buffer_size );

	/* Mark both semaphores as not yet created before making them */
	pbuffer->blocks_used_lock = -1;
	pbuffer->blocks_free_lock = -1;

	pbuffer->blocks_used_lock = new_sem();
	/* Start it off locked - it is unlocked when a buffer gets filled in */
	lock( pbuffer->blocks_used_lock );

	pbuffer->blocks_free_lock = new_sem();
	/* start this off so lock() can be called on it for each block
	 * till all the blocks are used up */
	sem_set( pbuffer->blocks_free_lock, blocks - 1 );

	/* Detach the shared memory so the fork doesn't do anything odd */
	shmdt( (char *)pbuffer );
	pbuffer = NO_BUFFER;

	return 0;
}
/* Release the shared resources: both semaphores, then the shared memory
 * segment.  Does nothing when called a second time, or when no segment
 * was ever created.
 */
buffer_remove()
{
	static char in_progress = FALSE;

	/* Avoid accidental recursion: get_buffer() below leaves through
	 * byee() on failure, which can come straight back here. */
	if( !in_progress ){
		in_progress = TRUE;

		/* Nothing to do unless a buffer has been created */
		if( buffer_id != NONE ){
			/* There should be a buffer, so if it is not attached
			 * this must be after it was detached but before the
			 * fork picked it up - attach it again. */
			if( pbuffer == NO_BUFFER )
				get_buffer();

			if( debug )
				fprintf( stderr, "removing semaphores and buffer\n" );

			remove_sem( pbuffer->blocks_used_lock );
			remove_sem( pbuffer->blocks_free_lock );

			if( shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 ) == -1 )
				perror( "failed to remove shared memory buffer" );
		}
	}
}
/* Attach the shared memory segment at an address chosen by the system,
 * leaving it in pbuffer, and point each block's data field at its slice
 * of data_space.  Leaves through byee( -1 ) if the attach fails.
 * The data pointers live in the shared segment itself and are rewritten
 * on every call.
 */
get_buffer()
{
	int slot;
	int offset;

	/* Grab the buffer space */
	pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
	if( pbuffer == NO_BUFFER ){
		perror( "failed to attach shared memory" );
		byee( -1 );
	}

	/* Setup the data space pointers: block n starts n * blocksize
	 * bytes into data_space */
	slot = 0;
	offset = 0;
	while( slot < blocks ){
		pbuffer->block[ slot ].data = pbuffer->data_space + offset;
		offset += blocksize;
		slot++;
	}
}
/* Fork into the two halves of the program.  The child becomes the writer;
 * the parent runs the reader and afterwards waits until the writer has
 * terminated (or wait() itself fails).  A failed fork leaves through
 * byee( -1 ).
 */
void
start_reader_and_writer()
{
	int status, reaped;

	/* Empty the stdio buffers before forking */
	fflush( stdout );
	fflush( stderr );

	writer_pid = fork();
	switch( writer_pid ){
	case -1:
		perror( "unable to fork" );
		byee( -1 );
		break;

	case 0:
		/* Never trust fork() to propagate signals - reset them */
		set_handlers();
		writer();
		break;

	default:
		reader();

		/* Now wait for the writer to finish */
		do {
			reaped = wait( &status );
		} while( reaped != writer_pid && reaped != -1 );
		break;
	}
}
/* Read from stdin into the buffer.
 * Attaches the shared buffer, then keeps claiming a free block and
 * filling it until fill_block() reports that it got no data.
 */
reader()
{
	if( debug )
		fprintf( stderr, "Entering reader\n" );

	get_buffer();

	/* One block per pass; the empty block that ends the loop has
	 * still been handed over by fill_block(). */
	do {
		get_next_free_block();
	} while( fill_block() );

	if( debug )
		fprintf( stderr, "Exiting reader\n" );
}
/* Claim the next block for the reader to fill: take the free-blocks
 * lock, point curr_block at the slot named by next_block_in and advance
 * that index round the circle.
 */
get_next_free_block()
{
	int slot;

	/* Maybe wait till there is room in the buffer */
	lock( pbuffer->blocks_free_lock );

	slot = pbuffer->next_block_in;
	curr_block = pbuffer->block + slot;
	pbuffer->next_block_in = INC( slot );
}
/* Fill the current block from stdin and hand it over to the writer.
 * Keeps reading until the block is full or read() returns nothing more,
 * so short reads are gathered into full-sized blocks.  A read error
 * leaves through byee( -1 ).
 * Returns the number of bytes stored in the block; 0 means end of input.
 */
fill_block()
{
	char *dest = curr_block->data;
	int wanted = blocksize;
	int got;

	/* Fill the block with input.  This reblocks the input. */
	while( wanted != 0 ){
		got = read( 0, dest, wanted );
		if( got <= 0 )
			break;
		dest += got;
		wanted -= got;
	}

	if( got < 0 ){
		perror( "failed to read input" );
		byee( -1 );
	}

	/* number of bytes available.  Zero will be taken as eof */
	curr_block->bytes = blocksize - wanted;

	if( debug )
		fprintf( stderr, "got %d bytes\n", curr_block->bytes );

	/* Release the block to the writer */
	unlock( pbuffer->blocks_used_lock );

	return curr_block->bytes;
}
/* Write the buffer to stdout.
 * Attaches the shared buffer, then takes filled blocks one at a time and
 * writes each out, stopping at the first block that holds no data.
 */
writer()
{
	if( debug )
		fprintf( stderr, "\tEntering writer\n" );

	get_buffer();

	for( get_next_filled_block(); data_to_write(); get_next_filled_block() )
		write_block_to_stdout();

	if( debug )
		fprintf( stderr, "\tExiting writer\n" );
}
/* Claim the next block for the writer to write out: take the used-blocks
 * lock, point curr_block at the slot named by next_block_out and advance
 * that index round the circle.
 */
get_next_filled_block()
{
	int slot;

	/* Hang till some data is available */
	lock( pbuffer->blocks_used_lock );

	slot = pbuffer->next_block_out;
	curr_block = pbuffer->block + slot;
	pbuffer->next_block_out = INC( slot );
}
/* Returns the number of bytes held in the current block.  The writer
 * treats a zero result as the end of the data.
 */
data_to_write()
{
	int pending = curr_block->bytes;

	return pending;
}
/* Write the current block to stdout with a single write() call, update
 * the progress display if one was asked for, then give the block back to
 * the reader.  A failed or short write leaves through byee( -1 ).
 * Always returns 0.
 */
int
write_block_to_stdout()
{
	/* Running totals in bytes.  Kept as longs so that, where long is
	 * wider than int, the progress count keeps working past 2 gigabytes
	 * of output instead of overflowing. */
	static long out = 0;
	static long next_k = 0;
	int wrote;

	wrote = write( 1, curr_block->data, curr_block->bytes );
	if( wrote != curr_block->bytes ){
		/* errno only means something when write() itself failed; a
		 * short count is reported in its own words. */
		if( wrote < 0 )
			perror( "write of data failed" );
		else
			fprintf( stderr, "write of data failed: short write (%d of %d bytes)\n",
				wrote, curr_block->bytes );
		byee( -1 );
	}

	if( showevery ){
		out += curr_block->bytes;
		if( out > next_k ){
			fprintf( stderr, "% 8ldK\r", out / 1024 );
			next_k += showevery;
		}
	}

	/* The block is empty again - let the reader have it */
	unlock( pbuffer->blocks_free_lock );

	return 0;
}
/* Leave the program with the given exit status.
 * A process whose writer_pid is non-zero removes the shared buffer first.
 * A process whose writer_pid is zero instead ends the progress line with
 * a newline, if progress was being shown.
 *
 * NOTE(review): writer_pid is also still zero in the parent until it has
 * forked, so a call made before the fork leaves without calling
 * buffer_remove() - confirm whether that is intended.
 */
void
byee( exit_val )
int exit_val;
{
	if( writer_pid == 0 ){
		if( showevery )
			fprintf( stderr, "\n" );
	}
	else {
		/* Only the parent (reader) should zap the buffer */
		buffer_remove();
	}

	exit( exit_val );
}
/* Given a string of <num>[<suff>] returns a num
 * suff =
 *	m/M for 1meg
 *	k/K for 1k
 *	b/B for 512
 * Any other suffix (or none) leaves the number as it is.
 *
 * Returns 0 when the string does not start with a number.  A result too
 * large for an int is clamped to INT_MAX (INT_MIN for negative values)
 * rather than being left to overflow.
 */
int
do_size( arg )
char *arg;
{
	char suffix[ 20 ];
	int num = 0;
	int scale = 1;

	suffix[ 0 ] = '\0';

	/* The width on %19s stops an over-long argument running off the end
	 * of suffix.  Fewer than one conversion means no number was found. */
	if( sscanf( arg, "%d%19s", &num, suffix ) < 1 )
		return 0;

	switch( suffix[ 0 ] ){
	case 'm':
	case 'M':
		scale = 1024 * 1024;
		break;
	case 'k':
	case 'K':
		scale = 1024;
		break;
	case 'b':
	case 'B':
		scale = 512;
		break;
	default:
		break;
	}

	/* Check the range before multiplying - signed overflow is undefined */
	if( num > INT_MAX / scale )
		return INT_MAX;
	if( num < INT_MIN / scale )
		return INT_MIN;

	return num * scale;
}