Use an ssse3 target instead of an inline declaration.
[rsync.git] / token.c
1 /*
2  * Routines used by the file-transfer code.
3  *
4  * Copyright (C) 1996 Andrew Tridgell
5  * Copyright (C) 1996 Paul Mackerras
6  * Copyright (C) 2003-2020 Wayne Davison
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, visit the http://fsf.org website.
20  */
21
22 #include "rsync.h"
23 #include "itypes.h"
24 #include <zlib.h>
25 #ifdef SUPPORT_ZSTD
26 #include <zstd.h>
27 #endif
28 #ifdef SUPPORT_LZ4
29 #include <lz4.h>
30 #endif
31
32 extern int do_compression;
33 extern int protocol_version;
34 extern int module_id;
35 extern int do_compression_level;
36 extern char *skip_compress;
37
38 #ifndef Z_INSERT_ONLY
39 #define Z_INSERT_ONLY Z_SYNC_FLUSH
40 #endif
41
42 static int compression_level; /* The compression level for the current file. */
43 static int skip_compression_level; /* The least possible compressing for handling skip-compress files. */
44 static int per_file_default_level; /* The default level that each new file gets prior to checking its suffix. */
45
/* One node of the sorted trie that stores the skip-compress suffixes.
 * Nodes linked through `sibling` are the alternative letters for a single
 * position; `child` leads to the letters allowed at the next position. */
struct suffix_tree {
        struct suffix_tree *sibling; /* next alternative letter at this position */
        struct suffix_tree *child;   /* first node for the following position */
        char letter;                 /* the character this node stands for */
        char word_end;               /* non-zero if a complete suffix ends here */
};
51
52 static char *match_list;
53 static struct suffix_tree *suftree;
54
55 void init_compression_level(void)
56 {
57         int min_level, max_level, def_level, off_level;
58
59         switch (do_compression) {
60         case CPRES_NONE:
61                 return;
62         case CPRES_ZLIB:
63         case CPRES_ZLIBX:
64                 min_level = 1;
65                 max_level = Z_BEST_COMPRESSION;
66                 def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
67                 off_level = skip_compression_level = Z_NO_COMPRESSION;
68                 if (do_compression_level == Z_DEFAULT_COMPRESSION)
69                         do_compression_level = def_level;
70                 break;
71 #ifdef SUPPORT_ZSTD
72         case CPRES_ZSTD:
73                 min_level = skip_compression_level = ZSTD_minCLevel();
74                 max_level = ZSTD_maxCLevel();
75                 def_level = ZSTD_CLEVEL_DEFAULT;
76                 off_level = CLVL_NOT_SPECIFIED;
77                 if (do_compression_level == 0)
78                         do_compression_level = def_level;
79                 break;
80 #endif
81 #ifdef SUPPORT_LZ4
82         case CPRES_LZ4:
83                 min_level = skip_compression_level = 0;
84                 max_level = 0;
85                 def_level = 0;
86                 off_level = CLVL_NOT_SPECIFIED;
87                 break;
88 #endif
89         default: /* paranoia to prevent missing case values */
90                 assert(0);
91         }
92
93         if (do_compression_level == CLVL_NOT_SPECIFIED)
94                 do_compression_level = def_level;
95         else if (do_compression_level == off_level) {
96                 do_compression = CPRES_NONE;
97                 return;
98         }
99
100         /* We don't bother with any errors or warnings -- just make sure that the values are valid. */
101         if (do_compression_level < min_level)
102                 do_compression_level = min_level;
103         else if (do_compression_level > max_level)
104                 do_compression_level = max_level;
105 }
106
107 static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
108 {
109         struct suffix_tree *node, *newnode;
110
111         if (ltr == '[') {
112                 const char *after = strchr(str, ']');
113                 /* Treat "[foo" and "[]" as having a literal '['. */
114                 if (after && after++ != str+1) {
115                         while ((ltr = *str++) != ']')
116                                 add_suffix(prior, ltr, after);
117                         return;
118                 }
119         }
120
121         for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
122                 if (node->letter == ltr) {
123                         if (*str)
124                                 add_suffix(&node->child, *str, str+1);
125                         else
126                                 node->word_end = 1;
127                         return;
128                 }
129                 if (node->letter > ltr)
130                         break;
131         }
132         if (!(newnode = new(struct suffix_tree)))
133                 out_of_memory("add_suffix");
134         newnode->sibling = node;
135         newnode->child = NULL;
136         newnode->letter = ltr;
137         *prior = newnode;
138         if (*str) {
139                 add_suffix(&newnode->child, *str, str+1);
140                 newnode->word_end = 0;
141         } else
142                 newnode->word_end = 1;
143 }
144
145 static void add_nocompress_suffixes(const char *str)
146 {
147         char *buf, *t;
148         const char *f = str;
149
150         if (!(buf = new_array(char, strlen(f) + 1)))
151                 out_of_memory("add_nocompress_suffixes");
152
153         while (*f) {
154                 if (*f == '/') {
155                         f++;
156                         continue;
157                 }
158
159                 t = buf;
160                 do {
161                         if (isUpper(f))
162                                 *t++ = toLower(f);
163                         else
164                                 *t++ = *f;
165                 } while (*++f != '/' && *f);
166                 *t++ = '\0';
167
168                 add_suffix(&suftree, *buf, buf+1);
169         }
170
171         free(buf);
172 }
173
174 static void init_set_compression(void)
175 {
176         const char *f;
177         char *t, *start;
178
179         if (skip_compress)
180                 add_nocompress_suffixes(skip_compress);
181
182         /* A non-daemon transfer skips the default suffix list if the
183          * user specified --skip-compress. */
184         if (skip_compress && module_id < 0)
185                 f = "";
186         else
187                 f = lp_dont_compress(module_id);
188
189         if (!(match_list = t = new_array(char, strlen(f) + 2)))
190                 out_of_memory("set_compression");
191
192         per_file_default_level = do_compression_level;
193
194         while (*f) {
195                 if (*f == ' ') {
196                         f++;
197                         continue;
198                 }
199
200                 start = t;
201                 do {
202                         if (isUpper(f))
203                                 *t++ = toLower(f);
204                         else
205                                 *t++ = *f;
206                 } while (*++f != ' ' && *f);
207                 *t++ = '\0';
208
209                 if (t - start == 1+1 && *start == '*') {
210                         /* Optimize a match-string of "*". */
211                         *match_list = '\0';
212                         suftree = NULL;
213                         per_file_default_level = skip_compression_level;
214                         break;
215                 }
216
217                 /* Move *.foo items into the stuffix tree. */
218                 if (*start == '*' && start[1] == '.' && start[2]
219                  && !strpbrk(start+2, ".?*")) {
220                         add_suffix(&suftree, start[2], start+3);
221                         t = start;
222                 }
223         }
224         *t++ = '\0';
225 }
226
227 /* determine the compression level based on a wildcard filename list */
228 void set_compression(const char *fname)
229 {
230         const struct suffix_tree *node;
231         const char *s;
232         char ltr;
233
234         if (!do_compression)
235                 return;
236
237         if (!match_list)
238                 init_set_compression();
239
240         compression_level = per_file_default_level;
241
242         if (!*match_list && !suftree)
243                 return;
244
245         if ((s = strrchr(fname, '/')) != NULL)
246                 fname = s + 1;
247
248         for (s = match_list; *s; s += strlen(s) + 1) {
249                 if (iwildmatch(s, fname)) {
250                         compression_level = skip_compression_level;
251                         return;
252                 }
253         }
254
255         if (!(node = suftree) || !(s = strrchr(fname, '.'))
256          || s == fname || !(ltr = *++s))
257                 return;
258
259         while (1) {
260                 if (isUpper(&ltr))
261                         ltr = toLower(&ltr);
262                 while (node->letter != ltr) {
263                         if (node->letter > ltr)
264                                 return;
265                         if (!(node = node->sibling))
266                                 return;
267                 }
268                 if ((ltr = *++s) == '\0') {
269                         if (node->word_end)
270                                 compression_level = skip_compression_level;
271                         return;
272                 }
273                 if (!(node = node->child))
274                         return;
275         }
276 }
277
278 /* non-compressing recv token */
279 static int32 simple_recv_token(int f, char **data)
280 {
281         static int32 residue;
282         static char *buf;
283         int32 n;
284
285         if (!buf) {
286                 buf = new_array(char, CHUNK_SIZE);
287                 if (!buf)
288                         out_of_memory("simple_recv_token");
289         }
290
291         if (residue == 0) {
292                 int32 i = read_int(f);
293                 if (i <= 0)
294                         return i;
295                 residue = i;
296         }
297
298         *data = buf;
299         n = MIN(CHUNK_SIZE,residue);
300         residue -= n;
301         read_buf(f,buf,n);
302         return n;
303 }
304
305 /* non-compressing send token */
306 static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n)
307 {
308         if (n > 0) {
309                 int32 len = 0;
310                 while (len < n) {
311                         int32 n1 = MIN(CHUNK_SIZE, n-len);
312                         write_int(f, n1);
313                         write_buf(f, map_ptr(buf, offset+len, n1), n1);
314                         len += n1;
315                 }
316         }
317         /* a -2 token means to send data only and no token */
318         if (token != -2)
319                 write_int(f, -(token+1));
320 }
321
322 /* Flag bytes in compressed stream are encoded as follows: */
323 #define END_FLAG        0       /* that's all folks */
324 #define TOKEN_LONG      0x20    /* followed by 32-bit token number */
325 #define TOKENRUN_LONG   0x21    /* ditto with 16-bit run count */
326 #define DEFLATED_DATA   0x40    /* + 6-bit high len, then low len byte */
327 #define TOKEN_REL       0x80    /* + 6-bit relative token number */
328 #define TOKENRUN_REL    0xc0    /* ditto with 16-bit run count */
329
330 #define MAX_DATA_COUNT  16383   /* fit 14 bit count into 2 bytes with flags */
331
332 /* zlib.h says that if we want to be able to compress something in a single
333  * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
334  * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
335  * to ensure that this is a compile-time value). */
336 #define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)
337
338 /* For coding runs of tokens */
339 static int32 last_token = -1;
340 static int32 run_start;
341 static int32 last_run_end;
342
343 /* Deflation state */
344 static z_stream tx_strm;
345
346 /* Output buffer */
347 static char *obuf;
348
349 /* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
350  * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
351 #if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
352 #define OBUF_SIZE       (MAX_DATA_COUNT+2)
353 #else
354 #define OBUF_SIZE       AVAIL_OUT_SIZE(CHUNK_SIZE)
355 #endif
356
357 /* Send a deflated token.
 *
 * Deflates the nb bytes of buf that start at offset and writes them to f as
 * DEFLATED_DATA records, and folds `token` into the run-length token
 * encoding (consecutive token numbers are collected into a run that is
 * written out by a later call).
 *
 *   token == -1  end of file: END_FLAG is written after any data, and the
 *                next call starts a new stream (deflateReset).
 *   token == -2  data only: nothing is added to the token stream and the
 *                compressor is not flushed; the flush is left pending.
 *   otherwise    a block token; when do_compression is CPRES_ZLIB, toklen
 *                bytes at offset (which by then has moved past the nb
 *                literal bytes) are fed through deflate() with
 *                Z_INSERT_ONLY so they enter the compressor's history.
 *
 * Any zlib failure is reported with rprintf() and ends in exit_cleanup().
 */
358 static void
359 send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen)
360 {
361         static int init_done, flush_pending;
362         int32 n, r;
363
364         if (last_token == -1) {
365                 /* initialization */
366                 if (!init_done) {
367                         tx_strm.next_in = NULL;
368                         tx_strm.zalloc = NULL;
369                         tx_strm.zfree = NULL;
370                         if (deflateInit2(&tx_strm, compression_level,
371                                          Z_DEFLATED, -15, 8,
372                                          Z_DEFAULT_STRATEGY) != Z_OK) {
373                                 rprintf(FERROR, "compression init failed\n");
374                                 exit_cleanup(RERR_PROTOCOL);
375                         }
376                         if ((obuf = new_array(char, OBUF_SIZE)) == NULL)
377                                 out_of_memory("send_deflated_token");
378                         init_done = 1;
379                 } else
380                         deflateReset(&tx_strm);
381                 last_run_end = 0;
382                 run_start = token;
383                 flush_pending = 0;
384         } else if (last_token == -2) { /* the previous call sent data only, so no run is open */
385                 run_start = token;
386         } else if (nb != 0 || token != last_token + 1
387                    || token >= run_start + 65536) { /* the run count must still fit in 16 bits */
388                 /* output previous run */
389                 r = run_start - last_run_end;
390                 n = last_token - run_start;
391                 if (r >= 0 && r <= 63) { /* distance from the previous run fits the 6-bit relative form */
392                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
393                 } else {
394                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
395                         write_int(f, run_start);
396                 }
397                 if (n != 0) {
398                         write_byte(f, n); /* 16-bit run count, low byte first */
399                         write_byte(f, n >> 8);
400                 }
401                 last_run_end = last_token;
402                 run_start = token;
403         }
404
405         last_token = token;
406
407         if (nb != 0 || flush_pending) {
408                 /* deflate the data starting at offset */
409                 int flush = Z_NO_FLUSH;
410                 tx_strm.avail_in = 0;
411                 tx_strm.avail_out = 0;
412                 do {
413                         if (tx_strm.avail_in == 0 && nb != 0) {
414                                 /* give it some more input */
415                                 n = MIN(nb, CHUNK_SIZE);
416                                 tx_strm.next_in = (Bytef *)
417                                         map_ptr(buf, offset, n);
418                                 tx_strm.avail_in = n;
419                                 nb -= n;
420                                 offset += n;
421                         }
422                         if (tx_strm.avail_out == 0) {
423                                 tx_strm.next_out = (Bytef *)(obuf + 2); /* obuf[0..1] hold the record header */
424                                 tx_strm.avail_out = MAX_DATA_COUNT;
425                                 if (flush != Z_NO_FLUSH) {
426                                         /*
427                                          * We left the last 4 bytes in the
428                                          * buffer, in case they are the
429                                          * last 4.  Move them to the front.
430                                          */
431                                         memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4);
432                                         tx_strm.next_out += 4;
433                                         tx_strm.avail_out -= 4;
434                                 }
435                         }
436                         if (nb == 0 && token != -2) /* all input handed over and this is not a data-only call */
437                                 flush = Z_SYNC_FLUSH;
438                         r = deflate(&tx_strm, flush);
439                         if (r != Z_OK) {
440                                 rprintf(FERROR, "deflate returned %d\n", r);
441                                 exit_cleanup(RERR_STREAMIO);
442                         }
443                         if (nb == 0 || tx_strm.avail_out == 0) {
444                                 n = MAX_DATA_COUNT - tx_strm.avail_out;
445                                 if (flush != Z_NO_FLUSH) {
446                                         /*
447                                          * We have to trim off the last 4
448                                          * bytes of output when flushing
449                                          * (they are just 0, 0, ff, ff).
450                                          */
451                                         n -= 4;
452                                 }
453                                 if (n > 0) {
454                                         obuf[0] = DEFLATED_DATA + (n >> 8); /* flag bits plus the high 6 bits of the length */
455                                         obuf[1] = n; /* low byte of the length */
456                                         write_buf(f, obuf, n+2);
457                                 }
458                         }
459                 } while (nb != 0 || tx_strm.avail_out == 0);
460                 flush_pending = token == -2; /* a data-only call leaves the sync flush to a later call */
461         }
462
463         if (token == -1) {
464                 /* end of file - clean up */
465                 write_byte(f, END_FLAG);
466         } else if (token != -2 && do_compression == CPRES_ZLIB) {
467                 /* Add the data in the current block to the compressor's
468                  * history and hash table. */
469                 do {
470                         /* Break up long sections in the same way that
471                          * see_deflate_token() does. */
472                         int32 n1 = toklen > 0xffff ? 0xffff : toklen;
473                         toklen -= n1;
474                         tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
475                         tx_strm.avail_in = n1;
476                         if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
477                                 offset += n1;
478                         tx_strm.next_out = (Bytef *) obuf;
479                         tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
480                         r = deflate(&tx_strm, Z_INSERT_ONLY);
481                         if (r != Z_OK || tx_strm.avail_in != 0) {
482                                 rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
483                                         r, tx_strm.avail_in);
484                                 exit_cleanup(RERR_STREAMIO);
485                         }
486                 } while (toklen > 0);
487         }
488 }
489
490 /* tells us what the receiver is in the middle of doing */
491 static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;
492
493 /* for inflating stuff */
494 static z_stream rx_strm;
495 static char *cbuf;
496 static char *dbuf;
497
498 /* for decoding runs of tokens */
499 static int32 rx_token;
500 static int32 rx_run;
501
502 /* Receive a deflated token and inflate it.
 *
 * Return value:
 *   > 0  that many inflated bytes are available at *data (inside dbuf);
 *   0    END_FLAG was read; the state goes back to r_init;
 *   < 0  a block token, encoded as -1 - rx_token.
 *
 * The work is driven by recv_state:
 *   r_init       set up (first call) or reset the inflater, clear rx_token;
 *   r_idle       read and decode the next flag byte;
 *   r_inflating  inflate the record held in cbuf into dbuf;
 *   r_inflated   like r_idle, but first drains the inflater and feeds it
 *                the four sync bytes that the sender trimmed off;
 *   r_running    hand out the remaining tokens of a run, one per call.
 *
 * Any zlib failure is reported with rprintf() and ends in exit_cleanup().
 */
503 static int32 recv_deflated_token(int f, char **data)
504 {
505         static int init_done;
506         static int32 saved_flag;
507         int32 n, flag;
508         int r;
509
510         for (;;) {
511                 switch (recv_state) {
512                 case r_init:
513                         if (!init_done) {
514                                 rx_strm.next_out = NULL;
515                                 rx_strm.zalloc = NULL;
516                                 rx_strm.zfree = NULL;
517                                 if (inflateInit2(&rx_strm, -15) != Z_OK) {
518                                         rprintf(FERROR, "inflate init failed\n");
519                                         exit_cleanup(RERR_PROTOCOL);
520                                 }
521                                 if (!(cbuf = new_array(char, MAX_DATA_COUNT))
522                                  || !(dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE))))
523                                         out_of_memory("recv_deflated_token");
524                                 init_done = 1;
525                         } else {
526                                 inflateReset(&rx_strm);
527                         }
528                         recv_state = r_idle;
529                         rx_token = 0;
530                         break;
531
532                 case r_idle:
533                 case r_inflated:
534                         if (saved_flag) { /* a flag byte was read earlier but output had to be returned first */
535                                 flag = saved_flag & 0xff;
536                                 saved_flag = 0;
537                         } else
538                                 flag = read_byte(f);
539                         if ((flag & 0xC0) == DEFLATED_DATA) {
540                                 n = ((flag & 0x3f) << 8) + read_byte(f); /* 14-bit length: 6 high bits from the flag, then the low byte */
541                                 read_buf(f, cbuf, n);
542                                 rx_strm.next_in = (Bytef *)cbuf;
543                                 rx_strm.avail_in = n;
544                                 recv_state = r_inflating;
545                                 break;
546                         }
547                         if (recv_state == r_inflated) {
548                                 /* check previous inflated stuff ended correctly */
549                                 rx_strm.avail_in = 0;
550                                 rx_strm.next_out = (Bytef *)dbuf;
551                                 rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
552                                 r = inflate(&rx_strm, Z_SYNC_FLUSH);
553                                 n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
554                                 /*
555                                  * Z_BUF_ERROR just means no progress was
556                                  * made, i.e. the decompressor didn't have
557                                  * any pending output for us.
558                                  */
559                                 if (r != Z_OK && r != Z_BUF_ERROR) {
560                                         rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
561                                                 r, n);
562                                         exit_cleanup(RERR_STREAMIO);
563                                 }
564                                 if (n != 0 && r != Z_BUF_ERROR) {
565                                         /* have to return some more data and
566                                            save the flag for later. */
567                                         saved_flag = flag + 0x10000; /* the extra bit keeps the value non-zero even for END_FLAG */
568                                         *data = dbuf;
569                                         return n;
570                                 }
571                                 /*
572                                  * At this point the decompressor should
573                                  * be expecting to see the 0, 0, ff, ff bytes.
574                                  */
575                                 if (!inflateSyncPoint(&rx_strm)) {
576                                         rprintf(FERROR, "decompressor lost sync!\n");
577                                         exit_cleanup(RERR_STREAMIO);
578                                 }
579                                 rx_strm.avail_in = 4;
580                                 rx_strm.next_in = (Bytef *)cbuf;
581                                 cbuf[0] = cbuf[1] = 0;
582                                 cbuf[2] = cbuf[3] = 0xff;
583                                 inflate(&rx_strm, Z_SYNC_FLUSH);
584                                 recv_state = r_idle;
585                         }
586                         if (flag == END_FLAG) {
587                                 /* that's all folks */
588                                 recv_state = r_init;
589                                 return 0;
590                         }
591
592                         /* here we have a token of some kind */
593                         if (flag & TOKEN_REL) {
594                                 rx_token += flag & 0x3f;
595                                 flag >>= 6; /* 0x80 becomes 2 and 0xc0 becomes 3, so bit 0 now marks a run */
596                         } else
597                                 rx_token = read_int(f);
598                         if (flag & 1) { /* TOKENRUN_LONG or TOKENRUN_REL: a run count follows */
599                                 rx_run = read_byte(f); /* 16-bit count, low byte first */
600                                 rx_run += read_byte(f) << 8;
601                                 recv_state = r_running;
602                         }
603                         return -1 - rx_token;
604
605                 case r_inflating:
606                         rx_strm.next_out = (Bytef *)dbuf;
607                         rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
608                         r = inflate(&rx_strm, Z_NO_FLUSH);
609                         n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
610                         if (r != Z_OK) {
611                                 rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
612                                 exit_cleanup(RERR_STREAMIO);
613                         }
614                         if (rx_strm.avail_in == 0) /* the whole record has been consumed */
615                                 recv_state = r_inflated;
616                         if (n != 0) {
617                                 *data = dbuf;
618                                 return n;
619                         }
620                         break;
621
622                 case r_running:
623                         ++rx_token;
624                         if (--rx_run == 0) /* that was the last token of the run */
625                                 recv_state = r_idle;
626                         return -1 - rx_token;
627                 }
628         }
629 }
630
631 /*
632  * put the data corresponding to a token that we've just returned
633  * from recv_deflated_token into the decompressor's history buffer.
634  */
635 static void see_deflate_token(char *buf, int32 len)
636 {
637         int r;
638         int32 blklen;
639         unsigned char hdr[5];
640
641         rx_strm.avail_in = 0;
642         blklen = 0;
643         hdr[0] = 0;
644         do {
645                 if (rx_strm.avail_in == 0 && len != 0) {
646                         if (blklen == 0) {
647                                 /* Give it a fake stored-block header. */
648                                 rx_strm.next_in = (Bytef *)hdr;
649                                 rx_strm.avail_in = 5;
650                                 blklen = len;
651                                 if (blklen > 0xffff)
652                                         blklen = 0xffff;
653                                 hdr[1] = blklen;
654                                 hdr[2] = blklen >> 8;
655                                 hdr[3] = ~hdr[1];
656                                 hdr[4] = ~hdr[2];
657                         } else {
658                                 rx_strm.next_in = (Bytef *)buf;
659                                 rx_strm.avail_in = blklen;
660                                 if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
661                                         buf += blklen;
662                                 len -= blklen;
663                                 blklen = 0;
664                         }
665                 }
666                 rx_strm.next_out = (Bytef *)dbuf;
667                 rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
668                 r = inflate(&rx_strm, Z_SYNC_FLUSH);
669                 if (r != Z_OK && r != Z_BUF_ERROR) {
670                         rprintf(FERROR, "inflate (token) returned %d\n", r);
671                         exit_cleanup(RERR_STREAMIO);
672                 }
673         } while (len || rx_strm.avail_out == 0);
674 }
675
676 #ifdef SUPPORT_ZSTD
677
678 static ZSTD_inBuffer zstd_in_buff;
679 static ZSTD_outBuffer zstd_out_buff;
680 static ZSTD_CCtx *zstd_cctx;
681
682 static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
683 {
684         static int comp_init_done, flush_pending;
685         ZSTD_EndDirective flush = ZSTD_e_continue;
686         int32 n, r;
687
688         /* initialization */
689         if (!comp_init_done) {
690
691                 zstd_cctx = ZSTD_createCCtx();
692                 if (!zstd_cctx) {
693                         rprintf(FERROR, "compression init failed\n");
694                         exit_cleanup(RERR_PROTOCOL);
695                 }
696
697                 obuf = new_array(char, OBUF_SIZE);
698                 if (!obuf)
699                         out_of_memory("send_deflated_token");
700
701                 ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
702                 zstd_out_buff.dst = obuf + 2;
703
704                 comp_init_done = 1;
705         }
706
707         if (last_token == -1) {
708                 last_run_end = 0;
709                 run_start = token;
710                 flush_pending = 0;
711         } else if (last_token == -2) {
712                 run_start = token;
713
714         } else if (nb != 0 || token != last_token + 1
715                    || token >= run_start + 65536) {
716
717                 /* output previous run */
718                 r = run_start - last_run_end;
719                 n = last_token - run_start;
720
721                 if (r >= 0 && r <= 63) {
722                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
723                 } else {
724                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
725                         write_int(f, run_start);
726                 }
727                 if (n != 0) {
728                         write_byte(f, n);
729                         write_byte(f, n >> 8);
730                 }
731                 last_run_end = last_token;
732                 run_start = token;
733         }
734
735         last_token = token;
736
737         if (nb || flush_pending) {
738
739                 zstd_in_buff.src = map_ptr(buf, offset, nb);
740                 zstd_in_buff.size = nb;
741                 zstd_in_buff.pos = 0;
742
743                 do {
744                         if (zstd_out_buff.size == 0) {
745                                 zstd_out_buff.size = MAX_DATA_COUNT;
746                                 zstd_out_buff.pos = 0;
747                         }
748
749                         /* File ended, flush */
750                         if (token != -2)
751                                 flush = ZSTD_e_flush;
752
753                         r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
754                         if (ZSTD_isError(r)) {
755                                 rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
756                                 exit_cleanup(RERR_STREAMIO);
757                         }
758
759                         /*
760                          * Nothing is sent if the buffer isn't full so avoid smaller
761                          * transfers. If a file is finished then we flush the internal
762                          * state and send a smaller buffer so that the remote side can
763                          * finish the file.
764                          */
765                         if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
766                                 n = zstd_out_buff.pos;
767
768                                 obuf[0] = DEFLATED_DATA + (n >> 8);
769                                 obuf[1] = n;
770                                 write_buf(f, obuf, n+2);
771
772                                 zstd_out_buff.size = 0;
773                         }
774                         /*
775                          * Loop while the input buffer isn't full consumed or the
776                          * internal state isn't fully flushed.
777                          */
778                 } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
779                 flush_pending = token == -2;
780         }
781
782         if (token == -1) {
783                 /* end of file - clean up */
784                 write_byte(f, END_FLAG);
785         }
786 }
787
788 static ZSTD_DCtx *zstd_dctx;
789
790 static int32 recv_zstd_token(int f, char **data)
791 {
792         static int decomp_init_done;
793         static int out_buffer_size;
794         int32 n, flag;
795         int r;
796
797         if (!decomp_init_done) {
798
799                 zstd_dctx = ZSTD_createDCtx();
800                 if (!zstd_dctx) {
801                         rprintf(FERROR, "ZSTD_createDStream failed\n");
802                         exit_cleanup(RERR_PROTOCOL);
803                 }
804
805                 /* Output buffer fits two decompressed blocks */
806                 out_buffer_size = ZSTD_DStreamOutSize() * 2;
807                 cbuf = new_array(char, MAX_DATA_COUNT);
808                 dbuf = new_array(char, out_buffer_size);
809                 if (!cbuf || !dbuf)
810                         out_of_memory("recv_zstd_token");
811
812                 zstd_in_buff.src = cbuf;
813                 zstd_out_buff.dst = dbuf;
814
815                 decomp_init_done = 1;
816         }
817
818         do {
819         switch (recv_state) {
820         case r_init:
821                 recv_state = r_idle;
822                 rx_token = 0;
823                 break;
824
825         case r_idle:
826                 flag = read_byte(f);
827                 if ((flag & 0xC0) == DEFLATED_DATA) {
828                         n = ((flag & 0x3f) << 8) + read_byte(f);
829                         read_buf(f, cbuf, n);
830
831                         zstd_in_buff.size = n;
832                         zstd_in_buff.pos = 0;
833
834                         recv_state = r_inflating;
835
836                 } else if (flag == END_FLAG) {
837                         /* that's all folks */
838                         recv_state = r_init;
839                         return 0;
840
841                 } else {
842                         /* here we have a token of some kind */
843                         if (flag & TOKEN_REL) {
844                                 rx_token += flag & 0x3f;
845                                 flag >>= 6;
846                         } else
847                                 rx_token = read_int(f);
848                         if (flag & 1) {
849                                 rx_run = read_byte(f);
850                                 rx_run += read_byte(f) << 8;
851                                 recv_state = r_running;
852                         }
853                         return -1 - rx_token;
854                 }
855                 break;
856
857         case r_inflating:
858                 zstd_out_buff.size = out_buffer_size;
859                 zstd_out_buff.pos = 0;
860
861                 r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
862                 n = zstd_out_buff.pos;
863                 if (ZSTD_isError(r)) {
864                         rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
865                         exit_cleanup(RERR_STREAMIO);
866                 }
867
868                 /*
869                  * If the input buffer is fully consumed and the output
870                  * buffer is not full then next step is to read more
871                  * data.
872                  */
873                 if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
874                         recv_state = r_idle;
875
876                 if (n != 0) {
877                         *data = dbuf;
878                         return n;
879                 }
880                 break;
881
882         case r_running:
883                 ++rx_token;
884                 if (--rx_run == 0)
885                         recv_state = r_idle;
886                 return -1 - rx_token;
887                 break;
888
889         case r_inflated:
890                 break;
891         }
892         } while (1);
893 }
894 #endif /* SUPPORT_ZSTD */
895
896 #ifdef SUPPORT_LZ4
897 static void
898 send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
899 {
900         static int init_done, flush_pending;
901         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
902         int32 n, r;
903
904         if (last_token == -1) {
905                 if (!init_done) {
906                         if ((obuf = new_array(char, size)) == NULL)
907                                 out_of_memory("send_compressed_token");
908                         init_done = 1;
909                 }
910                 last_run_end = 0;
911                 run_start = token;
912                 flush_pending = 0;
913         } else if (last_token == -2) {
914                 run_start = token;
915         } else if (nb != 0 || token != last_token + 1
916                    || token >= run_start + 65536) {
917                 /* output previous run */
918                 r = run_start - last_run_end;
919                 n = last_token - run_start;
920                 if (r >= 0 && r <= 63) {
921                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
922                 } else {
923                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
924                         write_int(f, run_start);
925                 }
926                 if (n != 0) {
927                         write_byte(f, n);
928                         write_byte(f, n >> 8);
929                 }
930                 last_run_end = last_token;
931                 run_start = token;
932         }
933
934         last_token = token;
935
936         if (nb != 0 || flush_pending) {
937                 int available_in, available_out = 0;
938                 const char *next_in;
939
940                 do {
941                         char *ptr = obuf;
942                         char *next_out = obuf + 2;
943
944                         if (available_out == 0) {
945                                 available_in = MIN(nb, MAX_DATA_COUNT);
946                                 next_in = map_ptr(buf, offset, available_in);
947                         } else
948                                 available_in /= 2;
949
950                         available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
951                         if (!available_out) {
952                                 rprintf(FERROR, "compress returned %d\n", available_out);
953                                 exit_cleanup(RERR_STREAMIO);
954                         }
955                         if (available_out <= MAX_DATA_COUNT) {
956                                 ptr[0] = DEFLATED_DATA + (available_out >> 8);
957                                 ptr[1] = available_out;
958
959                                 write_buf(f, ptr, available_out + 2);
960
961                                 available_out = 0;
962                                 nb -= available_in;
963                                 offset += available_in;
964                         }
965                 } while (nb != 0);
966                 flush_pending = token == -2;
967         }
968         if (token == -1)
969                 /* end of file - clean up */
970                 write_byte(f, END_FLAG);
971 }
972
973 static int32 recv_compressed_token(int f, char **data)
974 {
975         static int32 saved_flag;
976         static int init_done;
977         int32 n, flag;
978         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
979         static const char *next_in;
980         static int avail_in;
981         int avail_out;
982
983         for (;;) {
984                 switch (recv_state) {
985                 case r_init:
986                         if (!init_done) {
987                                 if (!(cbuf = new_array(char, MAX_DATA_COUNT))
988                                  || !(dbuf = new_array(char, size)))
989                                         out_of_memory("recv_compressed_token");
990                                 init_done = 1;
991                         }
992                         recv_state = r_idle;
993                         rx_token = 0;
994                         break;
995                 case r_idle:
996                 case r_inflated:
997                         if (saved_flag) {
998                                 flag = saved_flag & 0xff;
999                                 saved_flag = 0;
1000                         } else
1001                                 flag = read_byte(f);
1002                         if ((flag & 0xC0) == DEFLATED_DATA) {
1003                                 n = ((flag & 0x3f) << 8) + read_byte(f);
1004                                 read_buf(f, cbuf, n);
1005                                 next_in = (char *)cbuf;
1006                                 avail_in = n;
1007                                 recv_state = r_inflating;
1008                                 break;
1009                         }
1010
1011                         if (recv_state == r_inflated)
1012                                 recv_state = r_idle;
1013
1014                         if (flag == END_FLAG) {
1015                                 /* that's all folks */
1016                                 recv_state = r_init;
1017                                 return 0;
1018                         }
1019
1020                         /* here we have a token of some kind */
1021                         if (flag & TOKEN_REL) {
1022                                 rx_token += flag & 0x3f;
1023                                 flag >>= 6;
1024                         } else
1025                                 rx_token = read_int(f);
1026                         if (flag & 1) {
1027                                 rx_run = read_byte(f);
1028                                 rx_run += read_byte(f) << 8;
1029                                 recv_state = r_running;
1030                         }
1031                         return -1 - rx_token;
1032
1033                 case r_inflating:
1034                         avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
1035                         if (avail_out < 0) {
1036                                 rprintf(FERROR, "uncompress failed: %d\n", avail_out);
1037                                 exit_cleanup(RERR_STREAMIO);
1038                         }
1039                         recv_state = r_inflated;
1040                         *data = dbuf;
1041                         return avail_out;
1042
1043                 case r_running:
1044                         ++rx_token;
1045                         if (--rx_run == 0)
1046                                 recv_state = r_idle;
1047                         return -1 - rx_token;
1048                 }
1049         }
1050
1051 }
1052
/*
 * see_uncompressed_token() is compiled out by the "# if 0" below, and the
 * call to it in see_token() is commented out as well.
 *
 * As written, it takes the len bytes at buf in pieces of at most 0xffff
 * bytes.  For each piece it first builds a 5-byte header in hdr[] (a zero
 * byte, the piece length low byte first, then the complement of those two
 * length bytes) and passes that to LZ4_decompress_safe(), with the data
 * itself meant to follow.
 *
 * NOTE(review): avail_in is set to 5 on the first pass and never changed
 * afterwards, so the "avail_in == 0" refill branch cannot run a second time
 * and len never reaches 0 -- the loop would not terminate for len != 0.
 * NOTE(review): the "stored-block header" wording suggests this was adapted
 * from the deflate code path -- confirm it makes sense for LZ4 before
 * enabling it.
 */
# if 0
static void see_uncompressed_token(char *buf, int32 len)
{
        static const char *next_in;
        static int avail_in;
        int avail_out;

        int32 blklen;
        char hdr[5];

        avail_in = 0;
        blklen = 0;
        hdr[0] = 0;
        do {
                if (avail_in == 0 && len != 0) {
                        if (blklen == 0) {
                                /* Give it a fake stored-block header. */
                                next_in = hdr;
                                avail_in = 5;
                                blklen = len;
                                if (blklen > 0xffff)
                                        blklen = 0xffff;
                                /* Length, low byte first, followed by its bitwise complement. */
                                hdr[1] = blklen;
                                hdr[2] = blklen >> 8;
                                hdr[3] = ~hdr[1];
                                hdr[4] = ~hdr[2];
                        } else {
                                next_in = (char *)buf;
                                avail_in = blklen;
                                if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
                                        buf += blklen;
                                len -= blklen;
                                blklen = 0;
                        }
                }
                /* The decompressed output lands in dbuf; only the error status is checked. */
                avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, LZ4_compressBound(CHUNK_SIZE));
                if (avail_out < 0) {
                        rprintf(FERROR, "uncompress failed: %d\n", avail_out);
                        exit_cleanup(RERR_STREAMIO);
                }

        } while (len);
}
# endif /* 0 */
1097 #endif /* SUPPORT_LZ4 */
1098
1099 /**
1100  * Transmit a verbatim buffer of length @p n followed by a token.
1101  * If token == -1 then we have reached EOF
1102  * If n == 0 then don't send a buffer
1103  */
1104 void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
1105                 int32 n, int32 toklen)
1106 {
1107         switch (do_compression) {
1108         case CPRES_NONE:
1109                 simple_send_token(f, token, buf, offset, n);
1110                 break;
1111         case CPRES_ZLIB:
1112         case CPRES_ZLIBX:
1113                 send_deflated_token(f, token, buf, offset, n, toklen);
1114                 break;
1115 #ifdef SUPPORT_ZSTD
1116         case CPRES_ZSTD:
1117                 send_zstd_token(f, token, buf, offset, n);
1118                 break;
1119 #endif
1120 #ifdef SUPPORT_LZ4
1121         case CPRES_LZ4:
1122                 send_compressed_token(f, token, buf, offset, n);
1123                 break;
1124 #endif
1125         default:
1126                 assert(0);
1127         }
1128 }
1129
/*
 * Receive a token or buffer from the other end, using the receiver selected
 * by do_compression.
 * If the return value is >0 then it is a data buffer of that length, and
 * *data will point at the data.
 * If the return value is -i then it represents token i-1.
 * If the return value is 0 then the end has been reached.
 */
int32 recv_token(int f, char **data)
{
        switch (do_compression) {
        case CPRES_NONE:
                return simple_recv_token(f,data);
        case CPRES_ZLIB:
        case CPRES_ZLIBX:
                return recv_deflated_token(f, data);
#ifdef SUPPORT_ZSTD
        case CPRES_ZSTD:
                return recv_zstd_token(f, data);
#endif
#ifdef SUPPORT_LZ4
        case CPRES_LZ4:
                return recv_compressed_token(f, data);
#endif
        default:
                /* Debug builds still abort here.  When assert() is compiled
                 * out (NDEBUG), the original code ran off the end of this
                 * non-void function; report the problem and exit instead. */
                assert(0);
                rprintf(FERROR, "recv_token: unknown compression type %d\n", do_compression);
                exit_cleanup(RERR_PROTOCOL);
        }

        /* Only reached if exit_cleanup() were to return; keeps every path returning a value. */
        return 0;
}
1156
1157 /*
1158  * look at the data corresponding to a token, if necessary
1159  */
1160 void see_token(char *data, int32 toklen)
1161 {
1162         switch (do_compression) {
1163         case CPRES_NONE:
1164                 break;
1165         case CPRES_ZLIB:
1166                 see_deflate_token(data, toklen);
1167                 break;
1168         case CPRES_ZLIBX:
1169                 break;
1170 #ifdef SUPPORT_LZ4
1171         case CPRES_LZ4:
1172                 /*see_uncompressed_token(data, toklen);*/
1173                 break;
1174 #endif
1175 #ifdef SUPPORT_LZ4
1176         case CPRES_ZSTD:
1177                 break;
1178 #endif
1179         default:
1180                 assert(0);
1181         }
1182 }