Use a writer thread under W32 (arggg)
This commit is contained in:
parent
72af6e5349
commit
be28316c35
@ -1,3 +1,12 @@
|
|||||||
|
2001-02-28 Werner Koch <wk@gnupg.org>
|
||||||
|
|
||||||
|
* w32-io.c (destroy_reader): Set sop_me flag.
|
||||||
|
(writer,create_writer,destroy_writer,find_writer,kill_writer): New.
|
||||||
|
(_gpgme_io_write): Use a writer thread to avaoid blocking.
|
||||||
|
(_gpgme_io_close): Cleanup a writer thread
|
||||||
|
(_gpgme_io_select): Repalce tthe faked wait on writing by a real
|
||||||
|
waiting which is now possible due to the use of a writer thread.
|
||||||
|
|
||||||
2001-02-20 Werner Koch <wk@gnupg.org>
|
2001-02-20 Werner Koch <wk@gnupg.org>
|
||||||
|
|
||||||
* w32-io.c (destroy_reader,kill_reader): New.
|
* w32-io.c (destroy_reader,kill_reader): New.
|
||||||
|
294
gpgme/w32-io.c
294
gpgme/w32-io.c
@ -51,6 +51,9 @@
|
|||||||
#define handle_to_pid(a) ((int)(a))
|
#define handle_to_pid(a) ((int)(a))
|
||||||
|
|
||||||
#define READBUF_SIZE 4096
|
#define READBUF_SIZE 4096
|
||||||
|
#define WRITEBUF_SIZE 4096
|
||||||
|
#define MAX_READERS 20
|
||||||
|
#define MAX_WRITERS 20
|
||||||
|
|
||||||
static struct {
|
static struct {
|
||||||
int inuse;
|
int inuse;
|
||||||
@ -80,7 +83,6 @@ struct reader_context_s {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
#define MAX_READERS 20
|
|
||||||
static struct {
|
static struct {
|
||||||
volatile int used;
|
volatile int used;
|
||||||
int fd;
|
int fd;
|
||||||
@ -89,6 +91,34 @@ static struct {
|
|||||||
static int reader_table_size= MAX_READERS;
|
static int reader_table_size= MAX_READERS;
|
||||||
DEFINE_STATIC_LOCK (reader_table_lock);
|
DEFINE_STATIC_LOCK (reader_table_lock);
|
||||||
|
|
||||||
|
|
||||||
|
struct writer_context_s {
|
||||||
|
HANDLE file_hd;
|
||||||
|
HANDLE thread_hd;
|
||||||
|
DECLARE_LOCK (mutex);
|
||||||
|
|
||||||
|
int stop_me;
|
||||||
|
int error;
|
||||||
|
int error_code;
|
||||||
|
|
||||||
|
HANDLE have_data; /* manually reset */
|
||||||
|
HANDLE is_empty;
|
||||||
|
HANDLE stopped;
|
||||||
|
size_t nbytes;
|
||||||
|
char buffer[WRITEBUF_SIZE];
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static struct {
|
||||||
|
volatile int used;
|
||||||
|
int fd;
|
||||||
|
struct writer_context_s *context;
|
||||||
|
} writer_table[MAX_WRITERS];
|
||||||
|
static int writer_table_size= MAX_WRITERS;
|
||||||
|
DEFINE_STATIC_LOCK (writer_table_lock);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
static HANDLE
|
static HANDLE
|
||||||
set_synchronize (HANDLE h)
|
set_synchronize (HANDLE h)
|
||||||
{
|
{
|
||||||
@ -239,6 +269,7 @@ create_reader (HANDLE fd)
|
|||||||
static void
|
static void
|
||||||
destroy_reader (struct reader_context_s *c)
|
destroy_reader (struct reader_context_s *c)
|
||||||
{
|
{
|
||||||
|
c->stop_me = 1;
|
||||||
if (c->have_space_ev)
|
if (c->have_space_ev)
|
||||||
SetEvent (c->have_space_ev);
|
SetEvent (c->have_space_ev);
|
||||||
|
|
||||||
@ -368,27 +399,240 @@ _gpgme_io_read ( int fd, void *buffer, size_t count )
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The writer does use a simple buffering strategy so that we are
|
||||||
|
* informed about write errors as soon as possible (i.e. with the the
|
||||||
|
* next call to the write function
|
||||||
|
*/
|
||||||
|
static DWORD CALLBACK
|
||||||
|
writer (void *arg)
|
||||||
|
{
|
||||||
|
struct writer_context_s *c = arg;
|
||||||
|
DWORD nwritten;
|
||||||
|
|
||||||
|
DEBUG2 ("writer thread %p for file %p started", c->thread_hd, c->file_hd );
|
||||||
|
for (;;) {
|
||||||
|
LOCK (c->mutex);
|
||||||
|
if ( !c->nbytes ) {
|
||||||
|
if (!ResetEvent (c->have_data) )
|
||||||
|
DEBUG1 ("ResetEvent failed: ec=%d", (int)GetLastError ());
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
DEBUG1 ("writer thread %p: idle ...", c->thread_hd );
|
||||||
|
WaitForSingleObject (c->have_data, INFINITE);
|
||||||
|
DEBUG1 ("writer thread %p: got data to send", c->thread_hd );
|
||||||
|
LOCK (c->mutex);
|
||||||
|
}
|
||||||
|
if ( c->stop_me ) {
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
|
||||||
|
DEBUG2 ("writer thread %p: writing %d bytes",
|
||||||
|
c->thread_hd, c->nbytes );
|
||||||
|
if ( c->nbytes && !WriteFile ( c->file_hd, c->buffer, c->nbytes,
|
||||||
|
&nwritten, NULL)) {
|
||||||
|
c->error_code = (int)GetLastError ();
|
||||||
|
c->error = 1;
|
||||||
|
DEBUG2 ("writer thread %p: write error: ec=%d",
|
||||||
|
c->thread_hd, c->error_code );
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
DEBUG2 ("writer thread %p: wrote %d bytes",
|
||||||
|
c->thread_hd, (int)nwritten );
|
||||||
|
|
||||||
|
LOCK (c->mutex);
|
||||||
|
c->nbytes -= nwritten;
|
||||||
|
if (c->stop_me) {
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if ( !c->nbytes ) {
|
||||||
|
if ( !SetEvent (c->is_empty) )
|
||||||
|
DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
|
||||||
|
}
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
}
|
||||||
|
/* indicate that we have an error */
|
||||||
|
if ( !SetEvent (c->is_empty) )
|
||||||
|
DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
|
||||||
|
DEBUG1 ("writer thread %p ended", c->thread_hd );
|
||||||
|
SetEvent (c->stopped);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static struct writer_context_s *
|
||||||
|
create_writer (HANDLE fd)
|
||||||
|
{
|
||||||
|
struct writer_context_s *c;
|
||||||
|
SECURITY_ATTRIBUTES sec_attr;
|
||||||
|
DWORD tid;
|
||||||
|
|
||||||
|
DEBUG1 ("creating new write thread for file handle %p", fd );
|
||||||
|
memset (&sec_attr, 0, sizeof sec_attr );
|
||||||
|
sec_attr.nLength = sizeof sec_attr;
|
||||||
|
sec_attr.bInheritHandle = FALSE;
|
||||||
|
|
||||||
|
c = xtrycalloc (1, sizeof *c );
|
||||||
|
if (!c)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
c->file_hd = fd;
|
||||||
|
c->have_data = CreateEvent (&sec_attr, FALSE, FALSE, NULL);
|
||||||
|
c->is_empty = CreateEvent (&sec_attr, TRUE, TRUE, NULL);
|
||||||
|
c->stopped = CreateEvent (&sec_attr, TRUE, FALSE, NULL);
|
||||||
|
if (!c->have_data || !c->is_empty || !c->stopped ) {
|
||||||
|
DEBUG1 ("** CreateEvent failed: ec=%d\n", (int)GetLastError ());
|
||||||
|
if (c->have_data)
|
||||||
|
CloseHandle (c->have_data);
|
||||||
|
if (c->is_empty)
|
||||||
|
CloseHandle (c->is_empty);
|
||||||
|
if (c->stopped)
|
||||||
|
CloseHandle (c->stopped);
|
||||||
|
xfree (c);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
c->is_empty = set_synchronize (c->is_empty);
|
||||||
|
INIT_LOCK (c->mutex);
|
||||||
|
|
||||||
|
c->thread_hd = CreateThread (&sec_attr, 0, writer, c, 0, &tid );
|
||||||
|
if (!c->thread_hd) {
|
||||||
|
DEBUG1 ("** failed to create writer thread: ec=%d\n",
|
||||||
|
(int)GetLastError ());
|
||||||
|
DESTROY_LOCK (c->mutex);
|
||||||
|
if (c->have_data)
|
||||||
|
CloseHandle (c->have_data);
|
||||||
|
if (c->is_empty)
|
||||||
|
CloseHandle (c->is_empty);
|
||||||
|
if (c->stopped)
|
||||||
|
CloseHandle (c->stopped);
|
||||||
|
xfree (c);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
destroy_writer (struct writer_context_s *c)
|
||||||
|
{
|
||||||
|
c->stop_me = 1;
|
||||||
|
if (c->have_data)
|
||||||
|
SetEvent (c->have_data);
|
||||||
|
|
||||||
|
DEBUG1 ("waiting for thread %p termination ...", c->thread_hd );
|
||||||
|
WaitForSingleObject (c->stopped, INFINITE);
|
||||||
|
DEBUG1 ("thread %p has terminated", c->thread_hd );
|
||||||
|
|
||||||
|
if (c->stopped)
|
||||||
|
CloseHandle (c->stopped);
|
||||||
|
if (c->have_data)
|
||||||
|
CloseHandle (c->have_data);
|
||||||
|
if (c->is_empty)
|
||||||
|
CloseHandle (c->is_empty);
|
||||||
|
CloseHandle (c->thread_hd);
|
||||||
|
DESTROY_LOCK (c->mutex);
|
||||||
|
xfree (c);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Find a writer context or create a new one
|
||||||
|
* Note that the writer context will last until a io_close.
|
||||||
|
*/
|
||||||
|
static struct writer_context_s *
|
||||||
|
find_writer (int fd, int start_it)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i=0; i < writer_table_size ; i++ ) {
|
||||||
|
if ( writer_table[i].used && writer_table[i].fd == fd )
|
||||||
|
return writer_table[i].context;
|
||||||
|
}
|
||||||
|
if (!start_it)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
LOCK (writer_table_lock);
|
||||||
|
for (i=0; i < writer_table_size; i++ ) {
|
||||||
|
if (!writer_table[i].used) {
|
||||||
|
writer_table[i].fd = fd;
|
||||||
|
writer_table[i].context = create_writer (fd_to_handle (fd));
|
||||||
|
writer_table[i].used = 1;
|
||||||
|
UNLOCK (writer_table_lock);
|
||||||
|
return writer_table[i].context;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UNLOCK (writer_table_lock);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
kill_writer (int fd)
|
||||||
|
{
|
||||||
|
int i;
|
||||||
|
|
||||||
|
LOCK (writer_table_lock);
|
||||||
|
for (i=0; i < writer_table_size; i++ ) {
|
||||||
|
if (writer_table[i].used && writer_table[i].fd == fd ) {
|
||||||
|
destroy_writer (writer_table[i].context);
|
||||||
|
writer_table[i].context = NULL;
|
||||||
|
writer_table[i].used = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UNLOCK (writer_table_lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
_gpgme_io_write ( int fd, const void *buffer, size_t count )
|
_gpgme_io_write ( int fd, const void *buffer, size_t count )
|
||||||
{
|
{
|
||||||
DWORD nwritten;
|
struct writer_context_s *c = find_writer (fd,1);
|
||||||
HANDLE h = fd_to_handle (fd);
|
|
||||||
|
|
||||||
/* writing blocks for large counts, so we limit it here. */
|
|
||||||
if (count > 1024)
|
|
||||||
count = 1024;
|
|
||||||
|
|
||||||
DEBUG2 ("fd %d: about to write %d bytes\n", fd, (int)count );
|
DEBUG2 ("fd %d: about to write %d bytes\n", fd, (int)count );
|
||||||
if ( !WriteFile ( h, buffer, count, &nwritten, NULL) ) {
|
if ( !c ) {
|
||||||
DEBUG1 ("WriteFile failed: ec=%d\n", (int)GetLastError ());
|
DEBUG0 ( "no writer thread\n");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
DEBUG2 ("fd %d: wrote %d bytes\n",
|
|
||||||
fd, (int)nwritten );
|
|
||||||
|
|
||||||
return (int)nwritten;
|
LOCK (c->mutex);
|
||||||
|
if ( c->nbytes ) { /* bytes are pending for send */
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
DEBUG2 ("fd %d: waiting for empty buffer in thread %p",
|
||||||
|
fd, c->thread_hd);
|
||||||
|
WaitForSingleObject (c->is_empty, INFINITE);
|
||||||
|
DEBUG2 ("fd %d: thread %p buffer is empty", fd, c->thread_hd);
|
||||||
|
assert (!c->nbytes);
|
||||||
|
LOCK (c->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( c->error) {
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
DEBUG1 ("fd %d: write error", fd );
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (count > WRITEBUF_SIZE)
|
||||||
|
count = WRITEBUF_SIZE;
|
||||||
|
memcpy (c->buffer, buffer, count);
|
||||||
|
c->nbytes = count;
|
||||||
|
if (!SetEvent (c->have_data))
|
||||||
|
DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ());
|
||||||
|
UNLOCK (c->mutex);
|
||||||
|
|
||||||
|
DEBUG2 ("fd %d: copied %d bytes\n",
|
||||||
|
fd, (int)count );
|
||||||
|
return (int)count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int
|
int
|
||||||
_gpgme_io_pipe ( int filedes[2], int inherit_idx )
|
_gpgme_io_pipe ( int filedes[2], int inherit_idx )
|
||||||
{
|
{
|
||||||
@ -448,6 +692,7 @@ _gpgme_io_close ( int fd )
|
|||||||
|
|
||||||
DEBUG1 ("** closing handle for fd %d\n", fd);
|
DEBUG1 ("** closing handle for fd %d\n", fd);
|
||||||
kill_reader (fd);
|
kill_reader (fd);
|
||||||
|
kill_writer (fd);
|
||||||
LOCK (notify_table_lock);
|
LOCK (notify_table_lock);
|
||||||
for ( i=0; i < DIM (notify_table); i++ ) {
|
for ( i=0; i < DIM (notify_table); i++ ) {
|
||||||
if (notify_table[i].inuse && notify_table[i].fd == fd) {
|
if (notify_table[i].inuse && notify_table[i].fd == fd) {
|
||||||
@ -736,13 +981,13 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds )
|
|||||||
HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS];
|
HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS];
|
||||||
int waitidx[MAXIMUM_WAIT_OBJECTS];
|
int waitidx[MAXIMUM_WAIT_OBJECTS];
|
||||||
int code, nwait;
|
int code, nwait;
|
||||||
int i, any, any_write;
|
int i, any;
|
||||||
int count;
|
int count;
|
||||||
void *dbg_help;
|
void *dbg_help;
|
||||||
|
|
||||||
restart:
|
restart:
|
||||||
DEBUG_BEGIN (dbg_help, "select on [ ");
|
DEBUG_BEGIN (dbg_help, "select on [ ");
|
||||||
any = any_write = 0;
|
any = 0;
|
||||||
nwait = 0;
|
nwait = 0;
|
||||||
count = 0;
|
count = 0;
|
||||||
for ( i=0; i < nfds; i++ ) {
|
for ( i=0; i < nfds; i++ ) {
|
||||||
@ -772,13 +1017,22 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds )
|
|||||||
any = 1;
|
any = 1;
|
||||||
}
|
}
|
||||||
else if ( fds[i].for_write ) {
|
else if ( fds[i].for_write ) {
|
||||||
|
struct writer_context_s *c = find_writer (fds[i].fd,1);
|
||||||
|
|
||||||
|
if (!c) {
|
||||||
|
DEBUG1 ("oops: no writer thread for fd %d", fds[i].fd);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if ( nwait >= DIM (waitbuf) ) {
|
||||||
|
DEBUG_END (dbg_help, "oops ]");
|
||||||
|
DEBUG0 ("Too many objects for WFMO!" );
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
waitidx[nwait] = i;
|
||||||
|
waitbuf[nwait++] = c->is_empty;
|
||||||
|
}
|
||||||
DEBUG_ADD1 (dbg_help, "w%d ", fds[i].fd );
|
DEBUG_ADD1 (dbg_help, "w%d ", fds[i].fd );
|
||||||
any = 1;
|
any = 1;
|
||||||
/* no way to see whether a handle is ready for writing,
|
|
||||||
* so we signal them all */
|
|
||||||
fds[i].signaled = 1;
|
|
||||||
any_write =1;
|
|
||||||
count++;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -786,7 +1040,7 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds )
|
|||||||
if (!any)
|
if (!any)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
code = WaitForMultipleObjects ( nwait, waitbuf, 0, any_write? 200:1000);
|
code = WaitForMultipleObjects ( nwait, waitbuf, 0, 1000);
|
||||||
if ( code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait ) {
|
if ( code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait ) {
|
||||||
/* This WFMO is a really silly function: It does return either
|
/* This WFMO is a really silly function: It does return either
|
||||||
* the index of the signaled object or if 2 objects have been
|
* the index of the signaled object or if 2 objects have been
|
||||||
|
Loading…
Reference in New Issue
Block a user