Switch to using LZ4_compress_default().
[rsync.git] / token.c
1 /*
2  * Routines used by the file-transfer code.
3  *
4  * Copyright (C) 1996 Andrew Tridgell
5  * Copyright (C) 1996 Paul Mackerras
6  * Copyright (C) 2003-2020 Wayne Davison
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License as published by
10  * the Free Software Foundation; either version 3 of the License, or
11  * (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  * GNU General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public License along
19  * with this program; if not, visit the http://fsf.org website.
20  */
21
22 #include "rsync.h"
23 #include "itypes.h"
24 #include <zlib.h>
25 #ifdef SUPPORT_ZSTD
26 #include <zstd.h>
27 #endif
28 #ifdef SUPPORT_LZ4
29 #include <lz4.h>
30 #endif
31
/* Globals defined elsewhere in rsync.  init_compression_level() below may
 * rewrite do_compression and do_compression_level. */
32 extern int do_compression;
33 extern int protocol_version;
34 extern int module_id;
35 extern int do_compression_level;
36 extern char *skip_compress;
37
/* The zlib headers in use may not define Z_INSERT_ONLY (presumably it is
 * only provided by the zlib bundled with rsync -- confirm); fall back to a
 * sync flush for the history-priming deflate() in send_deflated_token(). */
38 #ifndef Z_INSERT_ONLY
39 #define Z_INSERT_ONLY Z_SYNC_FLUSH
40 #endif
41
/* compression_level: level for the current file, set by set_compression().
 * per_file_default_level: level for files not matched by any skip list,
 * set by init_set_compression() (forced to 0 when the list is just "*"). */
42 static int compression_level, per_file_default_level;
43
/* Trie of lowercased filename suffixes whose files are not compressed.
 * Each sibling chain is kept sorted by letter (see add_suffix()). */
44 struct suffix_tree {
45         struct suffix_tree *sibling; /* next node at the same depth */
46         struct suffix_tree *child;   /* first node for the next character */
47         char letter, word_end;       /* word_end != 0: a suffix ends here */
48 };
49
/* match_list: lowercased wildcard patterns stored as consecutive
 * NUL-terminated strings, ending with an empty string; NULL until
 * init_set_compression() builds it.  suftree: root of the suffix trie. */
50 static char *match_list;
51 static struct suffix_tree *suftree;
52
53 void init_compression_level(void)
54 {
55         int min_level, max_level, def_level, off_level;
56
57         switch (do_compression) {
58         case CPRES_ZLIB:
59         case CPRES_ZLIBX:
60                 min_level = 1;
61                 max_level = Z_BEST_COMPRESSION;
62                 def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
63                 off_level = Z_NO_COMPRESSION;
64                 if (do_compression_level == Z_DEFAULT_COMPRESSION)
65                         do_compression_level = def_level;
66                 break;
67 #ifdef SUPPORT_ZSTD
68         case CPRES_ZSTD:
69                 min_level = ZSTD_minCLevel();
70                 max_level = ZSTD_maxCLevel();
71                 def_level = 3;
72                 off_level = CLVL_NOT_SPECIFIED;
73                 break;
74 #endif
75 #ifdef SUPPORT_LZ4
76         case CPRES_LZ4:
77                 min_level = 0;
78                 max_level = 0;
79                 def_level = 0;
80                 off_level = CLVL_NOT_SPECIFIED;
81                 break;
82 #endif
83         default: /* paranoia to prevent missing case values */
84                 exit_cleanup(RERR_UNSUPPORTED);
85         }
86
87         if (do_compression_level == CLVL_NOT_SPECIFIED)
88                 do_compression_level = def_level;
89         else if (do_compression_level == off_level) {
90                 do_compression = CPRES_NONE;
91                 return;
92         }
93
94         /* We don't bother with any errors or warnings -- just make sure that the values are valid. */
95         if (do_compression_level < min_level)
96                 do_compression_level = min_level;
97         else if (do_compression_level > max_level)
98                 do_compression_level = max_level;
99 }
100
101 static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
102 {
103         struct suffix_tree *node, *newnode;
104
105         if (ltr == '[') {
106                 const char *after = strchr(str, ']');
107                 /* Treat "[foo" and "[]" as having a literal '['. */
108                 if (after && after++ != str+1) {
109                         while ((ltr = *str++) != ']')
110                                 add_suffix(prior, ltr, after);
111                         return;
112                 }
113         }
114
115         for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
116                 if (node->letter == ltr) {
117                         if (*str)
118                                 add_suffix(&node->child, *str, str+1);
119                         else
120                                 node->word_end = 1;
121                         return;
122                 }
123                 if (node->letter > ltr)
124                         break;
125         }
126         if (!(newnode = new(struct suffix_tree)))
127                 out_of_memory("add_suffix");
128         newnode->sibling = node;
129         newnode->child = NULL;
130         newnode->letter = ltr;
131         *prior = newnode;
132         if (*str) {
133                 add_suffix(&newnode->child, *str, str+1);
134                 newnode->word_end = 0;
135         } else
136                 newnode->word_end = 1;
137 }
138
139 static void add_nocompress_suffixes(const char *str)
140 {
141         char *buf, *t;
142         const char *f = str;
143
144         if (!(buf = new_array(char, strlen(f) + 1)))
145                 out_of_memory("add_nocompress_suffixes");
146
147         while (*f) {
148                 if (*f == '/') {
149                         f++;
150                         continue;
151                 }
152
153                 t = buf;
154                 do {
155                         if (isUpper(f))
156                                 *t++ = toLower(f);
157                         else
158                                 *t++ = *f;
159                 } while (*++f != '/' && *f);
160                 *t++ = '\0';
161
162                 add_suffix(&suftree, *buf, buf+1);
163         }
164
165         free(buf);
166 }
167
168 static void init_set_compression(void)
169 {
170         const char *f;
171         char *t, *start;
172
173         if (skip_compress)
174                 add_nocompress_suffixes(skip_compress);
175
176         /* A non-daemon transfer skips the default suffix list if the
177          * user specified --skip-compress. */
178         if (skip_compress && module_id < 0)
179                 f = "";
180         else
181                 f = lp_dont_compress(module_id);
182
183         if (!(match_list = t = new_array(char, strlen(f) + 2)))
184                 out_of_memory("set_compression");
185
186         per_file_default_level = do_compression_level;
187
188         while (*f) {
189                 if (*f == ' ') {
190                         f++;
191                         continue;
192                 }
193
194                 start = t;
195                 do {
196                         if (isUpper(f))
197                                 *t++ = toLower(f);
198                         else
199                                 *t++ = *f;
200                 } while (*++f != ' ' && *f);
201                 *t++ = '\0';
202
203                 if (t - start == 1+1 && *start == '*') {
204                         /* Optimize a match-string of "*". */
205                         *match_list = '\0';
206                         suftree = NULL;
207                         per_file_default_level = 0;
208                         break;
209                 }
210
211                 /* Move *.foo items into the stuffix tree. */
212                 if (*start == '*' && start[1] == '.' && start[2]
213                  && !strpbrk(start+2, ".?*")) {
214                         add_suffix(&suftree, start[2], start+3);
215                         t = start;
216                 }
217         }
218         *t++ = '\0';
219 }
220
221 /* determine the compression level based on a wildcard filename list */
222 void set_compression(const char *fname)
223 {
224         const struct suffix_tree *node;
225         const char *s;
226         char ltr;
227
228         if (!do_compression)
229                 return;
230
231         if (!match_list)
232                 init_set_compression();
233
234         compression_level = per_file_default_level;
235
236         if (!*match_list && !suftree)
237                 return;
238
239         if ((s = strrchr(fname, '/')) != NULL)
240                 fname = s + 1;
241
242         for (s = match_list; *s; s += strlen(s) + 1) {
243                 if (iwildmatch(s, fname)) {
244                         compression_level = 0;
245                         return;
246                 }
247         }
248
249         if (!(node = suftree) || !(s = strrchr(fname, '.'))
250          || s == fname || !(ltr = *++s))
251                 return;
252
253         while (1) {
254                 if (isUpper(&ltr))
255                         ltr = toLower(&ltr);
256                 while (node->letter != ltr) {
257                         if (node->letter > ltr)
258                                 return;
259                         if (!(node = node->sibling))
260                                 return;
261                 }
262                 if ((ltr = *++s) == '\0') {
263                         if (node->word_end)
264                                 compression_level = 0;
265                         return;
266                 }
267                 if (!(node = node->child))
268                         return;
269         }
270 }
271
272 /* non-compressing recv token */
273 static int32 simple_recv_token(int f, char **data)
274 {
275         static int32 residue;
276         static char *buf;
277         int32 n;
278
279         if (!buf) {
280                 buf = new_array(char, CHUNK_SIZE);
281                 if (!buf)
282                         out_of_memory("simple_recv_token");
283         }
284
285         if (residue == 0) {
286                 int32 i = read_int(f);
287                 if (i <= 0)
288                         return i;
289                 residue = i;
290         }
291
292         *data = buf;
293         n = MIN(CHUNK_SIZE,residue);
294         residue -= n;
295         read_buf(f,buf,n);
296         return n;
297 }
298
299 /* non-compressing send token */
300 static void simple_send_token(int f, int32 token, struct map_struct *buf,
301                               OFF_T offset, int32 n)
302 {
303         if (n > 0) {
304                 int32 len = 0;
305                 while (len < n) {
306                         int32 n1 = MIN(CHUNK_SIZE, n-len);
307                         write_int(f, n1);
308                         write_buf(f, map_ptr(buf, offset+len, n1), n1);
309                         len += n1;
310                 }
311         }
312         /* a -2 token means to send data only and no token */
313         if (token != -2)
314                 write_int(f, -(token+1));
315 }
316
/* Wire format of the compressed token stream: each token (or run of
 * tokens) starts with one flag byte.  Literal data is sent as packets whose
 * header is a DEFLATED_DATA flag byte carrying the top 6 bits of a 14-bit
 * length, followed by a byte with the low 8 bits. */
317 /* Flag bytes in compressed stream are encoded as follows: */
318 #define END_FLAG        0       /* that's all folks */
319 #define TOKEN_LONG      0x20    /* followed by 32-bit token number */
320 #define TOKENRUN_LONG   0x21    /* ditto with 16-bit run count */
321 #define DEFLATED_DATA   0x40    /* + 6-bit high len, then low len byte */
322 #define TOKEN_REL       0x80    /* + 6-bit relative token number */
323 #define TOKENRUN_REL    0xc0    /* ditto with 16-bit run count */
324
325 #define MAX_DATA_COUNT  16383   /* fit 14 bit count into 2 bytes with flags */
326
327 /* zlib.h says that if we want to be able to compress something in a single
328  * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
329  * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
330  * to ensure that this is a compile-time value). */
331 #define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)
332
333 /* For coding runs of tokens */
/* last_token: the token from the previous send call; -1 means a new file
 * is starting (the senders re-initialize), -2 means the previous call sent
 * data only.  run_start: first token of the run not yet written.
 * last_run_end: last token of the previously written run, used as the base
 * for TOKEN_REL / TOKENRUN_REL offsets. */
334 static int32 last_token = -1;
335 static int32 run_start;
336 static int32 last_run_end;
337
338 /* Deflation state */
339 static z_stream tx_strm;
340
341 /* Output buffer */
/* Shared by send_deflated_token() and send_zstd_token().  Compressed
 * output is written starting at obuf+2, leaving room for the 2-byte
 * DEFLATED_DATA header. */
342 static char *obuf;
343
344 /* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
345  * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
346 #if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
347 #define OBUF_SIZE       (MAX_DATA_COUNT+2)
348 #else
349 #define OBUF_SIZE       AVAIL_OUT_SIZE(CHUNK_SIZE)
350 #endif
351
352 /* Send a deflated token */
/*
 * Emit TOKEN, preceded by NB bytes of literal data at OFFSET in BUF, on the
 * deflate-compressed token stream F.
 *
 *   token >= 0:  a matched block.  Consecutive tokens with no intervening
 *                data are coalesced into a run, which is written only when
 *                the run is broken (or reaches 65536 tokens, so its count
 *                fits in 16 bits).
 *   token == -1: end of file.  Any pending run is written, remaining data is
 *                sync-flushed, and END_FLAG follows.
 *   token == -2: send data only.  The sync flush is deferred to the next
 *                call (flush_pending).
 *
 * TOKLEN is the length of the matched block.  For CPRES_ZLIB only, those
 * bytes are fed into the compressor's history (Z_INSERT_ONLY) so later data
 * can reference them.  see_deflate_token() mirrors this on the receiver.
 *
 * NOTE(review): compression_level is passed to deflateInit2() only on the
 * first call.  Later files go through deflateReset(), which per the zlib
 * manual keeps the original level, so a different per-file level chosen by
 * set_compression() does not appear to be applied here.  Confirm whether it
 * is handled elsewhere.
 */
353 static void
354 send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
355                     int32 nb, int32 toklen)
356 {
357         static int init_done, flush_pending;
358         int32 n, r;
359
360         if (last_token == -1) {
361                 /* initialization */
                /* Raw deflate stream (windowBits -15: no zlib header/trailer). */
362                 if (!init_done) {
363                         tx_strm.next_in = NULL;
364                         tx_strm.zalloc = NULL;
365                         tx_strm.zfree = NULL;
366                         if (deflateInit2(&tx_strm, compression_level,
367                                          Z_DEFLATED, -15, 8,
368                                          Z_DEFAULT_STRATEGY) != Z_OK) {
369                                 rprintf(FERROR, "compression init failed\n");
370                                 exit_cleanup(RERR_PROTOCOL);
371                         }
372                         if ((obuf = new_array(char, OBUF_SIZE)) == NULL)
373                                 out_of_memory("send_deflated_token");
374                         init_done = 1;
375                 } else
376                         deflateReset(&tx_strm);
377                 last_run_end = 0;
378                 run_start = token;
379                 flush_pending = 0;
380         } else if (last_token == -2) {
                /* The previous call sent data only; its run was already
                 * written, so start a fresh run here. */
381                 run_start = token;
382         } else if (nb != 0 || token != last_token + 1
383                    || token >= run_start + 65536) {
384                 /* output previous run */
                /* r: run start relative to the previous run's end (fits the
                 * 6-bit TOKEN_REL form when 0..63); n: extra tokens in the run
                 * (0 for a single token), sent as a 16-bit count. */
385                 r = run_start - last_run_end;
386                 n = last_token - run_start;
387                 if (r >= 0 && r <= 63) {
388                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
389                 } else {
390                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
391                         write_int(f, run_start);
392                 }
393                 if (n != 0) {
394                         write_byte(f, n);
395                         write_byte(f, n >> 8);
396                 }
397                 last_run_end = last_token;
398                 run_start = token;
399         }
400
401         last_token = token;
402
403         if (nb != 0 || flush_pending) {
404                 /* deflate the data starting at offset */
                /* Input is fed CHUNK_SIZE bytes at a time; output is emitted
                 * as DEFLATED_DATA packets of at most MAX_DATA_COUNT bytes
                 * built in obuf after a 2-byte header. */
405                 int flush = Z_NO_FLUSH;
406                 tx_strm.avail_in = 0;
407                 tx_strm.avail_out = 0;
408                 do {
409                         if (tx_strm.avail_in == 0 && nb != 0) {
410                                 /* give it some more input */
411                                 n = MIN(nb, CHUNK_SIZE);
412                                 tx_strm.next_in = (Bytef *)
413                                         map_ptr(buf, offset, n);
414                                 tx_strm.avail_in = n;
415                                 nb -= n;
416                                 offset += n;
417                         }
418                         if (tx_strm.avail_out == 0) {
419                                 tx_strm.next_out = (Bytef *)(obuf + 2);
420                                 tx_strm.avail_out = MAX_DATA_COUNT;
421                                 if (flush != Z_NO_FLUSH) {
422                                         /*
423                                          * We left the last 4 bytes in the
424                                          * buffer, in case they are the
425                                          * last 4.  Move them to the front.
426                                          */
427                                         memcpy(tx_strm.next_out,
428                                                obuf+MAX_DATA_COUNT-2, 4);
429                                         tx_strm.next_out += 4;
430                                         tx_strm.avail_out -= 4;
431                                 }
432                         }
                        /* All input queued: sync-flush unless more data for
                         * this token follows (token -2). */
433                         if (nb == 0 && token != -2)
434                                 flush = Z_SYNC_FLUSH;
435                         r = deflate(&tx_strm, flush);
436                         if (r != Z_OK) {
437                                 rprintf(FERROR, "deflate returned %d\n", r);
438                                 exit_cleanup(RERR_STREAMIO);
439                         }
440                         if (nb == 0 || tx_strm.avail_out == 0) {
441                                 n = MAX_DATA_COUNT - tx_strm.avail_out;
442                                 if (flush != Z_NO_FLUSH) {
443                                         /*
444                                          * We have to trim off the last 4
445                                          * bytes of output when flushing
446                                          * (they are just 0, 0, ff, ff).
447                                          */
448                                         n -= 4;
449                                 }
450                                 if (n > 0) {
451                                         obuf[0] = DEFLATED_DATA + (n >> 8);
452                                         obuf[1] = n;
453                                         write_buf(f, obuf, n+2);
454                                 }
455                         }
456                 } while (nb != 0 || tx_strm.avail_out == 0);
                /* A data-only call skipped the sync flush; do it next time. */
457                 flush_pending = token == -2;
458         }
459
460         if (token == -1) {
461                 /* end of file - clean up */
462                 write_byte(f, END_FLAG);
463         } else if (token != -2 && do_compression == CPRES_ZLIB) {
464                 /* Add the data in the current block to the compressor's
465                  * history and hash table. */
                /* offset now points just past the literal data, i.e. at the
                 * matched block.  Output goes to obuf and is discarded. */
466                 do {
467                         /* Break up long sections in the same way that
468                          * see_deflate_token() does. */
469                         int32 n1 = toklen > 0xffff ? 0xffff : toklen;
470                         toklen -= n1;
471                         tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
472                         tx_strm.avail_in = n1;
473                         if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
474                                 offset += n1;
475                         tx_strm.next_out = (Bytef *) obuf;
476                         tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
477                         r = deflate(&tx_strm, Z_INSERT_ONLY);
478                         if (r != Z_OK || tx_strm.avail_in != 0) {
479                                 rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
480                                         r, tx_strm.avail_in);
481                                 exit_cleanup(RERR_STREAMIO);
482                         }
483                 } while (toklen > 0);
484         }
485 }
486
/* Receiver state machine, used by recv_deflated_token() and (for the
 * r_init / r_idle handling) recv_zstd_token():
 *   r_init      - stream must be (re)initialized before reading a flag
 *   r_idle      - waiting for the next flag byte
 *   r_running   - returning the remaining tokens of a token run
 *   r_inflating - decompressing the packet held in cbuf
 *   r_inflated  - packet consumed; the deflate path drains pending output
 *                 and checks the sync point before the next flag */
487 /* tells us what the receiver is in the middle of doing */
488 static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;
489
490 /* for inflating stuff */
491 static z_stream rx_strm;
/* cbuf holds one compressed packet (MAX_DATA_COUNT bytes).  dbuf receives
 * decompressed output.  Both are also allocated by recv_zstd_token(). */
492 static char *cbuf;
493 static char *dbuf;
494
495 /* for decoding runs of tokens */
/* rx_token: last token number returned.  rx_run: tokens of the current run
 * still to be returned. */
496 static int32 rx_token;
497 static int32 rx_run;
498
499 /* Receive a deflated token and inflate it */
/*
 * Read the next item from the deflate-compressed token stream F.
 *
 * Returns:
 *   > 0         the number of decompressed literal bytes, with *data
 *               pointing at them in dbuf (overwritten on the next call);
 *   0           END_FLAG, i.e. end of file (the state returns to r_init);
 *   -1 - token  for a matched block.  Token runs are returned one token
 *               per call.
 */
500 static int32 recv_deflated_token(int f, char **data)
501 {
502         static int init_done;
        /* A flag byte read while pending inflate output still had to be
         * returned.  It is stored +0x10000 so that a saved END_FLAG (0) is
         * distinguishable from "nothing saved". */
503         static int32 saved_flag;
504         int32 n, flag;
505         int r;
506
507         for (;;) {
508                 switch (recv_state) {
509                 case r_init:
                        /* Raw inflate stream (windowBits -15).  Buffers are
                         * allocated once and reused across files. */
510                         if (!init_done) {
511                                 rx_strm.next_out = NULL;
512                                 rx_strm.zalloc = NULL;
513                                 rx_strm.zfree = NULL;
514                                 if (inflateInit2(&rx_strm, -15) != Z_OK) {
515                                         rprintf(FERROR, "inflate init failed\n");
516                                         exit_cleanup(RERR_PROTOCOL);
517                                 }
518                                 if (!(cbuf = new_array(char, MAX_DATA_COUNT))
519                                     || !(dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE))))
520                                         out_of_memory("recv_deflated_token");
521                                 init_done = 1;
522                         } else {
523                                 inflateReset(&rx_strm);
524                         }
525                         recv_state = r_idle;
526                         rx_token = 0;
527                         break;
528
529                 case r_idle:
530                 case r_inflated:
531                         if (saved_flag) {
532                                 flag = saved_flag & 0xff;
533                                 saved_flag = 0;
534                         } else
535                                 flag = read_byte(f);
                        /* DEFLATED_DATA: 14-bit length split across the flag's
                         * low 6 bits and the next byte (at most MAX_DATA_COUNT,
                         * the size of cbuf). */
536                         if ((flag & 0xC0) == DEFLATED_DATA) {
537                                 n = ((flag & 0x3f) << 8) + read_byte(f);
538                                 read_buf(f, cbuf, n);
539                                 rx_strm.next_in = (Bytef *)cbuf;
540                                 rx_strm.avail_in = n;
541                                 recv_state = r_inflating;
542                                 break;
543                         }
544                         if (recv_state == r_inflated) {
545                                 /* check previous inflated stuff ended correctly */
546                                 rx_strm.avail_in = 0;
547                                 rx_strm.next_out = (Bytef *)dbuf;
548                                 rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
549                                 r = inflate(&rx_strm, Z_SYNC_FLUSH);
550                                 n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
551                                 /*
552                                  * Z_BUF_ERROR just means no progress was
553                                  * made, i.e. the decompressor didn't have
554                                  * any pending output for us.
555                                  */
556                                 if (r != Z_OK && r != Z_BUF_ERROR) {
557                                         rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
558                                                 r, n);
559                                         exit_cleanup(RERR_STREAMIO);
560                                 }
561                                 if (n != 0 && r != Z_BUF_ERROR) {
562                                         /* have to return some more data and
563                                            save the flag for later. */
564                                         saved_flag = flag + 0x10000;
565                                         *data = dbuf;
566                                         return n;
567                                 }
568                                 /*
569                                  * At this point the decompressor should
570                                  * be expecting to see the 0, 0, ff, ff bytes.
571                                  */
                                /* The sender trimmed these 4 bytes from its
                                 * sync flush, so feed them back here. */
572                                 if (!inflateSyncPoint(&rx_strm)) {
573                                         rprintf(FERROR, "decompressor lost sync!\n");
574                                         exit_cleanup(RERR_STREAMIO);
575                                 }
576                                 rx_strm.avail_in = 4;
577                                 rx_strm.next_in = (Bytef *)cbuf;
578                                 cbuf[0] = cbuf[1] = 0;
579                                 cbuf[2] = cbuf[3] = 0xff;
580                                 inflate(&rx_strm, Z_SYNC_FLUSH);
581                                 recv_state = r_idle;
582                         }
583                         if (flag == END_FLAG) {
584                                 /* that's all folks */
585                                 recv_state = r_init;
586                                 return 0;
587                         }
588
589                         /* here we have a token of some kind */
                        /* TOKEN_REL/TOKENRUN_REL: 6-bit delta from the last
                         * token.  After flag >>= 6, bit 0 is set only for
                         * TOKENRUN_REL; TOKENRUN_LONG likewise has bit 0 set. */
590                         if (flag & TOKEN_REL) {
591                                 rx_token += flag & 0x3f;
592                                 flag >>= 6;
593                         } else
594                                 rx_token = read_int(f);
595                         if (flag & 1) {
596                                 rx_run = read_byte(f);
597                                 rx_run += read_byte(f) << 8;
598                                 recv_state = r_running;
599                         }
600                         return -1 - rx_token;
601
602                 case r_inflating:
603                         rx_strm.next_out = (Bytef *)dbuf;
604                         rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
605                         r = inflate(&rx_strm, Z_NO_FLUSH);
606                         n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
607                         if (r != Z_OK) {
608                                 rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
609                                 exit_cleanup(RERR_STREAMIO);
610                         }
611                         if (rx_strm.avail_in == 0)
612                                 recv_state = r_inflated;
613                         if (n != 0) {
614                                 *data = dbuf;
615                                 return n;
616                         }
617                         break;
618
619                 case r_running:
                        /* Return the next token of the run, one per call. */
620                         ++rx_token;
621                         if (--rx_run == 0)
622                                 recv_state = r_idle;
623                         return -1 - rx_token;
624                 }
625         }
626 }
627
628 /*
629  * put the data corresponding to a token that we've just returned
630  * from recv_deflated_token into the decompressor's history buffer.
631  */
/*
 * BUF/LEN are the matched block's bytes.  They are wrapped in fake
 * uncompressed ("stored") deflate blocks of at most 0xffff bytes each, the
 * same split the sender uses for its Z_INSERT_ONLY calls.  Per RFC 1951, a
 * stored block starts with a header byte here of 0 (BFINAL=0, BTYPE=00),
 * followed by LEN and its one's complement NLEN, little-endian.  Inflating
 * them adds the bytes to the history.  The output written to dbuf is
 * discarded.
 */
632 static void see_deflate_token(char *buf, int32 len)
633 {
634         int r;
635         int32 blklen;
636         unsigned char hdr[5];
637
638         rx_strm.avail_in = 0;
639         blklen = 0;
640         hdr[0] = 0;
641         do {
                /* Alternate between feeding a 5-byte header and the
                 * blklen data bytes it describes. */
642                 if (rx_strm.avail_in == 0 && len != 0) {
643                         if (blklen == 0) {
644                                 /* Give it a fake stored-block header. */
645                                 rx_strm.next_in = (Bytef *)hdr;
646                                 rx_strm.avail_in = 5;
647                                 blklen = len;
648                                 if (blklen > 0xffff)
649                                         blklen = 0xffff;
650                                 hdr[1] = blklen;
651                                 hdr[2] = blklen >> 8;
652                                 hdr[3] = ~hdr[1];
653                                 hdr[4] = ~hdr[2];
654                         } else {
655                                 rx_strm.next_in = (Bytef *)buf;
656                                 rx_strm.avail_in = blklen;
657                                 if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
658                                         buf += blklen;
659                                 len -= blklen;
660                                 blklen = 0;
661                         }
662                 }
663                 rx_strm.next_out = (Bytef *)dbuf;
664                 rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
665                 r = inflate(&rx_strm, Z_SYNC_FLUSH);
666                 if (r != Z_OK && r != Z_BUF_ERROR) {
667                         rprintf(FERROR, "inflate (token) returned %d\n", r);
668                         exit_cleanup(RERR_STREAMIO);
669                 }
670         } while (len || rx_strm.avail_out == 0);
671 }
672
673 #ifdef SUPPORT_ZSTD
674
/* Stream buffers shared by the zstd send and receive paths.
 * zstd_out_buff.dst is obuf+2 when sending (send_zstd_token) and dbuf when
 * receiving (recv_zstd_token).  zstd_cctx is the compression context,
 * created on send_zstd_token()'s first call. */
675 static ZSTD_inBuffer zstd_in_buff;
676 static ZSTD_outBuffer zstd_out_buff;
677 static ZSTD_CCtx *zstd_cctx;
678
679 static void send_zstd_token(int f, int32 token, struct map_struct *buf,
680                             OFF_T offset, int32 nb)
681 {
682         static int comp_init_done, flush_pending;
683         ZSTD_EndDirective flush = ZSTD_e_continue;
684         int32 n, r;
685
686         /* initialization */
687         if (!comp_init_done) {
688
689                 zstd_cctx = ZSTD_createCCtx();
690                 if (!zstd_cctx) {
691                         rprintf(FERROR, "compression init failed\n");
692                         exit_cleanup(RERR_PROTOCOL);
693                 }
694
695                 obuf = new_array(char, OBUF_SIZE);
696                 if (!obuf)
697                         out_of_memory("send_deflated_token");
698
699                 ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel,
700                                        do_compression_level);
701                 zstd_out_buff.dst = obuf + 2;
702
703                 comp_init_done = 1;
704         }
705
706         if (last_token == -1) {
707                 last_run_end = 0;
708                 run_start = token;
709                 flush_pending = 0;
710         } else if (last_token == -2) {
711                 run_start = token;
712
713         } else if (nb != 0 || token != last_token + 1
714                    || token >= run_start + 65536) {
715
716                 /* output previous run */
717                 r = run_start - last_run_end;
718                 n = last_token - run_start;
719
720                 if (r >= 0 && r <= 63) {
721                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
722                 } else {
723                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
724                         write_int(f, run_start);
725                 }
726                 if (n != 0) {
727                         write_byte(f, n);
728                         write_byte(f, n >> 8);
729                 }
730                 last_run_end = last_token;
731                 run_start = token;
732         }
733
734         last_token = token;
735
736         if (nb || flush_pending) {
737
738                 zstd_in_buff.src = map_ptr(buf, offset, nb);
739                 zstd_in_buff.size = nb;
740                 zstd_in_buff.pos = 0;
741
742                 do {
743                         if (zstd_out_buff.size == 0) {
744                                 zstd_out_buff.size = MAX_DATA_COUNT;
745                                 zstd_out_buff.pos = 0;
746                         }
747
748                         /* File ended, flush */
749                         if (token != -2)
750                                 flush = ZSTD_e_flush;
751
752                         r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
753                         if (ZSTD_isError(r)) {
754                                 rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
755                                 exit_cleanup(RERR_STREAMIO);
756                         }
757
758                         /*
759                          * Nothing is sent if the buffer isn't full so avoid smaller
760                          * transfers. If a file is finished then we flush the internal
761                          * state and send a smaller buffer so that the remote side can
762                          * finish the file.
763                          */
764                         if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
765                                 n = zstd_out_buff.pos;
766
767                                 obuf[0] = DEFLATED_DATA + (n >> 8);
768                                 obuf[1] = n;
769                                 write_buf(f, obuf, n+2);
770
771                                 zstd_out_buff.size = 0;
772                         }
773                         /*
774                          * Loop while the input buffer isn't full consumed or the
775                          * internal state isn't fully flushed.
776                          */
777                 } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
778                 flush_pending = token == -2;
779         }
780
781         if (token == -1) {
782                 /* end of file - clean up */
783                 write_byte(f, END_FLAG);
784         }
785 }
786
787 static ZSTD_DCtx *zstd_dctx;
788
789 static int32 recv_zstd_token(int f, char **data)
790 {
791         static int decomp_init_done;
792         static int out_buffer_size;
793         int32 n, flag;
794         int r;
795
796         if (!decomp_init_done) {
797
798                 zstd_dctx = ZSTD_createDCtx();
799                 if (!zstd_dctx) {
800                         rprintf(FERROR, "ZSTD_createDStream failed\n");
801                         exit_cleanup(RERR_PROTOCOL);
802                 }
803
804                 /* Output buffer fits two decompressed blocks */
805                 out_buffer_size = ZSTD_DStreamOutSize() * 2;
806                 cbuf = new_array(char, MAX_DATA_COUNT);
807                 dbuf = new_array(char, out_buffer_size);
808                 if (!cbuf || !dbuf)
809                         out_of_memory("recv_zstd_token");
810
811                 zstd_in_buff.src = cbuf;
812                 zstd_out_buff.dst = dbuf;
813
814                 decomp_init_done = 1;
815         }
816
817         do {
818         switch (recv_state) {
819         case r_init:
820                 recv_state = r_idle;
821                 rx_token = 0;
822                 break;
823
824         case r_idle:
825                 flag = read_byte(f);
826                 if ((flag & 0xC0) == DEFLATED_DATA) {
827                         n = ((flag & 0x3f) << 8) + read_byte(f);
828                         read_buf(f, cbuf, n);
829
830                         zstd_in_buff.size = n;
831                         zstd_in_buff.pos = 0;
832
833                         recv_state = r_inflating;
834
835                 } else if (flag == END_FLAG) {
836                         /* that's all folks */
837                         recv_state = r_init;
838                         return 0;
839
840                 } else {
841                         /* here we have a token of some kind */
842                         if (flag & TOKEN_REL) {
843                                 rx_token += flag & 0x3f;
844                                 flag >>= 6;
845                         } else
846                                 rx_token = read_int(f);
847                         if (flag & 1) {
848                                 rx_run = read_byte(f);
849                                 rx_run += read_byte(f) << 8;
850                                 recv_state = r_running;
851                         }
852                         return -1 - rx_token;
853                 }
854                 break;
855
856         case r_inflating:
857                 zstd_out_buff.size = out_buffer_size;
858                 zstd_out_buff.pos = 0;
859
860                 r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
861                 n = zstd_out_buff.pos;
862                 if (ZSTD_isError(r)) {
863                         rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
864                         exit_cleanup(RERR_STREAMIO);
865                 }
866
867                 /*
868                  * If the input buffer is fully consumed and the output
869                  * buffer is not full then next step is to read more
870                  * data.
871                  */
872                 if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
873                         recv_state = r_idle;
874
875                 if (n != 0) {
876                         *data = dbuf;
877                         return n;
878                 }
879                 break;
880
881         case r_running:
882                 ++rx_token;
883                 if (--rx_run == 0)
884                         recv_state = r_idle;
885                 return -1 - rx_token;
886                 break;
887
888         case r_inflated:
889                 break;
890         }
891         } while (1);
892 }
893 #endif /* SUPPORT_ZSTD */
894
895 #ifdef SUPPORT_LZ4
896 static void
897 send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
898 {
899         static int init_done, flush_pending;
900         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
901         int32 n, r;
902
903         if (last_token == -1) {
904                 if (!init_done) {
905                         if ((obuf = new_array(char, size)) == NULL)
906                                 out_of_memory("send_compressed_token");
907                         init_done = 1;
908                 }
909                 last_run_end = 0;
910                 run_start = token;
911                 flush_pending = 0;
912         } else if (last_token == -2) {
913                 run_start = token;
914         } else if (nb != 0 || token != last_token + 1
915                    || token >= run_start + 65536) {
916                 /* output previous run */
917                 r = run_start - last_run_end;
918                 n = last_token - run_start;
919                 if (r >= 0 && r <= 63) {
920                         write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
921                 } else {
922                         write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
923                         write_int(f, run_start);
924                 }
925                 if (n != 0) {
926                         write_byte(f, n);
927                         write_byte(f, n >> 8);
928                 }
929                 last_run_end = last_token;
930                 run_start = token;
931         }
932
933         last_token = token;
934
935         if (nb != 0 || flush_pending) {
936                 int available_in, available_out = 0;
937                 const char *next_in;
938
939                 do {
940                         char *ptr = obuf;
941                         char *next_out = obuf + 2;
942
943                         if (available_out == 0) {
944                                 available_in = MIN(nb, MAX_DATA_COUNT);
945                                 next_in = map_ptr(buf, offset, available_in);
946                         } else
947                                 available_in /= 2;
948
949                         available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
950                         if (!available_out) {
951                                 rprintf(FERROR, "compress returned %d\n", available_out);
952                                 exit_cleanup(RERR_STREAMIO);
953                         }
954                         if (available_out <= MAX_DATA_COUNT) {
955                                 ptr[0] = DEFLATED_DATA + (available_out >> 8);
956                                 ptr[1] = available_out;
957
958                                 write_buf(f, ptr, available_out + 2);
959
960                                 available_out = 0;
961                                 nb -= available_in;
962                                 offset += available_in;
963                         }
964                 } while (nb != 0);
965                 flush_pending = token == -2;
966         }
967         if (token == -1)
968                 /* end of file - clean up */
969                 write_byte(f, END_FLAG);
970 }
971
972 static int32 recv_compressed_token(int f, char **data)
973 {
974         static int32 saved_flag;
975         static int init_done;
976         int32 n, flag;
977         int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
978         static const char *next_in;
979         static int avail_in;
980         int avail_out;
981
982         for (;;) {
983                 switch (recv_state) {
984                 case r_init:
985                         if (!init_done) {
986                                 if (!(cbuf = new_array(char, MAX_DATA_COUNT))
987                                     || !(dbuf = new_array(char, size)))
988                                         out_of_memory("recv_compressed_token");
989                                 init_done = 1;
990                         }
991                         recv_state = r_idle;
992                         rx_token = 0;
993                         break;
994                 case r_idle:
995                 case r_inflated:
996                         if (saved_flag) {
997                                 flag = saved_flag & 0xff;
998                                 saved_flag = 0;
999                         } else
1000                                 flag = read_byte(f);
1001                         if ((flag & 0xC0) == DEFLATED_DATA) {
1002                                 n = ((flag & 0x3f) << 8) + read_byte(f);
1003                                 read_buf(f, cbuf, n);
1004                                 next_in = (char *)cbuf;
1005                                 avail_in = n;
1006                                 recv_state = r_inflating;
1007                                 break;
1008                         }
1009
1010                         if (recv_state == r_inflated)
1011                                 recv_state = r_idle;
1012
1013                         if (flag == END_FLAG) {
1014                                 /* that's all folks */
1015                                 recv_state = r_init;
1016                                 return 0;
1017                         }
1018
1019                         /* here we have a token of some kind */
1020                         if (flag & TOKEN_REL) {
1021                                 rx_token += flag & 0x3f;
1022                                 flag >>= 6;
1023                         } else
1024                                 rx_token = read_int(f);
1025                         if (flag & 1) {
1026                                 rx_run = read_byte(f);
1027                                 rx_run += read_byte(f) << 8;
1028                                 recv_state = r_running;
1029                         }
1030                         return -1 - rx_token;
1031
1032                 case r_inflating:
1033                         avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
1034                         if (avail_out < 0) {
1035                                 rprintf(FERROR, "uncompress failed: %d\n", avail_out);
1036                                 exit_cleanup(RERR_STREAMIO);
1037                         }
1038                         recv_state = r_inflated;
1039                         *data = dbuf;
1040                         return avail_out;
1041
1042                 case r_running:
1043                         ++rx_token;
1044                         if (--rx_run == 0)
1045                                 recv_state = r_idle;
1046                         return -1 - rx_token;
1047                 }
1048         }
1049
1050 }
1051
# if 0
/*
 * NOTE(review): compiled out, and not functional as written -- do not
 * re-enable without rewriting it.
 *
 * - It builds what appears to be a deflate stored-block header in hdr[]
 *   (length plus its one's complement) and hands it to
 *   LZ4_decompress_safe(). That header is not an LZ4 block.
 * - Nothing in the loop resets avail_in after the first refill. So from
 *   the second pass on, the refill branch is skipped, len never changes,
 *   and for len != 0 the do/while only ends if LZ4_decompress_safe()
 *   fails and exit_cleanup() is called.
 */
static void see_uncompressed_token(char *buf, int32 len)
{
	static const char *next_in;
	static int avail_in;
	int avail_out;

	int32 blklen;
	char hdr[5];

	avail_in = 0;
	blklen = 0;
	hdr[0] = 0;
	do {
		if (avail_in == 0 && len != 0) {
			if (blklen == 0) {
				/* Give it a fake stored-block header. */
				next_in = hdr;
				avail_in = 5;
				blklen = len;
				if (blklen > 0xffff)
					blklen = 0xffff;
				hdr[1] = blklen;
				hdr[2] = blklen >> 8;
				hdr[3] = ~hdr[1];
				hdr[4] = ~hdr[2];
			} else {
				next_in = (char *)buf;
				avail_in = blklen;
				if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
					buf += blklen;
				len -= blklen;
				blklen = 0;
			}
		}
		avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, LZ4_compressBound(CHUNK_SIZE));
		if (avail_out < 0) {
			rprintf(FERROR, "uncompress failed: %d\n", avail_out);
			exit_cleanup(RERR_STREAMIO);
		}

	} while (len);
}
# endif /* 0 */
1096 #endif /* SUPPORT_LZ4 */
1097
1098 /**
1099  * Transmit a verbatim buffer of length @p n followed by a token.
1100  * If token == -1 then we have reached EOF
1101  * If n == 0 then don't send a buffer
1102  */
1103 void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
1104                 int32 n, int32 toklen)
1105 {
1106         if (!do_compression)
1107                 simple_send_token(f, token, buf, offset, n);
1108 #ifdef SUPPORT_ZSTD
1109         else if (do_compression == CPRES_ZSTD)
1110                 send_zstd_token(f, token, buf, offset, n);
1111 #endif
1112 #ifdef SUPPORT_LZ4
1113         else if (do_compression == CPRES_LZ4)
1114                 send_compressed_token(f, token, buf, offset, n);
1115 #endif
1116         else
1117                 send_deflated_token(f, token, buf, offset, n, toklen);
1118 }
1119
1120 /*
1121  * receive a token or buffer from the other end. If the return value is >0 then
1122  * it is a data buffer of that length, and *data will point at the data.
1123  * if the return value is -i then it represents token i-1
1124  * if the return value is 0 then the end has been reached
1125  */
1126 int32 recv_token(int f, char **data)
1127 {
1128         int tok;
1129
1130         if (!do_compression)
1131                 tok = simple_recv_token(f,data);
1132 #ifdef SUPPORT_ZSTD
1133         else if (do_compression == CPRES_ZSTD)
1134                 tok = recv_zstd_token(f, data);
1135 #endif
1136 #ifdef SUPPORT_LZ4
1137         else if (do_compression == CPRES_LZ4)
1138                 tok = recv_compressed_token(f, data);
1139 #endif
1140         else /* CPRES_ZLIB & CPRES_ZLIBX */
1141                 tok = recv_deflated_token(f, data);
1142         return tok;
1143 }
1144
1145 /*
1146  * look at the data corresponding to a token, if necessary
1147  */
1148 void see_token(char *data, int32 toklen)
1149 {
1150         if (do_compression == CPRES_ZLIB)
1151                 see_deflate_token(data, toklen);
1152 #ifdef SUPPORT_LZ4
1153 # if 0
1154         else if (do_compression == CPRES_LZ4)
1155                 see_uncompressed_token(data, toklen);
1156 # endif
1157 #endif
1158 }