home *** CD-ROM | disk | FTP | other *** search
-
- /*
- * FIFO.C
- */
-
- #include "defs.h"
-
- void ReCalcReaderIdx(Fifo *);
- void SignalEof(Fifo *);
- void FixFiFlags(Fifo *);
- LibCall void RequestFifo(FHan *, Message *, long);
- void SetEOF(Fifo *);
- long AvailWBufSpace(Fifo *);
-
- __stkargs long BitTestSet(char *, short);
-
- /*
- * Open up a new fifo by name.
- */
-
- LibCall FHan *
- OpenFifo(name, bytes, flags)
- char *name;
- long bytes;
- long flags;
- {
- Fifo *fifo;
- FHan *fhan;
-
- { /* bytes a power of 2? */
- unsigned long i = 8;
- while (i) {
- if (bytes == i)
- break;
- i = i << 1;
- }
- if (i == 0)
- return(NULL);
- }
- if (fhan = AllocMem(sizeof(FHan), MEMF_PUBLIC | MEMF_CLEAR)) {
- Forbid();
- if (fifo = (Fifo *)FindName((MaxList *)&FifoList, name)) {
- ;
- } else {
- if (fifo = AllocMem(sizeof(Fifo) - sizeof(fifo->fi_Buf) + bytes + strlen(name) + 1, MEMF_PUBLIC | MEMF_CLEAR)) {
- AddTail((MaxList *)&FifoList, &fifo->fi_Node);
- NewList((MaxList *)&fifo->fi_HanList);
- NewList((MaxList *)&fifo->fi_WrNotify);
- NewList((MaxList *)&fifo->fi_RdNotify);
- fifo->fi_BufSize = bytes;
- fifo->fi_BufMask = bytes - 1;
- fifo->fi_Node.ln_Name = (char *)fifo + (sizeof(Fifo) - sizeof(fifo->fi_Buf)) + bytes;
- strcpy(fifo->fi_Node.ln_Name, name);
- InitSemaphore(&fifo->fi_SigSem);
- }
- }
- if (fifo) {
- AddTail((MaxList *)&fifo->fi_HanList, (MaxNode *)&fhan->fh_Node);
- fhan->fh_Fifo = fifo;
- fhan->fh_Flags = flags;
-
- fhan->fh_Msg.mn_ReplyPort = &fhan->fh_Port;
-
- fhan->fh_Port.mp_Node.ln_Type = NT_MSGPORT;
- fhan->fh_Port.mp_Flags = PA_SIGNAL;
- fhan->fh_Port.mp_SigBit = SIGB_SINGLE;
-
- NewList(&fhan->fh_Port.mp_MsgList);
-
- if (flags & FIFOF_READ) {
- ++fifo->fi_RRefs;
- fhan->fh_RIdx = fifo->fi_RIdx;
- }
- if (flags & FIFOF_WRITE) {
- ++fifo->fi_WRefs;
- }
- /*
- if (flags & FIFOF_CLOSEOF)
- fifo->fi_Flags |= FIFOF_CLOSEOF;
- */
- if (flags & FIFOF_KEEPIFD)
- fifo->fi_Flags |= FIFOF_KEEPIFD;
- if (flags & FIFOF_RREQUIRED)
- fifo->fi_Flags |= FIFOF_RREQUIRED;
- ++fifo->fi_ORefs;
- FixFiFlags(fifo);
- } else {
- FreeMem(fhan, sizeof(FHan));
- fhan = NULL;
- }
- Permit();
- }
- return(fhan);
- }
-
- /*
- * Close a previously openned fifo
- */
-
- LibCall void
- CloseFifo(fhan, flags)
- FHan *fhan;
- long flags;
- {
- Fifo *fifo;
-
- Forbid();
- if (fifo = fhan->fh_Fifo) {
- fhan->fh_Fifo = NULL; /* try to catch duplicate closes */
- Remove((MaxNode *)&fhan->fh_Node);
-
- --fifo->fi_ORefs;
-
- if (fhan->fh_Flags & FIFOF_WRITE) {
- if (flags & FIFOF_EOF)
- fifo->fi_Flags |= FIFOF_CLOSEOF;
- if (--fifo->fi_WRefs == 0) {
- if (fifo->fi_Flags & FIFOF_CLOSEOF) {
- SetEOF(fifo);
- }
- }
- }
- if (fhan->fh_Flags & FIFOF_READ) {
- --fifo->fi_RRefs;
-
- if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
- Message *msg;
- while (msg = RemHead((MaxList *)&fifo->fi_WrNotify))
- ReplyMsg(msg);
- }
- }
-
- if (fifo->fi_ORefs == 0) {
- if ((fifo->fi_Flags & FIFOF_KEEPIFD) == 0 || fifo->fi_RIdx == fifo->fi_WIdx) {
- Remove(&fifo->fi_Node);
- FreeMem(fifo, sizeof(Fifo) - sizeof(fifo->fi_Buf) + fifo->fi_BufSize + strlen(fifo->fi_Node.ln_Name) + 1);
- }
- } else {
- if (fhan->fh_Flags & FIFOF_WRITE)
- SignalEof(fifo);
- if (fhan->fh_Flags & FIFOF_READ)
- ReCalcReaderIdx(fifo);
- FixFiFlags(fifo);
- }
- FreeMem(fhan, sizeof(FHan));
- }
- Permit();
- }
-
- /*
- * Read from a fifo. Block if necessary (and not FIFOF_NBIO)
- */
-
- LibCall long
- ReadFifo(fhan, pbuf, skip)
- FHan *fhan;
- char **pbuf;
- long skip;
- {
- long n;
- Fifo *fifo = fhan->fh_Fifo;
-
- /*
- * attempt to clear <skip> bytes
- */
-
- while (skip > 0) {
- long widx = fifo->fi_WIdx; /* snapshot widx */
- long ridx = fhan->fh_RIdx;
- long len;
-
- if (ridx <= widx) {
- len = widx - ridx;
- } else {
- len = fifo->fi_BufSize - ridx;
- }
- if (len == 0)
- break;
- if (len > skip)
- len = skip;
- n += len;
- skip -= len;
-
- Forbid();
- fhan->fh_RIdx = (ridx + len) & fifo->fi_BufMask;
- if (ridx == fifo->fi_RIdx)
- ReCalcReaderIdx(fifo); /* update mast-idx/writer-waiters */
- Permit();
- }
-
- /*
- * return available data
- */
-
- for (;;) {
- long widx = fifo->fi_WIdx; /* snapshot widx */
- long ridx = fhan->fh_RIdx;
-
- if (ridx <= widx) {
- n = widx - ridx;
- } else {
- n = fifo->fi_BufSize - ridx;
- }
- *pbuf = fifo->fi_Buf + ridx;
- if (n == 0) {
- /*
- * EOF on a per-handle basis since it gets cleared after EOF
- * is returned.
- */
-
- if ((fhan->fh_Flags & FIFOF_EOF) || (fifo->fi_Flags & FIFOF_EOF)) {
- /*fhan->fh_Flags &= ~FIFOF_EOF;*/
- n = -1;
- break;
- }
- if ((fhan->fh_Flags & FIFOF_NBIO) == 0) {
- fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
- RequestFifo(fhan, &fhan->fh_Msg, FREQ_RPEND);
- WaitPort(&fhan->fh_Port);
- Remove((MaxNode *)&fhan->fh_Msg);
- continue;
- }
- }
- break;
- }
- return(n);
- }
-
- /*
- * Write to a fifo
- */
-
- LibCall long
- WriteFifo(fhan, buf, bytes)
- FHan *fhan;
- char *buf;
- long bytes;
- {
- long n = -1;
- short wsigchk = 0;
- short normal = 0;
- Fifo *fifo = fhan->fh_Fifo;
-
- if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED))
- return(-1);
-
- if (bytes < 0 || bytes > (fifo->fi_BufSize >> 1))
- return(-2);
-
- Forbid();
-
- /*
- * A normal FIFO uses fi_SigSem
- * A non-normal FIFO cannot afford to block and uses fi_Lock
- */
-
- if (fifo->fi_Flags & FIFOF_WRNORM)
- normal = 1;
-
- if (normal) {
- ObtainSemaphore(&fifo->fi_SigSem);
- } else if (BitTestSet(&fifo->fi_Lock, 0) != 0) {
- Permit();
- return(n);
- }
-
- {
- n = 0;
-
- for (;;) {
- while (bytes) {
- long ridx = fifo->fi_RIdx; /* snapshot ridx */
- long len;
-
- if (fifo->fi_WIdx < ridx) {
- len = ridx - fifo->fi_WIdx;
- if (len <= bytes) /* FIFO FULL */
- break;
- } else {
- len = fifo->fi_BufSize - fifo->fi_WIdx;
- if (len + ridx <= bytes) /* FIFO FULL */
- break;
- }
- if (len > bytes)
- len = bytes;
- movmem(buf, fifo->fi_Buf + fifo->fi_WIdx, len);
- buf += len;
- n += len;
- bytes -= len;
- fifo->fi_WIdx = (fifo->fi_WIdx + len) & fifo->fi_BufMask;
- if (fhan->fh_Flags & FIFOF_NORMAL)
- wsigchk = 1;
- }
-
- /*
- * if fifo is full and NBIO not set, then block and loop
- */
- if (bytes && !(fhan->fh_Flags & FIFOF_NBIO)) {
- fhan->fh_Port.mp_SigTask = SysBase->ThisTask;
- RequestFifo(fhan, &fhan->fh_Msg, FREQ_WAVAIL);
- WaitPort(&fhan->fh_Port);
- Remove((MaxNode *)&fhan->fh_Msg);
- continue;
- }
- break;
- }
-
- /*
- * check for any blocked readers, data is probably now available
- * so wake them up.
- */
-
- if (wsigchk && fifo->fi_RdNotify.mlh_Head->mln_Succ) {
- Message *msg;
-
- while (msg = RemHead((MaxList *)&fifo->fi_RdNotify))
- ReplyMsg(msg);
- }
-
- if (normal)
- ReleaseSemaphore(&fifo->fi_SigSem);
- else
- fifo->fi_Lock = 0;
- }
- Permit();
- return(n);
- }
-
- LibCall long
- BufSizeFifo(fhan)
- FHan *fhan;
- {
- return(fhan->fh_Fifo->fi_BufSize);
- }
-
-
- /*
- * Calculate distance between fh->fh_RIdx and fifo->fi_RIdx, shortest
- * wins. If none found then nothing changes due to initial best set
- * to exactly the buffer size.
- *
- * If the master index is modified and writers are waiting, signal them.
- *
- * Called while Forbid() or otherwise reader-lockedout
- */
-
- void
- ReCalcReaderIdx(fifo)
- Fifo *fifo;
- {
- FHan *fh;
- long bestLen = fifo->fi_BufSize;
- long ridx;
-
- for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ) {
- if (fh->fh_Flags & FIFOF_READ) {
- long len = (fh->fh_RIdx - fifo->fi_RIdx) & fifo->fi_BufMask;
- if (len < bestLen)
- bestLen = len;
- }
- }
-
- ridx = (fifo->fi_RIdx + bestLen) & fifo->fi_BufMask;
-
- /*
- * more buffer space available, wakeup writers?
- */
-
- if (ridx != fifo->fi_RIdx) {
- fifo->fi_RIdx = ridx;
-
- if (fifo->fi_WrNotify.mlh_Head->mln_Succ) {
- Message *msg;
-
- if (AvailWBufSpace(fifo) >= (fifo->fi_BufSize >> 1)) {
- while (msg = RemHead((MaxList *)&fifo->fi_WrNotify))
- ReplyMsg(msg);
- }
- }
- }
- }
-
- /*
- * signal EOF to any blocked readers
- */
-
- void
- SignalEof(fifo)
- Fifo *fifo;
- {
- FHan *fh;
-
- if (fifo->fi_Flags & FIFOF_EOF) {
- Message *msg;
-
- Forbid();
- while (msg = RemHead((MaxList *)&fifo->fi_RdNotify))
- ReplyMsg(msg);
- Permit();
- }
- }
-
- /*
- * SetEOF()
- */
-
- void
- SetEOF(fifo)
- Fifo *fifo;
- {
- FHan *fh;
-
- for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ)
- fh->fh_Flags |= FIFOF_EOF;
- fifo->fi_Flags |= FIFOF_EOF;
- }
-
- /*
- * FIXME, if state change occurs in master fifo, must unblock anybody
- * blocked due to previous information. XXX
- */
-
- void
- FixFiFlags(fifo)
- Fifo *fifo;
- {
- FHan *fh;
- long rflags = FIFOF_RDNORM | FIFOF_WRNORM;
-
- fifo->fi_Flags &= ~rflags;
- for (fh = fifo->fi_HanList.mlh_Head; fh->fh_Node.mln_Succ; fh = fh->fh_Node.mln_Succ) {
- if ((fh->fh_Flags & FIFOF_NORMAL) == 0) {
- if (fh->fh_Flags & FIFOF_READ)
- rflags &= ~FIFOF_RDNORM;
- if (fh->fh_Flags & FIFOF_WRITE)
- rflags &= ~FIFOF_WRNORM;
- }
- }
- fifo->fi_Flags |= rflags;
- }
-
- /*
- * request message on event. Returns message immediately if event already
- * satisfied.
- */
-
- LibCall void
- RequestFifo(fhan, msg, req)
- FHan *fhan;
- Message *msg;
- long req;
- {
- Fifo *fifo = fhan->fh_Fifo;
-
- Forbid();
-
- switch(req) {
- case FREQ_RPEND:
- if ((fhan->fh_Flags & FIFOF_EOF) || fhan->fh_RIdx != fifo->fi_WIdx) {
- ReplyMsg(msg);
- } else {
- msg->mn_Node.ln_Type = NT_MESSAGE;
- AddTail((MaxList *)&fifo->fi_RdNotify, &msg->mn_Node);
- }
- break;
- case FREQ_WAVAIL:
- /*
- * determine available buffer space, alert if more than 1/2 empty.
- *
- * check for broken pipe
- */
-
- if (fifo->fi_RRefs == 0 && (fifo->fi_Flags & FIFOF_RREQUIRED)) {
- ReplyMsg(msg);
- } else if (AvailWBufSpace(fifo) >= (fifo->fi_BufSize >> 1)) {
- ReplyMsg(msg);
- } else {
- msg->mn_Node.ln_Type = NT_MESSAGE;
- AddTail((MaxList *)&fifo->fi_WrNotify, &msg->mn_Node);
- }
- break;
- case FREQ_ABORT:
- if (msg->mn_Node.ln_Type == NT_MESSAGE) { /* if not returned */
- Remove(&msg->mn_Node); /* return it */
- ReplyMsg(msg);
- }
- break;
- }
- Permit();
- }
-
- long
- AvailWBufSpace(fifo)
- Fifo *fifo;
- {
- long ridx = fifo->fi_RIdx;
- long len;
-
- if (fifo->fi_WIdx < ridx)
- len = ridx - fifo->fi_WIdx;
- else
- len = fifo->fi_BufSize - fifo->fi_WIdx + ridx - 1;
- return(len);
- }
-