aboutsummaryrefslogtreecommitdiffstats
path: root/g10/mdfilter.c
diff options
context:
space:
mode:
authorNIIBE Yutaka <[email protected]>2023-07-25 03:02:26 +0000
committerNIIBE Yutaka <[email protected]>2023-07-25 03:10:07 +0000
commit1ddd69935da629188dcf9215cd9e7a8f68b34a97 (patch)
tree3dd81fc60bf6b5ee9be63d1748f60afbb2df24d5 /g10/mdfilter.c
parentsm: Use estream for I/O. (diff)
downloadgnupg-1ddd69935da629188dcf9215cd9e7a8f68b34a97.tar.gz
gnupg-1ddd69935da629188dcf9215cd9e7a8f68b34a97.zip
gpg: Add parallelized filter for hashing.
* g10/options.h (COMPAT_PARALLELIZED): New. * g10/filter.h (md_thd_filter_context_t): New type. (md_thd_filter_set_md, md_thd_filter): New. * g10/gpg.c (compatibility_flags): Update to support COMPAT_PARALLELIZED. * g10/mdfilter.c (struct md_thd_filter_context): New. (lock_md, unlock_md, get_buffer_to_hash, put_buffer_to_recv): New. (get_buffer_to_fill, put_buffer_to_send, md_thread): New. (md_thd_filter, md_thd_filter_set_md): New. * g10/sign.c (sign_file): Add support for md_thd_filter. (sign_symencrypt_file): Likewise. -- GnuPG-bug-id: 6570 Signed-off-by: NIIBE Yutaka <[email protected]>
Diffstat (limited to 'g10/mdfilter.c')
-rw-r--r--g10/mdfilter.c295
1 files changed, 295 insertions, 0 deletions
diff --git a/g10/mdfilter.c b/g10/mdfilter.c
index f3318f15c..a655d6d72 100644
--- a/g10/mdfilter.c
+++ b/g10/mdfilter.c
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
+#include <npth.h>
#include "gpg.h"
#include "../common/status.h"
@@ -71,3 +72,297 @@ free_md_filter_context( md_filter_context_t *mfx )
mfx->md2 = NULL;
mfx->maxbuf_size = 0;
}
+
+
+/****************
+ * Threaded implementation for hashing.
+ */
+
+struct md_thd_filter_context {
+ gcry_md_hd_t md;
+ npth_t thd;
+ /**/
+ npth_mutex_t mutex;
+ npth_cond_t cond;
+ size_t bufsize;
+ unsigned int produce : 1;
+ unsigned int consume : 1;
+ ssize_t written0;
+ ssize_t written1;
+ unsigned char buf[1];
+};
+
+
+static void
+lock_md (struct md_thd_filter_context *mfx)
+{
+ int rc = npth_mutex_lock (&mfx->mutex);
+ if (rc)
+ log_fatal ("%s: failed to acquire mutex: %s\n", __func__,
+ gpg_strerror (gpg_error_from_errno (rc)));
+}
+
+
+static void
+unlock_md (struct md_thd_filter_context * mfx)
+{
+ int rc = npth_mutex_unlock (&mfx->mutex);
+ if (rc)
+ log_fatal ("%s: failed to release mutex: %s\n", __func__,
+ gpg_strerror (gpg_error_from_errno (rc)));
+}
+
+static int
+get_buffer_to_hash (struct md_thd_filter_context *mfx,
+ unsigned char **r_buf, size_t *r_len)
+{
+ int rc = 0;
+
+ lock_md (mfx);
+
+ if ((mfx->consume == 0 && mfx->written0 < 0)
+ || (mfx->consume != 0 && mfx->written1 < 0))
+ {
+ rc = npth_cond_wait (&mfx->cond, &mfx->mutex);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return -1;
+ }
+ }
+
+ if (mfx->consume == 0)
+ {
+ *r_buf = mfx->buf;
+ *r_len = mfx->written0;
+ }
+ else
+ {
+ *r_buf = mfx->buf + mfx->bufsize;
+ *r_len = mfx->written1;
+ }
+
+ unlock_md (mfx);
+
+ return 0;
+}
+
+static int
+put_buffer_to_recv (struct md_thd_filter_context *mfx)
+{
+ int rc = 0;
+
+ lock_md (mfx);
+ if (mfx->consume == 0)
+ {
+ mfx->written0 = -1;
+ mfx->consume = 1;
+ }
+ else
+ {
+ mfx->written1 = -1;
+ mfx->consume = 0;
+ }
+
+ rc = npth_cond_signal (&mfx->cond);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return -1;
+ }
+
+ unlock_md (mfx);
+ return 0;
+}
+
+static int
+get_buffer_to_fill (struct md_thd_filter_context *mfx,
+ unsigned char **r_buf, size_t len)
+{
+ lock_md (mfx);
+
+ if (len > mfx->bufsize)
+ {
+ unlock_md (mfx);
+ return GPG_ERR_BUFFER_TOO_SHORT;
+ }
+
+ if ((mfx->produce == 0 && mfx->written0 >= 0)
+ || (mfx->produce != 0 && mfx->written1 >= 0))
+ {
+ int rc = npth_cond_wait (&mfx->cond, &mfx->mutex);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return gpg_error_from_errno (rc);
+ }
+ }
+
+ if (mfx->produce == 0)
+ *r_buf = mfx->buf;
+ else
+ *r_buf = mfx->buf + mfx->bufsize;
+ unlock_md (mfx);
+ return 0;
+}
+
+static int
+put_buffer_to_send (struct md_thd_filter_context *mfx, size_t len)
+{
+ int rc;
+
+ lock_md (mfx);
+ if (mfx->produce == 0)
+ {
+ mfx->written0 = len;
+ mfx->produce = 1;
+ }
+ else
+ {
+ mfx->written1 = len;
+ mfx->produce = 0;
+ }
+
+ rc = npth_cond_signal (&mfx->cond);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return gpg_error_from_errno (rc);
+ }
+
+ unlock_md (mfx);
+
+ /* Yield to the md_thread to let it compute the hash in parallel */
+ npth_usleep (0);
+ return 0;
+}
+
+
+static void *
+md_thread (void *arg)
+{
+ struct md_thd_filter_context *mfx = arg;
+
+ while (1)
+ {
+ unsigned char *buf;
+ size_t len;
+
+ if (get_buffer_to_hash (mfx, &buf, &len) < 0)
+ /* Error */
+ return NULL;
+
+ if (len == 0)
+ break;
+
+ npth_unprotect ();
+ gcry_md_write (mfx->md, buf, len);
+ npth_protect ();
+
+ if (put_buffer_to_recv (mfx) < 0)
+ /* Error */
+ return NULL;
+ }
+
+ return NULL;
+}
+
+int
+md_thd_filter (void *opaque, int control,
+ IOBUF a, byte *buf, size_t *ret_len)
+{
+ size_t size = *ret_len;
+ struct md_thd_filter_context **r_mfx = opaque;
+ struct md_thd_filter_context *mfx = *r_mfx;
+ int rc=0;
+
+ if (control == IOBUFCTRL_INIT)
+ {
+ npth_attr_t tattr;
+ size_t n;
+
+ n = 2 * iobuf_set_buffer_size (0) * 1024;
+ mfx = xtrymalloc (n + offsetof (struct md_thd_filter_context, buf));
+ if (!mfx)
+ return gpg_error_from_syserror ();
+ *r_mfx = mfx;
+ mfx->bufsize = n / 2;
+ mfx->consume = mfx->produce = 0;
+ mfx->written0 = -1;
+ mfx->written1 = -1;
+
+ rc = npth_mutex_init (&mfx->mutex, NULL);
+ if (rc)
+ {
+ return gpg_error_from_errno (rc);
+ }
+ rc = npth_cond_init (&mfx->cond, NULL);
+ if (rc)
+ {
+ npth_mutex_destroy (&mfx->mutex);
+ return gpg_error_from_errno (rc);
+ }
+ rc = npth_attr_init (&tattr);
+ if (rc)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ return gpg_error_from_errno (rc);
+ }
+ npth_attr_setdetachstate (&tattr, NPTH_CREATE_JOINABLE);
+ rc = npth_create (&mfx->thd, &tattr, md_thread, mfx);
+ if (rc)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ npth_attr_destroy (&tattr);
+ return gpg_error_from_errno (rc);
+ }
+ npth_attr_destroy (&tattr);
+ }
+ else if (control == IOBUFCTRL_UNDERFLOW)
+ {
+ int i;
+ unsigned char *md_buf = NULL;
+
+ i = iobuf_read (a, buf, size);
+ if (i == -1)
+ i = 0;
+
+ rc = get_buffer_to_fill (mfx, &md_buf, i);
+ if (rc)
+ return rc;
+
+ if (i)
+ memcpy (md_buf, buf, i);
+
+ rc = put_buffer_to_send (mfx, i);
+ if (rc)
+ return rc;
+
+ if (i == 0)
+ {
+ npth_join (mfx->thd, NULL);
+ rc = -1; /* eof */
+ }
+
+ *ret_len = i;
+ }
+ else if (control == IOBUFCTRL_FREE)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ xfree (mfx);
+ *r_mfx = NULL;
+ }
+ else if (control == IOBUFCTRL_DESC)
+ mem2str (buf, "md_thd_filter", *ret_len);
+
+ return rc;
+}
+
+void
+md_thd_filter_set_md (struct md_thd_filter_context *mfx, gcry_md_hd_t md)
+{
+ mfx->md = md;
+}