lib/ldb: Ensure rename target does not exist before deleting old record
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <tdb.h>
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ltdb_req_spy {
59         struct ltdb_context *ctx;
60 };
61
62 /*
63   map a tdb error code to a ldb error code
64 */
65 int ltdb_err_map(enum TDB_ERROR tdb_code)
66 {
67         switch (tdb_code) {
68         case TDB_SUCCESS:
69                 return LDB_SUCCESS;
70         case TDB_ERR_CORRUPT:
71         case TDB_ERR_OOM:
72         case TDB_ERR_EINVAL:
73                 return LDB_ERR_OPERATIONS_ERROR;
74         case TDB_ERR_IO:
75                 return LDB_ERR_PROTOCOL_ERROR;
76         case TDB_ERR_LOCK:
77         case TDB_ERR_NOLOCK:
78                 return LDB_ERR_BUSY;
79         case TDB_ERR_LOCK_TIMEOUT:
80                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
81         case TDB_ERR_EXISTS:
82                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
83         case TDB_ERR_NOEXIST:
84                 return LDB_ERR_NO_SUCH_OBJECT;
85         case TDB_ERR_RDONLY:
86                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
87         default:
88                 break;
89         }
90         return LDB_ERR_OTHER;
91 }
92
93 /*
94   lock the database for read - use by ltdb_search and ltdb_sequence_number
95 */
96 int ltdb_lock_read(struct ldb_module *module)
97 {
98         void *data = ldb_module_get_private(module);
99         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
100         int ret = 0;
101
102         if (ltdb->in_transaction == 0 &&
103             ltdb->read_lock_count == 0) {
104                 ret = tdb_lockall_read(ltdb->tdb);
105         }
106         if (ret == 0) {
107                 ltdb->read_lock_count++;
108         }
109         return ret;
110 }
111
112 /*
113   unlock the database after a ltdb_lock_read()
114 */
115 int ltdb_unlock_read(struct ldb_module *module)
116 {
117         void *data = ldb_module_get_private(module);
118         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
119         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
120                 tdb_unlockall_read(ltdb->tdb);
121                 return 0;
122         }
123         ltdb->read_lock_count--;
124         return 0;
125 }
126
127
128 /*
129   form a TDB_DATA for a record key
130   caller frees
131
132   note that the key for a record can depend on whether the
133   dn refers to a case sensitive index record or not
134 */
135 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
136 {
137         struct ldb_context *ldb = ldb_module_get_ctx(module);
138         TDB_DATA key;
139         char *key_str = NULL;
140         const char *dn_folded = NULL;
141
142         /*
143           most DNs are case insensitive. The exception is index DNs for
144           case sensitive attributes
145
146           there are 3 cases dealt with in this code:
147
148           1) if the dn doesn't start with @ then uppercase the attribute
149              names and the attributes values of case insensitive attributes
150           2) if the dn starts with @ then leave it alone -
151              the indexing code handles the rest
152         */
153
154         dn_folded = ldb_dn_get_casefold(dn);
155         if (!dn_folded) {
156                 goto failed;
157         }
158
159         key_str = talloc_strdup(ldb, "DN=");
160         if (!key_str) {
161                 goto failed;
162         }
163
164         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
165         if (!key_str) {
166                 goto failed;
167         }
168
169         key.dptr = (uint8_t *)key_str;
170         key.dsize = strlen(key_str) + 1;
171
172         return key;
173
174 failed:
175         errno = ENOMEM;
176         key.dptr = NULL;
177         key.dsize = 0;
178         return key;
179 }
180
181 /*
182   check special dn's have valid attributes
183   currently only @ATTRIBUTES is checked
184 */
185 static int ltdb_check_special_dn(struct ldb_module *module,
186                                  const struct ldb_message *msg)
187 {
188         struct ldb_context *ldb = ldb_module_get_ctx(module);
189         unsigned int i, j;
190
191         if (! ldb_dn_is_special(msg->dn) ||
192             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
193                 return LDB_SUCCESS;
194         }
195
196         /* we have @ATTRIBUTES, let's check attributes are fine */
197         /* should we check that we deny multivalued attributes ? */
198         for (i = 0; i < msg->num_elements; i++) {
199                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
200
201                 for (j = 0; j < msg->elements[i].num_values; j++) {
202                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
203                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
204                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
205                         }
206                 }
207         }
208
209         return LDB_SUCCESS;
210 }
211
212
213 /*
214   we've made a modification to a dn - possibly reindex and
215   update sequence number
216 */
217 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
218 {
219         int ret = LDB_SUCCESS;
220         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
221
222         /* only allow modifies inside a transaction, otherwise the
223          * ldb is unsafe */
224         if (ltdb->in_transaction == 0) {
225                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
226                 return LDB_ERR_OPERATIONS_ERROR;
227         }
228
229         if (ldb_dn_is_special(dn) &&
230             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
231              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
232                 ret = ltdb_reindex(module);
233         }
234
235         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
236         if (ret == LDB_SUCCESS &&
237             !(ldb_dn_is_special(dn) &&
238               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
239                 ret = ltdb_increase_sequence_number(module);
240         }
241
242         /* If the modify was to @OPTIONS, reload the cache */
243         if (ret == LDB_SUCCESS &&
244             ldb_dn_is_special(dn) &&
245             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
246                 ret = ltdb_cache_reload(module);
247         }
248
249         return ret;
250 }
251
252 /*
253   store a record into the db
254 */
255 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
256 {
257         void *data = ldb_module_get_private(module);
258         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
259         TDB_DATA tdb_key, tdb_data;
260         int ret = LDB_SUCCESS;
261
262         tdb_key = ltdb_key(module, msg->dn);
263         if (tdb_key.dptr == NULL) {
264                 return LDB_ERR_OTHER;
265         }
266
267         ret = ltdb_pack_data(module, msg, &tdb_data);
268         if (ret == -1) {
269                 talloc_free(tdb_key.dptr);
270                 return LDB_ERR_OTHER;
271         }
272
273         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
274         if (ret != 0) {
275                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
276                 goto done;
277         }
278
279 done:
280         talloc_free(tdb_key.dptr);
281         talloc_free(tdb_data.dptr);
282
283         return ret;
284 }
285
286
287 /*
288   check if a attribute is a single valued, for a given element
289  */
290 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
291                                   struct ldb_message_element *el)
292 {
293         if (!a) return false;
294         if (el != NULL) {
295                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
296                         /* override from a ldb module, for example
297                            used for the description field, which is
298                            marked multi-valued in the schema but which
299                            should not actually accept multiple
300                            values */
301                         return true;
302                 }
303                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
304                         /* override from a ldb module, for example used for
305                            deleted linked attribute entries */
306                         return false;
307                 }
308         }
309         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
310                 return true;
311         }
312         return false;
313 }
314
315 static int ltdb_add_internal(struct ldb_module *module,
316                              const struct ldb_message *msg,
317                              bool check_single_value)
318 {
319         struct ldb_context *ldb = ldb_module_get_ctx(module);
320         int ret = LDB_SUCCESS;
321         unsigned int i;
322
323         for (i=0;i<msg->num_elements;i++) {
324                 struct ldb_message_element *el = &msg->elements[i];
325                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
326
327                 if (el->num_values == 0) {
328                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
329                                                el->name, ldb_dn_get_linearized(msg->dn));
330                         return LDB_ERR_CONSTRAINT_VIOLATION;
331                 }
332                 if (check_single_value &&
333                                 el->num_values > 1 &&
334                                 ldb_tdb_single_valued(a, el)) {
335                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
336                                                el->name, ldb_dn_get_linearized(msg->dn));
337                         return LDB_ERR_CONSTRAINT_VIOLATION;
338                 }
339         }
340
341         ret = ltdb_store(module, msg, TDB_INSERT);
342         if (ret != LDB_SUCCESS) {
343                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
344                         ldb_asprintf_errstring(ldb,
345                                                "Entry %s already exists",
346                                                ldb_dn_get_linearized(msg->dn));
347                 }
348                 return ret;
349         }
350
351         ret = ltdb_index_add_new(module, msg);
352         if (ret != LDB_SUCCESS) {
353                 return ret;
354         }
355
356         ret = ltdb_modified(module, msg->dn);
357
358         return ret;
359 }
360
361 /*
362   add a record to the database
363 */
364 static int ltdb_add(struct ltdb_context *ctx)
365 {
366         struct ldb_module *module = ctx->module;
367         struct ldb_request *req = ctx->req;
368         int ret = LDB_SUCCESS;
369
370         ret = ltdb_check_special_dn(module, req->op.add.message);
371         if (ret != LDB_SUCCESS) {
372                 return ret;
373         }
374
375         ldb_request_set_state(req, LDB_ASYNC_PENDING);
376
377         if (ltdb_cache_load(module) != 0) {
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380
381         ret = ltdb_add_internal(module, req->op.add.message, true);
382
383         return ret;
384 }
385
386 /*
387   delete a record from the database, not updating indexes (used for deleting
388   index records)
389 */
390 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
391 {
392         void *data = ldb_module_get_private(module);
393         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
394         TDB_DATA tdb_key;
395         int ret;
396
397         tdb_key = ltdb_key(module, dn);
398         if (!tdb_key.dptr) {
399                 return LDB_ERR_OTHER;
400         }
401
402         ret = tdb_delete(ltdb->tdb, tdb_key);
403         talloc_free(tdb_key.dptr);
404
405         if (ret != 0) {
406                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
407         }
408
409         return ret;
410 }
411
412 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
413 {
414         struct ldb_message *msg;
415         int ret = LDB_SUCCESS;
416
417         msg = ldb_msg_new(module);
418         if (msg == NULL) {
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         /* in case any attribute of the message was indexed, we need
423            to fetch the old record */
424         ret = ltdb_search_dn1(module, dn, msg);
425         if (ret != LDB_SUCCESS) {
426                 /* not finding the old record is an error */
427                 goto done;
428         }
429
430         ret = ltdb_delete_noindex(module, dn);
431         if (ret != LDB_SUCCESS) {
432                 goto done;
433         }
434
435         /* remove any indexed attributes */
436         ret = ltdb_index_delete(module, msg);
437         if (ret != LDB_SUCCESS) {
438                 goto done;
439         }
440
441         ret = ltdb_modified(module, dn);
442         if (ret != LDB_SUCCESS) {
443                 goto done;
444         }
445
446 done:
447         talloc_free(msg);
448         return ret;
449 }
450
451 /*
452   delete a record from the database
453 */
454 static int ltdb_delete(struct ltdb_context *ctx)
455 {
456         struct ldb_module *module = ctx->module;
457         struct ldb_request *req = ctx->req;
458         int ret = LDB_SUCCESS;
459
460         ldb_request_set_state(req, LDB_ASYNC_PENDING);
461
462         if (ltdb_cache_load(module) != 0) {
463                 return LDB_ERR_OPERATIONS_ERROR;
464         }
465
466         ret = ltdb_delete_internal(module, req->op.del.dn);
467
468         return ret;
469 }
470
471 /*
472   find an element by attribute name. At the moment this does a linear search,
473   it should be re-coded to use a binary search once all places that modify
474   records guarantee sorted order
475
476   return the index of the first matching element if found, otherwise -1
477 */
478 static int find_element(const struct ldb_message *msg, const char *name)
479 {
480         unsigned int i;
481         for (i=0;i<msg->num_elements;i++) {
482                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
483                         return i;
484                 }
485         }
486         return -1;
487 }
488
489
490 /*
491   add an element to an existing record. Assumes a elements array that we
492   can call re-alloc on, and assumed that we can re-use the data pointers from
493   the passed in additional values. Use with care!
494
495   returns 0 on success, -1 on failure (and sets errno)
496 */
497 static int ltdb_msg_add_element(struct ldb_context *ldb,
498                                 struct ldb_message *msg,
499                                 struct ldb_message_element *el)
500 {
501         struct ldb_message_element *e2;
502         unsigned int i;
503
504         if (el->num_values == 0) {
505                 /* nothing to do here - we don't add empty elements */
506                 return 0;
507         }
508
509         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
510                               msg->num_elements+1);
511         if (!e2) {
512                 errno = ENOMEM;
513                 return -1;
514         }
515
516         msg->elements = e2;
517
518         e2 = &msg->elements[msg->num_elements];
519
520         e2->name = el->name;
521         e2->flags = el->flags;
522         e2->values = talloc_array(msg->elements,
523                                   struct ldb_val, el->num_values);
524         if (!e2->values) {
525                 errno = ENOMEM;
526                 return -1;
527         }
528         for (i=0;i<el->num_values;i++) {
529                 e2->values[i] = el->values[i];
530         }
531         e2->num_values = el->num_values;
532
533         ++msg->num_elements;
534
535         return 0;
536 }
537
538 /*
539   delete all elements having a specified attribute name
540 */
541 static int msg_delete_attribute(struct ldb_module *module,
542                                 struct ldb_context *ldb,
543                                 struct ldb_message *msg, const char *name)
544 {
545         unsigned int i;
546         int ret;
547         struct ldb_message_element *el;
548
549         el = ldb_msg_find_element(msg, name);
550         if (el == NULL) {
551                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
552         }
553         i = el - msg->elements;
554
555         ret = ltdb_index_del_element(module, msg->dn, el);
556         if (ret != LDB_SUCCESS) {
557                 return ret;
558         }
559
560         talloc_free(el->values);
561         if (msg->num_elements > (i+1)) {
562                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
563         }
564         msg->num_elements--;
565         msg->elements = talloc_realloc(msg, msg->elements,
566                                        struct ldb_message_element,
567                                        msg->num_elements);
568         return LDB_SUCCESS;
569 }
570
571 /*
572   delete all elements matching an attribute name/value
573
574   return LDB Error on failure
575 */
576 static int msg_delete_element(struct ldb_module *module,
577                               struct ldb_message *msg,
578                               const char *name,
579                               const struct ldb_val *val)
580 {
581         struct ldb_context *ldb = ldb_module_get_ctx(module);
582         unsigned int i;
583         int found, ret;
584         struct ldb_message_element *el;
585         const struct ldb_schema_attribute *a;
586
587         found = find_element(msg, name);
588         if (found == -1) {
589                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
590         }
591
592         i = (unsigned int) found;
593         el = &(msg->elements[i]);
594
595         a = ldb_schema_attribute_by_name(ldb, el->name);
596
597         for (i=0;i<el->num_values;i++) {
598                 bool matched;
599                 if (a->syntax->operator_fn) {
600                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
601                                                      &el->values[i], val, &matched);
602                         if (ret != LDB_SUCCESS) return ret;
603                 } else {
604                         matched = (a->syntax->comparison_fn(ldb, ldb,
605                                                             &el->values[i], val) == 0);
606                 }
607                 if (matched) {
608                         if (el->num_values == 1) {
609                                 return msg_delete_attribute(module, ldb, msg, name);
610                         }
611
612                         ret = ltdb_index_del_value(module, msg->dn, el, i);
613                         if (ret != LDB_SUCCESS) {
614                                 return ret;
615                         }
616
617                         if (i<el->num_values-1) {
618                                 memmove(&el->values[i], &el->values[i+1],
619                                         sizeof(el->values[i])*
620                                                 (el->num_values-(i+1)));
621                         }
622                         el->num_values--;
623
624                         /* per definition we find in a canonicalised message an
625                            attribute value only once. So we are finished here */
626                         return LDB_SUCCESS;
627                 }
628         }
629
630         /* Not found */
631         return LDB_ERR_NO_SUCH_ATTRIBUTE;
632 }
633
634
635 /*
636   modify a record - internal interface
637
638   yuck - this is O(n^2). Luckily n is usually small so we probably
639   get away with it, but if we ever have really large attribute lists
640   then we'll need to look at this again
641
642   'req' is optional, and is used to specify controls if supplied
643 */
644 int ltdb_modify_internal(struct ldb_module *module,
645                          const struct ldb_message *msg,
646                          struct ldb_request *req)
647 {
648         struct ldb_context *ldb = ldb_module_get_ctx(module);
649         void *data = ldb_module_get_private(module);
650         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
651         TDB_DATA tdb_key, tdb_data;
652         struct ldb_message *msg2;
653         unsigned int i, j, k;
654         int ret = LDB_SUCCESS, idx;
655         struct ldb_control *control_permissive = NULL;
656
657         if (req) {
658                 control_permissive = ldb_request_get_control(req,
659                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
660         }
661
662         tdb_key = ltdb_key(module, msg->dn);
663         if (!tdb_key.dptr) {
664                 return LDB_ERR_OTHER;
665         }
666
667         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
668         if (!tdb_data.dptr) {
669                 talloc_free(tdb_key.dptr);
670                 return ltdb_err_map(tdb_error(ltdb->tdb));
671         }
672
673         msg2 = ldb_msg_new(tdb_key.dptr);
674         if (msg2 == NULL) {
675                 free(tdb_data.dptr);
676                 ret = LDB_ERR_OTHER;
677                 goto done;
678         }
679
680         ret = ltdb_unpack_data(module, &tdb_data, msg2);
681         free(tdb_data.dptr);
682         if (ret == -1) {
683                 ret = LDB_ERR_OTHER;
684                 goto done;
685         }
686
687         if (!msg2->dn) {
688                 msg2->dn = msg->dn;
689         }
690
691         for (i=0; i<msg->num_elements; i++) {
692                 struct ldb_message_element *el = &msg->elements[i], *el2;
693                 struct ldb_val *vals;
694                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
695                 const char *dn;
696
697                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
698                 case LDB_FLAG_MOD_ADD:
699
700                         if (el->num_values == 0) {
701                                 ldb_asprintf_errstring(ldb,
702                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
703                                                        el->name, ldb_dn_get_linearized(msg2->dn));
704                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
705                                 goto done;
706                         }
707
708                         /* make a copy of the array so that a permissive
709                          * control can remove duplicates without changing the
710                          * original values, but do not copy data as we do not
711                          * need to keep it around once the operation is
712                          * finished */
713                         if (control_permissive) {
714                                 el = talloc(msg2, struct ldb_message_element);
715                                 if (!el) {
716                                         ret = LDB_ERR_OTHER;
717                                         goto done;
718                                 }
719                                 *el = msg->elements[i];
720                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
721                                 if (el->values == NULL) {
722                                         ret = LDB_ERR_OTHER;
723                                         goto done;
724                                 }
725                                 for (j = 0; j < el->num_values; j++) {
726                                         el->values[j] = msg->elements[i].values[j];
727                                 }
728                         }
729
730                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
731                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
732                                                        el->name, ldb_dn_get_linearized(msg2->dn));
733                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
734                                 goto done;
735                         }
736
737                         /* Checks if element already exists */
738                         idx = find_element(msg2, el->name);
739                         if (idx == -1) {
740                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
741                                         ret = LDB_ERR_OTHER;
742                                         goto done;
743                                 }
744                                 ret = ltdb_index_add_element(module, msg2->dn,
745                                                              el);
746                                 if (ret != LDB_SUCCESS) {
747                                         goto done;
748                                 }
749                         } else {
750                                 j = (unsigned int) idx;
751                                 el2 = &(msg2->elements[j]);
752
753                                 /* We cannot add another value on a existing one
754                                    if the attribute is single-valued */
755                                 if (ldb_tdb_single_valued(a, el)) {
756                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
757                                                                el->name, ldb_dn_get_linearized(msg2->dn));
758                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
759                                         goto done;
760                                 }
761
762                                 /* Check that values don't exist yet on multi-
763                                    valued attributes or aren't provided twice */
764                                 for (j = 0; j < el->num_values; j++) {
765                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
766                                                 if (control_permissive) {
767                                                         /* remove this one as if it was never added */
768                                                         el->num_values--;
769                                                         for (k = j; k < el->num_values; k++) {
770                                                                 el->values[k] = el->values[k + 1];
771                                                         }
772                                                         j--; /* rewind */
773
774                                                         continue;
775                                                 }
776
777                                                 ldb_asprintf_errstring(ldb,
778                                                                        "attribute '%s': value #%u on '%s' already exists",
779                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
780                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
781                                                 goto done;
782                                         }
783                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
784                                                 ldb_asprintf_errstring(ldb,
785                                                                        "attribute '%s': value #%u on '%s' provided more than once",
786                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
787                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
788                                                 goto done;
789                                         }
790                                 }
791
792                                 /* Now combine existing and new values to a new
793                                    attribute record */
794                                 vals = talloc_realloc(msg2->elements,
795                                                       el2->values, struct ldb_val,
796                                                       el2->num_values + el->num_values);
797                                 if (vals == NULL) {
798                                         ldb_oom(ldb);
799                                         ret = LDB_ERR_OTHER;
800                                         goto done;
801                                 }
802
803                                 for (j=0; j<el->num_values; j++) {
804                                         vals[el2->num_values + j] =
805                                                 ldb_val_dup(vals, &el->values[j]);
806                                 }
807
808                                 el2->values = vals;
809                                 el2->num_values += el->num_values;
810
811                                 ret = ltdb_index_add_element(module, msg2->dn, el);
812                                 if (ret != LDB_SUCCESS) {
813                                         goto done;
814                                 }
815                         }
816
817                         break;
818
819                 case LDB_FLAG_MOD_REPLACE:
820
821                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
822                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
823                                                        el->name, ldb_dn_get_linearized(msg2->dn));
824                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
825                                 goto done;
826                         }
827
828                         /* TODO: This is O(n^2) - replace with more efficient check */
829                         for (j=0; j<el->num_values; j++) {
830                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
831                                         ldb_asprintf_errstring(ldb,
832                                                                "attribute '%s': value #%u on '%s' provided more than once",
833                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
834                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
835                                         goto done;
836                                 }
837                         }
838
839                         /* Checks if element already exists */
840                         idx = find_element(msg2, el->name);
841                         if (idx != -1) {
842                                 j = (unsigned int) idx;
843                                 el2 = &(msg2->elements[j]);
844
845                                 /* we consider two elements to be
846                                  * equal only if the order
847                                  * matches. This allows dbcheck to
848                                  * fix the ordering on attributes
849                                  * where order matters, such as
850                                  * objectClass
851                                  */
852                                 if (ldb_msg_element_equal_ordered(el, el2)) {
853                                         continue;
854                                 }
855
856                                 /* Delete the attribute if it exists in the DB */
857                                 if (msg_delete_attribute(module, ldb, msg2,
858                                                          el->name) != 0) {
859                                         ret = LDB_ERR_OTHER;
860                                         goto done;
861                                 }
862                         }
863
864                         /* Recreate it with the new values */
865                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
866                                 ret = LDB_ERR_OTHER;
867                                 goto done;
868                         }
869
870                         ret = ltdb_index_add_element(module, msg2->dn, el);
871                         if (ret != LDB_SUCCESS) {
872                                 goto done;
873                         }
874
875                         break;
876
877                 case LDB_FLAG_MOD_DELETE:
878                         dn = ldb_dn_get_linearized(msg2->dn);
879                         if (dn == NULL) {
880                                 ret = LDB_ERR_OTHER;
881                                 goto done;
882                         }
883
884                         if (msg->elements[i].num_values == 0) {
885                                 /* Delete the whole attribute */
886                                 ret = msg_delete_attribute(module, ldb, msg2,
887                                                            msg->elements[i].name);
888                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
889                                     control_permissive) {
890                                         ret = LDB_SUCCESS;
891                                 } else {
892                                         ldb_asprintf_errstring(ldb,
893                                                                "attribute '%s': no such attribute for delete on '%s'",
894                                                                msg->elements[i].name, dn);
895                                 }
896                                 if (ret != LDB_SUCCESS) {
897                                         goto done;
898                                 }
899                         } else {
900                                 /* Delete specified values from an attribute */
901                                 for (j=0; j < msg->elements[i].num_values; j++) {
902                                         ret = msg_delete_element(module,
903                                                                  msg2,
904                                                                  msg->elements[i].name,
905                                                                  &msg->elements[i].values[j]);
906                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
907                                             control_permissive) {
908                                                 ret = LDB_SUCCESS;
909                                         } else {
910                                                 ldb_asprintf_errstring(ldb,
911                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
912                                                                        msg->elements[i].name, dn);
913                                         }
914                                         if (ret != LDB_SUCCESS) {
915                                                 goto done;
916                                         }
917                                 }
918                         }
919                         break;
920                 default:
921                         ldb_asprintf_errstring(ldb,
922                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
923                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
924                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
925                         ret = LDB_ERR_PROTOCOL_ERROR;
926                         goto done;
927                 }
928         }
929
930         ret = ltdb_store(module, msg2, TDB_MODIFY);
931         if (ret != LDB_SUCCESS) {
932                 goto done;
933         }
934
935         ret = ltdb_modified(module, msg2->dn);
936         if (ret != LDB_SUCCESS) {
937                 goto done;
938         }
939
940 done:
941         talloc_free(tdb_key.dptr);
942         return ret;
943 }
944
945 /*
946   modify a record
947 */
948 static int ltdb_modify(struct ltdb_context *ctx)
949 {
950         struct ldb_module *module = ctx->module;
951         struct ldb_request *req = ctx->req;
952         int ret = LDB_SUCCESS;
953
954         ret = ltdb_check_special_dn(module, req->op.mod.message);
955         if (ret != LDB_SUCCESS) {
956                 return ret;
957         }
958
959         ldb_request_set_state(req, LDB_ASYNC_PENDING);
960
961         if (ltdb_cache_load(module) != 0) {
962                 return LDB_ERR_OPERATIONS_ERROR;
963         }
964
965         ret = ltdb_modify_internal(module, req->op.mod.message, req);
966
967         return ret;
968 }
969
970 /*
971   rename a record
972 */
973 static int ltdb_rename(struct ltdb_context *ctx)
974 {
975         struct ldb_module *module = ctx->module;
976         void *data = ldb_module_get_private(module);
977         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
978         struct ldb_request *req = ctx->req;
979         struct ldb_message *msg;
980         int ret = LDB_SUCCESS;
981         TDB_DATA tdb_key, tdb_key_old;
982
983         ldb_request_set_state(req, LDB_ASYNC_PENDING);
984
985         if (ltdb_cache_load(ctx->module) != 0) {
986                 return LDB_ERR_OPERATIONS_ERROR;
987         }
988
989         msg = ldb_msg_new(ctx);
990         if (msg == NULL) {
991                 return LDB_ERR_OPERATIONS_ERROR;
992         }
993
994         /* we need to fetch the old record to re-add under the new name */
995         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
996         if (ret != LDB_SUCCESS) {
997                 /* not finding the old record is an error */
998                 return ret;
999         }
1000
1001         /* We need to, before changing the DB, check if the new DN
1002          * exists, so we can return this error to the caller with an
1003          * unmodified DB */
1004         tdb_key = ltdb_key(module, req->op.rename.newdn);
1005         if (!tdb_key.dptr) {
1006                 talloc_free(msg);
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1011         if (!tdb_key_old.dptr) {
1012                 talloc_free(msg);
1013                 talloc_free(tdb_key.dptr);
1014                 return LDB_ERR_OPERATIONS_ERROR;
1015         }
1016
1017         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1018         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1019                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1020                         talloc_free(tdb_key_old.dptr);
1021                         talloc_free(tdb_key.dptr);
1022                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1023                                                "Entry %s already exists",
1024                                                ldb_dn_get_linearized(msg->dn));
1025                         /* finding the new record already in the DB is an error */
1026                         talloc_free(msg);
1027                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1028                 }
1029         }
1030         talloc_free(tdb_key_old.dptr);
1031         talloc_free(tdb_key.dptr);
1032
1033         /* Always delete first then add, to avoid conflicts with
1034          * unique indexes. We rely on the transaction to make this
1035          * atomic
1036          */
1037         ret = ltdb_delete_internal(module, msg->dn);
1038         if (ret != LDB_SUCCESS) {
1039                 talloc_free(msg);
1040                 return ret;
1041         }
1042
1043         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1044         if (msg->dn == NULL) {
1045                 talloc_free(msg);
1046                 return LDB_ERR_OPERATIONS_ERROR;
1047         }
1048
1049         /* We don't check single value as we can have more than 1 with
1050          * deleted attributes. We could go through all elements but that's
1051          * maybe not the most efficient way
1052          */
1053         ret = ltdb_add_internal(module, msg, false);
1054
1055         talloc_free(msg);
1056
1057         return ret;
1058 }
1059
1060 static int ltdb_start_trans(struct ldb_module *module)
1061 {
1062         void *data = ldb_module_get_private(module);
1063         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1064
1065         if (tdb_transaction_start(ltdb->tdb) != 0) {
1066                 return ltdb_err_map(tdb_error(ltdb->tdb));
1067         }
1068
1069         ltdb->in_transaction++;
1070
1071         ltdb_index_transaction_start(module);
1072
1073         return LDB_SUCCESS;
1074 }
1075
1076 static int ltdb_prepare_commit(struct ldb_module *module)
1077 {
1078         void *data = ldb_module_get_private(module);
1079         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1080
1081         if (ltdb->in_transaction != 1) {
1082                 return LDB_SUCCESS;
1083         }
1084
1085         if (ltdb_index_transaction_commit(module) != 0) {
1086                 tdb_transaction_cancel(ltdb->tdb);
1087                 ltdb->in_transaction--;
1088                 return ltdb_err_map(tdb_error(ltdb->tdb));
1089         }
1090
1091         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1092                 ltdb->in_transaction--;
1093                 return ltdb_err_map(tdb_error(ltdb->tdb));
1094         }
1095
1096         ltdb->prepared_commit = true;
1097
1098         return LDB_SUCCESS;
1099 }
1100
1101 static int ltdb_end_trans(struct ldb_module *module)
1102 {
1103         void *data = ldb_module_get_private(module);
1104         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1105
1106         if (!ltdb->prepared_commit) {
1107                 int ret = ltdb_prepare_commit(module);
1108                 if (ret != LDB_SUCCESS) {
1109                         return ret;
1110                 }
1111         }
1112
1113         ltdb->in_transaction--;
1114         ltdb->prepared_commit = false;
1115
1116         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1117                 return ltdb_err_map(tdb_error(ltdb->tdb));
1118         }
1119
1120         return LDB_SUCCESS;
1121 }
1122
1123 static int ltdb_del_trans(struct ldb_module *module)
1124 {
1125         void *data = ldb_module_get_private(module);
1126         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1127
1128         ltdb->in_transaction--;
1129
1130         if (ltdb_index_transaction_cancel(module) != 0) {
1131                 tdb_transaction_cancel(ltdb->tdb);
1132                 return ltdb_err_map(tdb_error(ltdb->tdb));
1133         }
1134
1135         tdb_transaction_cancel(ltdb->tdb);
1136         return LDB_SUCCESS;
1137 }
1138
1139 /*
1140   return sequenceNumber from @BASEINFO
1141 */
1142 static int ltdb_sequence_number(struct ltdb_context *ctx,
1143                                 struct ldb_extended **ext)
1144 {
1145         struct ldb_context *ldb;
1146         struct ldb_module *module = ctx->module;
1147         struct ldb_request *req = ctx->req;
1148         TALLOC_CTX *tmp_ctx = NULL;
1149         struct ldb_seqnum_request *seq;
1150         struct ldb_seqnum_result *res;
1151         struct ldb_message *msg = NULL;
1152         struct ldb_dn *dn;
1153         const char *date;
1154         int ret = LDB_SUCCESS;
1155
1156         ldb = ldb_module_get_ctx(module);
1157
1158         seq = talloc_get_type(req->op.extended.data,
1159                                 struct ldb_seqnum_request);
1160         if (seq == NULL) {
1161                 return LDB_ERR_OPERATIONS_ERROR;
1162         }
1163
1164         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1165
1166         if (ltdb_lock_read(module) != 0) {
1167                 return LDB_ERR_OPERATIONS_ERROR;
1168         }
1169
1170         res = talloc_zero(req, struct ldb_seqnum_result);
1171         if (res == NULL) {
1172                 ret = LDB_ERR_OPERATIONS_ERROR;
1173                 goto done;
1174         }
1175
1176         tmp_ctx = talloc_new(req);
1177         if (tmp_ctx == NULL) {
1178                 ret = LDB_ERR_OPERATIONS_ERROR;
1179                 goto done;
1180         }
1181
1182         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1183         if (dn == NULL) {
1184                 ret = LDB_ERR_OPERATIONS_ERROR;
1185                 goto done;
1186         }
1187
1188         msg = ldb_msg_new(tmp_ctx);
1189         if (msg == NULL) {
1190                 ret = LDB_ERR_OPERATIONS_ERROR;
1191                 goto done;
1192         }
1193
1194         ret = ltdb_search_dn1(module, dn, msg);
1195         if (ret != LDB_SUCCESS) {
1196                 goto done;
1197         }
1198
1199         switch (seq->type) {
1200         case LDB_SEQ_HIGHEST_SEQ:
1201                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1202                 break;
1203         case LDB_SEQ_NEXT:
1204                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1205                 res->seq_num++;
1206                 break;
1207         case LDB_SEQ_HIGHEST_TIMESTAMP:
1208                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1209                 if (date) {
1210                         res->seq_num = ldb_string_to_time(date);
1211                 } else {
1212                         res->seq_num = 0;
1213                         /* zero is as good as anything when we don't know */
1214                 }
1215                 break;
1216         }
1217
1218         *ext = talloc_zero(req, struct ldb_extended);
1219         if (*ext == NULL) {
1220                 ret = LDB_ERR_OPERATIONS_ERROR;
1221                 goto done;
1222         }
1223         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1224         (*ext)->data = talloc_steal(*ext, res);
1225
1226 done:
1227         talloc_free(tmp_ctx);
1228         ltdb_unlock_read(module);
1229         return ret;
1230 }
1231
1232 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1233 {
1234         struct ldb_context *ldb;
1235         struct ldb_request *req;
1236         struct ldb_reply *ares;
1237
1238         ldb = ldb_module_get_ctx(ctx->module);
1239         req = ctx->req;
1240
1241         /* if we already returned an error just return */
1242         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1243                 return;
1244         }
1245
1246         ares = talloc_zero(req, struct ldb_reply);
1247         if (!ares) {
1248                 ldb_oom(ldb);
1249                 req->callback(req, NULL);
1250                 return;
1251         }
1252         ares->type = LDB_REPLY_DONE;
1253         ares->error = error;
1254
1255         req->callback(req, ares);
1256 }
1257
1258 static void ltdb_timeout(struct tevent_context *ev,
1259                           struct tevent_timer *te,
1260                           struct timeval t,
1261                           void *private_data)
1262 {
1263         struct ltdb_context *ctx;
1264         ctx = talloc_get_type(private_data, struct ltdb_context);
1265
1266         if (!ctx->request_terminated) {
1267                 /* request is done now */
1268                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1269         }
1270
1271         if (ctx->spy) {
1272                 /* neutralize the spy */
1273                 ctx->spy->ctx = NULL;
1274                 ctx->spy = NULL;
1275         }
1276         talloc_free(ctx);
1277 }
1278
1279 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1280                                         struct ldb_extended *ext,
1281                                         int error)
1282 {
1283         struct ldb_context *ldb;
1284         struct ldb_request *req;
1285         struct ldb_reply *ares;
1286
1287         ldb = ldb_module_get_ctx(ctx->module);
1288         req = ctx->req;
1289
1290         /* if we already returned an error just return */
1291         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1292                 return;
1293         }
1294
1295         ares = talloc_zero(req, struct ldb_reply);
1296         if (!ares) {
1297                 ldb_oom(ldb);
1298                 req->callback(req, NULL);
1299                 return;
1300         }
1301         ares->type = LDB_REPLY_DONE;
1302         ares->response = ext;
1303         ares->error = error;
1304
1305         req->callback(req, ares);
1306 }
1307
1308 static void ltdb_handle_extended(struct ltdb_context *ctx)
1309 {
1310         struct ldb_extended *ext = NULL;
1311         int ret;
1312
1313         if (strcmp(ctx->req->op.extended.oid,
1314                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1315                 /* get sequence number */
1316                 ret = ltdb_sequence_number(ctx, &ext);
1317         } else {
1318                 /* not recognized */
1319                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1320         }
1321
1322         ltdb_request_extended_done(ctx, ext, ret);
1323 }
1324
1325 static void ltdb_callback(struct tevent_context *ev,
1326                           struct tevent_timer *te,
1327                           struct timeval t,
1328                           void *private_data)
1329 {
1330         struct ltdb_context *ctx;
1331         int ret;
1332
1333         ctx = talloc_get_type(private_data, struct ltdb_context);
1334
1335         if (ctx->request_terminated) {
1336                 goto done;
1337         }
1338
1339         switch (ctx->req->operation) {
1340         case LDB_SEARCH:
1341                 ret = ltdb_search(ctx);
1342                 break;
1343         case LDB_ADD:
1344                 ret = ltdb_add(ctx);
1345                 break;
1346         case LDB_MODIFY:
1347                 ret = ltdb_modify(ctx);
1348                 break;
1349         case LDB_DELETE:
1350                 ret = ltdb_delete(ctx);
1351                 break;
1352         case LDB_RENAME:
1353                 ret = ltdb_rename(ctx);
1354                 break;
1355         case LDB_EXTENDED:
1356                 ltdb_handle_extended(ctx);
1357                 goto done;
1358         default:
1359                 /* no other op supported */
1360                 ret = LDB_ERR_PROTOCOL_ERROR;
1361         }
1362
1363         if (!ctx->request_terminated) {
1364                 /* request is done now */
1365                 ltdb_request_done(ctx, ret);
1366         }
1367
1368 done:
1369         if (ctx->spy) {
1370                 /* neutralize the spy */
1371                 ctx->spy->ctx = NULL;
1372                 ctx->spy = NULL;
1373         }
1374         talloc_free(ctx);
1375 }
1376
1377 static int ltdb_request_destructor(void *ptr)
1378 {
1379         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1380
1381         if (spy->ctx != NULL) {
1382                 spy->ctx->spy = NULL;
1383                 spy->ctx->request_terminated = true;
1384                 spy->ctx = NULL;
1385         }
1386
1387         return 0;
1388 }
1389
1390 static int ltdb_handle_request(struct ldb_module *module,
1391                                struct ldb_request *req)
1392 {
1393         struct ldb_control *control_permissive;
1394         struct ldb_context *ldb;
1395         struct tevent_context *ev;
1396         struct ltdb_context *ac;
1397         struct tevent_timer *te;
1398         struct timeval tv;
1399         unsigned int i;
1400
1401         ldb = ldb_module_get_ctx(module);
1402
1403         control_permissive = ldb_request_get_control(req,
1404                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1405
1406         for (i = 0; req->controls && req->controls[i]; i++) {
1407                 if (req->controls[i]->critical &&
1408                     req->controls[i] != control_permissive) {
1409                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1410                                                req->controls[i]->oid);
1411                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1412                 }
1413         }
1414
1415         if (req->starttime == 0 || req->timeout == 0) {
1416                 ldb_set_errstring(ldb, "Invalid timeout settings");
1417                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1418         }
1419
1420         ev = ldb_get_event_context(ldb);
1421
1422         ac = talloc_zero(ldb, struct ltdb_context);
1423         if (ac == NULL) {
1424                 ldb_oom(ldb);
1425                 return LDB_ERR_OPERATIONS_ERROR;
1426         }
1427
1428         ac->module = module;
1429         ac->req = req;
1430
1431         tv.tv_sec = 0;
1432         tv.tv_usec = 0;
1433         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1434         if (NULL == te) {
1435                 talloc_free(ac);
1436                 return LDB_ERR_OPERATIONS_ERROR;
1437         }
1438
1439         tv.tv_sec = req->starttime + req->timeout;
1440         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1441         if (NULL == ac->timeout_event) {
1442                 talloc_free(ac);
1443                 return LDB_ERR_OPERATIONS_ERROR;
1444         }
1445
1446         /* set a spy so that we do not try to use the request context
1447          * if it is freed before ltdb_callback fires */
1448         ac->spy = talloc(req, struct ltdb_req_spy);
1449         if (NULL == ac->spy) {
1450                 talloc_free(ac);
1451                 return LDB_ERR_OPERATIONS_ERROR;
1452         }
1453         ac->spy->ctx = ac;
1454
1455         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1456
1457         return LDB_SUCCESS;
1458 }
1459
1460 static int ltdb_init_rootdse(struct ldb_module *module)
1461 {
1462         /* ignore errors on this - we expect it for non-sam databases */
1463         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1464
1465         /* there can be no module beyond the backend, just return */
1466         return LDB_SUCCESS;
1467 }
1468
1469 static const struct ldb_module_ops ltdb_ops = {
1470         .name              = "tdb",
1471         .init_context      = ltdb_init_rootdse,
1472         .search            = ltdb_handle_request,
1473         .add               = ltdb_handle_request,
1474         .modify            = ltdb_handle_request,
1475         .del               = ltdb_handle_request,
1476         .rename            = ltdb_handle_request,
1477         .extended          = ltdb_handle_request,
1478         .start_transaction = ltdb_start_trans,
1479         .end_transaction   = ltdb_end_trans,
1480         .prepare_commit    = ltdb_prepare_commit,
1481         .del_transaction   = ltdb_del_trans,
1482 };
1483
1484 /*
1485   connect to the database
1486 */
1487 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1488                         unsigned int flags, const char *options[],
1489                         struct ldb_module **_module)
1490 {
1491         struct ldb_module *module;
1492         const char *path;
1493         int tdb_flags, open_flags;
1494         struct ltdb_private *ltdb;
1495
1496         /* parse the url */
1497         if (strchr(url, ':')) {
1498                 if (strncmp(url, "tdb://", 6) != 0) {
1499                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1500                                   "Invalid tdb URL '%s'", url);
1501                         return LDB_ERR_OPERATIONS_ERROR;
1502                 }
1503                 path = url+6;
1504         } else {
1505                 path = url;
1506         }
1507
1508         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1509
1510         /* check for the 'nosync' option */
1511         if (flags & LDB_FLG_NOSYNC) {
1512                 tdb_flags |= TDB_NOSYNC;
1513         }
1514
1515         /* and nommap option */
1516         if (flags & LDB_FLG_NOMMAP) {
1517                 tdb_flags |= TDB_NOMMAP;
1518         }
1519
1520         if (flags & LDB_FLG_RDONLY) {
1521                 open_flags = O_RDONLY;
1522         } else {
1523                 open_flags = O_CREAT | O_RDWR;
1524         }
1525
1526         ltdb = talloc_zero(ldb, struct ltdb_private);
1527         if (!ltdb) {
1528                 ldb_oom(ldb);
1529                 return LDB_ERR_OPERATIONS_ERROR;
1530         }
1531
1532         /* note that we use quite a large default hash size */
1533         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1534                                    tdb_flags, open_flags,
1535                                    ldb_get_create_perms(ldb), ldb);
1536         if (!ltdb->tdb) {
1537                 ldb_asprintf_errstring(ldb,
1538                                        "Unable to open tdb '%s'", path);
1539                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1540                           "Unable to open tdb '%s'", path);
1541                 talloc_free(ltdb);
1542                 return LDB_ERR_OPERATIONS_ERROR;
1543         }
1544
1545         if (getenv("LDB_WARN_UNINDEXED")) {
1546                 ltdb->warn_unindexed = true;
1547         }
1548
1549         ltdb->sequence_number = 0;
1550
1551         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1552         if (!module) {
1553                 ldb_oom(ldb);
1554                 talloc_free(ltdb);
1555                 return LDB_ERR_OPERATIONS_ERROR;
1556         }
1557         ldb_module_set_private(module, ltdb);
1558         talloc_steal(module, ltdb);
1559
1560         if (ltdb_cache_load(module) != 0) {
1561                 ldb_asprintf_errstring(ldb,
1562                                        "Unable to load ltdb cache records of tdb '%s'", path);
1563                 talloc_free(module);
1564                 return LDB_ERR_OPERATIONS_ERROR;
1565         }
1566
1567         *_module = module;
1568         return LDB_SUCCESS;
1569 }
1570
1571 int ldb_tdb_init(const char *version)
1572 {
1573         LDB_MODULE_CHECK_VERSION(version);
1574         return ldb_register_backend("tdb", ltdb_connect, false);
1575 }