ldb_tdb: modify ltdb_delete_noindex() to take a struct ldb_message
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, struct ldb_dn *dn)
179 {
180         struct ldb_context *ldb = ldb_module_get_ctx(module);
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(ldb, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /*
225   form a TDB_DATA for a record key
226   caller frees
227
228   note that the key for a record can depend on whether the
229   dn refers to a case sensitive index record or not
230 */
231 TDB_DATA ltdb_key_msg(struct ldb_module *module,
232                       const struct ldb_message *msg)
233 {
234         return ltdb_key_dn(module, msg->dn);
235 }
236
237 /*
238   check special dn's have valid attributes
239   currently only @ATTRIBUTES is checked
240 */
241 static int ltdb_check_special_dn(struct ldb_module *module,
242                                  const struct ldb_message *msg)
243 {
244         struct ldb_context *ldb = ldb_module_get_ctx(module);
245         unsigned int i, j;
246
247         if (! ldb_dn_is_special(msg->dn) ||
248             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
249                 return LDB_SUCCESS;
250         }
251
252         /* we have @ATTRIBUTES, let's check attributes are fine */
253         /* should we check that we deny multivalued attributes ? */
254         for (i = 0; i < msg->num_elements; i++) {
255                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
256
257                 for (j = 0; j < msg->elements[i].num_values; j++) {
258                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
259                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
260                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
261                         }
262                 }
263         }
264
265         return LDB_SUCCESS;
266 }
267
268
269 /*
270   we've made a modification to a dn - possibly reindex and
271   update sequence number
272 */
273 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
274 {
275         int ret = LDB_SUCCESS;
276         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
277
278         /* only allow modifies inside a transaction, otherwise the
279          * ldb is unsafe */
280         if (ltdb->in_transaction == 0) {
281                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
282                 return LDB_ERR_OPERATIONS_ERROR;
283         }
284
285         if (ldb_dn_is_special(dn) &&
286             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
287              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
288         {
289                 if (ltdb->warn_reindex) {
290                         ldb_debug(ldb_module_get_ctx(module),
291                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
292                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
293                 }
294                 ret = ltdb_reindex(module);
295         }
296
297         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
298         if (ret == LDB_SUCCESS &&
299             !(ldb_dn_is_special(dn) &&
300               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
301                 ret = ltdb_increase_sequence_number(module);
302         }
303
304         /* If the modify was to @OPTIONS, reload the cache */
305         if (ret == LDB_SUCCESS &&
306             ldb_dn_is_special(dn) &&
307             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
308                 ret = ltdb_cache_reload(module);
309         }
310
311         return ret;
312 }
313
314 /*
315   store a record into the db
316 */
317 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         TDB_DATA tdb_key, tdb_data;
322         struct ldb_val ldb_data;
323         int ret = LDB_SUCCESS;
324
325         if (ltdb->read_only) {
326                 return LDB_ERR_UNWILLING_TO_PERFORM;
327         }
328
329         tdb_key = ltdb_key_msg(module, msg);
330         if (tdb_key.dptr == NULL) {
331                 return LDB_ERR_OTHER;
332         }
333
334         ret = ldb_pack_data(ldb_module_get_ctx(module),
335                             msg, &ldb_data);
336         if (ret == -1) {
337                 talloc_free(tdb_key.dptr);
338                 return LDB_ERR_OTHER;
339         }
340
341         tdb_data.dptr = ldb_data.data;
342         tdb_data.dsize = ldb_data.length;
343
344         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
345         if (ret != 0) {
346                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
347                 goto done;
348         }
349
350 done:
351         talloc_free(tdb_key.dptr);
352         talloc_free(ldb_data.data);
353
354         return ret;
355 }
356
357
358 /*
359   check if a attribute is a single valued, for a given element
360  */
361 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
362                                   struct ldb_message_element *el)
363 {
364         if (!a) return false;
365         if (el != NULL) {
366                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
367                         /* override from a ldb module, for example
368                            used for the description field, which is
369                            marked multi-valued in the schema but which
370                            should not actually accept multiple
371                            values */
372                         return true;
373                 }
374                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
375                         /* override from a ldb module, for example used for
376                            deleted linked attribute entries */
377                         return false;
378                 }
379         }
380         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
381                 return true;
382         }
383         return false;
384 }
385
386 static int ltdb_add_internal(struct ldb_module *module,
387                              struct ltdb_private *ltdb,
388                              const struct ldb_message *msg,
389                              bool check_single_value)
390 {
391         struct ldb_context *ldb = ldb_module_get_ctx(module);
392         int ret = LDB_SUCCESS;
393         unsigned int i;
394
395         for (i=0;i<msg->num_elements;i++) {
396                 struct ldb_message_element *el = &msg->elements[i];
397                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
398
399                 if (el->num_values == 0) {
400                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
401                                                el->name, ldb_dn_get_linearized(msg->dn));
402                         return LDB_ERR_CONSTRAINT_VIOLATION;
403                 }
404                 if (check_single_value &&
405                                 el->num_values > 1 &&
406                                 ldb_tdb_single_valued(a, el)) {
407                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
408                                                el->name, ldb_dn_get_linearized(msg->dn));
409                         return LDB_ERR_CONSTRAINT_VIOLATION;
410                 }
411
412                 /* Do not check "@ATTRIBUTES" for duplicated values */
413                 if (ldb_dn_is_special(msg->dn) &&
414                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
415                         continue;
416                 }
417
418                 if (check_single_value &&
419                     !(el->flags &
420                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
421                         struct ldb_val *duplicate = NULL;
422
423                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
424                                                          el, &duplicate, 0);
425                         if (ret != LDB_SUCCESS) {
426                                 return ret;
427                         }
428                         if (duplicate != NULL) {
429                                 ldb_asprintf_errstring(
430                                         ldb,
431                                         "attribute '%s': value '%.*s' on '%s' "
432                                         "provided more than once in ADD object",
433                                         el->name,
434                                         (int)duplicate->length,
435                                         duplicate->data,
436                                         ldb_dn_get_linearized(msg->dn));
437                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
438                         }
439                 }
440         }
441
442         ret = ltdb_store(module, msg, TDB_INSERT);
443         if (ret != LDB_SUCCESS) {
444                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
445                         ldb_asprintf_errstring(ldb,
446                                                "Entry %s already exists",
447                                                ldb_dn_get_linearized(msg->dn));
448                 }
449                 return ret;
450         }
451
452         ret = ltdb_index_add_new(module, ltdb, msg);
453         if (ret != LDB_SUCCESS) {
454                 return ret;
455         }
456
457         ret = ltdb_modified(module, msg->dn);
458
459         return ret;
460 }
461
462 /*
463   add a record to the database
464 */
465 static int ltdb_add(struct ltdb_context *ctx)
466 {
467         struct ldb_module *module = ctx->module;
468         struct ldb_request *req = ctx->req;
469         void *data = ldb_module_get_private(module);
470         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
471         int ret = LDB_SUCCESS;
472
473         ret = ltdb_check_special_dn(module, req->op.add.message);
474         if (ret != LDB_SUCCESS) {
475                 return ret;
476         }
477
478         ldb_request_set_state(req, LDB_ASYNC_PENDING);
479
480         if (ltdb_cache_load(module) != 0) {
481                 return LDB_ERR_OPERATIONS_ERROR;
482         }
483
484         ret = ltdb_add_internal(module, ltdb,
485                                 req->op.add.message, true);
486
487         return ret;
488 }
489
490 /*
491   delete a record from the database, not updating indexes (used for deleting
492   index records)
493 */
494 int ltdb_delete_noindex(struct ldb_module *module,
495                         const struct ldb_message *msg)
496 {
497         void *data = ldb_module_get_private(module);
498         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
499         TDB_DATA tdb_key;
500         int ret;
501
502         if (ltdb->read_only) {
503                 return LDB_ERR_UNWILLING_TO_PERFORM;
504         }
505
506         tdb_key = ltdb_key_dn(module, msg->dn);
507         if (!tdb_key.dptr) {
508                 return LDB_ERR_OTHER;
509         }
510
511         ret = tdb_delete(ltdb->tdb, tdb_key);
512         talloc_free(tdb_key.dptr);
513
514         if (ret != 0) {
515                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
516         }
517
518         return ret;
519 }
520
521 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
522 {
523         struct ldb_message *msg;
524         int ret = LDB_SUCCESS;
525
526         msg = ldb_msg_new(module);
527         if (msg == NULL) {
528                 return LDB_ERR_OPERATIONS_ERROR;
529         }
530
531         /* in case any attribute of the message was indexed, we need
532            to fetch the old record */
533         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
534         if (ret != LDB_SUCCESS) {
535                 /* not finding the old record is an error */
536                 goto done;
537         }
538
539         ret = ltdb_delete_noindex(module, msg);
540         if (ret != LDB_SUCCESS) {
541                 goto done;
542         }
543
544         /* remove any indexed attributes */
545         ret = ltdb_index_delete(module, msg);
546         if (ret != LDB_SUCCESS) {
547                 goto done;
548         }
549
550         ret = ltdb_modified(module, dn);
551         if (ret != LDB_SUCCESS) {
552                 goto done;
553         }
554
555 done:
556         talloc_free(msg);
557         return ret;
558 }
559
560 /*
561   delete a record from the database
562 */
563 static int ltdb_delete(struct ltdb_context *ctx)
564 {
565         struct ldb_module *module = ctx->module;
566         struct ldb_request *req = ctx->req;
567         int ret = LDB_SUCCESS;
568
569         ldb_request_set_state(req, LDB_ASYNC_PENDING);
570
571         if (ltdb_cache_load(module) != 0) {
572                 return LDB_ERR_OPERATIONS_ERROR;
573         }
574
575         ret = ltdb_delete_internal(module, req->op.del.dn);
576
577         return ret;
578 }
579
580 /*
581   find an element by attribute name. At the moment this does a linear search,
582   it should be re-coded to use a binary search once all places that modify
583   records guarantee sorted order
584
585   return the index of the first matching element if found, otherwise -1
586 */
587 static int find_element(const struct ldb_message *msg, const char *name)
588 {
589         unsigned int i;
590         for (i=0;i<msg->num_elements;i++) {
591                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
592                         return i;
593                 }
594         }
595         return -1;
596 }
597
598
599 /*
600   add an element to an existing record. Assumes a elements array that we
601   can call re-alloc on, and assumed that we can re-use the data pointers from
602   the passed in additional values. Use with care!
603
604   returns 0 on success, -1 on failure (and sets errno)
605 */
606 static int ltdb_msg_add_element(struct ldb_message *msg,
607                                 struct ldb_message_element *el)
608 {
609         struct ldb_message_element *e2;
610         unsigned int i;
611
612         if (el->num_values == 0) {
613                 /* nothing to do here - we don't add empty elements */
614                 return 0;
615         }
616
617         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
618                               msg->num_elements+1);
619         if (!e2) {
620                 errno = ENOMEM;
621                 return -1;
622         }
623
624         msg->elements = e2;
625
626         e2 = &msg->elements[msg->num_elements];
627
628         e2->name = el->name;
629         e2->flags = el->flags;
630         e2->values = talloc_array(msg->elements,
631                                   struct ldb_val, el->num_values);
632         if (!e2->values) {
633                 errno = ENOMEM;
634                 return -1;
635         }
636         for (i=0;i<el->num_values;i++) {
637                 e2->values[i] = el->values[i];
638         }
639         e2->num_values = el->num_values;
640
641         ++msg->num_elements;
642
643         return 0;
644 }
645
646 /*
647   delete all elements having a specified attribute name
648 */
649 static int msg_delete_attribute(struct ldb_module *module,
650                                 struct ltdb_private *ltdb,
651                                 struct ldb_message *msg, const char *name)
652 {
653         unsigned int i;
654         int ret;
655         struct ldb_message_element *el;
656         bool is_special = ldb_dn_is_special(msg->dn);
657
658         if (!is_special
659             && ltdb->cache->GUID_index_attribute != NULL
660             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
661                 struct ldb_context *ldb = ldb_module_get_ctx(module);
662                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
663                                        "attribute %s (used as DB index)",
664                                        ltdb->cache->GUID_index_attribute);
665                 return LDB_ERR_CONSTRAINT_VIOLATION;
666         }
667
668         el = ldb_msg_find_element(msg, name);
669         if (el == NULL) {
670                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
671         }
672         i = el - msg->elements;
673
674         ret = ltdb_index_del_element(module, ltdb, msg->dn, el);
675         if (ret != LDB_SUCCESS) {
676                 return ret;
677         }
678
679         talloc_free(el->values);
680         if (msg->num_elements > (i+1)) {
681                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
682         }
683         msg->num_elements--;
684         msg->elements = talloc_realloc(msg, msg->elements,
685                                        struct ldb_message_element,
686                                        msg->num_elements);
687         return LDB_SUCCESS;
688 }
689
690 /*
691   delete all elements matching an attribute name/value
692
693   return LDB Error on failure
694 */
695 static int msg_delete_element(struct ldb_module *module,
696                               struct ltdb_private *ltdb,
697                               struct ldb_message *msg,
698                               const char *name,
699                               const struct ldb_val *val)
700 {
701         struct ldb_context *ldb = ldb_module_get_ctx(module);
702         unsigned int i;
703         int found, ret;
704         struct ldb_message_element *el;
705         const struct ldb_schema_attribute *a;
706
707         found = find_element(msg, name);
708         if (found == -1) {
709                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
710         }
711
712         i = (unsigned int) found;
713         el = &(msg->elements[i]);
714
715         a = ldb_schema_attribute_by_name(ldb, el->name);
716
717         for (i=0;i<el->num_values;i++) {
718                 bool matched;
719                 if (a->syntax->operator_fn) {
720                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
721                                                      &el->values[i], val, &matched);
722                         if (ret != LDB_SUCCESS) return ret;
723                 } else {
724                         matched = (a->syntax->comparison_fn(ldb, ldb,
725                                                             &el->values[i], val) == 0);
726                 }
727                 if (matched) {
728                         if (el->num_values == 1) {
729                                 return msg_delete_attribute(module,
730                                                             ltdb, msg, name);
731                         }
732
733                         ret = ltdb_index_del_value(module, ltdb, msg->dn, el, i);
734                         if (ret != LDB_SUCCESS) {
735                                 return ret;
736                         }
737
738                         if (i<el->num_values-1) {
739                                 memmove(&el->values[i], &el->values[i+1],
740                                         sizeof(el->values[i])*
741                                                 (el->num_values-(i+1)));
742                         }
743                         el->num_values--;
744
745                         /* per definition we find in a canonicalised message an
746                            attribute value only once. So we are finished here */
747                         return LDB_SUCCESS;
748                 }
749         }
750
751         /* Not found */
752         return LDB_ERR_NO_SUCH_ATTRIBUTE;
753 }
754
755
756 /*
757   modify a record - internal interface
758
759   yuck - this is O(n^2). Luckily n is usually small so we probably
760   get away with it, but if we ever have really large attribute lists
761   then we'll need to look at this again
762
763   'req' is optional, and is used to specify controls if supplied
764 */
765 int ltdb_modify_internal(struct ldb_module *module,
766                          const struct ldb_message *msg,
767                          struct ldb_request *req)
768 {
769         struct ldb_context *ldb = ldb_module_get_ctx(module);
770         void *data = ldb_module_get_private(module);
771         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
772         struct ldb_message *msg2;
773         unsigned int i, j;
774         int ret = LDB_SUCCESS, idx;
775         struct ldb_control *control_permissive = NULL;
776         TALLOC_CTX *mem_ctx = talloc_new(req);
777
778         if (mem_ctx == NULL) {
779                 return ldb_module_oom(module);
780         }
781         
782         if (req) {
783                 control_permissive = ldb_request_get_control(req,
784                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
785         }
786
787         msg2 = ldb_msg_new(mem_ctx);
788         if (msg2 == NULL) {
789                 ret = LDB_ERR_OTHER;
790                 goto done;
791         }
792
793         ret = ltdb_search_dn1(module, msg->dn,
794                               msg2,
795                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
796         if (ret != LDB_SUCCESS) {
797                 goto done;
798         }
799
800         for (i=0; i<msg->num_elements; i++) {
801                 struct ldb_message_element *el = &msg->elements[i], *el2;
802                 struct ldb_val *vals;
803                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
804                 const char *dn;
805                 uint32_t options = 0;
806                 if (control_permissive != NULL) {
807                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
808                 }
809
810                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
811                 case LDB_FLAG_MOD_ADD:
812
813                         if (el->num_values == 0) {
814                                 ldb_asprintf_errstring(ldb,
815                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
816                                                        el->name, ldb_dn_get_linearized(msg2->dn));
817                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
818                                 goto done;
819                         }
820
821                         /* make a copy of the array so that a permissive
822                          * control can remove duplicates without changing the
823                          * original values, but do not copy data as we do not
824                          * need to keep it around once the operation is
825                          * finished */
826                         if (control_permissive) {
827                                 el = talloc(msg2, struct ldb_message_element);
828                                 if (!el) {
829                                         ret = LDB_ERR_OTHER;
830                                         goto done;
831                                 }
832                                 *el = msg->elements[i];
833                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
834                                 if (el->values == NULL) {
835                                         ret = LDB_ERR_OTHER;
836                                         goto done;
837                                 }
838                                 for (j = 0; j < el->num_values; j++) {
839                                         el->values[j] = msg->elements[i].values[j];
840                                 }
841                         }
842
843                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
844                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
845                                                        el->name, ldb_dn_get_linearized(msg2->dn));
846                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
847                                 goto done;
848                         }
849
850                         /* Checks if element already exists */
851                         idx = find_element(msg2, el->name);
852                         if (idx == -1) {
853                                 if (ltdb_msg_add_element(msg2, el) != 0) {
854                                         ret = LDB_ERR_OTHER;
855                                         goto done;
856                                 }
857                                 ret = ltdb_index_add_element(module, ltdb,
858                                                              msg2->dn,
859                                                              el);
860                                 if (ret != LDB_SUCCESS) {
861                                         goto done;
862                                 }
863                         } else {
864                                 j = (unsigned int) idx;
865                                 el2 = &(msg2->elements[j]);
866
867                                 /* We cannot add another value on a existing one
868                                    if the attribute is single-valued */
869                                 if (ldb_tdb_single_valued(a, el)) {
870                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
871                                                                el->name, ldb_dn_get_linearized(msg2->dn));
872                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
873                                         goto done;
874                                 }
875
876                                 /* Check that values don't exist yet on multi-
877                                    valued attributes or aren't provided twice */
878                                 if (!(el->flags &
879                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
880                                         struct ldb_val *duplicate = NULL;
881                                         ret = ldb_msg_find_common_values(ldb,
882                                                                          msg2,
883                                                                          el,
884                                                                          el2,
885                                                                          options);
886
887                                         if (ret ==
888                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
889                                                 ldb_asprintf_errstring(ldb,
890                                                         "attribute '%s': value "
891                                                         "#%u on '%s' already "
892                                                         "exists", el->name, j,
893                                                         ldb_dn_get_linearized(msg2->dn));
894                                                 goto done;
895                                         } else if (ret != LDB_SUCCESS) {
896                                                 goto done;
897                                         }
898
899                                         ret = ldb_msg_find_duplicate_val(
900                                                 ldb, msg2, el, &duplicate, 0);
901                                         if (ret != LDB_SUCCESS) {
902                                                 goto done;
903                                         }
904                                         if (duplicate != NULL) {
905                                                 ldb_asprintf_errstring(
906                                                         ldb,
907                                                         "attribute '%s': value "
908                                                         "'%.*s' on '%s' "
909                                                         "provided more than "
910                                                         "once in ADD",
911                                                         el->name,
912                                                         (int)duplicate->length,
913                                                         duplicate->data,
914                                                         ldb_dn_get_linearized(msg->dn));
915                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
916                                                 goto done;
917                                         }
918                                 }
919
920                                 /* Now combine existing and new values to a new
921                                    attribute record */
922                                 vals = talloc_realloc(msg2->elements,
923                                                       el2->values, struct ldb_val,
924                                                       el2->num_values + el->num_values);
925                                 if (vals == NULL) {
926                                         ldb_oom(ldb);
927                                         ret = LDB_ERR_OTHER;
928                                         goto done;
929                                 }
930
931                                 for (j=0; j<el->num_values; j++) {
932                                         vals[el2->num_values + j] =
933                                                 ldb_val_dup(vals, &el->values[j]);
934                                 }
935
936                                 el2->values = vals;
937                                 el2->num_values += el->num_values;
938
939                                 ret = ltdb_index_add_element(module, ltdb,
940                                                              msg2->dn, el);
941                                 if (ret != LDB_SUCCESS) {
942                                         goto done;
943                                 }
944                         }
945
946                         break;
947
948                 case LDB_FLAG_MOD_REPLACE:
949
950                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
951                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
952                                                        el->name, ldb_dn_get_linearized(msg2->dn));
953                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
954                                 goto done;
955                         }
956
957                         /*
958                          * We don't need to check this if we have been
959                          * pre-screened by the repl_meta_data module
960                          * in Samba, or someone else who can claim to
961                          * know what they are doing. 
962                          */
963                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
964                                 struct ldb_val *duplicate = NULL;
965
966                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
967                                                                  &duplicate, 0);
968                                 if (ret != LDB_SUCCESS) {
969                                         goto done;
970                                 }
971                                 if (duplicate != NULL) {
972                                         ldb_asprintf_errstring(
973                                                 ldb,
974                                                 "attribute '%s': value '%.*s' "
975                                                 "on '%s' provided more than "
976                                                 "once in REPLACE",
977                                                 el->name,
978                                                 (int)duplicate->length,
979                                                 duplicate->data,
980                                                 ldb_dn_get_linearized(msg2->dn));
981                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
982                                         goto done;
983                                 }
984                         }
985
986                         /* Checks if element already exists */
987                         idx = find_element(msg2, el->name);
988                         if (idx != -1) {
989                                 j = (unsigned int) idx;
990                                 el2 = &(msg2->elements[j]);
991
992                                 /* we consider two elements to be
993                                  * equal only if the order
994                                  * matches. This allows dbcheck to
995                                  * fix the ordering on attributes
996                                  * where order matters, such as
997                                  * objectClass
998                                  */
999                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1000                                         continue;
1001                                 }
1002
1003                                 /* Delete the attribute if it exists in the DB */
1004                                 if (msg_delete_attribute(module, ltdb,
1005                                                          msg2,
1006                                                          el->name) != 0) {
1007                                         ret = LDB_ERR_OTHER;
1008                                         goto done;
1009                                 }
1010                         }
1011
1012                         /* Recreate it with the new values */
1013                         if (ltdb_msg_add_element(msg2, el) != 0) {
1014                                 ret = LDB_ERR_OTHER;
1015                                 goto done;
1016                         }
1017
1018                         ret = ltdb_index_add_element(module, ltdb,
1019                                                      msg2->dn, el);
1020                         if (ret != LDB_SUCCESS) {
1021                                 goto done;
1022                         }
1023
1024                         break;
1025
1026                 case LDB_FLAG_MOD_DELETE:
1027                         dn = ldb_dn_get_linearized(msg2->dn);
1028                         if (dn == NULL) {
1029                                 ret = LDB_ERR_OTHER;
1030                                 goto done;
1031                         }
1032
1033                         if (msg->elements[i].num_values == 0) {
1034                                 /* Delete the whole attribute */
1035                                 ret = msg_delete_attribute(module,
1036                                                            ltdb,
1037                                                            msg2,
1038                                                            msg->elements[i].name);
1039                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1040                                     control_permissive) {
1041                                         ret = LDB_SUCCESS;
1042                                 } else {
1043                                         ldb_asprintf_errstring(ldb,
1044                                                                "attribute '%s': no such attribute for delete on '%s'",
1045                                                                msg->elements[i].name, dn);
1046                                 }
1047                                 if (ret != LDB_SUCCESS) {
1048                                         goto done;
1049                                 }
1050                         } else {
1051                                 /* Delete specified values from an attribute */
1052                                 for (j=0; j < msg->elements[i].num_values; j++) {
1053                                         ret = msg_delete_element(module,
1054                                                                  ltdb,
1055                                                                  msg2,
1056                                                                  msg->elements[i].name,
1057                                                                  &msg->elements[i].values[j]);
1058                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1059                                             control_permissive) {
1060                                                 ret = LDB_SUCCESS;
1061                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1062                                                 ldb_asprintf_errstring(ldb,
1063                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1064                                                                        msg->elements[i].name, dn);
1065                                         }
1066                                         if (ret != LDB_SUCCESS) {
1067                                                 goto done;
1068                                         }
1069                                 }
1070                         }
1071                         break;
1072                 default:
1073                         ldb_asprintf_errstring(ldb,
1074                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1075                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1076                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1077                         ret = LDB_ERR_PROTOCOL_ERROR;
1078                         goto done;
1079                 }
1080         }
1081
1082         ret = ltdb_store(module, msg2, TDB_MODIFY);
1083         if (ret != LDB_SUCCESS) {
1084                 goto done;
1085         }
1086
1087         ret = ltdb_modified(module, msg2->dn);
1088         if (ret != LDB_SUCCESS) {
1089                 goto done;
1090         }
1091
1092 done:
1093         TALLOC_FREE(mem_ctx);
1094         return ret;
1095 }
1096
1097 /*
1098   modify a record
1099 */
1100 static int ltdb_modify(struct ltdb_context *ctx)
1101 {
1102         struct ldb_module *module = ctx->module;
1103         struct ldb_request *req = ctx->req;
1104         int ret = LDB_SUCCESS;
1105
1106         ret = ltdb_check_special_dn(module, req->op.mod.message);
1107         if (ret != LDB_SUCCESS) {
1108                 return ret;
1109         }
1110
1111         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1112
1113         if (ltdb_cache_load(module) != 0) {
1114                 return LDB_ERR_OPERATIONS_ERROR;
1115         }
1116
1117         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1118
1119         return ret;
1120 }
1121
1122 /*
1123   rename a record
1124 */
1125 static int ltdb_rename(struct ltdb_context *ctx)
1126 {
1127         struct ldb_module *module = ctx->module;
1128         void *data = ldb_module_get_private(module);
1129         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1130         struct ldb_request *req = ctx->req;
1131         struct ldb_message *msg;
1132         int ret = LDB_SUCCESS;
1133         TDB_DATA tdb_key, tdb_key_old;
1134
1135         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1136
1137         if (ltdb_cache_load(ctx->module) != 0) {
1138                 return LDB_ERR_OPERATIONS_ERROR;
1139         }
1140
1141         msg = ldb_msg_new(ctx);
1142         if (msg == NULL) {
1143                 return LDB_ERR_OPERATIONS_ERROR;
1144         }
1145
1146         /* we need to fetch the old record to re-add under the new name */
1147         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1148                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1149         if (ret != LDB_SUCCESS) {
1150                 /* not finding the old record is an error */
1151                 return ret;
1152         }
1153
1154         /* We need to, before changing the DB, check if the new DN
1155          * exists, so we can return this error to the caller with an
1156          * unmodified DB */
1157         tdb_key = ltdb_key_dn(module, req->op.rename.newdn);
1158         if (!tdb_key.dptr) {
1159                 talloc_free(msg);
1160                 return LDB_ERR_OPERATIONS_ERROR;
1161         }
1162
1163         tdb_key_old = ltdb_key_dn(module, req->op.rename.olddn);
1164         if (!tdb_key_old.dptr) {
1165                 talloc_free(msg);
1166                 talloc_free(tdb_key.dptr);
1167                 return LDB_ERR_OPERATIONS_ERROR;
1168         }
1169
1170         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1171         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1172                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1173                         talloc_free(tdb_key_old.dptr);
1174                         talloc_free(tdb_key.dptr);
1175                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1176                                                "Entry %s already exists",
1177                                                ldb_dn_get_linearized(req->op.rename.newdn));
1178                         /* finding the new record already in the DB is an error */
1179                         talloc_free(msg);
1180                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1181                 }
1182         }
1183         talloc_free(tdb_key_old.dptr);
1184         talloc_free(tdb_key.dptr);
1185
1186         /* Always delete first then add, to avoid conflicts with
1187          * unique indexes. We rely on the transaction to make this
1188          * atomic
1189          */
1190         ret = ltdb_delete_internal(module, msg->dn);
1191         if (ret != LDB_SUCCESS) {
1192                 talloc_free(msg);
1193                 return ret;
1194         }
1195
1196         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1197         if (msg->dn == NULL) {
1198                 talloc_free(msg);
1199                 return LDB_ERR_OPERATIONS_ERROR;
1200         }
1201
1202         /* We don't check single value as we can have more than 1 with
1203          * deleted attributes. We could go through all elements but that's
1204          * maybe not the most efficient way
1205          */
1206         ret = ltdb_add_internal(module, ltdb, msg, false);
1207
1208         talloc_free(msg);
1209
1210         return ret;
1211 }
1212
1213 static int ltdb_start_trans(struct ldb_module *module)
1214 {
1215         void *data = ldb_module_get_private(module);
1216         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1217
1218         /* Do not take out the transaction lock on a read-only DB */
1219         if (ltdb->read_only) {
1220                 return LDB_ERR_UNWILLING_TO_PERFORM;
1221         }
1222
1223         if (tdb_transaction_start(ltdb->tdb) != 0) {
1224                 return ltdb_err_map(tdb_error(ltdb->tdb));
1225         }
1226
1227         ltdb->in_transaction++;
1228
1229         ltdb_index_transaction_start(module);
1230
1231         return LDB_SUCCESS;
1232 }
1233
1234 static int ltdb_prepare_commit(struct ldb_module *module)
1235 {
1236         int ret;
1237         void *data = ldb_module_get_private(module);
1238         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1239
1240         if (ltdb->in_transaction != 1) {
1241                 return LDB_SUCCESS;
1242         }
1243
1244         ret = ltdb_index_transaction_commit(module);
1245         if (ret != LDB_SUCCESS) {
1246                 tdb_transaction_cancel(ltdb->tdb);
1247                 ltdb->in_transaction--;
1248                 return ret;
1249         }
1250
1251         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1252                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1253                 ltdb->in_transaction--;
1254                 ldb_debug_set(ldb_module_get_ctx(module),
1255                               LDB_DEBUG_FATAL,
1256                               "Failure during "
1257                               "tdb_transaction_prepare_commit(): %s -> %s",
1258                               tdb_errorstr(ltdb->tdb),
1259                               ldb_strerror(ret));
1260                 return ret;
1261         }
1262
1263         ltdb->prepared_commit = true;
1264
1265         return LDB_SUCCESS;
1266 }
1267
1268 static int ltdb_end_trans(struct ldb_module *module)
1269 {
1270         int ret;
1271         void *data = ldb_module_get_private(module);
1272         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1273
1274         if (!ltdb->prepared_commit) {
1275                 ret = ltdb_prepare_commit(module);
1276                 if (ret != LDB_SUCCESS) {
1277                         return ret;
1278                 }
1279         }
1280
1281         ltdb->in_transaction--;
1282         ltdb->prepared_commit = false;
1283
1284         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1285                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1286                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1287                                        "Failure during tdb_transaction_commit(): %s -> %s",
1288                                        tdb_errorstr(ltdb->tdb),
1289                                        ldb_strerror(ret));
1290                 return ret;
1291         }
1292
1293         return LDB_SUCCESS;
1294 }
1295
1296 static int ltdb_del_trans(struct ldb_module *module)
1297 {
1298         void *data = ldb_module_get_private(module);
1299         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1300
1301         ltdb->in_transaction--;
1302
1303         if (ltdb_index_transaction_cancel(module) != 0) {
1304                 tdb_transaction_cancel(ltdb->tdb);
1305                 return ltdb_err_map(tdb_error(ltdb->tdb));
1306         }
1307
1308         tdb_transaction_cancel(ltdb->tdb);
1309         return LDB_SUCCESS;
1310 }
1311
1312 /*
1313   return sequenceNumber from @BASEINFO
1314 */
1315 static int ltdb_sequence_number(struct ltdb_context *ctx,
1316                                 struct ldb_extended **ext)
1317 {
1318         struct ldb_context *ldb;
1319         struct ldb_module *module = ctx->module;
1320         struct ldb_request *req = ctx->req;
1321         TALLOC_CTX *tmp_ctx = NULL;
1322         struct ldb_seqnum_request *seq;
1323         struct ldb_seqnum_result *res;
1324         struct ldb_message *msg = NULL;
1325         struct ldb_dn *dn;
1326         const char *date;
1327         int ret = LDB_SUCCESS;
1328
1329         ldb = ldb_module_get_ctx(module);
1330
1331         seq = talloc_get_type(req->op.extended.data,
1332                                 struct ldb_seqnum_request);
1333         if (seq == NULL) {
1334                 return LDB_ERR_OPERATIONS_ERROR;
1335         }
1336
1337         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1338
1339         if (ltdb_lock_read(module) != 0) {
1340                 return LDB_ERR_OPERATIONS_ERROR;
1341         }
1342
1343         res = talloc_zero(req, struct ldb_seqnum_result);
1344         if (res == NULL) {
1345                 ret = LDB_ERR_OPERATIONS_ERROR;
1346                 goto done;
1347         }
1348
1349         tmp_ctx = talloc_new(req);
1350         if (tmp_ctx == NULL) {
1351                 ret = LDB_ERR_OPERATIONS_ERROR;
1352                 goto done;
1353         }
1354
1355         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1356         if (dn == NULL) {
1357                 ret = LDB_ERR_OPERATIONS_ERROR;
1358                 goto done;
1359         }
1360
1361         msg = ldb_msg_new(tmp_ctx);
1362         if (msg == NULL) {
1363                 ret = LDB_ERR_OPERATIONS_ERROR;
1364                 goto done;
1365         }
1366
1367         ret = ltdb_search_dn1(module, dn, msg, 0);
1368         if (ret != LDB_SUCCESS) {
1369                 goto done;
1370         }
1371
1372         switch (seq->type) {
1373         case LDB_SEQ_HIGHEST_SEQ:
1374                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1375                 break;
1376         case LDB_SEQ_NEXT:
1377                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1378                 res->seq_num++;
1379                 break;
1380         case LDB_SEQ_HIGHEST_TIMESTAMP:
1381                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1382                 if (date) {
1383                         res->seq_num = ldb_string_to_time(date);
1384                 } else {
1385                         res->seq_num = 0;
1386                         /* zero is as good as anything when we don't know */
1387                 }
1388                 break;
1389         }
1390
1391         *ext = talloc_zero(req, struct ldb_extended);
1392         if (*ext == NULL) {
1393                 ret = LDB_ERR_OPERATIONS_ERROR;
1394                 goto done;
1395         }
1396         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1397         (*ext)->data = talloc_steal(*ext, res);
1398
1399 done:
1400         talloc_free(tmp_ctx);
1401         ltdb_unlock_read(module);
1402         return ret;
1403 }
1404
1405 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1406 {
1407         struct ldb_context *ldb;
1408         struct ldb_request *req;
1409         struct ldb_reply *ares;
1410
1411         ldb = ldb_module_get_ctx(ctx->module);
1412         req = ctx->req;
1413
1414         /* if we already returned an error just return */
1415         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1416                 return;
1417         }
1418
1419         ares = talloc_zero(req, struct ldb_reply);
1420         if (!ares) {
1421                 ldb_oom(ldb);
1422                 req->callback(req, NULL);
1423                 return;
1424         }
1425         ares->type = LDB_REPLY_DONE;
1426         ares->error = error;
1427
1428         req->callback(req, ares);
1429 }
1430
1431 static void ltdb_timeout(struct tevent_context *ev,
1432                           struct tevent_timer *te,
1433                           struct timeval t,
1434                           void *private_data)
1435 {
1436         struct ltdb_context *ctx;
1437         ctx = talloc_get_type(private_data, struct ltdb_context);
1438
1439         if (!ctx->request_terminated) {
1440                 /* request is done now */
1441                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1442         }
1443
1444         if (ctx->spy) {
1445                 /* neutralize the spy */
1446                 ctx->spy->ctx = NULL;
1447                 ctx->spy = NULL;
1448         }
1449         talloc_free(ctx);
1450 }
1451
1452 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1453                                         struct ldb_extended *ext,
1454                                         int error)
1455 {
1456         struct ldb_context *ldb;
1457         struct ldb_request *req;
1458         struct ldb_reply *ares;
1459
1460         ldb = ldb_module_get_ctx(ctx->module);
1461         req = ctx->req;
1462
1463         /* if we already returned an error just return */
1464         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1465                 return;
1466         }
1467
1468         ares = talloc_zero(req, struct ldb_reply);
1469         if (!ares) {
1470                 ldb_oom(ldb);
1471                 req->callback(req, NULL);
1472                 return;
1473         }
1474         ares->type = LDB_REPLY_DONE;
1475         ares->response = ext;
1476         ares->error = error;
1477
1478         req->callback(req, ares);
1479 }
1480
1481 static void ltdb_handle_extended(struct ltdb_context *ctx)
1482 {
1483         struct ldb_extended *ext = NULL;
1484         int ret;
1485
1486         if (strcmp(ctx->req->op.extended.oid,
1487                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1488                 /* get sequence number */
1489                 ret = ltdb_sequence_number(ctx, &ext);
1490         } else {
1491                 /* not recognized */
1492                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1493         }
1494
1495         ltdb_request_extended_done(ctx, ext, ret);
1496 }
1497
1498 static void ltdb_callback(struct tevent_context *ev,
1499                           struct tevent_timer *te,
1500                           struct timeval t,
1501                           void *private_data)
1502 {
1503         struct ltdb_context *ctx;
1504         int ret;
1505
1506         ctx = talloc_get_type(private_data, struct ltdb_context);
1507
1508         if (ctx->request_terminated) {
1509                 goto done;
1510         }
1511
1512         switch (ctx->req->operation) {
1513         case LDB_SEARCH:
1514                 ret = ltdb_search(ctx);
1515                 break;
1516         case LDB_ADD:
1517                 ret = ltdb_add(ctx);
1518                 break;
1519         case LDB_MODIFY:
1520                 ret = ltdb_modify(ctx);
1521                 break;
1522         case LDB_DELETE:
1523                 ret = ltdb_delete(ctx);
1524                 break;
1525         case LDB_RENAME:
1526                 ret = ltdb_rename(ctx);
1527                 break;
1528         case LDB_EXTENDED:
1529                 ltdb_handle_extended(ctx);
1530                 goto done;
1531         default:
1532                 /* no other op supported */
1533                 ret = LDB_ERR_PROTOCOL_ERROR;
1534         }
1535
1536         if (!ctx->request_terminated) {
1537                 /* request is done now */
1538                 ltdb_request_done(ctx, ret);
1539         }
1540
1541 done:
1542         if (ctx->spy) {
1543                 /* neutralize the spy */
1544                 ctx->spy->ctx = NULL;
1545                 ctx->spy = NULL;
1546         }
1547         talloc_free(ctx);
1548 }
1549
1550 static int ltdb_request_destructor(void *ptr)
1551 {
1552         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1553
1554         if (spy->ctx != NULL) {
1555                 spy->ctx->spy = NULL;
1556                 spy->ctx->request_terminated = true;
1557                 spy->ctx = NULL;
1558         }
1559
1560         return 0;
1561 }
1562
1563 static int ltdb_handle_request(struct ldb_module *module,
1564                                struct ldb_request *req)
1565 {
1566         struct ldb_control *control_permissive;
1567         struct ldb_context *ldb;
1568         struct tevent_context *ev;
1569         struct ltdb_context *ac;
1570         struct tevent_timer *te;
1571         struct timeval tv;
1572         unsigned int i;
1573
1574         ldb = ldb_module_get_ctx(module);
1575
1576         control_permissive = ldb_request_get_control(req,
1577                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1578
1579         for (i = 0; req->controls && req->controls[i]; i++) {
1580                 if (req->controls[i]->critical &&
1581                     req->controls[i] != control_permissive) {
1582                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1583                                                req->controls[i]->oid);
1584                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1585                 }
1586         }
1587
1588         if (req->starttime == 0 || req->timeout == 0) {
1589                 ldb_set_errstring(ldb, "Invalid timeout settings");
1590                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1591         }
1592
1593         ev = ldb_handle_get_event_context(req->handle);
1594
1595         ac = talloc_zero(ldb, struct ltdb_context);
1596         if (ac == NULL) {
1597                 ldb_oom(ldb);
1598                 return LDB_ERR_OPERATIONS_ERROR;
1599         }
1600
1601         ac->module = module;
1602         ac->req = req;
1603
1604         tv.tv_sec = 0;
1605         tv.tv_usec = 0;
1606         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1607         if (NULL == te) {
1608                 talloc_free(ac);
1609                 return LDB_ERR_OPERATIONS_ERROR;
1610         }
1611
1612         if (req->timeout > 0) {
1613                 tv.tv_sec = req->starttime + req->timeout;
1614                 tv.tv_usec = 0;
1615                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1616                                                      ltdb_timeout, ac);
1617                 if (NULL == ac->timeout_event) {
1618                         talloc_free(ac);
1619                         return LDB_ERR_OPERATIONS_ERROR;
1620                 }
1621         }
1622
1623         /* set a spy so that we do not try to use the request context
1624          * if it is freed before ltdb_callback fires */
1625         ac->spy = talloc(req, struct ltdb_req_spy);
1626         if (NULL == ac->spy) {
1627                 talloc_free(ac);
1628                 return LDB_ERR_OPERATIONS_ERROR;
1629         }
1630         ac->spy->ctx = ac;
1631
1632         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1633
1634         return LDB_SUCCESS;
1635 }
1636
1637 static int ltdb_init_rootdse(struct ldb_module *module)
1638 {
1639         /* ignore errors on this - we expect it for non-sam databases */
1640         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1641
1642         /* there can be no module beyond the backend, just return */
1643         return LDB_SUCCESS;
1644 }
1645
1646 static const struct ldb_module_ops ltdb_ops = {
1647         .name              = "tdb",
1648         .init_context      = ltdb_init_rootdse,
1649         .search            = ltdb_handle_request,
1650         .add               = ltdb_handle_request,
1651         .modify            = ltdb_handle_request,
1652         .del               = ltdb_handle_request,
1653         .rename            = ltdb_handle_request,
1654         .extended          = ltdb_handle_request,
1655         .start_transaction = ltdb_start_trans,
1656         .end_transaction   = ltdb_end_trans,
1657         .prepare_commit    = ltdb_prepare_commit,
1658         .del_transaction   = ltdb_del_trans,
1659         .read_lock         = ltdb_lock_read,
1660         .read_unlock       = ltdb_unlock_read,
1661 };
1662
1663 /*
1664   connect to the database
1665 */
1666 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1667                         unsigned int flags, const char *options[],
1668                         struct ldb_module **_module)
1669 {
1670         struct ldb_module *module;
1671         const char *path;
1672         int tdb_flags, open_flags;
1673         struct ltdb_private *ltdb;
1674
1675         /*
1676          * We hold locks, so we must use a private event context
1677          * on each returned handle
1678          */
1679
1680         ldb_set_require_private_event_context(ldb);
1681
1682         /* parse the url */
1683         if (strchr(url, ':')) {
1684                 if (strncmp(url, "tdb://", 6) != 0) {
1685                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1686                                   "Invalid tdb URL '%s'", url);
1687                         return LDB_ERR_OPERATIONS_ERROR;
1688                 }
1689                 path = url+6;
1690         } else {
1691                 path = url;
1692         }
1693
1694         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1695
1696         /* check for the 'nosync' option */
1697         if (flags & LDB_FLG_NOSYNC) {
1698                 tdb_flags |= TDB_NOSYNC;
1699         }
1700
1701         /* and nommap option */
1702         if (flags & LDB_FLG_NOMMAP) {
1703                 tdb_flags |= TDB_NOMMAP;
1704         }
1705
1706         ltdb = talloc_zero(ldb, struct ltdb_private);
1707         if (!ltdb) {
1708                 ldb_oom(ldb);
1709                 return LDB_ERR_OPERATIONS_ERROR;
1710         }
1711
1712         if (flags & LDB_FLG_RDONLY) {
1713                 /*
1714                  * This is weird, but because we can only have one tdb
1715                  * in this process, and the other one could be
1716                  * read-write, we can't use the tdb readonly.  Plus a
1717                  * read only tdb prohibits the all-record lock.
1718                  */
1719                 open_flags = O_RDWR;
1720
1721                 ltdb->read_only = true;
1722
1723         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1724                 /*
1725                  * This is used by ldbsearch to prevent creation of the database
1726                  * if the name is wrong
1727                  */
1728                 open_flags = O_RDWR;
1729         } else {
1730                 /*
1731                  * This is the normal case
1732                  */
1733                 open_flags = O_CREAT | O_RDWR;
1734         }
1735
1736         /* note that we use quite a large default hash size */
1737         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1738                                    tdb_flags, open_flags,
1739                                    ldb_get_create_perms(ldb), ldb);
1740         if (!ltdb->tdb) {
1741                 ldb_asprintf_errstring(ldb,
1742                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1743                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1744                           "Unable to open tdb '%s': %s", path, strerror(errno));
1745                 talloc_free(ltdb);
1746                 if (errno == EACCES || errno == EPERM) {
1747                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1748                 }
1749                 return LDB_ERR_OPERATIONS_ERROR;
1750         }
1751
1752         if (getenv("LDB_WARN_UNINDEXED")) {
1753                 ltdb->warn_unindexed = true;
1754         }
1755
1756         if (getenv("LDB_WARN_REINDEX")) {
1757                 ltdb->warn_reindex = true;
1758         }
1759
1760         ltdb->sequence_number = 0;
1761
1762         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1763         if (!module) {
1764                 ldb_oom(ldb);
1765                 talloc_free(ltdb);
1766                 return LDB_ERR_OPERATIONS_ERROR;
1767         }
1768         ldb_module_set_private(module, ltdb);
1769         talloc_steal(module, ltdb);
1770
1771         if (ltdb_cache_load(module) != 0) {
1772                 ldb_asprintf_errstring(ldb,
1773                                        "Unable to load ltdb cache records of tdb '%s'", path);
1774                 talloc_free(module);
1775                 return LDB_ERR_OPERATIONS_ERROR;
1776         }
1777
1778         *_module = module;
1779         return LDB_SUCCESS;
1780 }
1781
1782 int ldb_tdb_init(const char *version)
1783 {
1784         LDB_MODULE_CHECK_VERSION(version);
1785         return ldb_register_backend("tdb", ltdb_connect, false);
1786 }