tdb: Make record deletion circular-chain safe
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 void tdb_chainwalk_init(struct tdb_chainwalk_ctx *ctx, tdb_off_t ptr)
83 {
84         *ctx = (struct tdb_chainwalk_ctx) { .slow_ptr = ptr };
85 }
86
87 bool tdb_chainwalk_check(struct tdb_context *tdb,
88                          struct tdb_chainwalk_ctx *ctx,
89                          tdb_off_t next_ptr)
90 {
91         int ret;
92
93         if (ctx->slow_chase) {
94                 ret = tdb_ofs_read(tdb, ctx->slow_ptr, &ctx->slow_ptr);
95                 if (ret == -1) {
96                         return false;
97                 }
98         }
99         ctx->slow_chase = !ctx->slow_chase;
100
101         if (next_ptr == ctx->slow_ptr) {
102                 tdb->ecode = TDB_ERR_CORRUPT;
103                 TDB_LOG((tdb, TDB_DEBUG_ERROR,
104                          "tdb_chainwalk_check: circular chain\n"));
105                 return false;
106         }
107
108         return true;
109 }
110
111 /* Returns 0 on fail.  On success, return offset of record, and fills
112    in rec */
113 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
114                         struct tdb_record *r)
115 {
116         tdb_off_t rec_ptr;
117         struct tdb_chainwalk_ctx chainwalk;
118
119         /* read in the hash top */
120         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
121                 return 0;
122
123         tdb_chainwalk_init(&chainwalk, rec_ptr);
124
125         /* keep looking until we find the right record */
126         while (rec_ptr) {
127                 bool ok;
128
129                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
130                         return 0;
131
132                 if (!TDB_DEAD(r) && hash==r->full_hash
133                     && key.dsize==r->key_len
134                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
135                                       r->key_len, tdb_key_compare,
136                                       NULL) == 0) {
137                         return rec_ptr;
138                 }
139                 rec_ptr = r->next;
140
141                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
142                 if (!ok) {
143                         return 0;
144                 }
145         }
146         tdb->ecode = TDB_ERR_NOEXIST;
147         return 0;
148 }
149
150 /* As tdb_find, but if you succeed, keep the lock */
151 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
152                            struct tdb_record *rec)
153 {
154         uint32_t rec_ptr;
155
156         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
157                 return 0;
158         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
159                 tdb_unlock(tdb, BUCKET(hash), locktype);
160         return rec_ptr;
161 }
162
163 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
164
165 struct tdb_update_hash_state {
166         const TDB_DATA *dbufs;
167         int num_dbufs;
168         tdb_len_t dbufs_len;
169 };
170
171 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
172 {
173         struct tdb_update_hash_state *state = private_data;
174         unsigned char *dptr = data.dptr;
175         int i;
176
177         if (state->dbufs_len != data.dsize) {
178                 return -1;
179         }
180
181         for (i=0; i<state->num_dbufs; i++) {
182                 TDB_DATA dbuf = state->dbufs[i];
183                 int ret;
184                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
185                 if (ret != 0) {
186                         return -1;
187                 }
188                 dptr += dbuf.dsize;
189         }
190
191         return 0;
192 }
193
194 /* update an entry in place - this only works if the new data size
195    is <= the old data size and the key exists.
196    on failure return -1.
197 */
198 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
199                            uint32_t hash,
200                            const TDB_DATA *dbufs, int num_dbufs,
201                            tdb_len_t dbufs_len)
202 {
203         struct tdb_record rec;
204         tdb_off_t rec_ptr, ofs;
205         int i;
206
207         /* find entry */
208         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
209                 return -1;
210
211         /* it could be an exact duplicate of what is there - this is
212          * surprisingly common (eg. with a ldb re-index). */
213         if (rec.data_len == dbufs_len) {
214                 struct tdb_update_hash_state state = {
215                         .dbufs = dbufs, .num_dbufs = num_dbufs,
216                         .dbufs_len = dbufs_len
217                 };
218                 int ret;
219
220                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
221                 if (ret == 0) {
222                         return 0;
223                 }
224         }
225
226         /* must be long enough key, data and tailer */
227         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
228                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
229                 return -1;
230         }
231
232         ofs = rec_ptr + sizeof(rec) + rec.key_len;
233
234         for (i=0; i<num_dbufs; i++) {
235                 TDB_DATA dbuf = dbufs[i];
236                 int ret;
237
238                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
239                 if (ret == -1) {
240                         return -1;
241                 }
242                 ofs += dbuf.dsize;
243         }
244
245         if (dbufs_len != rec.data_len) {
246                 /* update size */
247                 rec.data_len = dbufs_len;
248                 return tdb_rec_write(tdb, rec_ptr, &rec);
249         }
250
251         return 0;
252 }
253
254 /* find an entry in the database given a key */
255 /* If an entry doesn't exist tdb_err will be set to
256  * TDB_ERR_NOEXIST. If a key has no data attached
257  * then the TDB_DATA will have zero length but
258  * a non-zero pointer
259  */
260 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
261 {
262         tdb_off_t rec_ptr;
263         struct tdb_record rec;
264         TDB_DATA ret;
265         uint32_t hash;
266
267         /* find which hash bucket it is in */
268         hash = tdb->hash_fn(&key);
269         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
270                 return tdb_null;
271
272         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
273                                   rec.data_len);
274         ret.dsize = rec.data_len;
275         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
276         return ret;
277 }
278
279 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
280 {
281         TDB_DATA ret = _tdb_fetch(tdb, key);
282
283         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
284         return ret;
285 }
286
287 /*
288  * Find an entry in the database and hand the record's data to a parsing
289  * function. The parsing function is executed under the chain read lock, so it
290  * should be fast and should not block on other syscalls.
291  *
292  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
293  *
294  * For mmapped tdb's that do not have a transaction open it points the parsing
295  * function directly at the mmap area, it avoids the malloc/memcpy in this
296  * case. If a transaction is open or no mmap is available, it has to do
297  * malloc/read/parse/free.
298  *
299  * This is interesting for all readers of potentially large data structures in
300  * the tdb records, ldb indexes being one example.
301  *
302  * Return -1 if the record was not found.
303  */
304
305 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
306                      int (*parser)(TDB_DATA key, TDB_DATA data,
307                                    void *private_data),
308                      void *private_data)
309 {
310         tdb_off_t rec_ptr;
311         struct tdb_record rec;
312         int ret;
313         uint32_t hash;
314
315         /* find which hash bucket it is in */
316         hash = tdb->hash_fn(&key);
317
318         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
319                 /* record not found */
320                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
321                 tdb->ecode = TDB_ERR_NOEXIST;
322                 return -1;
323         }
324         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
325
326         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
327                              rec.data_len, parser, private_data);
328
329         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
330
331         return ret;
332 }
333
334 /* check if an entry in the database exists
335
336    note that 1 is returned if the key is found and 0 is returned if not found
337    this doesn't match the conventions in the rest of this module, but is
338    compatible with gdbm
339 */
340 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
341 {
342         struct tdb_record rec;
343
344         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
345                 return 0;
346         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
347         return 1;
348 }
349
350 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
351 {
352         uint32_t hash = tdb->hash_fn(&key);
353         int ret;
354
355         ret = tdb_exists_hash(tdb, key, hash);
356         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
357         return ret;
358 }
359
360 /*
361  * Move a dead record to the freelist. The hash chain and freelist
362  * must be locked.
363  */
364 static int tdb_del_dead(struct tdb_context *tdb,
365                         uint32_t last_ptr,
366                         uint32_t rec_ptr,
367                         struct tdb_record *rec,
368                         bool *deleted)
369 {
370         int ret;
371
372         ret = tdb_write_lock_record(tdb, rec_ptr);
373         if (ret == -1) {
374                 /* Someone traversing here: Just leave it dead */
375                 return 0;
376         }
377         ret = tdb_write_unlock_record(tdb, rec_ptr);
378         if (ret == -1) {
379                 return -1;
380         }
381         ret = tdb_ofs_write(tdb, last_ptr, &rec->next);
382         if (ret == -1) {
383                 return -1;
384         }
385
386         *deleted = true;
387
388         ret = tdb_free(tdb, rec_ptr, rec);
389         return ret;
390 }
391
392 /*
393  * Walk the hash chain and leave tdb->max_dead_records around. Move
394  * the rest of dead records to the freelist.
395  */
396 int tdb_trim_dead(struct tdb_context *tdb, uint32_t hash)
397 {
398         struct tdb_chainwalk_ctx chainwalk;
399         struct tdb_record rec;
400         tdb_off_t last_ptr, rec_ptr;
401         bool locked_freelist = false;
402         int num_dead = 0;
403         int ret;
404
405         last_ptr = TDB_HASH_TOP(hash);
406
407         /*
408          * Init chainwalk with the pointer to the hash top. It might
409          * be that the very first record in the chain is a dead one
410          * that we have to delete.
411          */
412         tdb_chainwalk_init(&chainwalk, last_ptr);
413
414         ret = tdb_ofs_read(tdb, last_ptr, &rec_ptr);
415         if (ret == -1) {
416                 return -1;
417         }
418
419         while (rec_ptr != 0) {
420                 bool deleted = false;
421                 uint32_t next;
422
423                 ret = tdb_rec_read(tdb, rec_ptr, &rec);
424                 if (ret == -1) {
425                         goto fail;
426                 }
427
428                 /*
429                  * Make a copy of rec.next: Further down we might
430                  * delete and put the record on the freelist. Make
431                  * sure that modifications in that code path can't
432                  * break the chainwalk here.
433                  */
434                 next = rec.next;
435
436                 if (rec.magic == TDB_DEAD_MAGIC) {
437                         num_dead += 1;
438
439                         if (num_dead > tdb->max_dead_records) {
440
441                                 if (!locked_freelist) {
442                                         /*
443                                          * Lock the freelist only if
444                                          * it's really required.
445                                          */
446                                         ret = tdb_lock(tdb, -1, F_WRLCK);
447                                         if (ret == -1) {
448                                                 goto fail;
449                                         };
450                                         locked_freelist = true;
451                                 }
452
453                                 ret = tdb_del_dead(
454                                         tdb,
455                                         last_ptr,
456                                         rec_ptr,
457                                         &rec,
458                                         &deleted);
459
460                                 if (ret == -1) {
461                                         goto fail;
462                                 }
463                         }
464                 }
465
466                 /*
467                  * Don't do the chainwalk check if "rec_ptr" was
468                  * deleted. We reduced the chain, and the chainwalk
469                  * check might catch up early. Imagine a valid chain
470                  * with just dead records: We never can bump the
471                  * "slow" pointer in chainwalk_check, as there isn't
472                  * anything left to jump to and compare.
473                  */
474                 if (!deleted) {
475                         bool ok;
476
477                         last_ptr = rec_ptr;
478
479                         ok = tdb_chainwalk_check(tdb, &chainwalk, next);
480                         if (!ok) {
481                                 ret = -1;
482                                 goto fail;
483                         }
484                 }
485                 rec_ptr = next;
486         }
487         ret = 0;
488 fail:
489         if (locked_freelist) {
490                 tdb_unlock(tdb, -1, F_WRLCK);
491         }
492         return ret;
493 }
494
495 /* delete an entry in the database given a key */
496 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
497 {
498         tdb_off_t rec_ptr;
499         struct tdb_record rec;
500         int ret;
501
502         if (tdb->read_only || tdb->traverse_read) {
503                 tdb->ecode = TDB_ERR_RDONLY;
504                 return -1;
505         }
506
507         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
508         if (rec_ptr == 0) {
509                 return -1;
510         }
511
512         /*
513          * Mark the record dead
514          */
515         rec.magic = TDB_DEAD_MAGIC;
516         ret = tdb_rec_write(tdb, rec_ptr, &rec);
517         if (ret == -1) {
518                 goto done;
519         }
520
521         tdb_increment_seqnum(tdb);
522
523         ret = tdb_trim_dead(tdb, hash);
524 done:
525         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
526                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
527         return ret;
528 }
529
530 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
531 {
532         uint32_t hash = tdb->hash_fn(&key);
533         int ret;
534
535         ret = tdb_delete_hash(tdb, key, hash);
536         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
537         return ret;
538 }
539
540 /*
541  * See if we have a dead record around with enough space
542  */
543 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
544                         struct tdb_record *r, tdb_len_t length,
545                         tdb_off_t *p_last_ptr)
546 {
547         tdb_off_t rec_ptr, last_ptr;
548         struct tdb_chainwalk_ctx chainwalk;
549         tdb_off_t best_rec_ptr = 0;
550         tdb_off_t best_last_ptr = 0;
551         struct tdb_record best = { .rec_len = UINT32_MAX };
552
553         length += sizeof(tdb_off_t); /* tailer */
554
555         last_ptr = TDB_HASH_TOP(hash);
556
557         /* read in the hash top */
558         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
559                 return 0;
560
561         tdb_chainwalk_init(&chainwalk, rec_ptr);
562
563         /* keep looking until we find the right record */
564         while (rec_ptr) {
565                 bool ok;
566
567                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
568                         return 0;
569
570                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
571                     (r->rec_len < best.rec_len)) {
572                         best_rec_ptr = rec_ptr;
573                         best_last_ptr = last_ptr;
574                         best = *r;
575                 }
576                 last_ptr = rec_ptr;
577                 rec_ptr = r->next;
578
579                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
580                 if (!ok) {
581                         return 0;
582                 }
583         }
584
585         if (best.rec_len == UINT32_MAX) {
586                 return 0;
587         }
588
589         *r = best;
590         *p_last_ptr = best_last_ptr;
591         return best_rec_ptr;
592 }
593
594 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
595                        const TDB_DATA *dbufs, int num_dbufs,
596                        int flag, uint32_t hash)
597 {
598         struct tdb_record rec;
599         tdb_off_t rec_ptr, ofs;
600         tdb_len_t rec_len, dbufs_len;
601         int i;
602         int ret = -1;
603
604         dbufs_len = 0;
605
606         for (i=0; i<num_dbufs; i++) {
607                 size_t dsize = dbufs[i].dsize;
608
609                 if ((dsize != 0) && (dbufs[i].dptr == NULL)) {
610                         tdb->ecode = TDB_ERR_EINVAL;
611                         goto fail;
612                 }
613
614                 dbufs_len += dsize;
615                 if (dbufs_len < dsize) {
616                         tdb->ecode = TDB_ERR_OOM;
617                         goto fail;
618                 }
619         }
620
621         rec_len = key.dsize + dbufs_len;
622         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
623                 tdb->ecode = TDB_ERR_OOM;
624                 goto fail;
625         }
626
627         /* check for it existing, on insert. */
628         if (flag == TDB_INSERT) {
629                 if (tdb_exists_hash(tdb, key, hash)) {
630                         tdb->ecode = TDB_ERR_EXISTS;
631                         goto fail;
632                 }
633         } else {
634                 /* first try in-place update, on modify or replace. */
635                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
636                                     dbufs_len) == 0) {
637                         goto done;
638                 }
639                 if (tdb->ecode == TDB_ERR_NOEXIST &&
640                     flag == TDB_MODIFY) {
641                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
642                          we should fail the store */
643                         goto fail;
644                 }
645         }
646         /* reset the error code potentially set by the tdb_update_hash() */
647         tdb->ecode = TDB_SUCCESS;
648
649         /* delete any existing record - if it doesn't exist we don't
650            care.  Doing this first reduces fragmentation, and avoids
651            coalescing with `allocated' block before it's updated. */
652         if (flag != TDB_INSERT)
653                 tdb_delete_hash(tdb, key, hash);
654
655         /* we have to allocate some space */
656         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
657
658         if (rec_ptr == 0) {
659                 goto fail;
660         }
661
662         /* Read hash top into next ptr */
663         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
664                 goto fail;
665
666         rec.key_len = key.dsize;
667         rec.data_len = dbufs_len;
668         rec.full_hash = hash;
669         rec.magic = TDB_MAGIC;
670
671         ofs = rec_ptr;
672
673         /* write out and point the top of the hash chain at it */
674         ret = tdb_rec_write(tdb, ofs, &rec);
675         if (ret == -1) {
676                 goto fail;
677         }
678         ofs += sizeof(rec);
679
680         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
681         if (ret == -1) {
682                 goto fail;
683         }
684         ofs += key.dsize;
685
686         for (i=0; i<num_dbufs; i++) {
687                 if (dbufs[i].dsize == 0) {
688                         continue;
689                 }
690
691                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
692                                               dbufs[i].dsize);
693                 if (ret == -1) {
694                         goto fail;
695                 }
696                 ofs += dbufs[i].dsize;
697         }
698
699         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
700         if (ret == -1) {
701                 /* Need to tdb_unallocate() here */
702                 goto fail;
703         }
704
705  done:
706         ret = 0;
707  fail:
708         if (ret == 0) {
709                 tdb_increment_seqnum(tdb);
710         }
711         return ret;
712 }
713
714 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
715                       TDB_DATA dbuf, int flag, uint32_t hash)
716 {
717         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
718 }
719
720 /* store an element in the database, replacing any existing element
721    with the same key
722
723    return 0 on success, -1 on failure
724 */
725 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
726 {
727         uint32_t hash;
728         int ret;
729
730         if (tdb->read_only || tdb->traverse_read) {
731                 tdb->ecode = TDB_ERR_RDONLY;
732                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
733                 return -1;
734         }
735
736         /* find which hash bucket it is in */
737         hash = tdb->hash_fn(&key);
738         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
739                 return -1;
740
741         ret = _tdb_store(tdb, key, dbuf, flag, hash);
742         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
743         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
744         return ret;
745 }
746
747 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
748                         const TDB_DATA *dbufs, int num_dbufs, int flag)
749 {
750         uint32_t hash;
751         int ret;
752
753         if (tdb->read_only || tdb->traverse_read) {
754                 tdb->ecode = TDB_ERR_RDONLY;
755                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
756                                               dbufs, num_dbufs, flag, -1);
757                 return -1;
758         }
759
760         /* find which hash bucket it is in */
761         hash = tdb->hash_fn(&key);
762         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
763                 return -1;
764
765         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
766         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
767                                       dbufs, num_dbufs, flag, -1);
768         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
769         return ret;
770 }
771
772 /* Append to an entry. Create if not exist. */
773 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
774 {
775         uint32_t hash;
776         TDB_DATA dbufs[2];
777         int ret = -1;
778
779         /* find which hash bucket it is in */
780         hash = tdb->hash_fn(&key);
781         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
782                 return -1;
783
784         dbufs[0] = _tdb_fetch(tdb, key);
785         dbufs[1] = new_dbuf;
786
787         ret = _tdb_storev(tdb, key, dbufs, 2, 0, hash);
788         tdb_trace_2rec_retrec(tdb, "tdb_append", key, dbufs[0], dbufs[1]);
789
790         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
791         SAFE_FREE(dbufs[0].dptr);
792         return ret;
793 }
794
795
796 /*
797   return the name of the current tdb file
798   useful for external logging functions
799 */
800 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
801 {
802         return tdb->name;
803 }
804
805 /*
806   return the underlying file descriptor being used by tdb, or -1
807   useful for external routines that want to check the device/inode
808   of the fd
809 */
810 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
811 {
812         return tdb->fd;
813 }
814
815 /*
816   return the current logging function
817   useful for external tdb routines that wish to log tdb errors
818 */
819 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
820 {
821         return tdb->log.log_fn;
822 }
823
824
825 /*
826   get the tdb sequence number. Only makes sense if the writers opened
827   with TDB_SEQNUM set. Note that this sequence number will wrap quite
828   quickly, so it should only be used for a 'has something changed'
829   test, not for code that relies on the count of the number of changes
830   made. If you want a counter then use a tdb record.
831
832   The aim of this sequence number is to allow for a very lightweight
833   test of a possible tdb change.
834 */
835 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
836 {
837         tdb_off_t seqnum=0;
838
839         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
840         return seqnum;
841 }
842
843 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
844 {
845         return tdb->hash_size;
846 }
847
848 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
849 {
850         return tdb->map_size;
851 }
852
853 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
854 {
855         return tdb->flags;
856 }
857
858 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
859 {
860         if ((flags & TDB_ALLOW_NESTING) &&
861             (flags & TDB_DISALLOW_NESTING)) {
862                 tdb->ecode = TDB_ERR_NESTING;
863                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
864                         "allow_nesting and disallow_nesting are not allowed together!"));
865                 return;
866         }
867
868         if (flags & TDB_ALLOW_NESTING) {
869                 tdb->flags &= ~TDB_DISALLOW_NESTING;
870         }
871         if (flags & TDB_DISALLOW_NESTING) {
872                 tdb->flags &= ~TDB_ALLOW_NESTING;
873         }
874
875         tdb->flags |= flags;
876 }
877
878 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
879 {
880         if ((flags & TDB_ALLOW_NESTING) &&
881             (flags & TDB_DISALLOW_NESTING)) {
882                 tdb->ecode = TDB_ERR_NESTING;
883                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
884                         "allow_nesting and disallow_nesting are not allowed together!"));
885                 return;
886         }
887
888         if ((flags & TDB_NOLOCK) &&
889             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
890             (tdb->mutexes == NULL)) {
891                 tdb->ecode = TDB_ERR_LOCK;
892                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
893                          "Can not remove NOLOCK flag on mutexed databases"));
894                 return;
895         }
896
897         if (flags & TDB_ALLOW_NESTING) {
898                 tdb->flags |= TDB_DISALLOW_NESTING;
899         }
900         if (flags & TDB_DISALLOW_NESTING) {
901                 tdb->flags |= TDB_ALLOW_NESTING;
902         }
903
904         tdb->flags &= ~flags;
905 }
906
907
908 /*
909   enable sequence number handling on an open tdb
910 */
911 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
912 {
913         tdb->flags |= TDB_SEQNUM;
914 }
915
916
917 /*
918   add a region of the file to the freelist. Length is the size of the region in bytes,
919   which includes the free list header that needs to be added
920  */
921 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
922 {
923         struct tdb_record rec;
924         if (length <= sizeof(rec)) {
925                 /* the region is not worth adding */
926                 return 0;
927         }
928         if (length + offset > tdb->map_size) {
929                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
930                 return -1;
931         }
932         memset(&rec,'\0',sizeof(rec));
933         rec.rec_len = length - sizeof(rec);
934         if (tdb_free(tdb, offset, &rec) == -1) {
935                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
936                 return -1;
937         }
938         return 0;
939 }
940
941 /*
942   wipe the entire database, deleting all records. This can be done
943   very fast by using a allrecord lock. The entire data portion of the
944   file becomes a single entry in the freelist.
945
946   This code carefully steps around the recovery area, leaving it alone
947  */
948 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
949 {
950         uint32_t i;
951         tdb_off_t offset = 0;
952         ssize_t data_len;
953         tdb_off_t recovery_head;
954         tdb_len_t recovery_size = 0;
955
956         if (tdb_lockall(tdb) != 0) {
957                 return -1;
958         }
959
960         tdb_trace(tdb, "tdb_wipe_all");
961
962         /* see if the tdb has a recovery area, and remember its size
963            if so. We don't want to lose this as otherwise each
964            tdb_wipe_all() in a transaction will increase the size of
965            the tdb by the size of the recovery area */
966         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
967                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
968                 goto failed;
969         }
970
971         if (recovery_head != 0) {
972                 struct tdb_record rec;
973                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
974                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
975                         return -1;
976                 }
977                 recovery_size = rec.rec_len + sizeof(rec);
978         }
979
980         /* wipe the hashes */
981         for (i=0;i<tdb->hash_size;i++) {
982                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
983                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
984                         goto failed;
985                 }
986         }
987
988         /* wipe the freelist */
989         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
990                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
991                 goto failed;
992         }
993
994         /* add all the rest of the file to the freelist, possibly leaving a gap
995            for the recovery area */
996         if (recovery_size == 0) {
997                 /* the simple case - the whole file can be used as a freelist */
998                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
999                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
1000                         goto failed;
1001                 }
1002         } else {
1003                 /* we need to add two freelist entries - one on either
1004                    side of the recovery area
1005
1006                    Note that we cannot shift the recovery area during
1007                    this operation. Only the transaction.c code may
1008                    move the recovery area or we risk subtle data
1009                    corruption
1010                 */
1011                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
1012                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
1013                         goto failed;
1014                 }
1015                 /* and the 2nd free list entry after the recovery area - if any */
1016                 data_len = tdb->map_size - (recovery_head+recovery_size);
1017                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
1018                         goto failed;
1019                 }
1020         }
1021
1022         tdb_increment_seqnum_nonblock(tdb);
1023
1024         if (tdb_unlockall(tdb) != 0) {
1025                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
1026                 goto failed;
1027         }
1028
1029         return 0;
1030
1031 failed:
1032         tdb_unlockall(tdb);
1033         return -1;
1034 }
1035
1036 struct traverse_state {
1037         bool error;
1038         struct tdb_context *dest_db;
1039 };
1040
1041 /*
1042   traverse function for repacking
1043  */
1044 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
1045 {
1046         struct traverse_state *state = (struct traverse_state *)private_data;
1047         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
1048                 state->error = true;
1049                 return -1;
1050         }
1051         return 0;
1052 }
1053
1054 /*
1055   repack a tdb
1056  */
1057 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
1058 {
1059         struct tdb_context *tmp_db;
1060         struct traverse_state state;
1061
1062         tdb_trace(tdb, "tdb_repack");
1063
1064         if (tdb_transaction_start(tdb) != 0) {
1065                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
1066                 return -1;
1067         }
1068
1069         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
1070         if (tmp_db == NULL) {
1071                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
1072                 tdb_transaction_cancel(tdb);
1073                 return -1;
1074         }
1075
1076         state.error = false;
1077         state.dest_db = tmp_db;
1078
1079         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1080                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1081                 tdb_transaction_cancel(tdb);
1082                 tdb_close(tmp_db);
1083                 return -1;
1084         }
1085
1086         if (state.error) {
1087                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1088                 tdb_transaction_cancel(tdb);
1089                 tdb_close(tmp_db);
1090                 return -1;
1091         }
1092
1093         if (tdb_wipe_all(tdb) != 0) {
1094                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1095                 tdb_transaction_cancel(tdb);
1096                 tdb_close(tmp_db);
1097                 return -1;
1098         }
1099
1100         state.error = false;
1101         state.dest_db = tdb;
1102
1103         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1104                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1105                 tdb_transaction_cancel(tdb);
1106                 tdb_close(tmp_db);
1107                 return -1;
1108         }
1109
1110         if (state.error) {
1111                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1112                 tdb_transaction_cancel(tdb);
1113                 tdb_close(tmp_db);
1114                 return -1;
1115         }
1116
1117         tdb_close(tmp_db);
1118
1119         if (tdb_transaction_commit(tdb) != 0) {
1120                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1121                 return -1;
1122         }
1123
1124         return 0;
1125 }
1126
1127 /* Even on files, we can get partial writes due to signals. */
1128 bool tdb_write_all(int fd, const void *buf, size_t count)
1129 {
1130         while (count) {
1131                 ssize_t ret;
1132                 ret = write(fd, buf, count);
1133                 if (ret < 0)
1134                         return false;
1135                 buf = (const char *)buf + ret;
1136                 count -= ret;
1137         }
1138         return true;
1139 }
1140
1141 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1142 {
1143         tdb_off_t ret = a + b;
1144
1145         if ((ret < a) || (ret < b)) {
1146                 return false;
1147         }
1148         *pret = ret;
1149         return true;
1150 }
1151
1152 #ifdef TDB_TRACE
1153 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1154 {
1155         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1156                 close(tdb->tracefd);
1157                 tdb->tracefd = -1;
1158         }
1159 }
1160
1161 static void tdb_trace_start(struct tdb_context *tdb)
1162 {
1163         tdb_off_t seqnum=0;
1164         char msg[sizeof(tdb_off_t) * 4 + 1];
1165
1166         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1167         snprintf(msg, sizeof(msg), "%u ", seqnum);
1168         tdb_trace_write(tdb, msg);
1169 }
1170
1171 static void tdb_trace_end(struct tdb_context *tdb)
1172 {
1173         tdb_trace_write(tdb, "\n");
1174 }
1175
1176 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1177 {
1178         char msg[sizeof(ret) * 4 + 4];
1179         snprintf(msg, sizeof(msg), " = %i\n", ret);
1180         tdb_trace_write(tdb, msg);
1181 }
1182
1183 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1184 {
1185         char msg[20 + rec.dsize*2], *p;
1186         unsigned int i;
1187
1188         /* We differentiate zero-length records from non-existent ones. */
1189         if (rec.dptr == NULL) {
1190                 tdb_trace_write(tdb, " NULL");
1191                 return;
1192         }
1193
1194         /* snprintf here is purely cargo-cult programming. */
1195         p = msg;
1196         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1197         for (i = 0; i < rec.dsize; i++)
1198                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1199
1200         tdb_trace_write(tdb, msg);
1201 }
1202
1203 void tdb_trace(struct tdb_context *tdb, const char *op)
1204 {
1205         tdb_trace_start(tdb);
1206         tdb_trace_write(tdb, op);
1207         tdb_trace_end(tdb);
1208 }
1209
1210 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1211 {
1212         char msg[sizeof(tdb_off_t) * 4 + 1];
1213
1214         snprintf(msg, sizeof(msg), "%u ", seqnum);
1215         tdb_trace_write(tdb, msg);
1216         tdb_trace_write(tdb, op);
1217         tdb_trace_end(tdb);
1218 }
1219
1220 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1221                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1222 {
1223         char msg[128];
1224
1225         snprintf(msg, sizeof(msg),
1226                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1227         tdb_trace_start(tdb);
1228         tdb_trace_write(tdb, msg);
1229         tdb_trace_end(tdb);
1230 }
1231
1232 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1233 {
1234         tdb_trace_start(tdb);
1235         tdb_trace_write(tdb, op);
1236         tdb_trace_end_ret(tdb, ret);
1237 }
1238
1239 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1240 {
1241         tdb_trace_start(tdb);
1242         tdb_trace_write(tdb, op);
1243         tdb_trace_write(tdb, " =");
1244         tdb_trace_record(tdb, ret);
1245         tdb_trace_end(tdb);
1246 }
1247
1248 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1249                     TDB_DATA rec)
1250 {
1251         tdb_trace_start(tdb);
1252         tdb_trace_write(tdb, op);
1253         tdb_trace_record(tdb, rec);
1254         tdb_trace_end(tdb);
1255 }
1256
1257 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1258                         TDB_DATA rec, int ret)
1259 {
1260         tdb_trace_start(tdb);
1261         tdb_trace_write(tdb, op);
1262         tdb_trace_record(tdb, rec);
1263         tdb_trace_end_ret(tdb, ret);
1264 }
1265
1266 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1267                            TDB_DATA rec, TDB_DATA ret)
1268 {
1269         tdb_trace_start(tdb);
1270         tdb_trace_write(tdb, op);
1271         tdb_trace_record(tdb, rec);
1272         tdb_trace_write(tdb, " =");
1273         tdb_trace_record(tdb, ret);
1274         tdb_trace_end(tdb);
1275 }
1276
1277 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1278                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1279                              int ret)
1280 {
1281         char msg[1 + sizeof(ret) * 4];
1282
1283         snprintf(msg, sizeof(msg), " %#x", flag);
1284         tdb_trace_start(tdb);
1285         tdb_trace_write(tdb, op);
1286         tdb_trace_record(tdb, rec1);
1287         tdb_trace_record(tdb, rec2);
1288         tdb_trace_write(tdb, msg);
1289         tdb_trace_end_ret(tdb, ret);
1290 }
1291
1292 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1293                                    TDB_DATA rec,
1294                                    const TDB_DATA *recs, int num_recs,
1295                                    unsigned flag, int ret)
1296 {
1297         char msg[1 + sizeof(ret) * 4];
1298         int i;
1299
1300         snprintf(msg, sizeof(msg), " %#x", flag);
1301         tdb_trace_start(tdb);
1302         tdb_trace_write(tdb, op);
1303         tdb_trace_record(tdb, rec);
1304         for (i=0; i<num_recs; i++) {
1305                 tdb_trace_record(tdb, recs[i]);
1306         }
1307         tdb_trace_write(tdb, msg);
1308         tdb_trace_end_ret(tdb, ret);
1309 }
1310
1311 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1312                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1313 {
1314         tdb_trace_start(tdb);
1315         tdb_trace_write(tdb, op);
1316         tdb_trace_record(tdb, rec1);
1317         tdb_trace_record(tdb, rec2);
1318         tdb_trace_write(tdb, " =");
1319         tdb_trace_record(tdb, ret);
1320         tdb_trace_end(tdb);
1321 }
1322 #endif