home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Geek Gadgets 1
/
ADE-1.bin
/
ade-dist
/
kaffe-0.5p4-src.tgz
/
tar.out
/
contrib
/
kaffe
/
kaffevm
/
threadCalls.c
< prev
next >
Wrap
C/C++ Source or Header
|
1996-09-28
|
6KB
|
338 lines
/*
* threadCalls.c
* Support for threaded ops which may block (read, write, connect, etc.).
*
* Copyright (c) 1996 Systems Architecture Research Centre,
* City University, London, UK.
*
* See the file "license.terms" for information on usage and redistribution
* of this file, and for a DISCLAIMER OF ALL WARRANTIES.
*
* Written by Tim Wilkinson <tim@sarc.city.ac.uk>, February 1996.
*/
#define DBG(s)
#include "config.h"
#include <stdlib.h>
#include <sys/types.h>
#if defined(HAVE_UNISTD_H)
#include <unistd.h>
#endif
#include "lerrno.h"
#include <assert.h>
#if defined(HAVE_SYS_SOCKET_H)
#include <sys/socket.h>
#endif
#if defined(HAVE_SYS_TIME_H)
#include <sys/time.h>
#endif
#if defined(HAVE_WINSOCK_H)
#include <winsock.h>
#endif
#if defined(HAVE_SYS_IOCTL_H)
#include <sys/ioctl.h>
#endif
#if defined(HAVE_IO_H)
#include <io.h>
#endif
#if defined(HAVE_FCNTL_H)
#include <fcntl.h>
#endif
#include "object.h"
#include "thread.h"
#include "md.h"
/*
* We only need this stuff is we are using the internal thread system.
*/
#if defined(USE_INTERNAL_THREADS)
#if !defined(HAVE_MEMCPY)
void bcopy(void*, void*, size_t);
#define memcpy(d, s, n) bcopy ((s), (d), (n))
#endif
/* QUICK HACK!! */
#if defined(__WIN32__)
#define ioctl ioctlsocket
#endif
#define TH_READ 0
#define TH_WRITE 1
static int maxFd;
static fd_set readsPending;
static fd_set writesPending;
static thread* readQ[FD_SETSIZE];
static thread* writeQ[FD_SETSIZE];
static struct timeval tm = { 0, 0 };
static void blockOnFile(int, int);
void waitOnEvents(void);
extern thread* currentThread;
/* These are undefined because we do not yet support async I/O */
#undef F_SETOWN
#undef FIOSETOWN
#undef O_ASYNC
#undef FIOASYNC
/*
* QUICK HACK!!
* For the Amiga port we cannot thread writing. Neither can we use
* select to thread writes (which will block anyway).
*/
#if defined(__amigaos__)
#define NO_WRITE_THREADING
#endif
/*
* Create a threaded file descriptor.
*/
int
threadedFileDescriptor(int fd)
{
int r;
int on = 1;
int pid;
/* Make non-blocking */
#if defined(HAVE_FCNTL) && defined(O_NONBLOCK)
r = fcntl(fd, F_GETFL, 0);
r = fcntl(fd, F_SETFL, r|O_NONBLOCK);
#elif defined(HAVE_IOCTL) && defined(FIONBIO)
r = ioctl(fd, FIONBIO, &on);
#else
r = 0;
#endif
if (r < 0) {
return (r);
}
/* Allow socket to signal this process when new data is available */
pid = getpid();
#if defined(HAVE_FCNTL) && defined(F_SETOWN)
r = fcntl(fd, F_SETOWN, pid);
#elif defined(HAVE_IOCTL) && defined(FIOSETOWN)
r = ioctl(fd, FIOSETOWN, &pid);
#else
r = 0;
#endif
if (r < 0) {
return (r);
}
#if defined(HAVE_FCNTL) && defined(O_ASYNC)
r = fcntl(fd, F_GETFL, 0);
r = fcntl(fd, F_SETFL, r|O_ASYNC);
#elif defined(HAVE_IOCTL) && defined(FIOASYNC)
r = ioctl(fd, FIOASYNC, &on);
#else
r = 0;
#endif
if (r < 0) {
return (r);
}
return (fd);
}
/*
* Threaded create socket.
*/
int
threadedSocket(int af, int type, int proto)
{
int fd;
int r;
int on = 1;
int pid;
fd = socket(af, type, proto);
return (threadedFileDescriptor(fd));
}
/*
* Threaded file open.
*/
int
threadedOpen(char* path, int flags, int mode)
{
int fd;
int r;
int on = 1;
int pid;
fd = open(path, flags, mode);
return (threadedFileDescriptor(fd));
}
/*
* Threaded socket connect.
*/
int
threadedConnect(int fd, struct sockaddr* addr, int len)
{
int r;
r = connect(fd, addr, len);
if ((r < 0) && (errno == EINPROGRESS || errno == EALREADY || errno == EWOULDBLOCK)) {
blockOnFile(fd, TH_WRITE);
r = 0; /* Assume it's okay when we get released */
}
return (r);
}
/*
* Threaded socket accept.
*/
int
threadedAccept(int fd, struct sockaddr* addr, int* len)
{
int r;
int on = 1;
for (;;) {
r = accept(fd, addr, len);
if (r >= 0 || !(errno == EINPROGRESS || errno == EALREADY || errno == EWOULDBLOCK)) {
break;
}
blockOnFile(fd, TH_READ);
}
return (threadedFileDescriptor(r));
}
/*
* Read but only if we can.
*/
int
threadedRead(int fd, char* buf, int len)
{
int r;
for (;;) {
#if defined(NO_READ_THREADING)
blockOnFile(fd, TH_READ);
#endif
r = read(fd, buf, len);
if (r >= 0 || errno != EAGAIN) {
return (r);
}
#if !defined(NO_READ_THREADING)
blockOnFile(fd, TH_READ);
#endif
}
}
/*
* Write but only if we can.
*/
int
threadedWrite(int fd, char* buf, int len)
{
int r;
char* ptr;
ptr = buf;
do {
#if defined(NO_WRITE_THREADING)
blockOnFile(fd, TH_WRITE);
#endif
r = write(fd, ptr, len);
if (r < 0 && errno == EAGAIN) {
#if !defined(NO_WRITE_THREADING)
blockOnFile(fd, TH_WRITE);
#endif
r = 1;
continue;
}
ptr += r;
len -= r;
} while (len > 0 && r > 0);
return (ptr - buf);
}
/*
* An attempt to access a file would block, so suspend the thread until
* it will happen.
*/
static
void
blockOnFile(int fd, int op)
{
DBG( printf("blockOnFile()\n"); )
intsDisable();
if (fd > maxFd) {
maxFd = fd;
}
if (op == TH_READ) {
FD_SET(fd, &readsPending);
suspendOnQThread(currentThread, &readQ[fd]);
FD_CLR(fd, &readsPending);
}
else {
FD_SET(fd, &writesPending);
suspendOnQThread(currentThread, &writeQ[fd]);
FD_CLR(fd, &writesPending);
}
intsRestore();
}
/*
* Check if some file descriptor or other event to become ready.
* Block if required.
*/
void
checkEvents(bool block)
{
int r;
fd_set rd;
fd_set wr;
thread* tid;
thread* ntid;
int i;
DBG( printf("checkEvents block:%d\n", block); )
intsDisable();
#if defined(FD_COPY)
FD_COPY(&readsPending, &rd);
FD_COPY(&writesPending, &wr);
#else
memcpy(&rd, &readsPending, sizeof(rd));
memcpy(&wr, &writesPending, sizeof(wr));
#endif
r = select(maxFd+1, &rd, &wr, 0, (block ? 0 : &tm));
DBG( printf("Select returns %d\n", r); )
for (i = 0; r > 0 && i <= maxFd; i++) {
if (readQ[i] != 0 && FD_ISSET(i, &rd)) {
for (tid = readQ[i]; tid != 0; tid = ntid) {
ntid = tid->next;
resumeThread(tid);
}
readQ[i] = 0;
r--;
}
if (writeQ[i] != 0 && FD_ISSET(i, &wr)) {
for (tid = writeQ[i]; tid != 0; tid = ntid) {
ntid = tid->next;
resumeThread(tid);
}
writeQ[i] = 0;
r--;
}
}
intsRestore();
}
#endif