ldb_tdb: ltdb_tdb_parse_record map tdb error codes
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 static int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 static int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         if (ret != LDB_SUCCESS) {
414                 ltdb->reindex_failed = true;
415         }
416
417         return ret;
418 }
419
420 static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
421                           struct ldb_val ldb_data, int flags)
422 {
423         TDB_DATA key = {
424                 .dptr = ldb_key.data,
425                 .dsize = ldb_key.length
426         };
427         TDB_DATA data = {
428                 .dptr = ldb_data.data,
429                 .dsize = ldb_data.length
430         };
431         return tdb_store(ltdb->tdb, key, data, flags);
432 }
433
434 static int ltdb_error(struct ltdb_private *ltdb)
435 {
436         return ltdb_err_map(tdb_error(ltdb->tdb));
437 }
438
439 static const char *ltdb_errorstr(struct ltdb_private *ltdb)
440 {
441         return tdb_errorstr(ltdb->tdb);
442 }
443
444 /*
445   store a record into the db
446 */
447 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
448 {
449         void *data = ldb_module_get_private(module);
450         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
451         TDB_DATA tdb_key;
452         struct ldb_val ldb_key;
453         struct ldb_val ldb_data;
454         int ret = LDB_SUCCESS;
455         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
456
457         if (tdb_key_ctx == NULL) {
458                 return ldb_module_oom(module);
459         }
460
461         if (ltdb->read_only) {
462                 return LDB_ERR_UNWILLING_TO_PERFORM;
463         }
464
465         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
466         if (tdb_key.dptr == NULL) {
467                 TALLOC_FREE(tdb_key_ctx);
468                 return LDB_ERR_OTHER;
469         }
470
471         ret = ldb_pack_data(ldb_module_get_ctx(module),
472                             msg, &ldb_data);
473         if (ret == -1) {
474                 TALLOC_FREE(tdb_key_ctx);
475                 return LDB_ERR_OTHER;
476         }
477
478         ldb_key.data = tdb_key.dptr;
479         ldb_key.length = tdb_key.dsize;
480
481         ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
482         if (ret != 0) {
483                 bool is_special = ldb_dn_is_special(msg->dn);
484                 ret = ltdb->kv_ops->error(ltdb);
485
486                 /*
487                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
488                  * the GUID, so re-map
489                  */
490                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
491                     && !is_special
492                     && ltdb->cache->GUID_index_attribute != NULL) {
493                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
494                 }
495                 goto done;
496         }
497
498 done:
499         TALLOC_FREE(tdb_key_ctx);
500         talloc_free(ldb_data.data);
501
502         return ret;
503 }
504
505
506 /*
507   check if a attribute is a single valued, for a given element
508  */
509 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
510                                   struct ldb_message_element *el)
511 {
512         if (!a) return false;
513         if (el != NULL) {
514                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
515                         /* override from a ldb module, for example
516                            used for the description field, which is
517                            marked multi-valued in the schema but which
518                            should not actually accept multiple
519                            values */
520                         return true;
521                 }
522                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
523                         /* override from a ldb module, for example used for
524                            deleted linked attribute entries */
525                         return false;
526                 }
527         }
528         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
529                 return true;
530         }
531         return false;
532 }
533
534 static int ltdb_add_internal(struct ldb_module *module,
535                              struct ltdb_private *ltdb,
536                              const struct ldb_message *msg,
537                              bool check_single_value)
538 {
539         struct ldb_context *ldb = ldb_module_get_ctx(module);
540         int ret = LDB_SUCCESS;
541         unsigned int i;
542
543         for (i=0;i<msg->num_elements;i++) {
544                 struct ldb_message_element *el = &msg->elements[i];
545                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
546
547                 if (el->num_values == 0) {
548                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
549                                                el->name, ldb_dn_get_linearized(msg->dn));
550                         return LDB_ERR_CONSTRAINT_VIOLATION;
551                 }
552                 if (check_single_value &&
553                                 el->num_values > 1 &&
554                                 ldb_tdb_single_valued(a, el)) {
555                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
556                                                el->name, ldb_dn_get_linearized(msg->dn));
557                         return LDB_ERR_CONSTRAINT_VIOLATION;
558                 }
559
560                 /* Do not check "@ATTRIBUTES" for duplicated values */
561                 if (ldb_dn_is_special(msg->dn) &&
562                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
563                         continue;
564                 }
565
566                 if (check_single_value &&
567                     !(el->flags &
568                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
569                         struct ldb_val *duplicate = NULL;
570
571                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
572                                                          el, &duplicate, 0);
573                         if (ret != LDB_SUCCESS) {
574                                 return ret;
575                         }
576                         if (duplicate != NULL) {
577                                 ldb_asprintf_errstring(
578                                         ldb,
579                                         "attribute '%s': value '%.*s' on '%s' "
580                                         "provided more than once in ADD object",
581                                         el->name,
582                                         (int)duplicate->length,
583                                         duplicate->data,
584                                         ldb_dn_get_linearized(msg->dn));
585                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
586                         }
587                 }
588         }
589
590         ret = ltdb_store(module, msg, TDB_INSERT);
591         if (ret != LDB_SUCCESS) {
592                 /*
593                  * Try really hard to get the right error code for
594                  * a re-add situation, as this can matter!
595                  */
596                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
597                         int ret2;
598                         struct ldb_dn *dn2 = NULL;
599                         TALLOC_CTX *mem_ctx = talloc_new(module);
600                         if (mem_ctx == NULL) {
601                                 return ldb_module_operr(module);
602                         }
603                         ret2 = ltdb_search_base(module, module,
604                                                 msg->dn, &dn2);
605                         TALLOC_FREE(mem_ctx);
606                         if (ret2 == LDB_SUCCESS) {
607                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
608                         }
609                 }
610                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
611                         ldb_asprintf_errstring(ldb,
612                                                "Entry %s already exists",
613                                                ldb_dn_get_linearized(msg->dn));
614                 }
615                 return ret;
616         }
617
618         ret = ltdb_index_add_new(module, ltdb, msg);
619         if (ret != LDB_SUCCESS) {
620                 /*
621                  * If we failed to index, delete the message again.
622                  *
623                  * This is particularly important for the GUID index
624                  * case, which will only fail for a duplicate DN
625                  * in the index add.
626                  *
627                  * Note that the caller may not cancel the transation
628                  * and this means the above add might really show up!
629                  */
630                 ltdb_delete_noindex(module, msg);
631                 return ret;
632         }
633
634         ret = ltdb_modified(module, msg->dn);
635
636         return ret;
637 }
638
639 /*
640   add a record to the database
641 */
642 static int ltdb_add(struct ltdb_context *ctx)
643 {
644         struct ldb_module *module = ctx->module;
645         struct ldb_request *req = ctx->req;
646         void *data = ldb_module_get_private(module);
647         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
648         int ret = LDB_SUCCESS;
649
650         ret = ltdb_check_special_dn(module, req->op.add.message);
651         if (ret != LDB_SUCCESS) {
652                 return ret;
653         }
654
655         ldb_request_set_state(req, LDB_ASYNC_PENDING);
656
657         if (ltdb_cache_load(module) != 0) {
658                 return LDB_ERR_OPERATIONS_ERROR;
659         }
660
661         ret = ltdb_add_internal(module, ltdb,
662                                 req->op.add.message, true);
663
664         return ret;
665 }
666
667 static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
668 {
669         TDB_DATA tdb_key = {
670                 .dptr = ldb_key.data,
671                 .dsize = ldb_key.length
672         };
673         return tdb_delete(ltdb->tdb, tdb_key);
674 }
675
676 /*
677   delete a record from the database, not updating indexes (used for deleting
678   index records)
679 */
680 int ltdb_delete_noindex(struct ldb_module *module,
681                         const struct ldb_message *msg)
682 {
683         void *data = ldb_module_get_private(module);
684         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
685         struct ldb_val ldb_key;
686         TDB_DATA tdb_key;
687         int ret;
688         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
689
690         if (tdb_key_ctx == NULL) {
691                 return ldb_module_oom(module);
692         }
693
694         if (ltdb->read_only) {
695                 return LDB_ERR_UNWILLING_TO_PERFORM;
696         }
697
698         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
699         if (!tdb_key.dptr) {
700                 TALLOC_FREE(tdb_key_ctx);
701                 return LDB_ERR_OTHER;
702         }
703
704         ldb_key.data = tdb_key.dptr;
705         ldb_key.length = tdb_key.dsize;
706
707         ret = ltdb->kv_ops->delete(ltdb, ldb_key);
708         TALLOC_FREE(tdb_key_ctx);
709
710         if (ret != 0) {
711                 ret = ltdb->kv_ops->error(ltdb);
712         }
713
714         return ret;
715 }
716
717 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
718 {
719         struct ldb_message *msg;
720         int ret = LDB_SUCCESS;
721
722         msg = ldb_msg_new(module);
723         if (msg == NULL) {
724                 return LDB_ERR_OPERATIONS_ERROR;
725         }
726
727         /* in case any attribute of the message was indexed, we need
728            to fetch the old record */
729         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
730         if (ret != LDB_SUCCESS) {
731                 /* not finding the old record is an error */
732                 goto done;
733         }
734
735         ret = ltdb_delete_noindex(module, msg);
736         if (ret != LDB_SUCCESS) {
737                 goto done;
738         }
739
740         /* remove any indexed attributes */
741         ret = ltdb_index_delete(module, msg);
742         if (ret != LDB_SUCCESS) {
743                 goto done;
744         }
745
746         ret = ltdb_modified(module, dn);
747         if (ret != LDB_SUCCESS) {
748                 goto done;
749         }
750
751 done:
752         talloc_free(msg);
753         return ret;
754 }
755
756 /*
757   delete a record from the database
758 */
759 static int ltdb_delete(struct ltdb_context *ctx)
760 {
761         struct ldb_module *module = ctx->module;
762         struct ldb_request *req = ctx->req;
763         int ret = LDB_SUCCESS;
764
765         ldb_request_set_state(req, LDB_ASYNC_PENDING);
766
767         if (ltdb_cache_load(module) != 0) {
768                 return LDB_ERR_OPERATIONS_ERROR;
769         }
770
771         ret = ltdb_delete_internal(module, req->op.del.dn);
772
773         return ret;
774 }
775
776 /*
777   find an element by attribute name. At the moment this does a linear search,
778   it should be re-coded to use a binary search once all places that modify
779   records guarantee sorted order
780
781   return the index of the first matching element if found, otherwise -1
782 */
783 static int find_element(const struct ldb_message *msg, const char *name)
784 {
785         unsigned int i;
786         for (i=0;i<msg->num_elements;i++) {
787                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
788                         return i;
789                 }
790         }
791         return -1;
792 }
793
794
795 /*
796   add an element to an existing record. Assumes a elements array that we
797   can call re-alloc on, and assumed that we can re-use the data pointers from
798   the passed in additional values. Use with care!
799
800   returns 0 on success, -1 on failure (and sets errno)
801 */
802 static int ltdb_msg_add_element(struct ldb_message *msg,
803                                 struct ldb_message_element *el)
804 {
805         struct ldb_message_element *e2;
806         unsigned int i;
807
808         if (el->num_values == 0) {
809                 /* nothing to do here - we don't add empty elements */
810                 return 0;
811         }
812
813         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
814                               msg->num_elements+1);
815         if (!e2) {
816                 errno = ENOMEM;
817                 return -1;
818         }
819
820         msg->elements = e2;
821
822         e2 = &msg->elements[msg->num_elements];
823
824         e2->name = el->name;
825         e2->flags = el->flags;
826         e2->values = talloc_array(msg->elements,
827                                   struct ldb_val, el->num_values);
828         if (!e2->values) {
829                 errno = ENOMEM;
830                 return -1;
831         }
832         for (i=0;i<el->num_values;i++) {
833                 e2->values[i] = el->values[i];
834         }
835         e2->num_values = el->num_values;
836
837         ++msg->num_elements;
838
839         return 0;
840 }
841
842 /*
843   delete all elements having a specified attribute name
844 */
845 static int msg_delete_attribute(struct ldb_module *module,
846                                 struct ltdb_private *ltdb,
847                                 struct ldb_message *msg, const char *name)
848 {
849         unsigned int i;
850         int ret;
851         struct ldb_message_element *el;
852         bool is_special = ldb_dn_is_special(msg->dn);
853
854         if (!is_special
855             && ltdb->cache->GUID_index_attribute != NULL
856             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
857                 struct ldb_context *ldb = ldb_module_get_ctx(module);
858                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
859                                        "attribute %s (used as DB index)",
860                                        ltdb->cache->GUID_index_attribute);
861                 return LDB_ERR_CONSTRAINT_VIOLATION;
862         }
863
864         el = ldb_msg_find_element(msg, name);
865         if (el == NULL) {
866                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
867         }
868         i = el - msg->elements;
869
870         ret = ltdb_index_del_element(module, ltdb, msg, el);
871         if (ret != LDB_SUCCESS) {
872                 return ret;
873         }
874
875         talloc_free(el->values);
876         if (msg->num_elements > (i+1)) {
877                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
878         }
879         msg->num_elements--;
880         msg->elements = talloc_realloc(msg, msg->elements,
881                                        struct ldb_message_element,
882                                        msg->num_elements);
883         return LDB_SUCCESS;
884 }
885
886 /*
887   delete all elements matching an attribute name/value
888
889   return LDB Error on failure
890 */
891 static int msg_delete_element(struct ldb_module *module,
892                               struct ltdb_private *ltdb,
893                               struct ldb_message *msg,
894                               const char *name,
895                               const struct ldb_val *val)
896 {
897         struct ldb_context *ldb = ldb_module_get_ctx(module);
898         unsigned int i;
899         int found, ret;
900         struct ldb_message_element *el;
901         const struct ldb_schema_attribute *a;
902
903         found = find_element(msg, name);
904         if (found == -1) {
905                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
906         }
907
908         i = (unsigned int) found;
909         el = &(msg->elements[i]);
910
911         a = ldb_schema_attribute_by_name(ldb, el->name);
912
913         for (i=0;i<el->num_values;i++) {
914                 bool matched;
915                 if (a->syntax->operator_fn) {
916                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
917                                                      &el->values[i], val, &matched);
918                         if (ret != LDB_SUCCESS) return ret;
919                 } else {
920                         matched = (a->syntax->comparison_fn(ldb, ldb,
921                                                             &el->values[i], val) == 0);
922                 }
923                 if (matched) {
924                         if (el->num_values == 1) {
925                                 return msg_delete_attribute(module,
926                                                             ltdb, msg, name);
927                         }
928
929                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
930                         if (ret != LDB_SUCCESS) {
931                                 return ret;
932                         }
933
934                         if (i<el->num_values-1) {
935                                 memmove(&el->values[i], &el->values[i+1],
936                                         sizeof(el->values[i])*
937                                                 (el->num_values-(i+1)));
938                         }
939                         el->num_values--;
940
941                         /* per definition we find in a canonicalised message an
942                            attribute value only once. So we are finished here */
943                         return LDB_SUCCESS;
944                 }
945         }
946
947         /* Not found */
948         return LDB_ERR_NO_SUCH_ATTRIBUTE;
949 }
950
951 /*
952   modify a record - internal interface
953
954   yuck - this is O(n^2). Luckily n is usually small so we probably
955   get away with it, but if we ever have really large attribute lists
956   then we'll need to look at this again
957
958   'req' is optional, and is used to specify controls if supplied
959 */
960 int ltdb_modify_internal(struct ldb_module *module,
961                          const struct ldb_message *msg,
962                          struct ldb_request *req)
963 {
964         struct ldb_context *ldb = ldb_module_get_ctx(module);
965         void *data = ldb_module_get_private(module);
966         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
967         struct ldb_message *msg2;
968         unsigned int i, j;
969         int ret = LDB_SUCCESS, idx;
970         struct ldb_control *control_permissive = NULL;
971         TALLOC_CTX *mem_ctx = talloc_new(req);
972
973         if (mem_ctx == NULL) {
974                 return ldb_module_oom(module);
975         }
976         
977         if (req) {
978                 control_permissive = ldb_request_get_control(req,
979                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
980         }
981
982         msg2 = ldb_msg_new(mem_ctx);
983         if (msg2 == NULL) {
984                 ret = LDB_ERR_OTHER;
985                 goto done;
986         }
987
988         ret = ltdb_search_dn1(module, msg->dn,
989                               msg2,
990                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
991         if (ret != LDB_SUCCESS) {
992                 goto done;
993         }
994
995         for (i=0; i<msg->num_elements; i++) {
996                 struct ldb_message_element *el = &msg->elements[i], *el2;
997                 struct ldb_val *vals;
998                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
999                 const char *dn;
1000                 uint32_t options = 0;
1001                 if (control_permissive != NULL) {
1002                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
1003                 }
1004
1005                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
1006                 case LDB_FLAG_MOD_ADD:
1007
1008                         if (el->num_values == 0) {
1009                                 ldb_asprintf_errstring(ldb,
1010                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
1011                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1012                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
1013                                 goto done;
1014                         }
1015
1016                         /* make a copy of the array so that a permissive
1017                          * control can remove duplicates without changing the
1018                          * original values, but do not copy data as we do not
1019                          * need to keep it around once the operation is
1020                          * finished */
1021                         if (control_permissive) {
1022                                 el = talloc(msg2, struct ldb_message_element);
1023                                 if (!el) {
1024                                         ret = LDB_ERR_OTHER;
1025                                         goto done;
1026                                 }
1027                                 *el = msg->elements[i];
1028                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1029                                 if (el->values == NULL) {
1030                                         ret = LDB_ERR_OTHER;
1031                                         goto done;
1032                                 }
1033                                 for (j = 0; j < el->num_values; j++) {
1034                                         el->values[j] = msg->elements[i].values[j];
1035                                 }
1036                         }
1037
1038                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1039                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1040                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1041                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1042                                 goto done;
1043                         }
1044
1045                         /* Checks if element already exists */
1046                         idx = find_element(msg2, el->name);
1047                         if (idx == -1) {
1048                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1049                                         ret = LDB_ERR_OTHER;
1050                                         goto done;
1051                                 }
1052                                 ret = ltdb_index_add_element(module, ltdb,
1053                                                              msg2,
1054                                                              el);
1055                                 if (ret != LDB_SUCCESS) {
1056                                         goto done;
1057                                 }
1058                         } else {
1059                                 j = (unsigned int) idx;
1060                                 el2 = &(msg2->elements[j]);
1061
1062                                 /* We cannot add another value on a existing one
1063                                    if the attribute is single-valued */
1064                                 if (ldb_tdb_single_valued(a, el)) {
1065                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1066                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1067                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1068                                         goto done;
1069                                 }
1070
1071                                 /* Check that values don't exist yet on multi-
1072                                    valued attributes or aren't provided twice */
1073                                 if (!(el->flags &
1074                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1075                                         struct ldb_val *duplicate = NULL;
1076                                         ret = ldb_msg_find_common_values(ldb,
1077                                                                          msg2,
1078                                                                          el,
1079                                                                          el2,
1080                                                                          options);
1081
1082                                         if (ret ==
1083                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1084                                                 ldb_asprintf_errstring(ldb,
1085                                                         "attribute '%s': value "
1086                                                         "#%u on '%s' already "
1087                                                         "exists", el->name, j,
1088                                                         ldb_dn_get_linearized(msg2->dn));
1089                                                 goto done;
1090                                         } else if (ret != LDB_SUCCESS) {
1091                                                 goto done;
1092                                         }
1093
1094                                         ret = ldb_msg_find_duplicate_val(
1095                                                 ldb, msg2, el, &duplicate, 0);
1096                                         if (ret != LDB_SUCCESS) {
1097                                                 goto done;
1098                                         }
1099                                         if (duplicate != NULL) {
1100                                                 ldb_asprintf_errstring(
1101                                                         ldb,
1102                                                         "attribute '%s': value "
1103                                                         "'%.*s' on '%s' "
1104                                                         "provided more than "
1105                                                         "once in ADD",
1106                                                         el->name,
1107                                                         (int)duplicate->length,
1108                                                         duplicate->data,
1109                                                         ldb_dn_get_linearized(msg->dn));
1110                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1111                                                 goto done;
1112                                         }
1113                                 }
1114
1115                                 /* Now combine existing and new values to a new
1116                                    attribute record */
1117                                 vals = talloc_realloc(msg2->elements,
1118                                                       el2->values, struct ldb_val,
1119                                                       el2->num_values + el->num_values);
1120                                 if (vals == NULL) {
1121                                         ldb_oom(ldb);
1122                                         ret = LDB_ERR_OTHER;
1123                                         goto done;
1124                                 }
1125
1126                                 for (j=0; j<el->num_values; j++) {
1127                                         vals[el2->num_values + j] =
1128                                                 ldb_val_dup(vals, &el->values[j]);
1129                                 }
1130
1131                                 el2->values = vals;
1132                                 el2->num_values += el->num_values;
1133
1134                                 ret = ltdb_index_add_element(module, ltdb,
1135                                                              msg2, el);
1136                                 if (ret != LDB_SUCCESS) {
1137                                         goto done;
1138                                 }
1139                         }
1140
1141                         break;
1142
1143                 case LDB_FLAG_MOD_REPLACE:
1144
1145                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1146                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1147                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1148                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1149                                 goto done;
1150                         }
1151
1152                         /*
1153                          * We don't need to check this if we have been
1154                          * pre-screened by the repl_meta_data module
1155                          * in Samba, or someone else who can claim to
1156                          * know what they are doing. 
1157                          */
1158                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1159                                 struct ldb_val *duplicate = NULL;
1160
1161                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1162                                                                  &duplicate, 0);
1163                                 if (ret != LDB_SUCCESS) {
1164                                         goto done;
1165                                 }
1166                                 if (duplicate != NULL) {
1167                                         ldb_asprintf_errstring(
1168                                                 ldb,
1169                                                 "attribute '%s': value '%.*s' "
1170                                                 "on '%s' provided more than "
1171                                                 "once in REPLACE",
1172                                                 el->name,
1173                                                 (int)duplicate->length,
1174                                                 duplicate->data,
1175                                                 ldb_dn_get_linearized(msg2->dn));
1176                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1177                                         goto done;
1178                                 }
1179                         }
1180
1181                         /* Checks if element already exists */
1182                         idx = find_element(msg2, el->name);
1183                         if (idx != -1) {
1184                                 j = (unsigned int) idx;
1185                                 el2 = &(msg2->elements[j]);
1186
1187                                 /* we consider two elements to be
1188                                  * equal only if the order
1189                                  * matches. This allows dbcheck to
1190                                  * fix the ordering on attributes
1191                                  * where order matters, such as
1192                                  * objectClass
1193                                  */
1194                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1195                                         continue;
1196                                 }
1197
1198                                 /* Delete the attribute if it exists in the DB */
1199                                 if (msg_delete_attribute(module, ltdb,
1200                                                          msg2,
1201                                                          el->name) != 0) {
1202                                         ret = LDB_ERR_OTHER;
1203                                         goto done;
1204                                 }
1205                         }
1206
1207                         /* Recreate it with the new values */
1208                         if (ltdb_msg_add_element(msg2, el) != 0) {
1209                                 ret = LDB_ERR_OTHER;
1210                                 goto done;
1211                         }
1212
1213                         ret = ltdb_index_add_element(module, ltdb,
1214                                                      msg2, el);
1215                         if (ret != LDB_SUCCESS) {
1216                                 goto done;
1217                         }
1218
1219                         break;
1220
1221                 case LDB_FLAG_MOD_DELETE:
1222                         dn = ldb_dn_get_linearized(msg2->dn);
1223                         if (dn == NULL) {
1224                                 ret = LDB_ERR_OTHER;
1225                                 goto done;
1226                         }
1227
1228                         if (msg->elements[i].num_values == 0) {
1229                                 /* Delete the whole attribute */
1230                                 ret = msg_delete_attribute(module,
1231                                                            ltdb,
1232                                                            msg2,
1233                                                            msg->elements[i].name);
1234                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1235                                     control_permissive) {
1236                                         ret = LDB_SUCCESS;
1237                                 } else {
1238                                         ldb_asprintf_errstring(ldb,
1239                                                                "attribute '%s': no such attribute for delete on '%s'",
1240                                                                msg->elements[i].name, dn);
1241                                 }
1242                                 if (ret != LDB_SUCCESS) {
1243                                         goto done;
1244                                 }
1245                         } else {
1246                                 /* Delete specified values from an attribute */
1247                                 for (j=0; j < msg->elements[i].num_values; j++) {
1248                                         ret = msg_delete_element(module,
1249                                                                  ltdb,
1250                                                                  msg2,
1251                                                                  msg->elements[i].name,
1252                                                                  &msg->elements[i].values[j]);
1253                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1254                                             control_permissive) {
1255                                                 ret = LDB_SUCCESS;
1256                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1257                                                 ldb_asprintf_errstring(ldb,
1258                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1259                                                                        msg->elements[i].name, dn);
1260                                         }
1261                                         if (ret != LDB_SUCCESS) {
1262                                                 goto done;
1263                                         }
1264                                 }
1265                         }
1266                         break;
1267                 default:
1268                         ldb_asprintf_errstring(ldb,
1269                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1270                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1271                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1272                         ret = LDB_ERR_PROTOCOL_ERROR;
1273                         goto done;
1274                 }
1275         }
1276
1277         ret = ltdb_store(module, msg2, TDB_MODIFY);
1278         if (ret != LDB_SUCCESS) {
1279                 goto done;
1280         }
1281
1282         ret = ltdb_modified(module, msg2->dn);
1283         if (ret != LDB_SUCCESS) {
1284                 goto done;
1285         }
1286
1287 done:
1288         TALLOC_FREE(mem_ctx);
1289         return ret;
1290 }
1291
1292 /*
1293   modify a record
1294 */
1295 static int ltdb_modify(struct ltdb_context *ctx)
1296 {
1297         struct ldb_module *module = ctx->module;
1298         struct ldb_request *req = ctx->req;
1299         int ret = LDB_SUCCESS;
1300
1301         ret = ltdb_check_special_dn(module, req->op.mod.message);
1302         if (ret != LDB_SUCCESS) {
1303                 return ret;
1304         }
1305
1306         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1307
1308         if (ltdb_cache_load(module) != 0) {
1309                 return LDB_ERR_OPERATIONS_ERROR;
1310         }
1311
1312         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1313
1314         return ret;
1315 }
1316
1317 /*
1318   rename a record
1319 */
1320 static int ltdb_rename(struct ltdb_context *ctx)
1321 {
1322         struct ldb_module *module = ctx->module;
1323         void *data = ldb_module_get_private(module);
1324         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1325         struct ldb_request *req = ctx->req;
1326         struct ldb_message *msg;
1327         int ret = LDB_SUCCESS;
1328         TDB_DATA tdb_key, tdb_key_old;
1329         struct ldb_dn *db_dn;
1330
1331         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1332
1333         if (ltdb_cache_load(ctx->module) != 0) {
1334                 return LDB_ERR_OPERATIONS_ERROR;
1335         }
1336
1337         msg = ldb_msg_new(ctx);
1338         if (msg == NULL) {
1339                 return LDB_ERR_OPERATIONS_ERROR;
1340         }
1341
1342         /* we need to fetch the old record to re-add under the new name */
1343         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1344                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1345         if (ret != LDB_SUCCESS) {
1346                 /* not finding the old record is an error */
1347                 return ret;
1348         }
1349
1350         /* We need to, before changing the DB, check if the new DN
1351          * exists, so we can return this error to the caller with an
1352          * unmodified DB
1353          *
1354          * Even in GUID index mode we use ltdb_key_dn() as we are
1355          * trying to figure out if this is just a case rename
1356          */
1357         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1358         if (!tdb_key.dptr) {
1359                 talloc_free(msg);
1360                 return LDB_ERR_OPERATIONS_ERROR;
1361         }
1362
1363         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1364         if (!tdb_key_old.dptr) {
1365                 talloc_free(msg);
1366                 talloc_free(tdb_key.dptr);
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         /*
1371          * Only declare a conflict if the new DN already exists,
1372          * and it isn't a case change on the old DN
1373          */
1374         if (tdb_key_old.dsize != tdb_key.dsize
1375             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1376                 ret = ltdb_search_base(module, msg,
1377                                        req->op.rename.newdn,
1378                                        &db_dn);
1379                 if (ret == LDB_SUCCESS) {
1380                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1381                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1382                         ret = LDB_SUCCESS;
1383                 }
1384         }
1385
1386         /* finding the new record already in the DB is an error */
1387
1388         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1389                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1390                                        "Entry %s already exists",
1391                                        ldb_dn_get_linearized(req->op.rename.newdn));
1392         }
1393         if (ret != LDB_SUCCESS) {
1394                 talloc_free(tdb_key_old.dptr);
1395                 talloc_free(tdb_key.dptr);
1396                 talloc_free(msg);
1397                 return ret;
1398         }
1399
1400         talloc_free(tdb_key_old.dptr);
1401         talloc_free(tdb_key.dptr);
1402
1403         /* Always delete first then add, to avoid conflicts with
1404          * unique indexes. We rely on the transaction to make this
1405          * atomic
1406          */
1407         ret = ltdb_delete_internal(module, msg->dn);
1408         if (ret != LDB_SUCCESS) {
1409                 talloc_free(msg);
1410                 return ret;
1411         }
1412
1413         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1414         if (msg->dn == NULL) {
1415                 talloc_free(msg);
1416                 return LDB_ERR_OPERATIONS_ERROR;
1417         }
1418
1419         /* We don't check single value as we can have more than 1 with
1420          * deleted attributes. We could go through all elements but that's
1421          * maybe not the most efficient way
1422          */
1423         ret = ltdb_add_internal(module, ltdb, msg, false);
1424
1425         talloc_free(msg);
1426
1427         return ret;
1428 }
1429
1430 static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
1431 {
1432         return tdb_transaction_start(ltdb->tdb);
1433 }
1434
1435 static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
1436 {
1437         return tdb_transaction_cancel(ltdb->tdb);
1438 }
1439
1440 static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
1441 {
1442         return tdb_transaction_prepare_commit(ltdb->tdb);
1443 }
1444
1445 static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
1446 {
1447         return tdb_transaction_commit(ltdb->tdb);
1448 }
1449
1450 static int ltdb_start_trans(struct ldb_module *module)
1451 {
1452         void *data = ldb_module_get_private(module);
1453         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1454
1455         /* Do not take out the transaction lock on a read-only DB */
1456         if (ltdb->read_only) {
1457                 return LDB_ERR_UNWILLING_TO_PERFORM;
1458         }
1459
1460         if (ltdb->kv_ops->begin_write(ltdb) != 0) {
1461                 return ltdb->kv_ops->error(ltdb);
1462         }
1463
1464         ltdb->in_transaction++;
1465
1466         ltdb_index_transaction_start(module);
1467
1468         ltdb->reindex_failed = false;
1469
1470         return LDB_SUCCESS;
1471 }
1472
1473 /*
1474  * Forward declaration to allow prepare_commit to in fact abort the
1475  * transaction
1476  */
1477 static int ltdb_del_trans(struct ldb_module *module);
1478
1479 static int ltdb_prepare_commit(struct ldb_module *module)
1480 {
1481         int ret;
1482         void *data = ldb_module_get_private(module);
1483         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1484
1485         if (ltdb->in_transaction != 1) {
1486                 return LDB_SUCCESS;
1487         }
1488
1489         /*
1490          * Check if the last re-index failed.
1491          *
1492          * This can happen if for example a duplicate value was marked
1493          * unique.  We must not write a partial re-index into the DB.
1494          */
1495         if (ltdb->reindex_failed) {
1496                 /*
1497                  * We must instead abort the transaction so we get the
1498                  * old values and old index back
1499                  */
1500                 ltdb_del_trans(module);
1501                 ldb_set_errstring(ldb_module_get_ctx(module),
1502                                   "Failure during re-index, so "
1503                                   "transaction must be aborted.");
1504                 return LDB_ERR_OPERATIONS_ERROR;
1505         }
1506
1507         ret = ltdb_index_transaction_commit(module);
1508         if (ret != LDB_SUCCESS) {
1509                 ltdb->kv_ops->abort_write(ltdb);
1510                 ltdb->in_transaction--;
1511                 return ret;
1512         }
1513
1514         if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
1515                 ret = ltdb->kv_ops->error(ltdb);
1516                 ltdb->in_transaction--;
1517                 ldb_debug_set(ldb_module_get_ctx(module),
1518                               LDB_DEBUG_FATAL,
1519                               "Failure during "
1520                               "prepare_write): %s -> %s",
1521                               ltdb->kv_ops->errorstr(ltdb),
1522                               ldb_strerror(ret));
1523                 return ret;
1524         }
1525
1526         ltdb->prepared_commit = true;
1527
1528         return LDB_SUCCESS;
1529 }
1530
1531 static int ltdb_end_trans(struct ldb_module *module)
1532 {
1533         int ret;
1534         void *data = ldb_module_get_private(module);
1535         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1536
1537         if (!ltdb->prepared_commit) {
1538                 ret = ltdb_prepare_commit(module);
1539                 if (ret != LDB_SUCCESS) {
1540                         return ret;
1541                 }
1542         }
1543
1544         ltdb->in_transaction--;
1545         ltdb->prepared_commit = false;
1546
1547         if (ltdb->kv_ops->finish_write(ltdb) != 0) {
1548                 ret = ltdb->kv_ops->error(ltdb);
1549                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1550                                        "Failure during tdb_transaction_commit(): %s -> %s",
1551                                        ltdb->kv_ops->errorstr(ltdb),
1552                                        ldb_strerror(ret));
1553                 return ret;
1554         }
1555
1556         return LDB_SUCCESS;
1557 }
1558
1559 static int ltdb_del_trans(struct ldb_module *module)
1560 {
1561         void *data = ldb_module_get_private(module);
1562         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1563
1564         ltdb->in_transaction--;
1565
1566         if (ltdb_index_transaction_cancel(module) != 0) {
1567                 ltdb->kv_ops->abort_write(ltdb);
1568                 return ltdb->kv_ops->error(ltdb);
1569         }
1570
1571         ltdb->kv_ops->abort_write(ltdb);
1572         return LDB_SUCCESS;
1573 }
1574
1575 /*
1576   return sequenceNumber from @BASEINFO
1577 */
1578 static int ltdb_sequence_number(struct ltdb_context *ctx,
1579                                 struct ldb_extended **ext)
1580 {
1581         struct ldb_context *ldb;
1582         struct ldb_module *module = ctx->module;
1583         struct ldb_request *req = ctx->req;
1584         void *data = ldb_module_get_private(module);
1585         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1586         TALLOC_CTX *tmp_ctx = NULL;
1587         struct ldb_seqnum_request *seq;
1588         struct ldb_seqnum_result *res;
1589         struct ldb_message *msg = NULL;
1590         struct ldb_dn *dn;
1591         const char *date;
1592         int ret = LDB_SUCCESS;
1593
1594         ldb = ldb_module_get_ctx(module);
1595
1596         seq = talloc_get_type(req->op.extended.data,
1597                                 struct ldb_seqnum_request);
1598         if (seq == NULL) {
1599                 return LDB_ERR_OPERATIONS_ERROR;
1600         }
1601
1602         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1603
1604         if (ltdb->kv_ops->lock_read(module) != 0) {
1605                 return LDB_ERR_OPERATIONS_ERROR;
1606         }
1607
1608         res = talloc_zero(req, struct ldb_seqnum_result);
1609         if (res == NULL) {
1610                 ret = LDB_ERR_OPERATIONS_ERROR;
1611                 goto done;
1612         }
1613
1614         tmp_ctx = talloc_new(req);
1615         if (tmp_ctx == NULL) {
1616                 ret = LDB_ERR_OPERATIONS_ERROR;
1617                 goto done;
1618         }
1619
1620         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1621         if (dn == NULL) {
1622                 ret = LDB_ERR_OPERATIONS_ERROR;
1623                 goto done;
1624         }
1625
1626         msg = ldb_msg_new(tmp_ctx);
1627         if (msg == NULL) {
1628                 ret = LDB_ERR_OPERATIONS_ERROR;
1629                 goto done;
1630         }
1631
1632         ret = ltdb_search_dn1(module, dn, msg, 0);
1633         if (ret != LDB_SUCCESS) {
1634                 goto done;
1635         }
1636
1637         switch (seq->type) {
1638         case LDB_SEQ_HIGHEST_SEQ:
1639                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1640                 break;
1641         case LDB_SEQ_NEXT:
1642                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1643                 res->seq_num++;
1644                 break;
1645         case LDB_SEQ_HIGHEST_TIMESTAMP:
1646                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1647                 if (date) {
1648                         res->seq_num = ldb_string_to_time(date);
1649                 } else {
1650                         res->seq_num = 0;
1651                         /* zero is as good as anything when we don't know */
1652                 }
1653                 break;
1654         }
1655
1656         *ext = talloc_zero(req, struct ldb_extended);
1657         if (*ext == NULL) {
1658                 ret = LDB_ERR_OPERATIONS_ERROR;
1659                 goto done;
1660         }
1661         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1662         (*ext)->data = talloc_steal(*ext, res);
1663
1664 done:
1665         talloc_free(tmp_ctx);
1666
1667         ltdb->kv_ops->unlock_read(module);
1668         return ret;
1669 }
1670
1671 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1672 {
1673         struct ldb_context *ldb;
1674         struct ldb_request *req;
1675         struct ldb_reply *ares;
1676
1677         ldb = ldb_module_get_ctx(ctx->module);
1678         req = ctx->req;
1679
1680         /* if we already returned an error just return */
1681         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1682                 return;
1683         }
1684
1685         ares = talloc_zero(req, struct ldb_reply);
1686         if (!ares) {
1687                 ldb_oom(ldb);
1688                 req->callback(req, NULL);
1689                 return;
1690         }
1691         ares->type = LDB_REPLY_DONE;
1692         ares->error = error;
1693
1694         req->callback(req, ares);
1695 }
1696
1697 static void ltdb_timeout(struct tevent_context *ev,
1698                           struct tevent_timer *te,
1699                           struct timeval t,
1700                           void *private_data)
1701 {
1702         struct ltdb_context *ctx;
1703         ctx = talloc_get_type(private_data, struct ltdb_context);
1704
1705         if (!ctx->request_terminated) {
1706                 /* request is done now */
1707                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1708         }
1709
1710         if (ctx->spy) {
1711                 /* neutralize the spy */
1712                 ctx->spy->ctx = NULL;
1713                 ctx->spy = NULL;
1714         }
1715         talloc_free(ctx);
1716 }
1717
1718 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1719                                         struct ldb_extended *ext,
1720                                         int error)
1721 {
1722         struct ldb_context *ldb;
1723         struct ldb_request *req;
1724         struct ldb_reply *ares;
1725
1726         ldb = ldb_module_get_ctx(ctx->module);
1727         req = ctx->req;
1728
1729         /* if we already returned an error just return */
1730         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1731                 return;
1732         }
1733
1734         ares = talloc_zero(req, struct ldb_reply);
1735         if (!ares) {
1736                 ldb_oom(ldb);
1737                 req->callback(req, NULL);
1738                 return;
1739         }
1740         ares->type = LDB_REPLY_DONE;
1741         ares->response = ext;
1742         ares->error = error;
1743
1744         req->callback(req, ares);
1745 }
1746
1747 static void ltdb_handle_extended(struct ltdb_context *ctx)
1748 {
1749         struct ldb_extended *ext = NULL;
1750         int ret;
1751
1752         if (strcmp(ctx->req->op.extended.oid,
1753                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1754                 /* get sequence number */
1755                 ret = ltdb_sequence_number(ctx, &ext);
1756         } else {
1757                 /* not recognized */
1758                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1759         }
1760
1761         ltdb_request_extended_done(ctx, ext, ret);
1762 }
1763
1764 struct kv_ctx {
1765         ldb_kv_traverse_fn kv_traverse_fn;
1766         void *ctx;
1767         struct ltdb_private *ltdb;
1768         int (*parser)(struct ldb_val key,
1769                       struct ldb_val data,
1770                       void *private_data);
1771 };
1772
1773 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
1774 {
1775         struct kv_ctx *kv_ctx = ctx;
1776         struct ldb_val key = {
1777                 .length = tdb_key.dsize,
1778                 .data = tdb_key.dptr,
1779         };
1780         struct ldb_val data = {
1781                 .length = tdb_data.dsize,
1782                 .data = tdb_data.dptr,
1783         };
1784         return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
1785 }
1786
1787 static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
1788 {
1789         struct kv_ctx kv_ctx = {
1790                 .kv_traverse_fn = fn,
1791                 .ctx = ctx,
1792                 .ltdb = ltdb
1793         };
1794         if (ltdb->in_transaction != 0) {
1795                 return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1796         } else {
1797                 return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1798         }
1799 }
1800
1801 static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
1802                                       struct ldb_val ldb_key,
1803                                       struct ldb_val ldb_key2,
1804                                       struct ldb_val ldb_data, void *state)
1805 {
1806         int tdb_ret;
1807         struct ldb_context *ldb;
1808         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1809         struct ldb_module *module = ctx->module;
1810         TDB_DATA key = {
1811                 .dptr = ldb_key.data,
1812                 .dsize = ldb_key.length
1813         };
1814         TDB_DATA key2 = {
1815                 .dptr = ldb_key2.data,
1816                 .dsize = ldb_key2.length
1817         };
1818         TDB_DATA data = {
1819                 .dptr = ldb_data.data,
1820                 .dsize = ldb_data.length
1821         };
1822
1823         ldb = ldb_module_get_ctx(module);
1824
1825         tdb_ret = tdb_delete(ltdb->tdb, key);
1826         if (tdb_ret != 0) {
1827                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1828                           "Failed to delete %*.*s "
1829                           "for rekey as %*.*s: %s",
1830                           (int)key.dsize, (int)key.dsize,
1831                           (const char *)key.dptr,
1832                           (int)key2.dsize, (int)key2.dsize,
1833                           (const char *)key.dptr,
1834                           tdb_errorstr(ltdb->tdb));
1835                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1836                 return -1;
1837         }
1838         tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
1839         if (tdb_ret != 0) {
1840                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1841                           "Failed to rekey %*.*s as %*.*s: %s",
1842                           (int)key.dsize, (int)key.dsize,
1843                           (const char *)key.dptr,
1844                           (int)key2.dsize, (int)key2.dsize,
1845                           (const char *)key.dptr,
1846                           tdb_errorstr(ltdb->tdb));
1847                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1848                 return -1;
1849         }
1850         return tdb_ret;
1851 }
1852
1853 static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
1854                                          void *ctx)
1855 {
1856         struct kv_ctx *kv_ctx = ctx;
1857         struct ldb_val key = {
1858                 .length = tdb_key.dsize,
1859                 .data = tdb_key.dptr,
1860         };
1861         struct ldb_val data = {
1862                 .length = tdb_data.dsize,
1863                 .data = tdb_data.dptr,
1864         };
1865
1866         return kv_ctx->parser(key, data, kv_ctx->ctx);
1867 }
1868
1869 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
1870                                  struct ldb_val ldb_key,
1871                                  int (*parser)(struct ldb_val key,
1872                                                struct ldb_val data,
1873                                                void *private_data),
1874                                  void *ctx)
1875 {
1876         struct kv_ctx kv_ctx = {
1877                 .parser = parser,
1878                 .ctx = ctx,
1879                 .ltdb = ltdb
1880         };
1881         TDB_DATA key = {
1882                 .dptr = ldb_key.data,
1883                 .dsize = ldb_key.length
1884         };
1885         int ret;
1886
1887         ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
1888                                &kv_ctx);
1889         if (ret == 0) {
1890                 return LDB_SUCCESS;
1891         }
1892         return ltdb_err_map(tdb_error(ltdb->tdb));
1893 }
1894
1895 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1896 {
1897         return tdb_name(ltdb->tdb);
1898 }
1899
1900 static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
1901 {
1902         int seq = tdb_get_seqnum(ltdb->tdb);
1903         bool has_changed = (seq != ltdb->tdb_seqnum);
1904
1905         ltdb->tdb_seqnum = seq;
1906
1907         return has_changed;
1908 }
1909
1910 static const struct kv_db_ops key_value_ops = {
1911         .store = ltdb_tdb_store,
1912         .delete = ltdb_tdb_delete,
1913         .iterate = ltdb_tdb_traverse_fn,
1914         .update_in_iterate = ltdb_tdb_update_in_iterate,
1915         .fetch_and_parse = ltdb_tdb_parse_record,
1916         .lock_read = ltdb_lock_read,
1917         .unlock_read = ltdb_unlock_read,
1918         .begin_write = ltdb_tdb_transaction_start,
1919         .prepare_write = ltdb_tdb_transaction_prepare_commit,
1920         .finish_write = ltdb_tdb_transaction_commit,
1921         .abort_write = ltdb_tdb_transaction_cancel,
1922         .error = ltdb_error,
1923         .errorstr = ltdb_errorstr,
1924         .name = ltdb_tdb_name,
1925         .has_changed = ltdb_tdb_changed,
1926 };
1927
1928 static void ltdb_callback(struct tevent_context *ev,
1929                           struct tevent_timer *te,
1930                           struct timeval t,
1931                           void *private_data)
1932 {
1933         struct ltdb_context *ctx;
1934         int ret;
1935
1936         ctx = talloc_get_type(private_data, struct ltdb_context);
1937
1938         if (ctx->request_terminated) {
1939                 goto done;
1940         }
1941
1942         switch (ctx->req->operation) {
1943         case LDB_SEARCH:
1944                 ret = ltdb_search(ctx);
1945                 break;
1946         case LDB_ADD:
1947                 ret = ltdb_add(ctx);
1948                 break;
1949         case LDB_MODIFY:
1950                 ret = ltdb_modify(ctx);
1951                 break;
1952         case LDB_DELETE:
1953                 ret = ltdb_delete(ctx);
1954                 break;
1955         case LDB_RENAME:
1956                 ret = ltdb_rename(ctx);
1957                 break;
1958         case LDB_EXTENDED:
1959                 ltdb_handle_extended(ctx);
1960                 goto done;
1961         default:
1962                 /* no other op supported */
1963                 ret = LDB_ERR_PROTOCOL_ERROR;
1964         }
1965
1966         if (!ctx->request_terminated) {
1967                 /* request is done now */
1968                 ltdb_request_done(ctx, ret);
1969         }
1970
1971 done:
1972         if (ctx->spy) {
1973                 /* neutralize the spy */
1974                 ctx->spy->ctx = NULL;
1975                 ctx->spy = NULL;
1976         }
1977         talloc_free(ctx);
1978 }
1979
1980 static int ltdb_request_destructor(void *ptr)
1981 {
1982         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1983
1984         if (spy->ctx != NULL) {
1985                 spy->ctx->spy = NULL;
1986                 spy->ctx->request_terminated = true;
1987                 spy->ctx = NULL;
1988         }
1989
1990         return 0;
1991 }
1992
1993 static int ltdb_handle_request(struct ldb_module *module,
1994                                struct ldb_request *req)
1995 {
1996         struct ldb_control *control_permissive;
1997         struct ldb_context *ldb;
1998         struct tevent_context *ev;
1999         struct ltdb_context *ac;
2000         struct tevent_timer *te;
2001         struct timeval tv;
2002         unsigned int i;
2003
2004         ldb = ldb_module_get_ctx(module);
2005
2006         control_permissive = ldb_request_get_control(req,
2007                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2008
2009         for (i = 0; req->controls && req->controls[i]; i++) {
2010                 if (req->controls[i]->critical &&
2011                     req->controls[i] != control_permissive) {
2012                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
2013                                                req->controls[i]->oid);
2014                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
2015                 }
2016         }
2017
2018         if (req->starttime == 0 || req->timeout == 0) {
2019                 ldb_set_errstring(ldb, "Invalid timeout settings");
2020                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
2021         }
2022
2023         ev = ldb_handle_get_event_context(req->handle);
2024
2025         ac = talloc_zero(ldb, struct ltdb_context);
2026         if (ac == NULL) {
2027                 ldb_oom(ldb);
2028                 return LDB_ERR_OPERATIONS_ERROR;
2029         }
2030
2031         ac->module = module;
2032         ac->req = req;
2033
2034         tv.tv_sec = 0;
2035         tv.tv_usec = 0;
2036         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
2037         if (NULL == te) {
2038                 talloc_free(ac);
2039                 return LDB_ERR_OPERATIONS_ERROR;
2040         }
2041
2042         if (req->timeout > 0) {
2043                 tv.tv_sec = req->starttime + req->timeout;
2044                 tv.tv_usec = 0;
2045                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
2046                                                      ltdb_timeout, ac);
2047                 if (NULL == ac->timeout_event) {
2048                         talloc_free(ac);
2049                         return LDB_ERR_OPERATIONS_ERROR;
2050                 }
2051         }
2052
2053         /* set a spy so that we do not try to use the request context
2054          * if it is freed before ltdb_callback fires */
2055         ac->spy = talloc(req, struct ltdb_req_spy);
2056         if (NULL == ac->spy) {
2057                 talloc_free(ac);
2058                 return LDB_ERR_OPERATIONS_ERROR;
2059         }
2060         ac->spy->ctx = ac;
2061
2062         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
2063
2064         return LDB_SUCCESS;
2065 }
2066
2067 static int ltdb_init_rootdse(struct ldb_module *module)
2068 {
2069         /* ignore errors on this - we expect it for non-sam databases */
2070         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2071
2072         /* there can be no module beyond the backend, just return */
2073         return LDB_SUCCESS;
2074 }
2075
2076
2077 static int generic_lock_read(struct ldb_module *module)
2078 {
2079         void *data = ldb_module_get_private(module);
2080         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2081         return ltdb->kv_ops->lock_read(module);
2082 }
2083
2084 static int generic_unlock_read(struct ldb_module *module)
2085 {
2086         void *data = ldb_module_get_private(module);
2087         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2088         return ltdb->kv_ops->unlock_read(module);
2089 }
2090
2091 static const struct ldb_module_ops ltdb_ops = {
2092         .name              = "tdb",
2093         .init_context      = ltdb_init_rootdse,
2094         .search            = ltdb_handle_request,
2095         .add               = ltdb_handle_request,
2096         .modify            = ltdb_handle_request,
2097         .del               = ltdb_handle_request,
2098         .rename            = ltdb_handle_request,
2099         .extended          = ltdb_handle_request,
2100         .start_transaction = ltdb_start_trans,
2101         .end_transaction   = ltdb_end_trans,
2102         .prepare_commit    = ltdb_prepare_commit,
2103         .del_transaction   = ltdb_del_trans,
2104         .read_lock         = generic_lock_read,
2105         .read_unlock       = generic_unlock_read,
2106 };
2107
2108 int init_store(struct ltdb_private *ltdb,
2109                       const char *name,
2110                       struct ldb_context *ldb,
2111                       const char *options[],
2112                       struct ldb_module **_module)
2113 {
2114         struct ldb_module *module;
2115
2116         if (getenv("LDB_WARN_UNINDEXED")) {
2117                 ltdb->warn_unindexed = true;
2118         }
2119
2120         if (getenv("LDB_WARN_REINDEX")) {
2121                 ltdb->warn_reindex = true;
2122         }
2123
2124         ltdb->sequence_number = 0;
2125
2126         module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
2127         if (!module) {
2128                 ldb_oom(ldb);
2129                 talloc_free(ltdb);
2130                 return LDB_ERR_OPERATIONS_ERROR;
2131         }
2132         ldb_module_set_private(module, ltdb);
2133         talloc_steal(module, ltdb);
2134
2135         if (ltdb_cache_load(module) != 0) {
2136                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2137                                        "records for backend '%s'", name);
2138                 talloc_free(module);
2139                 return LDB_ERR_OPERATIONS_ERROR;
2140         }
2141
2142         *_module = module;
2143         /*
2144          * Set the maximum key length
2145          */
2146         {
2147                 const char *len_str =
2148                         ldb_options_find(ldb, options,
2149                                          "max_key_len_for_self_test");
2150                 if (len_str != NULL) {
2151                         unsigned len = strtoul(len_str, NULL, 0);
2152                         ltdb->max_key_length = len;
2153                 }
2154         }
2155
2156         return LDB_SUCCESS;
2157 }
2158
2159 /*
2160   connect to the database
2161 */
2162 int ltdb_connect(struct ldb_context *ldb, const char *url,
2163                  unsigned int flags, const char *options[],
2164                  struct ldb_module **_module)
2165 {
2166         const char *path;
2167         int tdb_flags, open_flags;
2168         struct ltdb_private *ltdb;
2169
2170         /*
2171          * We hold locks, so we must use a private event context
2172          * on each returned handle
2173          */
2174         ldb_set_require_private_event_context(ldb);
2175
2176         /* parse the url */
2177         if (strchr(url, ':')) {
2178                 if (strncmp(url, "tdb://", 6) != 0) {
2179                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2180                                   "Invalid tdb URL '%s'", url);
2181                         return LDB_ERR_OPERATIONS_ERROR;
2182                 }
2183                 path = url+6;
2184         } else {
2185                 path = url;
2186         }
2187
2188         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
2189
2190         /* check for the 'nosync' option */
2191         if (flags & LDB_FLG_NOSYNC) {
2192                 tdb_flags |= TDB_NOSYNC;
2193         }
2194
2195         /* and nommap option */
2196         if (flags & LDB_FLG_NOMMAP) {
2197                 tdb_flags |= TDB_NOMMAP;
2198         }
2199
2200         ltdb = talloc_zero(ldb, struct ltdb_private);
2201         if (!ltdb) {
2202                 ldb_oom(ldb);
2203                 return LDB_ERR_OPERATIONS_ERROR;
2204         }
2205
2206         if (flags & LDB_FLG_RDONLY) {
2207                 /*
2208                  * This is weird, but because we can only have one tdb
2209                  * in this process, and the other one could be
2210                  * read-write, we can't use the tdb readonly.  Plus a
2211                  * read only tdb prohibits the all-record lock.
2212                  */
2213                 open_flags = O_RDWR;
2214
2215                 ltdb->read_only = true;
2216
2217         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
2218                 /*
2219                  * This is used by ldbsearch to prevent creation of the database
2220                  * if the name is wrong
2221                  */
2222                 open_flags = O_RDWR;
2223         } else {
2224                 /*
2225                  * This is the normal case
2226                  */
2227                 open_flags = O_CREAT | O_RDWR;
2228         }
2229
2230         ltdb->kv_ops = &key_value_ops;
2231
2232         /* note that we use quite a large default hash size */
2233         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
2234                                    tdb_flags, open_flags,
2235                                    ldb_get_create_perms(ldb), ldb);
2236         if (!ltdb->tdb) {
2237                 ldb_asprintf_errstring(ldb,
2238                                        "Unable to open tdb '%s': %s", path, strerror(errno));
2239                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2240                           "Unable to open tdb '%s': %s", path, strerror(errno));
2241                 talloc_free(ltdb);
2242                 if (errno == EACCES || errno == EPERM) {
2243                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
2244                 }
2245                 return LDB_ERR_OPERATIONS_ERROR;
2246         }
2247
2248         return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
2249 }