tdb: Consistency check for tdb_storev
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 /* Returns 0 on fail.  On success, return offset of record, and fills
83    in rec */
84 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
85                         struct tdb_record *r)
86 {
87         tdb_off_t rec_ptr;
88
89         /* read in the hash top */
90         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
91                 return 0;
92
93         /* keep looking until we find the right record */
94         while (rec_ptr) {
95                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
96                         return 0;
97
98                 if (!TDB_DEAD(r) && hash==r->full_hash
99                     && key.dsize==r->key_len
100                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
101                                       r->key_len, tdb_key_compare,
102                                       NULL) == 0) {
103                         return rec_ptr;
104                 }
105                 /* detect tight infinite loop */
106                 if (rec_ptr == r->next) {
107                         tdb->ecode = TDB_ERR_CORRUPT;
108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
109                         return 0;
110                 }
111                 rec_ptr = r->next;
112         }
113         tdb->ecode = TDB_ERR_NOEXIST;
114         return 0;
115 }
116
117 /* As tdb_find, but if you succeed, keep the lock */
118 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
119                            struct tdb_record *rec)
120 {
121         uint32_t rec_ptr;
122
123         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
124                 return 0;
125         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
126                 tdb_unlock(tdb, BUCKET(hash), locktype);
127         return rec_ptr;
128 }
129
130 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
131
132 struct tdb_update_hash_state {
133         const TDB_DATA *dbufs;
134         int num_dbufs;
135         tdb_len_t dbufs_len;
136 };
137
138 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
139 {
140         struct tdb_update_hash_state *state = private_data;
141         unsigned char *dptr = data.dptr;
142         int i;
143
144         if (state->dbufs_len != data.dsize) {
145                 return -1;
146         }
147
148         for (i=0; i<state->num_dbufs; i++) {
149                 TDB_DATA dbuf = state->dbufs[i];
150                 int ret;
151                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
152                 if (ret != 0) {
153                         return -1;
154                 }
155                 dptr += dbuf.dsize;
156         }
157
158         return 0;
159 }
160
161 /* update an entry in place - this only works if the new data size
162    is <= the old data size and the key exists.
163    on failure return -1.
164 */
165 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
166                            uint32_t hash,
167                            const TDB_DATA *dbufs, int num_dbufs,
168                            tdb_len_t dbufs_len)
169 {
170         struct tdb_record rec;
171         tdb_off_t rec_ptr, ofs;
172         int i;
173
174         /* find entry */
175         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
176                 return -1;
177
178         /* it could be an exact duplicate of what is there - this is
179          * surprisingly common (eg. with a ldb re-index). */
180         if (rec.data_len == dbufs_len) {
181                 struct tdb_update_hash_state state = {
182                         .dbufs = dbufs, .num_dbufs = num_dbufs,
183                         .dbufs_len = dbufs_len
184                 };
185                 int ret;
186
187                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
188                 if (ret == 0) {
189                         return 0;
190                 }
191         }
192
193         /* must be long enough key, data and tailer */
194         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
195                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
196                 return -1;
197         }
198
199         ofs = rec_ptr + sizeof(rec) + rec.key_len;
200
201         for (i=0; i<num_dbufs; i++) {
202                 TDB_DATA dbuf = dbufs[i];
203                 int ret;
204
205                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
206                 if (ret == -1) {
207                         return -1;
208                 }
209                 ofs += dbuf.dsize;
210         }
211
212         if (dbufs_len != rec.data_len) {
213                 /* update size */
214                 rec.data_len = dbufs_len;
215                 return tdb_rec_write(tdb, rec_ptr, &rec);
216         }
217
218         return 0;
219 }
220
221 /* find an entry in the database given a key */
222 /* If an entry doesn't exist tdb_err will be set to
223  * TDB_ERR_NOEXIST. If a key has no data attached
224  * then the TDB_DATA will have zero length but
225  * a non-zero pointer
226  */
227 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
228 {
229         tdb_off_t rec_ptr;
230         struct tdb_record rec;
231         TDB_DATA ret;
232         uint32_t hash;
233
234         /* find which hash bucket it is in */
235         hash = tdb->hash_fn(&key);
236         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
237                 return tdb_null;
238
239         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
240                                   rec.data_len);
241         ret.dsize = rec.data_len;
242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
243         return ret;
244 }
245
246 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
247 {
248         TDB_DATA ret = _tdb_fetch(tdb, key);
249
250         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
251         return ret;
252 }
253
254 /*
255  * Find an entry in the database and hand the record's data to a parsing
256  * function. The parsing function is executed under the chain read lock, so it
257  * should be fast and should not block on other syscalls.
258  *
259  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
260  *
261  * For mmapped tdb's that do not have a transaction open it points the parsing
262  * function directly at the mmap area, it avoids the malloc/memcpy in this
263  * case. If a transaction is open or no mmap is available, it has to do
264  * malloc/read/parse/free.
265  *
266  * This is interesting for all readers of potentially large data structures in
267  * the tdb records, ldb indexes being one example.
268  *
269  * Return -1 if the record was not found.
270  */
271
272 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
273                      int (*parser)(TDB_DATA key, TDB_DATA data,
274                                    void *private_data),
275                      void *private_data)
276 {
277         tdb_off_t rec_ptr;
278         struct tdb_record rec;
279         int ret;
280         uint32_t hash;
281
282         /* find which hash bucket it is in */
283         hash = tdb->hash_fn(&key);
284
285         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
286                 /* record not found */
287                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
288                 tdb->ecode = TDB_ERR_NOEXIST;
289                 return -1;
290         }
291         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
292
293         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
294                              rec.data_len, parser, private_data);
295
296         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
297
298         return ret;
299 }
300
301 /* check if an entry in the database exists
302
303    note that 1 is returned if the key is found and 0 is returned if not found
304    this doesn't match the conventions in the rest of this module, but is
305    compatible with gdbm
306 */
307 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
308 {
309         struct tdb_record rec;
310
311         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
312                 return 0;
313         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
314         return 1;
315 }
316
317 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
318 {
319         uint32_t hash = tdb->hash_fn(&key);
320         int ret;
321
322         ret = tdb_exists_hash(tdb, key, hash);
323         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
324         return ret;
325 }
326
327 /* actually delete an entry in the database given the offset */
328 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
329 {
330         tdb_off_t last_ptr, i;
331         struct tdb_record lastrec;
332
333         if (tdb->read_only || tdb->traverse_read) return -1;
334
335         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
336             tdb_write_lock_record(tdb, rec_ptr) == -1) {
337                 /* Someone traversing here: mark it as dead */
338                 rec->magic = TDB_DEAD_MAGIC;
339                 return tdb_rec_write(tdb, rec_ptr, rec);
340         }
341         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
342                 return -1;
343
344         /* find previous record in hash chain */
345         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
346                 return -1;
347         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
348                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
349                         return -1;
350
351         /* unlink it: next ptr is at start of record. */
352         if (last_ptr == 0)
353                 last_ptr = TDB_HASH_TOP(rec->full_hash);
354         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
355                 return -1;
356
357         /* recover the space */
358         if (tdb_free(tdb, rec_ptr, rec) == -1)
359                 return -1;
360         return 0;
361 }
362
363 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
364 {
365         int res = 0;
366         tdb_off_t rec_ptr;
367         struct tdb_record rec;
368
369         /* read in the hash top */
370         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
371                 return 0;
372
373         while (rec_ptr) {
374                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
375                         return 0;
376
377                 if (rec.magic == TDB_DEAD_MAGIC) {
378                         res += 1;
379                 }
380                 rec_ptr = rec.next;
381         }
382         return res;
383 }
384
385 /*
386  * Purge all DEAD records from a hash chain
387  */
388 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
389 {
390         int res = -1;
391         struct tdb_record rec;
392         tdb_off_t rec_ptr;
393
394         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
395                 /*
396                  * Don't block the freelist if not strictly necessary
397                  */
398                 return -1;
399         }
400
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 goto fail;
404
405         while (rec_ptr) {
406                 tdb_off_t next;
407
408                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
409                         goto fail;
410                 }
411
412                 next = rec.next;
413
414                 if (rec.magic == TDB_DEAD_MAGIC
415                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
416                         goto fail;
417                 }
418                 rec_ptr = next;
419         }
420         res = 0;
421  fail:
422         tdb_unlock(tdb, -1, F_WRLCK);
423         return res;
424 }
425
426 /* delete an entry in the database given a key */
427 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
428 {
429         tdb_off_t rec_ptr;
430         struct tdb_record rec;
431         int ret;
432
433         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
434         if (rec_ptr == 0) {
435                 return -1;
436         }
437
438         if (tdb->max_dead_records != 0) {
439
440                 uint32_t magic = TDB_DEAD_MAGIC;
441
442                 /*
443                  * Allow for some dead records per hash chain, mainly for
444                  * tdb's with a very high create/delete rate like locking.tdb.
445                  */
446
447                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
448                         /*
449                          * Don't let the per-chain freelist grow too large,
450                          * delete all existing dead records
451                          */
452                         tdb_purge_dead(tdb, hash);
453                 }
454
455                 /*
456                  * Just mark the record as dead.
457                  */
458                 ret = tdb_ofs_write(
459                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
460                         &magic);
461         }
462         else {
463                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
464         }
465
466         if (ret == 0) {
467                 tdb_increment_seqnum(tdb);
468         }
469
470         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
471                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
472         return ret;
473 }
474
475 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
476 {
477         uint32_t hash = tdb->hash_fn(&key);
478         int ret;
479
480         ret = tdb_delete_hash(tdb, key, hash);
481         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
482         return ret;
483 }
484
485 /*
486  * See if we have a dead record around with enough space
487  */
488 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
489                         struct tdb_record *r, tdb_len_t length,
490                         tdb_off_t *p_last_ptr)
491 {
492         tdb_off_t rec_ptr, last_ptr;
493         tdb_off_t best_rec_ptr = 0;
494         tdb_off_t best_last_ptr = 0;
495         struct tdb_record best = { .rec_len = UINT32_MAX };
496
497         length += sizeof(tdb_off_t); /* tailer */
498
499         last_ptr = TDB_HASH_TOP(hash);
500
501         /* read in the hash top */
502         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
503                 return 0;
504
505         /* keep looking until we find the right record */
506         while (rec_ptr) {
507                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
508                         return 0;
509
510                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
511                     (r->rec_len < best.rec_len)) {
512                         best_rec_ptr = rec_ptr;
513                         best_last_ptr = last_ptr;
514                         best = *r;
515                 }
516                 last_ptr = rec_ptr;
517                 rec_ptr = r->next;
518         }
519
520         if (best.rec_len == UINT32_MAX) {
521                 return 0;
522         }
523
524         *r = best;
525         *p_last_ptr = best_last_ptr;
526         return best_rec_ptr;
527 }
528
529 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
530                        const TDB_DATA *dbufs, int num_dbufs,
531                        int flag, uint32_t hash)
532 {
533         struct tdb_record rec;
534         tdb_off_t rec_ptr, ofs;
535         tdb_len_t rec_len, dbufs_len;
536         int i;
537         int ret = -1;
538
539         dbufs_len = 0;
540
541         for (i=0; i<num_dbufs; i++) {
542                 size_t dsize = dbufs[i].dsize;
543
544                 if ((dsize != 0) && (dbufs[i].dptr == NULL)) {
545                         tdb->ecode = TDB_ERR_EINVAL;
546                         goto fail;
547                 }
548
549                 dbufs_len += dsize;
550                 if (dbufs_len < dsize) {
551                         tdb->ecode = TDB_ERR_OOM;
552                         goto fail;
553                 }
554         }
555
556         rec_len = key.dsize + dbufs_len;
557         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
558                 tdb->ecode = TDB_ERR_OOM;
559                 goto fail;
560         }
561
562         /* check for it existing, on insert. */
563         if (flag == TDB_INSERT) {
564                 if (tdb_exists_hash(tdb, key, hash)) {
565                         tdb->ecode = TDB_ERR_EXISTS;
566                         goto fail;
567                 }
568         } else {
569                 /* first try in-place update, on modify or replace. */
570                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
571                                     dbufs_len) == 0) {
572                         goto done;
573                 }
574                 if (tdb->ecode == TDB_ERR_NOEXIST &&
575                     flag == TDB_MODIFY) {
576                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
577                          we should fail the store */
578                         goto fail;
579                 }
580         }
581         /* reset the error code potentially set by the tdb_update_hash() */
582         tdb->ecode = TDB_SUCCESS;
583
584         /* delete any existing record - if it doesn't exist we don't
585            care.  Doing this first reduces fragmentation, and avoids
586            coalescing with `allocated' block before it's updated. */
587         if (flag != TDB_INSERT)
588                 tdb_delete_hash(tdb, key, hash);
589
590         /* we have to allocate some space */
591         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
592
593         if (rec_ptr == 0) {
594                 goto fail;
595         }
596
597         /* Read hash top into next ptr */
598         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
599                 goto fail;
600
601         rec.key_len = key.dsize;
602         rec.data_len = dbufs_len;
603         rec.full_hash = hash;
604         rec.magic = TDB_MAGIC;
605
606         ofs = rec_ptr;
607
608         /* write out and point the top of the hash chain at it */
609         ret = tdb_rec_write(tdb, ofs, &rec);
610         if (ret == -1) {
611                 goto fail;
612         }
613         ofs += sizeof(rec);
614
615         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
616         if (ret == -1) {
617                 goto fail;
618         }
619         ofs += key.dsize;
620
621         for (i=0; i<num_dbufs; i++) {
622                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
623                                               dbufs[i].dsize);
624                 if (ret == -1) {
625                         goto fail;
626                 }
627                 ofs += dbufs[i].dsize;
628         }
629
630         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
631         if (ret == -1) {
632                 /* Need to tdb_unallocate() here */
633                 goto fail;
634         }
635
636  done:
637         ret = 0;
638  fail:
639         if (ret == 0) {
640                 tdb_increment_seqnum(tdb);
641         }
642         return ret;
643 }
644
645 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
646                       TDB_DATA dbuf, int flag, uint32_t hash)
647 {
648         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
649 }
650
651 /* store an element in the database, replacing any existing element
652    with the same key
653
654    return 0 on success, -1 on failure
655 */
656 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
657 {
658         uint32_t hash;
659         int ret;
660
661         if (tdb->read_only || tdb->traverse_read) {
662                 tdb->ecode = TDB_ERR_RDONLY;
663                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
664                 return -1;
665         }
666
667         /* find which hash bucket it is in */
668         hash = tdb->hash_fn(&key);
669         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
670                 return -1;
671
672         ret = _tdb_store(tdb, key, dbuf, flag, hash);
673         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
674         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
675         return ret;
676 }
677
678 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
679                         const TDB_DATA *dbufs, int num_dbufs, int flag)
680 {
681         uint32_t hash;
682         int ret;
683
684         if (tdb->read_only || tdb->traverse_read) {
685                 tdb->ecode = TDB_ERR_RDONLY;
686                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
687                                               dbufs, num_dbufs, flag, -1);
688                 return -1;
689         }
690
691         /* find which hash bucket it is in */
692         hash = tdb->hash_fn(&key);
693         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
694                 return -1;
695
696         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
697         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
698                                       dbufs, num_dbufs, flag, -1);
699         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
700         return ret;
701 }
702
703 /* Append to an entry. Create if not exist. */
704 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
705 {
706         uint32_t hash;
707         TDB_DATA dbufs[2];
708         int ret = -1;
709
710         /* find which hash bucket it is in */
711         hash = tdb->hash_fn(&key);
712         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
713                 return -1;
714
715         dbufs[0] = _tdb_fetch(tdb, key);
716         dbufs[1] = new_dbuf;
717
718         ret = _tdb_storev(tdb, key, dbufs, 2, 0, hash);
719         tdb_trace_2rec_retrec(tdb, "tdb_append", key, dbufs[0], dbufs[1]);
720
721         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
722         SAFE_FREE(dbufs[0].dptr);
723         return ret;
724 }
725
726
727 /*
728   return the name of the current tdb file
729   useful for external logging functions
730 */
731 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
732 {
733         return tdb->name;
734 }
735
736 /*
737   return the underlying file descriptor being used by tdb, or -1
738   useful for external routines that want to check the device/inode
739   of the fd
740 */
741 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
742 {
743         return tdb->fd;
744 }
745
746 /*
747   return the current logging function
748   useful for external tdb routines that wish to log tdb errors
749 */
750 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
751 {
752         return tdb->log.log_fn;
753 }
754
755
756 /*
757   get the tdb sequence number. Only makes sense if the writers opened
758   with TDB_SEQNUM set. Note that this sequence number will wrap quite
759   quickly, so it should only be used for a 'has something changed'
760   test, not for code that relies on the count of the number of changes
761   made. If you want a counter then use a tdb record.
762
763   The aim of this sequence number is to allow for a very lightweight
764   test of a possible tdb change.
765 */
766 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
767 {
768         tdb_off_t seqnum=0;
769
770         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
771         return seqnum;
772 }
773
774 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
775 {
776         return tdb->hash_size;
777 }
778
779 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
780 {
781         return tdb->map_size;
782 }
783
784 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
785 {
786         return tdb->flags;
787 }
788
789 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
790 {
791         if ((flags & TDB_ALLOW_NESTING) &&
792             (flags & TDB_DISALLOW_NESTING)) {
793                 tdb->ecode = TDB_ERR_NESTING;
794                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
795                         "allow_nesting and disallow_nesting are not allowed together!"));
796                 return;
797         }
798
799         if (flags & TDB_ALLOW_NESTING) {
800                 tdb->flags &= ~TDB_DISALLOW_NESTING;
801         }
802         if (flags & TDB_DISALLOW_NESTING) {
803                 tdb->flags &= ~TDB_ALLOW_NESTING;
804         }
805
806         tdb->flags |= flags;
807 }
808
809 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
810 {
811         if ((flags & TDB_ALLOW_NESTING) &&
812             (flags & TDB_DISALLOW_NESTING)) {
813                 tdb->ecode = TDB_ERR_NESTING;
814                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
815                         "allow_nesting and disallow_nesting are not allowed together!"));
816                 return;
817         }
818
819         if ((flags & TDB_NOLOCK) &&
820             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
821             (tdb->mutexes == NULL)) {
822                 tdb->ecode = TDB_ERR_LOCK;
823                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
824                          "Can not remove NOLOCK flag on mutexed databases"));
825                 return;
826         }
827
828         if (flags & TDB_ALLOW_NESTING) {
829                 tdb->flags |= TDB_DISALLOW_NESTING;
830         }
831         if (flags & TDB_DISALLOW_NESTING) {
832                 tdb->flags |= TDB_ALLOW_NESTING;
833         }
834
835         tdb->flags &= ~flags;
836 }
837
838
839 /*
840   enable sequence number handling on an open tdb
841 */
842 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
843 {
844         tdb->flags |= TDB_SEQNUM;
845 }
846
847
848 /*
849   add a region of the file to the freelist. Length is the size of the region in bytes,
850   which includes the free list header that needs to be added
851  */
852 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
853 {
854         struct tdb_record rec;
855         if (length <= sizeof(rec)) {
856                 /* the region is not worth adding */
857                 return 0;
858         }
859         if (length + offset > tdb->map_size) {
860                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
861                 return -1;
862         }
863         memset(&rec,'\0',sizeof(rec));
864         rec.rec_len = length - sizeof(rec);
865         if (tdb_free(tdb, offset, &rec) == -1) {
866                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
867                 return -1;
868         }
869         return 0;
870 }
871
872 /*
873   wipe the entire database, deleting all records. This can be done
874   very fast by using a allrecord lock. The entire data portion of the
875   file becomes a single entry in the freelist.
876
877   This code carefully steps around the recovery area, leaving it alone
878  */
879 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
880 {
881         uint32_t i;
882         tdb_off_t offset = 0;
883         ssize_t data_len;
884         tdb_off_t recovery_head;
885         tdb_len_t recovery_size = 0;
886
887         if (tdb_lockall(tdb) != 0) {
888                 return -1;
889         }
890
891         tdb_trace(tdb, "tdb_wipe_all");
892
893         /* see if the tdb has a recovery area, and remember its size
894            if so. We don't want to lose this as otherwise each
895            tdb_wipe_all() in a transaction will increase the size of
896            the tdb by the size of the recovery area */
897         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
898                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
899                 goto failed;
900         }
901
902         if (recovery_head != 0) {
903                 struct tdb_record rec;
904                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
905                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
906                         return -1;
907                 }
908                 recovery_size = rec.rec_len + sizeof(rec);
909         }
910
911         /* wipe the hashes */
912         for (i=0;i<tdb->hash_size;i++) {
913                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
914                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
915                         goto failed;
916                 }
917         }
918
919         /* wipe the freelist */
920         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
921                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
922                 goto failed;
923         }
924
925         /* add all the rest of the file to the freelist, possibly leaving a gap
926            for the recovery area */
927         if (recovery_size == 0) {
928                 /* the simple case - the whole file can be used as a freelist */
929                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
930                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
931                         goto failed;
932                 }
933         } else {
934                 /* we need to add two freelist entries - one on either
935                    side of the recovery area
936
937                    Note that we cannot shift the recovery area during
938                    this operation. Only the transaction.c code may
939                    move the recovery area or we risk subtle data
940                    corruption
941                 */
942                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
943                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
944                         goto failed;
945                 }
946                 /* and the 2nd free list entry after the recovery area - if any */
947                 data_len = tdb->map_size - (recovery_head+recovery_size);
948                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
949                         goto failed;
950                 }
951         }
952
953         tdb_increment_seqnum_nonblock(tdb);
954
955         if (tdb_unlockall(tdb) != 0) {
956                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
957                 goto failed;
958         }
959
960         return 0;
961
962 failed:
963         tdb_unlockall(tdb);
964         return -1;
965 }
966
967 struct traverse_state {
968         bool error;
969         struct tdb_context *dest_db;
970 };
971
972 /*
973   traverse function for repacking
974  */
975 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
976 {
977         struct traverse_state *state = (struct traverse_state *)private_data;
978         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
979                 state->error = true;
980                 return -1;
981         }
982         return 0;
983 }
984
985 /*
986   repack a tdb
987  */
988 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
989 {
990         struct tdb_context *tmp_db;
991         struct traverse_state state;
992
993         tdb_trace(tdb, "tdb_repack");
994
995         if (tdb_transaction_start(tdb) != 0) {
996                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
997                 return -1;
998         }
999
1000         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
1001         if (tmp_db == NULL) {
1002                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
1003                 tdb_transaction_cancel(tdb);
1004                 return -1;
1005         }
1006
1007         state.error = false;
1008         state.dest_db = tmp_db;
1009
1010         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1011                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1012                 tdb_transaction_cancel(tdb);
1013                 tdb_close(tmp_db);
1014                 return -1;
1015         }
1016
1017         if (state.error) {
1018                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1019                 tdb_transaction_cancel(tdb);
1020                 tdb_close(tmp_db);
1021                 return -1;
1022         }
1023
1024         if (tdb_wipe_all(tdb) != 0) {
1025                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1026                 tdb_transaction_cancel(tdb);
1027                 tdb_close(tmp_db);
1028                 return -1;
1029         }
1030
1031         state.error = false;
1032         state.dest_db = tdb;
1033
1034         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1035                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1036                 tdb_transaction_cancel(tdb);
1037                 tdb_close(tmp_db);
1038                 return -1;
1039         }
1040
1041         if (state.error) {
1042                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1043                 tdb_transaction_cancel(tdb);
1044                 tdb_close(tmp_db);
1045                 return -1;
1046         }
1047
1048         tdb_close(tmp_db);
1049
1050         if (tdb_transaction_commit(tdb) != 0) {
1051                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1052                 return -1;
1053         }
1054
1055         return 0;
1056 }
1057
1058 /* Even on files, we can get partial writes due to signals. */
1059 bool tdb_write_all(int fd, const void *buf, size_t count)
1060 {
1061         while (count) {
1062                 ssize_t ret;
1063                 ret = write(fd, buf, count);
1064                 if (ret < 0)
1065                         return false;
1066                 buf = (const char *)buf + ret;
1067                 count -= ret;
1068         }
1069         return true;
1070 }
1071
1072 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1073 {
1074         tdb_off_t ret = a + b;
1075
1076         if ((ret < a) || (ret < b)) {
1077                 return false;
1078         }
1079         *pret = ret;
1080         return true;
1081 }
1082
1083 #ifdef TDB_TRACE
1084 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1085 {
1086         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1087                 close(tdb->tracefd);
1088                 tdb->tracefd = -1;
1089         }
1090 }
1091
1092 static void tdb_trace_start(struct tdb_context *tdb)
1093 {
1094         tdb_off_t seqnum=0;
1095         char msg[sizeof(tdb_off_t) * 4 + 1];
1096
1097         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1098         snprintf(msg, sizeof(msg), "%u ", seqnum);
1099         tdb_trace_write(tdb, msg);
1100 }
1101
1102 static void tdb_trace_end(struct tdb_context *tdb)
1103 {
1104         tdb_trace_write(tdb, "\n");
1105 }
1106
1107 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1108 {
1109         char msg[sizeof(ret) * 4 + 4];
1110         snprintf(msg, sizeof(msg), " = %i\n", ret);
1111         tdb_trace_write(tdb, msg);
1112 }
1113
1114 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1115 {
1116         char msg[20 + rec.dsize*2], *p;
1117         unsigned int i;
1118
1119         /* We differentiate zero-length records from non-existent ones. */
1120         if (rec.dptr == NULL) {
1121                 tdb_trace_write(tdb, " NULL");
1122                 return;
1123         }
1124
1125         /* snprintf here is purely cargo-cult programming. */
1126         p = msg;
1127         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1128         for (i = 0; i < rec.dsize; i++)
1129                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1130
1131         tdb_trace_write(tdb, msg);
1132 }
1133
1134 void tdb_trace(struct tdb_context *tdb, const char *op)
1135 {
1136         tdb_trace_start(tdb);
1137         tdb_trace_write(tdb, op);
1138         tdb_trace_end(tdb);
1139 }
1140
1141 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1142 {
1143         char msg[sizeof(tdb_off_t) * 4 + 1];
1144
1145         snprintf(msg, sizeof(msg), "%u ", seqnum);
1146         tdb_trace_write(tdb, msg);
1147         tdb_trace_write(tdb, op);
1148         tdb_trace_end(tdb);
1149 }
1150
1151 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1152                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1153 {
1154         char msg[128];
1155
1156         snprintf(msg, sizeof(msg),
1157                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1158         tdb_trace_start(tdb);
1159         tdb_trace_write(tdb, msg);
1160         tdb_trace_end(tdb);
1161 }
1162
1163 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1164 {
1165         tdb_trace_start(tdb);
1166         tdb_trace_write(tdb, op);
1167         tdb_trace_end_ret(tdb, ret);
1168 }
1169
1170 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1171 {
1172         tdb_trace_start(tdb);
1173         tdb_trace_write(tdb, op);
1174         tdb_trace_write(tdb, " =");
1175         tdb_trace_record(tdb, ret);
1176         tdb_trace_end(tdb);
1177 }
1178
1179 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1180                     TDB_DATA rec)
1181 {
1182         tdb_trace_start(tdb);
1183         tdb_trace_write(tdb, op);
1184         tdb_trace_record(tdb, rec);
1185         tdb_trace_end(tdb);
1186 }
1187
1188 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1189                         TDB_DATA rec, int ret)
1190 {
1191         tdb_trace_start(tdb);
1192         tdb_trace_write(tdb, op);
1193         tdb_trace_record(tdb, rec);
1194         tdb_trace_end_ret(tdb, ret);
1195 }
1196
1197 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1198                            TDB_DATA rec, TDB_DATA ret)
1199 {
1200         tdb_trace_start(tdb);
1201         tdb_trace_write(tdb, op);
1202         tdb_trace_record(tdb, rec);
1203         tdb_trace_write(tdb, " =");
1204         tdb_trace_record(tdb, ret);
1205         tdb_trace_end(tdb);
1206 }
1207
1208 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1209                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1210                              int ret)
1211 {
1212         char msg[1 + sizeof(ret) * 4];
1213
1214         snprintf(msg, sizeof(msg), " %#x", flag);
1215         tdb_trace_start(tdb);
1216         tdb_trace_write(tdb, op);
1217         tdb_trace_record(tdb, rec1);
1218         tdb_trace_record(tdb, rec2);
1219         tdb_trace_write(tdb, msg);
1220         tdb_trace_end_ret(tdb, ret);
1221 }
1222
1223 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1224                                    TDB_DATA rec,
1225                                    const TDB_DATA *recs, int num_recs,
1226                                    unsigned flag, int ret)
1227 {
1228         char msg[1 + sizeof(ret) * 4];
1229         int i;
1230
1231         snprintf(msg, sizeof(msg), " %#x", flag);
1232         tdb_trace_start(tdb);
1233         tdb_trace_write(tdb, op);
1234         tdb_trace_record(tdb, rec);
1235         for (i=0; i<num_recs; i++) {
1236                 tdb_trace_record(tdb, recs[i]);
1237         }
1238         tdb_trace_write(tdb, msg);
1239         tdb_trace_end_ret(tdb, ret);
1240 }
1241
1242 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1243                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1244 {
1245         tdb_trace_start(tdb);
1246         tdb_trace_write(tdb, op);
1247         tdb_trace_record(tdb, rec1);
1248         tdb_trace_record(tdb, rec2);
1249         tdb_trace_write(tdb, " =");
1250         tdb_trace_record(tdb, ret);
1251         tdb_trace_end(tdb);
1252 }
1253 #endif