tdb: Add tdb_traverse_chain
[samba.git] / lib / tdb / common / traverse.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 #define TDB_NEXT_LOCK_ERR ((tdb_off_t)-1)
31
32 /* Uses traverse lock: 0 = finish, TDB_NEXT_LOCK_ERR = error,
33    other = record offset */
34 static tdb_off_t tdb_next_lock(struct tdb_context *tdb, struct tdb_traverse_lock *tlock,
35                          struct tdb_record *rec)
36 {
37         int want_next = (tlock->off != 0);
38
39         /* Lock each chain from the start one. */
40         for (; tlock->list < tdb->hash_size; tlock->list++) {
41                 if (!tlock->off && tlock->list != 0) {
42                         /* this is an optimisation for the common case where
43                            the hash chain is empty, which is particularly
44                            common for the use of tdb with ldb, where large
45                            hashes are used. In that case we spend most of our
46                            time in tdb_brlock(), locking empty hash chains.
47
48                            To avoid this, we do an unlocked pre-check to see
49                            if the hash chain is empty before starting to look
50                            inside it. If it is empty then we can avoid that
51                            hash chain. If it isn't empty then we can't believe
52                            the value we get back, as we read it without a
53                            lock, so instead we get the lock and re-fetch the
54                            value below.
55
56                            Notice that not doing this optimisation on the
57                            first hash chain is critical. We must guarantee
58                            that we have done at least one fcntl lock at the
59                            start of a search to guarantee that memory is
60                            coherent on SMP systems. If records are added by
61                            others during the search then thats OK, and we
62                            could possibly miss those with this trick, but we
63                            could miss them anyway without this trick, so the
64                            semantics don't change.
65
66                            With a non-indexed ldb search this trick gains us a
67                            factor of around 80 in speed on a linux 2.6.x
68                            system (testing using ldbtest).
69                         */
70                         tdb->methods->next_hash_chain(tdb, &tlock->list);
71                         if (tlock->list == tdb->hash_size) {
72                                 continue;
73                         }
74                 }
75
76                 if (tdb_lock(tdb, tlock->list, tlock->lock_rw) == -1)
77                         return TDB_NEXT_LOCK_ERR;
78
79                 /* No previous record?  Start at top of chain. */
80                 if (!tlock->off) {
81                         if (tdb_ofs_read(tdb, TDB_HASH_TOP(tlock->list),
82                                      &tlock->off) == -1)
83                                 goto fail;
84                 } else {
85                         /* Otherwise unlock the previous record. */
86                         if (tdb_unlock_record(tdb, tlock->off) != 0)
87                                 goto fail;
88                 }
89
90                 if (want_next) {
91                         /* We have offset of old record: grab next */
92                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
93                                 goto fail;
94                         tlock->off = rec->next;
95                 }
96
97                 /* Iterate through chain */
98                 while( tlock->off) {
99                         if (tdb_rec_read(tdb, tlock->off, rec) == -1)
100                                 goto fail;
101
102                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
103                         if (tlock->off == rec->next) {
104                                 tdb->ecode = TDB_ERR_CORRUPT;
105                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: loop detected.\n"));
106                                 goto fail;
107                         }
108
109                         if (!TDB_DEAD(rec)) {
110                                 /* Woohoo: we found one! */
111                                 if (tdb_lock_record(tdb, tlock->off) != 0)
112                                         goto fail;
113                                 return tlock->off;
114                         }
115
116                         tlock->off = rec->next;
117                 }
118                 tdb_unlock(tdb, tlock->list, tlock->lock_rw);
119                 want_next = 0;
120         }
121         /* We finished iteration without finding anything */
122         tdb->ecode = TDB_SUCCESS;
123         return 0;
124
125  fail:
126         tlock->off = 0;
127         if (tdb_unlock(tdb, tlock->list, tlock->lock_rw) != 0)
128                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_next_lock: On error unlock failed!\n"));
129         return TDB_NEXT_LOCK_ERR;
130 }
131
132 /* traverse the entire database - calling fn(tdb, key, data) on each element.
133    return -1 on error or the record count traversed
134    if fn is NULL then it is not called
135    a non-zero return value from fn() indicates that the traversal should stop
136   */
137 static int tdb_traverse_internal(struct tdb_context *tdb,
138                                  tdb_traverse_func fn, void *private_data,
139                                  struct tdb_traverse_lock *tl)
140 {
141         TDB_DATA key, dbuf;
142         struct tdb_record rec;
143         int ret = 0, count = 0;
144         tdb_off_t off;
145         size_t recbuf_len;
146
147         recbuf_len = 4096;
148         key.dptr = malloc(recbuf_len);
149         if (key.dptr == NULL) {
150                 return -1;
151         }
152
153         /* This was in the initialization, above, but the IRIX compiler
154          * did not like it.  crh
155          */
156         tl->next = tdb->travlocks.next;
157
158         /* fcntl locks don't stack: beware traverse inside traverse */
159         tdb->travlocks.next = tl;
160
161         /* tdb_next_lock places locks on the record returned, and its chain */
162         while ((off = tdb_next_lock(tdb, tl, &rec)) != 0) {
163                 tdb_len_t full_len;
164                 int nread;
165
166                 if (off == TDB_NEXT_LOCK_ERR) {
167                         ret = -1;
168                         goto out;
169                 }
170
171                 full_len = rec.key_len + rec.data_len;
172
173                 if (full_len > recbuf_len) {
174                         recbuf_len = full_len;
175
176                         /*
177                          * No realloc, we don't need the old data and thus can
178                          * do without the memcpy
179                          */
180                         free(key.dptr);
181                         key.dptr = malloc(recbuf_len);
182
183                         if (key.dptr == NULL) {
184                                 ret = -1;
185                                 if (tdb_unlock(tdb, tl->list, tl->lock_rw)
186                                     != 0) {
187                                         goto out;
188                                 }
189                                 if (tdb_unlock_record(tdb, tl->off) != 0) {
190                                         TDB_LOG((tdb, TDB_DEBUG_FATAL,
191                                                  "tdb_traverse: malloc "
192                                                  "failed and unlock_record "
193                                                  "failed!\n"));
194                                 }
195                                 goto out;
196                         }
197                 }
198
199                 count++;
200                 /* now read the full record */
201                 nread = tdb->methods->tdb_read(tdb, tl->off + sizeof(rec),
202                                                key.dptr, full_len, 0);
203                 if (nread == -1) {
204                         ret = -1;
205                         if (tdb_unlock(tdb, tl->list, tl->lock_rw) != 0)
206                                 goto out;
207                         if (tdb_unlock_record(tdb, tl->off) != 0)
208                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
209                         goto out;
210                 }
211                 key.dsize = rec.key_len;
212                 dbuf.dptr = key.dptr + rec.key_len;
213                 dbuf.dsize = rec.data_len;
214
215                 tdb_trace_1rec_retrec(tdb, "traverse", key, dbuf);
216
217                 /* Drop chain lock, call out */
218                 if (tdb_unlock(tdb, tl->list, tl->lock_rw) != 0) {
219                         ret = -1;
220                         goto out;
221                 }
222                 if (fn && fn(tdb, key, dbuf, private_data)) {
223                         /* They want us to terminate traversal */
224                         tdb_trace_ret(tdb, "tdb_traverse_end", count);
225                         if (tdb_unlock_record(tdb, tl->off) != 0) {
226                                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_traverse: unlock_record failed!\n"));;
227                                 ret = -1;
228                         }
229                         goto out;
230                 }
231         }
232         tdb_trace(tdb, "tdb_traverse_end");
233 out:
234         SAFE_FREE(key.dptr);
235         tdb->travlocks.next = tl->next;
236         if (ret < 0)
237                 return -1;
238         else
239                 return count;
240 }
241
242
243 /*
244   a read style traverse - temporarily marks each record read only
245 */
246 _PUBLIC_ int tdb_traverse_read(struct tdb_context *tdb,
247                       tdb_traverse_func fn, void *private_data)
248 {
249         struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
250         int ret;
251
252         tdb->traverse_read++;
253         tdb_trace(tdb, "tdb_traverse_read_start");
254         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
255         tdb->traverse_read--;
256
257         return ret;
258 }
259
260 /*
261   a write style traverse - needs to get the transaction lock to
262   prevent deadlocks
263
264   WARNING: The data buffer given to the callback fn does NOT meet the
265   alignment guarantees malloc gives you.
266 */
267 _PUBLIC_ int tdb_traverse(struct tdb_context *tdb,
268                  tdb_traverse_func fn, void *private_data)
269 {
270         struct tdb_traverse_lock tl = { NULL, 0, 0, F_WRLCK };
271         enum tdb_lock_flags lock_flags;
272         int ret;
273
274         if (tdb->read_only || tdb->traverse_read) {
275                 return tdb_traverse_read(tdb, fn, private_data);
276         }
277
278         lock_flags = TDB_LOCK_WAIT;
279
280         if (tdb->allrecord_lock.count != 0) {
281                 /*
282                  * This avoids a deadlock between tdb_lockall() and
283                  * tdb_traverse(). See
284                  * https://bugzilla.samba.org/show_bug.cgi?id=11381
285                  */
286                 lock_flags = TDB_LOCK_NOWAIT;
287         }
288
289         if (tdb_transaction_lock(tdb, F_WRLCK, lock_flags)) {
290                 return -1;
291         }
292
293         tdb->traverse_write++;
294         tdb_trace(tdb, "tdb_traverse_start");
295         ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
296         tdb->traverse_write--;
297
298         tdb_transaction_unlock(tdb, F_WRLCK);
299
300         return ret;
301 }
302
303
304 /* find the first entry in the database and return its key */
305 _PUBLIC_ TDB_DATA tdb_firstkey(struct tdb_context *tdb)
306 {
307         TDB_DATA key;
308         struct tdb_record rec;
309         tdb_off_t off;
310
311         /* release any old lock */
312         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0)
313                 return tdb_null;
314         tdb->travlocks.off = tdb->travlocks.list = 0;
315         tdb->travlocks.lock_rw = F_RDLCK;
316
317         /* Grab first record: locks chain and returned record. */
318         off = tdb_next_lock(tdb, &tdb->travlocks, &rec);
319         if (off == 0 || off == TDB_NEXT_LOCK_ERR) {
320                 tdb_trace_retrec(tdb, "tdb_firstkey", tdb_null);
321                 return tdb_null;
322         }
323         /* now read the key */
324         key.dsize = rec.key_len;
325         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
326
327         tdb_trace_retrec(tdb, "tdb_firstkey", key);
328
329         /* Unlock the hash chain of the record we just read. */
330         if (tdb_unlock(tdb, tdb->travlocks.list, tdb->travlocks.lock_rw) != 0)
331                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
332         return key;
333 }
334
335 /* find the next entry in the database, returning its key */
336 _PUBLIC_ TDB_DATA tdb_nextkey(struct tdb_context *tdb, TDB_DATA oldkey)
337 {
338         uint32_t oldlist;
339         TDB_DATA key = tdb_null;
340         struct tdb_record rec;
341         unsigned char *k = NULL;
342         tdb_off_t off;
343
344         /* Is locked key the old key?  If so, traverse will be reliable. */
345         if (tdb->travlocks.off) {
346                 if (tdb_lock(tdb,tdb->travlocks.list,tdb->travlocks.lock_rw))
347                         return tdb_null;
348                 if (tdb_rec_read(tdb, tdb->travlocks.off, &rec) == -1
349                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
350                                             rec.key_len))
351                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
352                         /* No, it wasn't: unlock it and start from scratch */
353                         if (tdb_unlock_record(tdb, tdb->travlocks.off) != 0) {
354                                 tdb_trace_1rec_retrec(tdb, "tdb_nextkey",
355                                                       oldkey, tdb_null);
356                                 SAFE_FREE(k);
357                                 return tdb_null;
358                         }
359                         if (tdb_unlock(tdb, tdb->travlocks.list, tdb->travlocks.lock_rw) != 0) {
360                                 SAFE_FREE(k);
361                                 return tdb_null;
362                         }
363                         tdb->travlocks.off = 0;
364                 }
365
366                 SAFE_FREE(k);
367         }
368
369         if (!tdb->travlocks.off) {
370                 /* No previous element: do normal find, and lock record */
371                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), tdb->travlocks.lock_rw, &rec);
372                 if (!tdb->travlocks.off) {
373                         tdb_trace_1rec_retrec(tdb, "tdb_nextkey", oldkey, tdb_null);
374                         return tdb_null;
375                 }
376                 tdb->travlocks.list = BUCKET(rec.full_hash);
377                 if (tdb_lock_record(tdb, tdb->travlocks.off) != 0) {
378                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
379                         return tdb_null;
380                 }
381         }
382         oldlist = tdb->travlocks.list;
383
384         /* Grab next record: locks chain and returned record,
385            unlocks old record */
386         off = tdb_next_lock(tdb, &tdb->travlocks, &rec);
387         if (off != TDB_NEXT_LOCK_ERR && off != 0) {
388                 key.dsize = rec.key_len;
389                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
390                                           key.dsize);
391                 /* Unlock the chain of this new record */
392                 if (tdb_unlock(tdb, tdb->travlocks.list, tdb->travlocks.lock_rw) != 0)
393                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
394         }
395         /* Unlock the chain of old record */
396         if (tdb_unlock(tdb, oldlist, tdb->travlocks.lock_rw) != 0)
397                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
398         tdb_trace_1rec_retrec(tdb, "tdb_nextkey", oldkey, key);
399         return key;
400 }
401
402 _PUBLIC_ int tdb_traverse_chain(struct tdb_context *tdb,
403                                 unsigned chain,
404                                 tdb_traverse_func fn,
405                                 void *private_data)
406 {
407         tdb_off_t rec_ptr;
408         struct tdb_chainwalk_ctx chainwalk;
409         int count = 0;
410         int ret;
411
412         if (chain >= tdb->hash_size) {
413                 tdb->ecode = TDB_ERR_EINVAL;
414                 return -1;
415         }
416
417         if (tdb->traverse_read != 0) {
418                 tdb->ecode = TDB_ERR_LOCK;
419                 return -1;
420         }
421
422         ret = tdb_lock(tdb, chain, F_RDLCK);
423         if (ret == -1) {
424                 return -1;
425         }
426
427         tdb->traverse_read += 1;
428
429         ret = tdb_ofs_read(tdb, TDB_HASH_TOP(chain), &rec_ptr);
430         if (ret == -1) {
431                 goto fail;
432         }
433
434         tdb_chainwalk_init(&chainwalk, rec_ptr);
435
436         while (rec_ptr != 0) {
437                 struct tdb_record rec;
438                 bool ok;
439
440                 ret = tdb_rec_read(tdb, rec_ptr, &rec);
441                 if (ret == -1) {
442                         goto fail;
443                 }
444
445                 if (!TDB_DEAD(&rec)) {
446                         /* no overflow checks, tdb_rec_read checked it */
447                         tdb_off_t key_ofs = rec_ptr + sizeof(rec);
448                         size_t full_len = rec.key_len + rec.data_len;
449                         uint8_t *buf = NULL;
450
451                         TDB_DATA key = { .dsize = rec.key_len };
452                         TDB_DATA data = { .dsize = rec.data_len };
453
454                         if ((tdb->transaction == NULL) &&
455                             (tdb->map_ptr != NULL)) {
456                                 ret = tdb->methods->tdb_oob(
457                                         tdb, key_ofs, full_len, 0);
458                                 if (ret == -1) {
459                                         goto fail;
460                                 }
461                                 key.dptr = (uint8_t *)tdb->map_ptr + key_ofs;
462                         } else {
463                                 buf = tdb_alloc_read(tdb, key_ofs, full_len);
464                                 if (buf == NULL) {
465                                         goto fail;
466                                 }
467                                 key.dptr = buf;
468                         }
469                         data.dptr = key.dptr + key.dsize;
470
471                         ret = fn(tdb, key, data, private_data);
472                         free(buf);
473
474                         count += 1;
475
476                         if (ret != 0) {
477                                 break;
478                         }
479                 }
480
481                 rec_ptr = rec.next;
482
483                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
484                 if (!ok) {
485                         goto fail;
486                 }
487         }
488         tdb->traverse_read -= 1;
489         tdb_unlock(tdb, chain, F_RDLCK);
490         return count;
491
492 fail:
493         tdb->traverse_read -= 1;
494         tdb_unlock(tdb, chain, F_RDLCK);
495         return -1;
496 }
497
498 _PUBLIC_ int tdb_traverse_key_chain(struct tdb_context *tdb,
499                                     TDB_DATA key,
500                                     tdb_traverse_func fn,
501                                     void *private_data)
502 {
503         uint32_t hash, chain;
504         int ret;
505
506         hash = tdb->hash_fn(&key);
507         chain = BUCKET(hash);
508         ret = tdb_traverse_chain(tdb, chain, fn, private_data);
509
510         return ret;
511 }