s4:ldb add support for permissive modify control
[metze/samba/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ret == LDB_SUCCESS &&
234             ldb_dn_is_special(dn) &&
235             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
236                 ret = ltdb_cache_reload(module);
237         }
238
239         return ret;
240 }
241
242 /*
243   store a record into the db
244 */
245 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
246 {
247         void *data = ldb_module_get_private(module);
248         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
249         TDB_DATA tdb_key, tdb_data;
250         int ret = LDB_SUCCESS;
251
252         tdb_key = ltdb_key(module, msg->dn);
253         if (tdb_key.dptr == NULL) {
254                 return LDB_ERR_OTHER;
255         }
256
257         ret = ltdb_pack_data(module, msg, &tdb_data);
258         if (ret == -1) {
259                 talloc_free(tdb_key.dptr);
260                 return LDB_ERR_OTHER;
261         }
262
263         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
264         if (ret == -1) {
265                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
266                 goto done;
267         }
268
269 done:
270         talloc_free(tdb_key.dptr);
271         talloc_free(tdb_data.dptr);
272
273         return ret;
274 }
275
276
277 static int ltdb_add_internal(struct ldb_module *module,
278                              const struct ldb_message *msg)
279 {
280         struct ldb_context *ldb = ldb_module_get_ctx(module);
281         int ret = LDB_SUCCESS, i;
282
283         ret = ltdb_check_special_dn(module, msg);
284         if (ret != LDB_SUCCESS) {
285                 return ret;
286         }
287
288         if (ltdb_cache_load(module) != 0) {
289                 return LDB_ERR_OPERATIONS_ERROR;
290         }
291
292         for (i=0;i<msg->num_elements;i++) {
293                 struct ldb_message_element *el = &msg->elements[i];
294                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
295
296                 if (el->num_values == 0) {
297                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
298                                                el->name, ldb_dn_get_linearized(msg->dn));
299                         return LDB_ERR_CONSTRAINT_VIOLATION;
300                 }
301                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
302                         if (el->num_values > 1) {
303                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
304                                                        el->name, ldb_dn_get_linearized(msg->dn));
305                                 return LDB_ERR_CONSTRAINT_VIOLATION;
306                         }
307                 }
308         }
309
310         ret = ltdb_store(module, msg, TDB_INSERT);
311         if (ret != LDB_SUCCESS) {
312                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
313                         ldb_asprintf_errstring(ldb,
314                                                "Entry %s already exists",
315                                                ldb_dn_get_linearized(msg->dn));
316                 }
317                 return ret;
318         }
319
320         ret = ltdb_index_add_new(module, msg);
321         if (ret != LDB_SUCCESS) {
322                 return ret;
323         }
324
325         ret = ltdb_modified(module, msg->dn);
326
327         return ret;
328 }
329
330 /*
331   add a record to the database
332 */
333 static int ltdb_add(struct ltdb_context *ctx)
334 {
335         struct ldb_module *module = ctx->module;
336         struct ldb_request *req = ctx->req;
337         int ret = LDB_SUCCESS;
338
339         ldb_request_set_state(req, LDB_ASYNC_PENDING);
340
341         if (ltdb_cache_load(module) != 0) {
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         ret = ltdb_add_internal(module, req->op.add.message);
346
347         return ret;
348 }
349
350 /*
351   delete a record from the database, not updating indexes (used for deleting
352   index records)
353 */
354 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
355 {
356         void *data = ldb_module_get_private(module);
357         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
358         TDB_DATA tdb_key;
359         int ret;
360
361         tdb_key = ltdb_key(module, dn);
362         if (!tdb_key.dptr) {
363                 return LDB_ERR_OTHER;
364         }
365
366         ret = tdb_delete(ltdb->tdb, tdb_key);
367         talloc_free(tdb_key.dptr);
368
369         if (ret != 0) {
370                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
371         }
372
373         return ret;
374 }
375
376 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
377 {
378         struct ldb_message *msg;
379         int ret = LDB_SUCCESS;
380
381         msg = talloc(module, struct ldb_message);
382         if (msg == NULL) {
383                 return LDB_ERR_OPERATIONS_ERROR;
384         }
385
386         /* in case any attribute of the message was indexed, we need
387            to fetch the old record */
388         ret = ltdb_search_dn1(module, dn, msg);
389         if (ret != LDB_SUCCESS) {
390                 /* not finding the old record is an error */
391                 goto done;
392         }
393
394         ret = ltdb_delete_noindex(module, dn);
395         if (ret != LDB_SUCCESS) {
396                 goto done;
397         }
398
399         /* remove any indexed attributes */
400         ret = ltdb_index_delete(module, msg);
401         if (ret != LDB_SUCCESS) {
402                 goto done;
403         }
404
405         ret = ltdb_modified(module, dn);
406         if (ret != LDB_SUCCESS) {
407                 goto done;
408         }
409
410 done:
411         talloc_free(msg);
412         return ret;
413 }
414
415 /*
416   delete a record from the database
417 */
418 static int ltdb_delete(struct ltdb_context *ctx)
419 {
420         struct ldb_module *module = ctx->module;
421         struct ldb_request *req = ctx->req;
422         int ret = LDB_SUCCESS;
423
424         ldb_request_set_state(req, LDB_ASYNC_PENDING);
425
426         if (ltdb_cache_load(module) != 0) {
427                 return LDB_ERR_OPERATIONS_ERROR;
428         }
429
430         ret = ltdb_delete_internal(module, req->op.del.dn);
431
432         return ret;
433 }
434
435 /*
436   find an element by attribute name. At the moment this does a linear search,
437   it should be re-coded to use a binary search once all places that modify
438   records guarantee sorted order
439
440   return the index of the first matching element if found, otherwise -1
441 */
442 static int find_element(const struct ldb_message *msg, const char *name)
443 {
444         unsigned int i;
445         for (i=0;i<msg->num_elements;i++) {
446                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
447                         return i;
448                 }
449         }
450         return -1;
451 }
452
453
454 /*
455   add an element to an existing record. Assumes a elements array that we
456   can call re-alloc on, and assumed that we can re-use the data pointers from
457   the passed in additional values. Use with care!
458
459   returns 0 on success, -1 on failure (and sets errno)
460 */
461 static int ltdb_msg_add_element(struct ldb_context *ldb,
462                                 struct ldb_message *msg,
463                                 struct ldb_message_element *el)
464 {
465         struct ldb_message_element *e2;
466         unsigned int i;
467
468         if (el->num_values == 0) {
469                 /* nothing to do here - we don't add empty elements */
470                 return 0;
471         }
472
473         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
474                               msg->num_elements+1);
475         if (!e2) {
476                 errno = ENOMEM;
477                 return -1;
478         }
479
480         msg->elements = e2;
481
482         e2 = &msg->elements[msg->num_elements];
483
484         e2->name = el->name;
485         e2->flags = el->flags;
486         e2->values = talloc_array(msg->elements,
487                                   struct ldb_val, el->num_values);
488         if (!e2->values) {
489                 errno = ENOMEM;
490                 return -1;
491         }
492         for (i=0;i<el->num_values;i++) {
493                 e2->values[i] = el->values[i];
494         }
495         e2->num_values = el->num_values;
496
497         ++msg->num_elements;
498
499         return 0;
500 }
501
502 /*
503   delete all elements having a specified attribute name
504 */
505 static int msg_delete_attribute(struct ldb_module *module,
506                                 struct ldb_context *ldb,
507                                 struct ldb_message *msg, const char *name)
508 {
509         unsigned int i;
510         int ret;
511         struct ldb_message_element *el;
512
513         el = ldb_msg_find_element(msg, name);
514         if (el == NULL) {
515                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
516         }
517         i = el - msg->elements;
518
519         ret = ltdb_index_del_element(module, msg->dn, el);
520         if (ret != LDB_SUCCESS) {
521                 return ret;
522         }
523
524         talloc_free(el->values);
525         if (msg->num_elements > (i+1)) {
526                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
527         }
528         msg->num_elements--;
529         msg->elements = talloc_realloc(msg, msg->elements,
530                                        struct ldb_message_element,
531                                        msg->num_elements);
532         return LDB_SUCCESS;
533 }
534
535 /*
536   delete all elements matching an attribute name/value
537
538   return LDB Error on failure
539 */
540 static int msg_delete_element(struct ldb_module *module,
541                               struct ldb_message *msg,
542                               const char *name,
543                               const struct ldb_val *val)
544 {
545         struct ldb_context *ldb = ldb_module_get_ctx(module);
546         unsigned int i;
547         int found, ret;
548         struct ldb_message_element *el;
549         const struct ldb_schema_attribute *a;
550
551         found = find_element(msg, name);
552         if (found == -1) {
553                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
554         }
555
556         el = &msg->elements[found];
557
558         a = ldb_schema_attribute_by_name(ldb, el->name);
559
560         for (i=0;i<el->num_values;i++) {
561                 if (a->syntax->comparison_fn(ldb, ldb,
562                                              &el->values[i], val) == 0) {
563                         if (el->num_values == 1) {
564                                 return msg_delete_attribute(module, ldb, msg, name);
565                         }
566
567                         ret = ltdb_index_del_value(module, msg->dn, el, i);
568                         if (ret != LDB_SUCCESS) {
569                                 return ret;
570                         }
571
572                         if (i<el->num_values-1) {
573                                 memmove(&el->values[i], &el->values[i+1],
574                                         sizeof(el->values[i])*
575                                                 (el->num_values-(i+1)));
576                         }
577                         el->num_values--;
578
579                         /* per definition we find in a canonicalised message an
580                            attribute value only once. So we are finished here */
581                         return LDB_SUCCESS;
582                 }
583         }
584
585         /* Not found */
586         return LDB_ERR_NO_SUCH_ATTRIBUTE;
587 }
588
589
590 /*
591   modify a record - internal interface
592
593   yuck - this is O(n^2). Luckily n is usually small so we probably
594   get away with it, but if we ever have really large attribute lists
595   then we'll need to look at this again
596
597   'req' is optional, and is used to specify controls if supplied
598 */
599 int ltdb_modify_internal(struct ldb_module *module,
600                          const struct ldb_message *msg,
601                          struct ldb_request *req)
602 {
603         struct ldb_context *ldb = ldb_module_get_ctx(module);
604         void *data = ldb_module_get_private(module);
605         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
606         TDB_DATA tdb_key, tdb_data;
607         struct ldb_message *msg2;
608         unsigned i, j, k;
609         int ret = LDB_SUCCESS, idx;
610         struct ldb_control *control_permissive = NULL;
611
612         if (req) {
613                 control_permissive = ldb_request_get_control(req,
614                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
615         }
616
617         tdb_key = ltdb_key(module, msg->dn);
618         if (!tdb_key.dptr) {
619                 return LDB_ERR_OTHER;
620         }
621
622         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
623         if (!tdb_data.dptr) {
624                 talloc_free(tdb_key.dptr);
625                 return ltdb_err_map(tdb_error(ltdb->tdb));
626         }
627
628         msg2 = talloc(tdb_key.dptr, struct ldb_message);
629         if (msg2 == NULL) {
630                 free(tdb_data.dptr);
631                 ret = LDB_ERR_OTHER;
632                 goto done;
633         }
634
635         ret = ltdb_unpack_data(module, &tdb_data, msg2);
636         free(tdb_data.dptr);
637         if (ret == -1) {
638                 ret = LDB_ERR_OTHER;
639                 goto done;
640         }
641
642         if (!msg2->dn) {
643                 msg2->dn = msg->dn;
644         }
645
646         for (i=0; i<msg->num_elements; i++) {
647                 struct ldb_message_element *el = &msg->elements[i], *el2;
648                 struct ldb_val *vals;
649                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
650                 const char *dn;
651
652                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
653                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
654                                                ldb_dn_get_linearized(msg2->dn));
655                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
656                         goto done;
657                 }
658
659                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
660                 case LDB_FLAG_MOD_ADD:
661
662                         /* make a copy of the array so that a permissive
663                          * control can remove duplicates without changing the
664                          * original values, but do not copy data as we do not
665                          * need to keep it around once the operation is
666                          * finished */
667                         if (control_permissive) {
668                                 el = talloc(msg2, struct ldb_message_element);
669                                 if (!el) {
670                                         ret = LDB_ERR_OTHER;
671                                         goto done;
672                                 }
673                                 el->name = msg->elements[i].name;
674                                 el->num_values = msg->elements[i].num_values;
675                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
676                                 for (j = 0; j < el->num_values; j++) {
677                                         el->values[j] = msg->elements[i].values[j];
678                                 }
679
680                                 if (el->num_values == 0) {
681                                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
682                                                                el->name, ldb_dn_get_linearized(msg2->dn));
683                                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
684                                         goto done;
685                                 }
686                         }
687
688                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
689                                 if (el->num_values > 1) {
690                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
691                                                                el->name, ldb_dn_get_linearized(msg2->dn));
692                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
693                                         goto done;
694                                 }
695                         }
696
697                         /* Checks if element already exists */
698                         idx = find_element(msg2, el->name);
699                         if (idx == -1) {
700                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
701                                         ret = LDB_ERR_OTHER;
702                                         goto done;
703                                 }
704                                 ret = ltdb_index_add_element(module, msg2->dn, el);
705                                 if (ret != LDB_SUCCESS) {
706                                         goto done;
707                                 }
708                         } else {
709                                 /* We cannot add another value on a existing one
710                                    if the attribute is single-valued */
711                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
712                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
713                                                                el->name, ldb_dn_get_linearized(msg2->dn));
714                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
715                                         goto done;
716                                 }
717
718                                 el2 = &(msg2->elements[idx]);
719
720                                 /* Check that values don't exist yet on multi-
721                                    valued attributes or aren't provided twice */
722                                 for (j = 0; j < el->num_values; j++) {
723                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
724                                                 if (control_permissive) {
725                                                         /* remove this one as if it was never added */
726                                                         el->num_values--;
727                                                         for (k = j; k < el->num_values; k++) {
728                                                                 el->values[k] = el->values[k + 1];
729                                                         }
730                                                         j--; /* rewind */
731
732                                                         continue;
733                                                 }
734
735                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
736                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
737                                                 goto done;
738                                         }
739                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
740                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
741                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
742                                                 goto done;
743                                         }
744                                 }
745
746                                 /* Now combine existing and new values to a new
747                                    attribute record */
748                                 vals = talloc_realloc(msg2->elements,
749                                                       el2->values, struct ldb_val,
750                                                       el2->num_values + el->num_values);
751                                 if (vals == NULL) {
752                                         ldb_oom(ldb);
753                                         ret = LDB_ERR_OTHER;
754                                         goto done;
755                                 }
756
757                                 for (j=0; j<el->num_values; j++) {
758                                         vals[el2->num_values + j] =
759                                                 ldb_val_dup(vals, &el->values[j]);
760                                 }
761
762                                 el2->values = vals;
763                                 el2->num_values += el->num_values;
764
765                                 ret = ltdb_index_add_element(module, msg2->dn, el);
766                                 if (ret != LDB_SUCCESS) {
767                                         goto done;
768                                 }
769                         }
770
771                         break;
772
773                 case LDB_FLAG_MOD_REPLACE:
774
775                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
776                                 /* the RELAX control overrides this
777                                    check for replace. This is needed as
778                                    DRS replication can produce multiple
779                                    values here for a single valued
780                                    attribute when the values are deleted
781                                    links
782                                 */
783                                 if (el->num_values > 1 &&
784                                     (!req || !ldb_request_get_control(req, LDB_CONTROL_RELAX_OID))) {
785                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
786                                                                el->name, ldb_dn_get_linearized(msg2->dn));
787                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
788                                         goto done;
789                                 }
790                         }
791
792                         /* TODO: This is O(n^2) - replace with more efficient check */
793                         for (j=0; j<el->num_values; j++) {
794                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
795                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
796                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
797                                         goto done;
798                                 }
799                         }
800
801                         idx = find_element(msg2, el->name);
802                         if (idx != -1) {
803                                 el2 = &(msg2->elements[idx]);
804                                 if (ldb_msg_element_compare(el, el2) == 0) {
805                                         /* we are replacing with the same values */
806                                         continue;
807                                 }
808                         
809                                 /* Delete the attribute if it exists in the DB */
810                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
811                                         ret = LDB_ERR_OTHER;
812                                         goto done;
813                                 }
814                         }
815
816                         /* Recreate it with the new values */
817                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
818                                 ret = LDB_ERR_OTHER;
819                                 goto done;
820                         }
821
822                         ret = ltdb_index_add_element(module, msg2->dn, el);
823                         if (ret != LDB_SUCCESS) {
824                                 goto done;
825                         }
826
827                         break;
828
829                 case LDB_FLAG_MOD_DELETE:
830                         dn = ldb_dn_get_linearized(msg2->dn);
831                         if (dn == NULL) {
832                                 ret = LDB_ERR_OTHER;
833                                 goto done;
834                         }
835
836                         if (msg->elements[i].num_values == 0) {
837                                 /* Delete the whole attribute */
838                                 ret = msg_delete_attribute(module, ldb, msg2,
839                                                            msg->elements[i].name);
840                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
841                                     control_permissive) {
842                                         ret = LDB_SUCCESS;
843                                 } else {
844                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
845                                                                msg->elements[i].name, dn);
846                                 }
847                                 if (ret != LDB_SUCCESS) {
848                                         goto done;
849                                 }
850                         } else {
851                                 /* Delete specified values from an attribute */
852                                 for (j=0; j < msg->elements[i].num_values; j++) {
853                                         ret = msg_delete_element(module,
854                                                                  msg2,
855                                                                  msg->elements[i].name,
856                                                                  &msg->elements[i].values[j]);
857                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
858                                             control_permissive) {
859                                                 ret = LDB_SUCCESS;
860                                         } else {
861                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
862                                                                        msg->elements[i].name, dn);
863                                         }
864                                         if (ret != LDB_SUCCESS) {
865                                                 goto done;
866                                         }
867                                 }
868                         }
869                         break;
870                 default:
871                         ldb_asprintf_errstring(ldb,
872                                 "Invalid ldb_modify flags on %s: 0x%x",
873                                 msg->elements[i].name,
874                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
875                         ret = LDB_ERR_PROTOCOL_ERROR;
876                         goto done;
877                 }
878         }
879
880         ret = ltdb_store(module, msg2, TDB_MODIFY);
881         if (ret != LDB_SUCCESS) {
882                 goto done;
883         }
884
885         ret = ltdb_modified(module, msg2->dn);
886         if (ret != LDB_SUCCESS) {
887                 goto done;
888         }
889
890 done:
891         talloc_free(tdb_key.dptr);
892         return ret;
893 }
894
895 /*
896   modify a record
897 */
898 static int ltdb_modify(struct ltdb_context *ctx)
899 {
900         struct ldb_module *module = ctx->module;
901         struct ldb_request *req = ctx->req;
902         int ret = LDB_SUCCESS;
903
904         ret = ltdb_check_special_dn(module, req->op.mod.message);
905         if (ret != LDB_SUCCESS) {
906                 return ret;
907         }
908
909         ldb_request_set_state(req, LDB_ASYNC_PENDING);
910
911         if (ltdb_cache_load(module) != 0) {
912                 return LDB_ERR_OPERATIONS_ERROR;
913         }
914
915         ret = ltdb_modify_internal(module, req->op.mod.message, req);
916
917         return ret;
918 }
919
920 /*
921   rename a record
922 */
923 static int ltdb_rename(struct ltdb_context *ctx)
924 {
925         struct ldb_module *module = ctx->module;
926         struct ldb_request *req = ctx->req;
927         struct ldb_message *msg;
928         int ret = LDB_SUCCESS;
929
930         ldb_request_set_state(req, LDB_ASYNC_PENDING);
931
932         if (ltdb_cache_load(ctx->module) != 0) {
933                 return LDB_ERR_OPERATIONS_ERROR;
934         }
935
936         msg = talloc(ctx, struct ldb_message);
937         if (msg == NULL) {
938                 return LDB_ERR_OPERATIONS_ERROR;
939         }
940
941         /* in case any attribute of the message was indexed, we need
942            to fetch the old record */
943         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
944         if (ret != LDB_SUCCESS) {
945                 /* not finding the old record is an error */
946                 return ret;
947         }
948
949         /* Always delete first then add, to avoid conflicts with
950          * unique indexes. We rely on the transaction to make this
951          * atomic
952          */
953         ret = ltdb_delete_internal(module, msg->dn);
954         if (ret != LDB_SUCCESS) {
955                 return ret;
956         }
957
958         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
959         if (msg->dn == NULL) {
960                 return LDB_ERR_OPERATIONS_ERROR;
961         }
962
963         ret = ltdb_add_internal(module, msg);
964
965         return ret;
966 }
967
968 static int ltdb_start_trans(struct ldb_module *module)
969 {
970         void *data = ldb_module_get_private(module);
971         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
972
973         if (tdb_transaction_start(ltdb->tdb) != 0) {
974                 return ltdb_err_map(tdb_error(ltdb->tdb));
975         }
976
977         ltdb->in_transaction++;
978
979         ltdb_index_transaction_start(module);
980
981         return LDB_SUCCESS;
982 }
983
984 static int ltdb_prepare_commit(struct ldb_module *module)
985 {
986         void *data = ldb_module_get_private(module);
987         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
988
989         if (ltdb->in_transaction != 1) {
990                 return LDB_SUCCESS;
991         }
992
993         if (ltdb_index_transaction_commit(module) != 0) {
994                 tdb_transaction_cancel(ltdb->tdb);
995                 ltdb->in_transaction--;
996                 return ltdb_err_map(tdb_error(ltdb->tdb));
997         }
998
999         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1000                 ltdb->in_transaction--;
1001                 return ltdb_err_map(tdb_error(ltdb->tdb));
1002         }
1003
1004         ltdb->prepared_commit = true;
1005
1006         return LDB_SUCCESS;
1007 }
1008
1009 static int ltdb_end_trans(struct ldb_module *module)
1010 {
1011         void *data = ldb_module_get_private(module);
1012         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1013
1014         if (!ltdb->prepared_commit) {
1015                 int ret = ltdb_prepare_commit(module);
1016                 if (ret != LDB_SUCCESS) {
1017                         return ret;
1018                 }
1019         }
1020
1021         ltdb->in_transaction--;
1022         ltdb->prepared_commit = false;
1023
1024         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1025                 return ltdb_err_map(tdb_error(ltdb->tdb));
1026         }
1027
1028         return LDB_SUCCESS;
1029 }
1030
1031 static int ltdb_del_trans(struct ldb_module *module)
1032 {
1033         void *data = ldb_module_get_private(module);
1034         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1035
1036         ltdb->in_transaction--;
1037
1038         if (ltdb_index_transaction_cancel(module) != 0) {
1039                 tdb_transaction_cancel(ltdb->tdb);
1040                 return ltdb_err_map(tdb_error(ltdb->tdb));
1041         }
1042
1043         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
1044                 return ltdb_err_map(tdb_error(ltdb->tdb));
1045         }
1046
1047         return LDB_SUCCESS;
1048 }
1049
1050 /*
1051   return sequenceNumber from @BASEINFO
1052 */
1053 static int ltdb_sequence_number(struct ltdb_context *ctx,
1054                                 struct ldb_extended **ext)
1055 {
1056         struct ldb_context *ldb;
1057         struct ldb_module *module = ctx->module;
1058         struct ldb_request *req = ctx->req;
1059         TALLOC_CTX *tmp_ctx;
1060         struct ldb_seqnum_request *seq;
1061         struct ldb_seqnum_result *res;
1062         struct ldb_message *msg = NULL;
1063         struct ldb_dn *dn;
1064         const char *date;
1065         int ret = LDB_SUCCESS;
1066
1067         ldb = ldb_module_get_ctx(module);
1068
1069         seq = talloc_get_type(req->op.extended.data,
1070                                 struct ldb_seqnum_request);
1071         if (seq == NULL) {
1072                 return LDB_ERR_OPERATIONS_ERROR;
1073         }
1074
1075         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1076
1077         if (ltdb_lock_read(module) != 0) {
1078                 return LDB_ERR_OPERATIONS_ERROR;
1079         }
1080
1081         res = talloc_zero(req, struct ldb_seqnum_result);
1082         if (res == NULL) {
1083                 ret = LDB_ERR_OPERATIONS_ERROR;
1084                 goto done;
1085         }
1086         tmp_ctx = talloc_new(req);
1087         if (tmp_ctx == NULL) {
1088                 ret = LDB_ERR_OPERATIONS_ERROR;
1089                 goto done;
1090         }
1091
1092         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1093
1094         msg = talloc(tmp_ctx, struct ldb_message);
1095         if (msg == NULL) {
1096                 ret = LDB_ERR_OPERATIONS_ERROR;
1097                 goto done;
1098         }
1099
1100         ret = ltdb_search_dn1(module, dn, msg);
1101         if (ret != LDB_SUCCESS) {
1102                 goto done;
1103         }
1104
1105         switch (seq->type) {
1106         case LDB_SEQ_HIGHEST_SEQ:
1107                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1108                 break;
1109         case LDB_SEQ_NEXT:
1110                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1111                 res->seq_num++;
1112                 break;
1113         case LDB_SEQ_HIGHEST_TIMESTAMP:
1114                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1115                 if (date) {
1116                         res->seq_num = ldb_string_to_time(date);
1117                 } else {
1118                         res->seq_num = 0;
1119                         /* zero is as good as anything when we don't know */
1120                 }
1121                 break;
1122         }
1123
1124         *ext = talloc_zero(req, struct ldb_extended);
1125         if (*ext == NULL) {
1126                 ret = LDB_ERR_OPERATIONS_ERROR;
1127                 goto done;
1128         }
1129         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1130         (*ext)->data = talloc_steal(*ext, res);
1131
1132 done:
1133         talloc_free(tmp_ctx);
1134         ltdb_unlock_read(module);
1135         return ret;
1136 }
1137
1138 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1139 {
1140         struct ldb_context *ldb;
1141         struct ldb_request *req;
1142         struct ldb_reply *ares;
1143
1144         ldb = ldb_module_get_ctx(ctx->module);
1145         req = ctx->req;
1146
1147         /* if we already returned an error just return */
1148         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1149                 return;
1150         }
1151
1152         ares = talloc_zero(req, struct ldb_reply);
1153         if (!ares) {
1154                 ldb_oom(ldb);
1155                 req->callback(req, NULL);
1156                 return;
1157         }
1158         ares->type = LDB_REPLY_DONE;
1159         ares->error = error;
1160
1161         req->callback(req, ares);
1162 }
1163
1164 static void ltdb_timeout(struct tevent_context *ev,
1165                           struct tevent_timer *te,
1166                           struct timeval t,
1167                           void *private_data)
1168 {
1169         struct ltdb_context *ctx;
1170         ctx = talloc_get_type(private_data, struct ltdb_context);
1171
1172         if (!ctx->request_terminated) {
1173                 /* request is done now */
1174                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1175         }
1176
1177         if (!ctx->request_terminated) {
1178                 /* neutralize the spy */
1179                 ctx->spy->ctx = NULL;
1180         }
1181         talloc_free(ctx);
1182 }
1183
1184 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1185                                         struct ldb_extended *ext,
1186                                         int error)
1187 {
1188         struct ldb_context *ldb;
1189         struct ldb_request *req;
1190         struct ldb_reply *ares;
1191
1192         ldb = ldb_module_get_ctx(ctx->module);
1193         req = ctx->req;
1194
1195         /* if we already returned an error just return */
1196         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1197                 return;
1198         }
1199
1200         ares = talloc_zero(req, struct ldb_reply);
1201         if (!ares) {
1202                 ldb_oom(ldb);
1203                 req->callback(req, NULL);
1204                 return;
1205         }
1206         ares->type = LDB_REPLY_DONE;
1207         ares->response = ext;
1208         ares->error = error;
1209
1210         req->callback(req, ares);
1211 }
1212
1213 static void ltdb_handle_extended(struct ltdb_context *ctx)
1214 {
1215         struct ldb_extended *ext = NULL;
1216         int ret;
1217
1218         if (strcmp(ctx->req->op.extended.oid,
1219                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1220                 /* get sequence number */
1221                 ret = ltdb_sequence_number(ctx, &ext);
1222         } else {
1223                 /* not recognized */
1224                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1225         }
1226
1227         ltdb_request_extended_done(ctx, ext, ret);
1228 }
1229
1230 static void ltdb_callback(struct tevent_context *ev,
1231                           struct tevent_timer *te,
1232                           struct timeval t,
1233                           void *private_data)
1234 {
1235         struct ltdb_context *ctx;
1236         int ret;
1237
1238         ctx = talloc_get_type(private_data, struct ltdb_context);
1239
1240         if (ctx->request_terminated) {
1241                 goto done;
1242         }
1243
1244         switch (ctx->req->operation) {
1245         case LDB_SEARCH:
1246                 ret = ltdb_search(ctx);
1247                 break;
1248         case LDB_ADD:
1249                 ret = ltdb_add(ctx);
1250                 break;
1251         case LDB_MODIFY:
1252                 ret = ltdb_modify(ctx);
1253                 break;
1254         case LDB_DELETE:
1255                 ret = ltdb_delete(ctx);
1256                 break;
1257         case LDB_RENAME:
1258                 ret = ltdb_rename(ctx);
1259                 break;
1260         case LDB_EXTENDED:
1261                 ltdb_handle_extended(ctx);
1262                 goto done;
1263         default:
1264                 /* no other op supported */
1265                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1266         }
1267
1268         if (!ctx->request_terminated) {
1269                 /* request is done now */
1270                 ltdb_request_done(ctx, ret);
1271         }
1272
1273 done:
1274         if (!ctx->request_terminated) {
1275                 /* neutralize the spy */
1276                 ctx->spy->ctx = NULL;
1277         }
1278         talloc_free(ctx);
1279 }
1280
1281 static int ltdb_request_destructor(void *ptr)
1282 {
1283         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1284
1285         if (spy->ctx != NULL) {
1286                 spy->ctx->request_terminated = true;
1287         }
1288
1289         return 0;
1290 }
1291
1292 static int ltdb_handle_request(struct ldb_module *module,
1293                                struct ldb_request *req)
1294 {
1295         struct ldb_control *control_permissive;
1296         struct ldb_context *ldb;
1297         struct tevent_context *ev;
1298         struct ltdb_context *ac;
1299         struct tevent_timer *te;
1300         struct timeval tv;
1301         int i;
1302
1303         ldb = ldb_module_get_ctx(module);
1304
1305         control_permissive = ldb_request_get_control(req,
1306                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1307
1308         for (i = 0; req->controls && req->controls[i]; i++) {
1309                 if (req->controls[i]->critical &&
1310                     req->controls[i] != control_permissive) {
1311                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1312                                                req->controls[i]->oid);
1313                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1314                 }
1315         }
1316
1317         if (req->starttime == 0 || req->timeout == 0) {
1318                 ldb_set_errstring(ldb, "Invalid timeout settings");
1319                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1320         }
1321
1322         ev = ldb_get_event_context(ldb);
1323
1324         ac = talloc_zero(ldb, struct ltdb_context);
1325         if (ac == NULL) {
1326                 ldb_oom(ldb);
1327                 return LDB_ERR_OPERATIONS_ERROR;
1328         }
1329
1330         ac->module = module;
1331         ac->req = req;
1332
1333         tv.tv_sec = 0;
1334         tv.tv_usec = 0;
1335         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1336         if (NULL == te) {
1337                 talloc_free(ac);
1338                 return LDB_ERR_OPERATIONS_ERROR;
1339         }
1340
1341         tv.tv_sec = req->starttime + req->timeout;
1342         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1343         if (NULL == ac->timeout_event) {
1344                 talloc_free(ac);
1345                 return LDB_ERR_OPERATIONS_ERROR;
1346         }
1347
1348         /* set a spy so that we do not try to use the request context
1349          * if it is freed before ltdb_callback fires */
1350         ac->spy = talloc(req, struct ltdb_req_spy);
1351         if (NULL == ac->spy) {
1352                 talloc_free(ac);
1353                 return LDB_ERR_OPERATIONS_ERROR;
1354         }
1355         ac->spy->ctx = ac;
1356
1357         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1358
1359         return LDB_SUCCESS;
1360 }
1361
1362 static int ltdb_init_rootdse(struct ldb_module *module)
1363 {
1364         struct ldb_context *ldb;
1365         int ret;
1366
1367         ldb = ldb_module_get_ctx(module);
1368
1369         ret = ldb_mod_register_control(module,
1370                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1371         if (ret != LDB_SUCCESS) {
1372                 ldb_debug(ldb, LDB_DEBUG_WARNING, "ldb_tdb: "
1373                           "Unable to register control with rootdse!");
1374         }
1375
1376         /* there can be no module beyond the backend, just return */
1377         return LDB_SUCCESS;
1378 }
1379
1380 static const struct ldb_module_ops ltdb_ops = {
1381         .name              = "tdb",
1382         .init_context      = ltdb_init_rootdse,
1383         .search            = ltdb_handle_request,
1384         .add               = ltdb_handle_request,
1385         .modify            = ltdb_handle_request,
1386         .del               = ltdb_handle_request,
1387         .rename            = ltdb_handle_request,
1388         .extended          = ltdb_handle_request,
1389         .start_transaction = ltdb_start_trans,
1390         .end_transaction   = ltdb_end_trans,
1391         .prepare_commit    = ltdb_prepare_commit,
1392         .del_transaction   = ltdb_del_trans,
1393 };
1394
1395 /*
1396   connect to the database
1397 */
1398 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1399                         unsigned int flags, const char *options[],
1400                         struct ldb_module **_module)
1401 {
1402         struct ldb_module *module;
1403         const char *path;
1404         int tdb_flags, open_flags;
1405         struct ltdb_private *ltdb;
1406
1407         /* parse the url */
1408         if (strchr(url, ':')) {
1409                 if (strncmp(url, "tdb://", 6) != 0) {
1410                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1411                                   "Invalid tdb URL '%s'", url);
1412                         return LDB_ERR_OPERATIONS_ERROR;
1413                 }
1414                 path = url+6;
1415         } else {
1416                 path = url;
1417         }
1418
1419         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1420
1421         /* check for the 'nosync' option */
1422         if (flags & LDB_FLG_NOSYNC) {
1423                 tdb_flags |= TDB_NOSYNC;
1424         }
1425
1426         /* and nommap option */
1427         if (flags & LDB_FLG_NOMMAP) {
1428                 tdb_flags |= TDB_NOMMAP;
1429         }
1430
1431         if (flags & LDB_FLG_RDONLY) {
1432                 open_flags = O_RDONLY;
1433         } else {
1434                 open_flags = O_CREAT | O_RDWR;
1435         }
1436
1437         ltdb = talloc_zero(ldb, struct ltdb_private);
1438         if (!ltdb) {
1439                 ldb_oom(ldb);
1440                 return LDB_ERR_OPERATIONS_ERROR;
1441         }
1442
1443         /* note that we use quite a large default hash size */
1444         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1445                                    tdb_flags, open_flags,
1446                                    ldb_get_create_perms(ldb), ldb);
1447         if (!ltdb->tdb) {
1448                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1449                           "Unable to open tdb '%s'", path);
1450                 talloc_free(ltdb);
1451                 return LDB_ERR_OPERATIONS_ERROR;
1452         }
1453
1454         ltdb->sequence_number = 0;
1455
1456         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1457         if (!module) {
1458                 talloc_free(ltdb);
1459                 return LDB_ERR_OPERATIONS_ERROR;
1460         }
1461         ldb_module_set_private(module, ltdb);
1462         talloc_steal(module, ltdb);
1463
1464         if (ltdb_cache_load(module) != 0) {
1465                 talloc_free(module);
1466                 talloc_free(ltdb);
1467                 return LDB_ERR_OPERATIONS_ERROR;
1468         }
1469
1470         *_module = module;
1471         return LDB_SUCCESS;
1472 }
1473
1474 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1475         .name = "tdb",
1476         .connect_fn = ltdb_connect,
1477 };