lib ldb key_value: Add get_size method
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include "../ldb_key_value/ldb_kv.h"
55 #include <tdb.h>
56
57 /*
58   lock the database for read - use by ltdb_search and ltdb_sequence_number
59 */
60 static int ltdb_lock_read(struct ldb_module *module)
61 {
62         void *data = ldb_module_get_private(module);
63         struct ldb_kv_private *ldb_kv =
64             talloc_get_type(data, struct ldb_kv_private);
65         int tdb_ret = 0;
66         int ret;
67         pid_t pid = getpid();
68
69         if (ldb_kv->pid != pid) {
70                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
71                                        __location__
72                                        ": Reusing ldb opend by pid %d in "
73                                        "process %d\n",
74                                        ldb_kv->pid,
75                                        pid);
76                 return LDB_ERR_PROTOCOL_ERROR;
77         }
78
79         if (tdb_transaction_active(ldb_kv->tdb) == false &&
80             ldb_kv->read_lock_count == 0) {
81                 tdb_ret = tdb_lockall_read(ldb_kv->tdb);
82         }
83         if (tdb_ret == 0) {
84                 ldb_kv->read_lock_count++;
85                 return LDB_SUCCESS;
86         }
87         ret = ltdb_err_map(tdb_error(ldb_kv->tdb));
88         if (ret == LDB_SUCCESS) {
89                 ret = LDB_ERR_OPERATIONS_ERROR;
90         }
91         ldb_debug_set(ldb_module_get_ctx(module),
92                       LDB_DEBUG_FATAL,
93                       "Failure during ltdb_lock_read(): %s -> %s",
94                       tdb_errorstr(ldb_kv->tdb),
95                       ldb_strerror(ret));
96         return ret;
97 }
98
99 /*
100   unlock the database after a ltdb_lock_read()
101 */
102 static int ltdb_unlock_read(struct ldb_module *module)
103 {
104         void *data = ldb_module_get_private(module);
105         struct ldb_kv_private *ldb_kv =
106             talloc_get_type(data, struct ldb_kv_private);
107         pid_t pid = getpid();
108
109         if (ldb_kv->pid != pid) {
110                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
111                                        __location__
112                                        ": Reusing ldb opend by pid %d in "
113                                        "process %d\n",
114                                        ldb_kv->pid,
115                                        pid);
116                 return LDB_ERR_PROTOCOL_ERROR;
117         }
118         if (!tdb_transaction_active(ldb_kv->tdb) &&
119             ldb_kv->read_lock_count == 1) {
120                 tdb_unlockall_read(ldb_kv->tdb);
121                 ldb_kv->read_lock_count--;
122                 return 0;
123         }
124         ldb_kv->read_lock_count--;
125         return 0;
126 }
127
128 static int ltdb_store(struct ldb_kv_private *ldb_kv,
129                       struct ldb_val ldb_key,
130                       struct ldb_val ldb_data,
131                       int flags)
132 {
133         TDB_DATA key = {
134                 .dptr = ldb_key.data,
135                 .dsize = ldb_key.length
136         };
137         TDB_DATA data = {
138                 .dptr = ldb_data.data,
139                 .dsize = ldb_data.length
140         };
141         bool transaction_active = tdb_transaction_active(ldb_kv->tdb);
142         if (transaction_active == false){
143                 return LDB_ERR_PROTOCOL_ERROR;
144         }
145         return tdb_store(ldb_kv->tdb, key, data, flags);
146 }
147
148 static int ltdb_error(struct ldb_kv_private *ldb_kv)
149 {
150         return ltdb_err_map(tdb_error(ldb_kv->tdb));
151 }
152
153 static const char *ltdb_errorstr(struct ldb_kv_private *ldb_kv)
154 {
155         return tdb_errorstr(ldb_kv->tdb);
156 }
157
158 static int ltdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val ldb_key)
159 {
160         TDB_DATA tdb_key = {
161                 .dptr = ldb_key.data,
162                 .dsize = ldb_key.length
163         };
164         bool transaction_active = tdb_transaction_active(ldb_kv->tdb);
165         if (transaction_active == false){
166                 return LDB_ERR_PROTOCOL_ERROR;
167         }
168         return tdb_delete(ldb_kv->tdb, tdb_key);
169 }
170
171 static int ltdb_transaction_start(struct ldb_kv_private *ldb_kv)
172 {
173         pid_t pid = getpid();
174
175         if (ldb_kv->pid != pid) {
176                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
177                                        __location__
178                                        ": Reusing ldb opend by pid %d in "
179                                        "process %d\n",
180                                        ldb_kv->pid,
181                                        pid);
182                 return LDB_ERR_PROTOCOL_ERROR;
183         }
184
185         return tdb_transaction_start(ldb_kv->tdb);
186 }
187
188 static int ltdb_transaction_cancel(struct ldb_kv_private *ldb_kv)
189 {
190         pid_t pid = getpid();
191
192         if (ldb_kv->pid != pid) {
193                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
194                                        __location__
195                                        ": Reusing ldb opend by pid %d in "
196                                        "process %d\n",
197                                        ldb_kv->pid,
198                                        pid);
199                 return LDB_ERR_PROTOCOL_ERROR;
200         }
201
202         return tdb_transaction_cancel(ldb_kv->tdb);
203 }
204
205 static int ltdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv)
206 {
207         pid_t pid = getpid();
208
209         if (ldb_kv->pid != pid) {
210                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
211                                        __location__
212                                        ": Reusing ldb opend by pid %d in "
213                                        "process %d\n",
214                                        ldb_kv->pid,
215                                        pid);
216                 return LDB_ERR_PROTOCOL_ERROR;
217         }
218
219         return tdb_transaction_prepare_commit(ldb_kv->tdb);
220 }
221
222 static int ltdb_transaction_commit(struct ldb_kv_private *ldb_kv)
223 {
224         pid_t pid = getpid();
225
226         if (ldb_kv->pid != pid) {
227                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
228                                        __location__
229                                        ": Reusing ldb opend by pid %d in "
230                                        "process %d\n",
231                                        ldb_kv->pid,
232                                        pid);
233                 return LDB_ERR_PROTOCOL_ERROR;
234         }
235
236         return tdb_transaction_commit(ldb_kv->tdb);
237 }
238 struct kv_ctx {
239         ldb_kv_traverse_fn kv_traverse_fn;
240         void *ctx;
241         struct ldb_kv_private *ldb_kv;
242         int (*parser)(struct ldb_val key,
243                       struct ldb_val data,
244                       void *private_data);
245 };
246
247 static int ltdb_traverse_fn_wrapper(struct tdb_context *tdb,
248                                     TDB_DATA tdb_key,
249                                     TDB_DATA tdb_data,
250                                     void *ctx)
251 {
252         struct kv_ctx *kv_ctx = ctx;
253         struct ldb_val key = {
254                 .length = tdb_key.dsize,
255                 .data = tdb_key.dptr,
256         };
257         struct ldb_val data = {
258                 .length = tdb_data.dsize,
259                 .data = tdb_data.dptr,
260         };
261         return kv_ctx->kv_traverse_fn(kv_ctx->ldb_kv, key, data, kv_ctx->ctx);
262 }
263
264 static int ltdb_traverse_fn(struct ldb_kv_private *ldb_kv,
265                             ldb_kv_traverse_fn fn,
266                             void *ctx)
267 {
268         struct kv_ctx kv_ctx = {
269             .kv_traverse_fn = fn, .ctx = ctx, .ldb_kv = ldb_kv};
270         if (tdb_transaction_active(ldb_kv->tdb)) {
271                 return tdb_traverse(
272                     ldb_kv->tdb, ltdb_traverse_fn_wrapper, &kv_ctx);
273         } else {
274                 return tdb_traverse_read(
275                     ldb_kv->tdb, ltdb_traverse_fn_wrapper, &kv_ctx);
276         }
277 }
278
279 static int ltdb_update_in_iterate(struct ldb_kv_private *ldb_kv,
280                                   struct ldb_val ldb_key,
281                                   struct ldb_val ldb_key2,
282                                   struct ldb_val ldb_data,
283                                   void *state)
284 {
285         int tdb_ret;
286         struct ldb_context *ldb;
287         struct ldb_kv_reindex_context *ctx =
288             (struct ldb_kv_reindex_context *)state;
289         struct ldb_module *module = ctx->module;
290         TDB_DATA key = {
291                 .dptr = ldb_key.data,
292                 .dsize = ldb_key.length
293         };
294         TDB_DATA key2 = {
295                 .dptr = ldb_key2.data,
296                 .dsize = ldb_key2.length
297         };
298         TDB_DATA data = {
299                 .dptr = ldb_data.data,
300                 .dsize = ldb_data.length
301         };
302
303         ldb = ldb_module_get_ctx(module);
304
305         tdb_ret = tdb_delete(ldb_kv->tdb, key);
306         if (tdb_ret != 0) {
307                 ldb_debug(ldb,
308                           LDB_DEBUG_ERROR,
309                           "Failed to delete %*.*s "
310                           "for rekey as %*.*s: %s",
311                           (int)key.dsize,
312                           (int)key.dsize,
313                           (const char *)key.dptr,
314                           (int)key2.dsize,
315                           (int)key2.dsize,
316                           (const char *)key.dptr,
317                           tdb_errorstr(ldb_kv->tdb));
318                 ctx->error = ltdb_err_map(tdb_error(ldb_kv->tdb));
319                 return -1;
320         }
321         tdb_ret = tdb_store(ldb_kv->tdb, key2, data, 0);
322         if (tdb_ret != 0) {
323                 ldb_debug(ldb,
324                           LDB_DEBUG_ERROR,
325                           "Failed to rekey %*.*s as %*.*s: %s",
326                           (int)key.dsize,
327                           (int)key.dsize,
328                           (const char *)key.dptr,
329                           (int)key2.dsize,
330                           (int)key2.dsize,
331                           (const char *)key.dptr,
332                           tdb_errorstr(ldb_kv->tdb));
333                 ctx->error = ltdb_err_map(tdb_error(ldb_kv->tdb));
334                 return -1;
335         }
336         return tdb_ret;
337 }
338
339 static int ltdb_parse_record_wrapper(TDB_DATA tdb_key,
340                                      TDB_DATA tdb_data,
341                                      void *ctx)
342 {
343         struct kv_ctx *kv_ctx = ctx;
344         struct ldb_val key = {
345                 .length = tdb_key.dsize,
346                 .data = tdb_key.dptr,
347         };
348         struct ldb_val data = {
349                 .length = tdb_data.dsize,
350                 .data = tdb_data.dptr,
351         };
352
353         return kv_ctx->parser(key, data, kv_ctx->ctx);
354 }
355
356 static int ltdb_parse_record(struct ldb_kv_private *ldb_kv,
357                              struct ldb_val ldb_key,
358                              int (*parser)(struct ldb_val key,
359                                            struct ldb_val data,
360                                            void *private_data),
361                              void *ctx)
362 {
363         struct kv_ctx kv_ctx = {.parser = parser, .ctx = ctx, .ldb_kv = ldb_kv};
364         TDB_DATA key = {
365                 .dptr = ldb_key.data,
366                 .dsize = ldb_key.length
367         };
368         int ret;
369
370         if (tdb_transaction_active(ldb_kv->tdb) == false &&
371             ldb_kv->read_lock_count == 0) {
372                 return LDB_ERR_PROTOCOL_ERROR;
373         }
374
375         ret = tdb_parse_record(
376             ldb_kv->tdb, key, ltdb_parse_record_wrapper, &kv_ctx);
377         if (ret == 0) {
378                 return LDB_SUCCESS;
379         }
380         return ltdb_err_map(tdb_error(ldb_kv->tdb));
381 }
382
383 static const char *ltdb_name(struct ldb_kv_private *ldb_kv)
384 {
385         return tdb_name(ldb_kv->tdb);
386 }
387
388 static bool ltdb_changed(struct ldb_kv_private *ldb_kv)
389 {
390         int seq = tdb_get_seqnum(ldb_kv->tdb);
391         bool has_changed = (seq != ldb_kv->tdb_seqnum);
392
393         ldb_kv->tdb_seqnum = seq;
394
395         return has_changed;
396 }
397
398 static bool ltdb_transaction_active(struct ldb_kv_private *ldb_kv)
399 {
400         return tdb_transaction_active(ldb_kv->tdb);
401 }
402
403 /*
404  * Get an estimate of the number of records in a tdb database.
405  *
406  * This implementation will overestimate the number of records in a sparsely
407  * populated database. The size estimate is only used for allocating
408  * an in memory tdb to cache index records during a reindex, overestimating
409  * the contents is acceptable, and preferable to underestimating
410  */
411 #define RECORD_SIZE 500
412 static size_t ltdb_get_size(struct ldb_kv_private *ldb_kv)
413 {
414         size_t map_size = tdb_map_size(ldb_kv->tdb);
415         size_t size = map_size / RECORD_SIZE;
416
417         return size;
418 }
419
420 static const struct kv_db_ops key_value_ops = {
421     .store = ltdb_store,
422     .delete = ltdb_delete,
423     .iterate = ltdb_traverse_fn,
424     .update_in_iterate = ltdb_update_in_iterate,
425     .fetch_and_parse = ltdb_parse_record,
426     .lock_read = ltdb_lock_read,
427     .unlock_read = ltdb_unlock_read,
428     .begin_write = ltdb_transaction_start,
429     .prepare_write = ltdb_transaction_prepare_commit,
430     .finish_write = ltdb_transaction_commit,
431     .abort_write = ltdb_transaction_cancel,
432     .error = ltdb_error,
433     .errorstr = ltdb_errorstr,
434     .name = ltdb_name,
435     .has_changed = ltdb_changed,
436     .transaction_active = ltdb_transaction_active,
437     .get_size = ltdb_get_size,
438 };
439
440 /*
441   connect to the database
442 */
443 int ltdb_connect(struct ldb_context *ldb, const char *url,
444                  unsigned int flags, const char *options[],
445                  struct ldb_module **_module)
446 {
447         const char *path;
448         int tdb_flags, open_flags;
449         struct ldb_kv_private *ldb_kv;
450
451         /*
452          * We hold locks, so we must use a private event context
453          * on each returned handle
454          */
455         ldb_set_require_private_event_context(ldb);
456
457         /* parse the url */
458         if (strchr(url, ':')) {
459                 if (strncmp(url, "tdb://", 6) != 0) {
460                         ldb_debug(ldb, LDB_DEBUG_ERROR,
461                                   "Invalid tdb URL '%s'", url);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464                 path = url+6;
465         } else {
466                 path = url;
467         }
468
469         tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
470
471         /* check for the 'nosync' option */
472         if (flags & LDB_FLG_NOSYNC) {
473                 tdb_flags |= TDB_NOSYNC;
474         }
475
476         /* and nommap option */
477         if (flags & LDB_FLG_NOMMAP) {
478                 tdb_flags |= TDB_NOMMAP;
479         }
480
481         ldb_kv = talloc_zero(ldb, struct ldb_kv_private);
482         if (!ldb_kv) {
483                 ldb_oom(ldb);
484                 return LDB_ERR_OPERATIONS_ERROR;
485         }
486
487         if (flags & LDB_FLG_RDONLY) {
488                 /*
489                  * This is weird, but because we can only have one tdb
490                  * in this process, and the other one could be
491                  * read-write, we can't use the tdb readonly.  Plus a
492                  * read only tdb prohibits the all-record lock.
493                  */
494                 open_flags = O_RDWR;
495
496                 ldb_kv->read_only = true;
497
498         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
499                 /*
500                  * This is used by ldbsearch to prevent creation of the database
501                  * if the name is wrong
502                  */
503                 open_flags = O_RDWR;
504         } else {
505                 /*
506                  * This is the normal case
507                  */
508                 open_flags = O_CREAT | O_RDWR;
509         }
510
511         ldb_kv->kv_ops = &key_value_ops;
512
513         errno = 0;
514         /* note that we use quite a large default hash size */
515         ldb_kv->tdb = ltdb_wrap_open(ldb_kv,
516                                      path,
517                                      10000,
518                                      tdb_flags,
519                                      open_flags,
520                                      ldb_get_create_perms(ldb),
521                                      ldb);
522         if (!ldb_kv->tdb) {
523                 ldb_asprintf_errstring(ldb,
524                                        "Unable to open tdb '%s': %s", path, strerror(errno));
525                 ldb_debug(ldb, LDB_DEBUG_ERROR,
526                           "Unable to open tdb '%s': %s", path, strerror(errno));
527                 talloc_free(ldb_kv);
528                 if (errno == EACCES || errno == EPERM) {
529                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
530                 }
531                 return LDB_ERR_OPERATIONS_ERROR;
532         }
533
534         return ldb_kv_init_store(
535             ldb_kv, "ldb_tdb backend", ldb, options, _module);
536 }