47a2f070b689125c1821bdea6739ae4ba0a5f6d3
[samba.git] / source / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23    
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifdef STANDALONE
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <fcntl.h>
57 #include <unistd.h>
58 #include <string.h>
59 #include <fcntl.h>
60 #include <errno.h>
61 #include <sys/mman.h>
62 #include <sys/stat.h>
63 #include <signal.h>
64 #include "tdb.h"
65 #include "spinlock.h"
66 #else
67 #include "includes.h"
68
69 #if defined(PARANOID_MALLOC_CHECKER)
70 #ifdef malloc
71 #undef malloc
72 #endif
73
74 #ifdef realloc
75 #undef realloc
76 #endif
77
78 #ifdef calloc
79 #undef calloc
80 #endif
81
82 #ifdef strdup
83 #undef strdup
84 #endif
85
86 #ifdef strndup
87 #undef strndup
88 #endif
89
90 #endif
91
92 #endif
93
94 #define TDB_MAGIC_FOOD "TDB file\n"
95 #define TDB_VERSION (0x26011967 + 6)
96 #define TDB_MAGIC (0x26011999U)
97 #define TDB_FREE_MAGIC (~TDB_MAGIC)
98 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
99 #define TDB_ALIGNMENT 4
100 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
101 #define DEFAULT_HASH_SIZE 131
102 #define TDB_PAGE_SIZE 0x2000
103 #define FREELIST_TOP (sizeof(struct tdb_header))
104 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
105 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
106 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
107 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
108 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
109 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
110
111
112 /* NB assumes there is a local variable called "tdb" that is the
113  * current context, also takes doubly-parenthesized print-style
114  * argument. */
115 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
116
117 /* lock offsets */
118 #define GLOBAL_LOCK 0
119 #define ACTIVE_LOCK 4
120
121 #ifndef MAP_FILE
122 #define MAP_FILE 0
123 #endif
124
125 #ifndef MAP_FAILED
126 #define MAP_FAILED ((void *)-1)
127 #endif
128
129 /* free memory if the pointer is valid and zero the pointer */
130 #ifndef SAFE_FREE
131 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
132 #endif
133
134 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
135 TDB_DATA tdb_null;
136
137 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
138 static TDB_CONTEXT *tdbs = NULL;
139
140 static int tdb_munmap(TDB_CONTEXT *tdb)
141 {
142         if (tdb->flags & TDB_INTERNAL)
143                 return 0;
144
145 #ifdef HAVE_MMAP
146         if (tdb->map_ptr) {
147                 int ret = munmap(tdb->map_ptr, tdb->map_size);
148                 if (ret != 0)
149                         return ret;
150         }
151 #endif
152         tdb->map_ptr = NULL;
153         return 0;
154 }
155
156 static void tdb_mmap(TDB_CONTEXT *tdb)
157 {
158         if (tdb->flags & TDB_INTERNAL)
159                 return;
160
161 #ifdef HAVE_MMAP
162         if (!(tdb->flags & TDB_NOMMAP)) {
163                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
164                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
165                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
166
167                 /*
168                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
169                  */
170
171                 if (tdb->map_ptr == MAP_FAILED) {
172                         tdb->map_ptr = NULL;
173                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
174                                  tdb->map_size, strerror(errno)));
175                 }
176         } else {
177                 tdb->map_ptr = NULL;
178         }
179 #else
180         tdb->map_ptr = NULL;
181 #endif
182 }
183
184 /* Endian conversion: we only ever deal with 4 byte quantities */
185 static void *convert(void *buf, u32 size)
186 {
187         u32 i, *p = buf;
188         for (i = 0; i < size / 4; i++)
189                 p[i] = TDB_BYTEREV(p[i]);
190         return buf;
191 }
192 #define DOCONV() (tdb->flags & TDB_CONVERT)
193 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
194
195 /* the body of the database is made of one list_struct for the free space
196    plus a separate data list for each hash value */
197 struct list_struct {
198         tdb_off next; /* offset of the next record in the list */
199         tdb_len rec_len; /* total byte length of record */
200         tdb_len key_len; /* byte length of key */
201         tdb_len data_len; /* byte length of data */
202         u32 full_hash; /* the full 32 bit hash of the key */
203         u32 magic;   /* try to catch errors */
204         /* the following union is implied:
205                 union {
206                         char record[rec_len];
207                         struct {
208                                 char key[key_len];
209                                 char data[data_len];
210                         }
211                         u32 totalsize; (tailer)
212                 }
213         */
214 };
215
216 /***************************************************************
217  Allow a caller to set a "alarm" flag that tdb can check to abort
218  a blocking lock on SIGALRM.
219 ***************************************************************/
220
221 static sig_atomic_t *palarm_fired;
222
223 void tdb_set_lock_alarm(sig_atomic_t *palarm)
224 {
225         palarm_fired = palarm;
226 }
227
228 /* a byte range locking function - return 0 on success
229    this functions locks/unlocks 1 byte at the specified offset.
230
231    On error, errno is also set so that errors are passed back properly
232    through tdb_open(). */
233 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
234                       int rw_type, int lck_type, int probe)
235 {
236         struct flock fl;
237         int ret;
238
239         if (tdb->flags & TDB_NOLOCK)
240                 return 0;
241         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
242                 errno = EACCES;
243                 return -1;
244         }
245
246         fl.l_type = rw_type;
247         fl.l_whence = SEEK_SET;
248         fl.l_start = offset;
249         fl.l_len = 1;
250         fl.l_pid = 0;
251
252         do {
253                 ret = fcntl(tdb->fd,lck_type,&fl);
254                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
255                         break;
256         } while (ret == -1 && errno == EINTR);
257
258         if (ret == -1) {
259                 if (!probe && lck_type != F_SETLK) {
260                         /* Ensure error code is set for log fun to examine. */
261                         if (errno == EINTR && palarm_fired && *palarm_fired)
262                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
263                         else
264                                 tdb->ecode = TDB_ERR_LOCK;
265                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
266                                  tdb->fd, offset, rw_type, lck_type));
267                 }
268                 /* Was it an alarm timeout ? */
269                 if (errno == EINTR && palarm_fired && *palarm_fired) {
270                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
271                                  tdb->fd, offset, rw_type, lck_type));
272                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
273                 }
274                 /* Otherwise - generic lock error. errno set by fcntl.
275                  * EAGAIN is an expected return from non-blocking
276                  * locks. */
277                 if (errno != EAGAIN) {
278                         TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
279                                  tdb->fd, offset, rw_type, lck_type, 
280                                  strerror(errno)));
281                 }
282                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
283         }
284         return 0;
285 }
286
287 /* lock a list in the database. list -1 is the alloc list */
288 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
289 {
290         if (list < -1 || list >= (int)tdb->header.hash_size) {
291                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
292                            list, ltype));
293                 return -1;
294         }
295         if (tdb->flags & TDB_NOLOCK)
296                 return 0;
297
298         /* Since fcntl locks don't nest, we do a lock for the first one,
299            and simply bump the count for future ones */
300         if (tdb->locked[list+1].count == 0) {
301                 if (!tdb->read_only && tdb->header.rwlocks) {
302                         if (tdb_spinlock(tdb, list, ltype)) {
303                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
304                                            list, ltype));
305                                 return -1;
306                         }
307                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
308                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
309                                            list, ltype, strerror(errno)));
310                         return -1;
311                 }
312                 tdb->locked[list+1].ltype = ltype;
313         }
314         tdb->locked[list+1].count++;
315         return 0;
316 }
317
318 /* unlock the database: returns void because it's too late for errors. */
319         /* changed to return int it may be interesting to know there
320            has been an error  --simo */
321 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
322 {
323         int ret = -1;
324
325         if (tdb->flags & TDB_NOLOCK)
326                 return 0;
327
328         /* Sanity checks */
329         if (list < -1 || list >= (int)tdb->header.hash_size) {
330                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
331                 return ret;
332         }
333
334         if (tdb->locked[list+1].count==0) {
335                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
336                 return ret;
337         }
338
339         if (tdb->locked[list+1].count == 1) {
340                 /* Down to last nested lock: unlock underneath */
341                 if (!tdb->read_only && tdb->header.rwlocks) {
342                         ret = tdb_spinunlock(tdb, list, ltype);
343                 } else {
344                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
345                 }
346         } else {
347                 ret = 0;
348         }
349         tdb->locked[list+1].count--;
350
351         if (ret)
352                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
353         return ret;
354 }
355
356 /* check for an out of bounds access - if it is out of bounds then
357    see if the database has been expanded by someone else and expand
358    if necessary 
359    note that "len" is the minimum length needed for the db
360 */
361 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
362 {
363         struct stat st;
364         if (len <= tdb->map_size)
365                 return 0;
366         if (tdb->flags & TDB_INTERNAL) {
367                 if (!probe) {
368                         /* Ensure ecode is set for log fn. */
369                         tdb->ecode = TDB_ERR_IO;
370                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
371                                  (int)len, (int)tdb->map_size));
372                 }
373                 return TDB_ERRCODE(TDB_ERR_IO, -1);
374         }
375
376         if (fstat(tdb->fd, &st) == -1)
377                 return TDB_ERRCODE(TDB_ERR_IO, -1);
378
379         if (st.st_size < (size_t)len) {
380                 if (!probe) {
381                         /* Ensure ecode is set for log fn. */
382                         tdb->ecode = TDB_ERR_IO;
383                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
384                                  (int)len, (int)st.st_size));
385                 }
386                 return TDB_ERRCODE(TDB_ERR_IO, -1);
387         }
388
389         /* Unmap, update size, remap */
390         if (tdb_munmap(tdb) == -1)
391                 return TDB_ERRCODE(TDB_ERR_IO, -1);
392         tdb->map_size = st.st_size;
393         tdb_mmap(tdb);
394         return 0;
395 }
396
397 /* write a lump of data at a specified offset */
398 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
399 {
400         if (tdb_oob(tdb, off + len, 0) != 0)
401                 return -1;
402
403         if (tdb->map_ptr)
404                 memcpy(off + (char *)tdb->map_ptr, buf, len);
405 #ifdef HAVE_PWRITE
406         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
407 #else
408         else if (lseek(tdb->fd, off, SEEK_SET) != off
409                  || write(tdb->fd, buf, len) != (ssize_t)len) {
410 #endif
411                 /* Ensure ecode is set for log fn. */
412                 tdb->ecode = TDB_ERR_IO;
413                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
414                            off, len, strerror(errno)));
415                 return TDB_ERRCODE(TDB_ERR_IO, -1);
416         }
417         return 0;
418 }
419
420 /* read a lump of data at a specified offset, maybe convert */
421 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
422 {
423         if (tdb_oob(tdb, off + len, 0) != 0)
424                 return -1;
425
426         if (tdb->map_ptr)
427                 memcpy(buf, off + (char *)tdb->map_ptr, len);
428 #ifdef HAVE_PREAD
429         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
430 #else
431         else if (lseek(tdb->fd, off, SEEK_SET) != off
432                  || read(tdb->fd, buf, len) != (ssize_t)len) {
433 #endif
434                 /* Ensure ecode is set for log fn. */
435                 tdb->ecode = TDB_ERR_IO;
436                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
437                            off, len, strerror(errno)));
438                 return TDB_ERRCODE(TDB_ERR_IO, -1);
439         }
440         if (cv)
441                 convert(buf, len);
442         return 0;
443 }
444
445 /* read a lump of data, allocating the space for it */
446 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
447 {
448         char *buf;
449
450         if (!(buf = malloc(len))) {
451                 /* Ensure ecode is set for log fn. */
452                 tdb->ecode = TDB_ERR_OOM;
453                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
454                            len, strerror(errno)));
455                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
456         }
457         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
458                 SAFE_FREE(buf);
459                 return NULL;
460         }
461         return buf;
462 }
463
464 /* read/write a tdb_off */
465 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
466 {
467         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
468 }
469 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
470 {
471         tdb_off off = *d;
472         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
473 }
474
475 /* read/write a record */
476 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
477 {
478         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
479                 return -1;
480         if (TDB_BAD_MAGIC(rec)) {
481                 /* Ensure ecode is set for log fn. */
482                 tdb->ecode = TDB_ERR_CORRUPT;
483                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
484                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
485         }
486         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
487 }
488 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
489 {
490         struct list_struct r = *rec;
491         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
492 }
493
494 /* read a freelist record and check for simple errors */
495 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
496 {
497         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
498                 return -1;
499
500         if (rec->magic == TDB_MAGIC) {
501                 /* this happens when a app is showdown while deleting a record - we should
502                    not completely fail when this happens */
503                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
504                          rec->magic, off));
505                 rec->magic = TDB_FREE_MAGIC;
506                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
507                         return -1;
508         }
509
510         if (rec->magic != TDB_FREE_MAGIC) {
511                 /* Ensure ecode is set for log fn. */
512                 tdb->ecode = TDB_ERR_CORRUPT;
513                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
514                            rec->magic, off));
515                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
516         }
517         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
518                 return -1;
519         return 0;
520 }
521
522 /* update a record tailer (must hold allocation lock) */
523 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
524                          const struct list_struct *rec)
525 {
526         tdb_off totalsize;
527
528         /* Offset of tailer from record header */
529         totalsize = sizeof(*rec) + rec->rec_len;
530         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
531                          &totalsize);
532 }
533
534 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
535 {
536         struct list_struct rec;
537         tdb_off tailer_ofs, tailer;
538
539         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
540                 printf("ERROR: failed to read record at %u\n", offset);
541                 return 0;
542         }
543
544         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
545                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
546
547         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
548         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
549                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
550                 return rec.next;
551         }
552
553         if (tailer != rec.rec_len + sizeof(rec)) {
554                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
555                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
556         }
557         return rec.next;
558 }
559
560 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
561 {
562         tdb_off rec_ptr, top;
563
564         top = TDB_HASH_TOP(i);
565
566         if (tdb_lock(tdb, i, F_WRLCK) != 0)
567                 return -1;
568
569         if (ofs_read(tdb, top, &rec_ptr) == -1)
570                 return tdb_unlock(tdb, i, F_WRLCK);
571
572         if (rec_ptr)
573                 printf("hash=%d\n", i);
574
575         while (rec_ptr) {
576                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
577         }
578
579         return tdb_unlock(tdb, i, F_WRLCK);
580 }
581
582 void tdb_dump_all(TDB_CONTEXT *tdb)
583 {
584         int i;
585         for (i=0;i<tdb->header.hash_size;i++) {
586                 tdb_dump_chain(tdb, i);
587         }
588         tdb_printfreelist(tdb);
589 }
590
591 int tdb_printfreelist(TDB_CONTEXT *tdb)
592 {
593         int ret;
594         long total_free = 0;
595         tdb_off offset, rec_ptr;
596         struct list_struct rec;
597
598         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
599                 return ret;
600
601         offset = FREELIST_TOP;
602
603         /* read in the freelist top */
604         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
605                 tdb_unlock(tdb, -1, F_WRLCK);
606                 return 0;
607         }
608
609         printf("freelist top=[0x%08x]\n", rec_ptr );
610         while (rec_ptr) {
611                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
612                         tdb_unlock(tdb, -1, F_WRLCK);
613                         return -1;
614                 }
615
616                 if (rec.magic != TDB_FREE_MAGIC) {
617                         printf("bad magic 0x%08x in free list\n", rec.magic);
618                         tdb_unlock(tdb, -1, F_WRLCK);
619                         return -1;
620                 }
621
622                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
623                 total_free += rec.rec_len;
624
625                 /* move to the next record */
626                 rec_ptr = rec.next;
627         }
628         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
629                (int)total_free);
630
631         return tdb_unlock(tdb, -1, F_WRLCK);
632 }
633
634 /* Remove an element from the freelist.  Must have alloc lock. */
635 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
636 {
637         tdb_off last_ptr, i;
638
639         /* read in the freelist top */
640         last_ptr = FREELIST_TOP;
641         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
642                 if (i == off) {
643                         /* We've found it! */
644                         return ofs_write(tdb, last_ptr, &next);
645                 }
646                 /* Follow chain (next offset is at start of record) */
647                 last_ptr = i;
648         }
649         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
650         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
651 }
652
653 /* Add an element into the freelist. Merge adjacent records if
654    neccessary. */
655 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
656 {
657         tdb_off right, left;
658
659         /* Allocation and tailer lock */
660         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
661                 return -1;
662
663         /* set an initial tailer, so if we fail we don't leave a bogus record */
664         if (update_tailer(tdb, offset, rec) != 0) {
665                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
666                 goto fail;
667         }
668
669         /* Look right first (I'm an Australian, dammit) */
670         right = offset + sizeof(*rec) + rec->rec_len;
671         if (right + sizeof(*rec) <= tdb->map_size) {
672                 struct list_struct r;
673
674                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
675                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
676                         goto left;
677                 }
678
679                 /* If it's free, expand to include it. */
680                 if (r.magic == TDB_FREE_MAGIC) {
681                         if (remove_from_freelist(tdb, right, r.next) == -1) {
682                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
683                                 goto left;
684                         }
685                         rec->rec_len += sizeof(r) + r.rec_len;
686                 }
687         }
688
689 left:
690         /* Look left */
691         left = offset - sizeof(tdb_off);
692         if (left > TDB_DATA_START(tdb->header.hash_size)) {
693                 struct list_struct l;
694                 tdb_off leftsize;
695                 
696                 /* Read in tailer and jump back to header */
697                 if (ofs_read(tdb, left, &leftsize) == -1) {
698                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
699                         goto update;
700                 }
701                 left = offset - leftsize;
702
703                 /* Now read in record */
704                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
705                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
706                         goto update;
707                 }
708
709                 /* If it's free, expand to include it. */
710                 if (l.magic == TDB_FREE_MAGIC) {
711                         if (remove_from_freelist(tdb, left, l.next) == -1) {
712                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
713                                 goto update;
714                         } else {
715                                 offset = left;
716                                 rec->rec_len += leftsize;
717                         }
718                 }
719         }
720
721 update:
722         if (update_tailer(tdb, offset, rec) == -1) {
723                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
724                 goto fail;
725         }
726
727         /* Now, prepend to free list */
728         rec->magic = TDB_FREE_MAGIC;
729
730         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
731             rec_write(tdb, offset, rec) == -1 ||
732             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
733                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
734                 goto fail;
735         }
736
737         /* And we're done. */
738         tdb_unlock(tdb, -1, F_WRLCK);
739         return 0;
740
741  fail:
742         tdb_unlock(tdb, -1, F_WRLCK);
743         return -1;
744 }
745
746
747 /* expand a file.  we prefer to use ftruncate, as that is what posix
748   says to use for mmap expansion */
749 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
750 {
751         char buf[1024];
752 #if HAVE_FTRUNCATE_EXTEND
753         if (ftruncate(tdb->fd, size+addition) != 0) {
754                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
755                            size+addition, strerror(errno)));
756                 return -1;
757         }
758 #else
759         char b = 0;
760
761 #ifdef HAVE_PWRITE
762         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
763 #else
764         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
765             write(tdb->fd, &b, 1) != 1) {
766 #endif
767                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
768                            size+addition, strerror(errno)));
769                 return -1;
770         }
771 #endif
772
773         /* now fill the file with something. This ensures that the file isn't sparse, which would be
774            very bad if we ran out of disk. This must be done with write, not via mmap */
775         memset(buf, 0x42, sizeof(buf));
776         while (addition) {
777                 int n = addition>sizeof(buf)?sizeof(buf):addition;
778 #ifdef HAVE_PWRITE
779                 int ret = pwrite(tdb->fd, buf, n, size);
780 #else
781                 int ret;
782                 if (lseek(tdb->fd, size, SEEK_SET) != size)
783                         return -1;
784                 ret = write(tdb->fd, buf, n);
785 #endif
786                 if (ret != n) {
787                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
788                                    n, strerror(errno)));
789                         return -1;
790                 }
791                 addition -= n;
792                 size += n;
793         }
794         return 0;
795 }
796
797
798 /* expand the database at least size bytes by expanding the underlying
799    file and doing the mmap again if necessary */
800 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
801 {
802         struct list_struct rec;
803         tdb_off offset;
804
805         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
806                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
807                 return -1;
808         }
809
810         /* must know about any previous expansions by another process */
811         tdb_oob(tdb, tdb->map_size + 1, 1);
812
813         /* always make room for at least 10 more records, and round
814            the database up to a multiple of TDB_PAGE_SIZE */
815         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
816
817         if (!(tdb->flags & TDB_INTERNAL))
818                 tdb_munmap(tdb);
819
820         /*
821          * We must ensure the file is unmapped before doing this
822          * to ensure consistency with systems like OpenBSD where
823          * writes and mmaps are not consistent.
824          */
825
826         /* expand the file itself */
827         if (!(tdb->flags & TDB_INTERNAL)) {
828                 if (expand_file(tdb, tdb->map_size, size) != 0)
829                         goto fail;
830         }
831
832         tdb->map_size += size;
833
834         if (tdb->flags & TDB_INTERNAL) {
835                 char *new_map_ptr = realloc(tdb->map_ptr, tdb->map_size);
836                 if (!new_map_ptr) {
837                         tdb->map_size -= size;
838                         goto fail;
839                 }
840                 tdb->map_ptr = new_map_ptr;
841         } else {
842                 /*
843                  * We must ensure the file is remapped before adding the space
844                  * to ensure consistency with systems like OpenBSD where
845                  * writes and mmaps are not consistent.
846                  */
847
848                 /* We're ok if the mmap fails as we'll fallback to read/write */
849                 tdb_mmap(tdb);
850         }
851
852         /* form a new freelist record */
853         memset(&rec,'\0',sizeof(rec));
854         rec.rec_len = size - sizeof(rec);
855
856         /* link it into the free list */
857         offset = tdb->map_size - size;
858         if (tdb_free(tdb, offset, &rec) == -1)
859                 goto fail;
860
861         tdb_unlock(tdb, -1, F_WRLCK);
862         return 0;
863  fail:
864         tdb_unlock(tdb, -1, F_WRLCK);
865         return -1;
866 }
867
868 /* allocate some space from the free list. The offset returned points
869    to a unconnected list_struct within the database with room for at
870    least length bytes of total data
871
872    0 is returned if the space could not be allocated
873  */
874 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
875                             struct list_struct *rec)
876 {
877         tdb_off rec_ptr, last_ptr, newrec_ptr;
878         struct list_struct newrec;
879
880         memset(&newrec, '\0', sizeof(newrec));
881
882         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
883                 return 0;
884
885         /* Extra bytes required for tailer */
886         length += sizeof(tdb_off);
887
888  again:
889         last_ptr = FREELIST_TOP;
890
891         /* read in the freelist top */
892         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
893                 goto fail;
894
895         /* keep looking until we find a freelist record big enough */
896         while (rec_ptr) {
897                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
898                         goto fail;
899
900                 if (rec->rec_len >= length) {
901                         /* found it - now possibly split it up  */
902                         if (rec->rec_len > length + MIN_REC_SIZE) {
903                                 /* Length of left piece */
904                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
905
906                                 /* Right piece to go on free list */
907                                 newrec.rec_len = rec->rec_len
908                                         - (sizeof(*rec) + length);
909                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
910
911                                 /* And left record is shortened */
912                                 rec->rec_len = length;
913                         } else
914                                 newrec_ptr = 0;
915
916                         /* Remove allocated record from the free list */
917                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
918                                 goto fail;
919
920                         /* Update header: do this before we drop alloc
921                            lock, otherwise tdb_free() might try to
922                            merge with us, thinking we're free.
923                            (Thanks Jeremy Allison). */
924                         rec->magic = TDB_MAGIC;
925                         if (rec_write(tdb, rec_ptr, rec) == -1)
926                                 goto fail;
927
928                         /* Did we create new block? */
929                         if (newrec_ptr) {
930                                 /* Update allocated record tailer (we
931                                    shortened it). */
932                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
933                                         goto fail;
934
935                                 /* Free new record */
936                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
937                                         goto fail;
938                         }
939
940                         /* all done - return the new record offset */
941                         tdb_unlock(tdb, -1, F_WRLCK);
942                         return rec_ptr;
943                 }
944                 /* move to the next record */
945                 last_ptr = rec_ptr;
946                 rec_ptr = rec->next;
947         }
948         /* we didn't find enough space. See if we can expand the
949            database and if we can then try again */
950         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
951                 goto again;
952  fail:
953         tdb_unlock(tdb, -1, F_WRLCK);
954         return 0;
955 }
956
957 /* initialise a new database with a specified hash size */
958 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
959 {
960         struct tdb_header *newdb;
961         int size, ret = -1;
962
963         /* We make it up in memory, then write it out if not internal */
964         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
965         if (!(newdb = calloc(size, 1)))
966                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
967
968         /* Fill in the header */
969         newdb->version = TDB_VERSION;
970         newdb->hash_size = hash_size;
971 #ifdef USE_SPINLOCKS
972         newdb->rwlocks = size;
973 #endif
974         if (tdb->flags & TDB_INTERNAL) {
975                 tdb->map_size = size;
976                 tdb->map_ptr = (char *)newdb;
977                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
978                 /* Convert the `ondisk' version if asked. */
979                 CONVERT(*newdb);
980                 return 0;
981         }
982         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
983                 goto fail;
984
985         if (ftruncate(tdb->fd, 0) == -1)
986                 goto fail;
987
988         /* This creates an endian-converted header, as if read from disk */
989         CONVERT(*newdb);
990         memcpy(&tdb->header, newdb, sizeof(tdb->header));
991         /* Don't endian-convert the magic food! */
992         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
993         if (write(tdb->fd, newdb, size) != size)
994                 ret = -1;
995         else
996                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
997
998   fail:
999         SAFE_FREE(newdb);
1000         return ret;
1001 }
1002
1003 /* Returns 0 on fail.  On success, return offset of record, and fills
1004    in rec */
1005 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1006                         struct list_struct *r)
1007 {
1008         tdb_off rec_ptr;
1009         
1010         /* read in the hash top */
1011         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1012                 return 0;
1013
1014         /* keep looking until we find the right record */
1015         while (rec_ptr) {
1016                 if (rec_read(tdb, rec_ptr, r) == -1)
1017                         return 0;
1018
1019                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1020                         char *k;
1021                         /* a very likely hit - read the key */
1022                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1023                                            r->key_len);
1024                         if (!k)
1025                                 return 0;
1026
1027                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1028                                 SAFE_FREE(k);
1029                                 return rec_ptr;
1030                         }
1031                         SAFE_FREE(k);
1032                 }
1033                 rec_ptr = r->next;
1034         }
1035         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1036 }
1037
1038 /* As tdb_find, but if you succeed, keep the lock */
1039 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1040                              struct list_struct *rec)
1041 {
1042         u32 rec_ptr;
1043
1044         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1045                 return 0;
1046         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1047                 tdb_unlock(tdb, BUCKET(hash), locktype);
1048         return rec_ptr;
1049 }
1050
1051 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1052 {
1053         return tdb->ecode;
1054 }
1055
1056 static struct tdb_errname {
1057         enum TDB_ERROR ecode; const char *estring;
1058 } emap[] = { {TDB_SUCCESS, "Success"},
1059              {TDB_ERR_CORRUPT, "Corrupt database"},
1060              {TDB_ERR_IO, "IO Error"},
1061              {TDB_ERR_LOCK, "Locking error"},
1062              {TDB_ERR_OOM, "Out of memory"},
1063              {TDB_ERR_EXISTS, "Record exists"},
1064              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1065              {TDB_ERR_NOEXIST, "Record does not exist"} };
1066
1067 /* Error string for the last tdb error */
1068 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1069 {
1070         u32 i;
1071         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1072                 if (tdb->ecode == emap[i].ecode)
1073                         return emap[i].estring;
1074         return "Invalid error code";
1075 }
1076
1077 /* update an entry in place - this only works if the new data size
1078    is <= the old data size and the key exists.
1079    on failure return -1.
1080 */
1081
1082 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1083 {
1084         struct list_struct rec;
1085         tdb_off rec_ptr;
1086
1087         /* find entry */
1088         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1089                 return -1;
1090
1091         /* must be long enough key, data and tailer */
1092         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1093                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1094                 return -1;
1095         }
1096
1097         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1098                       dbuf.dptr, dbuf.dsize) == -1)
1099                 return -1;
1100
1101         if (dbuf.dsize != rec.data_len) {
1102                 /* update size */
1103                 rec.data_len = dbuf.dsize;
1104                 return rec_write(tdb, rec_ptr, &rec);
1105         }
1106  
1107         return 0;
1108 }
1109
1110 /* find an entry in the database given a key */
1111 /* If an entry doesn't exist tdb_err will be set to
1112  * TDB_ERR_NOEXIST. If a key has no data attached
1113  * tdb_err will not be set. Both will return a
1114  * zero pptr and zero dsize.
1115  */
1116
1117 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1118 {
1119         tdb_off rec_ptr;
1120         struct list_struct rec;
1121         TDB_DATA ret;
1122         u32 hash;
1123
1124         /* find which hash bucket it is in */
1125         hash = tdb->hash_fn(&key);
1126         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1127                 return tdb_null;
1128
1129         if (rec.data_len)
1130                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1131                                           rec.data_len);
1132         else
1133                 ret.dptr = NULL;
1134         ret.dsize = rec.data_len;
1135         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1136         return ret;
1137 }
1138
1139 /* check if an entry in the database exists 
1140
1141    note that 1 is returned if the key is found and 0 is returned if not found
1142    this doesn't match the conventions in the rest of this module, but is
1143    compatible with gdbm
1144 */
1145 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1146 {
1147         struct list_struct rec;
1148         
1149         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1150                 return 0;
1151         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1152         return 1;
1153 }
1154
1155 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1156 {
1157         u32 hash = tdb->hash_fn(&key);
1158         return tdb_exists_hash(tdb, key, hash);
1159 }
1160
1161 /* record lock stops delete underneath */
1162 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1163 {
1164         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1165 }
1166 /*
1167   Write locks override our own fcntl readlocks, so check it here.
1168   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1169   an error to fail to get the lock here.
1170 */
1171  
1172 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1173 {
1174         struct tdb_traverse_lock *i;
1175         for (i = &tdb->travlocks; i; i = i->next)
1176                 if (i->off == off)
1177                         return -1;
1178         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1179 }
1180
1181 /*
1182   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1183   an error to fail to get the lock here.
1184 */
1185
1186 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1187 {
1188         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1189 }
1190 /* fcntl locks don't stack: avoid unlocking someone else's */
1191 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1192 {
1193         struct tdb_traverse_lock *i;
1194         u32 count = 0;
1195
1196         if (off == 0)
1197                 return 0;
1198         for (i = &tdb->travlocks; i; i = i->next)
1199                 if (i->off == off)
1200                         count++;
1201         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1202 }
1203
1204 /* actually delete an entry in the database given the offset */
1205 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1206 {
1207         tdb_off last_ptr, i;
1208         struct list_struct lastrec;
1209
1210         if (tdb->read_only) return -1;
1211
1212         if (write_lock_record(tdb, rec_ptr) == -1) {
1213                 /* Someone traversing here: mark it as dead */
1214                 rec->magic = TDB_DEAD_MAGIC;
1215                 return rec_write(tdb, rec_ptr, rec);
1216         }
1217         if (write_unlock_record(tdb, rec_ptr) != 0)
1218                 return -1;
1219
1220         /* find previous record in hash chain */
1221         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1222                 return -1;
1223         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1224                 if (rec_read(tdb, i, &lastrec) == -1)
1225                         return -1;
1226
1227         /* unlink it: next ptr is at start of record. */
1228         if (last_ptr == 0)
1229                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1230         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1231                 return -1;
1232
1233         /* recover the space */
1234         if (tdb_free(tdb, rec_ptr, rec) == -1)
1235                 return -1;
1236         return 0;
1237 }
1238
1239 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1240 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1241                          struct list_struct *rec)
1242 {
1243         int want_next = (tlock->off != 0);
1244
1245         /* Lock each chain from the start one. */
1246         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1247                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1248                         return -1;
1249
1250                 /* No previous record?  Start at top of chain. */
1251                 if (!tlock->off) {
1252                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1253                                      &tlock->off) == -1)
1254                                 goto fail;
1255                 } else {
1256                         /* Otherwise unlock the previous record. */
1257                         if (unlock_record(tdb, tlock->off) != 0)
1258                                 goto fail;
1259                 }
1260
1261                 if (want_next) {
1262                         /* We have offset of old record: grab next */
1263                         if (rec_read(tdb, tlock->off, rec) == -1)
1264                                 goto fail;
1265                         tlock->off = rec->next;
1266                 }
1267
1268                 /* Iterate through chain */
1269                 while( tlock->off) {
1270                         tdb_off current;
1271                         if (rec_read(tdb, tlock->off, rec) == -1)
1272                                 goto fail;
1273
1274                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
1275                         if (tlock->off == rec->next) {
1276                                 TDB_LOG((tdb, 0, "tdb_next_lock: loop detected.\n"));
1277                                 goto fail;
1278                         }
1279
1280                         if (!TDB_DEAD(rec)) {
1281                                 /* Woohoo: we found one! */
1282                                 if (lock_record(tdb, tlock->off) != 0)
1283                                         goto fail;
1284                                 return tlock->off;
1285                         }
1286
1287                         /* Try to clean dead ones from old traverses */
1288                         current = tlock->off;
1289                         tlock->off = rec->next;
1290                         if (!tdb->read_only && 
1291                             do_delete(tdb, current, rec) != 0)
1292                                 goto fail;
1293                 }
1294                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1295                 want_next = 0;
1296         }
1297         /* We finished iteration without finding anything */
1298         return TDB_ERRCODE(TDB_SUCCESS, 0);
1299
1300  fail:
1301         tlock->off = 0;
1302         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1303                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1304         return -1;
1305 }
1306
1307 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1308    return -1 on error or the record count traversed
1309    if fn is NULL then it is not called
1310    a non-zero return value from fn() indicates that the traversal should stop
1311   */
1312 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1313 {
1314         TDB_DATA key, dbuf;
1315         struct list_struct rec;
1316         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1317         int ret, count = 0;
1318
1319         /* This was in the initializaton, above, but the IRIX compiler
1320          * did not like it.  crh
1321          */
1322         tl.next = tdb->travlocks.next;
1323
1324         /* fcntl locks don't stack: beware traverse inside traverse */
1325         tdb->travlocks.next = &tl;
1326
1327         /* tdb_next_lock places locks on the record returned, and its chain */
1328         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1329                 count++;
1330                 /* now read the full record */
1331                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1332                                           rec.key_len + rec.data_len);
1333                 if (!key.dptr) {
1334                         ret = -1;
1335                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1336                                 goto out;
1337                         if (unlock_record(tdb, tl.off) != 0)
1338                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1339                         goto out;
1340                 }
1341                 key.dsize = rec.key_len;
1342                 dbuf.dptr = key.dptr + rec.key_len;
1343                 dbuf.dsize = rec.data_len;
1344
1345                 /* Drop chain lock, call out */
1346                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1347                         ret = -1;
1348                         goto out;
1349                 }
1350                 if (fn && fn(tdb, key, dbuf, private)) {
1351                         /* They want us to terminate traversal */
1352                         ret = count;
1353                         if (unlock_record(tdb, tl.off) != 0) {
1354                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1355                                 ret = -1;
1356                         }
1357                         tdb->travlocks.next = tl.next;
1358                         SAFE_FREE(key.dptr);
1359                         return count;
1360                 }
1361                 SAFE_FREE(key.dptr);
1362         }
1363 out:
1364         tdb->travlocks.next = tl.next;
1365         if (ret < 0)
1366                 return -1;
1367         else
1368                 return count;
1369 }
1370
1371 /* find the first entry in the database and return its key */
1372 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1373 {
1374         TDB_DATA key;
1375         struct list_struct rec;
1376
1377         /* release any old lock */
1378         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1379                 return tdb_null;
1380         tdb->travlocks.off = tdb->travlocks.hash = 0;
1381
1382         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1383                 return tdb_null;
1384         /* now read the key */
1385         key.dsize = rec.key_len;
1386         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1387         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1388                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1389         return key;
1390 }
1391
1392 /* find the next entry in the database, returning its key */
1393 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1394 {
1395         u32 oldhash;
1396         TDB_DATA key = tdb_null;
1397         struct list_struct rec;
1398         char *k = NULL;
1399
1400         /* Is locked key the old key?  If so, traverse will be reliable. */
1401         if (tdb->travlocks.off) {
1402                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1403                         return tdb_null;
1404                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1405                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1406                                             rec.key_len))
1407                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1408                         /* No, it wasn't: unlock it and start from scratch */
1409                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1410                                 return tdb_null;
1411                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1412                                 return tdb_null;
1413                         tdb->travlocks.off = 0;
1414                 }
1415
1416                 SAFE_FREE(k);
1417         }
1418
1419         if (!tdb->travlocks.off) {
1420                 /* No previous element: do normal find, and lock record */
1421                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
1422                 if (!tdb->travlocks.off)
1423                         return tdb_null;
1424                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1425                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1426                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1427                         return tdb_null;
1428                 }
1429         }
1430         oldhash = tdb->travlocks.hash;
1431
1432         /* Grab next record: locks chain and returned record,
1433            unlocks old record */
1434         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1435                 key.dsize = rec.key_len;
1436                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1437                                           key.dsize);
1438                 /* Unlock the chain of this new record */
1439                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1440                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1441         }
1442         /* Unlock the chain of old record */
1443         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1444                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1445         return key;
1446 }
1447
1448 /* delete an entry in the database given a key */
1449 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1450 {
1451         tdb_off rec_ptr;
1452         struct list_struct rec;
1453         int ret;
1454
1455         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1456                 return -1;
1457         ret = do_delete(tdb, rec_ptr, &rec);
1458         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1459                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1460         return ret;
1461 }
1462
1463 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1464 {
1465         u32 hash = tdb->hash_fn(&key);
1466         return tdb_delete_hash(tdb, key, hash);
1467 }
1468
1469 /* store an element in the database, replacing any existing element
1470    with the same key 
1471
1472    return 0 on success, -1 on failure
1473 */
1474 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1475 {
1476         struct list_struct rec;
1477         u32 hash;
1478         tdb_off rec_ptr;
1479         char *p = NULL;
1480         int ret = 0;
1481
1482         /* find which hash bucket it is in */
1483         hash = tdb->hash_fn(&key);
1484         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1485                 return -1;
1486
1487         /* check for it existing, on insert. */
1488         if (flag == TDB_INSERT) {
1489                 if (tdb_exists_hash(tdb, key, hash)) {
1490                         tdb->ecode = TDB_ERR_EXISTS;
1491                         goto fail;
1492                 }
1493         } else {
1494                 /* first try in-place update, on modify or replace. */
1495                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1496                         goto out;
1497                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1498                     flag == TDB_MODIFY) {
1499                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1500                          we should fail the store */
1501                         goto fail;
1502         }
1503         }
1504         /* reset the error code potentially set by the tdb_update() */
1505         tdb->ecode = TDB_SUCCESS;
1506
1507         /* delete any existing record - if it doesn't exist we don't
1508            care.  Doing this first reduces fragmentation, and avoids
1509            coalescing with `allocated' block before it's updated. */
1510         if (flag != TDB_INSERT)
1511                 tdb_delete_hash(tdb, key, hash);
1512
1513         /* Copy key+value *before* allocating free space in case malloc
1514            fails and we are left with a dead spot in the tdb. */
1515
1516         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1517                 tdb->ecode = TDB_ERR_OOM;
1518                 goto fail;
1519         }
1520
1521         memcpy(p, key.dptr, key.dsize);
1522         if (dbuf.dsize)
1523                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1524
1525         /* we have to allocate some space */
1526         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1527                 goto fail;
1528
1529         /* Read hash top into next ptr */
1530         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1531                 goto fail;
1532
1533         rec.key_len = key.dsize;
1534         rec.data_len = dbuf.dsize;
1535         rec.full_hash = hash;
1536         rec.magic = TDB_MAGIC;
1537
1538         /* write out and point the top of the hash chain at it */
1539         if (rec_write(tdb, rec_ptr, &rec) == -1
1540             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1541             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1542                 /* Need to tdb_unallocate() here */
1543                 goto fail;
1544         }
1545  out:
1546         SAFE_FREE(p); 
1547         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1548         return ret;
1549 fail:
1550         ret = -1;
1551         goto out;
1552 }
1553
1554 /* Attempt to append data to an entry in place - this only works if the new data size
1555    is <= the old data size and the key exists.
1556    on failure return -1. Record must be locked before calling.
1557 */
1558 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1559 {
1560         struct list_struct rec;
1561         tdb_off rec_ptr;
1562
1563         /* find entry */
1564         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1565                 return -1;
1566
1567         /* Append of 0 is always ok. */
1568         if (new_dbuf.dsize == 0)
1569                 return 0;
1570
1571         /* must be long enough for key, old data + new data and tailer */
1572         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1573                 /* No room. */
1574                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1575                 return -1;
1576         }
1577
1578         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1579                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1580                 return -1;
1581
1582         /* update size */
1583         rec.data_len += new_dbuf.dsize;
1584         return rec_write(tdb, rec_ptr, &rec);
1585 }
1586
1587 /* Append to an entry. Create if not exist. */
1588
1589 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1590 {
1591         struct list_struct rec;
1592         u32 hash;
1593         tdb_off rec_ptr;
1594         char *p = NULL;
1595         int ret = 0;
1596         size_t new_data_size = 0;
1597
1598         /* find which hash bucket it is in */
1599         hash = tdb->hash_fn(&key);
1600         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1601                 return -1;
1602
1603         /* first try in-place. */
1604         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1605                 goto out;
1606
1607         /* reset the error code potentially set by the tdb_append_inplace() */
1608         tdb->ecode = TDB_SUCCESS;
1609
1610         /* find entry */
1611         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1612                 if (tdb->ecode != TDB_ERR_NOEXIST)
1613                         goto fail;
1614
1615                 /* Not found - create. */
1616
1617                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1618                 goto out;
1619         }
1620
1621         new_data_size = rec.data_len + new_dbuf.dsize;
1622
1623         /* Copy key+old_value+value *before* allocating free space in case malloc
1624            fails and we are left with a dead spot in the tdb. */
1625
1626         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1627                 tdb->ecode = TDB_ERR_OOM;
1628                 goto fail;
1629         }
1630
1631         /* Copy the key in place. */
1632         memcpy(p, key.dptr, key.dsize);
1633
1634         /* Now read the old data into place. */
1635         if (rec.data_len &&
1636                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1637                         goto fail;
1638
1639         /* Finally append the new data. */
1640         if (new_dbuf.dsize)
1641                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1642
1643         /* delete any existing record - if it doesn't exist we don't
1644            care.  Doing this first reduces fragmentation, and avoids
1645            coalescing with `allocated' block before it's updated. */
1646
1647         tdb_delete_hash(tdb, key, hash);
1648
1649         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1650                 goto fail;
1651
1652         /* Read hash top into next ptr */
1653         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1654                 goto fail;
1655
1656         rec.key_len = key.dsize;
1657         rec.data_len = new_data_size;
1658         rec.full_hash = hash;
1659         rec.magic = TDB_MAGIC;
1660
1661         /* write out and point the top of the hash chain at it */
1662         if (rec_write(tdb, rec_ptr, &rec) == -1
1663             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1664             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1665                 /* Need to tdb_unallocate() here */
1666                 goto fail;
1667         }
1668
1669  out:
1670         SAFE_FREE(p); 
1671         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1672         return ret;
1673
1674 fail:
1675         ret = -1;
1676         goto out;
1677 }
1678
1679 static int tdb_already_open(dev_t device,
1680                             ino_t ino)
1681 {
1682         TDB_CONTEXT *i;
1683         
1684         for (i = tdbs; i; i = i->next) {
1685                 if (i->device == device && i->inode == ino) {
1686                         return 1;
1687                 }
1688         }
1689
1690         return 0;
1691 }
1692
1693 /* This is based on the hash algorithm from gdbm */
1694 static u32 default_tdb_hash(TDB_DATA *key)
1695 {
1696         u32 value;      /* Used to compute the hash value.  */
1697         u32   i;        /* Used to cycle through random values. */
1698
1699         /* Set the initial value from the key size. */
1700         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
1701                 value = (value + (key->dptr[i] << (i*5 % 24)));
1702
1703         return (1103515243 * value + 12345);  
1704 }
1705
1706 /* open the database, creating it if necessary 
1707
1708    The open_flags and mode are passed straight to the open call on the
1709    database file. A flags value of O_WRONLY is invalid. The hash size
1710    is advisory, use zero for a default value.
1711
1712    Return is NULL on error, in which case errno is also set.  Don't 
1713    try to call tdb_error or tdb_errname, just do strerror(errno).
1714
1715    @param name may be NULL for internal databases. */
1716 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1717                       int open_flags, mode_t mode)
1718 {
1719         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
1720 }
1721
1722
1723 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1724                          int open_flags, mode_t mode,
1725                          tdb_log_func log_fn,
1726                          tdb_hash_func hash_fn)
1727 {
1728         TDB_CONTEXT *tdb;
1729         struct stat st;
1730         int rev = 0, locked = 0;
1731         unsigned char *vp;
1732         u32 vertest;
1733
1734         if (!(tdb = calloc(1, sizeof *tdb))) {
1735                 /* Can't log this */
1736                 errno = ENOMEM;
1737                 goto fail;
1738         }
1739         tdb->fd = -1;
1740         tdb->name = NULL;
1741         tdb->map_ptr = NULL;
1742         tdb->flags = tdb_flags;
1743         tdb->open_flags = open_flags;
1744         tdb->log_fn = log_fn;
1745         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
1746
1747         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1748                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1749                          name));
1750                 errno = EINVAL;
1751                 goto fail;
1752         }
1753         
1754         if (hash_size == 0)
1755                 hash_size = DEFAULT_HASH_SIZE;
1756         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1757                 tdb->read_only = 1;
1758                 /* read only databases don't do locking or clear if first */
1759                 tdb->flags |= TDB_NOLOCK;
1760                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1761         }
1762
1763         /* internal databases don't mmap or lock, and start off cleared */
1764         if (tdb->flags & TDB_INTERNAL) {
1765                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1766                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1767                 if (tdb_new_database(tdb, hash_size) != 0) {
1768                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1769                         goto fail;
1770                 }
1771                 goto internal;
1772         }
1773
1774         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1775                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1776                          name, strerror(errno)));
1777                 goto fail;      /* errno set by open(2) */
1778         }
1779
1780         /* ensure there is only one process initialising at once */
1781         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1782                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1783                          name, strerror(errno)));
1784                 goto fail;      /* errno set by tdb_brlock */
1785         }
1786
1787         /* we need to zero database if we are the only one with it open */
1788         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1789                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1790                 open_flags |= O_CREAT;
1791                 if (ftruncate(tdb->fd, 0) == -1) {
1792                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1793                                  "failed to truncate %s: %s\n",
1794                                  name, strerror(errno)));
1795                         goto fail; /* errno set by ftruncate */
1796                 }
1797         }
1798
1799         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1800             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1801             || (tdb->header.version != TDB_VERSION
1802                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1803                 /* its not a valid database - possibly initialise it */
1804                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1805                         errno = EIO; /* ie bad format or something */
1806                         goto fail;
1807                 }
1808                 rev = (tdb->flags & TDB_CONVERT);
1809         }
1810         vp = (unsigned char *)&tdb->header.version;
1811         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1812                   (((u32)vp[2]) << 8) | (u32)vp[3];
1813         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1814         if (!rev)
1815                 tdb->flags &= ~TDB_CONVERT;
1816         else {
1817                 tdb->flags |= TDB_CONVERT;
1818                 convert(&tdb->header, sizeof(tdb->header));
1819         }
1820         if (fstat(tdb->fd, &st) == -1)
1821                 goto fail;
1822
1823         /* Is it already in the open list?  If so, fail. */
1824         if (tdb_already_open(st.st_dev, st.st_ino)) {
1825                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1826                          "%s (%d,%d) is already open in this process\n",
1827                          name, (int)st.st_dev, (int)st.st_ino));
1828                 errno = EBUSY;
1829                 goto fail;
1830         }
1831
1832         if (!(tdb->name = (char *)strdup(name))) {
1833                 errno = ENOMEM;
1834                 goto fail;
1835         }
1836
1837         tdb->map_size = st.st_size;
1838         tdb->device = st.st_dev;
1839         tdb->inode = st.st_ino;
1840         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1841         if (!tdb->locked) {
1842                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1843                          "failed to allocate lock structure for %s\n",
1844                          name));
1845                 errno = ENOMEM;
1846                 goto fail;
1847         }
1848         tdb_mmap(tdb);
1849         if (locked) {
1850                 if (!tdb->read_only)
1851                         if (tdb_clear_spinlocks(tdb) != 0) {
1852                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1853                                 "failed to clear spinlock\n"));
1854                                 goto fail;
1855                         }
1856                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1857                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1858                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1859                                  name, strerror(errno)));
1860                         goto fail;
1861                 }
1862
1863         }
1864
1865         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1866            we didn't get the initial exclusive lock as we need to let all other
1867            users know we're using it. */
1868
1869         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1870                 /* leave this lock in place to indicate it's in use */
1871                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1872                         goto fail;
1873         }
1874
1875
1876  internal:
1877         /* Internal (memory-only) databases skip all the code above to
1878          * do with disk files, and resume here by releasing their
1879          * global lock and hooking into the active list. */
1880         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1881                 goto fail;
1882         tdb->next = tdbs;
1883         tdbs = tdb;
1884         return tdb;
1885
1886  fail:
1887         { int save_errno = errno;
1888
1889         if (!tdb)
1890                 return NULL;
1891         
1892         if (tdb->map_ptr) {
1893                 if (tdb->flags & TDB_INTERNAL)
1894                         SAFE_FREE(tdb->map_ptr);
1895                 else
1896                         tdb_munmap(tdb);
1897         }
1898         SAFE_FREE(tdb->name);
1899         if (tdb->fd != -1)
1900                 if (close(tdb->fd) != 0)
1901                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1902         SAFE_FREE(tdb->locked);
1903         SAFE_FREE(tdb);
1904         errno = save_errno;
1905         return NULL;
1906         }
1907 }
1908
1909 /**
1910  * Close a database.
1911  *
1912  * @returns -1 for error; 0 for success.
1913  **/
1914 int tdb_close(TDB_CONTEXT *tdb)
1915 {
1916         TDB_CONTEXT **i;
1917         int ret = 0;
1918
1919         if (tdb->map_ptr) {
1920                 if (tdb->flags & TDB_INTERNAL)
1921                         SAFE_FREE(tdb->map_ptr);
1922                 else
1923                         tdb_munmap(tdb);
1924         }
1925         SAFE_FREE(tdb->name);
1926         if (tdb->fd != -1)
1927                 ret = close(tdb->fd);
1928         SAFE_FREE(tdb->locked);
1929
1930         /* Remove from contexts list */
1931         for (i = &tdbs; *i; i = &(*i)->next) {
1932                 if (*i == tdb) {
1933                         *i = tdb->next;
1934                         break;
1935                 }
1936         }
1937
1938         memset(tdb, 0, sizeof(*tdb));
1939         SAFE_FREE(tdb);
1940
1941         return ret;
1942 }
1943
1944 /* lock/unlock entire database */
1945 int tdb_lockall(TDB_CONTEXT *tdb)
1946 {
1947         u32 i;
1948
1949         /* There are no locks on read-only dbs */
1950         if (tdb->read_only)
1951                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1952         for (i = 0; i < tdb->header.hash_size; i++) 
1953                 if (tdb_lock(tdb, i, F_WRLCK))
1954                         break;
1955
1956         /* If error, release locks we have... */
1957         if (i < tdb->header.hash_size) {
1958                 u32 j;
1959
1960                 for ( j = 0; j < i; j++)
1961                         tdb_unlock(tdb, j, F_WRLCK);
1962                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1963         }
1964
1965         return 0;
1966 }
1967 void tdb_unlockall(TDB_CONTEXT *tdb)
1968 {
1969         u32 i;
1970         for (i=0; i < tdb->header.hash_size; i++)
1971                 tdb_unlock(tdb, i, F_WRLCK);
1972 }
1973
1974 /* lock/unlock one hash chain. This is meant to be used to reduce
1975    contention - it cannot guarantee how many records will be locked */
1976 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1977 {
1978         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1979 }
1980
1981 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1982 {
1983         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1984 }
1985
1986 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1987 {
1988         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1989 }
1990
1991 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1992 {
1993         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1994 }
1995
1996
1997 /* register a loging function */
1998 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1999 {
2000         tdb->log_fn = fn;
2001 }
2002
2003 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
2004    seek pointer from our parent and to re-establish locks */
2005 int tdb_reopen(TDB_CONTEXT *tdb)
2006 {
2007         struct stat st;
2008
2009         if (tdb->flags & TDB_INTERNAL)
2010                 return 0; /* Nothing to do. */
2011         if (tdb_munmap(tdb) != 0) {
2012                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2013                 goto fail;
2014         }
2015         if (close(tdb->fd) != 0)
2016                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2017         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2018         if (tdb->fd == -1) {
2019                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2020                 goto fail;
2021         }
2022         if (fstat(tdb->fd, &st) != 0) {
2023                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2024                 goto fail;
2025         }
2026         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2027                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2028                 goto fail;
2029         }
2030         tdb_mmap(tdb);
2031         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2032                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2033                 goto fail;
2034         }
2035
2036         return 0;
2037
2038 fail:
2039         tdb_close(tdb);
2040         return -1;
2041 }
2042
2043 /* reopen all tdb's */
2044 int tdb_reopen_all(void)
2045 {
2046         TDB_CONTEXT *tdb;
2047
2048         for (tdb=tdbs; tdb; tdb = tdb->next) {
2049                 /* Ensure no clear-if-first. */
2050                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
2051                 if (tdb_reopen(tdb) != 0)
2052                         return -1;
2053         }
2054
2055         return 0;
2056 }