873ebbcab0e58b10ab5d662412c131580627fcb6
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 static int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 static int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         if (ret != LDB_SUCCESS) {
414                 ltdb->reindex_failed = true;
415         }
416
417         return ret;
418 }
419
420 static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
421                           struct ldb_val ldb_data, int flags)
422 {
423         TDB_DATA key = {
424                 .dptr = ldb_key.data,
425                 .dsize = ldb_key.length
426         };
427         TDB_DATA data = {
428                 .dptr = ldb_data.data,
429                 .dsize = ldb_data.length
430         };
431         bool transaction_active = tdb_transaction_active(ltdb->tdb);
432         if (transaction_active == false){
433                 return LDB_ERR_PROTOCOL_ERROR;
434         }
435         return tdb_store(ltdb->tdb, key, data, flags);
436 }
437
438 static int ltdb_error(struct ltdb_private *ltdb)
439 {
440         return ltdb_err_map(tdb_error(ltdb->tdb));
441 }
442
443 static const char *ltdb_errorstr(struct ltdb_private *ltdb)
444 {
445         return tdb_errorstr(ltdb->tdb);
446 }
447
448 /*
449   store a record into the db
450 */
451 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
452 {
453         void *data = ldb_module_get_private(module);
454         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
455         TDB_DATA tdb_key;
456         struct ldb_val ldb_key;
457         struct ldb_val ldb_data;
458         int ret = LDB_SUCCESS;
459         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
460
461         if (tdb_key_ctx == NULL) {
462                 return ldb_module_oom(module);
463         }
464
465         if (ltdb->read_only) {
466                 return LDB_ERR_UNWILLING_TO_PERFORM;
467         }
468
469         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
470         if (tdb_key.dptr == NULL) {
471                 TALLOC_FREE(tdb_key_ctx);
472                 return LDB_ERR_OTHER;
473         }
474
475         ret = ldb_pack_data(ldb_module_get_ctx(module),
476                             msg, &ldb_data);
477         if (ret == -1) {
478                 TALLOC_FREE(tdb_key_ctx);
479                 return LDB_ERR_OTHER;
480         }
481
482         ldb_key.data = tdb_key.dptr;
483         ldb_key.length = tdb_key.dsize;
484
485         ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
486         if (ret != 0) {
487                 bool is_special = ldb_dn_is_special(msg->dn);
488                 ret = ltdb->kv_ops->error(ltdb);
489
490                 /*
491                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
492                  * the GUID, so re-map
493                  */
494                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
495                     && !is_special
496                     && ltdb->cache->GUID_index_attribute != NULL) {
497                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
498                 }
499                 goto done;
500         }
501
502 done:
503         TALLOC_FREE(tdb_key_ctx);
504         talloc_free(ldb_data.data);
505
506         return ret;
507 }
508
509
510 /*
511   check if a attribute is a single valued, for a given element
512  */
513 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
514                                   struct ldb_message_element *el)
515 {
516         if (!a) return false;
517         if (el != NULL) {
518                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
519                         /* override from a ldb module, for example
520                            used for the description field, which is
521                            marked multi-valued in the schema but which
522                            should not actually accept multiple
523                            values */
524                         return true;
525                 }
526                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
527                         /* override from a ldb module, for example used for
528                            deleted linked attribute entries */
529                         return false;
530                 }
531         }
532         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
533                 return true;
534         }
535         return false;
536 }
537
538 static int ltdb_add_internal(struct ldb_module *module,
539                              struct ltdb_private *ltdb,
540                              const struct ldb_message *msg,
541                              bool check_single_value)
542 {
543         struct ldb_context *ldb = ldb_module_get_ctx(module);
544         int ret = LDB_SUCCESS;
545         unsigned int i;
546
547         for (i=0;i<msg->num_elements;i++) {
548                 struct ldb_message_element *el = &msg->elements[i];
549                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
550
551                 if (el->num_values == 0) {
552                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
553                                                el->name, ldb_dn_get_linearized(msg->dn));
554                         return LDB_ERR_CONSTRAINT_VIOLATION;
555                 }
556                 if (check_single_value &&
557                                 el->num_values > 1 &&
558                                 ldb_tdb_single_valued(a, el)) {
559                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
560                                                el->name, ldb_dn_get_linearized(msg->dn));
561                         return LDB_ERR_CONSTRAINT_VIOLATION;
562                 }
563
564                 /* Do not check "@ATTRIBUTES" for duplicated values */
565                 if (ldb_dn_is_special(msg->dn) &&
566                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
567                         continue;
568                 }
569
570                 if (check_single_value &&
571                     !(el->flags &
572                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
573                         struct ldb_val *duplicate = NULL;
574
575                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
576                                                          el, &duplicate, 0);
577                         if (ret != LDB_SUCCESS) {
578                                 return ret;
579                         }
580                         if (duplicate != NULL) {
581                                 ldb_asprintf_errstring(
582                                         ldb,
583                                         "attribute '%s': value '%.*s' on '%s' "
584                                         "provided more than once in ADD object",
585                                         el->name,
586                                         (int)duplicate->length,
587                                         duplicate->data,
588                                         ldb_dn_get_linearized(msg->dn));
589                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
590                         }
591                 }
592         }
593
594         ret = ltdb_store(module, msg, TDB_INSERT);
595         if (ret != LDB_SUCCESS) {
596                 /*
597                  * Try really hard to get the right error code for
598                  * a re-add situation, as this can matter!
599                  */
600                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
601                         int ret2;
602                         struct ldb_dn *dn2 = NULL;
603                         TALLOC_CTX *mem_ctx = talloc_new(module);
604                         if (mem_ctx == NULL) {
605                                 return ldb_module_operr(module);
606                         }
607                         ret2 = ltdb_search_base(module, module,
608                                                 msg->dn, &dn2);
609                         TALLOC_FREE(mem_ctx);
610                         if (ret2 == LDB_SUCCESS) {
611                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
612                         }
613                 }
614                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
615                         ldb_asprintf_errstring(ldb,
616                                                "Entry %s already exists",
617                                                ldb_dn_get_linearized(msg->dn));
618                 }
619                 return ret;
620         }
621
622         ret = ltdb_index_add_new(module, ltdb, msg);
623         if (ret != LDB_SUCCESS) {
624                 /*
625                  * If we failed to index, delete the message again.
626                  *
627                  * This is particularly important for the GUID index
628                  * case, which will only fail for a duplicate DN
629                  * in the index add.
630                  *
631                  * Note that the caller may not cancel the transation
632                  * and this means the above add might really show up!
633                  */
634                 ltdb_delete_noindex(module, msg);
635                 return ret;
636         }
637
638         ret = ltdb_modified(module, msg->dn);
639
640         return ret;
641 }
642
643 /*
644   add a record to the database
645 */
646 static int ltdb_add(struct ltdb_context *ctx)
647 {
648         struct ldb_module *module = ctx->module;
649         struct ldb_request *req = ctx->req;
650         void *data = ldb_module_get_private(module);
651         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
652         int ret = LDB_SUCCESS;
653
654         if (ltdb->max_key_length != 0 &&
655             ltdb->cache->GUID_index_attribute == NULL &&
656             !ldb_dn_is_special(req->op.add.message->dn))
657         {
658                 ldb_set_errstring(ldb_module_get_ctx(module),
659                                   "Must operate ldb_mdb in GUID "
660                                   "index mode, but " LTDB_IDXGUID " not set.");
661                 return LDB_ERR_UNWILLING_TO_PERFORM;
662         }
663
664         ret = ltdb_check_special_dn(module, req->op.add.message);
665         if (ret != LDB_SUCCESS) {
666                 return ret;
667         }
668
669         ldb_request_set_state(req, LDB_ASYNC_PENDING);
670
671         if (ltdb_cache_load(module) != 0) {
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674
675         ret = ltdb_add_internal(module, ltdb,
676                                 req->op.add.message, true);
677
678         return ret;
679 }
680
681 static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
682 {
683         TDB_DATA tdb_key = {
684                 .dptr = ldb_key.data,
685                 .dsize = ldb_key.length
686         };
687         bool transaction_active = tdb_transaction_active(ltdb->tdb);
688         if (transaction_active == false){
689                 return LDB_ERR_PROTOCOL_ERROR;
690         }
691         return tdb_delete(ltdb->tdb, tdb_key);
692 }
693
694 /*
695   delete a record from the database, not updating indexes (used for deleting
696   index records)
697 */
698 int ltdb_delete_noindex(struct ldb_module *module,
699                         const struct ldb_message *msg)
700 {
701         void *data = ldb_module_get_private(module);
702         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
703         struct ldb_val ldb_key;
704         TDB_DATA tdb_key;
705         int ret;
706         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
707
708         if (tdb_key_ctx == NULL) {
709                 return ldb_module_oom(module);
710         }
711
712         if (ltdb->read_only) {
713                 return LDB_ERR_UNWILLING_TO_PERFORM;
714         }
715
716         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
717         if (!tdb_key.dptr) {
718                 TALLOC_FREE(tdb_key_ctx);
719                 return LDB_ERR_OTHER;
720         }
721
722         ldb_key.data = tdb_key.dptr;
723         ldb_key.length = tdb_key.dsize;
724
725         ret = ltdb->kv_ops->delete(ltdb, ldb_key);
726         TALLOC_FREE(tdb_key_ctx);
727
728         if (ret != 0) {
729                 ret = ltdb->kv_ops->error(ltdb);
730         }
731
732         return ret;
733 }
734
735 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
736 {
737         struct ldb_message *msg;
738         int ret = LDB_SUCCESS;
739
740         msg = ldb_msg_new(module);
741         if (msg == NULL) {
742                 return LDB_ERR_OPERATIONS_ERROR;
743         }
744
745         /* in case any attribute of the message was indexed, we need
746            to fetch the old record */
747         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
748         if (ret != LDB_SUCCESS) {
749                 /* not finding the old record is an error */
750                 goto done;
751         }
752
753         ret = ltdb_delete_noindex(module, msg);
754         if (ret != LDB_SUCCESS) {
755                 goto done;
756         }
757
758         /* remove any indexed attributes */
759         ret = ltdb_index_delete(module, msg);
760         if (ret != LDB_SUCCESS) {
761                 goto done;
762         }
763
764         ret = ltdb_modified(module, dn);
765         if (ret != LDB_SUCCESS) {
766                 goto done;
767         }
768
769 done:
770         talloc_free(msg);
771         return ret;
772 }
773
774 /*
775   delete a record from the database
776 */
777 static int ltdb_delete(struct ltdb_context *ctx)
778 {
779         struct ldb_module *module = ctx->module;
780         struct ldb_request *req = ctx->req;
781         int ret = LDB_SUCCESS;
782
783         ldb_request_set_state(req, LDB_ASYNC_PENDING);
784
785         if (ltdb_cache_load(module) != 0) {
786                 return LDB_ERR_OPERATIONS_ERROR;
787         }
788
789         ret = ltdb_delete_internal(module, req->op.del.dn);
790
791         return ret;
792 }
793
794 /*
795   find an element by attribute name. At the moment this does a linear search,
796   it should be re-coded to use a binary search once all places that modify
797   records guarantee sorted order
798
799   return the index of the first matching element if found, otherwise -1
800 */
801 static int find_element(const struct ldb_message *msg, const char *name)
802 {
803         unsigned int i;
804         for (i=0;i<msg->num_elements;i++) {
805                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
806                         return i;
807                 }
808         }
809         return -1;
810 }
811
812
813 /*
814   add an element to an existing record. Assumes a elements array that we
815   can call re-alloc on, and assumed that we can re-use the data pointers from
816   the passed in additional values. Use with care!
817
818   returns 0 on success, -1 on failure (and sets errno)
819 */
820 static int ltdb_msg_add_element(struct ldb_message *msg,
821                                 struct ldb_message_element *el)
822 {
823         struct ldb_message_element *e2;
824         unsigned int i;
825
826         if (el->num_values == 0) {
827                 /* nothing to do here - we don't add empty elements */
828                 return 0;
829         }
830
831         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
832                               msg->num_elements+1);
833         if (!e2) {
834                 errno = ENOMEM;
835                 return -1;
836         }
837
838         msg->elements = e2;
839
840         e2 = &msg->elements[msg->num_elements];
841
842         e2->name = el->name;
843         e2->flags = el->flags;
844         e2->values = talloc_array(msg->elements,
845                                   struct ldb_val, el->num_values);
846         if (!e2->values) {
847                 errno = ENOMEM;
848                 return -1;
849         }
850         for (i=0;i<el->num_values;i++) {
851                 e2->values[i] = el->values[i];
852         }
853         e2->num_values = el->num_values;
854
855         ++msg->num_elements;
856
857         return 0;
858 }
859
860 /*
861   delete all elements having a specified attribute name
862 */
863 static int msg_delete_attribute(struct ldb_module *module,
864                                 struct ltdb_private *ltdb,
865                                 struct ldb_message *msg, const char *name)
866 {
867         unsigned int i;
868         int ret;
869         struct ldb_message_element *el;
870         bool is_special = ldb_dn_is_special(msg->dn);
871
872         if (!is_special
873             && ltdb->cache->GUID_index_attribute != NULL
874             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
875                 struct ldb_context *ldb = ldb_module_get_ctx(module);
876                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
877                                        "attribute %s (used as DB index)",
878                                        ltdb->cache->GUID_index_attribute);
879                 return LDB_ERR_CONSTRAINT_VIOLATION;
880         }
881
882         el = ldb_msg_find_element(msg, name);
883         if (el == NULL) {
884                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
885         }
886         i = el - msg->elements;
887
888         ret = ltdb_index_del_element(module, ltdb, msg, el);
889         if (ret != LDB_SUCCESS) {
890                 return ret;
891         }
892
893         talloc_free(el->values);
894         if (msg->num_elements > (i+1)) {
895                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
896         }
897         msg->num_elements--;
898         msg->elements = talloc_realloc(msg, msg->elements,
899                                        struct ldb_message_element,
900                                        msg->num_elements);
901         return LDB_SUCCESS;
902 }
903
904 /*
905   delete all elements matching an attribute name/value
906
907   return LDB Error on failure
908 */
909 static int msg_delete_element(struct ldb_module *module,
910                               struct ltdb_private *ltdb,
911                               struct ldb_message *msg,
912                               const char *name,
913                               const struct ldb_val *val)
914 {
915         struct ldb_context *ldb = ldb_module_get_ctx(module);
916         unsigned int i;
917         int found, ret;
918         struct ldb_message_element *el;
919         const struct ldb_schema_attribute *a;
920
921         found = find_element(msg, name);
922         if (found == -1) {
923                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
924         }
925
926         i = (unsigned int) found;
927         el = &(msg->elements[i]);
928
929         a = ldb_schema_attribute_by_name(ldb, el->name);
930
931         for (i=0;i<el->num_values;i++) {
932                 bool matched;
933                 if (a->syntax->operator_fn) {
934                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
935                                                      &el->values[i], val, &matched);
936                         if (ret != LDB_SUCCESS) return ret;
937                 } else {
938                         matched = (a->syntax->comparison_fn(ldb, ldb,
939                                                             &el->values[i], val) == 0);
940                 }
941                 if (matched) {
942                         if (el->num_values == 1) {
943                                 return msg_delete_attribute(module,
944                                                             ltdb, msg, name);
945                         }
946
947                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
948                         if (ret != LDB_SUCCESS) {
949                                 return ret;
950                         }
951
952                         if (i<el->num_values-1) {
953                                 memmove(&el->values[i], &el->values[i+1],
954                                         sizeof(el->values[i])*
955                                                 (el->num_values-(i+1)));
956                         }
957                         el->num_values--;
958
959                         /* per definition we find in a canonicalised message an
960                            attribute value only once. So we are finished here */
961                         return LDB_SUCCESS;
962                 }
963         }
964
965         /* Not found */
966         return LDB_ERR_NO_SUCH_ATTRIBUTE;
967 }
968
969 /*
970   modify a record - internal interface
971
972   yuck - this is O(n^2). Luckily n is usually small so we probably
973   get away with it, but if we ever have really large attribute lists
974   then we'll need to look at this again
975
976   'req' is optional, and is used to specify controls if supplied
977 */
978 int ltdb_modify_internal(struct ldb_module *module,
979                          const struct ldb_message *msg,
980                          struct ldb_request *req)
981 {
982         struct ldb_context *ldb = ldb_module_get_ctx(module);
983         void *data = ldb_module_get_private(module);
984         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
985         struct ldb_message *msg2;
986         unsigned int i, j;
987         int ret = LDB_SUCCESS, idx;
988         struct ldb_control *control_permissive = NULL;
989         TALLOC_CTX *mem_ctx = talloc_new(req);
990
991         if (mem_ctx == NULL) {
992                 return ldb_module_oom(module);
993         }
994         
995         if (req) {
996                 control_permissive = ldb_request_get_control(req,
997                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
998         }
999
1000         msg2 = ldb_msg_new(mem_ctx);
1001         if (msg2 == NULL) {
1002                 ret = LDB_ERR_OTHER;
1003                 goto done;
1004         }
1005
1006         ret = ltdb_search_dn1(module, msg->dn,
1007                               msg2,
1008                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1009         if (ret != LDB_SUCCESS) {
1010                 goto done;
1011         }
1012
1013         for (i=0; i<msg->num_elements; i++) {
1014                 struct ldb_message_element *el = &msg->elements[i], *el2;
1015                 struct ldb_val *vals;
1016                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
1017                 const char *dn;
1018                 uint32_t options = 0;
1019                 if (control_permissive != NULL) {
1020                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
1021                 }
1022
1023                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
1024                 case LDB_FLAG_MOD_ADD:
1025
1026                         if (el->num_values == 0) {
1027                                 ldb_asprintf_errstring(ldb,
1028                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
1029                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1030                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
1031                                 goto done;
1032                         }
1033
1034                         /* make a copy of the array so that a permissive
1035                          * control can remove duplicates without changing the
1036                          * original values, but do not copy data as we do not
1037                          * need to keep it around once the operation is
1038                          * finished */
1039                         if (control_permissive) {
1040                                 el = talloc(msg2, struct ldb_message_element);
1041                                 if (!el) {
1042                                         ret = LDB_ERR_OTHER;
1043                                         goto done;
1044                                 }
1045                                 *el = msg->elements[i];
1046                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1047                                 if (el->values == NULL) {
1048                                         ret = LDB_ERR_OTHER;
1049                                         goto done;
1050                                 }
1051                                 for (j = 0; j < el->num_values; j++) {
1052                                         el->values[j] = msg->elements[i].values[j];
1053                                 }
1054                         }
1055
1056                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1057                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1058                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1059                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1060                                 goto done;
1061                         }
1062
1063                         /* Checks if element already exists */
1064                         idx = find_element(msg2, el->name);
1065                         if (idx == -1) {
1066                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1067                                         ret = LDB_ERR_OTHER;
1068                                         goto done;
1069                                 }
1070                                 ret = ltdb_index_add_element(module, ltdb,
1071                                                              msg2,
1072                                                              el);
1073                                 if (ret != LDB_SUCCESS) {
1074                                         goto done;
1075                                 }
1076                         } else {
1077                                 j = (unsigned int) idx;
1078                                 el2 = &(msg2->elements[j]);
1079
1080                                 /* We cannot add another value on a existing one
1081                                    if the attribute is single-valued */
1082                                 if (ldb_tdb_single_valued(a, el)) {
1083                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1084                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1085                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1086                                         goto done;
1087                                 }
1088
1089                                 /* Check that values don't exist yet on multi-
1090                                    valued attributes or aren't provided twice */
1091                                 if (!(el->flags &
1092                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1093                                         struct ldb_val *duplicate = NULL;
1094                                         ret = ldb_msg_find_common_values(ldb,
1095                                                                          msg2,
1096                                                                          el,
1097                                                                          el2,
1098                                                                          options);
1099
1100                                         if (ret ==
1101                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1102                                                 ldb_asprintf_errstring(ldb,
1103                                                         "attribute '%s': value "
1104                                                         "#%u on '%s' already "
1105                                                         "exists", el->name, j,
1106                                                         ldb_dn_get_linearized(msg2->dn));
1107                                                 goto done;
1108                                         } else if (ret != LDB_SUCCESS) {
1109                                                 goto done;
1110                                         }
1111
1112                                         ret = ldb_msg_find_duplicate_val(
1113                                                 ldb, msg2, el, &duplicate, 0);
1114                                         if (ret != LDB_SUCCESS) {
1115                                                 goto done;
1116                                         }
1117                                         if (duplicate != NULL) {
1118                                                 ldb_asprintf_errstring(
1119                                                         ldb,
1120                                                         "attribute '%s': value "
1121                                                         "'%.*s' on '%s' "
1122                                                         "provided more than "
1123                                                         "once in ADD",
1124                                                         el->name,
1125                                                         (int)duplicate->length,
1126                                                         duplicate->data,
1127                                                         ldb_dn_get_linearized(msg->dn));
1128                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1129                                                 goto done;
1130                                         }
1131                                 }
1132
1133                                 /* Now combine existing and new values to a new
1134                                    attribute record */
1135                                 vals = talloc_realloc(msg2->elements,
1136                                                       el2->values, struct ldb_val,
1137                                                       el2->num_values + el->num_values);
1138                                 if (vals == NULL) {
1139                                         ldb_oom(ldb);
1140                                         ret = LDB_ERR_OTHER;
1141                                         goto done;
1142                                 }
1143
1144                                 for (j=0; j<el->num_values; j++) {
1145                                         vals[el2->num_values + j] =
1146                                                 ldb_val_dup(vals, &el->values[j]);
1147                                 }
1148
1149                                 el2->values = vals;
1150                                 el2->num_values += el->num_values;
1151
1152                                 ret = ltdb_index_add_element(module, ltdb,
1153                                                              msg2, el);
1154                                 if (ret != LDB_SUCCESS) {
1155                                         goto done;
1156                                 }
1157                         }
1158
1159                         break;
1160
1161                 case LDB_FLAG_MOD_REPLACE:
1162
1163                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1164                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1165                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1166                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1167                                 goto done;
1168                         }
1169
1170                         /*
1171                          * We don't need to check this if we have been
1172                          * pre-screened by the repl_meta_data module
1173                          * in Samba, or someone else who can claim to
1174                          * know what they are doing. 
1175                          */
1176                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1177                                 struct ldb_val *duplicate = NULL;
1178
1179                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1180                                                                  &duplicate, 0);
1181                                 if (ret != LDB_SUCCESS) {
1182                                         goto done;
1183                                 }
1184                                 if (duplicate != NULL) {
1185                                         ldb_asprintf_errstring(
1186                                                 ldb,
1187                                                 "attribute '%s': value '%.*s' "
1188                                                 "on '%s' provided more than "
1189                                                 "once in REPLACE",
1190                                                 el->name,
1191                                                 (int)duplicate->length,
1192                                                 duplicate->data,
1193                                                 ldb_dn_get_linearized(msg2->dn));
1194                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1195                                         goto done;
1196                                 }
1197                         }
1198
1199                         /* Checks if element already exists */
1200                         idx = find_element(msg2, el->name);
1201                         if (idx != -1) {
1202                                 j = (unsigned int) idx;
1203                                 el2 = &(msg2->elements[j]);
1204
1205                                 /* we consider two elements to be
1206                                  * equal only if the order
1207                                  * matches. This allows dbcheck to
1208                                  * fix the ordering on attributes
1209                                  * where order matters, such as
1210                                  * objectClass
1211                                  */
1212                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1213                                         continue;
1214                                 }
1215
1216                                 /* Delete the attribute if it exists in the DB */
1217                                 if (msg_delete_attribute(module, ltdb,
1218                                                          msg2,
1219                                                          el->name) != 0) {
1220                                         ret = LDB_ERR_OTHER;
1221                                         goto done;
1222                                 }
1223                         }
1224
1225                         /* Recreate it with the new values */
1226                         if (ltdb_msg_add_element(msg2, el) != 0) {
1227                                 ret = LDB_ERR_OTHER;
1228                                 goto done;
1229                         }
1230
1231                         ret = ltdb_index_add_element(module, ltdb,
1232                                                      msg2, el);
1233                         if (ret != LDB_SUCCESS) {
1234                                 goto done;
1235                         }
1236
1237                         break;
1238
1239                 case LDB_FLAG_MOD_DELETE:
1240                         dn = ldb_dn_get_linearized(msg2->dn);
1241                         if (dn == NULL) {
1242                                 ret = LDB_ERR_OTHER;
1243                                 goto done;
1244                         }
1245
1246                         if (msg->elements[i].num_values == 0) {
1247                                 /* Delete the whole attribute */
1248                                 ret = msg_delete_attribute(module,
1249                                                            ltdb,
1250                                                            msg2,
1251                                                            msg->elements[i].name);
1252                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1253                                     control_permissive) {
1254                                         ret = LDB_SUCCESS;
1255                                 } else {
1256                                         ldb_asprintf_errstring(ldb,
1257                                                                "attribute '%s': no such attribute for delete on '%s'",
1258                                                                msg->elements[i].name, dn);
1259                                 }
1260                                 if (ret != LDB_SUCCESS) {
1261                                         goto done;
1262                                 }
1263                         } else {
1264                                 /* Delete specified values from an attribute */
1265                                 for (j=0; j < msg->elements[i].num_values; j++) {
1266                                         ret = msg_delete_element(module,
1267                                                                  ltdb,
1268                                                                  msg2,
1269                                                                  msg->elements[i].name,
1270                                                                  &msg->elements[i].values[j]);
1271                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1272                                             control_permissive) {
1273                                                 ret = LDB_SUCCESS;
1274                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1275                                                 ldb_asprintf_errstring(ldb,
1276                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1277                                                                        msg->elements[i].name, dn);
1278                                         }
1279                                         if (ret != LDB_SUCCESS) {
1280                                                 goto done;
1281                                         }
1282                                 }
1283                         }
1284                         break;
1285                 default:
1286                         ldb_asprintf_errstring(ldb,
1287                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1288                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1289                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1290                         ret = LDB_ERR_PROTOCOL_ERROR;
1291                         goto done;
1292                 }
1293         }
1294
1295         ret = ltdb_store(module, msg2, TDB_MODIFY);
1296         if (ret != LDB_SUCCESS) {
1297                 goto done;
1298         }
1299
1300         ret = ltdb_modified(module, msg2->dn);
1301         if (ret != LDB_SUCCESS) {
1302                 goto done;
1303         }
1304
1305 done:
1306         TALLOC_FREE(mem_ctx);
1307         return ret;
1308 }
1309
1310 /*
1311   modify a record
1312 */
1313 static int ltdb_modify(struct ltdb_context *ctx)
1314 {
1315         struct ldb_module *module = ctx->module;
1316         struct ldb_request *req = ctx->req;
1317         int ret = LDB_SUCCESS;
1318
1319         ret = ltdb_check_special_dn(module, req->op.mod.message);
1320         if (ret != LDB_SUCCESS) {
1321                 return ret;
1322         }
1323
1324         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1325
1326         if (ltdb_cache_load(module) != 0) {
1327                 return LDB_ERR_OPERATIONS_ERROR;
1328         }
1329
1330         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1331
1332         return ret;
1333 }
1334
1335 /*
1336   rename a record
1337 */
1338 static int ltdb_rename(struct ltdb_context *ctx)
1339 {
1340         struct ldb_module *module = ctx->module;
1341         void *data = ldb_module_get_private(module);
1342         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1343         struct ldb_request *req = ctx->req;
1344         struct ldb_message *msg;
1345         int ret = LDB_SUCCESS;
1346         TDB_DATA tdb_key, tdb_key_old;
1347         struct ldb_dn *db_dn;
1348
1349         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1350
1351         if (ltdb_cache_load(ctx->module) != 0) {
1352                 return LDB_ERR_OPERATIONS_ERROR;
1353         }
1354
1355         msg = ldb_msg_new(ctx);
1356         if (msg == NULL) {
1357                 return LDB_ERR_OPERATIONS_ERROR;
1358         }
1359
1360         /* we need to fetch the old record to re-add under the new name */
1361         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1362                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1363         if (ret != LDB_SUCCESS) {
1364                 /* not finding the old record is an error */
1365                 return ret;
1366         }
1367
1368         /* We need to, before changing the DB, check if the new DN
1369          * exists, so we can return this error to the caller with an
1370          * unmodified DB
1371          *
1372          * Even in GUID index mode we use ltdb_key_dn() as we are
1373          * trying to figure out if this is just a case rename
1374          */
1375         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1376         if (!tdb_key.dptr) {
1377                 talloc_free(msg);
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1382         if (!tdb_key_old.dptr) {
1383                 talloc_free(msg);
1384                 talloc_free(tdb_key.dptr);
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         /*
1389          * Only declare a conflict if the new DN already exists,
1390          * and it isn't a case change on the old DN
1391          */
1392         if (tdb_key_old.dsize != tdb_key.dsize
1393             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1394                 ret = ltdb_search_base(module, msg,
1395                                        req->op.rename.newdn,
1396                                        &db_dn);
1397                 if (ret == LDB_SUCCESS) {
1398                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1399                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1400                         ret = LDB_SUCCESS;
1401                 }
1402         }
1403
1404         /* finding the new record already in the DB is an error */
1405
1406         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1407                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1408                                        "Entry %s already exists",
1409                                        ldb_dn_get_linearized(req->op.rename.newdn));
1410         }
1411         if (ret != LDB_SUCCESS) {
1412                 talloc_free(tdb_key_old.dptr);
1413                 talloc_free(tdb_key.dptr);
1414                 talloc_free(msg);
1415                 return ret;
1416         }
1417
1418         talloc_free(tdb_key_old.dptr);
1419         talloc_free(tdb_key.dptr);
1420
1421         /* Always delete first then add, to avoid conflicts with
1422          * unique indexes. We rely on the transaction to make this
1423          * atomic
1424          */
1425         ret = ltdb_delete_internal(module, msg->dn);
1426         if (ret != LDB_SUCCESS) {
1427                 talloc_free(msg);
1428                 return ret;
1429         }
1430
1431         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1432         if (msg->dn == NULL) {
1433                 talloc_free(msg);
1434                 return LDB_ERR_OPERATIONS_ERROR;
1435         }
1436
1437         /* We don't check single value as we can have more than 1 with
1438          * deleted attributes. We could go through all elements but that's
1439          * maybe not the most efficient way
1440          */
1441         ret = ltdb_add_internal(module, ltdb, msg, false);
1442
1443         talloc_free(msg);
1444
1445         return ret;
1446 }
1447
1448 static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
1449 {
1450         return tdb_transaction_start(ltdb->tdb);
1451 }
1452
1453 static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
1454 {
1455         return tdb_transaction_cancel(ltdb->tdb);
1456 }
1457
1458 static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
1459 {
1460         return tdb_transaction_prepare_commit(ltdb->tdb);
1461 }
1462
1463 static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
1464 {
1465         return tdb_transaction_commit(ltdb->tdb);
1466 }
1467
1468 static int ltdb_start_trans(struct ldb_module *module)
1469 {
1470         void *data = ldb_module_get_private(module);
1471         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1472
1473         /* Do not take out the transaction lock on a read-only DB */
1474         if (ltdb->read_only) {
1475                 return LDB_ERR_UNWILLING_TO_PERFORM;
1476         }
1477
1478         if (ltdb->kv_ops->begin_write(ltdb) != 0) {
1479                 return ltdb->kv_ops->error(ltdb);
1480         }
1481
1482         ltdb->in_transaction++;
1483
1484         ltdb_index_transaction_start(module);
1485
1486         ltdb->reindex_failed = false;
1487
1488         return LDB_SUCCESS;
1489 }
1490
1491 /*
1492  * Forward declaration to allow prepare_commit to in fact abort the
1493  * transaction
1494  */
1495 static int ltdb_del_trans(struct ldb_module *module);
1496
1497 static int ltdb_prepare_commit(struct ldb_module *module)
1498 {
1499         int ret;
1500         void *data = ldb_module_get_private(module);
1501         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1502
1503         if (ltdb->in_transaction != 1) {
1504                 return LDB_SUCCESS;
1505         }
1506
1507         /*
1508          * Check if the last re-index failed.
1509          *
1510          * This can happen if for example a duplicate value was marked
1511          * unique.  We must not write a partial re-index into the DB.
1512          */
1513         if (ltdb->reindex_failed) {
1514                 /*
1515                  * We must instead abort the transaction so we get the
1516                  * old values and old index back
1517                  */
1518                 ltdb_del_trans(module);
1519                 ldb_set_errstring(ldb_module_get_ctx(module),
1520                                   "Failure during re-index, so "
1521                                   "transaction must be aborted.");
1522                 return LDB_ERR_OPERATIONS_ERROR;
1523         }
1524
1525         ret = ltdb_index_transaction_commit(module);
1526         if (ret != LDB_SUCCESS) {
1527                 ltdb->kv_ops->abort_write(ltdb);
1528                 ltdb->in_transaction--;
1529                 return ret;
1530         }
1531
1532         if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
1533                 ret = ltdb->kv_ops->error(ltdb);
1534                 ltdb->in_transaction--;
1535                 ldb_debug_set(ldb_module_get_ctx(module),
1536                               LDB_DEBUG_FATAL,
1537                               "Failure during "
1538                               "prepare_write): %s -> %s",
1539                               ltdb->kv_ops->errorstr(ltdb),
1540                               ldb_strerror(ret));
1541                 return ret;
1542         }
1543
1544         ltdb->prepared_commit = true;
1545
1546         return LDB_SUCCESS;
1547 }
1548
1549 static int ltdb_end_trans(struct ldb_module *module)
1550 {
1551         int ret;
1552         void *data = ldb_module_get_private(module);
1553         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1554
1555         if (!ltdb->prepared_commit) {
1556                 ret = ltdb_prepare_commit(module);
1557                 if (ret != LDB_SUCCESS) {
1558                         return ret;
1559                 }
1560         }
1561
1562         ltdb->in_transaction--;
1563         ltdb->prepared_commit = false;
1564
1565         if (ltdb->kv_ops->finish_write(ltdb) != 0) {
1566                 ret = ltdb->kv_ops->error(ltdb);
1567                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1568                                        "Failure during tdb_transaction_commit(): %s -> %s",
1569                                        ltdb->kv_ops->errorstr(ltdb),
1570                                        ldb_strerror(ret));
1571                 return ret;
1572         }
1573
1574         return LDB_SUCCESS;
1575 }
1576
1577 static int ltdb_del_trans(struct ldb_module *module)
1578 {
1579         void *data = ldb_module_get_private(module);
1580         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1581
1582         ltdb->in_transaction--;
1583
1584         if (ltdb_index_transaction_cancel(module) != 0) {
1585                 ltdb->kv_ops->abort_write(ltdb);
1586                 return ltdb->kv_ops->error(ltdb);
1587         }
1588
1589         ltdb->kv_ops->abort_write(ltdb);
1590         return LDB_SUCCESS;
1591 }
1592
1593 /*
1594   return sequenceNumber from @BASEINFO
1595 */
1596 static int ltdb_sequence_number(struct ltdb_context *ctx,
1597                                 struct ldb_extended **ext)
1598 {
1599         struct ldb_context *ldb;
1600         struct ldb_module *module = ctx->module;
1601         struct ldb_request *req = ctx->req;
1602         void *data = ldb_module_get_private(module);
1603         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1604         TALLOC_CTX *tmp_ctx = NULL;
1605         struct ldb_seqnum_request *seq;
1606         struct ldb_seqnum_result *res;
1607         struct ldb_message *msg = NULL;
1608         struct ldb_dn *dn;
1609         const char *date;
1610         int ret = LDB_SUCCESS;
1611
1612         ldb = ldb_module_get_ctx(module);
1613
1614         seq = talloc_get_type(req->op.extended.data,
1615                                 struct ldb_seqnum_request);
1616         if (seq == NULL) {
1617                 return LDB_ERR_OPERATIONS_ERROR;
1618         }
1619
1620         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1621
1622         if (ltdb->kv_ops->lock_read(module) != 0) {
1623                 return LDB_ERR_OPERATIONS_ERROR;
1624         }
1625
1626         res = talloc_zero(req, struct ldb_seqnum_result);
1627         if (res == NULL) {
1628                 ret = LDB_ERR_OPERATIONS_ERROR;
1629                 goto done;
1630         }
1631
1632         tmp_ctx = talloc_new(req);
1633         if (tmp_ctx == NULL) {
1634                 ret = LDB_ERR_OPERATIONS_ERROR;
1635                 goto done;
1636         }
1637
1638         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1639         if (dn == NULL) {
1640                 ret = LDB_ERR_OPERATIONS_ERROR;
1641                 goto done;
1642         }
1643
1644         msg = ldb_msg_new(tmp_ctx);
1645         if (msg == NULL) {
1646                 ret = LDB_ERR_OPERATIONS_ERROR;
1647                 goto done;
1648         }
1649
1650         ret = ltdb_search_dn1(module, dn, msg, 0);
1651         if (ret != LDB_SUCCESS) {
1652                 goto done;
1653         }
1654
1655         switch (seq->type) {
1656         case LDB_SEQ_HIGHEST_SEQ:
1657                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1658                 break;
1659         case LDB_SEQ_NEXT:
1660                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1661                 res->seq_num++;
1662                 break;
1663         case LDB_SEQ_HIGHEST_TIMESTAMP:
1664                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1665                 if (date) {
1666                         res->seq_num = ldb_string_to_time(date);
1667                 } else {
1668                         res->seq_num = 0;
1669                         /* zero is as good as anything when we don't know */
1670                 }
1671                 break;
1672         }
1673
1674         *ext = talloc_zero(req, struct ldb_extended);
1675         if (*ext == NULL) {
1676                 ret = LDB_ERR_OPERATIONS_ERROR;
1677                 goto done;
1678         }
1679         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1680         (*ext)->data = talloc_steal(*ext, res);
1681
1682 done:
1683         talloc_free(tmp_ctx);
1684
1685         ltdb->kv_ops->unlock_read(module);
1686         return ret;
1687 }
1688
1689 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1690 {
1691         struct ldb_context *ldb;
1692         struct ldb_request *req;
1693         struct ldb_reply *ares;
1694
1695         ldb = ldb_module_get_ctx(ctx->module);
1696         req = ctx->req;
1697
1698         /* if we already returned an error just return */
1699         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1700                 return;
1701         }
1702
1703         ares = talloc_zero(req, struct ldb_reply);
1704         if (!ares) {
1705                 ldb_oom(ldb);
1706                 req->callback(req, NULL);
1707                 return;
1708         }
1709         ares->type = LDB_REPLY_DONE;
1710         ares->error = error;
1711
1712         req->callback(req, ares);
1713 }
1714
1715 static void ltdb_timeout(struct tevent_context *ev,
1716                           struct tevent_timer *te,
1717                           struct timeval t,
1718                           void *private_data)
1719 {
1720         struct ltdb_context *ctx;
1721         ctx = talloc_get_type(private_data, struct ltdb_context);
1722
1723         if (!ctx->request_terminated) {
1724                 /* request is done now */
1725                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1726         }
1727
1728         if (ctx->spy) {
1729                 /* neutralize the spy */
1730                 ctx->spy->ctx = NULL;
1731                 ctx->spy = NULL;
1732         }
1733         talloc_free(ctx);
1734 }
1735
1736 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1737                                         struct ldb_extended *ext,
1738                                         int error)
1739 {
1740         struct ldb_context *ldb;
1741         struct ldb_request *req;
1742         struct ldb_reply *ares;
1743
1744         ldb = ldb_module_get_ctx(ctx->module);
1745         req = ctx->req;
1746
1747         /* if we already returned an error just return */
1748         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1749                 return;
1750         }
1751
1752         ares = talloc_zero(req, struct ldb_reply);
1753         if (!ares) {
1754                 ldb_oom(ldb);
1755                 req->callback(req, NULL);
1756                 return;
1757         }
1758         ares->type = LDB_REPLY_DONE;
1759         ares->response = ext;
1760         ares->error = error;
1761
1762         req->callback(req, ares);
1763 }
1764
1765 static void ltdb_handle_extended(struct ltdb_context *ctx)
1766 {
1767         struct ldb_extended *ext = NULL;
1768         int ret;
1769
1770         if (strcmp(ctx->req->op.extended.oid,
1771                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1772                 /* get sequence number */
1773                 ret = ltdb_sequence_number(ctx, &ext);
1774         } else {
1775                 /* not recognized */
1776                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1777         }
1778
1779         ltdb_request_extended_done(ctx, ext, ret);
1780 }
1781
1782 struct kv_ctx {
1783         ldb_kv_traverse_fn kv_traverse_fn;
1784         void *ctx;
1785         struct ltdb_private *ltdb;
1786         int (*parser)(struct ldb_val key,
1787                       struct ldb_val data,
1788                       void *private_data);
1789 };
1790
1791 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
1792 {
1793         struct kv_ctx *kv_ctx = ctx;
1794         struct ldb_val key = {
1795                 .length = tdb_key.dsize,
1796                 .data = tdb_key.dptr,
1797         };
1798         struct ldb_val data = {
1799                 .length = tdb_data.dsize,
1800                 .data = tdb_data.dptr,
1801         };
1802         return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
1803 }
1804
1805 static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
1806 {
1807         struct kv_ctx kv_ctx = {
1808                 .kv_traverse_fn = fn,
1809                 .ctx = ctx,
1810                 .ltdb = ltdb
1811         };
1812         if (ltdb->in_transaction != 0) {
1813                 return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1814         } else {
1815                 return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1816         }
1817 }
1818
1819 static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
1820                                       struct ldb_val ldb_key,
1821                                       struct ldb_val ldb_key2,
1822                                       struct ldb_val ldb_data, void *state)
1823 {
1824         int tdb_ret;
1825         struct ldb_context *ldb;
1826         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1827         struct ldb_module *module = ctx->module;
1828         TDB_DATA key = {
1829                 .dptr = ldb_key.data,
1830                 .dsize = ldb_key.length
1831         };
1832         TDB_DATA key2 = {
1833                 .dptr = ldb_key2.data,
1834                 .dsize = ldb_key2.length
1835         };
1836         TDB_DATA data = {
1837                 .dptr = ldb_data.data,
1838                 .dsize = ldb_data.length
1839         };
1840
1841         ldb = ldb_module_get_ctx(module);
1842
1843         tdb_ret = tdb_delete(ltdb->tdb, key);
1844         if (tdb_ret != 0) {
1845                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1846                           "Failed to delete %*.*s "
1847                           "for rekey as %*.*s: %s",
1848                           (int)key.dsize, (int)key.dsize,
1849                           (const char *)key.dptr,
1850                           (int)key2.dsize, (int)key2.dsize,
1851                           (const char *)key.dptr,
1852                           tdb_errorstr(ltdb->tdb));
1853                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1854                 return -1;
1855         }
1856         tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
1857         if (tdb_ret != 0) {
1858                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1859                           "Failed to rekey %*.*s as %*.*s: %s",
1860                           (int)key.dsize, (int)key.dsize,
1861                           (const char *)key.dptr,
1862                           (int)key2.dsize, (int)key2.dsize,
1863                           (const char *)key.dptr,
1864                           tdb_errorstr(ltdb->tdb));
1865                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1866                 return -1;
1867         }
1868         return tdb_ret;
1869 }
1870
1871 static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
1872                                          void *ctx)
1873 {
1874         struct kv_ctx *kv_ctx = ctx;
1875         struct ldb_val key = {
1876                 .length = tdb_key.dsize,
1877                 .data = tdb_key.dptr,
1878         };
1879         struct ldb_val data = {
1880                 .length = tdb_data.dsize,
1881                 .data = tdb_data.dptr,
1882         };
1883
1884         return kv_ctx->parser(key, data, kv_ctx->ctx);
1885 }
1886
1887 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
1888                                  struct ldb_val ldb_key,
1889                                  int (*parser)(struct ldb_val key,
1890                                                struct ldb_val data,
1891                                                void *private_data),
1892                                  void *ctx)
1893 {
1894         struct kv_ctx kv_ctx = {
1895                 .parser = parser,
1896                 .ctx = ctx,
1897                 .ltdb = ltdb
1898         };
1899         TDB_DATA key = {
1900                 .dptr = ldb_key.data,
1901                 .dsize = ldb_key.length
1902         };
1903         int ret;
1904
1905         ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
1906                                &kv_ctx);
1907         if (ret == 0) {
1908                 return LDB_SUCCESS;
1909         }
1910         return ltdb_err_map(tdb_error(ltdb->tdb));
1911 }
1912
1913 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1914 {
1915         return tdb_name(ltdb->tdb);
1916 }
1917
1918 static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
1919 {
1920         int seq = tdb_get_seqnum(ltdb->tdb);
1921         bool has_changed = (seq != ltdb->tdb_seqnum);
1922
1923         ltdb->tdb_seqnum = seq;
1924
1925         return has_changed;
1926 }
1927
1928 static const struct kv_db_ops key_value_ops = {
1929         .store = ltdb_tdb_store,
1930         .delete = ltdb_tdb_delete,
1931         .iterate = ltdb_tdb_traverse_fn,
1932         .update_in_iterate = ltdb_tdb_update_in_iterate,
1933         .fetch_and_parse = ltdb_tdb_parse_record,
1934         .lock_read = ltdb_lock_read,
1935         .unlock_read = ltdb_unlock_read,
1936         .begin_write = ltdb_tdb_transaction_start,
1937         .prepare_write = ltdb_tdb_transaction_prepare_commit,
1938         .finish_write = ltdb_tdb_transaction_commit,
1939         .abort_write = ltdb_tdb_transaction_cancel,
1940         .error = ltdb_error,
1941         .errorstr = ltdb_errorstr,
1942         .name = ltdb_tdb_name,
1943         .has_changed = ltdb_tdb_changed,
1944 };
1945
1946 static void ltdb_callback(struct tevent_context *ev,
1947                           struct tevent_timer *te,
1948                           struct timeval t,
1949                           void *private_data)
1950 {
1951         struct ltdb_context *ctx;
1952         int ret;
1953
1954         ctx = talloc_get_type(private_data, struct ltdb_context);
1955
1956         if (ctx->request_terminated) {
1957                 goto done;
1958         }
1959
1960         switch (ctx->req->operation) {
1961         case LDB_SEARCH:
1962                 ret = ltdb_search(ctx);
1963                 break;
1964         case LDB_ADD:
1965                 ret = ltdb_add(ctx);
1966                 break;
1967         case LDB_MODIFY:
1968                 ret = ltdb_modify(ctx);
1969                 break;
1970         case LDB_DELETE:
1971                 ret = ltdb_delete(ctx);
1972                 break;
1973         case LDB_RENAME:
1974                 ret = ltdb_rename(ctx);
1975                 break;
1976         case LDB_EXTENDED:
1977                 ltdb_handle_extended(ctx);
1978                 goto done;
1979         default:
1980                 /* no other op supported */
1981                 ret = LDB_ERR_PROTOCOL_ERROR;
1982         }
1983
1984         if (!ctx->request_terminated) {
1985                 /* request is done now */
1986                 ltdb_request_done(ctx, ret);
1987         }
1988
1989 done:
1990         if (ctx->spy) {
1991                 /* neutralize the spy */
1992                 ctx->spy->ctx = NULL;
1993                 ctx->spy = NULL;
1994         }
1995         talloc_free(ctx);
1996 }
1997
1998 static int ltdb_request_destructor(void *ptr)
1999 {
2000         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
2001
2002         if (spy->ctx != NULL) {
2003                 spy->ctx->spy = NULL;
2004                 spy->ctx->request_terminated = true;
2005                 spy->ctx = NULL;
2006         }
2007
2008         return 0;
2009 }
2010
2011 static int ltdb_handle_request(struct ldb_module *module,
2012                                struct ldb_request *req)
2013 {
2014         struct ldb_control *control_permissive;
2015         struct ldb_context *ldb;
2016         struct tevent_context *ev;
2017         struct ltdb_context *ac;
2018         struct tevent_timer *te;
2019         struct timeval tv;
2020         unsigned int i;
2021
2022         ldb = ldb_module_get_ctx(module);
2023
2024         control_permissive = ldb_request_get_control(req,
2025                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2026
2027         for (i = 0; req->controls && req->controls[i]; i++) {
2028                 if (req->controls[i]->critical &&
2029                     req->controls[i] != control_permissive) {
2030                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
2031                                                req->controls[i]->oid);
2032                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
2033                 }
2034         }
2035
2036         if (req->starttime == 0 || req->timeout == 0) {
2037                 ldb_set_errstring(ldb, "Invalid timeout settings");
2038                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
2039         }
2040
2041         ev = ldb_handle_get_event_context(req->handle);
2042
2043         ac = talloc_zero(ldb, struct ltdb_context);
2044         if (ac == NULL) {
2045                 ldb_oom(ldb);
2046                 return LDB_ERR_OPERATIONS_ERROR;
2047         }
2048
2049         ac->module = module;
2050         ac->req = req;
2051
2052         tv.tv_sec = 0;
2053         tv.tv_usec = 0;
2054         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
2055         if (NULL == te) {
2056                 talloc_free(ac);
2057                 return LDB_ERR_OPERATIONS_ERROR;
2058         }
2059
2060         if (req->timeout > 0) {
2061                 tv.tv_sec = req->starttime + req->timeout;
2062                 tv.tv_usec = 0;
2063                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
2064                                                      ltdb_timeout, ac);
2065                 if (NULL == ac->timeout_event) {
2066                         talloc_free(ac);
2067                         return LDB_ERR_OPERATIONS_ERROR;
2068                 }
2069         }
2070
2071         /* set a spy so that we do not try to use the request context
2072          * if it is freed before ltdb_callback fires */
2073         ac->spy = talloc(req, struct ltdb_req_spy);
2074         if (NULL == ac->spy) {
2075                 talloc_free(ac);
2076                 return LDB_ERR_OPERATIONS_ERROR;
2077         }
2078         ac->spy->ctx = ac;
2079
2080         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
2081
2082         return LDB_SUCCESS;
2083 }
2084
2085 static int ltdb_init_rootdse(struct ldb_module *module)
2086 {
2087         /* ignore errors on this - we expect it for non-sam databases */
2088         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2089
2090         /* there can be no module beyond the backend, just return */
2091         return LDB_SUCCESS;
2092 }
2093
2094
2095 static int generic_lock_read(struct ldb_module *module)
2096 {
2097         void *data = ldb_module_get_private(module);
2098         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2099         return ltdb->kv_ops->lock_read(module);
2100 }
2101
2102 static int generic_unlock_read(struct ldb_module *module)
2103 {
2104         void *data = ldb_module_get_private(module);
2105         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2106         return ltdb->kv_ops->unlock_read(module);
2107 }
2108
2109 static const struct ldb_module_ops ltdb_ops = {
2110         .name              = "tdb",
2111         .init_context      = ltdb_init_rootdse,
2112         .search            = ltdb_handle_request,
2113         .add               = ltdb_handle_request,
2114         .modify            = ltdb_handle_request,
2115         .del               = ltdb_handle_request,
2116         .rename            = ltdb_handle_request,
2117         .extended          = ltdb_handle_request,
2118         .start_transaction = ltdb_start_trans,
2119         .end_transaction   = ltdb_end_trans,
2120         .prepare_commit    = ltdb_prepare_commit,
2121         .del_transaction   = ltdb_del_trans,
2122         .read_lock         = generic_lock_read,
2123         .read_unlock       = generic_unlock_read,
2124 };
2125
2126 int init_store(struct ltdb_private *ltdb,
2127                       const char *name,
2128                       struct ldb_context *ldb,
2129                       const char *options[],
2130                       struct ldb_module **_module)
2131 {
2132         struct ldb_module *module;
2133
2134         if (getenv("LDB_WARN_UNINDEXED")) {
2135                 ltdb->warn_unindexed = true;
2136         }
2137
2138         if (getenv("LDB_WARN_REINDEX")) {
2139                 ltdb->warn_reindex = true;
2140         }
2141
2142         ltdb->sequence_number = 0;
2143
2144         module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
2145         if (!module) {
2146                 ldb_oom(ldb);
2147                 talloc_free(ltdb);
2148                 return LDB_ERR_OPERATIONS_ERROR;
2149         }
2150         ldb_module_set_private(module, ltdb);
2151         talloc_steal(module, ltdb);
2152
2153         if (ltdb_cache_load(module) != 0) {
2154                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2155                                        "records for backend '%s'", name);
2156                 talloc_free(module);
2157                 return LDB_ERR_OPERATIONS_ERROR;
2158         }
2159
2160         *_module = module;
2161         /*
2162          * Set or override the maximum key length
2163          *
2164          * The ldb_mdb code will have set this to 511, but our tests
2165          * set this even smaller (to make the tests more practical).
2166          *
2167          * This must only be used for the selftest as the length
2168          * becomes encoded in the index keys.
2169          */
2170         {
2171                 const char *len_str =
2172                         ldb_options_find(ldb, options,
2173                                          "max_key_len_for_self_test");
2174                 if (len_str != NULL) {
2175                         unsigned len = strtoul(len_str, NULL, 0);
2176                         ltdb->max_key_length = len;
2177                 }
2178         }
2179
2180         return LDB_SUCCESS;
2181 }
2182
2183 /*
2184   connect to the database
2185 */
2186 int ltdb_connect(struct ldb_context *ldb, const char *url,
2187                  unsigned int flags, const char *options[],
2188                  struct ldb_module **_module)
2189 {
2190         const char *path;
2191         int tdb_flags, open_flags;
2192         struct ltdb_private *ltdb;
2193
2194         /*
2195          * We hold locks, so we must use a private event context
2196          * on each returned handle
2197          */
2198         ldb_set_require_private_event_context(ldb);
2199
2200         /* parse the url */
2201         if (strchr(url, ':')) {
2202                 if (strncmp(url, "tdb://", 6) != 0) {
2203                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2204                                   "Invalid tdb URL '%s'", url);
2205                         return LDB_ERR_OPERATIONS_ERROR;
2206                 }
2207                 path = url+6;
2208         } else {
2209                 path = url;
2210         }
2211
2212         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
2213
2214         /* check for the 'nosync' option */
2215         if (flags & LDB_FLG_NOSYNC) {
2216                 tdb_flags |= TDB_NOSYNC;
2217         }
2218
2219         /* and nommap option */
2220         if (flags & LDB_FLG_NOMMAP) {
2221                 tdb_flags |= TDB_NOMMAP;
2222         }
2223
2224         ltdb = talloc_zero(ldb, struct ltdb_private);
2225         if (!ltdb) {
2226                 ldb_oom(ldb);
2227                 return LDB_ERR_OPERATIONS_ERROR;
2228         }
2229
2230         if (flags & LDB_FLG_RDONLY) {
2231                 /*
2232                  * This is weird, but because we can only have one tdb
2233                  * in this process, and the other one could be
2234                  * read-write, we can't use the tdb readonly.  Plus a
2235                  * read only tdb prohibits the all-record lock.
2236                  */
2237                 open_flags = O_RDWR;
2238
2239                 ltdb->read_only = true;
2240
2241         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
2242                 /*
2243                  * This is used by ldbsearch to prevent creation of the database
2244                  * if the name is wrong
2245                  */
2246                 open_flags = O_RDWR;
2247         } else {
2248                 /*
2249                  * This is the normal case
2250                  */
2251                 open_flags = O_CREAT | O_RDWR;
2252         }
2253
2254         ltdb->kv_ops = &key_value_ops;
2255
2256         /* note that we use quite a large default hash size */
2257         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
2258                                    tdb_flags, open_flags,
2259                                    ldb_get_create_perms(ldb), ldb);
2260         if (!ltdb->tdb) {
2261                 ldb_asprintf_errstring(ldb,
2262                                        "Unable to open tdb '%s': %s", path, strerror(errno));
2263                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2264                           "Unable to open tdb '%s': %s", path, strerror(errno));
2265                 talloc_free(ltdb);
2266                 if (errno == EACCES || errno == EPERM) {
2267                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
2268                 }
2269                 return LDB_ERR_OPERATIONS_ERROR;
2270         }
2271
2272         return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
2273 }