ldb_tdb: Delete a successful tdb_store on index add fail in ltdb_add_internal()
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, struct ldb_dn *dn)
179 {
180         struct ldb_context *ldb = ldb_module_get_ctx(module);
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(ldb, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /*
225   form a TDB_DATA for a record key
226   caller frees
227
228   note that the key for a record can depend on whether the
229   dn refers to a case sensitive index record or not
230 */
231 TDB_DATA ltdb_key_msg(struct ldb_module *module,
232                       const struct ldb_message *msg)
233 {
234         return ltdb_key_dn(module, msg->dn);
235 }
236
237 /*
238   check special dn's have valid attributes
239   currently only @ATTRIBUTES is checked
240 */
241 static int ltdb_check_special_dn(struct ldb_module *module,
242                                  const struct ldb_message *msg)
243 {
244         struct ldb_context *ldb = ldb_module_get_ctx(module);
245         unsigned int i, j;
246
247         if (! ldb_dn_is_special(msg->dn) ||
248             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
249                 return LDB_SUCCESS;
250         }
251
252         /* we have @ATTRIBUTES, let's check attributes are fine */
253         /* should we check that we deny multivalued attributes ? */
254         for (i = 0; i < msg->num_elements; i++) {
255                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
256
257                 for (j = 0; j < msg->elements[i].num_values; j++) {
258                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
259                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
260                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
261                         }
262                 }
263         }
264
265         return LDB_SUCCESS;
266 }
267
268
269 /*
270   we've made a modification to a dn - possibly reindex and
271   update sequence number
272 */
273 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
274 {
275         int ret = LDB_SUCCESS;
276         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
277
278         /* only allow modifies inside a transaction, otherwise the
279          * ldb is unsafe */
280         if (ltdb->in_transaction == 0) {
281                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284
285         if (ldb_dn_is_special(dn) &&
286             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
287              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
288         {
289                 if (ltdb->warn_reindex) {
290                         ldb_debug(ldb_module_get_ctx(module),
291                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
292                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
293                 }
294                 ret = ltdb_reindex(module);
295         }
296
297         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
298         if (ret == LDB_SUCCESS &&
299             !(ldb_dn_is_special(dn) &&
300               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
301                 ret = ltdb_increase_sequence_number(module);
302         }
303
304         /* If the modify was to @OPTIONS, reload the cache */
305         if (ret == LDB_SUCCESS &&
306             ldb_dn_is_special(dn) &&
307             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
308                 ret = ltdb_cache_reload(module);
309         }
310
311         return ret;
312 }
313
314 /*
315   store a record into the db
316 */
317 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         TDB_DATA tdb_key, tdb_data;
322         struct ldb_val ldb_data;
323         int ret = LDB_SUCCESS;
324
325         if (ltdb->read_only) {
326                 return LDB_ERR_UNWILLING_TO_PERFORM;
327         }
328
329         tdb_key = ltdb_key_msg(module, msg);
330         if (tdb_key.dptr == NULL) {
331                 return LDB_ERR_OTHER;
332         }
333
334         ret = ldb_pack_data(ldb_module_get_ctx(module),
335                             msg, &ldb_data);
336         if (ret == -1) {
337                 talloc_free(tdb_key.dptr);
338                 return LDB_ERR_OTHER;
339         }
340
341         tdb_data.dptr = ldb_data.data;
342         tdb_data.dsize = ldb_data.length;
343
344         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
345         if (ret != 0) {
346                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
347                 goto done;
348         }
349
350 done:
351         talloc_free(tdb_key.dptr);
352         talloc_free(ldb_data.data);
353
354         return ret;
355 }
356
357
358 /*
359   check if a attribute is a single valued, for a given element
360  */
361 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
362                                   struct ldb_message_element *el)
363 {
364         if (!a) return false;
365         if (el != NULL) {
366                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
367                         /* override from a ldb module, for example
368                            used for the description field, which is
369                            marked multi-valued in the schema but which
370                            should not actually accept multiple
371                            values */
372                         return true;
373                 }
374                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
375                         /* override from a ldb module, for example used for
376                            deleted linked attribute entries */
377                         return false;
378                 }
379         }
380         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
381                 return true;
382         }
383         return false;
384 }
385
386 static int ltdb_add_internal(struct ldb_module *module,
387                              struct ltdb_private *ltdb,
388                              const struct ldb_message *msg,
389                              bool check_single_value)
390 {
391         struct ldb_context *ldb = ldb_module_get_ctx(module);
392         int ret = LDB_SUCCESS;
393         unsigned int i;
394
395         for (i=0;i<msg->num_elements;i++) {
396                 struct ldb_message_element *el = &msg->elements[i];
397                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
398
399                 if (el->num_values == 0) {
400                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
401                                                el->name, ldb_dn_get_linearized(msg->dn));
402                         return LDB_ERR_CONSTRAINT_VIOLATION;
403                 }
404                 if (check_single_value &&
405                                 el->num_values > 1 &&
406                                 ldb_tdb_single_valued(a, el)) {
407                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
408                                                el->name, ldb_dn_get_linearized(msg->dn));
409                         return LDB_ERR_CONSTRAINT_VIOLATION;
410                 }
411
412                 /* Do not check "@ATTRIBUTES" for duplicated values */
413                 if (ldb_dn_is_special(msg->dn) &&
414                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
415                         continue;
416                 }
417
418                 if (check_single_value &&
419                     !(el->flags &
420                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
421                         struct ldb_val *duplicate = NULL;
422
423                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
424                                                          el, &duplicate, 0);
425                         if (ret != LDB_SUCCESS) {
426                                 return ret;
427                         }
428                         if (duplicate != NULL) {
429                                 ldb_asprintf_errstring(
430                                         ldb,
431                                         "attribute '%s': value '%.*s' on '%s' "
432                                         "provided more than once in ADD object",
433                                         el->name,
434                                         (int)duplicate->length,
435                                         duplicate->data,
436                                         ldb_dn_get_linearized(msg->dn));
437                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
438                         }
439                 }
440         }
441
442         ret = ltdb_store(module, msg, TDB_INSERT);
443         if (ret != LDB_SUCCESS) {
444                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
445                         ldb_asprintf_errstring(ldb,
446                                                "Entry %s already exists",
447                                                ldb_dn_get_linearized(msg->dn));
448                 }
449                 return ret;
450         }
451
452         ret = ltdb_index_add_new(module, ltdb, msg);
453         if (ret != LDB_SUCCESS) {
454                 /*
455                  * If we failed to index, delete the message again.
456                  *
457                  * This is particularly important for the GUID index
458                  * case, which will only fail for a duplicate DN
459                  * in the index add.
460                  *
461                  * Note that the caller may not cancel the transation
462                  * and this means the above add might really show up!
463                  */
464                 ltdb_delete_noindex(module, msg);
465                 return ret;
466         }
467
468         ret = ltdb_modified(module, msg->dn);
469
470         return ret;
471 }
472
473 /*
474   add a record to the database
475 */
476 static int ltdb_add(struct ltdb_context *ctx)
477 {
478         struct ldb_module *module = ctx->module;
479         struct ldb_request *req = ctx->req;
480         void *data = ldb_module_get_private(module);
481         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
482         int ret = LDB_SUCCESS;
483
484         ret = ltdb_check_special_dn(module, req->op.add.message);
485         if (ret != LDB_SUCCESS) {
486                 return ret;
487         }
488
489         ldb_request_set_state(req, LDB_ASYNC_PENDING);
490
491         if (ltdb_cache_load(module) != 0) {
492                 return LDB_ERR_OPERATIONS_ERROR;
493         }
494
495         ret = ltdb_add_internal(module, ltdb,
496                                 req->op.add.message, true);
497
498         return ret;
499 }
500
501 /*
502   delete a record from the database, not updating indexes (used for deleting
503   index records)
504 */
505 int ltdb_delete_noindex(struct ldb_module *module,
506                         const struct ldb_message *msg)
507 {
508         void *data = ldb_module_get_private(module);
509         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
510         TDB_DATA tdb_key;
511         int ret;
512
513         if (ltdb->read_only) {
514                 return LDB_ERR_UNWILLING_TO_PERFORM;
515         }
516
517         tdb_key = ltdb_key_dn(module, msg->dn);
518         if (!tdb_key.dptr) {
519                 return LDB_ERR_OTHER;
520         }
521
522         ret = tdb_delete(ltdb->tdb, tdb_key);
523         talloc_free(tdb_key.dptr);
524
525         if (ret != 0) {
526                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
527         }
528
529         return ret;
530 }
531
532 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
533 {
534         struct ldb_message *msg;
535         int ret = LDB_SUCCESS;
536
537         msg = ldb_msg_new(module);
538         if (msg == NULL) {
539                 return LDB_ERR_OPERATIONS_ERROR;
540         }
541
542         /* in case any attribute of the message was indexed, we need
543            to fetch the old record */
544         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
545         if (ret != LDB_SUCCESS) {
546                 /* not finding the old record is an error */
547                 goto done;
548         }
549
550         ret = ltdb_delete_noindex(module, msg);
551         if (ret != LDB_SUCCESS) {
552                 goto done;
553         }
554
555         /* remove any indexed attributes */
556         ret = ltdb_index_delete(module, msg);
557         if (ret != LDB_SUCCESS) {
558                 goto done;
559         }
560
561         ret = ltdb_modified(module, dn);
562         if (ret != LDB_SUCCESS) {
563                 goto done;
564         }
565
566 done:
567         talloc_free(msg);
568         return ret;
569 }
570
571 /*
572   delete a record from the database
573 */
574 static int ltdb_delete(struct ltdb_context *ctx)
575 {
576         struct ldb_module *module = ctx->module;
577         struct ldb_request *req = ctx->req;
578         int ret = LDB_SUCCESS;
579
580         ldb_request_set_state(req, LDB_ASYNC_PENDING);
581
582         if (ltdb_cache_load(module) != 0) {
583                 return LDB_ERR_OPERATIONS_ERROR;
584         }
585
586         ret = ltdb_delete_internal(module, req->op.del.dn);
587
588         return ret;
589 }
590
591 /*
592   find an element by attribute name. At the moment this does a linear search,
593   it should be re-coded to use a binary search once all places that modify
594   records guarantee sorted order
595
596   return the index of the first matching element if found, otherwise -1
597 */
598 static int find_element(const struct ldb_message *msg, const char *name)
599 {
600         unsigned int i;
601         for (i=0;i<msg->num_elements;i++) {
602                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
603                         return i;
604                 }
605         }
606         return -1;
607 }
608
609
610 /*
611   add an element to an existing record. Assumes a elements array that we
612   can call re-alloc on, and assumed that we can re-use the data pointers from
613   the passed in additional values. Use with care!
614
615   returns 0 on success, -1 on failure (and sets errno)
616 */
617 static int ltdb_msg_add_element(struct ldb_message *msg,
618                                 struct ldb_message_element *el)
619 {
620         struct ldb_message_element *e2;
621         unsigned int i;
622
623         if (el->num_values == 0) {
624                 /* nothing to do here - we don't add empty elements */
625                 return 0;
626         }
627
628         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
629                               msg->num_elements+1);
630         if (!e2) {
631                 errno = ENOMEM;
632                 return -1;
633         }
634
635         msg->elements = e2;
636
637         e2 = &msg->elements[msg->num_elements];
638
639         e2->name = el->name;
640         e2->flags = el->flags;
641         e2->values = talloc_array(msg->elements,
642                                   struct ldb_val, el->num_values);
643         if (!e2->values) {
644                 errno = ENOMEM;
645                 return -1;
646         }
647         for (i=0;i<el->num_values;i++) {
648                 e2->values[i] = el->values[i];
649         }
650         e2->num_values = el->num_values;
651
652         ++msg->num_elements;
653
654         return 0;
655 }
656
657 /*
658   delete all elements having a specified attribute name
659 */
660 static int msg_delete_attribute(struct ldb_module *module,
661                                 struct ltdb_private *ltdb,
662                                 struct ldb_message *msg, const char *name)
663 {
664         unsigned int i;
665         int ret;
666         struct ldb_message_element *el;
667         bool is_special = ldb_dn_is_special(msg->dn);
668
669         if (!is_special
670             && ltdb->cache->GUID_index_attribute != NULL
671             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
672                 struct ldb_context *ldb = ldb_module_get_ctx(module);
673                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
674                                        "attribute %s (used as DB index)",
675                                        ltdb->cache->GUID_index_attribute);
676                 return LDB_ERR_CONSTRAINT_VIOLATION;
677         }
678
679         el = ldb_msg_find_element(msg, name);
680         if (el == NULL) {
681                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
682         }
683         i = el - msg->elements;
684
685         ret = ltdb_index_del_element(module, ltdb, msg->dn, el);
686         if (ret != LDB_SUCCESS) {
687                 return ret;
688         }
689
690         talloc_free(el->values);
691         if (msg->num_elements > (i+1)) {
692                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
693         }
694         msg->num_elements--;
695         msg->elements = talloc_realloc(msg, msg->elements,
696                                        struct ldb_message_element,
697                                        msg->num_elements);
698         return LDB_SUCCESS;
699 }
700
701 /*
702   delete all elements matching an attribute name/value
703
704   return LDB Error on failure
705 */
706 static int msg_delete_element(struct ldb_module *module,
707                               struct ltdb_private *ltdb,
708                               struct ldb_message *msg,
709                               const char *name,
710                               const struct ldb_val *val)
711 {
712         struct ldb_context *ldb = ldb_module_get_ctx(module);
713         unsigned int i;
714         int found, ret;
715         struct ldb_message_element *el;
716         const struct ldb_schema_attribute *a;
717
718         found = find_element(msg, name);
719         if (found == -1) {
720                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
721         }
722
723         i = (unsigned int) found;
724         el = &(msg->elements[i]);
725
726         a = ldb_schema_attribute_by_name(ldb, el->name);
727
728         for (i=0;i<el->num_values;i++) {
729                 bool matched;
730                 if (a->syntax->operator_fn) {
731                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
732                                                      &el->values[i], val, &matched);
733                         if (ret != LDB_SUCCESS) return ret;
734                 } else {
735                         matched = (a->syntax->comparison_fn(ldb, ldb,
736                                                             &el->values[i], val) == 0);
737                 }
738                 if (matched) {
739                         if (el->num_values == 1) {
740                                 return msg_delete_attribute(module,
741                                                             ltdb, msg, name);
742                         }
743
744                         ret = ltdb_index_del_value(module, ltdb, msg->dn, el, i);
745                         if (ret != LDB_SUCCESS) {
746                                 return ret;
747                         }
748
749                         if (i<el->num_values-1) {
750                                 memmove(&el->values[i], &el->values[i+1],
751                                         sizeof(el->values[i])*
752                                                 (el->num_values-(i+1)));
753                         }
754                         el->num_values--;
755
756                         /* per definition we find in a canonicalised message an
757                            attribute value only once. So we are finished here */
758                         return LDB_SUCCESS;
759                 }
760         }
761
762         /* Not found */
763         return LDB_ERR_NO_SUCH_ATTRIBUTE;
764 }
765
766
767 /*
768   modify a record - internal interface
769
770   yuck - this is O(n^2). Luckily n is usually small so we probably
771   get away with it, but if we ever have really large attribute lists
772   then we'll need to look at this again
773
774   'req' is optional, and is used to specify controls if supplied
775 */
776 int ltdb_modify_internal(struct ldb_module *module,
777                          const struct ldb_message *msg,
778                          struct ldb_request *req)
779 {
780         struct ldb_context *ldb = ldb_module_get_ctx(module);
781         void *data = ldb_module_get_private(module);
782         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
783         struct ldb_message *msg2;
784         unsigned int i, j;
785         int ret = LDB_SUCCESS, idx;
786         struct ldb_control *control_permissive = NULL;
787         TALLOC_CTX *mem_ctx = talloc_new(req);
788
789         if (mem_ctx == NULL) {
790                 return ldb_module_oom(module);
791         }
792         
793         if (req) {
794                 control_permissive = ldb_request_get_control(req,
795                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
796         }
797
798         msg2 = ldb_msg_new(mem_ctx);
799         if (msg2 == NULL) {
800                 ret = LDB_ERR_OTHER;
801                 goto done;
802         }
803
804         ret = ltdb_search_dn1(module, msg->dn,
805                               msg2,
806                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
807         if (ret != LDB_SUCCESS) {
808                 goto done;
809         }
810
811         for (i=0; i<msg->num_elements; i++) {
812                 struct ldb_message_element *el = &msg->elements[i], *el2;
813                 struct ldb_val *vals;
814                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
815                 const char *dn;
816                 uint32_t options = 0;
817                 if (control_permissive != NULL) {
818                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
819                 }
820
821                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
822                 case LDB_FLAG_MOD_ADD:
823
824                         if (el->num_values == 0) {
825                                 ldb_asprintf_errstring(ldb,
826                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
827                                                        el->name, ldb_dn_get_linearized(msg2->dn));
828                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
829                                 goto done;
830                         }
831
832                         /* make a copy of the array so that a permissive
833                          * control can remove duplicates without changing the
834                          * original values, but do not copy data as we do not
835                          * need to keep it around once the operation is
836                          * finished */
837                         if (control_permissive) {
838                                 el = talloc(msg2, struct ldb_message_element);
839                                 if (!el) {
840                                         ret = LDB_ERR_OTHER;
841                                         goto done;
842                                 }
843                                 *el = msg->elements[i];
844                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
845                                 if (el->values == NULL) {
846                                         ret = LDB_ERR_OTHER;
847                                         goto done;
848                                 }
849                                 for (j = 0; j < el->num_values; j++) {
850                                         el->values[j] = msg->elements[i].values[j];
851                                 }
852                         }
853
854                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
855                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
856                                                        el->name, ldb_dn_get_linearized(msg2->dn));
857                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
858                                 goto done;
859                         }
860
861                         /* Checks if element already exists */
862                         idx = find_element(msg2, el->name);
863                         if (idx == -1) {
864                                 if (ltdb_msg_add_element(msg2, el) != 0) {
865                                         ret = LDB_ERR_OTHER;
866                                         goto done;
867                                 }
868                                 ret = ltdb_index_add_element(module, ltdb,
869                                                              msg2->dn,
870                                                              el);
871                                 if (ret != LDB_SUCCESS) {
872                                         goto done;
873                                 }
874                         } else {
875                                 j = (unsigned int) idx;
876                                 el2 = &(msg2->elements[j]);
877
878                                 /* We cannot add another value on a existing one
879                                    if the attribute is single-valued */
880                                 if (ldb_tdb_single_valued(a, el)) {
881                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
882                                                                el->name, ldb_dn_get_linearized(msg2->dn));
883                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
884                                         goto done;
885                                 }
886
887                                 /* Check that values don't exist yet on multi-
888                                    valued attributes or aren't provided twice */
889                                 if (!(el->flags &
890                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
891                                         struct ldb_val *duplicate = NULL;
892                                         ret = ldb_msg_find_common_values(ldb,
893                                                                          msg2,
894                                                                          el,
895                                                                          el2,
896                                                                          options);
897
898                                         if (ret ==
899                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
900                                                 ldb_asprintf_errstring(ldb,
901                                                         "attribute '%s': value "
902                                                         "#%u on '%s' already "
903                                                         "exists", el->name, j,
904                                                         ldb_dn_get_linearized(msg2->dn));
905                                                 goto done;
906                                         } else if (ret != LDB_SUCCESS) {
907                                                 goto done;
908                                         }
909
910                                         ret = ldb_msg_find_duplicate_val(
911                                                 ldb, msg2, el, &duplicate, 0);
912                                         if (ret != LDB_SUCCESS) {
913                                                 goto done;
914                                         }
915                                         if (duplicate != NULL) {
916                                                 ldb_asprintf_errstring(
917                                                         ldb,
918                                                         "attribute '%s': value "
919                                                         "'%.*s' on '%s' "
920                                                         "provided more than "
921                                                         "once in ADD",
922                                                         el->name,
923                                                         (int)duplicate->length,
924                                                         duplicate->data,
925                                                         ldb_dn_get_linearized(msg->dn));
926                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
927                                                 goto done;
928                                         }
929                                 }
930
931                                 /* Now combine existing and new values to a new
932                                    attribute record */
933                                 vals = talloc_realloc(msg2->elements,
934                                                       el2->values, struct ldb_val,
935                                                       el2->num_values + el->num_values);
936                                 if (vals == NULL) {
937                                         ldb_oom(ldb);
938                                         ret = LDB_ERR_OTHER;
939                                         goto done;
940                                 }
941
942                                 for (j=0; j<el->num_values; j++) {
943                                         vals[el2->num_values + j] =
944                                                 ldb_val_dup(vals, &el->values[j]);
945                                 }
946
947                                 el2->values = vals;
948                                 el2->num_values += el->num_values;
949
950                                 ret = ltdb_index_add_element(module, ltdb,
951                                                              msg2->dn, el);
952                                 if (ret != LDB_SUCCESS) {
953                                         goto done;
954                                 }
955                         }
956
957                         break;
958
959                 case LDB_FLAG_MOD_REPLACE:
960
961                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
962                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
963                                                        el->name, ldb_dn_get_linearized(msg2->dn));
964                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
965                                 goto done;
966                         }
967
968                         /*
969                          * We don't need to check this if we have been
970                          * pre-screened by the repl_meta_data module
971                          * in Samba, or someone else who can claim to
972                          * know what they are doing. 
973                          */
974                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
975                                 struct ldb_val *duplicate = NULL;
976
977                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
978                                                                  &duplicate, 0);
979                                 if (ret != LDB_SUCCESS) {
980                                         goto done;
981                                 }
982                                 if (duplicate != NULL) {
983                                         ldb_asprintf_errstring(
984                                                 ldb,
985                                                 "attribute '%s': value '%.*s' "
986                                                 "on '%s' provided more than "
987                                                 "once in REPLACE",
988                                                 el->name,
989                                                 (int)duplicate->length,
990                                                 duplicate->data,
991                                                 ldb_dn_get_linearized(msg2->dn));
992                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
993                                         goto done;
994                                 }
995                         }
996
997                         /* Checks if element already exists */
998                         idx = find_element(msg2, el->name);
999                         if (idx != -1) {
1000                                 j = (unsigned int) idx;
1001                                 el2 = &(msg2->elements[j]);
1002
1003                                 /* we consider two elements to be
1004                                  * equal only if the order
1005                                  * matches. This allows dbcheck to
1006                                  * fix the ordering on attributes
1007                                  * where order matters, such as
1008                                  * objectClass
1009                                  */
1010                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1011                                         continue;
1012                                 }
1013
1014                                 /* Delete the attribute if it exists in the DB */
1015                                 if (msg_delete_attribute(module, ltdb,
1016                                                          msg2,
1017                                                          el->name) != 0) {
1018                                         ret = LDB_ERR_OTHER;
1019                                         goto done;
1020                                 }
1021                         }
1022
1023                         /* Recreate it with the new values */
1024                         if (ltdb_msg_add_element(msg2, el) != 0) {
1025                                 ret = LDB_ERR_OTHER;
1026                                 goto done;
1027                         }
1028
1029                         ret = ltdb_index_add_element(module, ltdb,
1030                                                      msg2->dn, el);
1031                         if (ret != LDB_SUCCESS) {
1032                                 goto done;
1033                         }
1034
1035                         break;
1036
1037                 case LDB_FLAG_MOD_DELETE:
1038                         dn = ldb_dn_get_linearized(msg2->dn);
1039                         if (dn == NULL) {
1040                                 ret = LDB_ERR_OTHER;
1041                                 goto done;
1042                         }
1043
1044                         if (msg->elements[i].num_values == 0) {
1045                                 /* Delete the whole attribute */
1046                                 ret = msg_delete_attribute(module,
1047                                                            ltdb,
1048                                                            msg2,
1049                                                            msg->elements[i].name);
1050                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1051                                     control_permissive) {
1052                                         ret = LDB_SUCCESS;
1053                                 } else {
1054                                         ldb_asprintf_errstring(ldb,
1055                                                                "attribute '%s': no such attribute for delete on '%s'",
1056                                                                msg->elements[i].name, dn);
1057                                 }
1058                                 if (ret != LDB_SUCCESS) {
1059                                         goto done;
1060                                 }
1061                         } else {
1062                                 /* Delete specified values from an attribute */
1063                                 for (j=0; j < msg->elements[i].num_values; j++) {
1064                                         ret = msg_delete_element(module,
1065                                                                  ltdb,
1066                                                                  msg2,
1067                                                                  msg->elements[i].name,
1068                                                                  &msg->elements[i].values[j]);
1069                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1070                                             control_permissive) {
1071                                                 ret = LDB_SUCCESS;
1072                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1073                                                 ldb_asprintf_errstring(ldb,
1074                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1075                                                                        msg->elements[i].name, dn);
1076                                         }
1077                                         if (ret != LDB_SUCCESS) {
1078                                                 goto done;
1079                                         }
1080                                 }
1081                         }
1082                         break;
1083                 default:
1084                         ldb_asprintf_errstring(ldb,
1085                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1086                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1087                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1088                         ret = LDB_ERR_PROTOCOL_ERROR;
1089                         goto done;
1090                 }
1091         }
1092
1093         ret = ltdb_store(module, msg2, TDB_MODIFY);
1094         if (ret != LDB_SUCCESS) {
1095                 goto done;
1096         }
1097
1098         ret = ltdb_modified(module, msg2->dn);
1099         if (ret != LDB_SUCCESS) {
1100                 goto done;
1101         }
1102
1103 done:
1104         TALLOC_FREE(mem_ctx);
1105         return ret;
1106 }
1107
1108 /*
1109   modify a record
1110 */
1111 static int ltdb_modify(struct ltdb_context *ctx)
1112 {
1113         struct ldb_module *module = ctx->module;
1114         struct ldb_request *req = ctx->req;
1115         int ret = LDB_SUCCESS;
1116
1117         ret = ltdb_check_special_dn(module, req->op.mod.message);
1118         if (ret != LDB_SUCCESS) {
1119                 return ret;
1120         }
1121
1122         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1123
1124         if (ltdb_cache_load(module) != 0) {
1125                 return LDB_ERR_OPERATIONS_ERROR;
1126         }
1127
1128         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1129
1130         return ret;
1131 }
1132
1133 /*
1134   rename a record
1135 */
1136 static int ltdb_rename(struct ltdb_context *ctx)
1137 {
1138         struct ldb_module *module = ctx->module;
1139         void *data = ldb_module_get_private(module);
1140         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1141         struct ldb_request *req = ctx->req;
1142         struct ldb_message *msg;
1143         int ret = LDB_SUCCESS;
1144         TDB_DATA tdb_key, tdb_key_old;
1145
1146         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1147
1148         if (ltdb_cache_load(ctx->module) != 0) {
1149                 return LDB_ERR_OPERATIONS_ERROR;
1150         }
1151
1152         msg = ldb_msg_new(ctx);
1153         if (msg == NULL) {
1154                 return LDB_ERR_OPERATIONS_ERROR;
1155         }
1156
1157         /* we need to fetch the old record to re-add under the new name */
1158         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1159                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1160         if (ret != LDB_SUCCESS) {
1161                 /* not finding the old record is an error */
1162                 return ret;
1163         }
1164
1165         /* We need to, before changing the DB, check if the new DN
1166          * exists, so we can return this error to the caller with an
1167          * unmodified DB */
1168         tdb_key = ltdb_key_dn(module, req->op.rename.newdn);
1169         if (!tdb_key.dptr) {
1170                 talloc_free(msg);
1171                 return LDB_ERR_OPERATIONS_ERROR;
1172         }
1173
1174         tdb_key_old = ltdb_key_dn(module, req->op.rename.olddn);
1175         if (!tdb_key_old.dptr) {
1176                 talloc_free(msg);
1177                 talloc_free(tdb_key.dptr);
1178                 return LDB_ERR_OPERATIONS_ERROR;
1179         }
1180
1181         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1182         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1183                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1184                         talloc_free(tdb_key_old.dptr);
1185                         talloc_free(tdb_key.dptr);
1186                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1187                                                "Entry %s already exists",
1188                                                ldb_dn_get_linearized(req->op.rename.newdn));
1189                         /* finding the new record already in the DB is an error */
1190                         talloc_free(msg);
1191                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1192                 }
1193         }
1194         talloc_free(tdb_key_old.dptr);
1195         talloc_free(tdb_key.dptr);
1196
1197         /* Always delete first then add, to avoid conflicts with
1198          * unique indexes. We rely on the transaction to make this
1199          * atomic
1200          */
1201         ret = ltdb_delete_internal(module, msg->dn);
1202         if (ret != LDB_SUCCESS) {
1203                 talloc_free(msg);
1204                 return ret;
1205         }
1206
1207         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1208         if (msg->dn == NULL) {
1209                 talloc_free(msg);
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         /* We don't check single value as we can have more than 1 with
1214          * deleted attributes. We could go through all elements but that's
1215          * maybe not the most efficient way
1216          */
1217         ret = ltdb_add_internal(module, ltdb, msg, false);
1218
1219         talloc_free(msg);
1220
1221         return ret;
1222 }
1223
1224 static int ltdb_start_trans(struct ldb_module *module)
1225 {
1226         void *data = ldb_module_get_private(module);
1227         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1228
1229         /* Do not take out the transaction lock on a read-only DB */
1230         if (ltdb->read_only) {
1231                 return LDB_ERR_UNWILLING_TO_PERFORM;
1232         }
1233
1234         if (tdb_transaction_start(ltdb->tdb) != 0) {
1235                 return ltdb_err_map(tdb_error(ltdb->tdb));
1236         }
1237
1238         ltdb->in_transaction++;
1239
1240         ltdb_index_transaction_start(module);
1241
1242         return LDB_SUCCESS;
1243 }
1244
1245 static int ltdb_prepare_commit(struct ldb_module *module)
1246 {
1247         int ret;
1248         void *data = ldb_module_get_private(module);
1249         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1250
1251         if (ltdb->in_transaction != 1) {
1252                 return LDB_SUCCESS;
1253         }
1254
1255         ret = ltdb_index_transaction_commit(module);
1256         if (ret != LDB_SUCCESS) {
1257                 tdb_transaction_cancel(ltdb->tdb);
1258                 ltdb->in_transaction--;
1259                 return ret;
1260         }
1261
1262         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1263                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1264                 ltdb->in_transaction--;
1265                 ldb_debug_set(ldb_module_get_ctx(module),
1266                               LDB_DEBUG_FATAL,
1267                               "Failure during "
1268                               "tdb_transaction_prepare_commit(): %s -> %s",
1269                               tdb_errorstr(ltdb->tdb),
1270                               ldb_strerror(ret));
1271                 return ret;
1272         }
1273
1274         ltdb->prepared_commit = true;
1275
1276         return LDB_SUCCESS;
1277 }
1278
1279 static int ltdb_end_trans(struct ldb_module *module)
1280 {
1281         int ret;
1282         void *data = ldb_module_get_private(module);
1283         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1284
1285         if (!ltdb->prepared_commit) {
1286                 ret = ltdb_prepare_commit(module);
1287                 if (ret != LDB_SUCCESS) {
1288                         return ret;
1289                 }
1290         }
1291
1292         ltdb->in_transaction--;
1293         ltdb->prepared_commit = false;
1294
1295         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1296                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1297                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1298                                        "Failure during tdb_transaction_commit(): %s -> %s",
1299                                        tdb_errorstr(ltdb->tdb),
1300                                        ldb_strerror(ret));
1301                 return ret;
1302         }
1303
1304         return LDB_SUCCESS;
1305 }
1306
1307 static int ltdb_del_trans(struct ldb_module *module)
1308 {
1309         void *data = ldb_module_get_private(module);
1310         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1311
1312         ltdb->in_transaction--;
1313
1314         if (ltdb_index_transaction_cancel(module) != 0) {
1315                 tdb_transaction_cancel(ltdb->tdb);
1316                 return ltdb_err_map(tdb_error(ltdb->tdb));
1317         }
1318
1319         tdb_transaction_cancel(ltdb->tdb);
1320         return LDB_SUCCESS;
1321 }
1322
1323 /*
1324   return sequenceNumber from @BASEINFO
1325 */
1326 static int ltdb_sequence_number(struct ltdb_context *ctx,
1327                                 struct ldb_extended **ext)
1328 {
1329         struct ldb_context *ldb;
1330         struct ldb_module *module = ctx->module;
1331         struct ldb_request *req = ctx->req;
1332         TALLOC_CTX *tmp_ctx = NULL;
1333         struct ldb_seqnum_request *seq;
1334         struct ldb_seqnum_result *res;
1335         struct ldb_message *msg = NULL;
1336         struct ldb_dn *dn;
1337         const char *date;
1338         int ret = LDB_SUCCESS;
1339
1340         ldb = ldb_module_get_ctx(module);
1341
1342         seq = talloc_get_type(req->op.extended.data,
1343                                 struct ldb_seqnum_request);
1344         if (seq == NULL) {
1345                 return LDB_ERR_OPERATIONS_ERROR;
1346         }
1347
1348         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1349
1350         if (ltdb_lock_read(module) != 0) {
1351                 return LDB_ERR_OPERATIONS_ERROR;
1352         }
1353
1354         res = talloc_zero(req, struct ldb_seqnum_result);
1355         if (res == NULL) {
1356                 ret = LDB_ERR_OPERATIONS_ERROR;
1357                 goto done;
1358         }
1359
1360         tmp_ctx = talloc_new(req);
1361         if (tmp_ctx == NULL) {
1362                 ret = LDB_ERR_OPERATIONS_ERROR;
1363                 goto done;
1364         }
1365
1366         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1367         if (dn == NULL) {
1368                 ret = LDB_ERR_OPERATIONS_ERROR;
1369                 goto done;
1370         }
1371
1372         msg = ldb_msg_new(tmp_ctx);
1373         if (msg == NULL) {
1374                 ret = LDB_ERR_OPERATIONS_ERROR;
1375                 goto done;
1376         }
1377
1378         ret = ltdb_search_dn1(module, dn, msg, 0);
1379         if (ret != LDB_SUCCESS) {
1380                 goto done;
1381         }
1382
1383         switch (seq->type) {
1384         case LDB_SEQ_HIGHEST_SEQ:
1385                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1386                 break;
1387         case LDB_SEQ_NEXT:
1388                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1389                 res->seq_num++;
1390                 break;
1391         case LDB_SEQ_HIGHEST_TIMESTAMP:
1392                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1393                 if (date) {
1394                         res->seq_num = ldb_string_to_time(date);
1395                 } else {
1396                         res->seq_num = 0;
1397                         /* zero is as good as anything when we don't know */
1398                 }
1399                 break;
1400         }
1401
1402         *ext = talloc_zero(req, struct ldb_extended);
1403         if (*ext == NULL) {
1404                 ret = LDB_ERR_OPERATIONS_ERROR;
1405                 goto done;
1406         }
1407         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1408         (*ext)->data = talloc_steal(*ext, res);
1409
1410 done:
1411         talloc_free(tmp_ctx);
1412         ltdb_unlock_read(module);
1413         return ret;
1414 }
1415
1416 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1417 {
1418         struct ldb_context *ldb;
1419         struct ldb_request *req;
1420         struct ldb_reply *ares;
1421
1422         ldb = ldb_module_get_ctx(ctx->module);
1423         req = ctx->req;
1424
1425         /* if we already returned an error just return */
1426         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1427                 return;
1428         }
1429
1430         ares = talloc_zero(req, struct ldb_reply);
1431         if (!ares) {
1432                 ldb_oom(ldb);
1433                 req->callback(req, NULL);
1434                 return;
1435         }
1436         ares->type = LDB_REPLY_DONE;
1437         ares->error = error;
1438
1439         req->callback(req, ares);
1440 }
1441
1442 static void ltdb_timeout(struct tevent_context *ev,
1443                           struct tevent_timer *te,
1444                           struct timeval t,
1445                           void *private_data)
1446 {
1447         struct ltdb_context *ctx;
1448         ctx = talloc_get_type(private_data, struct ltdb_context);
1449
1450         if (!ctx->request_terminated) {
1451                 /* request is done now */
1452                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1453         }
1454
1455         if (ctx->spy) {
1456                 /* neutralize the spy */
1457                 ctx->spy->ctx = NULL;
1458                 ctx->spy = NULL;
1459         }
1460         talloc_free(ctx);
1461 }
1462
1463 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1464                                         struct ldb_extended *ext,
1465                                         int error)
1466 {
1467         struct ldb_context *ldb;
1468         struct ldb_request *req;
1469         struct ldb_reply *ares;
1470
1471         ldb = ldb_module_get_ctx(ctx->module);
1472         req = ctx->req;
1473
1474         /* if we already returned an error just return */
1475         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1476                 return;
1477         }
1478
1479         ares = talloc_zero(req, struct ldb_reply);
1480         if (!ares) {
1481                 ldb_oom(ldb);
1482                 req->callback(req, NULL);
1483                 return;
1484         }
1485         ares->type = LDB_REPLY_DONE;
1486         ares->response = ext;
1487         ares->error = error;
1488
1489         req->callback(req, ares);
1490 }
1491
1492 static void ltdb_handle_extended(struct ltdb_context *ctx)
1493 {
1494         struct ldb_extended *ext = NULL;
1495         int ret;
1496
1497         if (strcmp(ctx->req->op.extended.oid,
1498                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1499                 /* get sequence number */
1500                 ret = ltdb_sequence_number(ctx, &ext);
1501         } else {
1502                 /* not recognized */
1503                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1504         }
1505
1506         ltdb_request_extended_done(ctx, ext, ret);
1507 }
1508
1509 static void ltdb_callback(struct tevent_context *ev,
1510                           struct tevent_timer *te,
1511                           struct timeval t,
1512                           void *private_data)
1513 {
1514         struct ltdb_context *ctx;
1515         int ret;
1516
1517         ctx = talloc_get_type(private_data, struct ltdb_context);
1518
1519         if (ctx->request_terminated) {
1520                 goto done;
1521         }
1522
1523         switch (ctx->req->operation) {
1524         case LDB_SEARCH:
1525                 ret = ltdb_search(ctx);
1526                 break;
1527         case LDB_ADD:
1528                 ret = ltdb_add(ctx);
1529                 break;
1530         case LDB_MODIFY:
1531                 ret = ltdb_modify(ctx);
1532                 break;
1533         case LDB_DELETE:
1534                 ret = ltdb_delete(ctx);
1535                 break;
1536         case LDB_RENAME:
1537                 ret = ltdb_rename(ctx);
1538                 break;
1539         case LDB_EXTENDED:
1540                 ltdb_handle_extended(ctx);
1541                 goto done;
1542         default:
1543                 /* no other op supported */
1544                 ret = LDB_ERR_PROTOCOL_ERROR;
1545         }
1546
1547         if (!ctx->request_terminated) {
1548                 /* request is done now */
1549                 ltdb_request_done(ctx, ret);
1550         }
1551
1552 done:
1553         if (ctx->spy) {
1554                 /* neutralize the spy */
1555                 ctx->spy->ctx = NULL;
1556                 ctx->spy = NULL;
1557         }
1558         talloc_free(ctx);
1559 }
1560
1561 static int ltdb_request_destructor(void *ptr)
1562 {
1563         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1564
1565         if (spy->ctx != NULL) {
1566                 spy->ctx->spy = NULL;
1567                 spy->ctx->request_terminated = true;
1568                 spy->ctx = NULL;
1569         }
1570
1571         return 0;
1572 }
1573
1574 static int ltdb_handle_request(struct ldb_module *module,
1575                                struct ldb_request *req)
1576 {
1577         struct ldb_control *control_permissive;
1578         struct ldb_context *ldb;
1579         struct tevent_context *ev;
1580         struct ltdb_context *ac;
1581         struct tevent_timer *te;
1582         struct timeval tv;
1583         unsigned int i;
1584
1585         ldb = ldb_module_get_ctx(module);
1586
1587         control_permissive = ldb_request_get_control(req,
1588                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1589
1590         for (i = 0; req->controls && req->controls[i]; i++) {
1591                 if (req->controls[i]->critical &&
1592                     req->controls[i] != control_permissive) {
1593                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1594                                                req->controls[i]->oid);
1595                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1596                 }
1597         }
1598
1599         if (req->starttime == 0 || req->timeout == 0) {
1600                 ldb_set_errstring(ldb, "Invalid timeout settings");
1601                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1602         }
1603
1604         ev = ldb_handle_get_event_context(req->handle);
1605
1606         ac = talloc_zero(ldb, struct ltdb_context);
1607         if (ac == NULL) {
1608                 ldb_oom(ldb);
1609                 return LDB_ERR_OPERATIONS_ERROR;
1610         }
1611
1612         ac->module = module;
1613         ac->req = req;
1614
1615         tv.tv_sec = 0;
1616         tv.tv_usec = 0;
1617         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1618         if (NULL == te) {
1619                 talloc_free(ac);
1620                 return LDB_ERR_OPERATIONS_ERROR;
1621         }
1622
1623         if (req->timeout > 0) {
1624                 tv.tv_sec = req->starttime + req->timeout;
1625                 tv.tv_usec = 0;
1626                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1627                                                      ltdb_timeout, ac);
1628                 if (NULL == ac->timeout_event) {
1629                         talloc_free(ac);
1630                         return LDB_ERR_OPERATIONS_ERROR;
1631                 }
1632         }
1633
1634         /* set a spy so that we do not try to use the request context
1635          * if it is freed before ltdb_callback fires */
1636         ac->spy = talloc(req, struct ltdb_req_spy);
1637         if (NULL == ac->spy) {
1638                 talloc_free(ac);
1639                 return LDB_ERR_OPERATIONS_ERROR;
1640         }
1641         ac->spy->ctx = ac;
1642
1643         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1644
1645         return LDB_SUCCESS;
1646 }
1647
1648 static int ltdb_init_rootdse(struct ldb_module *module)
1649 {
1650         /* ignore errors on this - we expect it for non-sam databases */
1651         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1652
1653         /* there can be no module beyond the backend, just return */
1654         return LDB_SUCCESS;
1655 }
1656
1657 static const struct ldb_module_ops ltdb_ops = {
1658         .name              = "tdb",
1659         .init_context      = ltdb_init_rootdse,
1660         .search            = ltdb_handle_request,
1661         .add               = ltdb_handle_request,
1662         .modify            = ltdb_handle_request,
1663         .del               = ltdb_handle_request,
1664         .rename            = ltdb_handle_request,
1665         .extended          = ltdb_handle_request,
1666         .start_transaction = ltdb_start_trans,
1667         .end_transaction   = ltdb_end_trans,
1668         .prepare_commit    = ltdb_prepare_commit,
1669         .del_transaction   = ltdb_del_trans,
1670         .read_lock         = ltdb_lock_read,
1671         .read_unlock       = ltdb_unlock_read,
1672 };
1673
1674 /*
1675   connect to the database
1676 */
1677 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1678                         unsigned int flags, const char *options[],
1679                         struct ldb_module **_module)
1680 {
1681         struct ldb_module *module;
1682         const char *path;
1683         int tdb_flags, open_flags;
1684         struct ltdb_private *ltdb;
1685
1686         /*
1687          * We hold locks, so we must use a private event context
1688          * on each returned handle
1689          */
1690
1691         ldb_set_require_private_event_context(ldb);
1692
1693         /* parse the url */
1694         if (strchr(url, ':')) {
1695                 if (strncmp(url, "tdb://", 6) != 0) {
1696                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1697                                   "Invalid tdb URL '%s'", url);
1698                         return LDB_ERR_OPERATIONS_ERROR;
1699                 }
1700                 path = url+6;
1701         } else {
1702                 path = url;
1703         }
1704
1705         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1706
1707         /* check for the 'nosync' option */
1708         if (flags & LDB_FLG_NOSYNC) {
1709                 tdb_flags |= TDB_NOSYNC;
1710         }
1711
1712         /* and nommap option */
1713         if (flags & LDB_FLG_NOMMAP) {
1714                 tdb_flags |= TDB_NOMMAP;
1715         }
1716
1717         ltdb = talloc_zero(ldb, struct ltdb_private);
1718         if (!ltdb) {
1719                 ldb_oom(ldb);
1720                 return LDB_ERR_OPERATIONS_ERROR;
1721         }
1722
1723         if (flags & LDB_FLG_RDONLY) {
1724                 /*
1725                  * This is weird, but because we can only have one tdb
1726                  * in this process, and the other one could be
1727                  * read-write, we can't use the tdb readonly.  Plus a
1728                  * read only tdb prohibits the all-record lock.
1729                  */
1730                 open_flags = O_RDWR;
1731
1732                 ltdb->read_only = true;
1733
1734         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1735                 /*
1736                  * This is used by ldbsearch to prevent creation of the database
1737                  * if the name is wrong
1738                  */
1739                 open_flags = O_RDWR;
1740         } else {
1741                 /*
1742                  * This is the normal case
1743                  */
1744                 open_flags = O_CREAT | O_RDWR;
1745         }
1746
1747         /* note that we use quite a large default hash size */
1748         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1749                                    tdb_flags, open_flags,
1750                                    ldb_get_create_perms(ldb), ldb);
1751         if (!ltdb->tdb) {
1752                 ldb_asprintf_errstring(ldb,
1753                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1754                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1755                           "Unable to open tdb '%s': %s", path, strerror(errno));
1756                 talloc_free(ltdb);
1757                 if (errno == EACCES || errno == EPERM) {
1758                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1759                 }
1760                 return LDB_ERR_OPERATIONS_ERROR;
1761         }
1762
1763         if (getenv("LDB_WARN_UNINDEXED")) {
1764                 ltdb->warn_unindexed = true;
1765         }
1766
1767         if (getenv("LDB_WARN_REINDEX")) {
1768                 ltdb->warn_reindex = true;
1769         }
1770
1771         ltdb->sequence_number = 0;
1772
1773         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1774         if (!module) {
1775                 ldb_oom(ldb);
1776                 talloc_free(ltdb);
1777                 return LDB_ERR_OPERATIONS_ERROR;
1778         }
1779         ldb_module_set_private(module, ltdb);
1780         talloc_steal(module, ltdb);
1781
1782         if (ltdb_cache_load(module) != 0) {
1783                 ldb_asprintf_errstring(ldb,
1784                                        "Unable to load ltdb cache records of tdb '%s'", path);
1785                 talloc_free(module);
1786                 return LDB_ERR_OPERATIONS_ERROR;
1787         }
1788
1789         *_module = module;
1790         return LDB_SUCCESS;
1791 }
1792
1793 int ldb_tdb_init(const char *version)
1794 {
1795         LDB_MODULE_CHECK_VERSION(version);
1796         return ldb_register_backend("tdb", ltdb_connect, false);
1797 }