home *** CD-ROM | disk | FTP | other *** search
/ Source Code 1992 March / Source_Code_CD-ROM_Walnut_Creek_March_1992.iso / usenet / altsrcs / 1 / 1607 / buffer.c next >
C/C++ Source or Header  |  1990-12-28  |  10KB  |  491 lines

  1. /* This is a reblocking process, designed to try and read from stdin
  2.  * and write to stdout - but to always try and keep the writing side
  3.  * busy.  It is meant to try and stream tape writes.
  4.  *
  5.  * This program runs in two parts.  The reader and the writer.  They
  6.  * communicate using shared memory with semaphores locking the access.
  7.  * The shared memory implements a circular list of blocks of data.
  8.  *
  9.  * L.McLoughlin, Imperial College, 1990
  10.  *
  11.  * $Log:    buffer.c,v $
  12.  * Revision 1.3  90/05/15  23:27:46  lmjm
  13.  * Added -S option (show how much has been writen).
  14.  * Added -m option to specify how much shared memory to grab.
  15.  * Now tries to fill this with blocks.
  16.  * reader waits for writer to terminate and then frees the shared mem and sems.
  17.  * 
  18.  * Revision 1.2  90/01/20  21:37:59  lmjm
  19.  * Reset default number of  blocks and blocksize for best thruput of
  20.  * standard tar 10K blocks.
  21.  * Allow number of blocks to be changed.
  22.  * Don't need a hole in the circular queue since the semaphores prevent block
  23.  * clash.
  24.  * 
  25.  * Revision 1.1  90/01/17  11:30:23  lmjm
  26.  * Initial revision
  27.  * 
  28.  */
  29. #include <stdio.h>
  30. #include <sys/types.h>
  31. #include <sys/stat.h>
  32. #include <sys/ipc.h>
  33. #include <sys/shm.h>
  34. #include <sys/sem.h>
  35.  
  36. #ifndef lint
  37. static char *rcsid = "$Header: /home/gould/staff/csg/lmjm/src/buffer/RCS/buffer.c,v 1.3 90/05/15 23:27:46 lmjm Exp Locker: lmjm $";
  38. #endif
  39.  
  40. extern char *shmat();
  41.  
  42. /* General macros */
  43. #define TRUE 1
  44. #define FALSE 0
  45. #define K *1024
  46.  
  47. /* Some forward declarations */
  48. void byee();
  49. void start_reader_and_writer();
  50.  
  51. /* When showing print a note every this many bytes writen */
  52. int showevery = 0;
  53. #define PRINT_EVERY 10 K
  54.  
  55. /* This is the inter-process buffer - it implements a circular list
  56.  * of blocks. */
  57.  
  58. #define DEF_BLOCKSIZE (10 K)
  59. #define MAX_BLOCKSIZE (64 K)
  60. int blocksize = DEF_BLOCKSIZE;
  61.  
  62. /* Numbers of blocks in the queue. 
  63.  */
  64. #define MAX_BLOCKS 2048
  65. int blocks = 1;
  66. /* Circular increment of a buffer index */
  67. #define INC(i) (((i)+1) == blocks ? 0 : ((i)+1))
  68.  
  69. /* Max ammount of shared memory you can allocate - can't see a way to look
  70.  * this up.
  71.  */
  72. #define DEF_SHMEM (1 K K)
  73. int max_shmem = DEF_SHMEM;
  74.  
  75. /* Just a flag to show unfilled */
  76. #define NONE (-1)
  77.  
  78. /* the shared memory id of the buffer */
  79. int buffer_id = NONE;
  80. struct block {
  81.     int bytes;
  82.     char *data;
  83. } *curr_block;
  84.  
  85. #define NO_BUFFER ((struct buffer *)-1)
  86. struct buffer {
  87.     /* writer will hang trying to lock this till reader fills in a block */
  88.     int blocks_used_lock;
  89.     /* reader will hang trying to lock this till writer empties a block */
  90.     int blocks_free_lock;
  91.  
  92.     int next_block_in;
  93.     int next_block_out;
  94.  
  95.     struct block block[ MAX_BLOCKS ];
  96.  
  97.     /* These actual space for the blocks is here - the array extends
  98.      * pass 1 */
  99.     char data_space[ 1 ];
  100. } *pbuffer = NO_BUFFER;
  101. int buffer_size;
  102.  
  103. int writer_pid = 0;
  104. int debug = 0;
  105. char *progname = "buffer";
  106.  
  107. main( argc, argv )
  108.     int argc;
  109.     char **argv;
  110. {
  111.     parse_args( argc, argv );
  112.  
  113.     set_handlers();
  114.  
  115.     buffer_allocate();
  116.  
  117.     start_reader_and_writer();
  118.  
  119.     byee( 0 );
  120. }
  121.  
  122. parse_args( argc, argv )
  123.     int argc;
  124.     char **argv;
  125. {
  126.     int c;
  127.     extern char *optarg;
  128.     extern int optind;
  129.     char blocks_given = FALSE;
  130.  
  131.     while( (c = getopt( argc, argv, "S:dm:s:b:" )) != -1 ){
  132.         switch( c ){
  133.         case 'S':
  134.             /* Show every once in a while how much is printed */
  135.             showevery = do_size( optarg );
  136.             if( showevery <= 0 )
  137.                 showevery = PRINT_EVERY;
  138.             break;
  139.         case 'd':    /* debug */
  140.             debug = 1;
  141.             setbuf( stdout, NULL );
  142.             setbuf( stderr, NULL );
  143.             fprintf( stderr, "debugging turned on\n" );
  144.             break;
  145.         case 'm':
  146.             /* Max size of shared memory lump */
  147.             max_shmem = do_size( optarg );
  148.  
  149.             if( max_shmem < (sizeof( struct buffer ) + (blocksize * blocks)) ){
  150.                 fprintf( stderr, "max_shmem %d too low\n", max_shmem );
  151.                 byee( -1 );
  152.             }
  153.             break;
  154.         case 'b':
  155.             /* Number of blocks */
  156.             blocks_given = TRUE;
  157.             blocks = atoi( optarg );
  158.             if( (blocks <= 0) || (MAX_BLOCKS < blocks) ){
  159.                 fprintf( stderr, "blocks %d out of range\n", blocks );
  160.                 byee( -1 );
  161.             }
  162.             break;
  163.         case 's':    /* Size of a block */
  164.             blocksize = do_size( optarg );
  165.  
  166.             if( (blocksize <= 0) || (MAX_BLOCKSIZE < blocksize) ){
  167.                 fprintf( stderr, "blocksize %d out of range\n", blocksize );
  168.                 byee( -1 );
  169.             }
  170.             break;
  171.         default:
  172.             fprintf( stderr, "Usage: %s [-S size] [-m memsize] [-b blocks] [-s blocksize]\n",
  173.                 progname );
  174.             fprintf( stderr, "-S = show ammount writen every size bytes\n" );
  175.             fprintf( stderr, "-m = size of shared mem chunk to grab\n" );
  176.             fprintf( stderr, "-b = number of blocks in queue\n" );
  177.             fprintf( stderr, "-s = size of a block\n" );
  178.             byee( -1 );
  179.         }
  180.     }
  181.  
  182.     /* If -b was not given try and work out the max buffer size */
  183.     if( !blocks_given ){
  184.         blocks = (max_shmem - sizeof( struct buffer )) / blocksize;
  185.         if( blocks <= 0 ){
  186.             fprintf( stderr, "Cannot handle blocks that big, aborting!\n" );
  187.             byee( -1 );
  188.         }
  189.         if( MAX_BLOCKS < blocks  ){
  190.             fprintf( stderr, "Cannot handle that many blocks, aborting!\n" );
  191.             byee( -1 );
  192.         }
  193.     }
  194. }
  195.  
  196. /* The interrupt handler */
  197. shutdown()
  198. {
  199.     byee( -1 );
  200. }
  201.  
  202. set_handlers()
  203. {
  204.     signal( SIGHUP, shutdown );
  205.     signal( SIGINT, shutdown );
  206.     signal( SIGQUIT, shutdown );
  207.     signal( SIGTERM, shutdown );
  208.  
  209.     if( writer_pid ){
  210.         /* This is the reader - propogate the signal to the writer */
  211.         kill( writer_pid, SIGTERM );
  212.     }
  213. }
  214.  
  215. buffer_allocate()
  216. {
  217.     int i;
  218.  
  219.     /* Allow for the data space */
  220.     buffer_size = sizeof( struct buffer ) +
  221.         ((blocks * blocksize) - sizeof( char ));
  222.  
  223.     /* Create the space for the buffer */
  224.     buffer_id = shmget( IPC_PRIVATE,
  225.                buffer_size,
  226.                IPC_CREAT|S_IREAD|S_IWRITE );
  227.     if( buffer_id < 0 ){
  228.         perror( "couldn't create shared memory segment" );
  229.         byee( -1 );
  230.     }
  231.  
  232.     get_buffer();
  233.  
  234.     if( debug )
  235.         fprintf( stderr, "pbuffer is 0x%08x, buffer_size is %d [%d x %d]\n",
  236.             (char *)pbuffer, buffer_size, blocks, blocksize );
  237.  
  238.     bzero( (char *)pbuffer, buffer_size );
  239.     pbuffer->blocks_used_lock = -1;
  240.     pbuffer->blocks_free_lock = -1;
  241.  
  242.     pbuffer->blocks_used_lock = new_sem();
  243.     /* Start it off locked - it is unlocked when a buffer gets filled in */
  244.     lock( pbuffer->blocks_used_lock );
  245.  
  246.     pbuffer->blocks_free_lock = new_sem();
  247.     /* start this off so lock() can be called on it for each block
  248.      * till all the blocks are used up */
  249.     sem_set( pbuffer->blocks_free_lock, blocks - 1 );
  250.  
  251.     /* Detattach the shared memory so the fork doesnt do anything odd */
  252.     shmdt( (char *)pbuffer );
  253.     pbuffer = NO_BUFFER;
  254. }
  255.  
  256. buffer_remove()
  257. {
  258.     static char removing = FALSE;
  259.     int i;
  260.  
  261.     /* Avoid accidental recursion */
  262.     if( removing )
  263.         return;
  264.     removing = TRUE;
  265.  
  266.     /* Buffer not yet created */
  267.     if( buffer_id == NONE )
  268.         return;
  269.  
  270.     /* There should be a buffer so this must be after its detached it
  271.      * but before the fork picks it up */
  272.     if( pbuffer == NO_BUFFER )
  273.         get_buffer();
  274.  
  275.     if( debug )
  276.         fprintf( stderr, "removing semaphores and buffer\n" );
  277.     remove_sem( pbuffer->blocks_used_lock );
  278.     remove_sem( pbuffer->blocks_free_lock );
  279.     
  280.     if( shmctl( buffer_id, IPC_RMID, (struct shmid_ds *)0 ) == -1 )
  281.         perror( "failed to remove shared memory buffer" );
  282. }
  283.  
  284. get_buffer()
  285. {
  286.     int b;
  287.  
  288.     /* Grab the buffer space */
  289.     pbuffer = (struct buffer *)shmat( buffer_id, (char *)0, 0 );
  290.     if( pbuffer == NO_BUFFER ){
  291.         perror( "failed to attach shared memory" );
  292.         byee( -1 );
  293.     }
  294.  
  295.     /* Setup the data space pointers */
  296.     for( b = 0; b < blocks; b++ )
  297.         pbuffer->block[ b ].data =
  298.             &pbuffer->data_space[ b * blocksize ];
  299.  
  300. }
  301.  
  302. void
  303. start_reader_and_writer()
  304. {
  305.     int status, deadpid;
  306.  
  307.     fflush( stdout );
  308.     fflush( stderr );
  309.  
  310.     if( (writer_pid = fork()) == -1 ){
  311.         perror( "unable to fork" );
  312.         byee( -1 );
  313.     }
  314.     else if( writer_pid == 0 ){
  315.         /* Never trust fork() to propogate signals - reset them */
  316.         set_handlers();
  317.  
  318.         writer();
  319.     }
  320.     else {
  321.         reader();
  322.  
  323.         /* Now wait for the writer to finish */
  324.         while( ((deadpid = wait( &status )) != writer_pid) &&
  325.             deadpid != -1 )
  326.             ;
  327.     }
  328. }
  329.  
  330. /* Read from stdin into the buffer */
  331. reader()
  332. {
  333.     if( debug )
  334.         fprintf( stderr, "Entering reader\n" );
  335.  
  336.     get_buffer();
  337.  
  338.     while( 1 ){
  339.         get_next_free_block();
  340.         if( ! fill_block() )
  341.             break;
  342.     }
  343.  
  344.     if( debug )
  345.         fprintf( stderr, "Exiting reader\n" );
  346. }
  347.  
  348. get_next_free_block()
  349. {
  350.     /* Maybe wait till there is room in the buffer */
  351.     lock( pbuffer->blocks_free_lock );
  352.  
  353.     curr_block = &pbuffer->block[ pbuffer->next_block_in ];
  354.  
  355.     pbuffer->next_block_in = INC( pbuffer->next_block_in );
  356. }
  357.  
  358. fill_block()
  359. {
  360.     int bytes;
  361.     char *start;
  362.     int toread;
  363.  
  364.     start = curr_block->data;
  365.     toread = blocksize;
  366.     
  367.     /* Fill the block with input.  This reblocks the input. */
  368.     while( toread != 0 && (bytes = read( 0, start, toread )) > 0 ){
  369.         start += bytes;
  370.         toread -= bytes;
  371.     }
  372.  
  373.     if( bytes < 0 ){
  374.         perror( "failed to read input" );
  375.         byee( -1 );
  376.     }
  377.  
  378.     /* number of bytes available. Zero will be taken as eof */
  379.     curr_block->bytes = blocksize - toread;
  380.  
  381.     if( debug )
  382.         fprintf( stderr, "got %d bytes\n", curr_block->bytes );
  383.  
  384.     unlock( pbuffer->blocks_used_lock );
  385.  
  386.     return curr_block->bytes;
  387. }
  388.  
  389.  
  390. /* Write the buffer to stdout */
  391. writer()
  392. {
  393.     if( debug )
  394.         fprintf( stderr, "\tEntering writer\n" );
  395.  
  396.     get_buffer();
  397.  
  398.     while( 1 ){
  399.         get_next_filled_block();
  400.         if( !data_to_write() )
  401.             break;
  402.         write_block_to_stdout();
  403.     }
  404.  
  405.     if( debug )
  406.         fprintf( stderr, "\tExiting writer\n" );
  407. }
  408.  
  409. get_next_filled_block()
  410. {
  411.     /* Hang till some data is available */
  412.     lock( pbuffer->blocks_used_lock );
  413.  
  414.     curr_block = &pbuffer->block[ pbuffer->next_block_out ];
  415.  
  416.     pbuffer->next_block_out = INC( pbuffer->next_block_out );
  417. }
  418.  
  419. data_to_write()
  420. {
  421.     return curr_block->bytes;
  422. }
  423.  
  424. write_block_to_stdout()
  425. {
  426.     static int out = 0;
  427.     static int next_k;
  428.  
  429.     if( write( 1, curr_block->data, curr_block->bytes ) != curr_block->bytes ){
  430.         perror( "write of data failed" );
  431.         byee( -1 );
  432.     }
  433.  
  434.     if( showevery ){
  435.         out += curr_block->bytes;
  436.         if( out > next_k ){
  437.             fprintf( stderr, "% 8dK\r", out / 1024 );
  438.             next_k += showevery;
  439.         }
  440.     }
  441.  
  442.     unlock( pbuffer->blocks_free_lock );
  443. }
  444.  
  445.  
  446. void
  447. byee( exit_val )
  448.     int exit_val;
  449. {
  450.     /* Only the parent (reader) should zap the buffer */
  451.     if( writer_pid != 0 )
  452.         buffer_remove();
  453.     else if( showevery )
  454.         fprintf( stderr, "\n" );
  455.  
  456.     exit( exit_val );
  457. }
  458.  
  459. /* Given a string of <num>[<suff>] returns a num
  460.  * suff =
  461.  *   m/M for 1meg
  462.  *   k/K for 1k
  463.  *   b/B for 512
  464.  */
  465. do_size( arg )
  466.     char *arg;
  467. {
  468.     char format[ 20 ];
  469.     int ret;
  470.  
  471.     *format = '\0';
  472.     sscanf( arg, "%d%s", &ret, format );
  473.  
  474.     switch( *format ){
  475.     case 'm':
  476.     case 'M':
  477.         ret = ret K K;
  478.         break;
  479.     case 'k':
  480.     case 'K':
  481.         ret = ret K;
  482.         break;
  483.     case 'b':
  484.     case 'B':
  485.         ret *= 512;
  486.         break;
  487.     }
  488.     
  489.     return ret;
  490. }
  491.