From be28316c352241a4becba6e2a4f661ba5ce0f57a Mon Sep 17 00:00:00 2001 From: Werner Koch Date: Wed, 28 Feb 2001 10:18:38 +0000 Subject: [PATCH] Use a writer thread under W32 (arggg) --- gpgme/ChangeLog | 9 ++ gpgme/w32-io.c | 294 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 283 insertions(+), 20 deletions(-) diff --git a/gpgme/ChangeLog b/gpgme/ChangeLog index ecb2ea2d..e8c0c9b8 100644 --- a/gpgme/ChangeLog +++ b/gpgme/ChangeLog @@ -1,3 +1,12 @@ +2001-02-28 Werner Koch + + * w32-io.c (destroy_reader): Set sop_me flag. + (writer,create_writer,destroy_writer,find_writer,kill_writer): New. + (_gpgme_io_write): Use a writer thread to avaoid blocking. + (_gpgme_io_close): Cleanup a writer thread + (_gpgme_io_select): Repalce tthe faked wait on writing by a real + waiting which is now possible due to the use of a writer thread. + 2001-02-20 Werner Koch * w32-io.c (destroy_reader,kill_reader): New. diff --git a/gpgme/w32-io.c b/gpgme/w32-io.c index 79707eee..311eb3c5 100644 --- a/gpgme/w32-io.c +++ b/gpgme/w32-io.c @@ -51,6 +51,9 @@ #define handle_to_pid(a) ((int)(a)) #define READBUF_SIZE 4096 +#define WRITEBUF_SIZE 4096 +#define MAX_READERS 20 +#define MAX_WRITERS 20 static struct { int inuse; @@ -80,7 +83,6 @@ struct reader_context_s { }; -#define MAX_READERS 20 static struct { volatile int used; int fd; @@ -89,6 +91,34 @@ static struct { static int reader_table_size= MAX_READERS; DEFINE_STATIC_LOCK (reader_table_lock); + +struct writer_context_s { + HANDLE file_hd; + HANDLE thread_hd; + DECLARE_LOCK (mutex); + + int stop_me; + int error; + int error_code; + + HANDLE have_data; /* manually reset */ + HANDLE is_empty; + HANDLE stopped; + size_t nbytes; + char buffer[WRITEBUF_SIZE]; +}; + + +static struct { + volatile int used; + int fd; + struct writer_context_s *context; +} writer_table[MAX_WRITERS]; +static int writer_table_size= MAX_WRITERS; +DEFINE_STATIC_LOCK (writer_table_lock); + + + static HANDLE set_synchronize (HANDLE h) { @@ -239,6 +269,7 @@ create_reader (HANDLE fd) static void destroy_reader (struct reader_context_s *c) { + c->stop_me = 1; if (c->have_space_ev) SetEvent (c->have_space_ev); @@ -368,27 +399,240 @@ _gpgme_io_read ( int fd, void *buffer, size_t count ) } + +/* + * The writer does use a simple buffering strategy so that we are + * informed about write errors as soon as possible (i.e. with the the + * next call to the write function + */ +static DWORD CALLBACK +writer (void *arg) +{ + struct writer_context_s *c = arg; + DWORD nwritten; + + DEBUG2 ("writer thread %p for file %p started", c->thread_hd, c->file_hd ); + for (;;) { + LOCK (c->mutex); + if ( !c->nbytes ) { + if (!ResetEvent (c->have_data) ) + DEBUG1 ("ResetEvent failed: ec=%d", (int)GetLastError ()); + UNLOCK (c->mutex); + DEBUG1 ("writer thread %p: idle ...", c->thread_hd ); + WaitForSingleObject (c->have_data, INFINITE); + DEBUG1 ("writer thread %p: got data to send", c->thread_hd ); + LOCK (c->mutex); + } + if ( c->stop_me ) { + UNLOCK (c->mutex); + break; + } + UNLOCK (c->mutex); + + DEBUG2 ("writer thread %p: writing %d bytes", + c->thread_hd, c->nbytes ); + if ( c->nbytes && !WriteFile ( c->file_hd, c->buffer, c->nbytes, + &nwritten, NULL)) { + c->error_code = (int)GetLastError (); + c->error = 1; + DEBUG2 ("writer thread %p: write error: ec=%d", + c->thread_hd, c->error_code ); + break; + } + DEBUG2 ("writer thread %p: wrote %d bytes", + c->thread_hd, (int)nwritten ); + + LOCK (c->mutex); + c->nbytes -= nwritten; + if (c->stop_me) { + UNLOCK (c->mutex); + break; + } + if ( !c->nbytes ) { + if ( !SetEvent (c->is_empty) ) + DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ()); + } + UNLOCK (c->mutex); + } + /* indicate that we have an error */ + if ( !SetEvent (c->is_empty) ) + DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ()); + DEBUG1 ("writer thread %p ended", c->thread_hd ); + SetEvent (c->stopped); + + return 0; +} + + +static struct writer_context_s * +create_writer (HANDLE fd) +{ + struct writer_context_s *c; + SECURITY_ATTRIBUTES sec_attr; + DWORD tid; + + DEBUG1 ("creating new write thread for file handle %p", fd ); + memset (&sec_attr, 0, sizeof sec_attr ); + sec_attr.nLength = sizeof sec_attr; + sec_attr.bInheritHandle = FALSE; + + c = xtrycalloc (1, sizeof *c ); + if (!c) + return NULL; + + c->file_hd = fd; + c->have_data = CreateEvent (&sec_attr, FALSE, FALSE, NULL); + c->is_empty = CreateEvent (&sec_attr, TRUE, TRUE, NULL); + c->stopped = CreateEvent (&sec_attr, TRUE, FALSE, NULL); + if (!c->have_data || !c->is_empty || !c->stopped ) { + DEBUG1 ("** CreateEvent failed: ec=%d\n", (int)GetLastError ()); + if (c->have_data) + CloseHandle (c->have_data); + if (c->is_empty) + CloseHandle (c->is_empty); + if (c->stopped) + CloseHandle (c->stopped); + xfree (c); + return NULL; + } + + c->is_empty = set_synchronize (c->is_empty); + INIT_LOCK (c->mutex); + + c->thread_hd = CreateThread (&sec_attr, 0, writer, c, 0, &tid ); + if (!c->thread_hd) { + DEBUG1 ("** failed to create writer thread: ec=%d\n", + (int)GetLastError ()); + DESTROY_LOCK (c->mutex); + if (c->have_data) + CloseHandle (c->have_data); + if (c->is_empty) + CloseHandle (c->is_empty); + if (c->stopped) + CloseHandle (c->stopped); + xfree (c); + return NULL; + } + + return c; +} + +static void +destroy_writer (struct writer_context_s *c) +{ + c->stop_me = 1; + if (c->have_data) + SetEvent (c->have_data); + + DEBUG1 ("waiting for thread %p termination ...", c->thread_hd ); + WaitForSingleObject (c->stopped, INFINITE); + DEBUG1 ("thread %p has terminated", c->thread_hd ); + + if (c->stopped) + CloseHandle (c->stopped); + if (c->have_data) + CloseHandle (c->have_data); + if (c->is_empty) + CloseHandle (c->is_empty); + CloseHandle (c->thread_hd); + DESTROY_LOCK (c->mutex); + xfree (c); +} + + +/* + * Find a writer context or create a new one + * Note that the writer context will last until a io_close. + */ +static struct writer_context_s * +find_writer (int fd, int start_it) +{ + int i; + + for (i=0; i < writer_table_size ; i++ ) { + if ( writer_table[i].used && writer_table[i].fd == fd ) + return writer_table[i].context; + } + if (!start_it) + return NULL; + + LOCK (writer_table_lock); + for (i=0; i < writer_table_size; i++ ) { + if (!writer_table[i].used) { + writer_table[i].fd = fd; + writer_table[i].context = create_writer (fd_to_handle (fd)); + writer_table[i].used = 1; + UNLOCK (writer_table_lock); + return writer_table[i].context; + } + } + UNLOCK (writer_table_lock); + return NULL; +} + + +static void +kill_writer (int fd) +{ + int i; + + LOCK (writer_table_lock); + for (i=0; i < writer_table_size; i++ ) { + if (writer_table[i].used && writer_table[i].fd == fd ) { + destroy_writer (writer_table[i].context); + writer_table[i].context = NULL; + writer_table[i].used = 0; + break; + } + } + UNLOCK (writer_table_lock); +} + + + + int _gpgme_io_write ( int fd, const void *buffer, size_t count ) { - DWORD nwritten; - HANDLE h = fd_to_handle (fd); - - /* writing blocks for large counts, so we limit it here. */ - if (count > 1024) - count = 1024; + struct writer_context_s *c = find_writer (fd,1); DEBUG2 ("fd %d: about to write %d bytes\n", fd, (int)count ); - if ( !WriteFile ( h, buffer, count, &nwritten, NULL) ) { - DEBUG1 ("WriteFile failed: ec=%d\n", (int)GetLastError ()); + if ( !c ) { + DEBUG0 ( "no writer thread\n"); return -1; } - DEBUG2 ("fd %d: wrote %d bytes\n", - fd, (int)nwritten ); - return (int)nwritten; + LOCK (c->mutex); + if ( c->nbytes ) { /* bytes are pending for send */ + UNLOCK (c->mutex); + DEBUG2 ("fd %d: waiting for empty buffer in thread %p", + fd, c->thread_hd); + WaitForSingleObject (c->is_empty, INFINITE); + DEBUG2 ("fd %d: thread %p buffer is empty", fd, c->thread_hd); + assert (!c->nbytes); + LOCK (c->mutex); + } + + if ( c->error) { + UNLOCK (c->mutex); + DEBUG1 ("fd %d: write error", fd ); + return -1; + } + + if (count > WRITEBUF_SIZE) + count = WRITEBUF_SIZE; + memcpy (c->buffer, buffer, count); + c->nbytes = count; + if (!SetEvent (c->have_data)) + DEBUG1 ("SetEvent failed: ec=%d", (int)GetLastError ()); + UNLOCK (c->mutex); + + DEBUG2 ("fd %d: copied %d bytes\n", + fd, (int)count ); + return (int)count; } + int _gpgme_io_pipe ( int filedes[2], int inherit_idx ) { @@ -448,6 +692,7 @@ _gpgme_io_close ( int fd ) DEBUG1 ("** closing handle for fd %d\n", fd); kill_reader (fd); + kill_writer (fd); LOCK (notify_table_lock); for ( i=0; i < DIM (notify_table); i++ ) { if (notify_table[i].inuse && notify_table[i].fd == fd) { @@ -736,13 +981,13 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds ) HANDLE waitbuf[MAXIMUM_WAIT_OBJECTS]; int waitidx[MAXIMUM_WAIT_OBJECTS]; int code, nwait; - int i, any, any_write; + int i, any; int count; void *dbg_help; restart: DEBUG_BEGIN (dbg_help, "select on [ "); - any = any_write = 0; + any = 0; nwait = 0; count = 0; for ( i=0; i < nfds; i++ ) { @@ -772,13 +1017,22 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds ) any = 1; } else if ( fds[i].for_write ) { + struct writer_context_s *c = find_writer (fds[i].fd,1); + + if (!c) { + DEBUG1 ("oops: no writer thread for fd %d", fds[i].fd); + } + else { + if ( nwait >= DIM (waitbuf) ) { + DEBUG_END (dbg_help, "oops ]"); + DEBUG0 ("Too many objects for WFMO!" ); + return -1; + } + waitidx[nwait] = i; + waitbuf[nwait++] = c->is_empty; + } DEBUG_ADD1 (dbg_help, "w%d ", fds[i].fd ); any = 1; - /* no way to see whether a handle is ready for writing, - * so we signal them all */ - fds[i].signaled = 1; - any_write =1; - count++; } } } @@ -786,7 +1040,7 @@ _gpgme_io_select ( struct io_select_fd_s *fds, size_t nfds ) if (!any) return 0; - code = WaitForMultipleObjects ( nwait, waitbuf, 0, any_write? 200:1000); + code = WaitForMultipleObjects ( nwait, waitbuf, 0, 1000); if ( code >= WAIT_OBJECT_0 && code < WAIT_OBJECT_0 + nwait ) { /* This WFMO is a really silly function: It does return either * the index of the signaled object or if 2 objects have been