Skip to content

Commit f7a2b8a

Browse files
eshaztridge
authored andcommitted
feat: add threads to zstd compression
1 parent d941807 commit f7a2b8a

3 files changed

Lines changed: 16 additions & 9 deletions

File tree

compat.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ extern int need_messages_from_generator;
5252
extern int delete_mode, delete_before, delete_during, delete_after;
5353
extern int do_compression;
5454
extern int do_compression_level;
55+
extern int do_compression_threads;
5556
extern int saw_stderr_opt;
5657
extern int msgs2stderr;
5758
extern char *shell_cmd;

options.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ int sparse_files = 0;
8686
int preallocate_files = 0;
8787
int do_compression = 0;
8888
int do_compression_level = CLVL_NOT_SPECIFIED;
89+
int do_compression_threads = 0;
8990
int am_root = 0; /* 0 = normal, 1 = root, 2 = --super, -1 = --fake-super */
9091
int am_server = 0;
9192
int am_sender = 0;
@@ -754,6 +755,8 @@ static struct poptOption long_options[] = {
754755
{"compress-choice", 0, POPT_ARG_STRING, &compress_choice, 0, 0, 0 },
755756
{"zc", 0, POPT_ARG_STRING, &compress_choice, 0, 0, 0 },
756757
{"skip-compress", 0, POPT_ARG_STRING, &skip_compress, 0, 0, 0 },
758+
{"compress-threads", 0, POPT_ARG_INT, &do_compression_threads, 0, 0, 0 },
759+
{"zt", 0, POPT_ARG_INT, &do_compression_threads, 0, 0, 0 },
757760
{"compress-level", 0, POPT_ARG_INT, &do_compression_level, 0, 0, 0 },
758761
{"zl", 0, POPT_ARG_INT, &do_compression_level, 0, 0, 0 },
759762
{0, 'P', POPT_ARG_NONE, 0, 'P', 0, 0 },

token.c

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ extern int do_compression;
3333
extern int protocol_version;
3434
extern int module_id;
3535
extern int do_compression_level;
36+
extern int do_compression_threads;
3637
extern char *skip_compress;
3738

3839
#ifndef Z_INSERT_ONLY
@@ -692,6 +693,8 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
692693
obuf = new_array(char, OBUF_SIZE);
693694

694695
ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
696+
ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_nbWorkers, do_compression_threads);
697+
695698
zstd_out_buff.dst = obuf + 2;
696699

697700
comp_init_done = 1;
@@ -729,12 +732,11 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
729732
zstd_in_buff.src = map_ptr(buf, offset, nb);
730733
zstd_in_buff.size = nb;
731734
zstd_in_buff.pos = 0;
732-
735+
736+
int finished;
733737
do {
734-
if (zstd_out_buff.size == 0) {
735-
zstd_out_buff.size = MAX_DATA_COUNT;
736-
zstd_out_buff.pos = 0;
737-
}
738+
zstd_out_buff.size = MAX_DATA_COUNT;
739+
zstd_out_buff.pos = 0;
738740

739741
/* File ended, flush */
740742
if (token != -2)
@@ -752,20 +754,21 @@ static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T of
752754
* state and send a smaller buffer so that the remote side can
753755
* finish the file.
754756
*/
755-
if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
757+
finished = (flush == ZSTD_e_flush) ? (r == 0) : (zstd_in_buff.pos == zstd_in_buff.size);
758+
759+
if (zstd_out_buff.pos != 0) {
756760
n = zstd_out_buff.pos;
757761

758762
obuf[0] = DEFLATED_DATA + (n >> 8);
759763
obuf[1] = n;
760764
write_buf(f, obuf, n+2);
761-
762-
zstd_out_buff.size = 0;
763765
}
764766
/*
765767
* Loop while the input buffer isn't full consumed or the
766768
* internal state isn't fully flushed.
767769
*/
768-
} while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
770+
} while (!finished);
771+
769772
flush_pending = token == -2;
770773
}
771774

0 commit comments

Comments
 (0)