s4-ldb: added LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK
[metze/samba/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         default:
81                 break;
82         }
83         return LDB_ERR_OTHER;
84 }
85
86 /*
87   lock the database for read - use by ltdb_search and ltdb_sequence_number
88 */
89 int ltdb_lock_read(struct ldb_module *module)
90 {
91         void *data = ldb_module_get_private(module);
92         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
93         int ret = 0;
94
95         if (ltdb->in_transaction == 0 &&
96             ltdb->read_lock_count == 0) {
97                 ret = tdb_lockall_read(ltdb->tdb);
98         }
99         if (ret == 0) {
100                 ltdb->read_lock_count++;
101         }
102         return ret;
103 }
104
105 /*
106   unlock the database after a ltdb_lock_read()
107 */
108 int ltdb_unlock_read(struct ldb_module *module)
109 {
110         void *data = ldb_module_get_private(module);
111         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
112         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
113                 return tdb_unlockall_read(ltdb->tdb);
114         }
115         ltdb->read_lock_count--;
116         return 0;
117 }
118
119
120 /*
121   form a TDB_DATA for a record key
122   caller frees
123
124   note that the key for a record can depend on whether the
125   dn refers to a case sensitive index record or not
126 */
127 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
128 {
129         struct ldb_context *ldb = ldb_module_get_ctx(module);
130         TDB_DATA key;
131         char *key_str = NULL;
132         const char *dn_folded = NULL;
133
134         /*
135           most DNs are case insensitive. The exception is index DNs for
136           case sensitive attributes
137
138           there are 3 cases dealt with in this code:
139
140           1) if the dn doesn't start with @ then uppercase the attribute
141              names and the attributes values of case insensitive attributes
142           2) if the dn starts with @ then leave it alone -
143              the indexing code handles the rest
144         */
145
146         dn_folded = ldb_dn_get_casefold(dn);
147         if (!dn_folded) {
148                 goto failed;
149         }
150
151         key_str = talloc_strdup(ldb, "DN=");
152         if (!key_str) {
153                 goto failed;
154         }
155
156         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
157         if (!key_str) {
158                 goto failed;
159         }
160
161         key.dptr = (uint8_t *)key_str;
162         key.dsize = strlen(key_str) + 1;
163
164         return key;
165
166 failed:
167         errno = ENOMEM;
168         key.dptr = NULL;
169         key.dsize = 0;
170         return key;
171 }
172
173 /*
174   check special dn's have valid attributes
175   currently only @ATTRIBUTES is checked
176 */
177 static int ltdb_check_special_dn(struct ldb_module *module,
178                                  const struct ldb_message *msg)
179 {
180         struct ldb_context *ldb = ldb_module_get_ctx(module);
181         unsigned int i, j;
182
183         if (! ldb_dn_is_special(msg->dn) ||
184             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
185                 return LDB_SUCCESS;
186         }
187
188         /* we have @ATTRIBUTES, let's check attributes are fine */
189         /* should we check that we deny multivalued attributes ? */
190         for (i = 0; i < msg->num_elements; i++) {
191                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
192
193                 for (j = 0; j < msg->elements[i].num_values; j++) {
194                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
195                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
196                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
197                         }
198                 }
199         }
200
201         return LDB_SUCCESS;
202 }
203
204
205 /*
206   we've made a modification to a dn - possibly reindex and
207   update sequence number
208 */
209 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
210 {
211         int ret = LDB_SUCCESS;
212         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
213
214         /* only allow modifies inside a transaction, otherwise the
215          * ldb is unsafe */
216         if (ltdb->in_transaction == 0) {
217                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
218                 return LDB_ERR_OPERATIONS_ERROR;
219         }
220
221         if (ldb_dn_is_special(dn) &&
222             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
223              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
224                 ret = ltdb_reindex(module);
225         }
226
227         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
228         if (ret == LDB_SUCCESS &&
229             !(ldb_dn_is_special(dn) &&
230               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
231                 ret = ltdb_increase_sequence_number(module);
232         }
233
234         /* If the modify was to @OPTIONS, reload the cache */
235         if (ret == LDB_SUCCESS &&
236             ldb_dn_is_special(dn) &&
237             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
238                 ret = ltdb_cache_reload(module);
239         }
240
241         return ret;
242 }
243
244 /*
245   store a record into the db
246 */
247 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
248 {
249         void *data = ldb_module_get_private(module);
250         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
251         TDB_DATA tdb_key, tdb_data;
252         int ret = LDB_SUCCESS;
253
254         tdb_key = ltdb_key(module, msg->dn);
255         if (tdb_key.dptr == NULL) {
256                 return LDB_ERR_OTHER;
257         }
258
259         ret = ltdb_pack_data(module, msg, &tdb_data);
260         if (ret == -1) {
261                 talloc_free(tdb_key.dptr);
262                 return LDB_ERR_OTHER;
263         }
264
265         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
266         if (ret == -1) {
267                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
268                 goto done;
269         }
270
271 done:
272         talloc_free(tdb_key.dptr);
273         talloc_free(tdb_data.dptr);
274
275         return ret;
276 }
277
278
279 /*
280   check if a attribute is a single valued, for a given element
281  */
282 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
283                                   struct ldb_message_element *el)
284 {
285         if (!a) return false;
286         if (el != NULL) {
287                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
288                         /* override from a ldb module, for example
289                            used for the description field, which is
290                            marked multi-valued in the schema but which
291                            should not actually accept multiple
292                            values */
293                         return true;
294                 }
295                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
296                         /* override from a ldb module, for example used for
297                            deleted linked attribute entries */
298                         return false;
299                 }
300         }
301         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
302                 return true;
303         }
304         return false;
305 }
306
307 static int ltdb_add_internal(struct ldb_module *module,
308                              const struct ldb_message *msg)
309 {
310         struct ldb_context *ldb = ldb_module_get_ctx(module);
311         int ret = LDB_SUCCESS;
312         unsigned int i;
313
314         for (i=0;i<msg->num_elements;i++) {
315                 struct ldb_message_element *el = &msg->elements[i];
316                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
317
318                 if (el->num_values == 0) {
319                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
320                                                el->name, ldb_dn_get_linearized(msg->dn));
321                         return LDB_ERR_CONSTRAINT_VIOLATION;
322                 }
323                 if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
324                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
325                                                el->name, ldb_dn_get_linearized(msg->dn));
326                         return LDB_ERR_CONSTRAINT_VIOLATION;
327                 }
328         }
329
330         ret = ltdb_store(module, msg, TDB_INSERT);
331         if (ret != LDB_SUCCESS) {
332                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
333                         ldb_asprintf_errstring(ldb,
334                                                "Entry %s already exists",
335                                                ldb_dn_get_linearized(msg->dn));
336                 }
337                 return ret;
338         }
339
340         ret = ltdb_index_add_new(module, msg);
341         if (ret != LDB_SUCCESS) {
342                 return ret;
343         }
344
345         ret = ltdb_modified(module, msg->dn);
346
347         return ret;
348 }
349
350 /*
351   add a record to the database
352 */
353 static int ltdb_add(struct ltdb_context *ctx)
354 {
355         struct ldb_module *module = ctx->module;
356         struct ldb_request *req = ctx->req;
357         int ret = LDB_SUCCESS;
358
359         ret = ltdb_check_special_dn(module, req->op.add.message);
360         if (ret != LDB_SUCCESS) {
361                 return ret;
362         }
363
364         ldb_request_set_state(req, LDB_ASYNC_PENDING);
365
366         if (ltdb_cache_load(module) != 0) {
367                 return LDB_ERR_OPERATIONS_ERROR;
368         }
369
370         ret = ltdb_add_internal(module, req->op.add.message);
371
372         return ret;
373 }
374
375 /*
376   delete a record from the database, not updating indexes (used for deleting
377   index records)
378 */
379 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
380 {
381         void *data = ldb_module_get_private(module);
382         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
383         TDB_DATA tdb_key;
384         int ret;
385
386         tdb_key = ltdb_key(module, dn);
387         if (!tdb_key.dptr) {
388                 return LDB_ERR_OTHER;
389         }
390
391         ret = tdb_delete(ltdb->tdb, tdb_key);
392         talloc_free(tdb_key.dptr);
393
394         if (ret != 0) {
395                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
396         }
397
398         return ret;
399 }
400
401 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
402 {
403         struct ldb_message *msg;
404         int ret = LDB_SUCCESS;
405
406         msg = ldb_msg_new(module);
407         if (msg == NULL) {
408                 return LDB_ERR_OPERATIONS_ERROR;
409         }
410
411         /* in case any attribute of the message was indexed, we need
412            to fetch the old record */
413         ret = ltdb_search_dn1(module, dn, msg);
414         if (ret != LDB_SUCCESS) {
415                 /* not finding the old record is an error */
416                 goto done;
417         }
418
419         ret = ltdb_delete_noindex(module, dn);
420         if (ret != LDB_SUCCESS) {
421                 goto done;
422         }
423
424         /* remove any indexed attributes */
425         ret = ltdb_index_delete(module, msg);
426         if (ret != LDB_SUCCESS) {
427                 goto done;
428         }
429
430         ret = ltdb_modified(module, dn);
431         if (ret != LDB_SUCCESS) {
432                 goto done;
433         }
434
435 done:
436         talloc_free(msg);
437         return ret;
438 }
439
440 /*
441   delete a record from the database
442 */
443 static int ltdb_delete(struct ltdb_context *ctx)
444 {
445         struct ldb_module *module = ctx->module;
446         struct ldb_request *req = ctx->req;
447         int ret = LDB_SUCCESS;
448
449         ldb_request_set_state(req, LDB_ASYNC_PENDING);
450
451         if (ltdb_cache_load(module) != 0) {
452                 return LDB_ERR_OPERATIONS_ERROR;
453         }
454
455         ret = ltdb_delete_internal(module, req->op.del.dn);
456
457         return ret;
458 }
459
460 /*
461   find an element by attribute name. At the moment this does a linear search,
462   it should be re-coded to use a binary search once all places that modify
463   records guarantee sorted order
464
465   return the index of the first matching element if found, otherwise -1
466 */
467 static int find_element(const struct ldb_message *msg, const char *name)
468 {
469         unsigned int i;
470         for (i=0;i<msg->num_elements;i++) {
471                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
472                         return i;
473                 }
474         }
475         return -1;
476 }
477
478
479 /*
480   add an element to an existing record. Assumes a elements array that we
481   can call re-alloc on, and assumed that we can re-use the data pointers from
482   the passed in additional values. Use with care!
483
484   returns 0 on success, -1 on failure (and sets errno)
485 */
486 static int ltdb_msg_add_element(struct ldb_context *ldb,
487                                 struct ldb_message *msg,
488                                 struct ldb_message_element *el)
489 {
490         struct ldb_message_element *e2;
491         unsigned int i;
492
493         if (el->num_values == 0) {
494                 /* nothing to do here - we don't add empty elements */
495                 return 0;
496         }
497
498         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
499                               msg->num_elements+1);
500         if (!e2) {
501                 errno = ENOMEM;
502                 return -1;
503         }
504
505         msg->elements = e2;
506
507         e2 = &msg->elements[msg->num_elements];
508
509         e2->name = el->name;
510         e2->flags = el->flags;
511         e2->values = talloc_array(msg->elements,
512                                   struct ldb_val, el->num_values);
513         if (!e2->values) {
514                 errno = ENOMEM;
515                 return -1;
516         }
517         for (i=0;i<el->num_values;i++) {
518                 e2->values[i] = el->values[i];
519         }
520         e2->num_values = el->num_values;
521
522         ++msg->num_elements;
523
524         return 0;
525 }
526
527 /*
528   delete all elements having a specified attribute name
529 */
530 static int msg_delete_attribute(struct ldb_module *module,
531                                 struct ldb_context *ldb,
532                                 struct ldb_message *msg, const char *name)
533 {
534         unsigned int i;
535         int ret;
536         struct ldb_message_element *el;
537
538         el = ldb_msg_find_element(msg, name);
539         if (el == NULL) {
540                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
541         }
542         i = el - msg->elements;
543
544         ret = ltdb_index_del_element(module, msg->dn, el);
545         if (ret != LDB_SUCCESS) {
546                 return ret;
547         }
548
549         talloc_free(el->values);
550         if (msg->num_elements > (i+1)) {
551                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
552         }
553         msg->num_elements--;
554         msg->elements = talloc_realloc(msg, msg->elements,
555                                        struct ldb_message_element,
556                                        msg->num_elements);
557         return LDB_SUCCESS;
558 }
559
560 /*
561   delete all elements matching an attribute name/value
562
563   return LDB Error on failure
564 */
565 static int msg_delete_element(struct ldb_module *module,
566                               struct ldb_message *msg,
567                               const char *name,
568                               const struct ldb_val *val)
569 {
570         struct ldb_context *ldb = ldb_module_get_ctx(module);
571         unsigned int i;
572         int found, ret;
573         struct ldb_message_element *el;
574         const struct ldb_schema_attribute *a;
575
576         found = find_element(msg, name);
577         if (found == -1) {
578                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
579         }
580
581         i = (unsigned int) found;
582         el = &(msg->elements[i]);
583
584         a = ldb_schema_attribute_by_name(ldb, el->name);
585
586         for (i=0;i<el->num_values;i++) {
587                 bool matched;
588                 if (a->syntax->operator_fn) {
589                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
590                                                      &el->values[i], val, &matched);
591                         if (ret != LDB_SUCCESS) return ret;
592                 } else {
593                         matched = (a->syntax->comparison_fn(ldb, ldb,
594                                                             &el->values[i], val) == 0);
595                 }
596                 if (matched) {
597                         if (el->num_values == 1) {
598                                 return msg_delete_attribute(module, ldb, msg, name);
599                         }
600
601                         ret = ltdb_index_del_value(module, msg->dn, el, i);
602                         if (ret != LDB_SUCCESS) {
603                                 return ret;
604                         }
605
606                         if (i<el->num_values-1) {
607                                 memmove(&el->values[i], &el->values[i+1],
608                                         sizeof(el->values[i])*
609                                                 (el->num_values-(i+1)));
610                         }
611                         el->num_values--;
612
613                         /* per definition we find in a canonicalised message an
614                            attribute value only once. So we are finished here */
615                         return LDB_SUCCESS;
616                 }
617         }
618
619         /* Not found */
620         return LDB_ERR_NO_SUCH_ATTRIBUTE;
621 }
622
623
624 /*
625   modify a record - internal interface
626
627   yuck - this is O(n^2). Luckily n is usually small so we probably
628   get away with it, but if we ever have really large attribute lists
629   then we'll need to look at this again
630
631   'req' is optional, and is used to specify controls if supplied
632 */
633 int ltdb_modify_internal(struct ldb_module *module,
634                          const struct ldb_message *msg,
635                          struct ldb_request *req)
636 {
637         struct ldb_context *ldb = ldb_module_get_ctx(module);
638         void *data = ldb_module_get_private(module);
639         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
640         TDB_DATA tdb_key, tdb_data;
641         struct ldb_message *msg2;
642         unsigned int i, j, k;
643         int ret = LDB_SUCCESS, idx;
644         struct ldb_control *control_permissive = NULL;
645
646         if (req) {
647                 control_permissive = ldb_request_get_control(req,
648                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
649         }
650
651         tdb_key = ltdb_key(module, msg->dn);
652         if (!tdb_key.dptr) {
653                 return LDB_ERR_OTHER;
654         }
655
656         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
657         if (!tdb_data.dptr) {
658                 talloc_free(tdb_key.dptr);
659                 return ltdb_err_map(tdb_error(ltdb->tdb));
660         }
661
662         msg2 = ldb_msg_new(tdb_key.dptr);
663         if (msg2 == NULL) {
664                 free(tdb_data.dptr);
665                 ret = LDB_ERR_OTHER;
666                 goto done;
667         }
668
669         ret = ltdb_unpack_data(module, &tdb_data, msg2);
670         free(tdb_data.dptr);
671         if (ret == -1) {
672                 ret = LDB_ERR_OTHER;
673                 goto done;
674         }
675
676         if (!msg2->dn) {
677                 msg2->dn = msg->dn;
678         }
679
680         for (i=0; i<msg->num_elements; i++) {
681                 struct ldb_message_element *el = &msg->elements[i], *el2;
682                 struct ldb_val *vals;
683                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
684                 const char *dn;
685
686                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
687                 case LDB_FLAG_MOD_ADD:
688
689                         if (el->num_values == 0) {
690                                 ldb_asprintf_errstring(ldb,
691                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
692                                                        el->name, ldb_dn_get_linearized(msg2->dn));
693                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
694                                 goto done;
695                         }
696
697                         /* make a copy of the array so that a permissive
698                          * control can remove duplicates without changing the
699                          * original values, but do not copy data as we do not
700                          * need to keep it around once the operation is
701                          * finished */
702                         if (control_permissive) {
703                                 el = talloc(msg2, struct ldb_message_element);
704                                 if (!el) {
705                                         ret = LDB_ERR_OTHER;
706                                         goto done;
707                                 }
708                                 el->name = msg->elements[i].name;
709                                 el->num_values = msg->elements[i].num_values;
710                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
711                                 if (el->values == NULL) {
712                                         ret = LDB_ERR_OTHER;
713                                         goto done;
714                                 }
715                                 for (j = 0; j < el->num_values; j++) {
716                                         el->values[j] = msg->elements[i].values[j];
717                                 }
718                         }
719
720                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
721                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
722                                                        el->name, ldb_dn_get_linearized(msg2->dn));
723                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
724                                 goto done;
725                         }
726
727                         /* Checks if element already exists */
728                         idx = find_element(msg2, el->name);
729                         if (idx == -1) {
730                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
731                                         ret = LDB_ERR_OTHER;
732                                         goto done;
733                                 }
734                                 ret = ltdb_index_add_element(module, msg2->dn,
735                                                              el);
736                                 if (ret != LDB_SUCCESS) {
737                                         goto done;
738                                 }
739                         } else {
740                                 j = (unsigned int) idx;
741                                 el2 = &(msg2->elements[j]);
742
743                                 /* We cannot add another value on a existing one
744                                    if the attribute is single-valued */
745                                 if (ldb_tdb_single_valued(a, el)) {
746                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
747                                                                el->name, ldb_dn_get_linearized(msg2->dn));
748                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
749                                         goto done;
750                                 }
751
752                                 /* Check that values don't exist yet on multi-
753                                    valued attributes or aren't provided twice */
754                                 for (j = 0; j < el->num_values; j++) {
755                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
756                                                 if (control_permissive) {
757                                                         /* remove this one as if it was never added */
758                                                         el->num_values--;
759                                                         for (k = j; k < el->num_values; k++) {
760                                                                 el->values[k] = el->values[k + 1];
761                                                         }
762                                                         j--; /* rewind */
763
764                                                         continue;
765                                                 }
766
767                                                 ldb_asprintf_errstring(ldb,
768                                                                        "attribute '%s': value #%u on '%s' already exists",
769                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
770                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
771                                                 goto done;
772                                         }
773                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
774                                                 ldb_asprintf_errstring(ldb,
775                                                                        "attribute '%s': value #%u on '%s' provided more than once",
776                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
777                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
778                                                 goto done;
779                                         }
780                                 }
781
782                                 /* Now combine existing and new values to a new
783                                    attribute record */
784                                 vals = talloc_realloc(msg2->elements,
785                                                       el2->values, struct ldb_val,
786                                                       el2->num_values + el->num_values);
787                                 if (vals == NULL) {
788                                         ldb_oom(ldb);
789                                         ret = LDB_ERR_OTHER;
790                                         goto done;
791                                 }
792
793                                 for (j=0; j<el->num_values; j++) {
794                                         vals[el2->num_values + j] =
795                                                 ldb_val_dup(vals, &el->values[j]);
796                                 }
797
798                                 el2->values = vals;
799                                 el2->num_values += el->num_values;
800
801                                 ret = ltdb_index_add_element(module, msg2->dn, el);
802                                 if (ret != LDB_SUCCESS) {
803                                         goto done;
804                                 }
805                         }
806
807                         break;
808
809                 case LDB_FLAG_MOD_REPLACE:
810
811                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
812                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
813                                                        el->name, ldb_dn_get_linearized(msg2->dn));
814                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
815                                 goto done;
816                         }
817
818                         /* TODO: This is O(n^2) - replace with more efficient check */
819                         for (j=0; j<el->num_values; j++) {
820                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
821                                         ldb_asprintf_errstring(ldb,
822                                                                "attribute '%s': value #%u on '%s' provided more than once",
823                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
824                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
825                                         goto done;
826                                 }
827                         }
828
829                         /* Checks if element already exists */
830                         idx = find_element(msg2, el->name);
831                         if (idx != -1) {
832                                 j = (unsigned int) idx;
833                                 el2 = &(msg2->elements[j]);
834                                 if (ldb_msg_element_compare(el, el2) == 0) {
835                                         /* we are replacing with the same values */
836                                         continue;
837                                 }
838                         
839                                 /* Delete the attribute if it exists in the DB */
840                                 if (msg_delete_attribute(module, ldb, msg2,
841                                                          el->name) != 0) {
842                                         ret = LDB_ERR_OTHER;
843                                         goto done;
844                                 }
845                         }
846
847                         /* Recreate it with the new values */
848                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
849                                 ret = LDB_ERR_OTHER;
850                                 goto done;
851                         }
852
853                         ret = ltdb_index_add_element(module, msg2->dn, el);
854                         if (ret != LDB_SUCCESS) {
855                                 goto done;
856                         }
857
858                         break;
859
860                 case LDB_FLAG_MOD_DELETE:
861                         dn = ldb_dn_get_linearized(msg2->dn);
862                         if (dn == NULL) {
863                                 ret = LDB_ERR_OTHER;
864                                 goto done;
865                         }
866
867                         if (msg->elements[i].num_values == 0) {
868                                 /* Delete the whole attribute */
869                                 ret = msg_delete_attribute(module, ldb, msg2,
870                                                            msg->elements[i].name);
871                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
872                                     control_permissive) {
873                                         ret = LDB_SUCCESS;
874                                 } else {
875                                         ldb_asprintf_errstring(ldb,
876                                                                "attribute '%s': no such attribute for delete on '%s'",
877                                                                msg->elements[i].name, dn);
878                                 }
879                                 if (ret != LDB_SUCCESS) {
880                                         goto done;
881                                 }
882                         } else {
883                                 /* Delete specified values from an attribute */
884                                 for (j=0; j < msg->elements[i].num_values; j++) {
885                                         ret = msg_delete_element(module,
886                                                                  msg2,
887                                                                  msg->elements[i].name,
888                                                                  &msg->elements[i].values[j]);
889                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
890                                             control_permissive) {
891                                                 ret = LDB_SUCCESS;
892                                         } else {
893                                                 ldb_asprintf_errstring(ldb,
894                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
895                                                                        msg->elements[i].name, dn);
896                                         }
897                                         if (ret != LDB_SUCCESS) {
898                                                 goto done;
899                                         }
900                                 }
901                         }
902                         break;
903                 default:
904                         ldb_asprintf_errstring(ldb,
905                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
906                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
907                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
908                         ret = LDB_ERR_PROTOCOL_ERROR;
909                         goto done;
910                 }
911         }
912
913         ret = ltdb_store(module, msg2, TDB_MODIFY);
914         if (ret != LDB_SUCCESS) {
915                 goto done;
916         }
917
918         ret = ltdb_modified(module, msg2->dn);
919         if (ret != LDB_SUCCESS) {
920                 goto done;
921         }
922
923 done:
924         talloc_free(tdb_key.dptr);
925         return ret;
926 }
927
928 /*
929   modify a record
930 */
931 static int ltdb_modify(struct ltdb_context *ctx)
932 {
933         struct ldb_module *module = ctx->module;
934         struct ldb_request *req = ctx->req;
935         int ret = LDB_SUCCESS;
936
937         ret = ltdb_check_special_dn(module, req->op.mod.message);
938         if (ret != LDB_SUCCESS) {
939                 return ret;
940         }
941
942         ldb_request_set_state(req, LDB_ASYNC_PENDING);
943
944         if (ltdb_cache_load(module) != 0) {
945                 return LDB_ERR_OPERATIONS_ERROR;
946         }
947
948         ret = ltdb_modify_internal(module, req->op.mod.message, req);
949
950         return ret;
951 }
952
953 /*
954   rename a record
955 */
956 static int ltdb_rename(struct ltdb_context *ctx)
957 {
958         struct ldb_module *module = ctx->module;
959         struct ldb_request *req = ctx->req;
960         struct ldb_message *msg;
961         int ret = LDB_SUCCESS;
962
963         ldb_request_set_state(req, LDB_ASYNC_PENDING);
964
965         if (ltdb_cache_load(ctx->module) != 0) {
966                 return LDB_ERR_OPERATIONS_ERROR;
967         }
968
969         msg = ldb_msg_new(ctx);
970         if (msg == NULL) {
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973
974         /* in case any attribute of the message was indexed, we need
975            to fetch the old record */
976         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
977         if (ret != LDB_SUCCESS) {
978                 /* not finding the old record is an error */
979                 return ret;
980         }
981
982         /* Always delete first then add, to avoid conflicts with
983          * unique indexes. We rely on the transaction to make this
984          * atomic
985          */
986         ret = ltdb_delete_internal(module, msg->dn);
987         if (ret != LDB_SUCCESS) {
988                 return ret;
989         }
990
991         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
992         if (msg->dn == NULL) {
993                 return LDB_ERR_OPERATIONS_ERROR;
994         }
995
996         ret = ltdb_add_internal(module, msg);
997
998         return ret;
999 }
1000
1001 static int ltdb_start_trans(struct ldb_module *module)
1002 {
1003         void *data = ldb_module_get_private(module);
1004         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1005
1006         if (tdb_transaction_start(ltdb->tdb) != 0) {
1007                 return ltdb_err_map(tdb_error(ltdb->tdb));
1008         }
1009
1010         ltdb->in_transaction++;
1011
1012         ltdb_index_transaction_start(module);
1013
1014         return LDB_SUCCESS;
1015 }
1016
1017 static int ltdb_prepare_commit(struct ldb_module *module)
1018 {
1019         void *data = ldb_module_get_private(module);
1020         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1021
1022         if (ltdb->in_transaction != 1) {
1023                 return LDB_SUCCESS;
1024         }
1025
1026         if (ltdb_index_transaction_commit(module) != 0) {
1027                 tdb_transaction_cancel(ltdb->tdb);
1028                 ltdb->in_transaction--;
1029                 return ltdb_err_map(tdb_error(ltdb->tdb));
1030         }
1031
1032         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1033                 ltdb->in_transaction--;
1034                 return ltdb_err_map(tdb_error(ltdb->tdb));
1035         }
1036
1037         ltdb->prepared_commit = true;
1038
1039         return LDB_SUCCESS;
1040 }
1041
1042 static int ltdb_end_trans(struct ldb_module *module)
1043 {
1044         void *data = ldb_module_get_private(module);
1045         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1046
1047         if (!ltdb->prepared_commit) {
1048                 int ret = ltdb_prepare_commit(module);
1049                 if (ret != LDB_SUCCESS) {
1050                         return ret;
1051                 }
1052         }
1053
1054         ltdb->in_transaction--;
1055         ltdb->prepared_commit = false;
1056
1057         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1058                 return ltdb_err_map(tdb_error(ltdb->tdb));
1059         }
1060
1061         return LDB_SUCCESS;
1062 }
1063
1064 static int ltdb_del_trans(struct ldb_module *module)
1065 {
1066         void *data = ldb_module_get_private(module);
1067         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1068
1069         ltdb->in_transaction--;
1070
1071         if (ltdb_index_transaction_cancel(module) != 0) {
1072                 tdb_transaction_cancel(ltdb->tdb);
1073                 return ltdb_err_map(tdb_error(ltdb->tdb));
1074         }
1075
1076         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
1077                 return ltdb_err_map(tdb_error(ltdb->tdb));
1078         }
1079
1080         return LDB_SUCCESS;
1081 }
1082
1083 /*
1084   return sequenceNumber from @BASEINFO
1085 */
1086 static int ltdb_sequence_number(struct ltdb_context *ctx,
1087                                 struct ldb_extended **ext)
1088 {
1089         struct ldb_context *ldb;
1090         struct ldb_module *module = ctx->module;
1091         struct ldb_request *req = ctx->req;
1092         TALLOC_CTX *tmp_ctx = NULL;
1093         struct ldb_seqnum_request *seq;
1094         struct ldb_seqnum_result *res;
1095         struct ldb_message *msg = NULL;
1096         struct ldb_dn *dn;
1097         const char *date;
1098         int ret = LDB_SUCCESS;
1099
1100         ldb = ldb_module_get_ctx(module);
1101
1102         seq = talloc_get_type(req->op.extended.data,
1103                                 struct ldb_seqnum_request);
1104         if (seq == NULL) {
1105                 return LDB_ERR_OPERATIONS_ERROR;
1106         }
1107
1108         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1109
1110         if (ltdb_lock_read(module) != 0) {
1111                 return LDB_ERR_OPERATIONS_ERROR;
1112         }
1113
1114         res = talloc_zero(req, struct ldb_seqnum_result);
1115         if (res == NULL) {
1116                 ret = LDB_ERR_OPERATIONS_ERROR;
1117                 goto done;
1118         }
1119
1120         tmp_ctx = talloc_new(req);
1121         if (tmp_ctx == NULL) {
1122                 ret = LDB_ERR_OPERATIONS_ERROR;
1123                 goto done;
1124         }
1125
1126         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1127         if (dn == NULL) {
1128                 ret = LDB_ERR_OPERATIONS_ERROR;
1129                 goto done;
1130         }
1131
1132         msg = ldb_msg_new(tmp_ctx);
1133         if (msg == NULL) {
1134                 ret = LDB_ERR_OPERATIONS_ERROR;
1135                 goto done;
1136         }
1137
1138         ret = ltdb_search_dn1(module, dn, msg);
1139         if (ret != LDB_SUCCESS) {
1140                 goto done;
1141         }
1142
1143         switch (seq->type) {
1144         case LDB_SEQ_HIGHEST_SEQ:
1145                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1146                 break;
1147         case LDB_SEQ_NEXT:
1148                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1149                 res->seq_num++;
1150                 break;
1151         case LDB_SEQ_HIGHEST_TIMESTAMP:
1152                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1153                 if (date) {
1154                         res->seq_num = ldb_string_to_time(date);
1155                 } else {
1156                         res->seq_num = 0;
1157                         /* zero is as good as anything when we don't know */
1158                 }
1159                 break;
1160         }
1161
1162         *ext = talloc_zero(req, struct ldb_extended);
1163         if (*ext == NULL) {
1164                 ret = LDB_ERR_OPERATIONS_ERROR;
1165                 goto done;
1166         }
1167         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1168         (*ext)->data = talloc_steal(*ext, res);
1169
1170 done:
1171         talloc_free(tmp_ctx);
1172         ltdb_unlock_read(module);
1173         return ret;
1174 }
1175
1176 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1177 {
1178         struct ldb_context *ldb;
1179         struct ldb_request *req;
1180         struct ldb_reply *ares;
1181
1182         ldb = ldb_module_get_ctx(ctx->module);
1183         req = ctx->req;
1184
1185         /* if we already returned an error just return */
1186         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1187                 return;
1188         }
1189
1190         ares = talloc_zero(req, struct ldb_reply);
1191         if (!ares) {
1192                 ldb_oom(ldb);
1193                 req->callback(req, NULL);
1194                 return;
1195         }
1196         ares->type = LDB_REPLY_DONE;
1197         ares->error = error;
1198
1199         req->callback(req, ares);
1200 }
1201
1202 static void ltdb_timeout(struct tevent_context *ev,
1203                           struct tevent_timer *te,
1204                           struct timeval t,
1205                           void *private_data)
1206 {
1207         struct ltdb_context *ctx;
1208         ctx = talloc_get_type(private_data, struct ltdb_context);
1209
1210         if (!ctx->request_terminated) {
1211                 /* request is done now */
1212                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1213         }
1214
1215         if (!ctx->request_terminated) {
1216                 /* neutralize the spy */
1217                 ctx->spy->ctx = NULL;
1218         }
1219         talloc_free(ctx);
1220 }
1221
1222 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1223                                         struct ldb_extended *ext,
1224                                         int error)
1225 {
1226         struct ldb_context *ldb;
1227         struct ldb_request *req;
1228         struct ldb_reply *ares;
1229
1230         ldb = ldb_module_get_ctx(ctx->module);
1231         req = ctx->req;
1232
1233         /* if we already returned an error just return */
1234         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1235                 return;
1236         }
1237
1238         ares = talloc_zero(req, struct ldb_reply);
1239         if (!ares) {
1240                 ldb_oom(ldb);
1241                 req->callback(req, NULL);
1242                 return;
1243         }
1244         ares->type = LDB_REPLY_DONE;
1245         ares->response = ext;
1246         ares->error = error;
1247
1248         req->callback(req, ares);
1249 }
1250
1251 static void ltdb_handle_extended(struct ltdb_context *ctx)
1252 {
1253         struct ldb_extended *ext = NULL;
1254         int ret;
1255
1256         if (strcmp(ctx->req->op.extended.oid,
1257                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1258                 /* get sequence number */
1259                 ret = ltdb_sequence_number(ctx, &ext);
1260         } else {
1261                 /* not recognized */
1262                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1263         }
1264
1265         ltdb_request_extended_done(ctx, ext, ret);
1266 }
1267
1268 static void ltdb_callback(struct tevent_context *ev,
1269                           struct tevent_timer *te,
1270                           struct timeval t,
1271                           void *private_data)
1272 {
1273         struct ltdb_context *ctx;
1274         int ret;
1275
1276         ctx = talloc_get_type(private_data, struct ltdb_context);
1277
1278         if (ctx->request_terminated) {
1279                 goto done;
1280         }
1281
1282         switch (ctx->req->operation) {
1283         case LDB_SEARCH:
1284                 ret = ltdb_search(ctx);
1285                 break;
1286         case LDB_ADD:
1287                 ret = ltdb_add(ctx);
1288                 break;
1289         case LDB_MODIFY:
1290                 ret = ltdb_modify(ctx);
1291                 break;
1292         case LDB_DELETE:
1293                 ret = ltdb_delete(ctx);
1294                 break;
1295         case LDB_RENAME:
1296                 ret = ltdb_rename(ctx);
1297                 break;
1298         case LDB_EXTENDED:
1299                 ltdb_handle_extended(ctx);
1300                 goto done;
1301         default:
1302                 /* no other op supported */
1303                 ret = LDB_ERR_PROTOCOL_ERROR;
1304         }
1305
1306         if (!ctx->request_terminated) {
1307                 /* request is done now */
1308                 ltdb_request_done(ctx, ret);
1309         }
1310
1311 done:
1312         if (!ctx->request_terminated) {
1313                 /* neutralize the spy */
1314                 ctx->spy->ctx = NULL;
1315         }
1316         talloc_free(ctx);
1317 }
1318
1319 static int ltdb_request_destructor(void *ptr)
1320 {
1321         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1322
1323         if (spy->ctx != NULL) {
1324                 spy->ctx->request_terminated = true;
1325         }
1326
1327         return 0;
1328 }
1329
1330 static int ltdb_handle_request(struct ldb_module *module,
1331                                struct ldb_request *req)
1332 {
1333         struct ldb_control *control_permissive;
1334         struct ldb_context *ldb;
1335         struct tevent_context *ev;
1336         struct ltdb_context *ac;
1337         struct tevent_timer *te;
1338         struct timeval tv;
1339         unsigned int i;
1340
1341         ldb = ldb_module_get_ctx(module);
1342
1343         control_permissive = ldb_request_get_control(req,
1344                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1345
1346         for (i = 0; req->controls && req->controls[i]; i++) {
1347                 if (req->controls[i]->critical &&
1348                     req->controls[i] != control_permissive) {
1349                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1350                                                req->controls[i]->oid);
1351                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1352                 }
1353         }
1354
1355         if (req->starttime == 0 || req->timeout == 0) {
1356                 ldb_set_errstring(ldb, "Invalid timeout settings");
1357                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1358         }
1359
1360         ev = ldb_get_event_context(ldb);
1361
1362         ac = talloc_zero(ldb, struct ltdb_context);
1363         if (ac == NULL) {
1364                 ldb_oom(ldb);
1365                 return LDB_ERR_OPERATIONS_ERROR;
1366         }
1367
1368         ac->module = module;
1369         ac->req = req;
1370
1371         tv.tv_sec = 0;
1372         tv.tv_usec = 0;
1373         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1374         if (NULL == te) {
1375                 talloc_free(ac);
1376                 return LDB_ERR_OPERATIONS_ERROR;
1377         }
1378
1379         tv.tv_sec = req->starttime + req->timeout;
1380         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1381         if (NULL == ac->timeout_event) {
1382                 talloc_free(ac);
1383                 return LDB_ERR_OPERATIONS_ERROR;
1384         }
1385
1386         /* set a spy so that we do not try to use the request context
1387          * if it is freed before ltdb_callback fires */
1388         ac->spy = talloc(req, struct ltdb_req_spy);
1389         if (NULL == ac->spy) {
1390                 talloc_free(ac);
1391                 return LDB_ERR_OPERATIONS_ERROR;
1392         }
1393         ac->spy->ctx = ac;
1394
1395         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1396
1397         return LDB_SUCCESS;
1398 }
1399
1400 static int ltdb_init_rootdse(struct ldb_module *module)
1401 {
1402         struct ldb_context *ldb;
1403         int ret;
1404
1405         ldb = ldb_module_get_ctx(module);
1406
1407         ret = ldb_mod_register_control(module,
1408                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1409         /* ignore errors on this - we expect it for non-sam databases */
1410
1411         /* there can be no module beyond the backend, just return */
1412         return LDB_SUCCESS;
1413 }
1414
1415 static const struct ldb_module_ops ltdb_ops = {
1416         .name              = "tdb",
1417         .init_context      = ltdb_init_rootdse,
1418         .search            = ltdb_handle_request,
1419         .add               = ltdb_handle_request,
1420         .modify            = ltdb_handle_request,
1421         .del               = ltdb_handle_request,
1422         .rename            = ltdb_handle_request,
1423         .extended          = ltdb_handle_request,
1424         .start_transaction = ltdb_start_trans,
1425         .end_transaction   = ltdb_end_trans,
1426         .prepare_commit    = ltdb_prepare_commit,
1427         .del_transaction   = ltdb_del_trans,
1428 };
1429
1430 /*
1431   connect to the database
1432 */
1433 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1434                         unsigned int flags, const char *options[],
1435                         struct ldb_module **_module)
1436 {
1437         struct ldb_module *module;
1438         const char *path;
1439         int tdb_flags, open_flags;
1440         struct ltdb_private *ltdb;
1441
1442         /* parse the url */
1443         if (strchr(url, ':')) {
1444                 if (strncmp(url, "tdb://", 6) != 0) {
1445                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1446                                   "Invalid tdb URL '%s'", url);
1447                         return LDB_ERR_OPERATIONS_ERROR;
1448                 }
1449                 path = url+6;
1450         } else {
1451                 path = url;
1452         }
1453
1454         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1455
1456         /* check for the 'nosync' option */
1457         if (flags & LDB_FLG_NOSYNC) {
1458                 tdb_flags |= TDB_NOSYNC;
1459         }
1460
1461         /* and nommap option */
1462         if (flags & LDB_FLG_NOMMAP) {
1463                 tdb_flags |= TDB_NOMMAP;
1464         }
1465
1466         if (flags & LDB_FLG_RDONLY) {
1467                 open_flags = O_RDONLY;
1468         } else {
1469                 open_flags = O_CREAT | O_RDWR;
1470         }
1471
1472         ltdb = talloc_zero(ldb, struct ltdb_private);
1473         if (!ltdb) {
1474                 ldb_oom(ldb);
1475                 return LDB_ERR_OPERATIONS_ERROR;
1476         }
1477
1478         /* note that we use quite a large default hash size */
1479         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1480                                    tdb_flags, open_flags,
1481                                    ldb_get_create_perms(ldb), ldb);
1482         if (!ltdb->tdb) {
1483                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1484                           "Unable to open tdb '%s'", path);
1485                 talloc_free(ltdb);
1486                 return LDB_ERR_OPERATIONS_ERROR;
1487         }
1488
1489         if (getenv("LDB_WARN_UNINDEXED")) {
1490                 ltdb->warn_unindexed = true;
1491         }
1492
1493         ltdb->sequence_number = 0;
1494
1495         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1496         if (!module) {
1497                 talloc_free(ltdb);
1498                 return LDB_ERR_OPERATIONS_ERROR;
1499         }
1500         ldb_module_set_private(module, ltdb);
1501         talloc_steal(module, ltdb);
1502
1503         if (ltdb_cache_load(module) != 0) {
1504                 talloc_free(module);
1505                 talloc_free(ltdb);
1506                 return LDB_ERR_OPERATIONS_ERROR;
1507         }
1508
1509         *_module = module;
1510         return LDB_SUCCESS;
1511 }
1512
1513 int ldb_tdb_init(const char *version)
1514 {
1515         LDB_MODULE_CHECK_VERSION(version);
1516         return ldb_register_backend("tdb", ltdb_connect, false);
1517 }