ldb_tdb: Build a key value operation library
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 static int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 static int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         return ret;
414 }
415
416 static int ltdb_tdb_store(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags)
417 {
418         return tdb_store(ltdb->tdb, key, data, flags);
419 }
420
421 static int ltdb_error(struct ltdb_private *ltdb)
422 {
423         return ltdb_err_map(tdb_error(ltdb->tdb));
424 }
425
426 static const char *ltdb_errorstr(struct ltdb_private *ltdb)
427 {
428         return tdb_errorstr(ltdb->tdb);
429 }
430
431 /*
432   store a record into the db
433 */
434 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
435 {
436         void *data = ldb_module_get_private(module);
437         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
438         TDB_DATA tdb_key, tdb_data;
439         struct ldb_val ldb_data;
440         int ret = LDB_SUCCESS;
441         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
442
443         if (tdb_key_ctx == NULL) {
444                 return ldb_module_oom(module);
445         }
446
447         if (ltdb->read_only) {
448                 return LDB_ERR_UNWILLING_TO_PERFORM;
449         }
450
451         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
452         if (tdb_key.dptr == NULL) {
453                 TALLOC_FREE(tdb_key_ctx);
454                 return LDB_ERR_OTHER;
455         }
456
457         ret = ldb_pack_data(ldb_module_get_ctx(module),
458                             msg, &ldb_data);
459         if (ret == -1) {
460                 TALLOC_FREE(tdb_key_ctx);
461                 return LDB_ERR_OTHER;
462         }
463
464         tdb_data.dptr = ldb_data.data;
465         tdb_data.dsize = ldb_data.length;
466
467         ret = ltdb->kv_ops->store(ltdb, tdb_key, tdb_data, flgs);
468         if (ret != 0) {
469                 bool is_special = ldb_dn_is_special(msg->dn);
470                 ret = ltdb->kv_ops->error(ltdb);
471
472                 /*
473                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
474                  * the GUID, so re-map
475                  */
476                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
477                     && !is_special
478                     && ltdb->cache->GUID_index_attribute != NULL) {
479                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
480                 }
481                 goto done;
482         }
483
484 done:
485         TALLOC_FREE(tdb_key_ctx);
486         talloc_free(ldb_data.data);
487
488         return ret;
489 }
490
491
492 /*
493   check if a attribute is a single valued, for a given element
494  */
495 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
496                                   struct ldb_message_element *el)
497 {
498         if (!a) return false;
499         if (el != NULL) {
500                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
501                         /* override from a ldb module, for example
502                            used for the description field, which is
503                            marked multi-valued in the schema but which
504                            should not actually accept multiple
505                            values */
506                         return true;
507                 }
508                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
509                         /* override from a ldb module, for example used for
510                            deleted linked attribute entries */
511                         return false;
512                 }
513         }
514         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
515                 return true;
516         }
517         return false;
518 }
519
520 static int ltdb_add_internal(struct ldb_module *module,
521                              struct ltdb_private *ltdb,
522                              const struct ldb_message *msg,
523                              bool check_single_value)
524 {
525         struct ldb_context *ldb = ldb_module_get_ctx(module);
526         int ret = LDB_SUCCESS;
527         unsigned int i;
528
529         for (i=0;i<msg->num_elements;i++) {
530                 struct ldb_message_element *el = &msg->elements[i];
531                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
532
533                 if (el->num_values == 0) {
534                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
535                                                el->name, ldb_dn_get_linearized(msg->dn));
536                         return LDB_ERR_CONSTRAINT_VIOLATION;
537                 }
538                 if (check_single_value &&
539                                 el->num_values > 1 &&
540                                 ldb_tdb_single_valued(a, el)) {
541                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
542                                                el->name, ldb_dn_get_linearized(msg->dn));
543                         return LDB_ERR_CONSTRAINT_VIOLATION;
544                 }
545
546                 /* Do not check "@ATTRIBUTES" for duplicated values */
547                 if (ldb_dn_is_special(msg->dn) &&
548                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
549                         continue;
550                 }
551
552                 if (check_single_value &&
553                     !(el->flags &
554                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
555                         struct ldb_val *duplicate = NULL;
556
557                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
558                                                          el, &duplicate, 0);
559                         if (ret != LDB_SUCCESS) {
560                                 return ret;
561                         }
562                         if (duplicate != NULL) {
563                                 ldb_asprintf_errstring(
564                                         ldb,
565                                         "attribute '%s': value '%.*s' on '%s' "
566                                         "provided more than once in ADD object",
567                                         el->name,
568                                         (int)duplicate->length,
569                                         duplicate->data,
570                                         ldb_dn_get_linearized(msg->dn));
571                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
572                         }
573                 }
574         }
575
576         ret = ltdb_store(module, msg, TDB_INSERT);
577         if (ret != LDB_SUCCESS) {
578                 /*
579                  * Try really hard to get the right error code for
580                  * a re-add situation, as this can matter!
581                  */
582                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
583                         int ret2;
584                         struct ldb_dn *dn2 = NULL;
585                         TALLOC_CTX *mem_ctx = talloc_new(module);
586                         if (mem_ctx == NULL) {
587                                 return ldb_module_operr(module);
588                         }
589                         ret2 = ltdb_search_base(module, module,
590                                                 msg->dn, &dn2);
591                         TALLOC_FREE(mem_ctx);
592                         if (ret2 == LDB_SUCCESS) {
593                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
594                         }
595                 }
596                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
597                         ldb_asprintf_errstring(ldb,
598                                                "Entry %s already exists",
599                                                ldb_dn_get_linearized(msg->dn));
600                 }
601                 return ret;
602         }
603
604         ret = ltdb_index_add_new(module, ltdb, msg);
605         if (ret != LDB_SUCCESS) {
606                 /*
607                  * If we failed to index, delete the message again.
608                  *
609                  * This is particularly important for the GUID index
610                  * case, which will only fail for a duplicate DN
611                  * in the index add.
612                  *
613                  * Note that the caller may not cancel the transation
614                  * and this means the above add might really show up!
615                  */
616                 ltdb_delete_noindex(module, msg);
617                 return ret;
618         }
619
620         ret = ltdb_modified(module, msg->dn);
621
622         return ret;
623 }
624
625 /*
626   add a record to the database
627 */
628 static int ltdb_add(struct ltdb_context *ctx)
629 {
630         struct ldb_module *module = ctx->module;
631         struct ldb_request *req = ctx->req;
632         void *data = ldb_module_get_private(module);
633         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
634         int ret = LDB_SUCCESS;
635
636         ret = ltdb_check_special_dn(module, req->op.add.message);
637         if (ret != LDB_SUCCESS) {
638                 return ret;
639         }
640
641         ldb_request_set_state(req, LDB_ASYNC_PENDING);
642
643         if (ltdb_cache_load(module) != 0) {
644                 return LDB_ERR_OPERATIONS_ERROR;
645         }
646
647         ret = ltdb_add_internal(module, ltdb,
648                                 req->op.add.message, true);
649
650         return ret;
651 }
652
653 static int ltdb_tdb_delete(struct ltdb_private *ltdb, TDB_DATA tdb_key)
654 {
655         return tdb_delete(ltdb->tdb, tdb_key);
656 }
657
658 /*
659   delete a record from the database, not updating indexes (used for deleting
660   index records)
661 */
662 int ltdb_delete_noindex(struct ldb_module *module,
663                         const struct ldb_message *msg)
664 {
665         void *data = ldb_module_get_private(module);
666         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
667         TDB_DATA tdb_key;
668         int ret;
669         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
670
671         if (tdb_key_ctx == NULL) {
672                 return ldb_module_oom(module);
673         }
674
675         if (ltdb->read_only) {
676                 return LDB_ERR_UNWILLING_TO_PERFORM;
677         }
678
679         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
680         if (!tdb_key.dptr) {
681                 TALLOC_FREE(tdb_key_ctx);
682                 return LDB_ERR_OTHER;
683         }
684
685         ret = ltdb->kv_ops->delete(ltdb, tdb_key);
686         TALLOC_FREE(tdb_key_ctx);
687
688         if (ret != 0) {
689                 ret = ltdb->kv_ops->error(ltdb);
690         }
691
692         return ret;
693 }
694
695 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
696 {
697         struct ldb_message *msg;
698         int ret = LDB_SUCCESS;
699
700         msg = ldb_msg_new(module);
701         if (msg == NULL) {
702                 return LDB_ERR_OPERATIONS_ERROR;
703         }
704
705         /* in case any attribute of the message was indexed, we need
706            to fetch the old record */
707         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
708         if (ret != LDB_SUCCESS) {
709                 /* not finding the old record is an error */
710                 goto done;
711         }
712
713         ret = ltdb_delete_noindex(module, msg);
714         if (ret != LDB_SUCCESS) {
715                 goto done;
716         }
717
718         /* remove any indexed attributes */
719         ret = ltdb_index_delete(module, msg);
720         if (ret != LDB_SUCCESS) {
721                 goto done;
722         }
723
724         ret = ltdb_modified(module, dn);
725         if (ret != LDB_SUCCESS) {
726                 goto done;
727         }
728
729 done:
730         talloc_free(msg);
731         return ret;
732 }
733
734 /*
735   delete a record from the database
736 */
737 static int ltdb_delete(struct ltdb_context *ctx)
738 {
739         struct ldb_module *module = ctx->module;
740         struct ldb_request *req = ctx->req;
741         int ret = LDB_SUCCESS;
742
743         ldb_request_set_state(req, LDB_ASYNC_PENDING);
744
745         if (ltdb_cache_load(module) != 0) {
746                 return LDB_ERR_OPERATIONS_ERROR;
747         }
748
749         ret = ltdb_delete_internal(module, req->op.del.dn);
750
751         return ret;
752 }
753
754 /*
755   find an element by attribute name. At the moment this does a linear search,
756   it should be re-coded to use a binary search once all places that modify
757   records guarantee sorted order
758
759   return the index of the first matching element if found, otherwise -1
760 */
761 static int find_element(const struct ldb_message *msg, const char *name)
762 {
763         unsigned int i;
764         for (i=0;i<msg->num_elements;i++) {
765                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
766                         return i;
767                 }
768         }
769         return -1;
770 }
771
772
773 /*
774   add an element to an existing record. Assumes a elements array that we
775   can call re-alloc on, and assumed that we can re-use the data pointers from
776   the passed in additional values. Use with care!
777
778   returns 0 on success, -1 on failure (and sets errno)
779 */
780 static int ltdb_msg_add_element(struct ldb_message *msg,
781                                 struct ldb_message_element *el)
782 {
783         struct ldb_message_element *e2;
784         unsigned int i;
785
786         if (el->num_values == 0) {
787                 /* nothing to do here - we don't add empty elements */
788                 return 0;
789         }
790
791         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
792                               msg->num_elements+1);
793         if (!e2) {
794                 errno = ENOMEM;
795                 return -1;
796         }
797
798         msg->elements = e2;
799
800         e2 = &msg->elements[msg->num_elements];
801
802         e2->name = el->name;
803         e2->flags = el->flags;
804         e2->values = talloc_array(msg->elements,
805                                   struct ldb_val, el->num_values);
806         if (!e2->values) {
807                 errno = ENOMEM;
808                 return -1;
809         }
810         for (i=0;i<el->num_values;i++) {
811                 e2->values[i] = el->values[i];
812         }
813         e2->num_values = el->num_values;
814
815         ++msg->num_elements;
816
817         return 0;
818 }
819
820 /*
821   delete all elements having a specified attribute name
822 */
823 static int msg_delete_attribute(struct ldb_module *module,
824                                 struct ltdb_private *ltdb,
825                                 struct ldb_message *msg, const char *name)
826 {
827         unsigned int i;
828         int ret;
829         struct ldb_message_element *el;
830         bool is_special = ldb_dn_is_special(msg->dn);
831
832         if (!is_special
833             && ltdb->cache->GUID_index_attribute != NULL
834             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
835                 struct ldb_context *ldb = ldb_module_get_ctx(module);
836                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
837                                        "attribute %s (used as DB index)",
838                                        ltdb->cache->GUID_index_attribute);
839                 return LDB_ERR_CONSTRAINT_VIOLATION;
840         }
841
842         el = ldb_msg_find_element(msg, name);
843         if (el == NULL) {
844                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
845         }
846         i = el - msg->elements;
847
848         ret = ltdb_index_del_element(module, ltdb, msg, el);
849         if (ret != LDB_SUCCESS) {
850                 return ret;
851         }
852
853         talloc_free(el->values);
854         if (msg->num_elements > (i+1)) {
855                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
856         }
857         msg->num_elements--;
858         msg->elements = talloc_realloc(msg, msg->elements,
859                                        struct ldb_message_element,
860                                        msg->num_elements);
861         return LDB_SUCCESS;
862 }
863
864 /*
865   delete all elements matching an attribute name/value
866
867   return LDB Error on failure
868 */
869 static int msg_delete_element(struct ldb_module *module,
870                               struct ltdb_private *ltdb,
871                               struct ldb_message *msg,
872                               const char *name,
873                               const struct ldb_val *val)
874 {
875         struct ldb_context *ldb = ldb_module_get_ctx(module);
876         unsigned int i;
877         int found, ret;
878         struct ldb_message_element *el;
879         const struct ldb_schema_attribute *a;
880
881         found = find_element(msg, name);
882         if (found == -1) {
883                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
884         }
885
886         i = (unsigned int) found;
887         el = &(msg->elements[i]);
888
889         a = ldb_schema_attribute_by_name(ldb, el->name);
890
891         for (i=0;i<el->num_values;i++) {
892                 bool matched;
893                 if (a->syntax->operator_fn) {
894                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
895                                                      &el->values[i], val, &matched);
896                         if (ret != LDB_SUCCESS) return ret;
897                 } else {
898                         matched = (a->syntax->comparison_fn(ldb, ldb,
899                                                             &el->values[i], val) == 0);
900                 }
901                 if (matched) {
902                         if (el->num_values == 1) {
903                                 return msg_delete_attribute(module,
904                                                             ltdb, msg, name);
905                         }
906
907                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
908                         if (ret != LDB_SUCCESS) {
909                                 return ret;
910                         }
911
912                         if (i<el->num_values-1) {
913                                 memmove(&el->values[i], &el->values[i+1],
914                                         sizeof(el->values[i])*
915                                                 (el->num_values-(i+1)));
916                         }
917                         el->num_values--;
918
919                         /* per definition we find in a canonicalised message an
920                            attribute value only once. So we are finished here */
921                         return LDB_SUCCESS;
922                 }
923         }
924
925         /* Not found */
926         return LDB_ERR_NO_SUCH_ATTRIBUTE;
927 }
928
929 /*
930   modify a record - internal interface
931
932   yuck - this is O(n^2). Luckily n is usually small so we probably
933   get away with it, but if we ever have really large attribute lists
934   then we'll need to look at this again
935
936   'req' is optional, and is used to specify controls if supplied
937 */
938 int ltdb_modify_internal(struct ldb_module *module,
939                          const struct ldb_message *msg,
940                          struct ldb_request *req)
941 {
942         struct ldb_context *ldb = ldb_module_get_ctx(module);
943         void *data = ldb_module_get_private(module);
944         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
945         struct ldb_message *msg2;
946         unsigned int i, j;
947         int ret = LDB_SUCCESS, idx;
948         struct ldb_control *control_permissive = NULL;
949         TALLOC_CTX *mem_ctx = talloc_new(req);
950
951         if (mem_ctx == NULL) {
952                 return ldb_module_oom(module);
953         }
954         
955         if (req) {
956                 control_permissive = ldb_request_get_control(req,
957                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
958         }
959
960         msg2 = ldb_msg_new(mem_ctx);
961         if (msg2 == NULL) {
962                 ret = LDB_ERR_OTHER;
963                 goto done;
964         }
965
966         ret = ltdb_search_dn1(module, msg->dn,
967                               msg2,
968                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
969         if (ret != LDB_SUCCESS) {
970                 goto done;
971         }
972
973         for (i=0; i<msg->num_elements; i++) {
974                 struct ldb_message_element *el = &msg->elements[i], *el2;
975                 struct ldb_val *vals;
976                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
977                 const char *dn;
978                 uint32_t options = 0;
979                 if (control_permissive != NULL) {
980                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
981                 }
982
983                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
984                 case LDB_FLAG_MOD_ADD:
985
986                         if (el->num_values == 0) {
987                                 ldb_asprintf_errstring(ldb,
988                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
989                                                        el->name, ldb_dn_get_linearized(msg2->dn));
990                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
991                                 goto done;
992                         }
993
994                         /* make a copy of the array so that a permissive
995                          * control can remove duplicates without changing the
996                          * original values, but do not copy data as we do not
997                          * need to keep it around once the operation is
998                          * finished */
999                         if (control_permissive) {
1000                                 el = talloc(msg2, struct ldb_message_element);
1001                                 if (!el) {
1002                                         ret = LDB_ERR_OTHER;
1003                                         goto done;
1004                                 }
1005                                 *el = msg->elements[i];
1006                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1007                                 if (el->values == NULL) {
1008                                         ret = LDB_ERR_OTHER;
1009                                         goto done;
1010                                 }
1011                                 for (j = 0; j < el->num_values; j++) {
1012                                         el->values[j] = msg->elements[i].values[j];
1013                                 }
1014                         }
1015
1016                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1017                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1018                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1019                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1020                                 goto done;
1021                         }
1022
1023                         /* Checks if element already exists */
1024                         idx = find_element(msg2, el->name);
1025                         if (idx == -1) {
1026                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1027                                         ret = LDB_ERR_OTHER;
1028                                         goto done;
1029                                 }
1030                                 ret = ltdb_index_add_element(module, ltdb,
1031                                                              msg2,
1032                                                              el);
1033                                 if (ret != LDB_SUCCESS) {
1034                                         goto done;
1035                                 }
1036                         } else {
1037                                 j = (unsigned int) idx;
1038                                 el2 = &(msg2->elements[j]);
1039
1040                                 /* We cannot add another value on a existing one
1041                                    if the attribute is single-valued */
1042                                 if (ldb_tdb_single_valued(a, el)) {
1043                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1044                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1045                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1046                                         goto done;
1047                                 }
1048
1049                                 /* Check that values don't exist yet on multi-
1050                                    valued attributes or aren't provided twice */
1051                                 if (!(el->flags &
1052                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1053                                         struct ldb_val *duplicate = NULL;
1054                                         ret = ldb_msg_find_common_values(ldb,
1055                                                                          msg2,
1056                                                                          el,
1057                                                                          el2,
1058                                                                          options);
1059
1060                                         if (ret ==
1061                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1062                                                 ldb_asprintf_errstring(ldb,
1063                                                         "attribute '%s': value "
1064                                                         "#%u on '%s' already "
1065                                                         "exists", el->name, j,
1066                                                         ldb_dn_get_linearized(msg2->dn));
1067                                                 goto done;
1068                                         } else if (ret != LDB_SUCCESS) {
1069                                                 goto done;
1070                                         }
1071
1072                                         ret = ldb_msg_find_duplicate_val(
1073                                                 ldb, msg2, el, &duplicate, 0);
1074                                         if (ret != LDB_SUCCESS) {
1075                                                 goto done;
1076                                         }
1077                                         if (duplicate != NULL) {
1078                                                 ldb_asprintf_errstring(
1079                                                         ldb,
1080                                                         "attribute '%s': value "
1081                                                         "'%.*s' on '%s' "
1082                                                         "provided more than "
1083                                                         "once in ADD",
1084                                                         el->name,
1085                                                         (int)duplicate->length,
1086                                                         duplicate->data,
1087                                                         ldb_dn_get_linearized(msg->dn));
1088                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1089                                                 goto done;
1090                                         }
1091                                 }
1092
1093                                 /* Now combine existing and new values to a new
1094                                    attribute record */
1095                                 vals = talloc_realloc(msg2->elements,
1096                                                       el2->values, struct ldb_val,
1097                                                       el2->num_values + el->num_values);
1098                                 if (vals == NULL) {
1099                                         ldb_oom(ldb);
1100                                         ret = LDB_ERR_OTHER;
1101                                         goto done;
1102                                 }
1103
1104                                 for (j=0; j<el->num_values; j++) {
1105                                         vals[el2->num_values + j] =
1106                                                 ldb_val_dup(vals, &el->values[j]);
1107                                 }
1108
1109                                 el2->values = vals;
1110                                 el2->num_values += el->num_values;
1111
1112                                 ret = ltdb_index_add_element(module, ltdb,
1113                                                              msg2, el);
1114                                 if (ret != LDB_SUCCESS) {
1115                                         goto done;
1116                                 }
1117                         }
1118
1119                         break;
1120
1121                 case LDB_FLAG_MOD_REPLACE:
1122
1123                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1124                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1125                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1126                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1127                                 goto done;
1128                         }
1129
1130                         /*
1131                          * We don't need to check this if we have been
1132                          * pre-screened by the repl_meta_data module
1133                          * in Samba, or someone else who can claim to
1134                          * know what they are doing. 
1135                          */
1136                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1137                                 struct ldb_val *duplicate = NULL;
1138
1139                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1140                                                                  &duplicate, 0);
1141                                 if (ret != LDB_SUCCESS) {
1142                                         goto done;
1143                                 }
1144                                 if (duplicate != NULL) {
1145                                         ldb_asprintf_errstring(
1146                                                 ldb,
1147                                                 "attribute '%s': value '%.*s' "
1148                                                 "on '%s' provided more than "
1149                                                 "once in REPLACE",
1150                                                 el->name,
1151                                                 (int)duplicate->length,
1152                                                 duplicate->data,
1153                                                 ldb_dn_get_linearized(msg2->dn));
1154                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1155                                         goto done;
1156                                 }
1157                         }
1158
1159                         /* Checks if element already exists */
1160                         idx = find_element(msg2, el->name);
1161                         if (idx != -1) {
1162                                 j = (unsigned int) idx;
1163                                 el2 = &(msg2->elements[j]);
1164
1165                                 /* we consider two elements to be
1166                                  * equal only if the order
1167                                  * matches. This allows dbcheck to
1168                                  * fix the ordering on attributes
1169                                  * where order matters, such as
1170                                  * objectClass
1171                                  */
1172                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1173                                         continue;
1174                                 }
1175
1176                                 /* Delete the attribute if it exists in the DB */
1177                                 if (msg_delete_attribute(module, ltdb,
1178                                                          msg2,
1179                                                          el->name) != 0) {
1180                                         ret = LDB_ERR_OTHER;
1181                                         goto done;
1182                                 }
1183                         }
1184
1185                         /* Recreate it with the new values */
1186                         if (ltdb_msg_add_element(msg2, el) != 0) {
1187                                 ret = LDB_ERR_OTHER;
1188                                 goto done;
1189                         }
1190
1191                         ret = ltdb_index_add_element(module, ltdb,
1192                                                      msg2, el);
1193                         if (ret != LDB_SUCCESS) {
1194                                 goto done;
1195                         }
1196
1197                         break;
1198
1199                 case LDB_FLAG_MOD_DELETE:
1200                         dn = ldb_dn_get_linearized(msg2->dn);
1201                         if (dn == NULL) {
1202                                 ret = LDB_ERR_OTHER;
1203                                 goto done;
1204                         }
1205
1206                         if (msg->elements[i].num_values == 0) {
1207                                 /* Delete the whole attribute */
1208                                 ret = msg_delete_attribute(module,
1209                                                            ltdb,
1210                                                            msg2,
1211                                                            msg->elements[i].name);
1212                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1213                                     control_permissive) {
1214                                         ret = LDB_SUCCESS;
1215                                 } else {
1216                                         ldb_asprintf_errstring(ldb,
1217                                                                "attribute '%s': no such attribute for delete on '%s'",
1218                                                                msg->elements[i].name, dn);
1219                                 }
1220                                 if (ret != LDB_SUCCESS) {
1221                                         goto done;
1222                                 }
1223                         } else {
1224                                 /* Delete specified values from an attribute */
1225                                 for (j=0; j < msg->elements[i].num_values; j++) {
1226                                         ret = msg_delete_element(module,
1227                                                                  ltdb,
1228                                                                  msg2,
1229                                                                  msg->elements[i].name,
1230                                                                  &msg->elements[i].values[j]);
1231                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1232                                             control_permissive) {
1233                                                 ret = LDB_SUCCESS;
1234                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1235                                                 ldb_asprintf_errstring(ldb,
1236                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1237                                                                        msg->elements[i].name, dn);
1238                                         }
1239                                         if (ret != LDB_SUCCESS) {
1240                                                 goto done;
1241                                         }
1242                                 }
1243                         }
1244                         break;
1245                 default:
1246                         ldb_asprintf_errstring(ldb,
1247                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1248                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1249                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1250                         ret = LDB_ERR_PROTOCOL_ERROR;
1251                         goto done;
1252                 }
1253         }
1254
1255         ret = ltdb_store(module, msg2, TDB_MODIFY);
1256         if (ret != LDB_SUCCESS) {
1257                 goto done;
1258         }
1259
1260         ret = ltdb_modified(module, msg2->dn);
1261         if (ret != LDB_SUCCESS) {
1262                 goto done;
1263         }
1264
1265 done:
1266         TALLOC_FREE(mem_ctx);
1267         return ret;
1268 }
1269
1270 /*
1271   modify a record
1272 */
1273 static int ltdb_modify(struct ltdb_context *ctx)
1274 {
1275         struct ldb_module *module = ctx->module;
1276         struct ldb_request *req = ctx->req;
1277         int ret = LDB_SUCCESS;
1278
1279         ret = ltdb_check_special_dn(module, req->op.mod.message);
1280         if (ret != LDB_SUCCESS) {
1281                 return ret;
1282         }
1283
1284         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1285
1286         if (ltdb_cache_load(module) != 0) {
1287                 return LDB_ERR_OPERATIONS_ERROR;
1288         }
1289
1290         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1291
1292         return ret;
1293 }
1294
1295 /*
1296   rename a record
1297 */
1298 static int ltdb_rename(struct ltdb_context *ctx)
1299 {
1300         struct ldb_module *module = ctx->module;
1301         void *data = ldb_module_get_private(module);
1302         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1303         struct ldb_request *req = ctx->req;
1304         struct ldb_message *msg;
1305         int ret = LDB_SUCCESS;
1306         TDB_DATA tdb_key, tdb_key_old;
1307         struct ldb_dn *db_dn;
1308
1309         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1310
1311         if (ltdb_cache_load(ctx->module) != 0) {
1312                 return LDB_ERR_OPERATIONS_ERROR;
1313         }
1314
1315         msg = ldb_msg_new(ctx);
1316         if (msg == NULL) {
1317                 return LDB_ERR_OPERATIONS_ERROR;
1318         }
1319
1320         /* we need to fetch the old record to re-add under the new name */
1321         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1322                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1323         if (ret != LDB_SUCCESS) {
1324                 /* not finding the old record is an error */
1325                 return ret;
1326         }
1327
1328         /* We need to, before changing the DB, check if the new DN
1329          * exists, so we can return this error to the caller with an
1330          * unmodified DB
1331          *
1332          * Even in GUID index mode we use ltdb_key_dn() as we are
1333          * trying to figure out if this is just a case rename
1334          */
1335         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1336         if (!tdb_key.dptr) {
1337                 talloc_free(msg);
1338                 return LDB_ERR_OPERATIONS_ERROR;
1339         }
1340
1341         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1342         if (!tdb_key_old.dptr) {
1343                 talloc_free(msg);
1344                 talloc_free(tdb_key.dptr);
1345                 return LDB_ERR_OPERATIONS_ERROR;
1346         }
1347
1348         /*
1349          * Only declare a conflict if the new DN already exists,
1350          * and it isn't a case change on the old DN
1351          */
1352         if (tdb_key_old.dsize != tdb_key.dsize
1353             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1354                 ret = ltdb_search_base(module, msg,
1355                                        req->op.rename.newdn,
1356                                        &db_dn);
1357                 if (ret == LDB_SUCCESS) {
1358                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1359                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1360                         ret = LDB_SUCCESS;
1361                 }
1362         }
1363
1364         /* finding the new record already in the DB is an error */
1365
1366         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1367                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1368                                        "Entry %s already exists",
1369                                        ldb_dn_get_linearized(req->op.rename.newdn));
1370         }
1371         if (ret != LDB_SUCCESS) {
1372                 talloc_free(tdb_key_old.dptr);
1373                 talloc_free(tdb_key.dptr);
1374                 talloc_free(msg);
1375                 return ret;
1376         }
1377
1378         talloc_free(tdb_key_old.dptr);
1379         talloc_free(tdb_key.dptr);
1380
1381         /* Always delete first then add, to avoid conflicts with
1382          * unique indexes. We rely on the transaction to make this
1383          * atomic
1384          */
1385         ret = ltdb_delete_internal(module, msg->dn);
1386         if (ret != LDB_SUCCESS) {
1387                 talloc_free(msg);
1388                 return ret;
1389         }
1390
1391         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1392         if (msg->dn == NULL) {
1393                 talloc_free(msg);
1394                 return LDB_ERR_OPERATIONS_ERROR;
1395         }
1396
1397         /* We don't check single value as we can have more than 1 with
1398          * deleted attributes. We could go through all elements but that's
1399          * maybe not the most efficient way
1400          */
1401         ret = ltdb_add_internal(module, ltdb, msg, false);
1402
1403         talloc_free(msg);
1404
1405         return ret;
1406 }
1407
1408 static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
1409 {
1410         return tdb_transaction_start(ltdb->tdb);
1411 }
1412
1413 static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
1414 {
1415         return tdb_transaction_cancel(ltdb->tdb);
1416 }
1417
1418 static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
1419 {
1420         return tdb_transaction_prepare_commit(ltdb->tdb);
1421 }
1422
1423 static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
1424 {
1425         return tdb_transaction_commit(ltdb->tdb);
1426 }
1427
1428 static int ltdb_start_trans(struct ldb_module *module)
1429 {
1430         void *data = ldb_module_get_private(module);
1431         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1432
1433         /* Do not take out the transaction lock on a read-only DB */
1434         if (ltdb->read_only) {
1435                 return LDB_ERR_UNWILLING_TO_PERFORM;
1436         }
1437
1438         if (ltdb->kv_ops->begin_write(ltdb) != 0) {
1439                 return ltdb->kv_ops->error(ltdb);
1440         }
1441
1442         ltdb->in_transaction++;
1443
1444         ltdb_index_transaction_start(module);
1445
1446         return LDB_SUCCESS;
1447 }
1448
1449 static int ltdb_prepare_commit(struct ldb_module *module)
1450 {
1451         int ret;
1452         void *data = ldb_module_get_private(module);
1453         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1454
1455         if (ltdb->in_transaction != 1) {
1456                 return LDB_SUCCESS;
1457         }
1458
1459         ret = ltdb_index_transaction_commit(module);
1460         if (ret != LDB_SUCCESS) {
1461                 ltdb->kv_ops->abort_write(ltdb);
1462                 ltdb->in_transaction--;
1463                 return ret;
1464         }
1465
1466         if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
1467                 ret = ltdb->kv_ops->error(ltdb);
1468                 ltdb->in_transaction--;
1469                 ldb_debug_set(ldb_module_get_ctx(module),
1470                               LDB_DEBUG_FATAL,
1471                               "Failure during "
1472                               "prepare_write): %s -> %s",
1473                               ltdb->kv_ops->errorstr(ltdb),
1474                               ldb_strerror(ret));
1475                 return ret;
1476         }
1477
1478         ltdb->prepared_commit = true;
1479
1480         return LDB_SUCCESS;
1481 }
1482
1483 static int ltdb_end_trans(struct ldb_module *module)
1484 {
1485         int ret;
1486         void *data = ldb_module_get_private(module);
1487         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1488
1489         if (!ltdb->prepared_commit) {
1490                 ret = ltdb_prepare_commit(module);
1491                 if (ret != LDB_SUCCESS) {
1492                         return ret;
1493                 }
1494         }
1495
1496         ltdb->in_transaction--;
1497         ltdb->prepared_commit = false;
1498
1499         if (ltdb->kv_ops->finish_write(ltdb) != 0) {
1500                 ret = ltdb->kv_ops->error(ltdb);
1501                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1502                                        "Failure during tdb_transaction_commit(): %s -> %s",
1503                                        ltdb->kv_ops->errorstr(ltdb),
1504                                        ldb_strerror(ret));
1505                 return ret;
1506         }
1507
1508         return LDB_SUCCESS;
1509 }
1510
1511 static int ltdb_del_trans(struct ldb_module *module)
1512 {
1513         void *data = ldb_module_get_private(module);
1514         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1515
1516         ltdb->in_transaction--;
1517
1518         if (ltdb_index_transaction_cancel(module) != 0) {
1519                 ltdb->kv_ops->abort_write(ltdb);
1520                 return ltdb->kv_ops->error(ltdb);
1521         }
1522
1523         ltdb->kv_ops->abort_write(ltdb);
1524         return LDB_SUCCESS;
1525 }
1526
1527 /*
1528   return sequenceNumber from @BASEINFO
1529 */
1530 static int ltdb_sequence_number(struct ltdb_context *ctx,
1531                                 struct ldb_extended **ext)
1532 {
1533         struct ldb_context *ldb;
1534         struct ldb_module *module = ctx->module;
1535         struct ldb_request *req = ctx->req;
1536         void *data = ldb_module_get_private(module);
1537         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1538         TALLOC_CTX *tmp_ctx = NULL;
1539         struct ldb_seqnum_request *seq;
1540         struct ldb_seqnum_result *res;
1541         struct ldb_message *msg = NULL;
1542         struct ldb_dn *dn;
1543         const char *date;
1544         int ret = LDB_SUCCESS;
1545
1546         ldb = ldb_module_get_ctx(module);
1547
1548         seq = talloc_get_type(req->op.extended.data,
1549                                 struct ldb_seqnum_request);
1550         if (seq == NULL) {
1551                 return LDB_ERR_OPERATIONS_ERROR;
1552         }
1553
1554         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1555
1556         if (ltdb->kv_ops->lock_read(module) != 0) {
1557                 return LDB_ERR_OPERATIONS_ERROR;
1558         }
1559
1560         res = talloc_zero(req, struct ldb_seqnum_result);
1561         if (res == NULL) {
1562                 ret = LDB_ERR_OPERATIONS_ERROR;
1563                 goto done;
1564         }
1565
1566         tmp_ctx = talloc_new(req);
1567         if (tmp_ctx == NULL) {
1568                 ret = LDB_ERR_OPERATIONS_ERROR;
1569                 goto done;
1570         }
1571
1572         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1573         if (dn == NULL) {
1574                 ret = LDB_ERR_OPERATIONS_ERROR;
1575                 goto done;
1576         }
1577
1578         msg = ldb_msg_new(tmp_ctx);
1579         if (msg == NULL) {
1580                 ret = LDB_ERR_OPERATIONS_ERROR;
1581                 goto done;
1582         }
1583
1584         ret = ltdb_search_dn1(module, dn, msg, 0);
1585         if (ret != LDB_SUCCESS) {
1586                 goto done;
1587         }
1588
1589         switch (seq->type) {
1590         case LDB_SEQ_HIGHEST_SEQ:
1591                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1592                 break;
1593         case LDB_SEQ_NEXT:
1594                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1595                 res->seq_num++;
1596                 break;
1597         case LDB_SEQ_HIGHEST_TIMESTAMP:
1598                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1599                 if (date) {
1600                         res->seq_num = ldb_string_to_time(date);
1601                 } else {
1602                         res->seq_num = 0;
1603                         /* zero is as good as anything when we don't know */
1604                 }
1605                 break;
1606         }
1607
1608         *ext = talloc_zero(req, struct ldb_extended);
1609         if (*ext == NULL) {
1610                 ret = LDB_ERR_OPERATIONS_ERROR;
1611                 goto done;
1612         }
1613         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1614         (*ext)->data = talloc_steal(*ext, res);
1615
1616 done:
1617         talloc_free(tmp_ctx);
1618
1619         ltdb->kv_ops->unlock_read(module);
1620         return ret;
1621 }
1622
1623 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1624 {
1625         struct ldb_context *ldb;
1626         struct ldb_request *req;
1627         struct ldb_reply *ares;
1628
1629         ldb = ldb_module_get_ctx(ctx->module);
1630         req = ctx->req;
1631
1632         /* if we already returned an error just return */
1633         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1634                 return;
1635         }
1636
1637         ares = talloc_zero(req, struct ldb_reply);
1638         if (!ares) {
1639                 ldb_oom(ldb);
1640                 req->callback(req, NULL);
1641                 return;
1642         }
1643         ares->type = LDB_REPLY_DONE;
1644         ares->error = error;
1645
1646         req->callback(req, ares);
1647 }
1648
1649 static void ltdb_timeout(struct tevent_context *ev,
1650                           struct tevent_timer *te,
1651                           struct timeval t,
1652                           void *private_data)
1653 {
1654         struct ltdb_context *ctx;
1655         ctx = talloc_get_type(private_data, struct ltdb_context);
1656
1657         if (!ctx->request_terminated) {
1658                 /* request is done now */
1659                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1660         }
1661
1662         if (ctx->spy) {
1663                 /* neutralize the spy */
1664                 ctx->spy->ctx = NULL;
1665                 ctx->spy = NULL;
1666         }
1667         talloc_free(ctx);
1668 }
1669
1670 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1671                                         struct ldb_extended *ext,
1672                                         int error)
1673 {
1674         struct ldb_context *ldb;
1675         struct ldb_request *req;
1676         struct ldb_reply *ares;
1677
1678         ldb = ldb_module_get_ctx(ctx->module);
1679         req = ctx->req;
1680
1681         /* if we already returned an error just return */
1682         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1683                 return;
1684         }
1685
1686         ares = talloc_zero(req, struct ldb_reply);
1687         if (!ares) {
1688                 ldb_oom(ldb);
1689                 req->callback(req, NULL);
1690                 return;
1691         }
1692         ares->type = LDB_REPLY_DONE;
1693         ares->response = ext;
1694         ares->error = error;
1695
1696         req->callback(req, ares);
1697 }
1698
1699 static void ltdb_handle_extended(struct ltdb_context *ctx)
1700 {
1701         struct ldb_extended *ext = NULL;
1702         int ret;
1703
1704         if (strcmp(ctx->req->op.extended.oid,
1705                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1706                 /* get sequence number */
1707                 ret = ltdb_sequence_number(ctx, &ext);
1708         } else {
1709                 /* not recognized */
1710                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1711         }
1712
1713         ltdb_request_extended_done(ctx, ext, ret);
1714 }
1715
1716 struct kv_ctx {
1717         ldb_kv_traverse_fn kv_traverse_fn;
1718         void *ctx;
1719         struct ltdb_private *ltdb;
1720 };
1721
1722 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
1723 {
1724         struct kv_ctx *kv_ctx = ctx;
1725         struct ldb_val key = {
1726                 .length = tdb_key.dsize,
1727                 .data = tdb_key.dptr,
1728         };
1729         struct ldb_val data = {
1730                 .length = tdb_data.dsize,
1731                 .data = tdb_data.dptr,
1732         };
1733         return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
1734 }
1735
1736 static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
1737 {
1738         struct kv_ctx kv_ctx = {
1739                 .kv_traverse_fn = fn,
1740                 .ctx = ctx,
1741                 .ltdb = ltdb
1742         };
1743         if (ltdb->in_transaction != 0) {
1744                 return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1745         } else {
1746                 return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1747         }
1748 }
1749
1750 static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA key2, TDB_DATA data, void *state)
1751 {
1752         int tdb_ret;
1753         struct ldb_context *ldb;
1754         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1755         struct ldb_module *module = ctx->module;
1756
1757         ldb = ldb_module_get_ctx(module);
1758
1759         tdb_ret = tdb_delete(ltdb->tdb, key);
1760         if (tdb_ret != 0) {
1761                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1762                           "Failed to delete %*.*s "
1763                           "for rekey as %*.*s: %s",
1764                           (int)key.dsize, (int)key.dsize,
1765                           (const char *)key.dptr,
1766                           (int)key2.dsize, (int)key2.dsize,
1767                           (const char *)key.dptr,
1768                           tdb_errorstr(ltdb->tdb));
1769                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1770                 return -1;
1771         }
1772         tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
1773         if (tdb_ret != 0) {
1774                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1775                           "Failed to rekey %*.*s as %*.*s: %s",
1776                           (int)key.dsize, (int)key.dsize,
1777                           (const char *)key.dptr,
1778                           (int)key2.dsize, (int)key2.dsize,
1779                           (const char *)key.dptr,
1780                           tdb_errorstr(ltdb->tdb));
1781                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1782                 return -1;
1783         }
1784         return tdb_ret;
1785 }
1786
1787 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb, TDB_DATA key,
1788                                  int (*parser)(TDB_DATA key, TDB_DATA data,
1789                                                void *private_data),
1790                                  void *ctx)
1791 {
1792         return tdb_parse_record(ltdb->tdb, key, parser, ctx);
1793 }
1794
1795 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1796 {
1797         return tdb_name(ltdb->tdb);
1798 }
1799
1800 static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
1801 {
1802         bool ret = (tdb_get_seqnum(ltdb->tdb) != ltdb->tdb_seqnum);
1803
1804         ltdb->tdb_seqnum = tdb_get_seqnum(ltdb->tdb);
1805
1806         return ret;
1807 }
1808
1809 static const struct kv_db_ops key_value_ops = {
1810         .store = ltdb_tdb_store,
1811         .delete = ltdb_tdb_delete,
1812         .iterate = ltdb_tdb_traverse_fn,
1813         .update_in_iterate = ltdb_tdb_update_in_iterate,
1814         .fetch_and_parse = ltdb_tdb_parse_record,
1815         .lock_read = ltdb_lock_read,
1816         .unlock_read = ltdb_unlock_read,
1817         .begin_write = ltdb_tdb_transaction_start,
1818         .prepare_write = ltdb_tdb_transaction_prepare_commit,
1819         .finish_write = ltdb_tdb_transaction_commit,
1820         .abort_write = ltdb_tdb_transaction_cancel,
1821         .error = ltdb_error,
1822         .errorstr = ltdb_errorstr,
1823         .name = ltdb_tdb_name,
1824         .has_changed = ltdb_tdb_changed,
1825 };
1826
1827 static void ltdb_callback(struct tevent_context *ev,
1828                           struct tevent_timer *te,
1829                           struct timeval t,
1830                           void *private_data)
1831 {
1832         struct ltdb_context *ctx;
1833         int ret;
1834
1835         ctx = talloc_get_type(private_data, struct ltdb_context);
1836
1837         if (ctx->request_terminated) {
1838                 goto done;
1839         }
1840
1841         switch (ctx->req->operation) {
1842         case LDB_SEARCH:
1843                 ret = ltdb_search(ctx);
1844                 break;
1845         case LDB_ADD:
1846                 ret = ltdb_add(ctx);
1847                 break;
1848         case LDB_MODIFY:
1849                 ret = ltdb_modify(ctx);
1850                 break;
1851         case LDB_DELETE:
1852                 ret = ltdb_delete(ctx);
1853                 break;
1854         case LDB_RENAME:
1855                 ret = ltdb_rename(ctx);
1856                 break;
1857         case LDB_EXTENDED:
1858                 ltdb_handle_extended(ctx);
1859                 goto done;
1860         default:
1861                 /* no other op supported */
1862                 ret = LDB_ERR_PROTOCOL_ERROR;
1863         }
1864
1865         if (!ctx->request_terminated) {
1866                 /* request is done now */
1867                 ltdb_request_done(ctx, ret);
1868         }
1869
1870 done:
1871         if (ctx->spy) {
1872                 /* neutralize the spy */
1873                 ctx->spy->ctx = NULL;
1874                 ctx->spy = NULL;
1875         }
1876         talloc_free(ctx);
1877 }
1878
1879 static int ltdb_request_destructor(void *ptr)
1880 {
1881         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1882
1883         if (spy->ctx != NULL) {
1884                 spy->ctx->spy = NULL;
1885                 spy->ctx->request_terminated = true;
1886                 spy->ctx = NULL;
1887         }
1888
1889         return 0;
1890 }
1891
1892 static int ltdb_handle_request(struct ldb_module *module,
1893                                struct ldb_request *req)
1894 {
1895         struct ldb_control *control_permissive;
1896         struct ldb_context *ldb;
1897         struct tevent_context *ev;
1898         struct ltdb_context *ac;
1899         struct tevent_timer *te;
1900         struct timeval tv;
1901         unsigned int i;
1902
1903         ldb = ldb_module_get_ctx(module);
1904
1905         control_permissive = ldb_request_get_control(req,
1906                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1907
1908         for (i = 0; req->controls && req->controls[i]; i++) {
1909                 if (req->controls[i]->critical &&
1910                     req->controls[i] != control_permissive) {
1911                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1912                                                req->controls[i]->oid);
1913                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1914                 }
1915         }
1916
1917         if (req->starttime == 0 || req->timeout == 0) {
1918                 ldb_set_errstring(ldb, "Invalid timeout settings");
1919                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1920         }
1921
1922         ev = ldb_handle_get_event_context(req->handle);
1923
1924         ac = talloc_zero(ldb, struct ltdb_context);
1925         if (ac == NULL) {
1926                 ldb_oom(ldb);
1927                 return LDB_ERR_OPERATIONS_ERROR;
1928         }
1929
1930         ac->module = module;
1931         ac->req = req;
1932
1933         tv.tv_sec = 0;
1934         tv.tv_usec = 0;
1935         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1936         if (NULL == te) {
1937                 talloc_free(ac);
1938                 return LDB_ERR_OPERATIONS_ERROR;
1939         }
1940
1941         if (req->timeout > 0) {
1942                 tv.tv_sec = req->starttime + req->timeout;
1943                 tv.tv_usec = 0;
1944                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1945                                                      ltdb_timeout, ac);
1946                 if (NULL == ac->timeout_event) {
1947                         talloc_free(ac);
1948                         return LDB_ERR_OPERATIONS_ERROR;
1949                 }
1950         }
1951
1952         /* set a spy so that we do not try to use the request context
1953          * if it is freed before ltdb_callback fires */
1954         ac->spy = talloc(req, struct ltdb_req_spy);
1955         if (NULL == ac->spy) {
1956                 talloc_free(ac);
1957                 return LDB_ERR_OPERATIONS_ERROR;
1958         }
1959         ac->spy->ctx = ac;
1960
1961         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1962
1963         return LDB_SUCCESS;
1964 }
1965
1966 static int ltdb_init_rootdse(struct ldb_module *module)
1967 {
1968         /* ignore errors on this - we expect it for non-sam databases */
1969         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1970
1971         /* there can be no module beyond the backend, just return */
1972         return LDB_SUCCESS;
1973 }
1974
1975
1976 static int generic_lock_read(struct ldb_module *module)
1977 {
1978         void *data = ldb_module_get_private(module);
1979         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1980         return ltdb->kv_ops->lock_read(module);
1981 }
1982
1983 static int generic_unlock_read(struct ldb_module *module)
1984 {
1985         void *data = ldb_module_get_private(module);
1986         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1987         return ltdb->kv_ops->unlock_read(module);
1988 }
1989
1990 static const struct ldb_module_ops ltdb_ops = {
1991         .name              = "tdb",
1992         .init_context      = ltdb_init_rootdse,
1993         .search            = ltdb_handle_request,
1994         .add               = ltdb_handle_request,
1995         .modify            = ltdb_handle_request,
1996         .del               = ltdb_handle_request,
1997         .rename            = ltdb_handle_request,
1998         .extended          = ltdb_handle_request,
1999         .start_transaction = ltdb_start_trans,
2000         .end_transaction   = ltdb_end_trans,
2001         .prepare_commit    = ltdb_prepare_commit,
2002         .del_transaction   = ltdb_del_trans,
2003         .read_lock         = generic_lock_read,
2004         .read_unlock       = generic_unlock_read,
2005 };
2006
2007 int init_store(struct ltdb_private *ltdb,
2008                       const char *name,
2009                       struct ldb_context *ldb,
2010                       const char *options[],
2011                       struct ldb_module **_module)
2012 {
2013         struct ldb_module *module;
2014
2015         if (getenv("LDB_WARN_UNINDEXED")) {
2016                 ltdb->warn_unindexed = true;
2017         }
2018
2019         if (getenv("LDB_WARN_REINDEX")) {
2020                 ltdb->warn_reindex = true;
2021         }
2022
2023         ltdb->sequence_number = 0;
2024
2025         module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
2026         if (!module) {
2027                 ldb_oom(ldb);
2028                 talloc_free(ltdb);
2029                 return LDB_ERR_OPERATIONS_ERROR;
2030         }
2031         ldb_module_set_private(module, ltdb);
2032         talloc_steal(module, ltdb);
2033
2034         if (ltdb_cache_load(module) != 0) {
2035                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2036                                        "records for backend '%s'", name);
2037                 talloc_free(module);
2038                 return LDB_ERR_OPERATIONS_ERROR;
2039         }
2040
2041         *_module = module;
2042         /*
2043          * Set the maximum key length
2044          */
2045         {
2046                 const char *len_str =
2047                         ldb_options_find(ldb, options,
2048                                          "max_key_len_for_self_test");
2049                 if (len_str != NULL) {
2050                         unsigned len = strtoul(len_str, NULL, 0);
2051                         ltdb->max_key_length = len;
2052                 }
2053         }
2054
2055         return LDB_SUCCESS;
2056 }
2057
2058 /*
2059   connect to the database
2060 */
2061 int ltdb_connect(struct ldb_context *ldb, const char *url,
2062                  unsigned int flags, const char *options[],
2063                  struct ldb_module **_module)
2064 {
2065         const char *path;
2066         int tdb_flags, open_flags;
2067         struct ltdb_private *ltdb;
2068
2069         /*
2070          * We hold locks, so we must use a private event context
2071          * on each returned handle
2072          */
2073         ldb_set_require_private_event_context(ldb);
2074
2075         /* parse the url */
2076         if (strchr(url, ':')) {
2077                 if (strncmp(url, "tdb://", 6) != 0) {
2078                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2079                                   "Invalid tdb URL '%s'", url);
2080                         return LDB_ERR_OPERATIONS_ERROR;
2081                 }
2082                 path = url+6;
2083         } else {
2084                 path = url;
2085         }
2086
2087         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
2088
2089         /* check for the 'nosync' option */
2090         if (flags & LDB_FLG_NOSYNC) {
2091                 tdb_flags |= TDB_NOSYNC;
2092         }
2093
2094         /* and nommap option */
2095         if (flags & LDB_FLG_NOMMAP) {
2096                 tdb_flags |= TDB_NOMMAP;
2097         }
2098
2099         ltdb = talloc_zero(ldb, struct ltdb_private);
2100         if (!ltdb) {
2101                 ldb_oom(ldb);
2102                 return LDB_ERR_OPERATIONS_ERROR;
2103         }
2104
2105         if (flags & LDB_FLG_RDONLY) {
2106                 /*
2107                  * This is weird, but because we can only have one tdb
2108                  * in this process, and the other one could be
2109                  * read-write, we can't use the tdb readonly.  Plus a
2110                  * read only tdb prohibits the all-record lock.
2111                  */
2112                 open_flags = O_RDWR;
2113
2114                 ltdb->read_only = true;
2115
2116         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
2117                 /*
2118                  * This is used by ldbsearch to prevent creation of the database
2119                  * if the name is wrong
2120                  */
2121                 open_flags = O_RDWR;
2122         } else {
2123                 /*
2124                  * This is the normal case
2125                  */
2126                 open_flags = O_CREAT | O_RDWR;
2127         }
2128
2129         ltdb->kv_ops = &key_value_ops;
2130
2131         /* note that we use quite a large default hash size */
2132         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
2133                                    tdb_flags, open_flags,
2134                                    ldb_get_create_perms(ldb), ldb);
2135         if (!ltdb->tdb) {
2136                 ldb_asprintf_errstring(ldb,
2137                                        "Unable to open tdb '%s': %s", path, strerror(errno));
2138                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2139                           "Unable to open tdb '%s': %s", path, strerror(errno));
2140                 talloc_free(ltdb);
2141                 if (errno == EACCES || errno == EPERM) {
2142                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
2143                 }
2144                 return LDB_ERR_OPERATIONS_ERROR;
2145         }
2146
2147         return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
2148 }