ldb_tdb: Re-add of both existing DN and GUID must gives LDB_ERR_ENTRY_ALREADY_EXISTS
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 TDB_DATA ltdb_guid_to_key(struct ldb_module *module,
225                           struct ltdb_private *ltdb,
226                           TALLOC_CTX *mem_ctx,
227                           const struct ldb_val *GUID_val)
228 {
229         TDB_DATA key;
230         const char *GUID_prefix = "GUID=";
231         const int GUID_prefix_len = strlen(GUID_prefix);
232
233         key.dptr = talloc_size(mem_ctx,
234                                GUID_val->length+GUID_prefix_len);
235
236         if (key.dptr == NULL) {
237                 errno = ENOMEM;
238                 key.dptr = NULL;
239                 key.dsize = 0;
240                 return key;
241         }
242         memcpy(key.dptr, "GUID=", GUID_prefix_len);
243         memcpy(&key.dptr[GUID_prefix_len],
244                GUID_val->data, GUID_val->length);
245
246         key.dsize = talloc_get_size(key.dptr);
247         return key;
248 }
249
250 TDB_DATA ltdb_idx_to_key(struct ldb_module *module,
251                          struct ltdb_private *ltdb,
252                          TALLOC_CTX *mem_ctx,
253                          const struct ldb_val *idx_val)
254 {
255         TDB_DATA key;
256         struct ldb_context *ldb = ldb_module_get_ctx(module);
257         struct ldb_dn *dn;
258
259         if (ltdb->cache->GUID_index_attribute != NULL) {
260                 return ltdb_guid_to_key(module, ltdb, mem_ctx, idx_val);
261         }
262
263         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
264         if (dn == NULL) {
265                 errno = EINVAL;
266                 key.dptr = NULL;
267                 key.dsize = 0;
268                 return key;
269         }
270         /* form the key */
271         key = ltdb_key_dn(module, mem_ctx, dn);
272         TALLOC_FREE(dn);
273         if (!key.dptr) {
274                 errno = ENOMEM;
275                 key.dptr = NULL;
276                 key.dsize = 0;
277                 return key;
278         }
279         return key;
280 }
281
282 /*
283   form a TDB_DATA for a record key
284   caller frees mem_ctx, which may or may not have the key
285   as a child.
286
287   note that the key for a record can depend on whether a
288   GUID index is in use, or the DN is used as the key
289 */
290 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
291                       const struct ldb_message *msg)
292 {
293         void *data = ldb_module_get_private(module);
294         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
295         TDB_DATA key;
296         const struct ldb_val *guid_val;
297
298         if (ltdb->cache->GUID_index_attribute == NULL) {
299                 return ltdb_key_dn(module, mem_ctx, msg->dn);
300         }
301
302         if (ldb_dn_is_special(msg->dn)) {
303                 return ltdb_key_dn(module, mem_ctx, msg->dn);
304         }
305
306         guid_val = ldb_msg_find_ldb_val(msg,
307                                        ltdb->cache->GUID_index_attribute);
308         if (guid_val == NULL) {
309                 errno = EINVAL;
310                 key.dptr = NULL;
311                 key.dsize = 0;
312                 return key;
313         }
314
315         return ltdb_guid_to_key(module, ltdb, mem_ctx, guid_val);
316
317 }
318
319 /*
320   check special dn's have valid attributes
321   currently only @ATTRIBUTES is checked
322 */
323 static int ltdb_check_special_dn(struct ldb_module *module,
324                                  const struct ldb_message *msg)
325 {
326         struct ldb_context *ldb = ldb_module_get_ctx(module);
327         unsigned int i, j;
328
329         if (! ldb_dn_is_special(msg->dn) ||
330             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
331                 return LDB_SUCCESS;
332         }
333
334         /* we have @ATTRIBUTES, let's check attributes are fine */
335         /* should we check that we deny multivalued attributes ? */
336         for (i = 0; i < msg->num_elements; i++) {
337                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
338
339                 for (j = 0; j < msg->elements[i].num_values; j++) {
340                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
341                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
342                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
343                         }
344                 }
345         }
346
347         return LDB_SUCCESS;
348 }
349
350
351 /*
352   we've made a modification to a dn - possibly reindex and
353   update sequence number
354 */
355 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
356 {
357         int ret = LDB_SUCCESS;
358         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
359
360         /* only allow modifies inside a transaction, otherwise the
361          * ldb is unsafe */
362         if (ltdb->in_transaction == 0) {
363                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
364                 return LDB_ERR_OPERATIONS_ERROR;
365         }
366
367         if (ldb_dn_is_special(dn) &&
368             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
369              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
370         {
371                 if (ltdb->warn_reindex) {
372                         ldb_debug(ldb_module_get_ctx(module),
373                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
374                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
375                 }
376                 ret = ltdb_reindex(module);
377         }
378
379         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
380         if (ret == LDB_SUCCESS &&
381             !(ldb_dn_is_special(dn) &&
382               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
383                 ret = ltdb_increase_sequence_number(module);
384         }
385
386         /* If the modify was to @OPTIONS, reload the cache */
387         if (ret == LDB_SUCCESS &&
388             ldb_dn_is_special(dn) &&
389             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
390                 ret = ltdb_cache_reload(module);
391         }
392
393         return ret;
394 }
395
396 /*
397   store a record into the db
398 */
399 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
400 {
401         void *data = ldb_module_get_private(module);
402         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
403         TDB_DATA tdb_key, tdb_data;
404         struct ldb_val ldb_data;
405         int ret = LDB_SUCCESS;
406         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
407
408         if (tdb_key_ctx == NULL) {
409                 return ldb_module_oom(module);
410         }
411
412         if (ltdb->read_only) {
413                 return LDB_ERR_UNWILLING_TO_PERFORM;
414         }
415
416         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
417         if (tdb_key.dptr == NULL) {
418                 TALLOC_FREE(tdb_key_ctx);
419                 return LDB_ERR_OTHER;
420         }
421
422         ret = ldb_pack_data(ldb_module_get_ctx(module),
423                             msg, &ldb_data);
424         if (ret == -1) {
425                 TALLOC_FREE(tdb_key_ctx);
426                 return LDB_ERR_OTHER;
427         }
428
429         tdb_data.dptr = ldb_data.data;
430         tdb_data.dsize = ldb_data.length;
431
432         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
433         if (ret != 0) {
434                 bool is_special = ldb_dn_is_special(msg->dn);
435                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
436
437                 /*
438                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
439                  * the GUID, so re-map
440                  */
441                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
442                     && !is_special
443                     && ltdb->cache->GUID_index_attribute != NULL) {
444                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
445                 }
446                 goto done;
447         }
448
449 done:
450         TALLOC_FREE(tdb_key_ctx);
451         talloc_free(ldb_data.data);
452
453         return ret;
454 }
455
456
457 /*
458   check if a attribute is a single valued, for a given element
459  */
460 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
461                                   struct ldb_message_element *el)
462 {
463         if (!a) return false;
464         if (el != NULL) {
465                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
466                         /* override from a ldb module, for example
467                            used for the description field, which is
468                            marked multi-valued in the schema but which
469                            should not actually accept multiple
470                            values */
471                         return true;
472                 }
473                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
474                         /* override from a ldb module, for example used for
475                            deleted linked attribute entries */
476                         return false;
477                 }
478         }
479         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
480                 return true;
481         }
482         return false;
483 }
484
485 static int ltdb_add_internal(struct ldb_module *module,
486                              struct ltdb_private *ltdb,
487                              const struct ldb_message *msg,
488                              bool check_single_value)
489 {
490         struct ldb_context *ldb = ldb_module_get_ctx(module);
491         int ret = LDB_SUCCESS;
492         unsigned int i;
493
494         for (i=0;i<msg->num_elements;i++) {
495                 struct ldb_message_element *el = &msg->elements[i];
496                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
497
498                 if (el->num_values == 0) {
499                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
500                                                el->name, ldb_dn_get_linearized(msg->dn));
501                         return LDB_ERR_CONSTRAINT_VIOLATION;
502                 }
503                 if (check_single_value &&
504                                 el->num_values > 1 &&
505                                 ldb_tdb_single_valued(a, el)) {
506                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
507                                                el->name, ldb_dn_get_linearized(msg->dn));
508                         return LDB_ERR_CONSTRAINT_VIOLATION;
509                 }
510
511                 /* Do not check "@ATTRIBUTES" for duplicated values */
512                 if (ldb_dn_is_special(msg->dn) &&
513                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
514                         continue;
515                 }
516
517                 if (check_single_value &&
518                     !(el->flags &
519                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
520                         struct ldb_val *duplicate = NULL;
521
522                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
523                                                          el, &duplicate, 0);
524                         if (ret != LDB_SUCCESS) {
525                                 return ret;
526                         }
527                         if (duplicate != NULL) {
528                                 ldb_asprintf_errstring(
529                                         ldb,
530                                         "attribute '%s': value '%.*s' on '%s' "
531                                         "provided more than once in ADD object",
532                                         el->name,
533                                         (int)duplicate->length,
534                                         duplicate->data,
535                                         ldb_dn_get_linearized(msg->dn));
536                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
537                         }
538                 }
539         }
540
541         ret = ltdb_store(module, msg, TDB_INSERT);
542         if (ret != LDB_SUCCESS) {
543                 /*
544                  * Try really hard to get the right error code for
545                  * a re-add situation, as this can matter!
546                  */
547                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
548                         int ret2;
549                         struct ldb_dn *dn2 = NULL;
550                         TALLOC_CTX *mem_ctx = talloc_new(module);
551                         if (mem_ctx == NULL) {
552                                 return ldb_module_operr(module);
553                         }
554                         ret2 = ltdb_search_base(module, module,
555                                                 msg->dn, &dn2);
556                         TALLOC_FREE(mem_ctx);
557                         if (ret2 == LDB_SUCCESS) {
558                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
559                         }
560                 }
561                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
562                         ldb_asprintf_errstring(ldb,
563                                                "Entry %s already exists",
564                                                ldb_dn_get_linearized(msg->dn));
565                 }
566                 return ret;
567         }
568
569         ret = ltdb_index_add_new(module, ltdb, msg);
570         if (ret != LDB_SUCCESS) {
571                 /*
572                  * If we failed to index, delete the message again.
573                  *
574                  * This is particularly important for the GUID index
575                  * case, which will only fail for a duplicate DN
576                  * in the index add.
577                  *
578                  * Note that the caller may not cancel the transation
579                  * and this means the above add might really show up!
580                  */
581                 ltdb_delete_noindex(module, msg);
582                 return ret;
583         }
584
585         ret = ltdb_modified(module, msg->dn);
586
587         return ret;
588 }
589
590 /*
591   add a record to the database
592 */
593 static int ltdb_add(struct ltdb_context *ctx)
594 {
595         struct ldb_module *module = ctx->module;
596         struct ldb_request *req = ctx->req;
597         void *data = ldb_module_get_private(module);
598         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
599         int ret = LDB_SUCCESS;
600
601         ret = ltdb_check_special_dn(module, req->op.add.message);
602         if (ret != LDB_SUCCESS) {
603                 return ret;
604         }
605
606         ldb_request_set_state(req, LDB_ASYNC_PENDING);
607
608         if (ltdb_cache_load(module) != 0) {
609                 return LDB_ERR_OPERATIONS_ERROR;
610         }
611
612         ret = ltdb_add_internal(module, ltdb,
613                                 req->op.add.message, true);
614
615         return ret;
616 }
617
618 /*
619   delete a record from the database, not updating indexes (used for deleting
620   index records)
621 */
622 int ltdb_delete_noindex(struct ldb_module *module,
623                         const struct ldb_message *msg)
624 {
625         void *data = ldb_module_get_private(module);
626         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
627         TDB_DATA tdb_key;
628         int ret;
629         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
630
631         if (tdb_key_ctx == NULL) {
632                 return ldb_module_oom(module);
633         }
634
635         if (ltdb->read_only) {
636                 return LDB_ERR_UNWILLING_TO_PERFORM;
637         }
638
639         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
640         if (!tdb_key.dptr) {
641                 TALLOC_FREE(tdb_key_ctx);
642                 return LDB_ERR_OTHER;
643         }
644
645         ret = tdb_delete(ltdb->tdb, tdb_key);
646         TALLOC_FREE(tdb_key_ctx);
647
648         if (ret != 0) {
649                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
650         }
651
652         return ret;
653 }
654
655 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
656 {
657         struct ldb_message *msg;
658         int ret = LDB_SUCCESS;
659
660         msg = ldb_msg_new(module);
661         if (msg == NULL) {
662                 return LDB_ERR_OPERATIONS_ERROR;
663         }
664
665         /* in case any attribute of the message was indexed, we need
666            to fetch the old record */
667         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
668         if (ret != LDB_SUCCESS) {
669                 /* not finding the old record is an error */
670                 goto done;
671         }
672
673         ret = ltdb_delete_noindex(module, msg);
674         if (ret != LDB_SUCCESS) {
675                 goto done;
676         }
677
678         /* remove any indexed attributes */
679         ret = ltdb_index_delete(module, msg);
680         if (ret != LDB_SUCCESS) {
681                 goto done;
682         }
683
684         ret = ltdb_modified(module, dn);
685         if (ret != LDB_SUCCESS) {
686                 goto done;
687         }
688
689 done:
690         talloc_free(msg);
691         return ret;
692 }
693
694 /*
695   delete a record from the database
696 */
697 static int ltdb_delete(struct ltdb_context *ctx)
698 {
699         struct ldb_module *module = ctx->module;
700         struct ldb_request *req = ctx->req;
701         int ret = LDB_SUCCESS;
702
703         ldb_request_set_state(req, LDB_ASYNC_PENDING);
704
705         if (ltdb_cache_load(module) != 0) {
706                 return LDB_ERR_OPERATIONS_ERROR;
707         }
708
709         ret = ltdb_delete_internal(module, req->op.del.dn);
710
711         return ret;
712 }
713
714 /*
715   find an element by attribute name. At the moment this does a linear search,
716   it should be re-coded to use a binary search once all places that modify
717   records guarantee sorted order
718
719   return the index of the first matching element if found, otherwise -1
720 */
721 static int find_element(const struct ldb_message *msg, const char *name)
722 {
723         unsigned int i;
724         for (i=0;i<msg->num_elements;i++) {
725                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
726                         return i;
727                 }
728         }
729         return -1;
730 }
731
732
733 /*
734   add an element to an existing record. Assumes a elements array that we
735   can call re-alloc on, and assumed that we can re-use the data pointers from
736   the passed in additional values. Use with care!
737
738   returns 0 on success, -1 on failure (and sets errno)
739 */
740 static int ltdb_msg_add_element(struct ldb_message *msg,
741                                 struct ldb_message_element *el)
742 {
743         struct ldb_message_element *e2;
744         unsigned int i;
745
746         if (el->num_values == 0) {
747                 /* nothing to do here - we don't add empty elements */
748                 return 0;
749         }
750
751         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
752                               msg->num_elements+1);
753         if (!e2) {
754                 errno = ENOMEM;
755                 return -1;
756         }
757
758         msg->elements = e2;
759
760         e2 = &msg->elements[msg->num_elements];
761
762         e2->name = el->name;
763         e2->flags = el->flags;
764         e2->values = talloc_array(msg->elements,
765                                   struct ldb_val, el->num_values);
766         if (!e2->values) {
767                 errno = ENOMEM;
768                 return -1;
769         }
770         for (i=0;i<el->num_values;i++) {
771                 e2->values[i] = el->values[i];
772         }
773         e2->num_values = el->num_values;
774
775         ++msg->num_elements;
776
777         return 0;
778 }
779
780 /*
781   delete all elements having a specified attribute name
782 */
783 static int msg_delete_attribute(struct ldb_module *module,
784                                 struct ltdb_private *ltdb,
785                                 struct ldb_message *msg, const char *name)
786 {
787         unsigned int i;
788         int ret;
789         struct ldb_message_element *el;
790         bool is_special = ldb_dn_is_special(msg->dn);
791
792         if (!is_special
793             && ltdb->cache->GUID_index_attribute != NULL
794             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
795                 struct ldb_context *ldb = ldb_module_get_ctx(module);
796                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
797                                        "attribute %s (used as DB index)",
798                                        ltdb->cache->GUID_index_attribute);
799                 return LDB_ERR_CONSTRAINT_VIOLATION;
800         }
801
802         el = ldb_msg_find_element(msg, name);
803         if (el == NULL) {
804                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
805         }
806         i = el - msg->elements;
807
808         ret = ltdb_index_del_element(module, ltdb, msg, el);
809         if (ret != LDB_SUCCESS) {
810                 return ret;
811         }
812
813         talloc_free(el->values);
814         if (msg->num_elements > (i+1)) {
815                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
816         }
817         msg->num_elements--;
818         msg->elements = talloc_realloc(msg, msg->elements,
819                                        struct ldb_message_element,
820                                        msg->num_elements);
821         return LDB_SUCCESS;
822 }
823
824 /*
825   delete all elements matching an attribute name/value
826
827   return LDB Error on failure
828 */
829 static int msg_delete_element(struct ldb_module *module,
830                               struct ltdb_private *ltdb,
831                               struct ldb_message *msg,
832                               const char *name,
833                               const struct ldb_val *val)
834 {
835         struct ldb_context *ldb = ldb_module_get_ctx(module);
836         unsigned int i;
837         int found, ret;
838         struct ldb_message_element *el;
839         const struct ldb_schema_attribute *a;
840
841         found = find_element(msg, name);
842         if (found == -1) {
843                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
844         }
845
846         i = (unsigned int) found;
847         el = &(msg->elements[i]);
848
849         a = ldb_schema_attribute_by_name(ldb, el->name);
850
851         for (i=0;i<el->num_values;i++) {
852                 bool matched;
853                 if (a->syntax->operator_fn) {
854                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
855                                                      &el->values[i], val, &matched);
856                         if (ret != LDB_SUCCESS) return ret;
857                 } else {
858                         matched = (a->syntax->comparison_fn(ldb, ldb,
859                                                             &el->values[i], val) == 0);
860                 }
861                 if (matched) {
862                         if (el->num_values == 1) {
863                                 return msg_delete_attribute(module,
864                                                             ltdb, msg, name);
865                         }
866
867                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
868                         if (ret != LDB_SUCCESS) {
869                                 return ret;
870                         }
871
872                         if (i<el->num_values-1) {
873                                 memmove(&el->values[i], &el->values[i+1],
874                                         sizeof(el->values[i])*
875                                                 (el->num_values-(i+1)));
876                         }
877                         el->num_values--;
878
879                         /* per definition we find in a canonicalised message an
880                            attribute value only once. So we are finished here */
881                         return LDB_SUCCESS;
882                 }
883         }
884
885         /* Not found */
886         return LDB_ERR_NO_SUCH_ATTRIBUTE;
887 }
888
889
890 /*
891   modify a record - internal interface
892
893   yuck - this is O(n^2). Luckily n is usually small so we probably
894   get away with it, but if we ever have really large attribute lists
895   then we'll need to look at this again
896
897   'req' is optional, and is used to specify controls if supplied
898 */
899 int ltdb_modify_internal(struct ldb_module *module,
900                          const struct ldb_message *msg,
901                          struct ldb_request *req)
902 {
903         struct ldb_context *ldb = ldb_module_get_ctx(module);
904         void *data = ldb_module_get_private(module);
905         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
906         struct ldb_message *msg2;
907         unsigned int i, j;
908         int ret = LDB_SUCCESS, idx;
909         struct ldb_control *control_permissive = NULL;
910         TALLOC_CTX *mem_ctx = talloc_new(req);
911
912         if (mem_ctx == NULL) {
913                 return ldb_module_oom(module);
914         }
915         
916         if (req) {
917                 control_permissive = ldb_request_get_control(req,
918                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
919         }
920
921         msg2 = ldb_msg_new(mem_ctx);
922         if (msg2 == NULL) {
923                 ret = LDB_ERR_OTHER;
924                 goto done;
925         }
926
927         ret = ltdb_search_dn1(module, msg->dn,
928                               msg2,
929                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
930         if (ret != LDB_SUCCESS) {
931                 goto done;
932         }
933
934         for (i=0; i<msg->num_elements; i++) {
935                 struct ldb_message_element *el = &msg->elements[i], *el2;
936                 struct ldb_val *vals;
937                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
938                 const char *dn;
939                 uint32_t options = 0;
940                 if (control_permissive != NULL) {
941                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
942                 }
943
944                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
945                 case LDB_FLAG_MOD_ADD:
946
947                         if (el->num_values == 0) {
948                                 ldb_asprintf_errstring(ldb,
949                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
950                                                        el->name, ldb_dn_get_linearized(msg2->dn));
951                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
952                                 goto done;
953                         }
954
955                         /* make a copy of the array so that a permissive
956                          * control can remove duplicates without changing the
957                          * original values, but do not copy data as we do not
958                          * need to keep it around once the operation is
959                          * finished */
960                         if (control_permissive) {
961                                 el = talloc(msg2, struct ldb_message_element);
962                                 if (!el) {
963                                         ret = LDB_ERR_OTHER;
964                                         goto done;
965                                 }
966                                 *el = msg->elements[i];
967                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
968                                 if (el->values == NULL) {
969                                         ret = LDB_ERR_OTHER;
970                                         goto done;
971                                 }
972                                 for (j = 0; j < el->num_values; j++) {
973                                         el->values[j] = msg->elements[i].values[j];
974                                 }
975                         }
976
977                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
978                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
979                                                        el->name, ldb_dn_get_linearized(msg2->dn));
980                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
981                                 goto done;
982                         }
983
984                         /* Checks if element already exists */
985                         idx = find_element(msg2, el->name);
986                         if (idx == -1) {
987                                 if (ltdb_msg_add_element(msg2, el) != 0) {
988                                         ret = LDB_ERR_OTHER;
989                                         goto done;
990                                 }
991                                 ret = ltdb_index_add_element(module, ltdb,
992                                                              msg2,
993                                                              el);
994                                 if (ret != LDB_SUCCESS) {
995                                         goto done;
996                                 }
997                         } else {
998                                 j = (unsigned int) idx;
999                                 el2 = &(msg2->elements[j]);
1000
1001                                 /* We cannot add another value on a existing one
1002                                    if the attribute is single-valued */
1003                                 if (ldb_tdb_single_valued(a, el)) {
1004                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1005                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1006                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1007                                         goto done;
1008                                 }
1009
1010                                 /* Check that values don't exist yet on multi-
1011                                    valued attributes or aren't provided twice */
1012                                 if (!(el->flags &
1013                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1014                                         struct ldb_val *duplicate = NULL;
1015                                         ret = ldb_msg_find_common_values(ldb,
1016                                                                          msg2,
1017                                                                          el,
1018                                                                          el2,
1019                                                                          options);
1020
1021                                         if (ret ==
1022                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1023                                                 ldb_asprintf_errstring(ldb,
1024                                                         "attribute '%s': value "
1025                                                         "#%u on '%s' already "
1026                                                         "exists", el->name, j,
1027                                                         ldb_dn_get_linearized(msg2->dn));
1028                                                 goto done;
1029                                         } else if (ret != LDB_SUCCESS) {
1030                                                 goto done;
1031                                         }
1032
1033                                         ret = ldb_msg_find_duplicate_val(
1034                                                 ldb, msg2, el, &duplicate, 0);
1035                                         if (ret != LDB_SUCCESS) {
1036                                                 goto done;
1037                                         }
1038                                         if (duplicate != NULL) {
1039                                                 ldb_asprintf_errstring(
1040                                                         ldb,
1041                                                         "attribute '%s': value "
1042                                                         "'%.*s' on '%s' "
1043                                                         "provided more than "
1044                                                         "once in ADD",
1045                                                         el->name,
1046                                                         (int)duplicate->length,
1047                                                         duplicate->data,
1048                                                         ldb_dn_get_linearized(msg->dn));
1049                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1050                                                 goto done;
1051                                         }
1052                                 }
1053
1054                                 /* Now combine existing and new values to a new
1055                                    attribute record */
1056                                 vals = talloc_realloc(msg2->elements,
1057                                                       el2->values, struct ldb_val,
1058                                                       el2->num_values + el->num_values);
1059                                 if (vals == NULL) {
1060                                         ldb_oom(ldb);
1061                                         ret = LDB_ERR_OTHER;
1062                                         goto done;
1063                                 }
1064
1065                                 for (j=0; j<el->num_values; j++) {
1066                                         vals[el2->num_values + j] =
1067                                                 ldb_val_dup(vals, &el->values[j]);
1068                                 }
1069
1070                                 el2->values = vals;
1071                                 el2->num_values += el->num_values;
1072
1073                                 ret = ltdb_index_add_element(module, ltdb,
1074                                                              msg2, el);
1075                                 if (ret != LDB_SUCCESS) {
1076                                         goto done;
1077                                 }
1078                         }
1079
1080                         break;
1081
1082                 case LDB_FLAG_MOD_REPLACE:
1083
1084                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1085                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1086                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1087                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1088                                 goto done;
1089                         }
1090
1091                         /*
1092                          * We don't need to check this if we have been
1093                          * pre-screened by the repl_meta_data module
1094                          * in Samba, or someone else who can claim to
1095                          * know what they are doing. 
1096                          */
1097                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1098                                 struct ldb_val *duplicate = NULL;
1099
1100                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1101                                                                  &duplicate, 0);
1102                                 if (ret != LDB_SUCCESS) {
1103                                         goto done;
1104                                 }
1105                                 if (duplicate != NULL) {
1106                                         ldb_asprintf_errstring(
1107                                                 ldb,
1108                                                 "attribute '%s': value '%.*s' "
1109                                                 "on '%s' provided more than "
1110                                                 "once in REPLACE",
1111                                                 el->name,
1112                                                 (int)duplicate->length,
1113                                                 duplicate->data,
1114                                                 ldb_dn_get_linearized(msg2->dn));
1115                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1116                                         goto done;
1117                                 }
1118                         }
1119
1120                         /* Checks if element already exists */
1121                         idx = find_element(msg2, el->name);
1122                         if (idx != -1) {
1123                                 j = (unsigned int) idx;
1124                                 el2 = &(msg2->elements[j]);
1125
1126                                 /* we consider two elements to be
1127                                  * equal only if the order
1128                                  * matches. This allows dbcheck to
1129                                  * fix the ordering on attributes
1130                                  * where order matters, such as
1131                                  * objectClass
1132                                  */
1133                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1134                                         continue;
1135                                 }
1136
1137                                 /* Delete the attribute if it exists in the DB */
1138                                 if (msg_delete_attribute(module, ltdb,
1139                                                          msg2,
1140                                                          el->name) != 0) {
1141                                         ret = LDB_ERR_OTHER;
1142                                         goto done;
1143                                 }
1144                         }
1145
1146                         /* Recreate it with the new values */
1147                         if (ltdb_msg_add_element(msg2, el) != 0) {
1148                                 ret = LDB_ERR_OTHER;
1149                                 goto done;
1150                         }
1151
1152                         ret = ltdb_index_add_element(module, ltdb,
1153                                                      msg2, el);
1154                         if (ret != LDB_SUCCESS) {
1155                                 goto done;
1156                         }
1157
1158                         break;
1159
1160                 case LDB_FLAG_MOD_DELETE:
1161                         dn = ldb_dn_get_linearized(msg2->dn);
1162                         if (dn == NULL) {
1163                                 ret = LDB_ERR_OTHER;
1164                                 goto done;
1165                         }
1166
1167                         if (msg->elements[i].num_values == 0) {
1168                                 /* Delete the whole attribute */
1169                                 ret = msg_delete_attribute(module,
1170                                                            ltdb,
1171                                                            msg2,
1172                                                            msg->elements[i].name);
1173                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1174                                     control_permissive) {
1175                                         ret = LDB_SUCCESS;
1176                                 } else {
1177                                         ldb_asprintf_errstring(ldb,
1178                                                                "attribute '%s': no such attribute for delete on '%s'",
1179                                                                msg->elements[i].name, dn);
1180                                 }
1181                                 if (ret != LDB_SUCCESS) {
1182                                         goto done;
1183                                 }
1184                         } else {
1185                                 /* Delete specified values from an attribute */
1186                                 for (j=0; j < msg->elements[i].num_values; j++) {
1187                                         ret = msg_delete_element(module,
1188                                                                  ltdb,
1189                                                                  msg2,
1190                                                                  msg->elements[i].name,
1191                                                                  &msg->elements[i].values[j]);
1192                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1193                                             control_permissive) {
1194                                                 ret = LDB_SUCCESS;
1195                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1196                                                 ldb_asprintf_errstring(ldb,
1197                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1198                                                                        msg->elements[i].name, dn);
1199                                         }
1200                                         if (ret != LDB_SUCCESS) {
1201                                                 goto done;
1202                                         }
1203                                 }
1204                         }
1205                         break;
1206                 default:
1207                         ldb_asprintf_errstring(ldb,
1208                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1209                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1210                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1211                         ret = LDB_ERR_PROTOCOL_ERROR;
1212                         goto done;
1213                 }
1214         }
1215
1216         ret = ltdb_store(module, msg2, TDB_MODIFY);
1217         if (ret != LDB_SUCCESS) {
1218                 goto done;
1219         }
1220
1221         ret = ltdb_modified(module, msg2->dn);
1222         if (ret != LDB_SUCCESS) {
1223                 goto done;
1224         }
1225
1226 done:
1227         TALLOC_FREE(mem_ctx);
1228         return ret;
1229 }
1230
1231 /*
1232   modify a record
1233 */
1234 static int ltdb_modify(struct ltdb_context *ctx)
1235 {
1236         struct ldb_module *module = ctx->module;
1237         struct ldb_request *req = ctx->req;
1238         int ret = LDB_SUCCESS;
1239
1240         ret = ltdb_check_special_dn(module, req->op.mod.message);
1241         if (ret != LDB_SUCCESS) {
1242                 return ret;
1243         }
1244
1245         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1246
1247         if (ltdb_cache_load(module) != 0) {
1248                 return LDB_ERR_OPERATIONS_ERROR;
1249         }
1250
1251         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1252
1253         return ret;
1254 }
1255
1256 /*
1257   rename a record
1258 */
1259 static int ltdb_rename(struct ltdb_context *ctx)
1260 {
1261         struct ldb_module *module = ctx->module;
1262         void *data = ldb_module_get_private(module);
1263         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1264         struct ldb_request *req = ctx->req;
1265         struct ldb_message *msg;
1266         int ret = LDB_SUCCESS;
1267         TDB_DATA tdb_key, tdb_key_old;
1268
1269         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1270
1271         if (ltdb_cache_load(ctx->module) != 0) {
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274
1275         msg = ldb_msg_new(ctx);
1276         if (msg == NULL) {
1277                 return LDB_ERR_OPERATIONS_ERROR;
1278         }
1279
1280         /* we need to fetch the old record to re-add under the new name */
1281         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1282                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1283         if (ret != LDB_SUCCESS) {
1284                 /* not finding the old record is an error */
1285                 return ret;
1286         }
1287
1288         /* We need to, before changing the DB, check if the new DN
1289          * exists, so we can return this error to the caller with an
1290          * unmodified DB
1291          *
1292          * Even in GUID index mode we use ltdb_key_dn() as we are
1293          * trying to figure out if this is just a case rename
1294          */
1295         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1296         if (!tdb_key.dptr) {
1297                 talloc_free(msg);
1298                 return LDB_ERR_OPERATIONS_ERROR;
1299         }
1300
1301         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1302         if (!tdb_key_old.dptr) {
1303                 talloc_free(msg);
1304                 talloc_free(tdb_key.dptr);
1305                 return LDB_ERR_OPERATIONS_ERROR;
1306         }
1307
1308         /*
1309          * Only declare a conflict if the new DN already exists,
1310          * and it isn't a case change on the old DN
1311          */
1312         if (tdb_key_old.dsize != tdb_key.dsize
1313             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1314                 ret = ltdb_search_base(module,
1315                                        req->op.rename.newdn);
1316                 if (ret == LDB_SUCCESS) {
1317                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1318                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1319                         ret = LDB_SUCCESS;
1320                 }
1321         }
1322
1323         /* finding the new record already in the DB is an error */
1324
1325         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1326                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1327                                        "Entry %s already exists",
1328                                        ldb_dn_get_linearized(req->op.rename.newdn));
1329         }
1330         if (ret != LDB_SUCCESS) {
1331                 talloc_free(tdb_key_old.dptr);
1332                 talloc_free(tdb_key.dptr);
1333                 talloc_free(msg);
1334                 return ret;
1335         }
1336
1337         talloc_free(tdb_key_old.dptr);
1338         talloc_free(tdb_key.dptr);
1339
1340         /* Always delete first then add, to avoid conflicts with
1341          * unique indexes. We rely on the transaction to make this
1342          * atomic
1343          */
1344         ret = ltdb_delete_internal(module, msg->dn);
1345         if (ret != LDB_SUCCESS) {
1346                 talloc_free(msg);
1347                 return ret;
1348         }
1349
1350         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1351         if (msg->dn == NULL) {
1352                 talloc_free(msg);
1353                 return LDB_ERR_OPERATIONS_ERROR;
1354         }
1355
1356         /* We don't check single value as we can have more than 1 with
1357          * deleted attributes. We could go through all elements but that's
1358          * maybe not the most efficient way
1359          */
1360         ret = ltdb_add_internal(module, ltdb, msg, false);
1361
1362         talloc_free(msg);
1363
1364         return ret;
1365 }
1366
1367 static int ltdb_start_trans(struct ldb_module *module)
1368 {
1369         void *data = ldb_module_get_private(module);
1370         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1371
1372         /* Do not take out the transaction lock on a read-only DB */
1373         if (ltdb->read_only) {
1374                 return LDB_ERR_UNWILLING_TO_PERFORM;
1375         }
1376
1377         if (tdb_transaction_start(ltdb->tdb) != 0) {
1378                 return ltdb_err_map(tdb_error(ltdb->tdb));
1379         }
1380
1381         ltdb->in_transaction++;
1382
1383         ltdb_index_transaction_start(module);
1384
1385         return LDB_SUCCESS;
1386 }
1387
1388 static int ltdb_prepare_commit(struct ldb_module *module)
1389 {
1390         int ret;
1391         void *data = ldb_module_get_private(module);
1392         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1393
1394         if (ltdb->in_transaction != 1) {
1395                 return LDB_SUCCESS;
1396         }
1397
1398         ret = ltdb_index_transaction_commit(module);
1399         if (ret != LDB_SUCCESS) {
1400                 tdb_transaction_cancel(ltdb->tdb);
1401                 ltdb->in_transaction--;
1402                 return ret;
1403         }
1404
1405         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1406                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1407                 ltdb->in_transaction--;
1408                 ldb_debug_set(ldb_module_get_ctx(module),
1409                               LDB_DEBUG_FATAL,
1410                               "Failure during "
1411                               "tdb_transaction_prepare_commit(): %s -> %s",
1412                               tdb_errorstr(ltdb->tdb),
1413                               ldb_strerror(ret));
1414                 return ret;
1415         }
1416
1417         ltdb->prepared_commit = true;
1418
1419         return LDB_SUCCESS;
1420 }
1421
1422 static int ltdb_end_trans(struct ldb_module *module)
1423 {
1424         int ret;
1425         void *data = ldb_module_get_private(module);
1426         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1427
1428         if (!ltdb->prepared_commit) {
1429                 ret = ltdb_prepare_commit(module);
1430                 if (ret != LDB_SUCCESS) {
1431                         return ret;
1432                 }
1433         }
1434
1435         ltdb->in_transaction--;
1436         ltdb->prepared_commit = false;
1437
1438         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1439                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1440                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1441                                        "Failure during tdb_transaction_commit(): %s -> %s",
1442                                        tdb_errorstr(ltdb->tdb),
1443                                        ldb_strerror(ret));
1444                 return ret;
1445         }
1446
1447         return LDB_SUCCESS;
1448 }
1449
1450 static int ltdb_del_trans(struct ldb_module *module)
1451 {
1452         void *data = ldb_module_get_private(module);
1453         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1454
1455         ltdb->in_transaction--;
1456
1457         if (ltdb_index_transaction_cancel(module) != 0) {
1458                 tdb_transaction_cancel(ltdb->tdb);
1459                 return ltdb_err_map(tdb_error(ltdb->tdb));
1460         }
1461
1462         tdb_transaction_cancel(ltdb->tdb);
1463         return LDB_SUCCESS;
1464 }
1465
1466 /*
1467   return sequenceNumber from @BASEINFO
1468 */
1469 static int ltdb_sequence_number(struct ltdb_context *ctx,
1470                                 struct ldb_extended **ext)
1471 {
1472         struct ldb_context *ldb;
1473         struct ldb_module *module = ctx->module;
1474         struct ldb_request *req = ctx->req;
1475         TALLOC_CTX *tmp_ctx = NULL;
1476         struct ldb_seqnum_request *seq;
1477         struct ldb_seqnum_result *res;
1478         struct ldb_message *msg = NULL;
1479         struct ldb_dn *dn;
1480         const char *date;
1481         int ret = LDB_SUCCESS;
1482
1483         ldb = ldb_module_get_ctx(module);
1484
1485         seq = talloc_get_type(req->op.extended.data,
1486                                 struct ldb_seqnum_request);
1487         if (seq == NULL) {
1488                 return LDB_ERR_OPERATIONS_ERROR;
1489         }
1490
1491         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1492
1493         if (ltdb_lock_read(module) != 0) {
1494                 return LDB_ERR_OPERATIONS_ERROR;
1495         }
1496
1497         res = talloc_zero(req, struct ldb_seqnum_result);
1498         if (res == NULL) {
1499                 ret = LDB_ERR_OPERATIONS_ERROR;
1500                 goto done;
1501         }
1502
1503         tmp_ctx = talloc_new(req);
1504         if (tmp_ctx == NULL) {
1505                 ret = LDB_ERR_OPERATIONS_ERROR;
1506                 goto done;
1507         }
1508
1509         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1510         if (dn == NULL) {
1511                 ret = LDB_ERR_OPERATIONS_ERROR;
1512                 goto done;
1513         }
1514
1515         msg = ldb_msg_new(tmp_ctx);
1516         if (msg == NULL) {
1517                 ret = LDB_ERR_OPERATIONS_ERROR;
1518                 goto done;
1519         }
1520
1521         ret = ltdb_search_dn1(module, dn, msg, 0);
1522         if (ret != LDB_SUCCESS) {
1523                 goto done;
1524         }
1525
1526         switch (seq->type) {
1527         case LDB_SEQ_HIGHEST_SEQ:
1528                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1529                 break;
1530         case LDB_SEQ_NEXT:
1531                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1532                 res->seq_num++;
1533                 break;
1534         case LDB_SEQ_HIGHEST_TIMESTAMP:
1535                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1536                 if (date) {
1537                         res->seq_num = ldb_string_to_time(date);
1538                 } else {
1539                         res->seq_num = 0;
1540                         /* zero is as good as anything when we don't know */
1541                 }
1542                 break;
1543         }
1544
1545         *ext = talloc_zero(req, struct ldb_extended);
1546         if (*ext == NULL) {
1547                 ret = LDB_ERR_OPERATIONS_ERROR;
1548                 goto done;
1549         }
1550         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1551         (*ext)->data = talloc_steal(*ext, res);
1552
1553 done:
1554         talloc_free(tmp_ctx);
1555         ltdb_unlock_read(module);
1556         return ret;
1557 }
1558
1559 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1560 {
1561         struct ldb_context *ldb;
1562         struct ldb_request *req;
1563         struct ldb_reply *ares;
1564
1565         ldb = ldb_module_get_ctx(ctx->module);
1566         req = ctx->req;
1567
1568         /* if we already returned an error just return */
1569         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1570                 return;
1571         }
1572
1573         ares = talloc_zero(req, struct ldb_reply);
1574         if (!ares) {
1575                 ldb_oom(ldb);
1576                 req->callback(req, NULL);
1577                 return;
1578         }
1579         ares->type = LDB_REPLY_DONE;
1580         ares->error = error;
1581
1582         req->callback(req, ares);
1583 }
1584
1585 static void ltdb_timeout(struct tevent_context *ev,
1586                           struct tevent_timer *te,
1587                           struct timeval t,
1588                           void *private_data)
1589 {
1590         struct ltdb_context *ctx;
1591         ctx = talloc_get_type(private_data, struct ltdb_context);
1592
1593         if (!ctx->request_terminated) {
1594                 /* request is done now */
1595                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1596         }
1597
1598         if (ctx->spy) {
1599                 /* neutralize the spy */
1600                 ctx->spy->ctx = NULL;
1601                 ctx->spy = NULL;
1602         }
1603         talloc_free(ctx);
1604 }
1605
1606 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1607                                         struct ldb_extended *ext,
1608                                         int error)
1609 {
1610         struct ldb_context *ldb;
1611         struct ldb_request *req;
1612         struct ldb_reply *ares;
1613
1614         ldb = ldb_module_get_ctx(ctx->module);
1615         req = ctx->req;
1616
1617         /* if we already returned an error just return */
1618         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1619                 return;
1620         }
1621
1622         ares = talloc_zero(req, struct ldb_reply);
1623         if (!ares) {
1624                 ldb_oom(ldb);
1625                 req->callback(req, NULL);
1626                 return;
1627         }
1628         ares->type = LDB_REPLY_DONE;
1629         ares->response = ext;
1630         ares->error = error;
1631
1632         req->callback(req, ares);
1633 }
1634
1635 static void ltdb_handle_extended(struct ltdb_context *ctx)
1636 {
1637         struct ldb_extended *ext = NULL;
1638         int ret;
1639
1640         if (strcmp(ctx->req->op.extended.oid,
1641                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1642                 /* get sequence number */
1643                 ret = ltdb_sequence_number(ctx, &ext);
1644         } else {
1645                 /* not recognized */
1646                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1647         }
1648
1649         ltdb_request_extended_done(ctx, ext, ret);
1650 }
1651
1652 static void ltdb_callback(struct tevent_context *ev,
1653                           struct tevent_timer *te,
1654                           struct timeval t,
1655                           void *private_data)
1656 {
1657         struct ltdb_context *ctx;
1658         int ret;
1659
1660         ctx = talloc_get_type(private_data, struct ltdb_context);
1661
1662         if (ctx->request_terminated) {
1663                 goto done;
1664         }
1665
1666         switch (ctx->req->operation) {
1667         case LDB_SEARCH:
1668                 ret = ltdb_search(ctx);
1669                 break;
1670         case LDB_ADD:
1671                 ret = ltdb_add(ctx);
1672                 break;
1673         case LDB_MODIFY:
1674                 ret = ltdb_modify(ctx);
1675                 break;
1676         case LDB_DELETE:
1677                 ret = ltdb_delete(ctx);
1678                 break;
1679         case LDB_RENAME:
1680                 ret = ltdb_rename(ctx);
1681                 break;
1682         case LDB_EXTENDED:
1683                 ltdb_handle_extended(ctx);
1684                 goto done;
1685         default:
1686                 /* no other op supported */
1687                 ret = LDB_ERR_PROTOCOL_ERROR;
1688         }
1689
1690         if (!ctx->request_terminated) {
1691                 /* request is done now */
1692                 ltdb_request_done(ctx, ret);
1693         }
1694
1695 done:
1696         if (ctx->spy) {
1697                 /* neutralize the spy */
1698                 ctx->spy->ctx = NULL;
1699                 ctx->spy = NULL;
1700         }
1701         talloc_free(ctx);
1702 }
1703
1704 static int ltdb_request_destructor(void *ptr)
1705 {
1706         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1707
1708         if (spy->ctx != NULL) {
1709                 spy->ctx->spy = NULL;
1710                 spy->ctx->request_terminated = true;
1711                 spy->ctx = NULL;
1712         }
1713
1714         return 0;
1715 }
1716
1717 static int ltdb_handle_request(struct ldb_module *module,
1718                                struct ldb_request *req)
1719 {
1720         struct ldb_control *control_permissive;
1721         struct ldb_context *ldb;
1722         struct tevent_context *ev;
1723         struct ltdb_context *ac;
1724         struct tevent_timer *te;
1725         struct timeval tv;
1726         unsigned int i;
1727
1728         ldb = ldb_module_get_ctx(module);
1729
1730         control_permissive = ldb_request_get_control(req,
1731                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1732
1733         for (i = 0; req->controls && req->controls[i]; i++) {
1734                 if (req->controls[i]->critical &&
1735                     req->controls[i] != control_permissive) {
1736                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1737                                                req->controls[i]->oid);
1738                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1739                 }
1740         }
1741
1742         if (req->starttime == 0 || req->timeout == 0) {
1743                 ldb_set_errstring(ldb, "Invalid timeout settings");
1744                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1745         }
1746
1747         ev = ldb_handle_get_event_context(req->handle);
1748
1749         ac = talloc_zero(ldb, struct ltdb_context);
1750         if (ac == NULL) {
1751                 ldb_oom(ldb);
1752                 return LDB_ERR_OPERATIONS_ERROR;
1753         }
1754
1755         ac->module = module;
1756         ac->req = req;
1757
1758         tv.tv_sec = 0;
1759         tv.tv_usec = 0;
1760         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1761         if (NULL == te) {
1762                 talloc_free(ac);
1763                 return LDB_ERR_OPERATIONS_ERROR;
1764         }
1765
1766         if (req->timeout > 0) {
1767                 tv.tv_sec = req->starttime + req->timeout;
1768                 tv.tv_usec = 0;
1769                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1770                                                      ltdb_timeout, ac);
1771                 if (NULL == ac->timeout_event) {
1772                         talloc_free(ac);
1773                         return LDB_ERR_OPERATIONS_ERROR;
1774                 }
1775         }
1776
1777         /* set a spy so that we do not try to use the request context
1778          * if it is freed before ltdb_callback fires */
1779         ac->spy = talloc(req, struct ltdb_req_spy);
1780         if (NULL == ac->spy) {
1781                 talloc_free(ac);
1782                 return LDB_ERR_OPERATIONS_ERROR;
1783         }
1784         ac->spy->ctx = ac;
1785
1786         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1787
1788         return LDB_SUCCESS;
1789 }
1790
1791 static int ltdb_init_rootdse(struct ldb_module *module)
1792 {
1793         /* ignore errors on this - we expect it for non-sam databases */
1794         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1795
1796         /* there can be no module beyond the backend, just return */
1797         return LDB_SUCCESS;
1798 }
1799
1800 static const struct ldb_module_ops ltdb_ops = {
1801         .name              = "tdb",
1802         .init_context      = ltdb_init_rootdse,
1803         .search            = ltdb_handle_request,
1804         .add               = ltdb_handle_request,
1805         .modify            = ltdb_handle_request,
1806         .del               = ltdb_handle_request,
1807         .rename            = ltdb_handle_request,
1808         .extended          = ltdb_handle_request,
1809         .start_transaction = ltdb_start_trans,
1810         .end_transaction   = ltdb_end_trans,
1811         .prepare_commit    = ltdb_prepare_commit,
1812         .del_transaction   = ltdb_del_trans,
1813         .read_lock         = ltdb_lock_read,
1814         .read_unlock       = ltdb_unlock_read,
1815 };
1816
1817 /*
1818   connect to the database
1819 */
1820 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1821                         unsigned int flags, const char *options[],
1822                         struct ldb_module **_module)
1823 {
1824         struct ldb_module *module;
1825         const char *path;
1826         int tdb_flags, open_flags;
1827         struct ltdb_private *ltdb;
1828
1829         /*
1830          * We hold locks, so we must use a private event context
1831          * on each returned handle
1832          */
1833
1834         ldb_set_require_private_event_context(ldb);
1835
1836         /* parse the url */
1837         if (strchr(url, ':')) {
1838                 if (strncmp(url, "tdb://", 6) != 0) {
1839                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1840                                   "Invalid tdb URL '%s'", url);
1841                         return LDB_ERR_OPERATIONS_ERROR;
1842                 }
1843                 path = url+6;
1844         } else {
1845                 path = url;
1846         }
1847
1848         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1849
1850         /* check for the 'nosync' option */
1851         if (flags & LDB_FLG_NOSYNC) {
1852                 tdb_flags |= TDB_NOSYNC;
1853         }
1854
1855         /* and nommap option */
1856         if (flags & LDB_FLG_NOMMAP) {
1857                 tdb_flags |= TDB_NOMMAP;
1858         }
1859
1860         ltdb = talloc_zero(ldb, struct ltdb_private);
1861         if (!ltdb) {
1862                 ldb_oom(ldb);
1863                 return LDB_ERR_OPERATIONS_ERROR;
1864         }
1865
1866         if (flags & LDB_FLG_RDONLY) {
1867                 /*
1868                  * This is weird, but because we can only have one tdb
1869                  * in this process, and the other one could be
1870                  * read-write, we can't use the tdb readonly.  Plus a
1871                  * read only tdb prohibits the all-record lock.
1872                  */
1873                 open_flags = O_RDWR;
1874
1875                 ltdb->read_only = true;
1876
1877         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1878                 /*
1879                  * This is used by ldbsearch to prevent creation of the database
1880                  * if the name is wrong
1881                  */
1882                 open_flags = O_RDWR;
1883         } else {
1884                 /*
1885                  * This is the normal case
1886                  */
1887                 open_flags = O_CREAT | O_RDWR;
1888         }
1889
1890         /* note that we use quite a large default hash size */
1891         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1892                                    tdb_flags, open_flags,
1893                                    ldb_get_create_perms(ldb), ldb);
1894         if (!ltdb->tdb) {
1895                 ldb_asprintf_errstring(ldb,
1896                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1897                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1898                           "Unable to open tdb '%s': %s", path, strerror(errno));
1899                 talloc_free(ltdb);
1900                 if (errno == EACCES || errno == EPERM) {
1901                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1902                 }
1903                 return LDB_ERR_OPERATIONS_ERROR;
1904         }
1905
1906         if (getenv("LDB_WARN_UNINDEXED")) {
1907                 ltdb->warn_unindexed = true;
1908         }
1909
1910         if (getenv("LDB_WARN_REINDEX")) {
1911                 ltdb->warn_reindex = true;
1912         }
1913
1914         ltdb->sequence_number = 0;
1915
1916         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1917         if (!module) {
1918                 ldb_oom(ldb);
1919                 talloc_free(ltdb);
1920                 return LDB_ERR_OPERATIONS_ERROR;
1921         }
1922         ldb_module_set_private(module, ltdb);
1923         talloc_steal(module, ltdb);
1924
1925         if (ltdb_cache_load(module) != 0) {
1926                 ldb_asprintf_errstring(ldb,
1927                                        "Unable to load ltdb cache records of tdb '%s'", path);
1928                 talloc_free(module);
1929                 return LDB_ERR_OPERATIONS_ERROR;
1930         }
1931
1932         *_module = module;
1933         return LDB_SUCCESS;
1934 }
1935
1936 int ldb_tdb_init(const char *version)
1937 {
1938         LDB_MODULE_CHECK_VERSION(version);
1939         return ldb_register_backend("tdb", ltdb_connect, false);
1940 }