aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNIIBE Yutaka <[email protected]>2023-07-25 03:02:26 +0000
committerNIIBE Yutaka <[email protected]>2023-07-25 03:10:07 +0000
commit1ddd69935da629188dcf9215cd9e7a8f68b34a97 (patch)
tree3dd81fc60bf6b5ee9be63d1748f60afbb2df24d5
parentsm: Use estream for I/O. (diff)
downloadgnupg-1ddd69935da629188dcf9215cd9e7a8f68b34a97.tar.gz
gnupg-1ddd69935da629188dcf9215cd9e7a8f68b34a97.zip
gpg: Add parallelized filter for hashing.
* g10/options.h (COMPAT_PARALLELIZED): New. * g10/filter.h (md_thd_filter_context_t): New type. (md_thd_filter_set_md, md_thd_filter): New. * g10/gpg.c (compatibility_flags): Update to support COMPAT_PARALLELIZED. * g10/mdfilter.c (struct md_thd_filter_context): New. (lock_md, unlock_md, get_buffer_to_hash, put_buffer_to_recv): New. (get_buffer_to_fill, put_buffer_to_send, md_thread): New. (md_thd_filter, md_thd_filter_set_md): New. * g10/sign.c (sign_file): Add support for md_thd_filter. (sign_symencrypt_file): Likewise. -- GnuPG-bug-id: 6570 Signed-off-by: NIIBE Yutaka <[email protected]>
-rw-r--r--g10/filter.h4
-rw-r--r--g10/gpg.c1
-rw-r--r--g10/mdfilter.c295
-rw-r--r--g10/options.h4
-rw-r--r--g10/sign.c64
5 files changed, 352 insertions, 16 deletions
diff --git a/g10/filter.h b/g10/filter.h
index 4b4fc55ff..321b553dc 100644
--- a/g10/filter.h
+++ b/g10/filter.h
@@ -29,6 +29,9 @@ typedef struct {
size_t maxbuf_size;
} md_filter_context_t;
+typedef struct md_thd_filter_context *md_thd_filter_context_t;
+void md_thd_filter_set_md (md_thd_filter_context_t mfx, gcry_md_hd_t md);
+
typedef struct {
int refcount; /* Initialized to 1. */
@@ -165,6 +168,7 @@ typedef struct {
/*-- mdfilter.c --*/
int md_filter( void *opaque, int control, iobuf_t a, byte *buf, size_t *ret_len);
+int md_thd_filter( void *opaque, int control, iobuf_t a, byte *buf, size_t *ret_len);
void free_md_filter_context( md_filter_context_t *mfx );
/*-- armor.c --*/
diff --git a/g10/gpg.c b/g10/gpg.c
index 23bf8d971..8b3d79e1e 100644
--- a/g10/gpg.c
+++ b/g10/gpg.c
@@ -1027,6 +1027,7 @@ static struct debug_flags_s debug_flags [] =
/* The list of compatibility flags. */
static struct compatibility_flags_s compatibility_flags [] =
{
+ { COMPAT_PARALLELIZED, "parallelized" },
{ 0, NULL }
};
diff --git a/g10/mdfilter.c b/g10/mdfilter.c
index f3318f15c..a655d6d72 100644
--- a/g10/mdfilter.c
+++ b/g10/mdfilter.c
@@ -22,6 +22,7 @@
#include <stdlib.h>
#include <string.h>
#include <errno.h>
+#include <npth.h>
#include "gpg.h"
#include "../common/status.h"
@@ -71,3 +72,297 @@ free_md_filter_context( md_filter_context_t *mfx )
mfx->md2 = NULL;
mfx->maxbuf_size = 0;
}
+
+
+/****************
+ * Threaded implementation for hashing.
+ */
+
+struct md_thd_filter_context {
+ gcry_md_hd_t md;
+ npth_t thd;
+ /**/
+ npth_mutex_t mutex;
+ npth_cond_t cond;
+ size_t bufsize;
+ unsigned int produce : 1;
+ unsigned int consume : 1;
+ ssize_t written0;
+ ssize_t written1;
+ unsigned char buf[1];
+};
+
+
+static void
+lock_md (struct md_thd_filter_context *mfx)
+{
+ int rc = npth_mutex_lock (&mfx->mutex);
+ if (rc)
+ log_fatal ("%s: failed to acquire mutex: %s\n", __func__,
+ gpg_strerror (gpg_error_from_errno (rc)));
+}
+
+
+static void
+unlock_md (struct md_thd_filter_context * mfx)
+{
+ int rc = npth_mutex_unlock (&mfx->mutex);
+ if (rc)
+ log_fatal ("%s: failed to release mutex: %s\n", __func__,
+ gpg_strerror (gpg_error_from_errno (rc)));
+}
+
+static int
+get_buffer_to_hash (struct md_thd_filter_context *mfx,
+ unsigned char **r_buf, size_t *r_len)
+{
+ int rc = 0;
+
+ lock_md (mfx);
+
+ if ((mfx->consume == 0 && mfx->written0 < 0)
+ || (mfx->consume != 0 && mfx->written1 < 0))
+ {
+ rc = npth_cond_wait (&mfx->cond, &mfx->mutex);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return -1;
+ }
+ }
+
+ if (mfx->consume == 0)
+ {
+ *r_buf = mfx->buf;
+ *r_len = mfx->written0;
+ }
+ else
+ {
+ *r_buf = mfx->buf + mfx->bufsize;
+ *r_len = mfx->written1;
+ }
+
+ unlock_md (mfx);
+
+ return 0;
+}
+
+static int
+put_buffer_to_recv (struct md_thd_filter_context *mfx)
+{
+ int rc = 0;
+
+ lock_md (mfx);
+ if (mfx->consume == 0)
+ {
+ mfx->written0 = -1;
+ mfx->consume = 1;
+ }
+ else
+ {
+ mfx->written1 = -1;
+ mfx->consume = 0;
+ }
+
+ rc = npth_cond_signal (&mfx->cond);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return -1;
+ }
+
+ unlock_md (mfx);
+ return 0;
+}
+
+static int
+get_buffer_to_fill (struct md_thd_filter_context *mfx,
+ unsigned char **r_buf, size_t len)
+{
+ lock_md (mfx);
+
+ if (len > mfx->bufsize)
+ {
+ unlock_md (mfx);
+ return GPG_ERR_BUFFER_TOO_SHORT;
+ }
+
+ if ((mfx->produce == 0 && mfx->written0 >= 0)
+ || (mfx->produce != 0 && mfx->written1 >= 0))
+ {
+ int rc = npth_cond_wait (&mfx->cond, &mfx->mutex);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return gpg_error_from_errno (rc);
+ }
+ }
+
+ if (mfx->produce == 0)
+ *r_buf = mfx->buf;
+ else
+ *r_buf = mfx->buf + mfx->bufsize;
+ unlock_md (mfx);
+ return 0;
+}
+
+static int
+put_buffer_to_send (struct md_thd_filter_context *mfx, size_t len)
+{
+ int rc;
+
+ lock_md (mfx);
+ if (mfx->produce == 0)
+ {
+ mfx->written0 = len;
+ mfx->produce = 1;
+ }
+ else
+ {
+ mfx->written1 = len;
+ mfx->produce = 0;
+ }
+
+ rc = npth_cond_signal (&mfx->cond);
+ if (rc)
+ {
+ unlock_md (mfx);
+ return gpg_error_from_errno (rc);
+ }
+
+ unlock_md (mfx);
+
+ /* Yield to the md_thread to let it compute the hash in parallel */
+ npth_usleep (0);
+ return 0;
+}
+
+
+static void *
+md_thread (void *arg)
+{
+ struct md_thd_filter_context *mfx = arg;
+
+ while (1)
+ {
+ unsigned char *buf;
+ size_t len;
+
+ if (get_buffer_to_hash (mfx, &buf, &len) < 0)
+ /* Error */
+ return NULL;
+
+ if (len == 0)
+ break;
+
+ npth_unprotect ();
+ gcry_md_write (mfx->md, buf, len);
+ npth_protect ();
+
+ if (put_buffer_to_recv (mfx) < 0)
+ /* Error */
+ return NULL;
+ }
+
+ return NULL;
+}
+
+int
+md_thd_filter (void *opaque, int control,
+ IOBUF a, byte *buf, size_t *ret_len)
+{
+ size_t size = *ret_len;
+ struct md_thd_filter_context **r_mfx = opaque;
+ struct md_thd_filter_context *mfx = *r_mfx;
+ int rc=0;
+
+ if (control == IOBUFCTRL_INIT)
+ {
+ npth_attr_t tattr;
+ size_t n;
+
+ n = 2 * iobuf_set_buffer_size (0) * 1024;
+ mfx = xtrymalloc (n + offsetof (struct md_thd_filter_context, buf));
+ if (!mfx)
+ return gpg_error_from_syserror ();
+ *r_mfx = mfx;
+ mfx->bufsize = n / 2;
+ mfx->consume = mfx->produce = 0;
+ mfx->written0 = -1;
+ mfx->written1 = -1;
+
+ rc = npth_mutex_init (&mfx->mutex, NULL);
+ if (rc)
+ {
+ return gpg_error_from_errno (rc);
+ }
+ rc = npth_cond_init (&mfx->cond, NULL);
+ if (rc)
+ {
+ npth_mutex_destroy (&mfx->mutex);
+ return gpg_error_from_errno (rc);
+ }
+ rc = npth_attr_init (&tattr);
+ if (rc)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ return gpg_error_from_errno (rc);
+ }
+ npth_attr_setdetachstate (&tattr, NPTH_CREATE_JOINABLE);
+ rc = npth_create (&mfx->thd, &tattr, md_thread, mfx);
+ if (rc)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ npth_attr_destroy (&tattr);
+ return gpg_error_from_errno (rc);
+ }
+ npth_attr_destroy (&tattr);
+ }
+ else if (control == IOBUFCTRL_UNDERFLOW)
+ {
+ int i;
+ unsigned char *md_buf = NULL;
+
+ i = iobuf_read (a, buf, size);
+ if (i == -1)
+ i = 0;
+
+ rc = get_buffer_to_fill (mfx, &md_buf, i);
+ if (rc)
+ return rc;
+
+ if (i)
+ memcpy (md_buf, buf, i);
+
+ rc = put_buffer_to_send (mfx, i);
+ if (rc)
+ return rc;
+
+ if (i == 0)
+ {
+ npth_join (mfx->thd, NULL);
+ rc = -1; /* eof */
+ }
+
+ *ret_len = i;
+ }
+ else if (control == IOBUFCTRL_FREE)
+ {
+ npth_cond_destroy (&mfx->cond);
+ npth_mutex_destroy (&mfx->mutex);
+ xfree (mfx);
+ *r_mfx = NULL;
+ }
+ else if (control == IOBUFCTRL_DESC)
+ mem2str (buf, "md_thd_filter", *ret_len);
+
+ return rc;
+}
+
+void
+md_thd_filter_set_md (struct md_thd_filter_context *mfx, gcry_md_hd_t md)
+{
+ mfx->md = md;
+}
diff --git a/g10/options.h b/g10/options.h
index 914c24849..327a6a06f 100644
--- a/g10/options.h
+++ b/g10/options.h
@@ -373,7 +373,9 @@ EXTERN_UNLESS_MAIN_MODULE int memory_debug_mode;
EXTERN_UNLESS_MAIN_MODULE int memory_stat_debug_mode;
/* Compatibility flags */
-/* #define COMPAT_FOO 1 */
+#define COMPAT_PARALLELIZED 1
+
+/* #define COMPAT_FOO 2 */
/* Compliance test macors. */
diff --git a/g10/sign.c b/g10/sign.c
index 5588557c8..f68ea3bc7 100644
--- a/g10/sign.c
+++ b/g10/sign.c
@@ -1020,7 +1020,9 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
const char *fname;
armor_filter_context_t *afx;
compress_filter_context_t zfx;
+ gcry_md_hd_t md;
md_filter_context_t mfx;
+ md_thd_filter_context_t mfx2 = NULL;
text_filter_context_t tfx;
progress_filter_context_t *pfx;
encrypt_filter_context_t efx;
@@ -1126,10 +1128,10 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
iobuf_push_filter (inp, text_filter, &tfx);
}
- if (gcry_md_open (&mfx.md, 0, 0))
+ if (gcry_md_open (&md, 0, 0))
BUG ();
if (DBG_HASHING)
- gcry_md_debug (mfx.md, "sign");
+ gcry_md_debug (md, "sign");
/* If we're encrypting and signing, it is reasonable to pick the
* hash algorithm to use out of the recipient key prefs. This is
@@ -1226,10 +1228,21 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
}
for (sk_rover = sk_list; sk_rover; sk_rover = sk_rover->next)
- gcry_md_enable (mfx.md, hash_for (sk_rover->pk));
+ gcry_md_enable (md, hash_for (sk_rover->pk));
if (!multifile)
- iobuf_push_filter (inp, md_filter, &mfx);
+ {
+ if (encryptflag && (opt.compat_flags & COMPAT_PARALLELIZED))
+ {
+ iobuf_push_filter (inp, md_thd_filter, &mfx2);
+ md_thd_filter_set_md (mfx2, md);
+ }
+ else
+ {
+ iobuf_push_filter (inp, md_filter, &mfx);
+ mfx.md = md;
+ }
+ }
if (detached && !encryptflag)
afx->what = 2;
@@ -1292,7 +1305,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
goto leave;
}
- write_status_begin_signing (mfx.md);
+ write_status_begin_signing (md);
/* Setup the inner packet. */
if (detached)
@@ -1332,7 +1345,16 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
memset (&tfx, 0, sizeof tfx);
iobuf_push_filter (inp, text_filter, &tfx);
}
- iobuf_push_filter (inp, md_filter, &mfx);
+ if (encryptflag && (opt.compat_flags & COMPAT_PARALLELIZED))
+ {
+ iobuf_push_filter (inp, md_thd_filter, &mfx2);
+ md_thd_filter_set_md (mfx2, md);
+ }
+ else
+ {
+ iobuf_push_filter (inp, md_filter, &mfx);
+ mfx.md = md;
+ }
while (iobuf_read (inp, NULL, iobuf_size) != -1)
;
iobuf_close (inp);
@@ -1361,7 +1383,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
goto leave;
/* Write the signatures. */
- rc = write_signature_packets (ctrl, sk_list, out, mfx.md, extrahash,
+ rc = write_signature_packets (ctrl, sk_list, out, md, extrahash,
opt.textmode && !outfile? 0x01 : 0x00,
0, duration, detached ? 'D':'S', NULL);
if (rc)
@@ -1378,7 +1400,7 @@ sign_file (ctrl_t ctrl, strlist_t filenames, int detached, strlist_t locusr,
write_status (STATUS_END_ENCRYPTION);
}
iobuf_close (inp);
- gcry_md_close (mfx.md);
+ gcry_md_close (md);
release_sk_list (sk_list);
release_pk_list (pk_list);
recipient_digest_algo = 0;
@@ -1561,6 +1583,8 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr)
progress_filter_context_t *pfx;
compress_filter_context_t zfx;
md_filter_context_t mfx;
+ md_thd_filter_context_t mfx2 = NULL;
+ gcry_md_hd_t md;
text_filter_context_t tfx;
cipher_filter_context_t cfx;
iobuf_t inp = NULL;
@@ -1644,15 +1668,25 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr)
/* Prepare to calculate the MD over the input. */
if (opt.textmode)
iobuf_push_filter (inp, text_filter, &tfx);
- if (gcry_md_open (&mfx.md, 0, 0))
+ if (gcry_md_open (&md, 0, 0))
BUG ();
if (DBG_HASHING)
- gcry_md_debug (mfx.md, "symc-sign");
+ gcry_md_debug (md, "symc-sign");
for (sk_rover = sk_list; sk_rover; sk_rover = sk_rover->next)
- gcry_md_enable (mfx.md, hash_for (sk_rover->pk));
+ gcry_md_enable (md, hash_for (sk_rover->pk));
+
+ if ((opt.compat_flags & COMPAT_PARALLELIZED))
+ {
+ iobuf_push_filter (inp, md_thd_filter, &mfx2);
+ md_thd_filter_set_md (mfx2, md);
+ }
+ else
+ {
+ iobuf_push_filter (inp, md_filter, &mfx);
+ mfx.md = md;
+ }
- iobuf_push_filter (inp, md_filter, &mfx);
/* Push armor output filter */
if (opt.armor)
@@ -1694,7 +1728,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr)
if (rc)
goto leave;
- write_status_begin_signing (mfx.md);
+ write_status_begin_signing (md);
/* Pipe data through all filters; i.e. write the signed stuff. */
/* (current filters: zip - encrypt - armor) */
@@ -1706,7 +1740,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr)
/* Write the signatures. */
/* (current filters: zip - encrypt - armor) */
- rc = write_signature_packets (ctrl, sk_list, out, mfx.md, extrahash,
+ rc = write_signature_packets (ctrl, sk_list, out, md, extrahash,
opt.textmode? 0x01 : 0x00,
0, duration, 'S', NULL);
if (rc)
@@ -1723,7 +1757,7 @@ sign_symencrypt_file (ctrl_t ctrl, const char *fname, strlist_t locusr)
}
iobuf_close (inp);
release_sk_list (sk_list);
- gcry_md_close (mfx.md);
+ gcry_md_close (md);
xfree (cfx.dek);
xfree (s2k);
release_progress_context (pfx);