tdb: Avoid a malloc/memcpy in _tdb_store
[mat/samba.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
63                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 /* update an entry in place - this only works if the new data size
128    is <= the old data size and the key exists.
129    on failure return -1.
130 */
131 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
132 {
133         struct tdb_record rec;
134         tdb_off_t rec_ptr;
135
136         /* find entry */
137         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
138                 return -1;
139
140         /* it could be an exact duplicate of what is there - this is
141          * surprisingly common (eg. with a ldb re-index). */
142         if (rec.key_len == key.dsize && 
143             rec.data_len == dbuf.dsize &&
144             rec.full_hash == hash) {
145                 TDB_DATA data = _tdb_fetch(tdb, key);
146                 if (data.dsize == dbuf.dsize &&
147                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
148                         if (data.dptr) {
149                                 free(data.dptr);
150                         }
151                         return 0;
152                 }
153                 if (data.dptr) {
154                         free(data.dptr);
155                 }
156         }
157
158         /* must be long enough key, data and tailer */
159         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
160                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
161                 return -1;
162         }
163
164         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
165                       dbuf.dptr, dbuf.dsize) == -1)
166                 return -1;
167
168         if (dbuf.dsize != rec.data_len) {
169                 /* update size */
170                 rec.data_len = dbuf.dsize;
171                 return tdb_rec_write(tdb, rec_ptr, &rec);
172         }
173
174         return 0;
175 }
176
177 /* find an entry in the database given a key */
178 /* If an entry doesn't exist tdb_err will be set to
179  * TDB_ERR_NOEXIST. If a key has no data attached
180  * then the TDB_DATA will have zero length but
181  * a non-zero pointer
182  */
183 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
184 {
185         tdb_off_t rec_ptr;
186         struct tdb_record rec;
187         TDB_DATA ret;
188         uint32_t hash;
189
190         /* find which hash bucket it is in */
191         hash = tdb->hash_fn(&key);
192         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
193                 return tdb_null;
194
195         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
196                                   rec.data_len);
197         ret.dsize = rec.data_len;
198         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
199         return ret;
200 }
201
202 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
203 {
204         TDB_DATA ret = _tdb_fetch(tdb, key);
205
206         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
207         return ret;
208 }
209
210 /*
211  * Find an entry in the database and hand the record's data to a parsing
212  * function. The parsing function is executed under the chain read lock, so it
213  * should be fast and should not block on other syscalls.
214  *
215  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
216  *
217  * For mmapped tdb's that do not have a transaction open it points the parsing
218  * function directly at the mmap area, it avoids the malloc/memcpy in this
219  * case. If a transaction is open or no mmap is available, it has to do
220  * malloc/read/parse/free.
221  *
222  * This is interesting for all readers of potentially large data structures in
223  * the tdb records, ldb indexes being one example.
224  *
225  * Return -1 if the record was not found.
226  */
227
228 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
229                      int (*parser)(TDB_DATA key, TDB_DATA data,
230                                    void *private_data),
231                      void *private_data)
232 {
233         tdb_off_t rec_ptr;
234         struct tdb_record rec;
235         int ret;
236         uint32_t hash;
237
238         /* find which hash bucket it is in */
239         hash = tdb->hash_fn(&key);
240
241         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
242                 /* record not found */
243                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
244                 tdb->ecode = TDB_ERR_NOEXIST;
245                 return -1;
246         }
247         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
248
249         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
250                              rec.data_len, parser, private_data);
251
252         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
253
254         return ret;
255 }
256
257 /* check if an entry in the database exists 
258
259    note that 1 is returned if the key is found and 0 is returned if not found
260    this doesn't match the conventions in the rest of this module, but is
261    compatible with gdbm
262 */
263 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
264 {
265         struct tdb_record rec;
266
267         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
268                 return 0;
269         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
270         return 1;
271 }
272
273 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
274 {
275         uint32_t hash = tdb->hash_fn(&key);
276         int ret;
277
278         ret = tdb_exists_hash(tdb, key, hash);
279         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
280         return ret;
281 }
282
283 /* actually delete an entry in the database given the offset */
284 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
285 {
286         tdb_off_t last_ptr, i;
287         struct tdb_record lastrec;
288
289         if (tdb->read_only || tdb->traverse_read) return -1;
290
291         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
292             tdb_write_lock_record(tdb, rec_ptr) == -1) {
293                 /* Someone traversing here: mark it as dead */
294                 rec->magic = TDB_DEAD_MAGIC;
295                 return tdb_rec_write(tdb, rec_ptr, rec);
296         }
297         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
298                 return -1;
299
300         /* find previous record in hash chain */
301         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
302                 return -1;
303         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
304                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
305                         return -1;
306
307         /* unlink it: next ptr is at start of record. */
308         if (last_ptr == 0)
309                 last_ptr = TDB_HASH_TOP(rec->full_hash);
310         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
311                 return -1;
312
313         /* recover the space */
314         if (tdb_free(tdb, rec_ptr, rec) == -1)
315                 return -1;
316         return 0;
317 }
318
319 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
320 {
321         int res = 0;
322         tdb_off_t rec_ptr;
323         struct tdb_record rec;
324
325         /* read in the hash top */
326         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
327                 return 0;
328
329         while (rec_ptr) {
330                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
331                         return 0;
332
333                 if (rec.magic == TDB_DEAD_MAGIC) {
334                         res += 1;
335                 }
336                 rec_ptr = rec.next;
337         }
338         return res;
339 }
340
341 /*
342  * Purge all DEAD records from a hash chain
343  */
344 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
345 {
346         int res = -1;
347         struct tdb_record rec;
348         tdb_off_t rec_ptr;
349
350         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
351                 return -1;
352         }
353
354         /* read in the hash top */
355         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
356                 goto fail;
357
358         while (rec_ptr) {
359                 tdb_off_t next;
360
361                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
362                         goto fail;
363                 }
364
365                 next = rec.next;
366
367                 if (rec.magic == TDB_DEAD_MAGIC
368                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
369                         goto fail;
370                 }
371                 rec_ptr = next;
372         }
373         res = 0;
374  fail:
375         tdb_unlock(tdb, -1, F_WRLCK);
376         return res;
377 }
378
379 /* delete an entry in the database given a key */
380 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
381 {
382         tdb_off_t rec_ptr;
383         struct tdb_record rec;
384         int ret;
385
386         if (tdb->max_dead_records != 0) {
387
388                 /*
389                  * Allow for some dead records per hash chain, mainly for
390                  * tdb's with a very high create/delete rate like locking.tdb.
391                  */
392
393                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
394                         return -1;
395
396                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
397                         /*
398                          * Don't let the per-chain freelist grow too large,
399                          * delete all existing dead records
400                          */
401                         tdb_purge_dead(tdb, hash);
402                 }
403
404                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
405                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
406                         return -1;
407                 }
408
409                 /*
410                  * Just mark the record as dead.
411                  */
412                 rec.magic = TDB_DEAD_MAGIC;
413                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
414         }
415         else {
416                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
417                                                    &rec)))
418                         return -1;
419
420                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
421         }
422
423         if (ret == 0) {
424                 tdb_increment_seqnum(tdb);
425         }
426
427         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
428                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
429         return ret;
430 }
431
432 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
433 {
434         uint32_t hash = tdb->hash_fn(&key);
435         int ret;
436
437         ret = tdb_delete_hash(tdb, key, hash);
438         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
439         return ret;
440 }
441
442 /*
443  * See if we have a dead record around with enough space
444  */
445 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
446                                struct tdb_record *r, tdb_len_t length)
447 {
448         tdb_off_t rec_ptr;
449
450         /* read in the hash top */
451         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
452                 return 0;
453
454         /* keep looking until we find the right record */
455         while (rec_ptr) {
456                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
457                         return 0;
458
459                 if (TDB_DEAD(r) && r->rec_len >= length) {
460                         /*
461                          * First fit for simple coding, TODO: change to best
462                          * fit
463                          */
464                         return rec_ptr;
465                 }
466                 rec_ptr = r->next;
467         }
468         return 0;
469 }
470
471 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
472                        TDB_DATA dbuf, int flag, uint32_t hash)
473 {
474         struct tdb_record rec;
475         tdb_off_t rec_ptr;
476         int ret = -1;
477
478         /* check for it existing, on insert. */
479         if (flag == TDB_INSERT) {
480                 if (tdb_exists_hash(tdb, key, hash)) {
481                         tdb->ecode = TDB_ERR_EXISTS;
482                         goto fail;
483                 }
484         } else {
485                 /* first try in-place update, on modify or replace. */
486                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
487                         goto done;
488                 }
489                 if (tdb->ecode == TDB_ERR_NOEXIST &&
490                     flag == TDB_MODIFY) {
491                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
492                          we should fail the store */
493                         goto fail;
494                 }
495         }
496         /* reset the error code potentially set by the tdb_update() */
497         tdb->ecode = TDB_SUCCESS;
498
499         /* delete any existing record - if it doesn't exist we don't
500            care.  Doing this first reduces fragmentation, and avoids
501            coalescing with `allocated' block before it's updated. */
502         if (flag != TDB_INSERT)
503                 tdb_delete_hash(tdb, key, hash);
504
505         if (tdb->max_dead_records != 0) {
506                 /*
507                  * Allow for some dead records per hash chain, look if we can
508                  * find one that can hold the new record. We need enough space
509                  * for key, data and tailer. If we find one, we don't have to
510                  * consult the central freelist.
511                  */
512                 rec_ptr = tdb_find_dead(
513                         tdb, hash, &rec,
514                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
515
516                 if (rec_ptr != 0) {
517                         rec.key_len = key.dsize;
518                         rec.data_len = dbuf.dsize;
519                         rec.full_hash = hash;
520                         rec.magic = TDB_MAGIC;
521                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
522                             || tdb->methods->tdb_write(
523                                     tdb, rec_ptr + sizeof(rec),
524                                     key.dptr, key.dsize) == -1
525                             || tdb->methods->tdb_write(
526                                     tdb, rec_ptr + sizeof(rec) + key.dsize,
527                                     dbuf.dptr, dbuf.dsize) == -1) {
528                                 goto fail;
529                         }
530                         goto done;
531                 }
532         }
533
534         /*
535          * We have to allocate some space from the freelist, so this means we
536          * have to lock it. Use the chance to purge all the DEAD records from
537          * the hash chain under the freelist lock.
538          */
539
540         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
541                 goto fail;
542         }
543
544         if ((tdb->max_dead_records != 0)
545             && (tdb_purge_dead(tdb, hash) == -1)) {
546                 tdb_unlock(tdb, -1, F_WRLCK);
547                 goto fail;
548         }
549
550         /* we have to allocate some space */
551         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
552
553         tdb_unlock(tdb, -1, F_WRLCK);
554
555         if (rec_ptr == 0) {
556                 goto fail;
557         }
558
559         /* Read hash top into next ptr */
560         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
561                 goto fail;
562
563         rec.key_len = key.dsize;
564         rec.data_len = dbuf.dsize;
565         rec.full_hash = hash;
566         rec.magic = TDB_MAGIC;
567
568         /* write out and point the top of the hash chain at it */
569         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
570             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec),
571                                        key.dptr, key.dsize) == -1
572             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec)+key.dsize,
573                                        dbuf.dptr, dbuf.dsize) == -1
574             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
575                 /* Need to tdb_unallocate() here */
576                 goto fail;
577         }
578
579  done:
580         ret = 0;
581  fail:
582         if (ret == 0) {
583                 tdb_increment_seqnum(tdb);
584         }
585         return ret;
586 }
587
588 /* store an element in the database, replacing any existing element
589    with the same key
590
591    return 0 on success, -1 on failure
592 */
593 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
594 {
595         uint32_t hash;
596         int ret;
597
598         if (tdb->read_only || tdb->traverse_read) {
599                 tdb->ecode = TDB_ERR_RDONLY;
600                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
601                 return -1;
602         }
603
604         /* find which hash bucket it is in */
605         hash = tdb->hash_fn(&key);
606         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
607                 return -1;
608
609         ret = _tdb_store(tdb, key, dbuf, flag, hash);
610         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
611         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
612         return ret;
613 }
614
615 /* Append to an entry. Create if not exist. */
616 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
617 {
618         uint32_t hash;
619         TDB_DATA dbuf;
620         int ret = -1;
621
622         /* find which hash bucket it is in */
623         hash = tdb->hash_fn(&key);
624         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
625                 return -1;
626
627         dbuf = _tdb_fetch(tdb, key);
628
629         if (dbuf.dptr == NULL) {
630                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
631         } else {
632                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
633                 unsigned char *new_dptr;
634
635                 /* realloc '0' is special: don't do that. */
636                 if (new_len == 0)
637                         new_len = 1;
638                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
639                 if (new_dptr == NULL) {
640                         free(dbuf.dptr);
641                 }
642                 dbuf.dptr = new_dptr;
643         }
644
645         if (dbuf.dptr == NULL) {
646                 tdb->ecode = TDB_ERR_OOM;
647                 goto failed;
648         }
649
650         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
651         dbuf.dsize += new_dbuf.dsize;
652
653         ret = _tdb_store(tdb, key, dbuf, 0, hash);
654         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
655
656 failed:
657         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
658         SAFE_FREE(dbuf.dptr);
659         return ret;
660 }
661
662
663 /*
664   return the name of the current tdb file
665   useful for external logging functions
666 */
667 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
668 {
669         return tdb->name;
670 }
671
672 /*
673   return the underlying file descriptor being used by tdb, or -1
674   useful for external routines that want to check the device/inode
675   of the fd
676 */
677 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
678 {
679         return tdb->fd;
680 }
681
682 /*
683   return the current logging function
684   useful for external tdb routines that wish to log tdb errors
685 */
686 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
687 {
688         return tdb->log.log_fn;
689 }
690
691
692 /*
693   get the tdb sequence number. Only makes sense if the writers opened
694   with TDB_SEQNUM set. Note that this sequence number will wrap quite
695   quickly, so it should only be used for a 'has something changed'
696   test, not for code that relies on the count of the number of changes
697   made. If you want a counter then use a tdb record.
698
699   The aim of this sequence number is to allow for a very lightweight
700   test of a possible tdb change.
701 */
702 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
703 {
704         tdb_off_t seqnum=0;
705
706         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
707         return seqnum;
708 }
709
710 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
711 {
712         return tdb->header.hash_size;
713 }
714
715 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
716 {
717         return tdb->map_size;
718 }
719
720 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
721 {
722         return tdb->flags;
723 }
724
725 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
726 {
727         if ((flags & TDB_ALLOW_NESTING) &&
728             (flags & TDB_DISALLOW_NESTING)) {
729                 tdb->ecode = TDB_ERR_NESTING;
730                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
731                         "allow_nesting and disallow_nesting are not allowed together!"));
732                 return;
733         }
734
735         if (flags & TDB_ALLOW_NESTING) {
736                 tdb->flags &= ~TDB_DISALLOW_NESTING;
737         }
738         if (flags & TDB_DISALLOW_NESTING) {
739                 tdb->flags &= ~TDB_ALLOW_NESTING;
740         }
741
742         tdb->flags |= flags;
743 }
744
745 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
746 {
747         if ((flags & TDB_ALLOW_NESTING) &&
748             (flags & TDB_DISALLOW_NESTING)) {
749                 tdb->ecode = TDB_ERR_NESTING;
750                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
751                         "allow_nesting and disallow_nesting are not allowed together!"));
752                 return;
753         }
754
755         if (flags & TDB_ALLOW_NESTING) {
756                 tdb->flags |= TDB_DISALLOW_NESTING;
757         }
758         if (flags & TDB_DISALLOW_NESTING) {
759                 tdb->flags |= TDB_ALLOW_NESTING;
760         }
761
762         tdb->flags &= ~flags;
763 }
764
765
766 /*
767   enable sequence number handling on an open tdb
768 */
769 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
770 {
771         tdb->flags |= TDB_SEQNUM;
772 }
773
774
775 /*
776   add a region of the file to the freelist. Length is the size of the region in bytes, 
777   which includes the free list header that needs to be added
778  */
779 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
780 {
781         struct tdb_record rec;
782         if (length <= sizeof(rec)) {
783                 /* the region is not worth adding */
784                 return 0;
785         }
786         if (length + offset > tdb->map_size) {
787                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
788                 return -1;              
789         }
790         memset(&rec,'\0',sizeof(rec));
791         rec.rec_len = length - sizeof(rec);
792         if (tdb_free(tdb, offset, &rec) == -1) {
793                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
794                 return -1;
795         }
796         return 0;
797 }
798
799 /*
800   wipe the entire database, deleting all records. This can be done
801   very fast by using a allrecord lock. The entire data portion of the
802   file becomes a single entry in the freelist.
803
804   This code carefully steps around the recovery area, leaving it alone
805  */
806 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
807 {
808         int i;
809         tdb_off_t offset = 0;
810         ssize_t data_len;
811         tdb_off_t recovery_head;
812         tdb_len_t recovery_size = 0;
813
814         if (tdb_lockall(tdb) != 0) {
815                 return -1;
816         }
817
818         tdb_trace(tdb, "tdb_wipe_all");
819
820         /* see if the tdb has a recovery area, and remember its size
821            if so. We don't want to lose this as otherwise each
822            tdb_wipe_all() in a transaction will increase the size of
823            the tdb by the size of the recovery area */
824         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
825                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
826                 goto failed;
827         }
828
829         if (recovery_head != 0) {
830                 struct tdb_record rec;
831                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
832                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
833                         return -1;
834                 }       
835                 recovery_size = rec.rec_len + sizeof(rec);
836         }
837
838         /* wipe the hashes */
839         for (i=0;i<tdb->header.hash_size;i++) {
840                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
841                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
842                         goto failed;
843                 }
844         }
845
846         /* wipe the freelist */
847         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
848                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
849                 goto failed;
850         }
851
852         /* add all the rest of the file to the freelist, possibly leaving a gap 
853            for the recovery area */
854         if (recovery_size == 0) {
855                 /* the simple case - the whole file can be used as a freelist */
856                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
857                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
858                         goto failed;
859                 }
860         } else {
861                 /* we need to add two freelist entries - one on either
862                    side of the recovery area 
863
864                    Note that we cannot shift the recovery area during
865                    this operation. Only the transaction.c code may
866                    move the recovery area or we risk subtle data
867                    corruption
868                 */
869                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
870                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
871                         goto failed;
872                 }
873                 /* and the 2nd free list entry after the recovery area - if any */
874                 data_len = tdb->map_size - (recovery_head+recovery_size);
875                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
876                         goto failed;
877                 }
878         }
879
880         tdb_increment_seqnum_nonblock(tdb);
881
882         if (tdb_unlockall(tdb) != 0) {
883                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
884                 goto failed;
885         }
886
887         return 0;
888
889 failed:
890         tdb_unlockall(tdb);
891         return -1;
892 }
893
894 struct traverse_state {
895         bool error;
896         struct tdb_context *dest_db;
897 };
898
899 /*
900   traverse function for repacking
901  */
902 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
903 {
904         struct traverse_state *state = (struct traverse_state *)private_data;
905         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
906                 state->error = true;
907                 return -1;
908         }
909         return 0;
910 }
911
912 /*
913   repack a tdb
914  */
915 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
916 {
917         struct tdb_context *tmp_db;
918         struct traverse_state state;
919
920         tdb_trace(tdb, "tdb_repack");
921
922         if (tdb_transaction_start(tdb) != 0) {
923                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
924                 return -1;
925         }
926
927         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
928         if (tmp_db == NULL) {
929                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
930                 tdb_transaction_cancel(tdb);
931                 return -1;
932         }
933
934         state.error = false;
935         state.dest_db = tmp_db;
936
937         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
938                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
939                 tdb_transaction_cancel(tdb);
940                 tdb_close(tmp_db);
941                 return -1;              
942         }
943
944         if (state.error) {
945                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
946                 tdb_transaction_cancel(tdb);
947                 tdb_close(tmp_db);
948                 return -1;
949         }
950
951         if (tdb_wipe_all(tdb) != 0) {
952                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
953                 tdb_transaction_cancel(tdb);
954                 tdb_close(tmp_db);
955                 return -1;
956         }
957
958         state.error = false;
959         state.dest_db = tdb;
960
961         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
962                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
963                 tdb_transaction_cancel(tdb);
964                 tdb_close(tmp_db);
965                 return -1;              
966         }
967
968         if (state.error) {
969                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
970                 tdb_transaction_cancel(tdb);
971                 tdb_close(tmp_db);
972                 return -1;
973         }
974
975         tdb_close(tmp_db);
976
977         if (tdb_transaction_commit(tdb) != 0) {
978                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
979                 return -1;
980         }
981
982         return 0;
983 }
984
985 /* Even on files, we can get partial writes due to signals. */
986 bool tdb_write_all(int fd, const void *buf, size_t count)
987 {
988         while (count) {
989                 ssize_t ret;
990                 ret = write(fd, buf, count);
991                 if (ret < 0)
992                         return false;
993                 buf = (const char *)buf + ret;
994                 count -= ret;
995         }
996         return true;
997 }
998
999 #ifdef TDB_TRACE
1000 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1001 {
1002         if (!tdb_write_alltdb->tracefd, str, strlen(str)) {
1003                 close(tdb->tracefd);
1004                 tdb->tracefd = -1;
1005         }
1006 }
1007
1008 static void tdb_trace_start(struct tdb_context *tdb)
1009 {
1010         tdb_off_t seqnum=0;
1011         char msg[sizeof(tdb_off_t) * 4 + 1];
1012
1013         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1014         snprintf(msg, sizeof(msg), "%u ", seqnum);
1015         tdb_trace_write(tdb, msg);
1016 }
1017
1018 static void tdb_trace_end(struct tdb_context *tdb)
1019 {
1020         tdb_trace_write(tdb, "\n");
1021 }
1022
1023 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1024 {
1025         char msg[sizeof(ret) * 4 + 4];
1026         snprintf(msg, sizeof(msg), " = %i\n", ret);
1027         tdb_trace_write(tdb, msg);
1028 }
1029
1030 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1031 {
1032         char msg[20 + rec.dsize*2], *p;
1033         unsigned int i;
1034
1035         /* We differentiate zero-length records from non-existent ones. */
1036         if (rec.dptr == NULL) {
1037                 tdb_trace_write(tdb, " NULL");
1038                 return;
1039         }
1040
1041         /* snprintf here is purely cargo-cult programming. */
1042         p = msg;
1043         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1044         for (i = 0; i < rec.dsize; i++)
1045                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1046
1047         tdb_trace_write(tdb, msg);
1048 }
1049
1050 void tdb_trace(struct tdb_context *tdb, const char *op)
1051 {
1052         tdb_trace_start(tdb);
1053         tdb_trace_write(tdb, op);
1054         tdb_trace_end(tdb);
1055 }
1056
1057 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1058 {
1059         char msg[sizeof(tdb_off_t) * 4 + 1];
1060
1061         snprintf(msg, sizeof(msg), "%u ", seqnum);
1062         tdb_trace_write(tdb, msg);
1063         tdb_trace_write(tdb, op);
1064         tdb_trace_end(tdb);
1065 }
1066
1067 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1068                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1069 {
1070         char msg[128];
1071
1072         snprintf(msg, sizeof(msg),
1073                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1074         tdb_trace_start(tdb);
1075         tdb_trace_write(tdb, msg);
1076         tdb_trace_end(tdb);
1077 }
1078
1079 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1080 {
1081         tdb_trace_start(tdb);
1082         tdb_trace_write(tdb, op);
1083         tdb_trace_end_ret(tdb, ret);
1084 }
1085
1086 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1087 {
1088         tdb_trace_start(tdb);
1089         tdb_trace_write(tdb, op);
1090         tdb_trace_write(tdb, " =");
1091         tdb_trace_record(tdb, ret);
1092         tdb_trace_end(tdb);
1093 }
1094
1095 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1096                     TDB_DATA rec)
1097 {
1098         tdb_trace_start(tdb);
1099         tdb_trace_write(tdb, op);
1100         tdb_trace_record(tdb, rec);
1101         tdb_trace_end(tdb);
1102 }
1103
1104 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1105                         TDB_DATA rec, int ret)
1106 {
1107         tdb_trace_start(tdb);
1108         tdb_trace_write(tdb, op);
1109         tdb_trace_record(tdb, rec);
1110         tdb_trace_end_ret(tdb, ret);
1111 }
1112
1113 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1114                            TDB_DATA rec, TDB_DATA ret)
1115 {
1116         tdb_trace_start(tdb);
1117         tdb_trace_write(tdb, op);
1118         tdb_trace_record(tdb, rec);
1119         tdb_trace_write(tdb, " =");
1120         tdb_trace_record(tdb, ret);
1121         tdb_trace_end(tdb);
1122 }
1123
1124 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1125                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1126                              int ret)
1127 {
1128         char msg[1 + sizeof(ret) * 4];
1129
1130         snprintf(msg, sizeof(msg), " %#x", flag);
1131         tdb_trace_start(tdb);
1132         tdb_trace_write(tdb, op);
1133         tdb_trace_record(tdb, rec1);
1134         tdb_trace_record(tdb, rec2);
1135         tdb_trace_write(tdb, msg);
1136         tdb_trace_end_ret(tdb, ret);
1137 }
1138
1139 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1140                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1141 {
1142         tdb_trace_start(tdb);
1143         tdb_trace_write(tdb, op);
1144         tdb_trace_record(tdb, rec1);
1145         tdb_trace_record(tdb, rec2);
1146         tdb_trace_write(tdb, " =");
1147         tdb_trace_record(tdb, ret);
1148         tdb_trace_end(tdb);
1149 }
1150 #endif