ldb key_value: Add batch_mode option
[samba.git] / lib / ldb / ldb_key_value / ldb_kv.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_kv
29  *
30  *  Component: ldb key value backend
31  *
32  *  Description: core functions for ldb key value backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_kv.h"
53 #include "ldb_private.h"
54 #include "lib/util/attr.h"
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ldb_kv_req_spy {
60         struct ldb_kv_context *ctx;
61 };
62
63 /*
64  * Determine if this key could hold a record.  We allow the new GUID
65  * index, the old DN index and a possible future ID=
66  */
67 bool ldb_kv_key_is_normal_record(struct ldb_val key)
68 {
69         if (key.length < 4) {
70                 return false;
71         }
72
73         /*
74          * @ records are not normal records, we don't want to index
75          * them nor search on them
76          */
77         if (key.length > 4 &&
78             memcmp(key.data, "DN=@", 4) == 0) {
79                 return false;
80         }
81
82         /* All other DN= records are however */
83         if (memcmp(key.data, "DN=", 3) == 0) {
84                 return true;
85         }
86
87         if (memcmp(key.data, "ID=", 3) == 0) {
88                 return true;
89         }
90
91         if (key.length < sizeof(LDB_KV_GUID_KEY_PREFIX)) {
92                 return false;
93         }
94
95         if (memcmp(key.data, LDB_KV_GUID_KEY_PREFIX,
96                    sizeof(LDB_KV_GUID_KEY_PREFIX) - 1) == 0) {
97                 return true;
98         }
99
100         return false;
101 }
102
103 /*
104   form a ldb_val for a record key
105   caller frees
106
107   note that the key for a record can depend on whether the
108   dn refers to a case sensitive index record or not
109 */
110 struct ldb_val ldb_kv_key_dn(TALLOC_CTX *mem_ctx,
111                              struct ldb_dn *dn)
112 {
113         struct ldb_val key;
114         char *key_str = NULL;
115         const char *dn_folded = NULL;
116
117         /*
118           most DNs are case insensitive. The exception is index DNs for
119           case sensitive attributes
120
121           there are 3 cases dealt with in this code:
122
123           1) if the dn doesn't start with @ then uppercase the attribute
124              names and the attributes values of case insensitive attributes
125           2) if the dn starts with @ then leave it alone -
126              the indexing code handles the rest
127         */
128
129         dn_folded = ldb_dn_get_casefold(dn);
130         if (!dn_folded) {
131                 goto failed;
132         }
133
134         key_str = talloc_strdup(mem_ctx, "DN=");
135         if (!key_str) {
136                 goto failed;
137         }
138
139         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
140         if (!key_str) {
141                 goto failed;
142         }
143
144         key.data = (uint8_t *)key_str;
145         key.length = strlen(key_str) + 1;
146
147         return key;
148
149 failed:
150         errno = ENOMEM;
151         key.data = NULL;
152         key.length = 0;
153         return key;
154 }
155
156 /* The caller is to provide a correctly sized key */
157 int ldb_kv_guid_to_key(const struct ldb_val *GUID_val,
158                        struct ldb_val *key)
159 {
160         const char *GUID_prefix = LDB_KV_GUID_KEY_PREFIX;
161         const int GUID_prefix_len = sizeof(LDB_KV_GUID_KEY_PREFIX) - 1;
162
163         if (key->length != (GUID_val->length+GUID_prefix_len)) {
164                 return LDB_ERR_OPERATIONS_ERROR;
165         }
166
167         memcpy(key->data, GUID_prefix, GUID_prefix_len);
168         memcpy(&key->data[GUID_prefix_len],
169                GUID_val->data, GUID_val->length);
170         return LDB_SUCCESS;
171 }
172
173 /*
174  * The caller is to provide a correctly sized key, used only in
175  * the GUID index mode
176  */
177 int ldb_kv_idx_to_key(struct ldb_module *module,
178                       struct ldb_kv_private *ldb_kv,
179                       TALLOC_CTX *mem_ctx,
180                       const struct ldb_val *idx_val,
181                       struct ldb_val *key)
182 {
183         struct ldb_context *ldb = ldb_module_get_ctx(module);
184         struct ldb_dn *dn;
185
186         if (ldb_kv->cache->GUID_index_attribute != NULL) {
187                 return ldb_kv_guid_to_key(idx_val, key);
188         }
189
190         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
191         if (dn == NULL) {
192                 /*
193                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
194                  * to the caller, as this in an invalid index value
195                  */
196                 return LDB_ERR_OPERATIONS_ERROR;
197         }
198         /* form the key */
199         *key = ldb_kv_key_dn(mem_ctx, dn);
200         TALLOC_FREE(dn);
201         if (!key->data) {
202                 return ldb_module_oom(module);
203         }
204         return LDB_SUCCESS;
205 }
206
207 /*
208   form a TDB_DATA for a record key
209   caller frees mem_ctx, which may or may not have the key
210   as a child.
211
212   note that the key for a record can depend on whether a
213   GUID index is in use, or the DN is used as the key
214 */
215 struct ldb_val ldb_kv_key_msg(struct ldb_module *module,
216                         TALLOC_CTX *mem_ctx,
217                         const struct ldb_message *msg)
218 {
219         void *data = ldb_module_get_private(module);
220         struct ldb_kv_private *ldb_kv =
221             talloc_get_type(data, struct ldb_kv_private);
222         struct ldb_val key;
223         const struct ldb_val *guid_val;
224         int ret;
225
226         if (ldb_kv->cache->GUID_index_attribute == NULL) {
227                 return ldb_kv_key_dn(mem_ctx, msg->dn);
228         }
229
230         if (ldb_dn_is_special(msg->dn)) {
231                 return ldb_kv_key_dn(mem_ctx, msg->dn);
232         }
233
234         guid_val =
235             ldb_msg_find_ldb_val(msg, ldb_kv->cache->GUID_index_attribute);
236         if (guid_val == NULL) {
237                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
238                                        "Did not find GUID attribute %s "
239                                        "in %s, required for TDB record "
240                                        "key in " LDB_KV_IDXGUID " mode.",
241                                        ldb_kv->cache->GUID_index_attribute,
242                                        ldb_dn_get_linearized(msg->dn));
243                 errno = EINVAL;
244                 key.data = NULL;
245                 key.length = 0;
246                 return key;
247         }
248
249         /* In this case, allocate with talloc */
250         key.data = talloc_size(mem_ctx, LDB_KV_GUID_KEY_SIZE);
251         if (key.data == NULL) {
252                 errno = ENOMEM;
253                 key.data = NULL;
254                 key.length = 0;
255                 return key;
256         }
257         key.length = talloc_get_size(key.data);
258
259         ret = ldb_kv_guid_to_key(guid_val, &key);
260
261         if (ret != LDB_SUCCESS) {
262                 errno = EINVAL;
263                 key.data = NULL;
264                 key.length = 0;
265                 return key;
266         }
267         return key;
268 }
269
270 /*
271   check special dn's have valid attributes
272   currently only @ATTRIBUTES is checked
273 */
274 static int ldb_kv_check_special_dn(struct ldb_module *module,
275                                    const struct ldb_message *msg)
276 {
277         struct ldb_context *ldb = ldb_module_get_ctx(module);
278         unsigned int i, j;
279
280         if (! ldb_dn_is_special(msg->dn) ||
281             ! ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
282                 return LDB_SUCCESS;
283         }
284
285         /* we have @ATTRIBUTES, let's check attributes are fine */
286         /* should we check that we deny multivalued attributes ? */
287         for (i = 0; i < msg->num_elements; i++) {
288                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
289
290                 for (j = 0; j < msg->elements[i].num_values; j++) {
291                         if (ldb_kv_check_at_attributes_values(
292                                 &msg->elements[i].values[j]) != 0) {
293                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
294                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
295                         }
296                 }
297         }
298
299         return LDB_SUCCESS;
300 }
301
302 /*
303  * Called after modifies and when starting a transaction. Checks target pack
304  * format version and current pack format version, which are set by cache_load,
305  * and repacks if necessary.
306  */
307 static int ldb_kv_maybe_repack(struct ldb_kv_private *ldb_kv) {
308         /* Override option taken from ldb options */
309         if (ldb_kv->pack_format_override != 0) {
310                 ldb_kv->target_pack_format_version =
311                         ldb_kv->pack_format_override;
312         }
313
314         if (ldb_kv->pack_format_version !=
315             ldb_kv->target_pack_format_version) {
316                 int r;
317                 struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
318                 ldb_kv->pack_format_version =
319                         ldb_kv->target_pack_format_version;
320                 r = ldb_kv_repack(ldb_kv->module);
321                 if (r != LDB_SUCCESS) {
322                         ldb_debug(ldb, LDB_DEBUG_ERROR,
323                                   "Database repack failed.");
324                 }
325                 return r;
326         }
327
328         return LDB_SUCCESS;
329 }
330
331 /*
332   we've made a modification to a dn - possibly reindex and
333   update sequence number
334 */
335 static int ldb_kv_modified(struct ldb_module *module, struct ldb_dn *dn)
336 {
337         int ret = LDB_SUCCESS;
338         struct ldb_kv_private *ldb_kv = talloc_get_type(
339             ldb_module_get_private(module), struct ldb_kv_private);
340
341         /* only allow modifies inside a transaction, otherwise the
342          * ldb is unsafe */
343         if (ldb_kv->kv_ops->transaction_active(ldb_kv) == false) {
344                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
345                 return LDB_ERR_OPERATIONS_ERROR;
346         }
347
348         if (ldb_dn_is_special(dn) &&
349             (ldb_dn_check_special(dn, LDB_KV_INDEXLIST) ||
350              ldb_dn_check_special(dn, LDB_KV_ATTRIBUTES)) )
351         {
352                 if (ldb_kv->warn_reindex) {
353                         ldb_debug(ldb_module_get_ctx(module),
354                                   LDB_DEBUG_ERROR,
355                                   "Reindexing %s due to modification on %s",
356                                   ldb_kv->kv_ops->name(ldb_kv),
357                                   ldb_dn_get_linearized(dn));
358                 }
359                 ret = ldb_kv_reindex(module);
360         }
361
362         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
363         if (ret == LDB_SUCCESS &&
364             !(ldb_dn_is_special(dn) &&
365               ldb_dn_check_special(dn, LDB_KV_BASEINFO)) ) {
366                 ret = ldb_kv_increase_sequence_number(module);
367         }
368
369         /* If the modify was to @OPTIONS, reload the cache */
370         if (ret == LDB_SUCCESS &&
371             ldb_dn_is_special(dn) &&
372             (ldb_dn_check_special(dn, LDB_KV_OPTIONS)) ) {
373                 ret = ldb_kv_cache_reload(module);
374         }
375
376         if (ret != LDB_SUCCESS) {
377                 ldb_kv->reindex_failed = true;
378         }
379
380         return ret;
381 }
382 /*
383   store a record into the db
384 */
385 int ldb_kv_store(struct ldb_module *module,
386                  const struct ldb_message *msg,
387                  int flgs)
388 {
389         void *data = ldb_module_get_private(module);
390         struct ldb_kv_private *ldb_kv =
391             talloc_get_type(data, struct ldb_kv_private);
392         struct ldb_val key;
393         struct ldb_val ldb_data;
394         int ret = LDB_SUCCESS;
395         TALLOC_CTX *key_ctx = talloc_new(module);
396
397         if (key_ctx == NULL) {
398                 return ldb_module_oom(module);
399         }
400
401         if (ldb_kv->read_only) {
402                 talloc_free(key_ctx);
403                 return LDB_ERR_UNWILLING_TO_PERFORM;
404         }
405
406         key = ldb_kv_key_msg(module, key_ctx, msg);
407         if (key.data == NULL) {
408                 TALLOC_FREE(key_ctx);
409                 return LDB_ERR_OTHER;
410         }
411
412         ret = ldb_pack_data(ldb_module_get_ctx(module),
413                             msg, &ldb_data,
414                             ldb_kv->pack_format_version);
415         if (ret == -1) {
416                 TALLOC_FREE(key_ctx);
417                 return LDB_ERR_OTHER;
418         }
419
420         ret = ldb_kv->kv_ops->store(ldb_kv, key, ldb_data, flgs);
421         if (ret != 0) {
422                 bool is_special = ldb_dn_is_special(msg->dn);
423                 ret = ldb_kv->kv_ops->error(ldb_kv);
424
425                 /*
426                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
427                  * the GUID, so re-map
428                  */
429                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS && !is_special &&
430                     ldb_kv->cache->GUID_index_attribute != NULL) {
431                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
432                 }
433                 goto done;
434         }
435
436 done:
437         TALLOC_FREE(key_ctx);
438         talloc_free(ldb_data.data);
439
440         return ret;
441 }
442
443
444 /*
445   check if a attribute is a single valued, for a given element
446  */
447 static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
448                                  struct ldb_message_element *el)
449 {
450         if (!a) return false;
451         if (el != NULL) {
452                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
453                         /* override from a ldb module, for example
454                            used for the description field, which is
455                            marked multi-valued in the schema but which
456                            should not actually accept multiple
457                            values */
458                         return true;
459                 }
460                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
461                         /* override from a ldb module, for example used for
462                            deleted linked attribute entries */
463                         return false;
464                 }
465         }
466         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
467                 return true;
468         }
469         return false;
470 }
471
472 /*
473  * Starts a sub transaction if they are supported by the backend
474  * and the ldb connection has not been opened in batch mode.
475  */
476 static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
477 {
478         int ret = LDB_SUCCESS;
479
480         if (ldb_kv->batch_mode) {
481                 return ret;
482         }
483
484         ret = ldb_kv->kv_ops->begin_nested_write(ldb_kv);
485         if (ret == LDB_SUCCESS) {
486                 ret = ldb_kv_index_sub_transaction_start(ldb_kv);
487         }
488         return ret;
489 }
490
491 /*
492  * Commits a sub transaction if they are supported by the backend
493  * and the ldb connection has not been opened in batch mode.
494  */
495 static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
496 {
497         int ret = LDB_SUCCESS;
498
499         if (ldb_kv->batch_mode) {
500                 return ret;
501         }
502
503         ret = ldb_kv_index_sub_transaction_commit(ldb_kv);
504         if (ret != LDB_SUCCESS) {
505                 return ret;
506         }
507         ret = ldb_kv->kv_ops->finish_nested_write(ldb_kv);
508         return ret;
509 }
510
511 /*
512  * Cancels a sub transaction if they are supported by the backend
513  * and the ldb connection has not been opened in batch mode.
514  */
515 static int ldb_kv_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
516 {
517         int ret = LDB_SUCCESS;
518
519         if (ldb_kv->batch_mode) {
520                 return ret;
521         }
522
523         ret = ldb_kv_index_sub_transaction_cancel(ldb_kv);
524         if (ret != LDB_SUCCESS) {
525                 struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
526                 /*
527                  * In the event of a failure we log the failure and continue
528                  * as we need to cancel the database transaction.
529                  */
530                 ldb_debug(ldb,
531                           LDB_DEBUG_ERROR,
532                           __location__": ldb_kv_index_sub_transaction_cancel "
533                           "failed: %s",
534                           ldb_errstring(ldb));
535         }
536         ret = ldb_kv->kv_ops->abort_nested_write(ldb_kv);
537         return ret;
538 }
539
540 static int ldb_kv_add_internal(struct ldb_module *module,
541                                struct ldb_kv_private *ldb_kv,
542                                const struct ldb_message *msg,
543                                bool check_single_value)
544 {
545         struct ldb_context *ldb = ldb_module_get_ctx(module);
546         int ret = LDB_SUCCESS;
547         unsigned int i;
548         bool valid_dn = false;
549
550         /* Check the new DN is reasonable */
551         valid_dn = ldb_dn_validate(msg->dn);
552         if (valid_dn == false) {
553                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
554                                        "Invalid DN in ADD: %s",
555                                        ldb_dn_get_linearized(msg->dn));
556                 return LDB_ERR_INVALID_DN_SYNTAX;
557         }
558
559         for (i=0;i<msg->num_elements;i++) {
560                 struct ldb_message_element *el = &msg->elements[i];
561                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
562
563                 if (el->num_values == 0) {
564                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
565                                                el->name, ldb_dn_get_linearized(msg->dn));
566                         return LDB_ERR_CONSTRAINT_VIOLATION;
567                 }
568                 if (check_single_value && el->num_values > 1 &&
569                     ldb_kv_single_valued(a, el)) {
570                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
571                                                el->name, ldb_dn_get_linearized(msg->dn));
572                         return LDB_ERR_CONSTRAINT_VIOLATION;
573                 }
574
575                 /* Do not check "@ATTRIBUTES" for duplicated values */
576                 if (ldb_dn_is_special(msg->dn) &&
577                     ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
578                         continue;
579                 }
580
581                 if (check_single_value &&
582                     !(el->flags &
583                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
584                         struct ldb_val *duplicate = NULL;
585
586                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
587                                                          el, &duplicate, 0);
588                         if (ret != LDB_SUCCESS) {
589                                 return ret;
590                         }
591                         if (duplicate != NULL) {
592                                 ldb_asprintf_errstring(
593                                         ldb,
594                                         "attribute '%s': value '%.*s' on '%s' "
595                                         "provided more than once in ADD object",
596                                         el->name,
597                                         (int)duplicate->length,
598                                         duplicate->data,
599                                         ldb_dn_get_linearized(msg->dn));
600                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
601                         }
602                 }
603         }
604
605         ret = ldb_kv_store(module, msg, TDB_INSERT);
606         if (ret != LDB_SUCCESS) {
607                 /*
608                  * Try really hard to get the right error code for
609                  * a re-add situation, as this can matter!
610                  */
611                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
612                         int ret2;
613                         struct ldb_dn *dn2 = NULL;
614                         TALLOC_CTX *mem_ctx = talloc_new(module);
615                         if (mem_ctx == NULL) {
616                                 return ldb_module_operr(module);
617                         }
618                         ret2 =
619                             ldb_kv_search_base(module, mem_ctx, msg->dn, &dn2);
620                         TALLOC_FREE(mem_ctx);
621                         if (ret2 == LDB_SUCCESS) {
622                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
623                         }
624                 }
625                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
626                         ldb_asprintf_errstring(ldb,
627                                                "Entry %s already exists",
628                                                ldb_dn_get_linearized(msg->dn));
629                 }
630                 return ret;
631         }
632
633         ret = ldb_kv_index_add_new(module, ldb_kv, msg);
634         if (ret != LDB_SUCCESS) {
635                 /*
636                  * If we failed to index, delete the message again.
637                  *
638                  * This is particularly important for the GUID index
639                  * case, which will only fail for a duplicate DN
640                  * in the index add.
641                  *
642                  * Note that the caller may not cancel the transation
643                  * and this means the above add might really show up!
644                  */
645                 ldb_kv_delete_noindex(module, msg);
646                 return ret;
647         }
648
649         ret = ldb_kv_modified(module, msg->dn);
650
651         /*
652          * To allow testing of the error recovery code in ldb_kv_add
653          * cmocka tests can define CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
654          * to inject failures at this point.
655          */
656 #ifdef CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
657         CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
658 #endif
659
660         return ret;
661 }
662
663 /*
664   add a record to the database
665 */
666 static int ldb_kv_add(struct ldb_kv_context *ctx)
667 {
668         struct ldb_module *module = ctx->module;
669         struct ldb_request *req = ctx->req;
670         void *data = ldb_module_get_private(module);
671         struct ldb_kv_private *ldb_kv =
672             talloc_get_type(data, struct ldb_kv_private);
673         int ret = LDB_SUCCESS;
674
675         if (ldb_kv->max_key_length != 0 &&
676             ldb_kv->cache->GUID_index_attribute == NULL &&
677             !ldb_dn_is_special(req->op.add.message->dn)) {
678                 ldb_set_errstring(ldb_module_get_ctx(module),
679                                   "Must operate ldb_mdb in GUID "
680                                   "index mode, but " LDB_KV_IDXGUID " not set.");
681                 return LDB_ERR_UNWILLING_TO_PERFORM;
682         }
683
684         ret = ldb_kv_check_special_dn(module, req->op.add.message);
685         if (ret != LDB_SUCCESS) {
686                 return ret;
687         }
688
689         ldb_request_set_state(req, LDB_ASYNC_PENDING);
690
691         if (ldb_kv_cache_load(module) != 0) {
692                 return LDB_ERR_OPERATIONS_ERROR;
693         }
694
695         ret = ldb_kv_sub_transaction_start(ldb_kv);
696         if (ret != LDB_SUCCESS) {
697                 return ret;
698         }
699         ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
700         if (ret != LDB_SUCCESS) {
701                 int r = ldb_kv_sub_transaction_cancel(ldb_kv);
702                 if (r != LDB_SUCCESS) {
703                         ldb_debug(
704                                 ldb_module_get_ctx(module),
705                                 LDB_DEBUG_FATAL,
706                                 __location__
707                                 ": Unable to roll back sub transaction");
708                 }
709                 ldb_kv->operation_failed = true;
710                 return ret;
711         }
712         ret = ldb_kv_sub_transaction_commit(ldb_kv);
713
714         return ret;
715 }
716
717 /*
718   delete a record from the database, not updating indexes (used for deleting
719   index records)
720 */
721 int ldb_kv_delete_noindex(struct ldb_module *module,
722                           const struct ldb_message *msg)
723 {
724         void *data = ldb_module_get_private(module);
725         struct ldb_kv_private *ldb_kv =
726             talloc_get_type(data, struct ldb_kv_private);
727         struct ldb_val key;
728         int ret;
729         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
730
731         if (tdb_key_ctx == NULL) {
732                 return ldb_module_oom(module);
733         }
734
735         if (ldb_kv->read_only) {
736                 talloc_free(tdb_key_ctx);
737                 return LDB_ERR_UNWILLING_TO_PERFORM;
738         }
739
740         key = ldb_kv_key_msg(module, tdb_key_ctx, msg);
741         if (!key.data) {
742                 TALLOC_FREE(tdb_key_ctx);
743                 return LDB_ERR_OTHER;
744         }
745
746         ret = ldb_kv->kv_ops->delete(ldb_kv, key);
747         TALLOC_FREE(tdb_key_ctx);
748
749         if (ret != 0) {
750                 ret = ldb_kv->kv_ops->error(ldb_kv);
751         }
752
753         return ret;
754 }
755
756 static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
757 {
758         struct ldb_message *msg;
759         int ret = LDB_SUCCESS;
760
761         msg = ldb_msg_new(module);
762         if (msg == NULL) {
763                 return LDB_ERR_OPERATIONS_ERROR;
764         }
765
766         /* in case any attribute of the message was indexed, we need
767            to fetch the old record */
768         ret = ldb_kv_search_dn1(module, dn, msg, 0);
769         if (ret != LDB_SUCCESS) {
770                 /* not finding the old record is an error */
771                 goto done;
772         }
773
774         ret = ldb_kv_delete_noindex(module, msg);
775         if (ret != LDB_SUCCESS) {
776                 goto done;
777         }
778
779         /* remove any indexed attributes */
780         ret = ldb_kv_index_delete(module, msg);
781         if (ret != LDB_SUCCESS) {
782                 goto done;
783         }
784
785         ret = ldb_kv_modified(module, dn);
786         if (ret != LDB_SUCCESS) {
787                 goto done;
788         }
789
790 done:
791         talloc_free(msg);
792         /*
793          * To allow testing of the error recovery code in ldb_kv_delete
794          * cmocka tests can define CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
795          * to inject failures at this point.
796          */
797 #ifdef CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
798         CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
799 #endif
800         return ret;
801 }
802
803 /*
804   delete a record from the database
805 */
806 static int ldb_kv_delete(struct ldb_kv_context *ctx)
807 {
808         struct ldb_module *module = ctx->module;
809         struct ldb_request *req = ctx->req;
810         void *data = ldb_module_get_private(module);
811         struct ldb_kv_private *ldb_kv =
812             talloc_get_type(data, struct ldb_kv_private);
813         int ret = LDB_SUCCESS;
814
815         ldb_request_set_state(req, LDB_ASYNC_PENDING);
816
817         if (ldb_kv_cache_load(module) != 0) {
818                 return LDB_ERR_OPERATIONS_ERROR;
819         }
820
821         ret = ldb_kv_sub_transaction_start(ldb_kv);
822         if (ret != LDB_SUCCESS) {
823                 return ret;
824         }
825         ret = ldb_kv_delete_internal(module, req->op.del.dn);
826         if (ret != LDB_SUCCESS) {
827                 int r = ldb_kv_sub_transaction_cancel(ldb_kv);
828                 if (r != LDB_SUCCESS) {
829                         ldb_debug(
830                                 ldb_module_get_ctx(module),
831                                 LDB_DEBUG_FATAL,
832                                 __location__
833                                 ": Unable to roll back sub transaction");
834                 }
835                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
836                         ldb_kv->operation_failed = true;
837                 }
838                 return ret;
839         }
840         ret = ldb_kv_sub_transaction_commit(ldb_kv);
841
842         return ret;
843 }
844
845 /*
846   find an element by attribute name. At the moment this does a linear search,
847   it should be re-coded to use a binary search once all places that modify
848   records guarantee sorted order
849
850   return the index of the first matching element if found, otherwise -1
851 */
852 static int ldb_kv_find_element(const struct ldb_message *msg, const char *name)
853 {
854         unsigned int i;
855         for (i=0;i<msg->num_elements;i++) {
856                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
857                         return i;
858                 }
859         }
860         return -1;
861 }
862
863
864 /*
865   add an element to an existing record. Assumes a elements array that we
866   can call re-alloc on, and assumed that we can re-use the data pointers from
867   the passed in additional values. Use with care!
868
869   returns 0 on success, -1 on failure (and sets errno)
870 */
871 static int ldb_kv_msg_add_element(struct ldb_message *msg,
872                                   struct ldb_message_element *el)
873 {
874         struct ldb_message_element *e2;
875         unsigned int i;
876
877         if (el->num_values == 0) {
878                 /* nothing to do here - we don't add empty elements */
879                 return 0;
880         }
881
882         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
883                               msg->num_elements+1);
884         if (!e2) {
885                 errno = ENOMEM;
886                 return -1;
887         }
888
889         msg->elements = e2;
890
891         e2 = &msg->elements[msg->num_elements];
892
893         e2->name = el->name;
894         e2->flags = el->flags;
895         e2->values = talloc_array(msg->elements,
896                                   struct ldb_val, el->num_values);
897         if (!e2->values) {
898                 errno = ENOMEM;
899                 return -1;
900         }
901         for (i=0;i<el->num_values;i++) {
902                 e2->values[i] = el->values[i];
903         }
904         e2->num_values = el->num_values;
905
906         ++msg->num_elements;
907
908         return 0;
909 }
910
911 /*
912   delete all elements having a specified attribute name
913 */
914 static int ldb_kv_msg_delete_attribute(struct ldb_module *module,
915                                        struct ldb_kv_private *ldb_kv,
916                                        struct ldb_message *msg,
917                                        const char *name)
918 {
919         int ret;
920         struct ldb_message_element *el;
921         bool is_special = ldb_dn_is_special(msg->dn);
922
923         if (!is_special && ldb_kv->cache->GUID_index_attribute != NULL &&
924             ldb_attr_cmp(name, ldb_kv->cache->GUID_index_attribute) == 0) {
925                 struct ldb_context *ldb = ldb_module_get_ctx(module);
926                 ldb_asprintf_errstring(ldb,
927                                        "Must not modify GUID "
928                                        "attribute %s (used as DB index)",
929                                        ldb_kv->cache->GUID_index_attribute);
930                 return LDB_ERR_CONSTRAINT_VIOLATION;
931         }
932
933         el = ldb_msg_find_element(msg, name);
934         if (el == NULL) {
935                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
936         }
937
938         ret = ldb_kv_index_del_element(module, ldb_kv, msg, el);
939         if (ret != LDB_SUCCESS) {
940                 return ret;
941         }
942
943         talloc_free(el->values);
944         ldb_msg_remove_element(msg, el);
945         msg->elements = talloc_realloc(msg, msg->elements,
946                                        struct ldb_message_element,
947                                        msg->num_elements);
948         return LDB_SUCCESS;
949 }
950
951 /*
952   delete all elements matching an attribute name/value
953
954   return LDB Error on failure
955 */
956 static int ldb_kv_msg_delete_element(struct ldb_module *module,
957                                      struct ldb_kv_private *ldb_kv,
958                                      struct ldb_message *msg,
959                                      const char *name,
960                                      const struct ldb_val *val)
961 {
962         struct ldb_context *ldb = ldb_module_get_ctx(module);
963         unsigned int i;
964         int found, ret;
965         struct ldb_message_element *el;
966         const struct ldb_schema_attribute *a;
967
968         found = ldb_kv_find_element(msg, name);
969         if (found == -1) {
970                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
971         }
972
973         i = (unsigned int) found;
974         el = &(msg->elements[i]);
975
976         a = ldb_schema_attribute_by_name(ldb, el->name);
977
978         for (i=0;i<el->num_values;i++) {
979                 bool matched;
980                 if (a->syntax->operator_fn) {
981                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
982                                                      &el->values[i], val, &matched);
983                         if (ret != LDB_SUCCESS) return ret;
984                 } else {
985                         matched = (a->syntax->comparison_fn(ldb, ldb,
986                                                             &el->values[i], val) == 0);
987                 }
988                 if (matched) {
989                         if (el->num_values == 1) {
990                                 return ldb_kv_msg_delete_attribute(
991                                     module, ldb_kv, msg, name);
992                         }
993
994                         ret =
995                             ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
996                         if (ret != LDB_SUCCESS) {
997                                 return ret;
998                         }
999
1000                         if (i<el->num_values-1) {
1001                                 memmove(&el->values[i], &el->values[i+1],
1002                                         sizeof(el->values[i])*
1003                                                 (el->num_values-(i+1)));
1004                         }
1005                         el->num_values--;
1006
1007                         /* per definition we find in a canonicalised message an
1008                            attribute value only once. So we are finished here */
1009                         return LDB_SUCCESS;
1010                 }
1011         }
1012
1013         /* Not found */
1014         return LDB_ERR_NO_SUCH_ATTRIBUTE;
1015 }
1016
1017 /*
1018   modify a record - internal interface
1019
1020   yuck - this is O(n^2). Luckily n is usually small so we probably
1021   get away with it, but if we ever have really large attribute lists
1022   then we'll need to look at this again
1023
1024   'req' is optional, and is used to specify controls if supplied
1025 */
1026 int ldb_kv_modify_internal(struct ldb_module *module,
1027                            const struct ldb_message *msg,
1028                            struct ldb_request *req)
1029 {
1030         struct ldb_context *ldb = ldb_module_get_ctx(module);
1031         void *data = ldb_module_get_private(module);
1032         struct ldb_kv_private *ldb_kv =
1033             talloc_get_type(data, struct ldb_kv_private);
1034         struct ldb_message *msg2;
1035         unsigned int i, j;
1036         int ret = LDB_SUCCESS, idx;
1037         struct ldb_control *control_permissive = NULL;
1038         TALLOC_CTX *mem_ctx = talloc_new(req);
1039
1040         if (mem_ctx == NULL) {
1041                 return ldb_module_oom(module);
1042         }
1043
1044         if (req) {
1045                 control_permissive = ldb_request_get_control(req,
1046                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1047         }
1048
1049         msg2 = ldb_msg_new(mem_ctx);
1050         if (msg2 == NULL) {
1051                 ret = LDB_ERR_OTHER;
1052                 goto done;
1053         }
1054
1055         ret = ldb_kv_search_dn1(module, msg->dn, msg2, 0);
1056         if (ret != LDB_SUCCESS) {
1057                 goto done;
1058         }
1059
1060         for (i=0; i<msg->num_elements; i++) {
1061                 struct ldb_message_element *el = &msg->elements[i], *el2;
1062                 struct ldb_val *vals;
1063                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
1064                 const char *dn;
1065                 uint32_t options = 0;
1066                 if (control_permissive != NULL) {
1067                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
1068                 }
1069
1070                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
1071                 case LDB_FLAG_MOD_ADD:
1072
1073                         if (el->num_values == 0) {
1074                                 ldb_asprintf_errstring(ldb,
1075                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
1076                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1077                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
1078                                 goto done;
1079                         }
1080
1081                         /* make a copy of the array so that a permissive
1082                          * control can remove duplicates without changing the
1083                          * original values, but do not copy data as we do not
1084                          * need to keep it around once the operation is
1085                          * finished */
1086                         if (control_permissive) {
1087                                 el = talloc(msg2, struct ldb_message_element);
1088                                 if (!el) {
1089                                         ret = LDB_ERR_OTHER;
1090                                         goto done;
1091                                 }
1092                                 *el = msg->elements[i];
1093                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1094                                 if (el->values == NULL) {
1095                                         ret = LDB_ERR_OTHER;
1096                                         goto done;
1097                                 }
1098                                 for (j = 0; j < el->num_values; j++) {
1099                                         el->values[j] = msg->elements[i].values[j];
1100                                 }
1101                         }
1102
1103                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1104                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1105                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1106                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1107                                 goto done;
1108                         }
1109
1110                         /* Checks if element already exists */
1111                         idx = ldb_kv_find_element(msg2, el->name);
1112                         if (idx == -1) {
1113                                 if (ldb_kv_msg_add_element(msg2, el) != 0) {
1114                                         ret = LDB_ERR_OTHER;
1115                                         goto done;
1116                                 }
1117                                 ret = ldb_kv_index_add_element(
1118                                     module, ldb_kv, msg2, el);
1119                                 if (ret != LDB_SUCCESS) {
1120                                         goto done;
1121                                 }
1122                         } else {
1123                                 j = (unsigned int) idx;
1124                                 el2 = &(msg2->elements[j]);
1125
1126                                 /* We cannot add another value on a existing one
1127                                    if the attribute is single-valued */
1128                                 if (ldb_kv_single_valued(a, el)) {
1129                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1130                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1131                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1132                                         goto done;
1133                                 }
1134
1135                                 /* Check that values don't exist yet on multi-
1136                                    valued attributes or aren't provided twice */
1137                                 if (!(el->flags &
1138                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1139                                         struct ldb_val *duplicate = NULL;
1140                                         ret = ldb_msg_find_common_values(ldb,
1141                                                                          msg2,
1142                                                                          el,
1143                                                                          el2,
1144                                                                          options);
1145
1146                                         if (ret ==
1147                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1148                                                 ldb_asprintf_errstring(ldb,
1149                                                         "attribute '%s': value "
1150                                                         "#%u on '%s' already "
1151                                                         "exists", el->name, j,
1152                                                         ldb_dn_get_linearized(msg2->dn));
1153                                                 goto done;
1154                                         } else if (ret != LDB_SUCCESS) {
1155                                                 goto done;
1156                                         }
1157
1158                                         ret = ldb_msg_find_duplicate_val(
1159                                                 ldb, msg2, el, &duplicate, 0);
1160                                         if (ret != LDB_SUCCESS) {
1161                                                 goto done;
1162                                         }
1163                                         if (duplicate != NULL) {
1164                                                 ldb_asprintf_errstring(
1165                                                         ldb,
1166                                                         "attribute '%s': value "
1167                                                         "'%.*s' on '%s' "
1168                                                         "provided more than "
1169                                                         "once in ADD",
1170                                                         el->name,
1171                                                         (int)duplicate->length,
1172                                                         duplicate->data,
1173                                                         ldb_dn_get_linearized(msg->dn));
1174                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1175                                                 goto done;
1176                                         }
1177                                 }
1178
1179                                 /* Now combine existing and new values to a new
1180                                    attribute record */
1181                                 vals = talloc_realloc(msg2->elements,
1182                                                       el2->values, struct ldb_val,
1183                                                       el2->num_values + el->num_values);
1184                                 if (vals == NULL) {
1185                                         ldb_oom(ldb);
1186                                         ret = LDB_ERR_OTHER;
1187                                         goto done;
1188                                 }
1189
1190                                 for (j=0; j<el->num_values; j++) {
1191                                         vals[el2->num_values + j] =
1192                                                 ldb_val_dup(vals, &el->values[j]);
1193                                 }
1194
1195                                 el2->values = vals;
1196                                 el2->num_values += el->num_values;
1197
1198                                 ret = ldb_kv_index_add_element(
1199                                     module, ldb_kv, msg2, el);
1200                                 if (ret != LDB_SUCCESS) {
1201                                         goto done;
1202                                 }
1203                         }
1204
1205                         break;
1206
1207                 case LDB_FLAG_MOD_REPLACE:
1208
1209                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1210                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1211                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1212                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1213                                 goto done;
1214                         }
1215
1216                         /*
1217                          * We don't need to check this if we have been
1218                          * pre-screened by the repl_meta_data module
1219                          * in Samba, or someone else who can claim to
1220                          * know what they are doing.
1221                          */
1222                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1223                                 struct ldb_val *duplicate = NULL;
1224
1225                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1226                                                                  &duplicate, 0);
1227                                 if (ret != LDB_SUCCESS) {
1228                                         goto done;
1229                                 }
1230                                 if (duplicate != NULL) {
1231                                         ldb_asprintf_errstring(
1232                                                 ldb,
1233                                                 "attribute '%s': value '%.*s' "
1234                                                 "on '%s' provided more than "
1235                                                 "once in REPLACE",
1236                                                 el->name,
1237                                                 (int)duplicate->length,
1238                                                 duplicate->data,
1239                                                 ldb_dn_get_linearized(msg2->dn));
1240                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1241                                         goto done;
1242                                 }
1243                         }
1244
1245                         /* Checks if element already exists */
1246                         idx = ldb_kv_find_element(msg2, el->name);
1247                         if (idx != -1) {
1248                                 j = (unsigned int) idx;
1249                                 el2 = &(msg2->elements[j]);
1250
1251                                 /* we consider two elements to be
1252                                  * equal only if the order
1253                                  * matches. This allows dbcheck to
1254                                  * fix the ordering on attributes
1255                                  * where order matters, such as
1256                                  * objectClass
1257                                  */
1258                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1259                                         continue;
1260                                 }
1261
1262                                 /* Delete the attribute if it exists in the DB */
1263                                 if (ldb_kv_msg_delete_attribute(
1264                                         module, ldb_kv, msg2, el->name) != 0) {
1265                                         ret = LDB_ERR_OTHER;
1266                                         goto done;
1267                                 }
1268                         }
1269
1270                         /* Recreate it with the new values */
1271                         if (ldb_kv_msg_add_element(msg2, el) != 0) {
1272                                 ret = LDB_ERR_OTHER;
1273                                 goto done;
1274                         }
1275
1276                         ret =
1277                             ldb_kv_index_add_element(module, ldb_kv, msg2, el);
1278                         if (ret != LDB_SUCCESS) {
1279                                 goto done;
1280                         }
1281
1282                         break;
1283
1284                 case LDB_FLAG_MOD_DELETE:
1285                         dn = ldb_dn_get_linearized(msg2->dn);
1286                         if (dn == NULL) {
1287                                 ret = LDB_ERR_OTHER;
1288                                 goto done;
1289                         }
1290
1291                         if (msg->elements[i].num_values == 0) {
1292                                 /* Delete the whole attribute */
1293                                 ret = ldb_kv_msg_delete_attribute(
1294                                     module,
1295                                     ldb_kv,
1296                                     msg2,
1297                                     msg->elements[i].name);
1298                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1299                                     control_permissive) {
1300                                         ret = LDB_SUCCESS;
1301                                 } else {
1302                                         ldb_asprintf_errstring(ldb,
1303                                                                "attribute '%s': no such attribute for delete on '%s'",
1304                                                                msg->elements[i].name, dn);
1305                                 }
1306                                 if (ret != LDB_SUCCESS) {
1307                                         goto done;
1308                                 }
1309                         } else {
1310                                 /* Delete specified values from an attribute */
1311                                 for (j=0; j < msg->elements[i].num_values; j++) {
1312                                         ret = ldb_kv_msg_delete_element(
1313                                             module,
1314                                             ldb_kv,
1315                                             msg2,
1316                                             msg->elements[i].name,
1317                                             &msg->elements[i].values[j]);
1318                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1319                                             control_permissive) {
1320                                                 ret = LDB_SUCCESS;
1321                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1322                                                 ldb_asprintf_errstring(ldb,
1323                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1324                                                                        msg->elements[i].name, dn);
1325                                         }
1326                                         if (ret != LDB_SUCCESS) {
1327                                                 goto done;
1328                                         }
1329                                 }
1330                         }
1331                         break;
1332                 default:
1333                         ldb_asprintf_errstring(ldb,
1334                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1335                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1336                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1337                         ret = LDB_ERR_PROTOCOL_ERROR;
1338                         goto done;
1339                 }
1340         }
1341
1342         ret = ldb_kv_store(module, msg2, TDB_MODIFY);
1343         if (ret != LDB_SUCCESS) {
1344                 goto done;
1345         }
1346
1347         ret = ldb_kv_modified(module, msg2->dn);
1348         if (ret != LDB_SUCCESS) {
1349                 goto done;
1350         }
1351
1352 done:
1353         TALLOC_FREE(mem_ctx);
1354         /*
1355          * To allow testing of the error recovery code in ldb_kv_modify
1356          * cmocka tests can define CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
1357          * to inject failures at this point.
1358          */
1359 #ifdef CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
1360         CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
1361 #endif
1362         return ret;
1363 }
1364
1365 /*
1366   modify a record
1367 */
1368 static int ldb_kv_modify(struct ldb_kv_context *ctx)
1369 {
1370         struct ldb_module *module = ctx->module;
1371         struct ldb_request *req = ctx->req;
1372         void *data = ldb_module_get_private(module);
1373         struct ldb_kv_private *ldb_kv =
1374             talloc_get_type(data, struct ldb_kv_private);
1375         int ret = LDB_SUCCESS;
1376
1377         ret = ldb_kv_check_special_dn(module, req->op.mod.message);
1378         if (ret != LDB_SUCCESS) {
1379                 return ret;
1380         }
1381
1382         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1383
1384         if (ldb_kv_cache_load(module) != 0) {
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         ret = ldb_kv_sub_transaction_start(ldb_kv);
1389         if (ret != LDB_SUCCESS) {
1390                 return ret;
1391         }
1392         ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
1393         if (ret != LDB_SUCCESS) {
1394                 int r = ldb_kv_sub_transaction_cancel(ldb_kv);
1395                 if (r != LDB_SUCCESS) {
1396                         ldb_debug(
1397                                 ldb_module_get_ctx(module),
1398                                 LDB_DEBUG_FATAL,
1399                                 __location__
1400                                 ": Unable to roll back sub transaction");
1401                 }
1402                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
1403                         ldb_kv->operation_failed = true;
1404                 }
1405                 return ret;
1406         }
1407         ret = ldb_kv_sub_transaction_commit(ldb_kv);
1408
1409
1410         return ret;
1411 }
1412
1413 static int ldb_kv_rename_internal(struct ldb_module *module,
1414                            struct ldb_request *req,
1415                            struct ldb_message *msg)
1416 {
1417         void *data = ldb_module_get_private(module);
1418         struct ldb_kv_private *ldb_kv =
1419             talloc_get_type(data, struct ldb_kv_private);
1420         int ret = LDB_SUCCESS;
1421
1422         /* Always delete first then add, to avoid conflicts with
1423          * unique indexes. We rely on the transaction to make this
1424          * atomic
1425          */
1426         ret = ldb_kv_delete_internal(module, msg->dn);
1427         if (ret != LDB_SUCCESS) {
1428                 return ret;
1429         }
1430
1431         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1432         if (msg->dn == NULL) {
1433                 return LDB_ERR_OPERATIONS_ERROR;
1434         }
1435
1436         /* We don't check single value as we can have more than 1 with
1437          * deleted attributes. We could go through all elements but that's
1438          * maybe not the most efficient way
1439          */
1440         ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
1441
1442         /*
1443          * To allow testing of the error recovery code in ldb_kv_rename
1444          * cmocka tests can define CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
1445          * to inject failures at this point.
1446          */
1447 #ifdef CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
1448         CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
1449 #endif
1450         return ret;
1451 }
1452
1453 /*
1454   rename a record
1455 */
1456 static int ldb_kv_rename(struct ldb_kv_context *ctx)
1457 {
1458         struct ldb_module *module = ctx->module;
1459         void *data = ldb_module_get_private(module);
1460         struct ldb_kv_private *ldb_kv =
1461             talloc_get_type(data, struct ldb_kv_private);
1462         struct ldb_request *req = ctx->req;
1463         struct ldb_message *msg;
1464         int ret = LDB_SUCCESS;
1465         struct ldb_val  key, key_old;
1466         struct ldb_dn *db_dn;
1467         bool valid_dn = false;
1468
1469         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1470
1471         if (ldb_kv_cache_load(ctx->module) != 0) {
1472                 return LDB_ERR_OPERATIONS_ERROR;
1473         }
1474
1475         msg = ldb_msg_new(ctx);
1476         if (msg == NULL) {
1477                 return LDB_ERR_OPERATIONS_ERROR;
1478         }
1479
1480         /* Check the new DN is reasonable */
1481         valid_dn = ldb_dn_validate(req->op.rename.newdn);
1482         if (valid_dn == false) {
1483                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1484                                        "Invalid New DN: %s",
1485                                        ldb_dn_get_linearized(req->op.rename.newdn));
1486                 return LDB_ERR_INVALID_DN_SYNTAX;
1487         }
1488
1489         /* we need to fetch the old record to re-add under the new name */
1490         ret = ldb_kv_search_dn1(module, req->op.rename.olddn, msg, 0);
1491         if (ret == LDB_ERR_INVALID_DN_SYNTAX) {
1492                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1493                                        "Invalid Old DN: %s",
1494                                        ldb_dn_get_linearized(req->op.rename.newdn));
1495                 return ret;
1496         } else if (ret != LDB_SUCCESS) {
1497                 /* not finding the old record is an error */
1498                 return ret;
1499         }
1500
1501         /* We need to, before changing the DB, check if the new DN
1502          * exists, so we can return this error to the caller with an
1503          * unmodified DB
1504          *
1505          * Even in GUID index mode we use ltdb_key_dn() as we are
1506          * trying to figure out if this is just a case rename
1507          */
1508         key = ldb_kv_key_dn(msg, req->op.rename.newdn);
1509         if (!key.data) {
1510                 talloc_free(msg);
1511                 return LDB_ERR_OPERATIONS_ERROR;
1512         }
1513
1514         key_old = ldb_kv_key_dn(msg, req->op.rename.olddn);
1515         if (!key_old.data) {
1516                 talloc_free(msg);
1517                 talloc_free(key.data);
1518                 return LDB_ERR_OPERATIONS_ERROR;
1519         }
1520
1521         /*
1522          * Only declare a conflict if the new DN already exists,
1523          * and it isn't a case change on the old DN
1524          */
1525         if (key_old.length != key.length
1526             || memcmp(key.data, key_old.data, key.length) != 0) {
1527                 ret = ldb_kv_search_base(
1528                     module, msg, req->op.rename.newdn, &db_dn);
1529                 if (ret == LDB_SUCCESS) {
1530                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1531                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1532                         ret = LDB_SUCCESS;
1533                 }
1534         }
1535
1536         /* finding the new record already in the DB is an error */
1537
1538         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1539                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1540                                        "Entry %s already exists",
1541                                        ldb_dn_get_linearized(req->op.rename.newdn));
1542         }
1543         if (ret != LDB_SUCCESS) {
1544                 talloc_free(key_old.data);
1545                 talloc_free(key.data);
1546                 talloc_free(msg);
1547                 return ret;
1548         }
1549
1550         talloc_free(key_old.data);
1551         talloc_free(key.data);
1552
1553
1554         ret = ldb_kv_sub_transaction_start(ldb_kv);
1555         if (ret != LDB_SUCCESS) {
1556                 talloc_free(msg);
1557                 return ret;
1558         }
1559         ret = ldb_kv_rename_internal(module, req, msg);
1560         if (ret != LDB_SUCCESS) {
1561                 int r = ldb_kv_sub_transaction_cancel(ldb_kv);
1562                 if (r != LDB_SUCCESS) {
1563                         ldb_debug(
1564                                 ldb_module_get_ctx(module),
1565                                 LDB_DEBUG_FATAL,
1566                                 __location__
1567                                 ": Unable to roll back sub transaction");
1568                 }
1569                 talloc_free(msg);
1570                 ldb_kv->operation_failed = true;
1571                 return ret;
1572         }
1573         ret = ldb_kv_sub_transaction_commit(ldb_kv);
1574         talloc_free(msg);
1575
1576         return ret;
1577 }
1578
1579 static int ldb_kv_start_trans(struct ldb_module *module)
1580 {
1581         void *data = ldb_module_get_private(module);
1582         struct ldb_kv_private *ldb_kv =
1583             talloc_get_type(data, struct ldb_kv_private);
1584
1585         pid_t pid = getpid();
1586
1587         if (ldb_kv->pid != pid) {
1588                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
1589                                        __location__
1590                                        ": Reusing ldb opend by pid %d in "
1591                                        "process %d\n",
1592                                        ldb_kv->pid,
1593                                        pid);
1594                 return LDB_ERR_PROTOCOL_ERROR;
1595         }
1596
1597         /* Do not take out the transaction lock on a read-only DB */
1598         if (ldb_kv->read_only) {
1599                 return LDB_ERR_UNWILLING_TO_PERFORM;
1600         }
1601
1602         if (ldb_kv->kv_ops->begin_write(ldb_kv) != 0) {
1603                 return ldb_kv->kv_ops->error(ldb_kv);
1604         }
1605
1606         ldb_kv_index_transaction_start(
1607                 module,
1608                 ldb_kv->index_transaction_cache_size);
1609
1610         ldb_kv->reindex_failed = false;
1611         ldb_kv->operation_failed = false;
1612
1613         return LDB_SUCCESS;
1614 }
1615
1616 /*
1617  * Forward declaration to allow prepare_commit to in fact abort the
1618  * transaction
1619  */
1620 static int ldb_kv_del_trans(struct ldb_module *module);
1621
1622 static int ldb_kv_prepare_commit(struct ldb_module *module)
1623 {
1624         int ret;
1625         void *data = ldb_module_get_private(module);
1626         struct ldb_kv_private *ldb_kv =
1627             talloc_get_type(data, struct ldb_kv_private);
1628         pid_t pid = getpid();
1629
1630         if (ldb_kv->pid != pid) {
1631                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1632                                        __location__
1633                                        ": Reusing ldb opend by pid %d in "
1634                                        "process %d\n",
1635                                        ldb_kv->pid,
1636                                        pid);
1637                 return LDB_ERR_PROTOCOL_ERROR;
1638         }
1639
1640         if (!ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1641                 ldb_set_errstring(ldb_module_get_ctx(module),
1642                                   "ltdb_prepare_commit() called "
1643                                   "without transaction active");
1644                 return LDB_ERR_OPERATIONS_ERROR;
1645         }
1646
1647         /*
1648          * Check if the last re-index failed.
1649          *
1650          * This can happen if for example a duplicate value was marked
1651          * unique.  We must not write a partial re-index into the DB.
1652          */
1653         if (ldb_kv->reindex_failed) {
1654                 /*
1655                  * We must instead abort the transaction so we get the
1656                  * old values and old index back
1657                  */
1658                 ldb_kv_del_trans(module);
1659                 ldb_set_errstring(ldb_module_get_ctx(module),
1660                                   "Failure during re-index, so "
1661                                   "transaction must be aborted.");
1662                 return LDB_ERR_OPERATIONS_ERROR;
1663         }
1664
1665         ret = ldb_kv_index_transaction_commit(module);
1666         if (ret != LDB_SUCCESS) {
1667                 ldb_kv->kv_ops->abort_write(ldb_kv);
1668                 return ret;
1669         }
1670
1671         /*
1672          * If GUID indexing was toggled in this transaction, we repack at
1673          * format version 2 if GUID indexing was enabled, or version 1 if
1674          * it was disabled.
1675          */
1676         ret = ldb_kv_maybe_repack(ldb_kv);
1677         if (ret != LDB_SUCCESS) {
1678                 ldb_kv_del_trans(module);
1679                 ldb_set_errstring(ldb_module_get_ctx(module),
1680                                   "Failure during re-pack, so "
1681                                   "transaction must be aborted.");
1682                 return ret;
1683         }
1684
1685         if (ldb_kv->kv_ops->prepare_write(ldb_kv) != 0) {
1686                 ret = ldb_kv->kv_ops->error(ldb_kv);
1687                 ldb_debug_set(ldb_module_get_ctx(module),
1688                               LDB_DEBUG_FATAL,
1689                               "Failure during "
1690                               "prepare_write): %s -> %s",
1691                               ldb_kv->kv_ops->errorstr(ldb_kv),
1692                               ldb_strerror(ret));
1693                 return ret;
1694         }
1695
1696         ldb_kv->prepared_commit = true;
1697
1698         return LDB_SUCCESS;
1699 }
1700
1701 static int ldb_kv_end_trans(struct ldb_module *module)
1702 {
1703         int ret;
1704         void *data = ldb_module_get_private(module);
1705         struct ldb_kv_private *ldb_kv =
1706             talloc_get_type(data, struct ldb_kv_private);
1707
1708         /*
1709          * If in batch mode and there has been an operation failure
1710          * rollback the transaction rather than committing it to avoid
1711          * any possible corruption
1712          */
1713         if (ldb_kv->batch_mode && ldb_kv->operation_failed) {
1714                 ret = ldb_kv_del_trans( module);
1715                 if (ret != LDB_SUCCESS) {
1716                         ldb_debug_set(ldb_module_get_ctx(module),
1717                                       LDB_DEBUG_FATAL,
1718                                       "An operation failed during a batch mode "
1719                                       "transaction. The transaction could not"
1720                                       "be rolled back, ldb_kv_del_trans "
1721                                       "returned (%s, %s)",
1722                                       ldb_kv->kv_ops->errorstr(ldb_kv),
1723                                       ldb_strerror(ret));
1724                 } else {
1725                         ldb_debug_set(ldb_module_get_ctx(module),
1726                                       LDB_DEBUG_FATAL,
1727                                       "An operation failed during a batch mode "
1728                                       "transaction, the transaction was "
1729                                       "rolled back");
1730                 }
1731                 return LDB_ERR_OPERATIONS_ERROR;
1732         }
1733
1734         if (!ldb_kv->prepared_commit) {
1735                 ret = ldb_kv_prepare_commit(module);
1736                 if (ret != LDB_SUCCESS) {
1737                         return ret;
1738                 }
1739         }
1740
1741         ldb_kv->prepared_commit = false;
1742
1743         if (ldb_kv->kv_ops->finish_write(ldb_kv) != 0) {
1744                 ret = ldb_kv->kv_ops->error(ldb_kv);
1745                 ldb_asprintf_errstring(
1746                     ldb_module_get_ctx(module),
1747                     "Failure during tdb_transaction_commit(): %s -> %s",
1748                     ldb_kv->kv_ops->errorstr(ldb_kv),
1749                     ldb_strerror(ret));
1750                 return ret;
1751         }
1752
1753         return LDB_SUCCESS;
1754 }
1755
1756 static int ldb_kv_del_trans(struct ldb_module *module)
1757 {
1758         void *data = ldb_module_get_private(module);
1759         struct ldb_kv_private *ldb_kv =
1760             talloc_get_type(data, struct ldb_kv_private);
1761
1762         if (ldb_kv_index_transaction_cancel(module) != 0) {
1763                 ldb_kv->kv_ops->abort_write(ldb_kv);
1764                 return ldb_kv->kv_ops->error(ldb_kv);
1765         }
1766
1767         ldb_kv->kv_ops->abort_write(ldb_kv);
1768         return LDB_SUCCESS;
1769 }
1770
1771 /*
1772   return sequenceNumber from @BASEINFO
1773 */
1774 static int ldb_kv_sequence_number(struct ldb_kv_context *ctx,
1775                                   struct ldb_extended **ext)
1776 {
1777         struct ldb_context *ldb;
1778         struct ldb_module *module = ctx->module;
1779         struct ldb_request *req = ctx->req;
1780         void *data = ldb_module_get_private(module);
1781         struct ldb_kv_private *ldb_kv =
1782             talloc_get_type(data, struct ldb_kv_private);
1783         TALLOC_CTX *tmp_ctx = NULL;
1784         struct ldb_seqnum_request *seq;
1785         struct ldb_seqnum_result *res;
1786         struct ldb_message *msg = NULL;
1787         struct ldb_dn *dn;
1788         const char *date;
1789         int ret = LDB_SUCCESS;
1790
1791         ldb = ldb_module_get_ctx(module);
1792
1793         seq = talloc_get_type(req->op.extended.data,
1794                                 struct ldb_seqnum_request);
1795         if (seq == NULL) {
1796                 return LDB_ERR_OPERATIONS_ERROR;
1797         }
1798
1799         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1800
1801         if (ldb_kv->kv_ops->lock_read(module) != 0) {
1802                 return LDB_ERR_OPERATIONS_ERROR;
1803         }
1804
1805         res = talloc_zero(req, struct ldb_seqnum_result);
1806         if (res == NULL) {
1807                 ret = LDB_ERR_OPERATIONS_ERROR;
1808                 goto done;
1809         }
1810
1811         tmp_ctx = talloc_new(req);
1812         if (tmp_ctx == NULL) {
1813                 ret = LDB_ERR_OPERATIONS_ERROR;
1814                 goto done;
1815         }
1816
1817         dn = ldb_dn_new(tmp_ctx, ldb, LDB_KV_BASEINFO);
1818         if (dn == NULL) {
1819                 ret = LDB_ERR_OPERATIONS_ERROR;
1820                 goto done;
1821         }
1822
1823         msg = ldb_msg_new(tmp_ctx);
1824         if (msg == NULL) {
1825                 ret = LDB_ERR_OPERATIONS_ERROR;
1826                 goto done;
1827         }
1828
1829         ret = ldb_kv_search_dn1(module, dn, msg, 0);
1830         if (ret != LDB_SUCCESS) {
1831                 goto done;
1832         }
1833
1834         switch (seq->type) {
1835         case LDB_SEQ_HIGHEST_SEQ:
1836                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1837                 break;
1838         case LDB_SEQ_NEXT:
1839                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1840                 res->seq_num++;
1841                 break;
1842         case LDB_SEQ_HIGHEST_TIMESTAMP:
1843                 date = ldb_msg_find_attr_as_string(msg, LDB_KV_MOD_TIMESTAMP, NULL);
1844                 if (date) {
1845                         res->seq_num = ldb_string_to_time(date);
1846                 } else {
1847                         res->seq_num = 0;
1848                         /* zero is as good as anything when we don't know */
1849                 }
1850                 break;
1851         }
1852
1853         *ext = talloc_zero(req, struct ldb_extended);
1854         if (*ext == NULL) {
1855                 ret = LDB_ERR_OPERATIONS_ERROR;
1856                 goto done;
1857         }
1858         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1859         (*ext)->data = talloc_steal(*ext, res);
1860
1861 done:
1862         talloc_free(tmp_ctx);
1863
1864         ldb_kv->kv_ops->unlock_read(module);
1865         return ret;
1866 }
1867
1868 static void ldb_kv_request_done(struct ldb_kv_context *ctx, int error)
1869 {
1870         struct ldb_context *ldb;
1871         struct ldb_request *req;
1872         struct ldb_reply *ares;
1873
1874         ldb = ldb_module_get_ctx(ctx->module);
1875         req = ctx->req;
1876
1877         /* if we already returned an error just return */
1878         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1879                 return;
1880         }
1881
1882         ares = talloc_zero(req, struct ldb_reply);
1883         if (!ares) {
1884                 ldb_oom(ldb);
1885                 req->callback(req, NULL);
1886                 return;
1887         }
1888         ares->type = LDB_REPLY_DONE;
1889         ares->error = error;
1890
1891         req->callback(req, ares);
1892 }
1893
1894 static void ldb_kv_timeout(_UNUSED_ struct tevent_context *ev,
1895                            _UNUSED_ struct tevent_timer *te,
1896                            _UNUSED_ struct timeval t,
1897                            void *private_data)
1898 {
1899         struct ldb_kv_context *ctx;
1900         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1901
1902         if (!ctx->request_terminated) {
1903                 /* request is done now */
1904                 ldb_kv_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1905         }
1906
1907         if (ctx->spy) {
1908                 /* neutralize the spy */
1909                 ctx->spy->ctx = NULL;
1910                 ctx->spy = NULL;
1911         }
1912         talloc_free(ctx);
1913 }
1914
1915 static void ldb_kv_request_extended_done(struct ldb_kv_context *ctx,
1916                                          struct ldb_extended *ext,
1917                                          int error)
1918 {
1919         struct ldb_context *ldb;
1920         struct ldb_request *req;
1921         struct ldb_reply *ares;
1922
1923         ldb = ldb_module_get_ctx(ctx->module);
1924         req = ctx->req;
1925
1926         /* if we already returned an error just return */
1927         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1928                 return;
1929         }
1930
1931         ares = talloc_zero(req, struct ldb_reply);
1932         if (!ares) {
1933                 ldb_oom(ldb);
1934                 req->callback(req, NULL);
1935                 return;
1936         }
1937         ares->type = LDB_REPLY_DONE;
1938         ares->response = ext;
1939         ares->error = error;
1940
1941         req->callback(req, ares);
1942 }
1943
1944 static void ldb_kv_handle_extended(struct ldb_kv_context *ctx)
1945 {
1946         struct ldb_extended *ext = NULL;
1947         int ret;
1948
1949         if (strcmp(ctx->req->op.extended.oid,
1950                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1951                 /* get sequence number */
1952                 ret = ldb_kv_sequence_number(ctx, &ext);
1953         } else {
1954                 /* not recognized */
1955                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1956         }
1957
1958         ldb_kv_request_extended_done(ctx, ext, ret);
1959 }
1960
1961 static void ldb_kv_callback(struct tevent_context *ev,
1962                             struct tevent_timer *te,
1963                             struct timeval t,
1964                             void *private_data)
1965 {
1966         struct ldb_kv_context *ctx;
1967         int ret;
1968
1969         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1970
1971         if (ctx->request_terminated) {
1972                 goto done;
1973         }
1974
1975         switch (ctx->req->operation) {
1976         case LDB_SEARCH:
1977                 ret = ldb_kv_search(ctx);
1978                 break;
1979         case LDB_ADD:
1980                 ret = ldb_kv_add(ctx);
1981                 break;
1982         case LDB_MODIFY:
1983                 ret = ldb_kv_modify(ctx);
1984                 break;
1985         case LDB_DELETE:
1986                 ret = ldb_kv_delete(ctx);
1987                 break;
1988         case LDB_RENAME:
1989                 ret = ldb_kv_rename(ctx);
1990                 break;
1991         case LDB_EXTENDED:
1992                 ldb_kv_handle_extended(ctx);
1993                 goto done;
1994         default:
1995                 /* no other op supported */
1996                 ret = LDB_ERR_PROTOCOL_ERROR;
1997         }
1998
1999         if (!ctx->request_terminated) {
2000                 /* request is done now */
2001                 ldb_kv_request_done(ctx, ret);
2002         }
2003
2004 done:
2005         if (ctx->spy) {
2006                 /* neutralize the spy */
2007                 ctx->spy->ctx = NULL;
2008                 ctx->spy = NULL;
2009         }
2010         talloc_free(ctx);
2011 }
2012
2013 static int ldb_kv_request_destructor(void *ptr)
2014 {
2015         struct ldb_kv_req_spy *spy =
2016             talloc_get_type(ptr, struct ldb_kv_req_spy);
2017
2018         if (spy->ctx != NULL) {
2019                 spy->ctx->spy = NULL;
2020                 spy->ctx->request_terminated = true;
2021                 spy->ctx = NULL;
2022         }
2023
2024         return 0;
2025 }
2026
2027 static int ldb_kv_handle_request(struct ldb_module *module,
2028                                  struct ldb_request *req)
2029 {
2030         struct ldb_control *control_permissive;
2031         struct ldb_context *ldb;
2032         struct tevent_context *ev;
2033         struct ldb_kv_context *ac;
2034         struct tevent_timer *te;
2035         struct timeval tv;
2036         unsigned int i;
2037
2038         ldb = ldb_module_get_ctx(module);
2039
2040         control_permissive = ldb_request_get_control(req,
2041                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2042
2043         for (i = 0; req->controls && req->controls[i]; i++) {
2044                 if (req->controls[i]->critical &&
2045                     req->controls[i] != control_permissive) {
2046                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
2047                                                req->controls[i]->oid);
2048                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
2049                 }
2050         }
2051
2052         if (req->starttime == 0 || req->timeout == 0) {
2053                 ldb_set_errstring(ldb, "Invalid timeout settings");
2054                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
2055         }
2056
2057         ev = ldb_handle_get_event_context(req->handle);
2058
2059         ac = talloc_zero(ldb, struct ldb_kv_context);
2060         if (ac == NULL) {
2061                 ldb_oom(ldb);
2062                 return LDB_ERR_OPERATIONS_ERROR;
2063         }
2064
2065         ac->module = module;
2066         ac->req = req;
2067
2068         tv.tv_sec = 0;
2069         tv.tv_usec = 0;
2070         te = tevent_add_timer(ev, ac, tv, ldb_kv_callback, ac);
2071         if (NULL == te) {
2072                 talloc_free(ac);
2073                 return LDB_ERR_OPERATIONS_ERROR;
2074         }
2075
2076         if (req->timeout > 0) {
2077                 tv.tv_sec = req->starttime + req->timeout;
2078                 tv.tv_usec = 0;
2079                 ac->timeout_event =
2080                     tevent_add_timer(ev, ac, tv, ldb_kv_timeout, ac);
2081                 if (NULL == ac->timeout_event) {
2082                         talloc_free(ac);
2083                         return LDB_ERR_OPERATIONS_ERROR;
2084                 }
2085         }
2086
2087         /* set a spy so that we do not try to use the request context
2088          * if it is freed before ltdb_callback fires */
2089         ac->spy = talloc(req, struct ldb_kv_req_spy);
2090         if (NULL == ac->spy) {
2091                 talloc_free(ac);
2092                 return LDB_ERR_OPERATIONS_ERROR;
2093         }
2094         ac->spy->ctx = ac;
2095
2096         talloc_set_destructor((TALLOC_CTX *)ac->spy, ldb_kv_request_destructor);
2097
2098         return LDB_SUCCESS;
2099 }
2100
2101 static int ldb_kv_init_rootdse(struct ldb_module *module)
2102 {
2103         /* ignore errors on this - we expect it for non-sam databases */
2104         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2105
2106         /* there can be no module beyond the backend, just return */
2107         return LDB_SUCCESS;
2108 }
2109
2110 static int ldb_kv_lock_read(struct ldb_module *module)
2111 {
2112         void *data = ldb_module_get_private(module);
2113         struct ldb_kv_private *ldb_kv =
2114             talloc_get_type(data, struct ldb_kv_private);
2115         return ldb_kv->kv_ops->lock_read(module);
2116 }
2117
2118 static int ldb_kv_unlock_read(struct ldb_module *module)
2119 {
2120         void *data = ldb_module_get_private(module);
2121         struct ldb_kv_private *ldb_kv =
2122             talloc_get_type(data, struct ldb_kv_private);
2123         return ldb_kv->kv_ops->unlock_read(module);
2124 }
2125
2126 static const struct ldb_module_ops ldb_kv_ops = {
2127     .name = "tdb",
2128     .init_context = ldb_kv_init_rootdse,
2129     .search = ldb_kv_handle_request,
2130     .add = ldb_kv_handle_request,
2131     .modify = ldb_kv_handle_request,
2132     .del = ldb_kv_handle_request,
2133     .rename = ldb_kv_handle_request,
2134     .extended = ldb_kv_handle_request,
2135     .start_transaction = ldb_kv_start_trans,
2136     .end_transaction = ldb_kv_end_trans,
2137     .prepare_commit = ldb_kv_prepare_commit,
2138     .del_transaction = ldb_kv_del_trans,
2139     .read_lock = ldb_kv_lock_read,
2140     .read_unlock = ldb_kv_unlock_read,
2141 };
2142
2143 int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
2144                       const char *name,
2145                       struct ldb_context *ldb,
2146                       const char *options[],
2147                       struct ldb_module **_module)
2148 {
2149         if (getenv("LDB_WARN_UNINDEXED")) {
2150                 ldb_kv->warn_unindexed = true;
2151         }
2152
2153         if (getenv("LDB_WARN_REINDEX")) {
2154                 ldb_kv->warn_reindex = true;
2155         }
2156
2157         ldb_kv->sequence_number = 0;
2158
2159         ldb_kv->pid = getpid();
2160
2161         ldb_kv->pack_format_override = 0;
2162
2163         ldb_kv->module = ldb_module_new(ldb, ldb, name, &ldb_kv_ops);
2164         if (!ldb_kv->module) {
2165                 ldb_oom(ldb);
2166                 talloc_free(ldb_kv);
2167                 return LDB_ERR_OPERATIONS_ERROR;
2168         }
2169         ldb_module_set_private(ldb_kv->module, ldb_kv);
2170         talloc_steal(ldb_kv->module, ldb_kv);
2171
2172         if (ldb_kv_cache_load(ldb_kv->module) != 0) {
2173                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2174                                        "records for backend '%s'", name);
2175                 talloc_free(ldb_kv->module);
2176                 return LDB_ERR_OPERATIONS_ERROR;
2177         }
2178
2179         *_module = ldb_kv->module;
2180         /*
2181          * Set or override the maximum key length
2182          *
2183          * The ldb_mdb code will have set this to 511, but our tests
2184          * set this even smaller (to make the tests more practical).
2185          *
2186          * This must only be used for the selftest as the length
2187          * becomes encoded in the index keys.
2188          */
2189         {
2190                 const char *len_str =
2191                         ldb_options_find(ldb, options,
2192                                          "max_key_len_for_self_test");
2193                 if (len_str != NULL) {
2194                         unsigned len = strtoul(len_str, NULL, 0);
2195                         ldb_kv->max_key_length = len;
2196                 }
2197         }
2198
2199         /*
2200          * Usually the presence of GUID indexing determines the pack format
2201          * we use but in certain circumstances such as downgrading an
2202          * MDB-backed database, we want to override the target pack format.
2203          *
2204          * We set/get opaques here because in the Samba partitions module,
2205          * 'options' are not passed correctly so sub-databases can't see
2206          * the options they need.
2207          */
2208         {
2209                 const char *pack_format_override =
2210                         ldb_options_find(ldb, options, "pack_format_override");
2211                 if (pack_format_override != NULL) {
2212                         int ret;
2213                         ldb_kv->pack_format_override =
2214                                 strtoul(pack_format_override, NULL, 0);
2215                         ret = ldb_set_opaque(ldb,
2216                                              "pack_format_override",
2217                              (void *)(intptr_t)ldb_kv->pack_format_override);
2218                         if (ret != LDB_SUCCESS) {
2219                                 talloc_free(ldb_kv->module);
2220                                 return ldb_module_operr(ldb_kv->module);
2221                         }
2222                 } else {
2223                         /*
2224                          * NULL -> 0 is fine, otherwise we get back
2225                          * the number we needed
2226                          */
2227                         ldb_kv->pack_format_override
2228                                 = (intptr_t)ldb_get_opaque(ldb,
2229                                                    "pack_format_override");
2230                 }
2231         }
2232
2233         /*
2234          * Override full DB scans
2235          *
2236          * A full DB scan is expensive on a large database.  This
2237          * option is for testing to show that the full DB scan is not
2238          * triggered.
2239          */
2240         {
2241                 const char *len_str =
2242                         ldb_options_find(ldb, options,
2243                                          "disable_full_db_scan_for_self_test");
2244                 if (len_str != NULL) {
2245                         ldb_kv->disable_full_db_scan = true;
2246                 }
2247         }
2248
2249         /*
2250          * Set the size of the transaction index cache.
2251          * If the ldb option "transaction_index_cache_size" is set use that
2252          * otherwise use DEFAULT_INDEX_CACHE_SIZE
2253          */
2254         ldb_kv->index_transaction_cache_size = DEFAULT_INDEX_CACHE_SIZE;
2255         {
2256                 const char *size = ldb_options_find(
2257                         ldb,
2258                         options,
2259                         "transaction_index_cache_size");
2260                 if (size != NULL) {
2261                         size_t cache_size = 0;
2262                         errno = 0;
2263
2264                         cache_size = strtoul( size, NULL, 0);
2265                         if (cache_size == 0 || errno == ERANGE) {
2266                                 ldb_debug(
2267                                         ldb,
2268                                         LDB_DEBUG_WARNING,
2269                                         "Invalid transaction_index_cache_size "
2270                                         "value [%s], using default(%d)\n",
2271                                         size,
2272                                         DEFAULT_INDEX_CACHE_SIZE);
2273                         } else {
2274                                 ldb_kv->index_transaction_cache_size =
2275                                         cache_size;
2276                         }
2277                 }
2278         }
2279         /*
2280          * Set batch mode operation.
2281          * This disables the nested sub transactions, and increases the
2282          * chance of index corruption.  If using this mode the transaction
2283          * commit will be aborted if any operation fails.
2284          */
2285         {
2286                 const char *batch_mode = ldb_options_find(
2287                         ldb, options, "batch_mode");
2288                 if (batch_mode != NULL) {
2289                         ldb_kv->batch_mode = true;
2290                 }
2291         }
2292
2293         return LDB_SUCCESS;
2294 }