ldb_tdb: Replace exists, name and error_map with key value ops
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         return ret;
414 }
415
416 static int ltdb_tdb_store(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags)
417 {
418         return tdb_store(ltdb->tdb, key, data, flags);
419 }
420
421 static int ltdb_error(struct ltdb_private *ltdb)
422 {
423         return ltdb_err_map(tdb_error(ltdb->tdb));
424 }
425
426 /*
427   store a record into the db
428 */
429 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
430 {
431         void *data = ldb_module_get_private(module);
432         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
433         TDB_DATA tdb_key, tdb_data;
434         struct ldb_val ldb_data;
435         int ret = LDB_SUCCESS;
436         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
437
438         if (tdb_key_ctx == NULL) {
439                 return ldb_module_oom(module);
440         }
441
442         if (ltdb->read_only) {
443                 return LDB_ERR_UNWILLING_TO_PERFORM;
444         }
445
446         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
447         if (tdb_key.dptr == NULL) {
448                 TALLOC_FREE(tdb_key_ctx);
449                 return LDB_ERR_OTHER;
450         }
451
452         ret = ldb_pack_data(ldb_module_get_ctx(module),
453                             msg, &ldb_data);
454         if (ret == -1) {
455                 TALLOC_FREE(tdb_key_ctx);
456                 return LDB_ERR_OTHER;
457         }
458
459         tdb_data.dptr = ldb_data.data;
460         tdb_data.dsize = ldb_data.length;
461
462         ret = ltdb->kv_ops->store(ltdb, tdb_key, tdb_data, flgs);
463         if (ret != 0) {
464                 bool is_special = ldb_dn_is_special(msg->dn);
465                 ret = ltdb->kv_ops->error(ltdb);
466
467                 /*
468                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
469                  * the GUID, so re-map
470                  */
471                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
472                     && !is_special
473                     && ltdb->cache->GUID_index_attribute != NULL) {
474                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
475                 }
476                 goto done;
477         }
478
479 done:
480         TALLOC_FREE(tdb_key_ctx);
481         talloc_free(ldb_data.data);
482
483         return ret;
484 }
485
486
487 /*
488   check if a attribute is a single valued, for a given element
489  */
490 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
491                                   struct ldb_message_element *el)
492 {
493         if (!a) return false;
494         if (el != NULL) {
495                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
496                         /* override from a ldb module, for example
497                            used for the description field, which is
498                            marked multi-valued in the schema but which
499                            should not actually accept multiple
500                            values */
501                         return true;
502                 }
503                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
504                         /* override from a ldb module, for example used for
505                            deleted linked attribute entries */
506                         return false;
507                 }
508         }
509         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
510                 return true;
511         }
512         return false;
513 }
514
515 static int ltdb_add_internal(struct ldb_module *module,
516                              struct ltdb_private *ltdb,
517                              const struct ldb_message *msg,
518                              bool check_single_value)
519 {
520         struct ldb_context *ldb = ldb_module_get_ctx(module);
521         int ret = LDB_SUCCESS;
522         unsigned int i;
523
524         for (i=0;i<msg->num_elements;i++) {
525                 struct ldb_message_element *el = &msg->elements[i];
526                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
527
528                 if (el->num_values == 0) {
529                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
530                                                el->name, ldb_dn_get_linearized(msg->dn));
531                         return LDB_ERR_CONSTRAINT_VIOLATION;
532                 }
533                 if (check_single_value &&
534                                 el->num_values > 1 &&
535                                 ldb_tdb_single_valued(a, el)) {
536                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
537                                                el->name, ldb_dn_get_linearized(msg->dn));
538                         return LDB_ERR_CONSTRAINT_VIOLATION;
539                 }
540
541                 /* Do not check "@ATTRIBUTES" for duplicated values */
542                 if (ldb_dn_is_special(msg->dn) &&
543                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
544                         continue;
545                 }
546
547                 if (check_single_value &&
548                     !(el->flags &
549                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
550                         struct ldb_val *duplicate = NULL;
551
552                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
553                                                          el, &duplicate, 0);
554                         if (ret != LDB_SUCCESS) {
555                                 return ret;
556                         }
557                         if (duplicate != NULL) {
558                                 ldb_asprintf_errstring(
559                                         ldb,
560                                         "attribute '%s': value '%.*s' on '%s' "
561                                         "provided more than once in ADD object",
562                                         el->name,
563                                         (int)duplicate->length,
564                                         duplicate->data,
565                                         ldb_dn_get_linearized(msg->dn));
566                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
567                         }
568                 }
569         }
570
571         ret = ltdb_store(module, msg, TDB_INSERT);
572         if (ret != LDB_SUCCESS) {
573                 /*
574                  * Try really hard to get the right error code for
575                  * a re-add situation, as this can matter!
576                  */
577                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
578                         int ret2;
579                         struct ldb_dn *dn2 = NULL;
580                         TALLOC_CTX *mem_ctx = talloc_new(module);
581                         if (mem_ctx == NULL) {
582                                 return ldb_module_operr(module);
583                         }
584                         ret2 = ltdb_search_base(module, module,
585                                                 msg->dn, &dn2);
586                         TALLOC_FREE(mem_ctx);
587                         if (ret2 == LDB_SUCCESS) {
588                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
589                         }
590                 }
591                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
592                         ldb_asprintf_errstring(ldb,
593                                                "Entry %s already exists",
594                                                ldb_dn_get_linearized(msg->dn));
595                 }
596                 return ret;
597         }
598
599         ret = ltdb_index_add_new(module, ltdb, msg);
600         if (ret != LDB_SUCCESS) {
601                 /*
602                  * If we failed to index, delete the message again.
603                  *
604                  * This is particularly important for the GUID index
605                  * case, which will only fail for a duplicate DN
606                  * in the index add.
607                  *
608                  * Note that the caller may not cancel the transation
609                  * and this means the above add might really show up!
610                  */
611                 ltdb_delete_noindex(module, msg);
612                 return ret;
613         }
614
615         ret = ltdb_modified(module, msg->dn);
616
617         return ret;
618 }
619
620 /*
621   add a record to the database
622 */
623 static int ltdb_add(struct ltdb_context *ctx)
624 {
625         struct ldb_module *module = ctx->module;
626         struct ldb_request *req = ctx->req;
627         void *data = ldb_module_get_private(module);
628         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
629         int ret = LDB_SUCCESS;
630
631         ret = ltdb_check_special_dn(module, req->op.add.message);
632         if (ret != LDB_SUCCESS) {
633                 return ret;
634         }
635
636         ldb_request_set_state(req, LDB_ASYNC_PENDING);
637
638         if (ltdb_cache_load(module) != 0) {
639                 return LDB_ERR_OPERATIONS_ERROR;
640         }
641
642         ret = ltdb_add_internal(module, ltdb,
643                                 req->op.add.message, true);
644
645         return ret;
646 }
647
648 static int ltdb_tdb_delete(struct ltdb_private *ltdb, TDB_DATA tdb_key)
649 {
650         return tdb_delete(ltdb->tdb, tdb_key);
651 }
652
653 /*
654   delete a record from the database, not updating indexes (used for deleting
655   index records)
656 */
657 int ltdb_delete_noindex(struct ldb_module *module,
658                         const struct ldb_message *msg)
659 {
660         void *data = ldb_module_get_private(module);
661         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
662         TDB_DATA tdb_key;
663         int ret;
664         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
665
666         if (tdb_key_ctx == NULL) {
667                 return ldb_module_oom(module);
668         }
669
670         if (ltdb->read_only) {
671                 return LDB_ERR_UNWILLING_TO_PERFORM;
672         }
673
674         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
675         if (!tdb_key.dptr) {
676                 TALLOC_FREE(tdb_key_ctx);
677                 return LDB_ERR_OTHER;
678         }
679
680         ret = ltdb->kv_ops->delete(ltdb, tdb_key);
681         TALLOC_FREE(tdb_key_ctx);
682
683         if (ret != 0) {
684                 ret = ltdb->kv_ops->error(ltdb);
685         }
686
687         return ret;
688 }
689
690 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
691 {
692         struct ldb_message *msg;
693         int ret = LDB_SUCCESS;
694
695         msg = ldb_msg_new(module);
696         if (msg == NULL) {
697                 return LDB_ERR_OPERATIONS_ERROR;
698         }
699
700         /* in case any attribute of the message was indexed, we need
701            to fetch the old record */
702         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
703         if (ret != LDB_SUCCESS) {
704                 /* not finding the old record is an error */
705                 goto done;
706         }
707
708         ret = ltdb_delete_noindex(module, msg);
709         if (ret != LDB_SUCCESS) {
710                 goto done;
711         }
712
713         /* remove any indexed attributes */
714         ret = ltdb_index_delete(module, msg);
715         if (ret != LDB_SUCCESS) {
716                 goto done;
717         }
718
719         ret = ltdb_modified(module, dn);
720         if (ret != LDB_SUCCESS) {
721                 goto done;
722         }
723
724 done:
725         talloc_free(msg);
726         return ret;
727 }
728
729 /*
730   delete a record from the database
731 */
732 static int ltdb_delete(struct ltdb_context *ctx)
733 {
734         struct ldb_module *module = ctx->module;
735         struct ldb_request *req = ctx->req;
736         int ret = LDB_SUCCESS;
737
738         ldb_request_set_state(req, LDB_ASYNC_PENDING);
739
740         if (ltdb_cache_load(module) != 0) {
741                 return LDB_ERR_OPERATIONS_ERROR;
742         }
743
744         ret = ltdb_delete_internal(module, req->op.del.dn);
745
746         return ret;
747 }
748
749 /*
750   find an element by attribute name. At the moment this does a linear search,
751   it should be re-coded to use a binary search once all places that modify
752   records guarantee sorted order
753
754   return the index of the first matching element if found, otherwise -1
755 */
756 static int find_element(const struct ldb_message *msg, const char *name)
757 {
758         unsigned int i;
759         for (i=0;i<msg->num_elements;i++) {
760                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
761                         return i;
762                 }
763         }
764         return -1;
765 }
766
767
768 /*
769   add an element to an existing record. Assumes a elements array that we
770   can call re-alloc on, and assumed that we can re-use the data pointers from
771   the passed in additional values. Use with care!
772
773   returns 0 on success, -1 on failure (and sets errno)
774 */
775 static int ltdb_msg_add_element(struct ldb_message *msg,
776                                 struct ldb_message_element *el)
777 {
778         struct ldb_message_element *e2;
779         unsigned int i;
780
781         if (el->num_values == 0) {
782                 /* nothing to do here - we don't add empty elements */
783                 return 0;
784         }
785
786         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
787                               msg->num_elements+1);
788         if (!e2) {
789                 errno = ENOMEM;
790                 return -1;
791         }
792
793         msg->elements = e2;
794
795         e2 = &msg->elements[msg->num_elements];
796
797         e2->name = el->name;
798         e2->flags = el->flags;
799         e2->values = talloc_array(msg->elements,
800                                   struct ldb_val, el->num_values);
801         if (!e2->values) {
802                 errno = ENOMEM;
803                 return -1;
804         }
805         for (i=0;i<el->num_values;i++) {
806                 e2->values[i] = el->values[i];
807         }
808         e2->num_values = el->num_values;
809
810         ++msg->num_elements;
811
812         return 0;
813 }
814
815 /*
816   delete all elements having a specified attribute name
817 */
818 static int msg_delete_attribute(struct ldb_module *module,
819                                 struct ltdb_private *ltdb,
820                                 struct ldb_message *msg, const char *name)
821 {
822         unsigned int i;
823         int ret;
824         struct ldb_message_element *el;
825         bool is_special = ldb_dn_is_special(msg->dn);
826
827         if (!is_special
828             && ltdb->cache->GUID_index_attribute != NULL
829             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
830                 struct ldb_context *ldb = ldb_module_get_ctx(module);
831                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
832                                        "attribute %s (used as DB index)",
833                                        ltdb->cache->GUID_index_attribute);
834                 return LDB_ERR_CONSTRAINT_VIOLATION;
835         }
836
837         el = ldb_msg_find_element(msg, name);
838         if (el == NULL) {
839                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
840         }
841         i = el - msg->elements;
842
843         ret = ltdb_index_del_element(module, ltdb, msg, el);
844         if (ret != LDB_SUCCESS) {
845                 return ret;
846         }
847
848         talloc_free(el->values);
849         if (msg->num_elements > (i+1)) {
850                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
851         }
852         msg->num_elements--;
853         msg->elements = talloc_realloc(msg, msg->elements,
854                                        struct ldb_message_element,
855                                        msg->num_elements);
856         return LDB_SUCCESS;
857 }
858
859 /*
860   delete all elements matching an attribute name/value
861
862   return LDB Error on failure
863 */
864 static int msg_delete_element(struct ldb_module *module,
865                               struct ltdb_private *ltdb,
866                               struct ldb_message *msg,
867                               const char *name,
868                               const struct ldb_val *val)
869 {
870         struct ldb_context *ldb = ldb_module_get_ctx(module);
871         unsigned int i;
872         int found, ret;
873         struct ldb_message_element *el;
874         const struct ldb_schema_attribute *a;
875
876         found = find_element(msg, name);
877         if (found == -1) {
878                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
879         }
880
881         i = (unsigned int) found;
882         el = &(msg->elements[i]);
883
884         a = ldb_schema_attribute_by_name(ldb, el->name);
885
886         for (i=0;i<el->num_values;i++) {
887                 bool matched;
888                 if (a->syntax->operator_fn) {
889                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
890                                                      &el->values[i], val, &matched);
891                         if (ret != LDB_SUCCESS) return ret;
892                 } else {
893                         matched = (a->syntax->comparison_fn(ldb, ldb,
894                                                             &el->values[i], val) == 0);
895                 }
896                 if (matched) {
897                         if (el->num_values == 1) {
898                                 return msg_delete_attribute(module,
899                                                             ltdb, msg, name);
900                         }
901
902                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
903                         if (ret != LDB_SUCCESS) {
904                                 return ret;
905                         }
906
907                         if (i<el->num_values-1) {
908                                 memmove(&el->values[i], &el->values[i+1],
909                                         sizeof(el->values[i])*
910                                                 (el->num_values-(i+1)));
911                         }
912                         el->num_values--;
913
914                         /* per definition we find in a canonicalised message an
915                            attribute value only once. So we are finished here */
916                         return LDB_SUCCESS;
917                 }
918         }
919
920         /* Not found */
921         return LDB_ERR_NO_SUCH_ATTRIBUTE;
922 }
923
924
925 /*
926   modify a record - internal interface
927
928   yuck - this is O(n^2). Luckily n is usually small so we probably
929   get away with it, but if we ever have really large attribute lists
930   then we'll need to look at this again
931
932   'req' is optional, and is used to specify controls if supplied
933 */
934 int ltdb_modify_internal(struct ldb_module *module,
935                          const struct ldb_message *msg,
936                          struct ldb_request *req)
937 {
938         struct ldb_context *ldb = ldb_module_get_ctx(module);
939         void *data = ldb_module_get_private(module);
940         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
941         struct ldb_message *msg2;
942         unsigned int i, j;
943         int ret = LDB_SUCCESS, idx;
944         struct ldb_control *control_permissive = NULL;
945         TALLOC_CTX *mem_ctx = talloc_new(req);
946
947         if (mem_ctx == NULL) {
948                 return ldb_module_oom(module);
949         }
950         
951         if (req) {
952                 control_permissive = ldb_request_get_control(req,
953                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
954         }
955
956         msg2 = ldb_msg_new(mem_ctx);
957         if (msg2 == NULL) {
958                 ret = LDB_ERR_OTHER;
959                 goto done;
960         }
961
962         ret = ltdb_search_dn1(module, msg->dn,
963                               msg2,
964                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
965         if (ret != LDB_SUCCESS) {
966                 goto done;
967         }
968
969         for (i=0; i<msg->num_elements; i++) {
970                 struct ldb_message_element *el = &msg->elements[i], *el2;
971                 struct ldb_val *vals;
972                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
973                 const char *dn;
974                 uint32_t options = 0;
975                 if (control_permissive != NULL) {
976                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
977                 }
978
979                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
980                 case LDB_FLAG_MOD_ADD:
981
982                         if (el->num_values == 0) {
983                                 ldb_asprintf_errstring(ldb,
984                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
985                                                        el->name, ldb_dn_get_linearized(msg2->dn));
986                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
987                                 goto done;
988                         }
989
990                         /* make a copy of the array so that a permissive
991                          * control can remove duplicates without changing the
992                          * original values, but do not copy data as we do not
993                          * need to keep it around once the operation is
994                          * finished */
995                         if (control_permissive) {
996                                 el = talloc(msg2, struct ldb_message_element);
997                                 if (!el) {
998                                         ret = LDB_ERR_OTHER;
999                                         goto done;
1000                                 }
1001                                 *el = msg->elements[i];
1002                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1003                                 if (el->values == NULL) {
1004                                         ret = LDB_ERR_OTHER;
1005                                         goto done;
1006                                 }
1007                                 for (j = 0; j < el->num_values; j++) {
1008                                         el->values[j] = msg->elements[i].values[j];
1009                                 }
1010                         }
1011
1012                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1013                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1014                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1015                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1016                                 goto done;
1017                         }
1018
1019                         /* Checks if element already exists */
1020                         idx = find_element(msg2, el->name);
1021                         if (idx == -1) {
1022                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1023                                         ret = LDB_ERR_OTHER;
1024                                         goto done;
1025                                 }
1026                                 ret = ltdb_index_add_element(module, ltdb,
1027                                                              msg2,
1028                                                              el);
1029                                 if (ret != LDB_SUCCESS) {
1030                                         goto done;
1031                                 }
1032                         } else {
1033                                 j = (unsigned int) idx;
1034                                 el2 = &(msg2->elements[j]);
1035
1036                                 /* We cannot add another value on a existing one
1037                                    if the attribute is single-valued */
1038                                 if (ldb_tdb_single_valued(a, el)) {
1039                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1040                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1041                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1042                                         goto done;
1043                                 }
1044
1045                                 /* Check that values don't exist yet on multi-
1046                                    valued attributes or aren't provided twice */
1047                                 if (!(el->flags &
1048                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1049                                         struct ldb_val *duplicate = NULL;
1050                                         ret = ldb_msg_find_common_values(ldb,
1051                                                                          msg2,
1052                                                                          el,
1053                                                                          el2,
1054                                                                          options);
1055
1056                                         if (ret ==
1057                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1058                                                 ldb_asprintf_errstring(ldb,
1059                                                         "attribute '%s': value "
1060                                                         "#%u on '%s' already "
1061                                                         "exists", el->name, j,
1062                                                         ldb_dn_get_linearized(msg2->dn));
1063                                                 goto done;
1064                                         } else if (ret != LDB_SUCCESS) {
1065                                                 goto done;
1066                                         }
1067
1068                                         ret = ldb_msg_find_duplicate_val(
1069                                                 ldb, msg2, el, &duplicate, 0);
1070                                         if (ret != LDB_SUCCESS) {
1071                                                 goto done;
1072                                         }
1073                                         if (duplicate != NULL) {
1074                                                 ldb_asprintf_errstring(
1075                                                         ldb,
1076                                                         "attribute '%s': value "
1077                                                         "'%.*s' on '%s' "
1078                                                         "provided more than "
1079                                                         "once in ADD",
1080                                                         el->name,
1081                                                         (int)duplicate->length,
1082                                                         duplicate->data,
1083                                                         ldb_dn_get_linearized(msg->dn));
1084                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1085                                                 goto done;
1086                                         }
1087                                 }
1088
1089                                 /* Now combine existing and new values to a new
1090                                    attribute record */
1091                                 vals = talloc_realloc(msg2->elements,
1092                                                       el2->values, struct ldb_val,
1093                                                       el2->num_values + el->num_values);
1094                                 if (vals == NULL) {
1095                                         ldb_oom(ldb);
1096                                         ret = LDB_ERR_OTHER;
1097                                         goto done;
1098                                 }
1099
1100                                 for (j=0; j<el->num_values; j++) {
1101                                         vals[el2->num_values + j] =
1102                                                 ldb_val_dup(vals, &el->values[j]);
1103                                 }
1104
1105                                 el2->values = vals;
1106                                 el2->num_values += el->num_values;
1107
1108                                 ret = ltdb_index_add_element(module, ltdb,
1109                                                              msg2, el);
1110                                 if (ret != LDB_SUCCESS) {
1111                                         goto done;
1112                                 }
1113                         }
1114
1115                         break;
1116
1117                 case LDB_FLAG_MOD_REPLACE:
1118
1119                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1120                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1121                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1122                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1123                                 goto done;
1124                         }
1125
1126                         /*
1127                          * We don't need to check this if we have been
1128                          * pre-screened by the repl_meta_data module
1129                          * in Samba, or someone else who can claim to
1130                          * know what they are doing. 
1131                          */
1132                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1133                                 struct ldb_val *duplicate = NULL;
1134
1135                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1136                                                                  &duplicate, 0);
1137                                 if (ret != LDB_SUCCESS) {
1138                                         goto done;
1139                                 }
1140                                 if (duplicate != NULL) {
1141                                         ldb_asprintf_errstring(
1142                                                 ldb,
1143                                                 "attribute '%s': value '%.*s' "
1144                                                 "on '%s' provided more than "
1145                                                 "once in REPLACE",
1146                                                 el->name,
1147                                                 (int)duplicate->length,
1148                                                 duplicate->data,
1149                                                 ldb_dn_get_linearized(msg2->dn));
1150                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1151                                         goto done;
1152                                 }
1153                         }
1154
1155                         /* Checks if element already exists */
1156                         idx = find_element(msg2, el->name);
1157                         if (idx != -1) {
1158                                 j = (unsigned int) idx;
1159                                 el2 = &(msg2->elements[j]);
1160
1161                                 /* we consider two elements to be
1162                                  * equal only if the order
1163                                  * matches. This allows dbcheck to
1164                                  * fix the ordering on attributes
1165                                  * where order matters, such as
1166                                  * objectClass
1167                                  */
1168                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1169                                         continue;
1170                                 }
1171
1172                                 /* Delete the attribute if it exists in the DB */
1173                                 if (msg_delete_attribute(module, ltdb,
1174                                                          msg2,
1175                                                          el->name) != 0) {
1176                                         ret = LDB_ERR_OTHER;
1177                                         goto done;
1178                                 }
1179                         }
1180
1181                         /* Recreate it with the new values */
1182                         if (ltdb_msg_add_element(msg2, el) != 0) {
1183                                 ret = LDB_ERR_OTHER;
1184                                 goto done;
1185                         }
1186
1187                         ret = ltdb_index_add_element(module, ltdb,
1188                                                      msg2, el);
1189                         if (ret != LDB_SUCCESS) {
1190                                 goto done;
1191                         }
1192
1193                         break;
1194
1195                 case LDB_FLAG_MOD_DELETE:
1196                         dn = ldb_dn_get_linearized(msg2->dn);
1197                         if (dn == NULL) {
1198                                 ret = LDB_ERR_OTHER;
1199                                 goto done;
1200                         }
1201
1202                         if (msg->elements[i].num_values == 0) {
1203                                 /* Delete the whole attribute */
1204                                 ret = msg_delete_attribute(module,
1205                                                            ltdb,
1206                                                            msg2,
1207                                                            msg->elements[i].name);
1208                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1209                                     control_permissive) {
1210                                         ret = LDB_SUCCESS;
1211                                 } else {
1212                                         ldb_asprintf_errstring(ldb,
1213                                                                "attribute '%s': no such attribute for delete on '%s'",
1214                                                                msg->elements[i].name, dn);
1215                                 }
1216                                 if (ret != LDB_SUCCESS) {
1217                                         goto done;
1218                                 }
1219                         } else {
1220                                 /* Delete specified values from an attribute */
1221                                 for (j=0; j < msg->elements[i].num_values; j++) {
1222                                         ret = msg_delete_element(module,
1223                                                                  ltdb,
1224                                                                  msg2,
1225                                                                  msg->elements[i].name,
1226                                                                  &msg->elements[i].values[j]);
1227                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1228                                             control_permissive) {
1229                                                 ret = LDB_SUCCESS;
1230                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1231                                                 ldb_asprintf_errstring(ldb,
1232                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1233                                                                        msg->elements[i].name, dn);
1234                                         }
1235                                         if (ret != LDB_SUCCESS) {
1236                                                 goto done;
1237                                         }
1238                                 }
1239                         }
1240                         break;
1241                 default:
1242                         ldb_asprintf_errstring(ldb,
1243                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1244                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1245                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1246                         ret = LDB_ERR_PROTOCOL_ERROR;
1247                         goto done;
1248                 }
1249         }
1250
1251         ret = ltdb_store(module, msg2, TDB_MODIFY);
1252         if (ret != LDB_SUCCESS) {
1253                 goto done;
1254         }
1255
1256         ret = ltdb_modified(module, msg2->dn);
1257         if (ret != LDB_SUCCESS) {
1258                 goto done;
1259         }
1260
1261 done:
1262         TALLOC_FREE(mem_ctx);
1263         return ret;
1264 }
1265
1266 /*
1267   modify a record
1268 */
1269 static int ltdb_modify(struct ltdb_context *ctx)
1270 {
1271         struct ldb_module *module = ctx->module;
1272         struct ldb_request *req = ctx->req;
1273         int ret = LDB_SUCCESS;
1274
1275         ret = ltdb_check_special_dn(module, req->op.mod.message);
1276         if (ret != LDB_SUCCESS) {
1277                 return ret;
1278         }
1279
1280         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1281
1282         if (ltdb_cache_load(module) != 0) {
1283                 return LDB_ERR_OPERATIONS_ERROR;
1284         }
1285
1286         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1287
1288         return ret;
1289 }
1290
1291 /*
1292   rename a record
1293 */
1294 static int ltdb_rename(struct ltdb_context *ctx)
1295 {
1296         struct ldb_module *module = ctx->module;
1297         void *data = ldb_module_get_private(module);
1298         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1299         struct ldb_request *req = ctx->req;
1300         struct ldb_message *msg;
1301         int ret = LDB_SUCCESS;
1302         TDB_DATA tdb_key, tdb_key_old;
1303         struct ldb_dn *db_dn;
1304
1305         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1306
1307         if (ltdb_cache_load(ctx->module) != 0) {
1308                 return LDB_ERR_OPERATIONS_ERROR;
1309         }
1310
1311         msg = ldb_msg_new(ctx);
1312         if (msg == NULL) {
1313                 return LDB_ERR_OPERATIONS_ERROR;
1314         }
1315
1316         /* we need to fetch the old record to re-add under the new name */
1317         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1318                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1319         if (ret != LDB_SUCCESS) {
1320                 /* not finding the old record is an error */
1321                 return ret;
1322         }
1323
1324         /* We need to, before changing the DB, check if the new DN
1325          * exists, so we can return this error to the caller with an
1326          * unmodified DB
1327          *
1328          * Even in GUID index mode we use ltdb_key_dn() as we are
1329          * trying to figure out if this is just a case rename
1330          */
1331         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1332         if (!tdb_key.dptr) {
1333                 talloc_free(msg);
1334                 return LDB_ERR_OPERATIONS_ERROR;
1335         }
1336
1337         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1338         if (!tdb_key_old.dptr) {
1339                 talloc_free(msg);
1340                 talloc_free(tdb_key.dptr);
1341                 return LDB_ERR_OPERATIONS_ERROR;
1342         }
1343
1344         /*
1345          * Only declare a conflict if the new DN already exists,
1346          * and it isn't a case change on the old DN
1347          */
1348         if (tdb_key_old.dsize != tdb_key.dsize
1349             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1350                 ret = ltdb_search_base(module, msg,
1351                                        req->op.rename.newdn,
1352                                        &db_dn);
1353                 if (ret == LDB_SUCCESS) {
1354                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1355                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1356                         ret = LDB_SUCCESS;
1357                 }
1358         }
1359
1360         /* finding the new record already in the DB is an error */
1361
1362         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1363                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1364                                        "Entry %s already exists",
1365                                        ldb_dn_get_linearized(req->op.rename.newdn));
1366         }
1367         if (ret != LDB_SUCCESS) {
1368                 talloc_free(tdb_key_old.dptr);
1369                 talloc_free(tdb_key.dptr);
1370                 talloc_free(msg);
1371                 return ret;
1372         }
1373
1374         talloc_free(tdb_key_old.dptr);
1375         talloc_free(tdb_key.dptr);
1376
1377         /* Always delete first then add, to avoid conflicts with
1378          * unique indexes. We rely on the transaction to make this
1379          * atomic
1380          */
1381         ret = ltdb_delete_internal(module, msg->dn);
1382         if (ret != LDB_SUCCESS) {
1383                 talloc_free(msg);
1384                 return ret;
1385         }
1386
1387         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1388         if (msg->dn == NULL) {
1389                 talloc_free(msg);
1390                 return LDB_ERR_OPERATIONS_ERROR;
1391         }
1392
1393         /* We don't check single value as we can have more than 1 with
1394          * deleted attributes. We could go through all elements but that's
1395          * maybe not the most efficient way
1396          */
1397         ret = ltdb_add_internal(module, ltdb, msg, false);
1398
1399         talloc_free(msg);
1400
1401         return ret;
1402 }
1403
1404 static int ltdb_start_trans(struct ldb_module *module)
1405 {
1406         void *data = ldb_module_get_private(module);
1407         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1408
1409         /* Do not take out the transaction lock on a read-only DB */
1410         if (ltdb->read_only) {
1411                 return LDB_ERR_UNWILLING_TO_PERFORM;
1412         }
1413
1414         if (tdb_transaction_start(ltdb->tdb) != 0) {
1415                 return ltdb->kv_ops->error(ltdb);
1416         }
1417
1418         ltdb->in_transaction++;
1419
1420         ltdb_index_transaction_start(module);
1421
1422         return LDB_SUCCESS;
1423 }
1424
1425 static int ltdb_prepare_commit(struct ldb_module *module)
1426 {
1427         int ret;
1428         void *data = ldb_module_get_private(module);
1429         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1430
1431         if (ltdb->in_transaction != 1) {
1432                 return LDB_SUCCESS;
1433         }
1434
1435         ret = ltdb_index_transaction_commit(module);
1436         if (ret != LDB_SUCCESS) {
1437                 tdb_transaction_cancel(ltdb->tdb);
1438                 ltdb->in_transaction--;
1439                 return ret;
1440         }
1441
1442         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1443                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1444                 ltdb->in_transaction--;
1445                 ldb_debug_set(ldb_module_get_ctx(module),
1446                               LDB_DEBUG_FATAL,
1447                               "Failure during "
1448                               "tdb_transaction_prepare_commit(): %s -> %s",
1449                               tdb_errorstr(ltdb->tdb),
1450                               ldb_strerror(ret));
1451                 return ret;
1452         }
1453
1454         ltdb->prepared_commit = true;
1455
1456         return LDB_SUCCESS;
1457 }
1458
1459 static int ltdb_end_trans(struct ldb_module *module)
1460 {
1461         int ret;
1462         void *data = ldb_module_get_private(module);
1463         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1464
1465         if (!ltdb->prepared_commit) {
1466                 ret = ltdb_prepare_commit(module);
1467                 if (ret != LDB_SUCCESS) {
1468                         return ret;
1469                 }
1470         }
1471
1472         ltdb->in_transaction--;
1473         ltdb->prepared_commit = false;
1474
1475         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1476                 ret = ltdb->kv_ops->error(ltdb);
1477                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1478                                        "Failure during tdb_transaction_commit(): %s -> %s",
1479                                        tdb_errorstr(ltdb->tdb),
1480                                        ldb_strerror(ret));
1481                 return ret;
1482         }
1483
1484         return LDB_SUCCESS;
1485 }
1486
1487 static int ltdb_del_trans(struct ldb_module *module)
1488 {
1489         void *data = ldb_module_get_private(module);
1490         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1491
1492         ltdb->in_transaction--;
1493
1494         if (ltdb_index_transaction_cancel(module) != 0) {
1495                 tdb_transaction_cancel(ltdb->tdb);
1496                 return ltdb->kv_ops->error(ltdb);
1497         }
1498
1499         tdb_transaction_cancel(ltdb->tdb);
1500         return LDB_SUCCESS;
1501 }
1502
1503 /*
1504   return sequenceNumber from @BASEINFO
1505 */
1506 static int ltdb_sequence_number(struct ltdb_context *ctx,
1507                                 struct ldb_extended **ext)
1508 {
1509         struct ldb_context *ldb;
1510         struct ldb_module *module = ctx->module;
1511         struct ldb_request *req = ctx->req;
1512         TALLOC_CTX *tmp_ctx = NULL;
1513         struct ldb_seqnum_request *seq;
1514         struct ldb_seqnum_result *res;
1515         struct ldb_message *msg = NULL;
1516         struct ldb_dn *dn;
1517         const char *date;
1518         int ret = LDB_SUCCESS;
1519
1520         ldb = ldb_module_get_ctx(module);
1521
1522         seq = talloc_get_type(req->op.extended.data,
1523                                 struct ldb_seqnum_request);
1524         if (seq == NULL) {
1525                 return LDB_ERR_OPERATIONS_ERROR;
1526         }
1527
1528         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1529
1530         if (ltdb_lock_read(module) != 0) {
1531                 return LDB_ERR_OPERATIONS_ERROR;
1532         }
1533
1534         res = talloc_zero(req, struct ldb_seqnum_result);
1535         if (res == NULL) {
1536                 ret = LDB_ERR_OPERATIONS_ERROR;
1537                 goto done;
1538         }
1539
1540         tmp_ctx = talloc_new(req);
1541         if (tmp_ctx == NULL) {
1542                 ret = LDB_ERR_OPERATIONS_ERROR;
1543                 goto done;
1544         }
1545
1546         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1547         if (dn == NULL) {
1548                 ret = LDB_ERR_OPERATIONS_ERROR;
1549                 goto done;
1550         }
1551
1552         msg = ldb_msg_new(tmp_ctx);
1553         if (msg == NULL) {
1554                 ret = LDB_ERR_OPERATIONS_ERROR;
1555                 goto done;
1556         }
1557
1558         ret = ltdb_search_dn1(module, dn, msg, 0);
1559         if (ret != LDB_SUCCESS) {
1560                 goto done;
1561         }
1562
1563         switch (seq->type) {
1564         case LDB_SEQ_HIGHEST_SEQ:
1565                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1566                 break;
1567         case LDB_SEQ_NEXT:
1568                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1569                 res->seq_num++;
1570                 break;
1571         case LDB_SEQ_HIGHEST_TIMESTAMP:
1572                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1573                 if (date) {
1574                         res->seq_num = ldb_string_to_time(date);
1575                 } else {
1576                         res->seq_num = 0;
1577                         /* zero is as good as anything when we don't know */
1578                 }
1579                 break;
1580         }
1581
1582         *ext = talloc_zero(req, struct ldb_extended);
1583         if (*ext == NULL) {
1584                 ret = LDB_ERR_OPERATIONS_ERROR;
1585                 goto done;
1586         }
1587         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1588         (*ext)->data = talloc_steal(*ext, res);
1589
1590 done:
1591         talloc_free(tmp_ctx);
1592         ltdb_unlock_read(module);
1593         return ret;
1594 }
1595
1596 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1597 {
1598         struct ldb_context *ldb;
1599         struct ldb_request *req;
1600         struct ldb_reply *ares;
1601
1602         ldb = ldb_module_get_ctx(ctx->module);
1603         req = ctx->req;
1604
1605         /* if we already returned an error just return */
1606         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1607                 return;
1608         }
1609
1610         ares = talloc_zero(req, struct ldb_reply);
1611         if (!ares) {
1612                 ldb_oom(ldb);
1613                 req->callback(req, NULL);
1614                 return;
1615         }
1616         ares->type = LDB_REPLY_DONE;
1617         ares->error = error;
1618
1619         req->callback(req, ares);
1620 }
1621
1622 static void ltdb_timeout(struct tevent_context *ev,
1623                           struct tevent_timer *te,
1624                           struct timeval t,
1625                           void *private_data)
1626 {
1627         struct ltdb_context *ctx;
1628         ctx = talloc_get_type(private_data, struct ltdb_context);
1629
1630         if (!ctx->request_terminated) {
1631                 /* request is done now */
1632                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1633         }
1634
1635         if (ctx->spy) {
1636                 /* neutralize the spy */
1637                 ctx->spy->ctx = NULL;
1638                 ctx->spy = NULL;
1639         }
1640         talloc_free(ctx);
1641 }
1642
1643 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1644                                         struct ldb_extended *ext,
1645                                         int error)
1646 {
1647         struct ldb_context *ldb;
1648         struct ldb_request *req;
1649         struct ldb_reply *ares;
1650
1651         ldb = ldb_module_get_ctx(ctx->module);
1652         req = ctx->req;
1653
1654         /* if we already returned an error just return */
1655         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1656                 return;
1657         }
1658
1659         ares = talloc_zero(req, struct ldb_reply);
1660         if (!ares) {
1661                 ldb_oom(ldb);
1662                 req->callback(req, NULL);
1663                 return;
1664         }
1665         ares->type = LDB_REPLY_DONE;
1666         ares->response = ext;
1667         ares->error = error;
1668
1669         req->callback(req, ares);
1670 }
1671
1672 static void ltdb_handle_extended(struct ltdb_context *ctx)
1673 {
1674         struct ldb_extended *ext = NULL;
1675         int ret;
1676
1677         if (strcmp(ctx->req->op.extended.oid,
1678                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1679                 /* get sequence number */
1680                 ret = ltdb_sequence_number(ctx, &ext);
1681         } else {
1682                 /* not recognized */
1683                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1684         }
1685
1686         ltdb_request_extended_done(ctx, ext, ret);
1687 }
1688
1689 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1690 {
1691         return tdb_name(ltdb->tdb);
1692 }
1693
1694 static const struct kv_db_ops key_value_ops = {
1695         .store = ltdb_tdb_store,
1696         .delete = ltdb_tdb_delete,
1697         .lock_read = ltdb_lock_read,
1698         .unlock_read = ltdb_unlock_read,
1699         .error = ltdb_error,
1700         .name = ltdb_tdb_name,
1701 };
1702
1703 static void ltdb_callback(struct tevent_context *ev,
1704                           struct tevent_timer *te,
1705                           struct timeval t,
1706                           void *private_data)
1707 {
1708         struct ltdb_context *ctx;
1709         int ret;
1710
1711         ctx = talloc_get_type(private_data, struct ltdb_context);
1712
1713         if (ctx->request_terminated) {
1714                 goto done;
1715         }
1716
1717         switch (ctx->req->operation) {
1718         case LDB_SEARCH:
1719                 ret = ltdb_search(ctx);
1720                 break;
1721         case LDB_ADD:
1722                 ret = ltdb_add(ctx);
1723                 break;
1724         case LDB_MODIFY:
1725                 ret = ltdb_modify(ctx);
1726                 break;
1727         case LDB_DELETE:
1728                 ret = ltdb_delete(ctx);
1729                 break;
1730         case LDB_RENAME:
1731                 ret = ltdb_rename(ctx);
1732                 break;
1733         case LDB_EXTENDED:
1734                 ltdb_handle_extended(ctx);
1735                 goto done;
1736         default:
1737                 /* no other op supported */
1738                 ret = LDB_ERR_PROTOCOL_ERROR;
1739         }
1740
1741         if (!ctx->request_terminated) {
1742                 /* request is done now */
1743                 ltdb_request_done(ctx, ret);
1744         }
1745
1746 done:
1747         if (ctx->spy) {
1748                 /* neutralize the spy */
1749                 ctx->spy->ctx = NULL;
1750                 ctx->spy = NULL;
1751         }
1752         talloc_free(ctx);
1753 }
1754
1755 static int ltdb_request_destructor(void *ptr)
1756 {
1757         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1758
1759         if (spy->ctx != NULL) {
1760                 spy->ctx->spy = NULL;
1761                 spy->ctx->request_terminated = true;
1762                 spy->ctx = NULL;
1763         }
1764
1765         return 0;
1766 }
1767
1768 static int ltdb_handle_request(struct ldb_module *module,
1769                                struct ldb_request *req)
1770 {
1771         struct ldb_control *control_permissive;
1772         struct ldb_context *ldb;
1773         struct tevent_context *ev;
1774         struct ltdb_context *ac;
1775         struct tevent_timer *te;
1776         struct timeval tv;
1777         unsigned int i;
1778
1779         ldb = ldb_module_get_ctx(module);
1780
1781         control_permissive = ldb_request_get_control(req,
1782                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1783
1784         for (i = 0; req->controls && req->controls[i]; i++) {
1785                 if (req->controls[i]->critical &&
1786                     req->controls[i] != control_permissive) {
1787                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1788                                                req->controls[i]->oid);
1789                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1790                 }
1791         }
1792
1793         if (req->starttime == 0 || req->timeout == 0) {
1794                 ldb_set_errstring(ldb, "Invalid timeout settings");
1795                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1796         }
1797
1798         ev = ldb_handle_get_event_context(req->handle);
1799
1800         ac = talloc_zero(ldb, struct ltdb_context);
1801         if (ac == NULL) {
1802                 ldb_oom(ldb);
1803                 return LDB_ERR_OPERATIONS_ERROR;
1804         }
1805
1806         ac->module = module;
1807         ac->req = req;
1808
1809         tv.tv_sec = 0;
1810         tv.tv_usec = 0;
1811         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1812         if (NULL == te) {
1813                 talloc_free(ac);
1814                 return LDB_ERR_OPERATIONS_ERROR;
1815         }
1816
1817         if (req->timeout > 0) {
1818                 tv.tv_sec = req->starttime + req->timeout;
1819                 tv.tv_usec = 0;
1820                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1821                                                      ltdb_timeout, ac);
1822                 if (NULL == ac->timeout_event) {
1823                         talloc_free(ac);
1824                         return LDB_ERR_OPERATIONS_ERROR;
1825                 }
1826         }
1827
1828         /* set a spy so that we do not try to use the request context
1829          * if it is freed before ltdb_callback fires */
1830         ac->spy = talloc(req, struct ltdb_req_spy);
1831         if (NULL == ac->spy) {
1832                 talloc_free(ac);
1833                 return LDB_ERR_OPERATIONS_ERROR;
1834         }
1835         ac->spy->ctx = ac;
1836
1837         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1838
1839         return LDB_SUCCESS;
1840 }
1841
1842 static int ltdb_init_rootdse(struct ldb_module *module)
1843 {
1844         /* ignore errors on this - we expect it for non-sam databases */
1845         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1846
1847         /* there can be no module beyond the backend, just return */
1848         return LDB_SUCCESS;
1849 }
1850
1851 static const struct ldb_module_ops ltdb_ops = {
1852         .name              = "tdb",
1853         .init_context      = ltdb_init_rootdse,
1854         .search            = ltdb_handle_request,
1855         .add               = ltdb_handle_request,
1856         .modify            = ltdb_handle_request,
1857         .del               = ltdb_handle_request,
1858         .rename            = ltdb_handle_request,
1859         .extended          = ltdb_handle_request,
1860         .start_transaction = ltdb_start_trans,
1861         .end_transaction   = ltdb_end_trans,
1862         .prepare_commit    = ltdb_prepare_commit,
1863         .del_transaction   = ltdb_del_trans,
1864         .read_lock         = ltdb_lock_read,
1865         .read_unlock       = ltdb_unlock_read,
1866 };
1867
1868 /*
1869   connect to the database
1870 */
1871 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1872                         unsigned int flags, const char *options[],
1873                         struct ldb_module **_module)
1874 {
1875         struct ldb_module *module;
1876         const char *path;
1877         int tdb_flags, open_flags;
1878         struct ltdb_private *ltdb;
1879
1880         /*
1881          * We hold locks, so we must use a private event context
1882          * on each returned handle
1883          */
1884
1885         ldb_set_require_private_event_context(ldb);
1886
1887         /* parse the url */
1888         if (strchr(url, ':')) {
1889                 if (strncmp(url, "tdb://", 6) != 0) {
1890                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1891                                   "Invalid tdb URL '%s'", url);
1892                         return LDB_ERR_OPERATIONS_ERROR;
1893                 }
1894                 path = url+6;
1895         } else {
1896                 path = url;
1897         }
1898
1899         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1900
1901         /* check for the 'nosync' option */
1902         if (flags & LDB_FLG_NOSYNC) {
1903                 tdb_flags |= TDB_NOSYNC;
1904         }
1905
1906         /* and nommap option */
1907         if (flags & LDB_FLG_NOMMAP) {
1908                 tdb_flags |= TDB_NOMMAP;
1909         }
1910
1911         ltdb = talloc_zero(ldb, struct ltdb_private);
1912         if (!ltdb) {
1913                 ldb_oom(ldb);
1914                 return LDB_ERR_OPERATIONS_ERROR;
1915         }
1916
1917         if (flags & LDB_FLG_RDONLY) {
1918                 /*
1919                  * This is weird, but because we can only have one tdb
1920                  * in this process, and the other one could be
1921                  * read-write, we can't use the tdb readonly.  Plus a
1922                  * read only tdb prohibits the all-record lock.
1923                  */
1924                 open_flags = O_RDWR;
1925
1926                 ltdb->read_only = true;
1927
1928         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1929                 /*
1930                  * This is used by ldbsearch to prevent creation of the database
1931                  * if the name is wrong
1932                  */
1933                 open_flags = O_RDWR;
1934         } else {
1935                 /*
1936                  * This is the normal case
1937                  */
1938                 open_flags = O_CREAT | O_RDWR;
1939         }
1940
1941         /* note that we use quite a large default hash size */
1942         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1943                                    tdb_flags, open_flags,
1944                                    ldb_get_create_perms(ldb), ldb);
1945         if (!ltdb->tdb) {
1946                 ldb_asprintf_errstring(ldb,
1947                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1948                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1949                           "Unable to open tdb '%s': %s", path, strerror(errno));
1950                 talloc_free(ltdb);
1951                 if (errno == EACCES || errno == EPERM) {
1952                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1953                 }
1954                 return LDB_ERR_OPERATIONS_ERROR;
1955         }
1956
1957         if (getenv("LDB_WARN_UNINDEXED")) {
1958                 ltdb->warn_unindexed = true;
1959         }
1960
1961         if (getenv("LDB_WARN_REINDEX")) {
1962                 ltdb->warn_reindex = true;
1963         }
1964
1965         ltdb->sequence_number = 0;
1966         ltdb->kv_ops = &key_value_ops;
1967
1968         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1969         if (!module) {
1970                 ldb_oom(ldb);
1971                 talloc_free(ltdb);
1972                 return LDB_ERR_OPERATIONS_ERROR;
1973         }
1974         ldb_module_set_private(module, ltdb);
1975         talloc_steal(module, ltdb);
1976
1977         if (ltdb_cache_load(module) != 0) {
1978                 ldb_asprintf_errstring(ldb,
1979                                        "Unable to load ltdb cache records of tdb '%s'", path);
1980                 talloc_free(module);
1981                 return LDB_ERR_OPERATIONS_ERROR;
1982         }
1983
1984         *_module = module;
1985
1986         /*
1987          * Set the maximum key length
1988          */
1989         {
1990                 const char *len_str =
1991                         ldb_options_find(ldb, options,
1992                                          "max_key_len_for_self_test");
1993                 if (len_str != NULL) {
1994                         unsigned len = strtoul(len_str, NULL, 0);
1995                         ltdb->max_key_length = len;
1996                 }
1997         }
1998         return LDB_SUCCESS;
1999 }
2000
2001 int ldb_tdb_init(const char *version)
2002 {
2003         LDB_MODULE_CHECK_VERSION(version);
2004         return ldb_register_backend("tdb", ltdb_connect, false);
2005 }