ldb_tdb: Add ltdb_idx_to_key() and use it in ltdb_index_filter()
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 TDB_DATA ltdb_guid_to_key(struct ldb_module *module,
225                           struct ltdb_private *ltdb,
226                           TALLOC_CTX *mem_ctx,
227                           const struct ldb_val *GUID_val)
228 {
229         TDB_DATA key;
230         const char *GUID_prefix = "GUID=";
231         const int GUID_prefix_len = strlen(GUID_prefix);
232
233         key.dptr = talloc_size(mem_ctx,
234                                GUID_val->length+GUID_prefix_len);
235
236         if (key.dptr == NULL) {
237                 errno = ENOMEM;
238                 key.dptr = NULL;
239                 key.dsize = 0;
240                 return key;
241         }
242         memcpy(key.dptr, "GUID=", GUID_prefix_len);
243         memcpy(&key.dptr[GUID_prefix_len],
244                GUID_val->data, GUID_val->length);
245
246         key.dsize = talloc_get_size(key.dptr);
247         return key;
248 }
249
250 TDB_DATA ltdb_idx_to_key(struct ldb_module *module,
251                          struct ltdb_private *ltdb,
252                          TALLOC_CTX *mem_ctx,
253                          const struct ldb_val *idx_val)
254 {
255         TDB_DATA key;
256         struct ldb_context *ldb = ldb_module_get_ctx(module);
257         struct ldb_dn *dn;
258
259         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
260         if (dn == NULL) {
261                 errno = EINVAL;
262                 key.dptr = NULL;
263                 key.dsize = 0;
264                 return key;
265         }
266         /* form the key */
267         key = ltdb_key_dn(module, mem_ctx, dn);
268         TALLOC_FREE(dn);
269         if (!key.dptr) {
270                 errno = ENOMEM;
271                 key.dptr = NULL;
272                 key.dsize = 0;
273                 return key;
274         }
275         return key;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293
294         if (ltdb->cache->GUID_index_attribute == NULL) {
295                 return ltdb_key_dn(module, mem_ctx, msg->dn);
296         }
297
298         if (ldb_dn_is_special(msg->dn)) {
299                 return ltdb_key_dn(module, mem_ctx, msg->dn);
300         }
301
302         guid_val = ldb_msg_find_ldb_val(msg,
303                                        ltdb->cache->GUID_index_attribute);
304         if (guid_val == NULL) {
305                 errno = EINVAL;
306                 key.dptr = NULL;
307                 key.dsize = 0;
308                 return key;
309         }
310
311         return ltdb_guid_to_key(module, ltdb, mem_ctx, guid_val);
312
313 }
314
315 /*
316   check special dn's have valid attributes
317   currently only @ATTRIBUTES is checked
318 */
319 static int ltdb_check_special_dn(struct ldb_module *module,
320                                  const struct ldb_message *msg)
321 {
322         struct ldb_context *ldb = ldb_module_get_ctx(module);
323         unsigned int i, j;
324
325         if (! ldb_dn_is_special(msg->dn) ||
326             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
327                 return LDB_SUCCESS;
328         }
329
330         /* we have @ATTRIBUTES, let's check attributes are fine */
331         /* should we check that we deny multivalued attributes ? */
332         for (i = 0; i < msg->num_elements; i++) {
333                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
334
335                 for (j = 0; j < msg->elements[i].num_values; j++) {
336                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
337                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
338                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
339                         }
340                 }
341         }
342
343         return LDB_SUCCESS;
344 }
345
346
347 /*
348   we've made a modification to a dn - possibly reindex and
349   update sequence number
350 */
351 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
352 {
353         int ret = LDB_SUCCESS;
354         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
355
356         /* only allow modifies inside a transaction, otherwise the
357          * ldb is unsafe */
358         if (ltdb->in_transaction == 0) {
359                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
360                 return LDB_ERR_OPERATIONS_ERROR;
361         }
362
363         if (ldb_dn_is_special(dn) &&
364             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
365              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
366         {
367                 if (ltdb->warn_reindex) {
368                         ldb_debug(ldb_module_get_ctx(module),
369                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
370                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
371                 }
372                 ret = ltdb_reindex(module);
373         }
374
375         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
376         if (ret == LDB_SUCCESS &&
377             !(ldb_dn_is_special(dn) &&
378               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
379                 ret = ltdb_increase_sequence_number(module);
380         }
381
382         /* If the modify was to @OPTIONS, reload the cache */
383         if (ret == LDB_SUCCESS &&
384             ldb_dn_is_special(dn) &&
385             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
386                 ret = ltdb_cache_reload(module);
387         }
388
389         return ret;
390 }
391
392 /*
393   store a record into the db
394 */
395 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
396 {
397         void *data = ldb_module_get_private(module);
398         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
399         TDB_DATA tdb_key, tdb_data;
400         struct ldb_val ldb_data;
401         int ret = LDB_SUCCESS;
402         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
403
404         if (tdb_key_ctx == NULL) {
405                 return ldb_module_oom(module);
406         }
407
408         if (ltdb->read_only) {
409                 return LDB_ERR_UNWILLING_TO_PERFORM;
410         }
411
412         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
413         if (tdb_key.dptr == NULL) {
414                 TALLOC_FREE(tdb_key_ctx);
415                 return LDB_ERR_OTHER;
416         }
417
418         ret = ldb_pack_data(ldb_module_get_ctx(module),
419                             msg, &ldb_data);
420         if (ret == -1) {
421                 TALLOC_FREE(tdb_key_ctx);
422                 return LDB_ERR_OTHER;
423         }
424
425         tdb_data.dptr = ldb_data.data;
426         tdb_data.dsize = ldb_data.length;
427
428         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
429         if (ret != 0) {
430                 bool is_special = ldb_dn_is_special(msg->dn);
431                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
432
433                 /*
434                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
435                  * the GUID, so re-map
436                  */
437                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
438                     && !is_special
439                     && ltdb->cache->GUID_index_attribute != NULL) {
440                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
441                 }
442                 goto done;
443         }
444
445 done:
446         TALLOC_FREE(tdb_key_ctx);
447         talloc_free(ldb_data.data);
448
449         return ret;
450 }
451
452
453 /*
454   check if a attribute is a single valued, for a given element
455  */
456 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
457                                   struct ldb_message_element *el)
458 {
459         if (!a) return false;
460         if (el != NULL) {
461                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
462                         /* override from a ldb module, for example
463                            used for the description field, which is
464                            marked multi-valued in the schema but which
465                            should not actually accept multiple
466                            values */
467                         return true;
468                 }
469                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
470                         /* override from a ldb module, for example used for
471                            deleted linked attribute entries */
472                         return false;
473                 }
474         }
475         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
476                 return true;
477         }
478         return false;
479 }
480
481 static int ltdb_add_internal(struct ldb_module *module,
482                              struct ltdb_private *ltdb,
483                              const struct ldb_message *msg,
484                              bool check_single_value)
485 {
486         struct ldb_context *ldb = ldb_module_get_ctx(module);
487         int ret = LDB_SUCCESS;
488         unsigned int i;
489
490         for (i=0;i<msg->num_elements;i++) {
491                 struct ldb_message_element *el = &msg->elements[i];
492                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
493
494                 if (el->num_values == 0) {
495                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
496                                                el->name, ldb_dn_get_linearized(msg->dn));
497                         return LDB_ERR_CONSTRAINT_VIOLATION;
498                 }
499                 if (check_single_value &&
500                                 el->num_values > 1 &&
501                                 ldb_tdb_single_valued(a, el)) {
502                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
503                                                el->name, ldb_dn_get_linearized(msg->dn));
504                         return LDB_ERR_CONSTRAINT_VIOLATION;
505                 }
506
507                 /* Do not check "@ATTRIBUTES" for duplicated values */
508                 if (ldb_dn_is_special(msg->dn) &&
509                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
510                         continue;
511                 }
512
513                 if (check_single_value &&
514                     !(el->flags &
515                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
516                         struct ldb_val *duplicate = NULL;
517
518                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
519                                                          el, &duplicate, 0);
520                         if (ret != LDB_SUCCESS) {
521                                 return ret;
522                         }
523                         if (duplicate != NULL) {
524                                 ldb_asprintf_errstring(
525                                         ldb,
526                                         "attribute '%s': value '%.*s' on '%s' "
527                                         "provided more than once in ADD object",
528                                         el->name,
529                                         (int)duplicate->length,
530                                         duplicate->data,
531                                         ldb_dn_get_linearized(msg->dn));
532                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
533                         }
534                 }
535         }
536
537         ret = ltdb_store(module, msg, TDB_INSERT);
538         if (ret != LDB_SUCCESS) {
539                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
540                         ldb_asprintf_errstring(ldb,
541                                                "Entry %s already exists",
542                                                ldb_dn_get_linearized(msg->dn));
543                 }
544                 return ret;
545         }
546
547         ret = ltdb_index_add_new(module, ltdb, msg);
548         if (ret != LDB_SUCCESS) {
549                 /*
550                  * If we failed to index, delete the message again.
551                  *
552                  * This is particularly important for the GUID index
553                  * case, which will only fail for a duplicate DN
554                  * in the index add.
555                  *
556                  * Note that the caller may not cancel the transation
557                  * and this means the above add might really show up!
558                  */
559                 ltdb_delete_noindex(module, msg);
560                 return ret;
561         }
562
563         ret = ltdb_modified(module, msg->dn);
564
565         return ret;
566 }
567
568 /*
569   add a record to the database
570 */
571 static int ltdb_add(struct ltdb_context *ctx)
572 {
573         struct ldb_module *module = ctx->module;
574         struct ldb_request *req = ctx->req;
575         void *data = ldb_module_get_private(module);
576         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
577         int ret = LDB_SUCCESS;
578
579         ret = ltdb_check_special_dn(module, req->op.add.message);
580         if (ret != LDB_SUCCESS) {
581                 return ret;
582         }
583
584         ldb_request_set_state(req, LDB_ASYNC_PENDING);
585
586         if (ltdb_cache_load(module) != 0) {
587                 return LDB_ERR_OPERATIONS_ERROR;
588         }
589
590         ret = ltdb_add_internal(module, ltdb,
591                                 req->op.add.message, true);
592
593         return ret;
594 }
595
596 /*
597   delete a record from the database, not updating indexes (used for deleting
598   index records)
599 */
600 int ltdb_delete_noindex(struct ldb_module *module,
601                         const struct ldb_message *msg)
602 {
603         void *data = ldb_module_get_private(module);
604         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
605         TDB_DATA tdb_key;
606         int ret;
607         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
608
609         if (tdb_key_ctx == NULL) {
610                 return ldb_module_oom(module);
611         }
612
613         if (ltdb->read_only) {
614                 return LDB_ERR_UNWILLING_TO_PERFORM;
615         }
616
617         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
618         if (!tdb_key.dptr) {
619                 TALLOC_FREE(tdb_key_ctx);
620                 return LDB_ERR_OTHER;
621         }
622
623         ret = tdb_delete(ltdb->tdb, tdb_key);
624         TALLOC_FREE(tdb_key_ctx);
625
626         if (ret != 0) {
627                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
628         }
629
630         return ret;
631 }
632
633 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
634 {
635         struct ldb_message *msg;
636         int ret = LDB_SUCCESS;
637
638         msg = ldb_msg_new(module);
639         if (msg == NULL) {
640                 return LDB_ERR_OPERATIONS_ERROR;
641         }
642
643         /* in case any attribute of the message was indexed, we need
644            to fetch the old record */
645         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
646         if (ret != LDB_SUCCESS) {
647                 /* not finding the old record is an error */
648                 goto done;
649         }
650
651         ret = ltdb_delete_noindex(module, msg);
652         if (ret != LDB_SUCCESS) {
653                 goto done;
654         }
655
656         /* remove any indexed attributes */
657         ret = ltdb_index_delete(module, msg);
658         if (ret != LDB_SUCCESS) {
659                 goto done;
660         }
661
662         ret = ltdb_modified(module, dn);
663         if (ret != LDB_SUCCESS) {
664                 goto done;
665         }
666
667 done:
668         talloc_free(msg);
669         return ret;
670 }
671
672 /*
673   delete a record from the database
674 */
675 static int ltdb_delete(struct ltdb_context *ctx)
676 {
677         struct ldb_module *module = ctx->module;
678         struct ldb_request *req = ctx->req;
679         int ret = LDB_SUCCESS;
680
681         ldb_request_set_state(req, LDB_ASYNC_PENDING);
682
683         if (ltdb_cache_load(module) != 0) {
684                 return LDB_ERR_OPERATIONS_ERROR;
685         }
686
687         ret = ltdb_delete_internal(module, req->op.del.dn);
688
689         return ret;
690 }
691
692 /*
693   find an element by attribute name. At the moment this does a linear search,
694   it should be re-coded to use a binary search once all places that modify
695   records guarantee sorted order
696
697   return the index of the first matching element if found, otherwise -1
698 */
699 static int find_element(const struct ldb_message *msg, const char *name)
700 {
701         unsigned int i;
702         for (i=0;i<msg->num_elements;i++) {
703                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
704                         return i;
705                 }
706         }
707         return -1;
708 }
709
710
711 /*
712   add an element to an existing record. Assumes a elements array that we
713   can call re-alloc on, and assumed that we can re-use the data pointers from
714   the passed in additional values. Use with care!
715
716   returns 0 on success, -1 on failure (and sets errno)
717 */
718 static int ltdb_msg_add_element(struct ldb_message *msg,
719                                 struct ldb_message_element *el)
720 {
721         struct ldb_message_element *e2;
722         unsigned int i;
723
724         if (el->num_values == 0) {
725                 /* nothing to do here - we don't add empty elements */
726                 return 0;
727         }
728
729         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
730                               msg->num_elements+1);
731         if (!e2) {
732                 errno = ENOMEM;
733                 return -1;
734         }
735
736         msg->elements = e2;
737
738         e2 = &msg->elements[msg->num_elements];
739
740         e2->name = el->name;
741         e2->flags = el->flags;
742         e2->values = talloc_array(msg->elements,
743                                   struct ldb_val, el->num_values);
744         if (!e2->values) {
745                 errno = ENOMEM;
746                 return -1;
747         }
748         for (i=0;i<el->num_values;i++) {
749                 e2->values[i] = el->values[i];
750         }
751         e2->num_values = el->num_values;
752
753         ++msg->num_elements;
754
755         return 0;
756 }
757
758 /*
759   delete all elements having a specified attribute name
760 */
761 static int msg_delete_attribute(struct ldb_module *module,
762                                 struct ltdb_private *ltdb,
763                                 struct ldb_message *msg, const char *name)
764 {
765         unsigned int i;
766         int ret;
767         struct ldb_message_element *el;
768         bool is_special = ldb_dn_is_special(msg->dn);
769
770         if (!is_special
771             && ltdb->cache->GUID_index_attribute != NULL
772             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
773                 struct ldb_context *ldb = ldb_module_get_ctx(module);
774                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
775                                        "attribute %s (used as DB index)",
776                                        ltdb->cache->GUID_index_attribute);
777                 return LDB_ERR_CONSTRAINT_VIOLATION;
778         }
779
780         el = ldb_msg_find_element(msg, name);
781         if (el == NULL) {
782                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
783         }
784         i = el - msg->elements;
785
786         ret = ltdb_index_del_element(module, ltdb, msg, el);
787         if (ret != LDB_SUCCESS) {
788                 return ret;
789         }
790
791         talloc_free(el->values);
792         if (msg->num_elements > (i+1)) {
793                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
794         }
795         msg->num_elements--;
796         msg->elements = talloc_realloc(msg, msg->elements,
797                                        struct ldb_message_element,
798                                        msg->num_elements);
799         return LDB_SUCCESS;
800 }
801
802 /*
803   delete all elements matching an attribute name/value
804
805   return LDB Error on failure
806 */
807 static int msg_delete_element(struct ldb_module *module,
808                               struct ltdb_private *ltdb,
809                               struct ldb_message *msg,
810                               const char *name,
811                               const struct ldb_val *val)
812 {
813         struct ldb_context *ldb = ldb_module_get_ctx(module);
814         unsigned int i;
815         int found, ret;
816         struct ldb_message_element *el;
817         const struct ldb_schema_attribute *a;
818
819         found = find_element(msg, name);
820         if (found == -1) {
821                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
822         }
823
824         i = (unsigned int) found;
825         el = &(msg->elements[i]);
826
827         a = ldb_schema_attribute_by_name(ldb, el->name);
828
829         for (i=0;i<el->num_values;i++) {
830                 bool matched;
831                 if (a->syntax->operator_fn) {
832                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
833                                                      &el->values[i], val, &matched);
834                         if (ret != LDB_SUCCESS) return ret;
835                 } else {
836                         matched = (a->syntax->comparison_fn(ldb, ldb,
837                                                             &el->values[i], val) == 0);
838                 }
839                 if (matched) {
840                         if (el->num_values == 1) {
841                                 return msg_delete_attribute(module,
842                                                             ltdb, msg, name);
843                         }
844
845                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
846                         if (ret != LDB_SUCCESS) {
847                                 return ret;
848                         }
849
850                         if (i<el->num_values-1) {
851                                 memmove(&el->values[i], &el->values[i+1],
852                                         sizeof(el->values[i])*
853                                                 (el->num_values-(i+1)));
854                         }
855                         el->num_values--;
856
857                         /* per definition we find in a canonicalised message an
858                            attribute value only once. So we are finished here */
859                         return LDB_SUCCESS;
860                 }
861         }
862
863         /* Not found */
864         return LDB_ERR_NO_SUCH_ATTRIBUTE;
865 }
866
867
868 /*
869   modify a record - internal interface
870
871   yuck - this is O(n^2). Luckily n is usually small so we probably
872   get away with it, but if we ever have really large attribute lists
873   then we'll need to look at this again
874
875   'req' is optional, and is used to specify controls if supplied
876 */
877 int ltdb_modify_internal(struct ldb_module *module,
878                          const struct ldb_message *msg,
879                          struct ldb_request *req)
880 {
881         struct ldb_context *ldb = ldb_module_get_ctx(module);
882         void *data = ldb_module_get_private(module);
883         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
884         struct ldb_message *msg2;
885         unsigned int i, j;
886         int ret = LDB_SUCCESS, idx;
887         struct ldb_control *control_permissive = NULL;
888         TALLOC_CTX *mem_ctx = talloc_new(req);
889
890         if (mem_ctx == NULL) {
891                 return ldb_module_oom(module);
892         }
893         
894         if (req) {
895                 control_permissive = ldb_request_get_control(req,
896                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
897         }
898
899         msg2 = ldb_msg_new(mem_ctx);
900         if (msg2 == NULL) {
901                 ret = LDB_ERR_OTHER;
902                 goto done;
903         }
904
905         ret = ltdb_search_dn1(module, msg->dn,
906                               msg2,
907                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
908         if (ret != LDB_SUCCESS) {
909                 goto done;
910         }
911
912         for (i=0; i<msg->num_elements; i++) {
913                 struct ldb_message_element *el = &msg->elements[i], *el2;
914                 struct ldb_val *vals;
915                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
916                 const char *dn;
917                 uint32_t options = 0;
918                 if (control_permissive != NULL) {
919                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
920                 }
921
922                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
923                 case LDB_FLAG_MOD_ADD:
924
925                         if (el->num_values == 0) {
926                                 ldb_asprintf_errstring(ldb,
927                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
928                                                        el->name, ldb_dn_get_linearized(msg2->dn));
929                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
930                                 goto done;
931                         }
932
933                         /* make a copy of the array so that a permissive
934                          * control can remove duplicates without changing the
935                          * original values, but do not copy data as we do not
936                          * need to keep it around once the operation is
937                          * finished */
938                         if (control_permissive) {
939                                 el = talloc(msg2, struct ldb_message_element);
940                                 if (!el) {
941                                         ret = LDB_ERR_OTHER;
942                                         goto done;
943                                 }
944                                 *el = msg->elements[i];
945                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
946                                 if (el->values == NULL) {
947                                         ret = LDB_ERR_OTHER;
948                                         goto done;
949                                 }
950                                 for (j = 0; j < el->num_values; j++) {
951                                         el->values[j] = msg->elements[i].values[j];
952                                 }
953                         }
954
955                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
956                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
957                                                        el->name, ldb_dn_get_linearized(msg2->dn));
958                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
959                                 goto done;
960                         }
961
962                         /* Checks if element already exists */
963                         idx = find_element(msg2, el->name);
964                         if (idx == -1) {
965                                 if (ltdb_msg_add_element(msg2, el) != 0) {
966                                         ret = LDB_ERR_OTHER;
967                                         goto done;
968                                 }
969                                 ret = ltdb_index_add_element(module, ltdb,
970                                                              msg2,
971                                                              el);
972                                 if (ret != LDB_SUCCESS) {
973                                         goto done;
974                                 }
975                         } else {
976                                 j = (unsigned int) idx;
977                                 el2 = &(msg2->elements[j]);
978
979                                 /* We cannot add another value on a existing one
980                                    if the attribute is single-valued */
981                                 if (ldb_tdb_single_valued(a, el)) {
982                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
983                                                                el->name, ldb_dn_get_linearized(msg2->dn));
984                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
985                                         goto done;
986                                 }
987
988                                 /* Check that values don't exist yet on multi-
989                                    valued attributes or aren't provided twice */
990                                 if (!(el->flags &
991                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
992                                         struct ldb_val *duplicate = NULL;
993                                         ret = ldb_msg_find_common_values(ldb,
994                                                                          msg2,
995                                                                          el,
996                                                                          el2,
997                                                                          options);
998
999                                         if (ret ==
1000                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1001                                                 ldb_asprintf_errstring(ldb,
1002                                                         "attribute '%s': value "
1003                                                         "#%u on '%s' already "
1004                                                         "exists", el->name, j,
1005                                                         ldb_dn_get_linearized(msg2->dn));
1006                                                 goto done;
1007                                         } else if (ret != LDB_SUCCESS) {
1008                                                 goto done;
1009                                         }
1010
1011                                         ret = ldb_msg_find_duplicate_val(
1012                                                 ldb, msg2, el, &duplicate, 0);
1013                                         if (ret != LDB_SUCCESS) {
1014                                                 goto done;
1015                                         }
1016                                         if (duplicate != NULL) {
1017                                                 ldb_asprintf_errstring(
1018                                                         ldb,
1019                                                         "attribute '%s': value "
1020                                                         "'%.*s' on '%s' "
1021                                                         "provided more than "
1022                                                         "once in ADD",
1023                                                         el->name,
1024                                                         (int)duplicate->length,
1025                                                         duplicate->data,
1026                                                         ldb_dn_get_linearized(msg->dn));
1027                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1028                                                 goto done;
1029                                         }
1030                                 }
1031
1032                                 /* Now combine existing and new values to a new
1033                                    attribute record */
1034                                 vals = talloc_realloc(msg2->elements,
1035                                                       el2->values, struct ldb_val,
1036                                                       el2->num_values + el->num_values);
1037                                 if (vals == NULL) {
1038                                         ldb_oom(ldb);
1039                                         ret = LDB_ERR_OTHER;
1040                                         goto done;
1041                                 }
1042
1043                                 for (j=0; j<el->num_values; j++) {
1044                                         vals[el2->num_values + j] =
1045                                                 ldb_val_dup(vals, &el->values[j]);
1046                                 }
1047
1048                                 el2->values = vals;
1049                                 el2->num_values += el->num_values;
1050
1051                                 ret = ltdb_index_add_element(module, ltdb,
1052                                                              msg2, el);
1053                                 if (ret != LDB_SUCCESS) {
1054                                         goto done;
1055                                 }
1056                         }
1057
1058                         break;
1059
1060                 case LDB_FLAG_MOD_REPLACE:
1061
1062                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1063                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1064                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1065                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1066                                 goto done;
1067                         }
1068
1069                         /*
1070                          * We don't need to check this if we have been
1071                          * pre-screened by the repl_meta_data module
1072                          * in Samba, or someone else who can claim to
1073                          * know what they are doing. 
1074                          */
1075                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1076                                 struct ldb_val *duplicate = NULL;
1077
1078                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1079                                                                  &duplicate, 0);
1080                                 if (ret != LDB_SUCCESS) {
1081                                         goto done;
1082                                 }
1083                                 if (duplicate != NULL) {
1084                                         ldb_asprintf_errstring(
1085                                                 ldb,
1086                                                 "attribute '%s': value '%.*s' "
1087                                                 "on '%s' provided more than "
1088                                                 "once in REPLACE",
1089                                                 el->name,
1090                                                 (int)duplicate->length,
1091                                                 duplicate->data,
1092                                                 ldb_dn_get_linearized(msg2->dn));
1093                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1094                                         goto done;
1095                                 }
1096                         }
1097
1098                         /* Checks if element already exists */
1099                         idx = find_element(msg2, el->name);
1100                         if (idx != -1) {
1101                                 j = (unsigned int) idx;
1102                                 el2 = &(msg2->elements[j]);
1103
1104                                 /* we consider two elements to be
1105                                  * equal only if the order
1106                                  * matches. This allows dbcheck to
1107                                  * fix the ordering on attributes
1108                                  * where order matters, such as
1109                                  * objectClass
1110                                  */
1111                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1112                                         continue;
1113                                 }
1114
1115                                 /* Delete the attribute if it exists in the DB */
1116                                 if (msg_delete_attribute(module, ltdb,
1117                                                          msg2,
1118                                                          el->name) != 0) {
1119                                         ret = LDB_ERR_OTHER;
1120                                         goto done;
1121                                 }
1122                         }
1123
1124                         /* Recreate it with the new values */
1125                         if (ltdb_msg_add_element(msg2, el) != 0) {
1126                                 ret = LDB_ERR_OTHER;
1127                                 goto done;
1128                         }
1129
1130                         ret = ltdb_index_add_element(module, ltdb,
1131                                                      msg2, el);
1132                         if (ret != LDB_SUCCESS) {
1133                                 goto done;
1134                         }
1135
1136                         break;
1137
1138                 case LDB_FLAG_MOD_DELETE:
1139                         dn = ldb_dn_get_linearized(msg2->dn);
1140                         if (dn == NULL) {
1141                                 ret = LDB_ERR_OTHER;
1142                                 goto done;
1143                         }
1144
1145                         if (msg->elements[i].num_values == 0) {
1146                                 /* Delete the whole attribute */
1147                                 ret = msg_delete_attribute(module,
1148                                                            ltdb,
1149                                                            msg2,
1150                                                            msg->elements[i].name);
1151                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1152                                     control_permissive) {
1153                                         ret = LDB_SUCCESS;
1154                                 } else {
1155                                         ldb_asprintf_errstring(ldb,
1156                                                                "attribute '%s': no such attribute for delete on '%s'",
1157                                                                msg->elements[i].name, dn);
1158                                 }
1159                                 if (ret != LDB_SUCCESS) {
1160                                         goto done;
1161                                 }
1162                         } else {
1163                                 /* Delete specified values from an attribute */
1164                                 for (j=0; j < msg->elements[i].num_values; j++) {
1165                                         ret = msg_delete_element(module,
1166                                                                  ltdb,
1167                                                                  msg2,
1168                                                                  msg->elements[i].name,
1169                                                                  &msg->elements[i].values[j]);
1170                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1171                                             control_permissive) {
1172                                                 ret = LDB_SUCCESS;
1173                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1174                                                 ldb_asprintf_errstring(ldb,
1175                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1176                                                                        msg->elements[i].name, dn);
1177                                         }
1178                                         if (ret != LDB_SUCCESS) {
1179                                                 goto done;
1180                                         }
1181                                 }
1182                         }
1183                         break;
1184                 default:
1185                         ldb_asprintf_errstring(ldb,
1186                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1187                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1188                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1189                         ret = LDB_ERR_PROTOCOL_ERROR;
1190                         goto done;
1191                 }
1192         }
1193
1194         ret = ltdb_store(module, msg2, TDB_MODIFY);
1195         if (ret != LDB_SUCCESS) {
1196                 goto done;
1197         }
1198
1199         ret = ltdb_modified(module, msg2->dn);
1200         if (ret != LDB_SUCCESS) {
1201                 goto done;
1202         }
1203
1204 done:
1205         TALLOC_FREE(mem_ctx);
1206         return ret;
1207 }
1208
1209 /*
1210   modify a record
1211 */
1212 static int ltdb_modify(struct ltdb_context *ctx)
1213 {
1214         struct ldb_module *module = ctx->module;
1215         struct ldb_request *req = ctx->req;
1216         int ret = LDB_SUCCESS;
1217
1218         ret = ltdb_check_special_dn(module, req->op.mod.message);
1219         if (ret != LDB_SUCCESS) {
1220                 return ret;
1221         }
1222
1223         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1224
1225         if (ltdb_cache_load(module) != 0) {
1226                 return LDB_ERR_OPERATIONS_ERROR;
1227         }
1228
1229         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1230
1231         return ret;
1232 }
1233
1234 /*
1235   rename a record
1236 */
1237 static int ltdb_rename(struct ltdb_context *ctx)
1238 {
1239         struct ldb_module *module = ctx->module;
1240         void *data = ldb_module_get_private(module);
1241         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1242         struct ldb_request *req = ctx->req;
1243         struct ldb_message *msg;
1244         int ret = LDB_SUCCESS;
1245         TDB_DATA tdb_key, tdb_key_old;
1246
1247         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1248
1249         if (ltdb_cache_load(ctx->module) != 0) {
1250                 return LDB_ERR_OPERATIONS_ERROR;
1251         }
1252
1253         msg = ldb_msg_new(ctx);
1254         if (msg == NULL) {
1255                 return LDB_ERR_OPERATIONS_ERROR;
1256         }
1257
1258         /* we need to fetch the old record to re-add under the new name */
1259         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1260                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1261         if (ret != LDB_SUCCESS) {
1262                 /* not finding the old record is an error */
1263                 return ret;
1264         }
1265
1266         /* We need to, before changing the DB, check if the new DN
1267          * exists, so we can return this error to the caller with an
1268          * unmodified DB
1269          *
1270          * Even in GUID index mode we use ltdb_key_dn() as we are
1271          * trying to figure out if this is just a case rename
1272          */
1273         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1274         if (!tdb_key.dptr) {
1275                 talloc_free(msg);
1276                 return LDB_ERR_OPERATIONS_ERROR;
1277         }
1278
1279         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1280         if (!tdb_key_old.dptr) {
1281                 talloc_free(msg);
1282                 talloc_free(tdb_key.dptr);
1283                 return LDB_ERR_OPERATIONS_ERROR;
1284         }
1285
1286         /*
1287          * Only declare a conflict if the new DN already exists,
1288          * and it isn't a case change on the old DN
1289          */
1290         if (tdb_key_old.dsize != tdb_key.dsize
1291             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1292                 ret = ltdb_search_base(module,
1293                                        req->op.rename.newdn);
1294                 if (ret == LDB_SUCCESS) {
1295                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1296                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1297                         ret = LDB_SUCCESS;
1298                 }
1299         }
1300
1301         /* finding the new record already in the DB is an error */
1302
1303         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1304                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1305                                        "Entry %s already exists",
1306                                        ldb_dn_get_linearized(req->op.rename.newdn));
1307         }
1308         if (ret != LDB_SUCCESS) {
1309                 talloc_free(tdb_key_old.dptr);
1310                 talloc_free(tdb_key.dptr);
1311                 talloc_free(msg);
1312                 return ret;
1313         }
1314
1315         talloc_free(tdb_key_old.dptr);
1316         talloc_free(tdb_key.dptr);
1317
1318         /* Always delete first then add, to avoid conflicts with
1319          * unique indexes. We rely on the transaction to make this
1320          * atomic
1321          */
1322         ret = ltdb_delete_internal(module, msg->dn);
1323         if (ret != LDB_SUCCESS) {
1324                 talloc_free(msg);
1325                 return ret;
1326         }
1327
1328         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1329         if (msg->dn == NULL) {
1330                 talloc_free(msg);
1331                 return LDB_ERR_OPERATIONS_ERROR;
1332         }
1333
1334         /* We don't check single value as we can have more than 1 with
1335          * deleted attributes. We could go through all elements but that's
1336          * maybe not the most efficient way
1337          */
1338         ret = ltdb_add_internal(module, ltdb, msg, false);
1339
1340         talloc_free(msg);
1341
1342         return ret;
1343 }
1344
1345 static int ltdb_start_trans(struct ldb_module *module)
1346 {
1347         void *data = ldb_module_get_private(module);
1348         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1349
1350         /* Do not take out the transaction lock on a read-only DB */
1351         if (ltdb->read_only) {
1352                 return LDB_ERR_UNWILLING_TO_PERFORM;
1353         }
1354
1355         if (tdb_transaction_start(ltdb->tdb) != 0) {
1356                 return ltdb_err_map(tdb_error(ltdb->tdb));
1357         }
1358
1359         ltdb->in_transaction++;
1360
1361         ltdb_index_transaction_start(module);
1362
1363         return LDB_SUCCESS;
1364 }
1365
1366 static int ltdb_prepare_commit(struct ldb_module *module)
1367 {
1368         int ret;
1369         void *data = ldb_module_get_private(module);
1370         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1371
1372         if (ltdb->in_transaction != 1) {
1373                 return LDB_SUCCESS;
1374         }
1375
1376         ret = ltdb_index_transaction_commit(module);
1377         if (ret != LDB_SUCCESS) {
1378                 tdb_transaction_cancel(ltdb->tdb);
1379                 ltdb->in_transaction--;
1380                 return ret;
1381         }
1382
1383         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1384                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1385                 ltdb->in_transaction--;
1386                 ldb_debug_set(ldb_module_get_ctx(module),
1387                               LDB_DEBUG_FATAL,
1388                               "Failure during "
1389                               "tdb_transaction_prepare_commit(): %s -> %s",
1390                               tdb_errorstr(ltdb->tdb),
1391                               ldb_strerror(ret));
1392                 return ret;
1393         }
1394
1395         ltdb->prepared_commit = true;
1396
1397         return LDB_SUCCESS;
1398 }
1399
1400 static int ltdb_end_trans(struct ldb_module *module)
1401 {
1402         int ret;
1403         void *data = ldb_module_get_private(module);
1404         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1405
1406         if (!ltdb->prepared_commit) {
1407                 ret = ltdb_prepare_commit(module);
1408                 if (ret != LDB_SUCCESS) {
1409                         return ret;
1410                 }
1411         }
1412
1413         ltdb->in_transaction--;
1414         ltdb->prepared_commit = false;
1415
1416         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1417                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1418                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1419                                        "Failure during tdb_transaction_commit(): %s -> %s",
1420                                        tdb_errorstr(ltdb->tdb),
1421                                        ldb_strerror(ret));
1422                 return ret;
1423         }
1424
1425         return LDB_SUCCESS;
1426 }
1427
1428 static int ltdb_del_trans(struct ldb_module *module)
1429 {
1430         void *data = ldb_module_get_private(module);
1431         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1432
1433         ltdb->in_transaction--;
1434
1435         if (ltdb_index_transaction_cancel(module) != 0) {
1436                 tdb_transaction_cancel(ltdb->tdb);
1437                 return ltdb_err_map(tdb_error(ltdb->tdb));
1438         }
1439
1440         tdb_transaction_cancel(ltdb->tdb);
1441         return LDB_SUCCESS;
1442 }
1443
1444 /*
1445   return sequenceNumber from @BASEINFO
1446 */
1447 static int ltdb_sequence_number(struct ltdb_context *ctx,
1448                                 struct ldb_extended **ext)
1449 {
1450         struct ldb_context *ldb;
1451         struct ldb_module *module = ctx->module;
1452         struct ldb_request *req = ctx->req;
1453         TALLOC_CTX *tmp_ctx = NULL;
1454         struct ldb_seqnum_request *seq;
1455         struct ldb_seqnum_result *res;
1456         struct ldb_message *msg = NULL;
1457         struct ldb_dn *dn;
1458         const char *date;
1459         int ret = LDB_SUCCESS;
1460
1461         ldb = ldb_module_get_ctx(module);
1462
1463         seq = talloc_get_type(req->op.extended.data,
1464                                 struct ldb_seqnum_request);
1465         if (seq == NULL) {
1466                 return LDB_ERR_OPERATIONS_ERROR;
1467         }
1468
1469         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1470
1471         if (ltdb_lock_read(module) != 0) {
1472                 return LDB_ERR_OPERATIONS_ERROR;
1473         }
1474
1475         res = talloc_zero(req, struct ldb_seqnum_result);
1476         if (res == NULL) {
1477                 ret = LDB_ERR_OPERATIONS_ERROR;
1478                 goto done;
1479         }
1480
1481         tmp_ctx = talloc_new(req);
1482         if (tmp_ctx == NULL) {
1483                 ret = LDB_ERR_OPERATIONS_ERROR;
1484                 goto done;
1485         }
1486
1487         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1488         if (dn == NULL) {
1489                 ret = LDB_ERR_OPERATIONS_ERROR;
1490                 goto done;
1491         }
1492
1493         msg = ldb_msg_new(tmp_ctx);
1494         if (msg == NULL) {
1495                 ret = LDB_ERR_OPERATIONS_ERROR;
1496                 goto done;
1497         }
1498
1499         ret = ltdb_search_dn1(module, dn, msg, 0);
1500         if (ret != LDB_SUCCESS) {
1501                 goto done;
1502         }
1503
1504         switch (seq->type) {
1505         case LDB_SEQ_HIGHEST_SEQ:
1506                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1507                 break;
1508         case LDB_SEQ_NEXT:
1509                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1510                 res->seq_num++;
1511                 break;
1512         case LDB_SEQ_HIGHEST_TIMESTAMP:
1513                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1514                 if (date) {
1515                         res->seq_num = ldb_string_to_time(date);
1516                 } else {
1517                         res->seq_num = 0;
1518                         /* zero is as good as anything when we don't know */
1519                 }
1520                 break;
1521         }
1522
1523         *ext = talloc_zero(req, struct ldb_extended);
1524         if (*ext == NULL) {
1525                 ret = LDB_ERR_OPERATIONS_ERROR;
1526                 goto done;
1527         }
1528         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1529         (*ext)->data = talloc_steal(*ext, res);
1530
1531 done:
1532         talloc_free(tmp_ctx);
1533         ltdb_unlock_read(module);
1534         return ret;
1535 }
1536
1537 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1538 {
1539         struct ldb_context *ldb;
1540         struct ldb_request *req;
1541         struct ldb_reply *ares;
1542
1543         ldb = ldb_module_get_ctx(ctx->module);
1544         req = ctx->req;
1545
1546         /* if we already returned an error just return */
1547         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1548                 return;
1549         }
1550
1551         ares = talloc_zero(req, struct ldb_reply);
1552         if (!ares) {
1553                 ldb_oom(ldb);
1554                 req->callback(req, NULL);
1555                 return;
1556         }
1557         ares->type = LDB_REPLY_DONE;
1558         ares->error = error;
1559
1560         req->callback(req, ares);
1561 }
1562
1563 static void ltdb_timeout(struct tevent_context *ev,
1564                           struct tevent_timer *te,
1565                           struct timeval t,
1566                           void *private_data)
1567 {
1568         struct ltdb_context *ctx;
1569         ctx = talloc_get_type(private_data, struct ltdb_context);
1570
1571         if (!ctx->request_terminated) {
1572                 /* request is done now */
1573                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1574         }
1575
1576         if (ctx->spy) {
1577                 /* neutralize the spy */
1578                 ctx->spy->ctx = NULL;
1579                 ctx->spy = NULL;
1580         }
1581         talloc_free(ctx);
1582 }
1583
1584 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1585                                         struct ldb_extended *ext,
1586                                         int error)
1587 {
1588         struct ldb_context *ldb;
1589         struct ldb_request *req;
1590         struct ldb_reply *ares;
1591
1592         ldb = ldb_module_get_ctx(ctx->module);
1593         req = ctx->req;
1594
1595         /* if we already returned an error just return */
1596         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1597                 return;
1598         }
1599
1600         ares = talloc_zero(req, struct ldb_reply);
1601         if (!ares) {
1602                 ldb_oom(ldb);
1603                 req->callback(req, NULL);
1604                 return;
1605         }
1606         ares->type = LDB_REPLY_DONE;
1607         ares->response = ext;
1608         ares->error = error;
1609
1610         req->callback(req, ares);
1611 }
1612
1613 static void ltdb_handle_extended(struct ltdb_context *ctx)
1614 {
1615         struct ldb_extended *ext = NULL;
1616         int ret;
1617
1618         if (strcmp(ctx->req->op.extended.oid,
1619                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1620                 /* get sequence number */
1621                 ret = ltdb_sequence_number(ctx, &ext);
1622         } else {
1623                 /* not recognized */
1624                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1625         }
1626
1627         ltdb_request_extended_done(ctx, ext, ret);
1628 }
1629
1630 static void ltdb_callback(struct tevent_context *ev,
1631                           struct tevent_timer *te,
1632                           struct timeval t,
1633                           void *private_data)
1634 {
1635         struct ltdb_context *ctx;
1636         int ret;
1637
1638         ctx = talloc_get_type(private_data, struct ltdb_context);
1639
1640         if (ctx->request_terminated) {
1641                 goto done;
1642         }
1643
1644         switch (ctx->req->operation) {
1645         case LDB_SEARCH:
1646                 ret = ltdb_search(ctx);
1647                 break;
1648         case LDB_ADD:
1649                 ret = ltdb_add(ctx);
1650                 break;
1651         case LDB_MODIFY:
1652                 ret = ltdb_modify(ctx);
1653                 break;
1654         case LDB_DELETE:
1655                 ret = ltdb_delete(ctx);
1656                 break;
1657         case LDB_RENAME:
1658                 ret = ltdb_rename(ctx);
1659                 break;
1660         case LDB_EXTENDED:
1661                 ltdb_handle_extended(ctx);
1662                 goto done;
1663         default:
1664                 /* no other op supported */
1665                 ret = LDB_ERR_PROTOCOL_ERROR;
1666         }
1667
1668         if (!ctx->request_terminated) {
1669                 /* request is done now */
1670                 ltdb_request_done(ctx, ret);
1671         }
1672
1673 done:
1674         if (ctx->spy) {
1675                 /* neutralize the spy */
1676                 ctx->spy->ctx = NULL;
1677                 ctx->spy = NULL;
1678         }
1679         talloc_free(ctx);
1680 }
1681
1682 static int ltdb_request_destructor(void *ptr)
1683 {
1684         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1685
1686         if (spy->ctx != NULL) {
1687                 spy->ctx->spy = NULL;
1688                 spy->ctx->request_terminated = true;
1689                 spy->ctx = NULL;
1690         }
1691
1692         return 0;
1693 }
1694
1695 static int ltdb_handle_request(struct ldb_module *module,
1696                                struct ldb_request *req)
1697 {
1698         struct ldb_control *control_permissive;
1699         struct ldb_context *ldb;
1700         struct tevent_context *ev;
1701         struct ltdb_context *ac;
1702         struct tevent_timer *te;
1703         struct timeval tv;
1704         unsigned int i;
1705
1706         ldb = ldb_module_get_ctx(module);
1707
1708         control_permissive = ldb_request_get_control(req,
1709                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1710
1711         for (i = 0; req->controls && req->controls[i]; i++) {
1712                 if (req->controls[i]->critical &&
1713                     req->controls[i] != control_permissive) {
1714                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1715                                                req->controls[i]->oid);
1716                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1717                 }
1718         }
1719
1720         if (req->starttime == 0 || req->timeout == 0) {
1721                 ldb_set_errstring(ldb, "Invalid timeout settings");
1722                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1723         }
1724
1725         ev = ldb_handle_get_event_context(req->handle);
1726
1727         ac = talloc_zero(ldb, struct ltdb_context);
1728         if (ac == NULL) {
1729                 ldb_oom(ldb);
1730                 return LDB_ERR_OPERATIONS_ERROR;
1731         }
1732
1733         ac->module = module;
1734         ac->req = req;
1735
1736         tv.tv_sec = 0;
1737         tv.tv_usec = 0;
1738         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1739         if (NULL == te) {
1740                 talloc_free(ac);
1741                 return LDB_ERR_OPERATIONS_ERROR;
1742         }
1743
1744         if (req->timeout > 0) {
1745                 tv.tv_sec = req->starttime + req->timeout;
1746                 tv.tv_usec = 0;
1747                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1748                                                      ltdb_timeout, ac);
1749                 if (NULL == ac->timeout_event) {
1750                         talloc_free(ac);
1751                         return LDB_ERR_OPERATIONS_ERROR;
1752                 }
1753         }
1754
1755         /* set a spy so that we do not try to use the request context
1756          * if it is freed before ltdb_callback fires */
1757         ac->spy = talloc(req, struct ltdb_req_spy);
1758         if (NULL == ac->spy) {
1759                 talloc_free(ac);
1760                 return LDB_ERR_OPERATIONS_ERROR;
1761         }
1762         ac->spy->ctx = ac;
1763
1764         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1765
1766         return LDB_SUCCESS;
1767 }
1768
1769 static int ltdb_init_rootdse(struct ldb_module *module)
1770 {
1771         /* ignore errors on this - we expect it for non-sam databases */
1772         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1773
1774         /* there can be no module beyond the backend, just return */
1775         return LDB_SUCCESS;
1776 }
1777
1778 static const struct ldb_module_ops ltdb_ops = {
1779         .name              = "tdb",
1780         .init_context      = ltdb_init_rootdse,
1781         .search            = ltdb_handle_request,
1782         .add               = ltdb_handle_request,
1783         .modify            = ltdb_handle_request,
1784         .del               = ltdb_handle_request,
1785         .rename            = ltdb_handle_request,
1786         .extended          = ltdb_handle_request,
1787         .start_transaction = ltdb_start_trans,
1788         .end_transaction   = ltdb_end_trans,
1789         .prepare_commit    = ltdb_prepare_commit,
1790         .del_transaction   = ltdb_del_trans,
1791         .read_lock         = ltdb_lock_read,
1792         .read_unlock       = ltdb_unlock_read,
1793 };
1794
1795 /*
1796   connect to the database
1797 */
1798 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1799                         unsigned int flags, const char *options[],
1800                         struct ldb_module **_module)
1801 {
1802         struct ldb_module *module;
1803         const char *path;
1804         int tdb_flags, open_flags;
1805         struct ltdb_private *ltdb;
1806
1807         /*
1808          * We hold locks, so we must use a private event context
1809          * on each returned handle
1810          */
1811
1812         ldb_set_require_private_event_context(ldb);
1813
1814         /* parse the url */
1815         if (strchr(url, ':')) {
1816                 if (strncmp(url, "tdb://", 6) != 0) {
1817                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1818                                   "Invalid tdb URL '%s'", url);
1819                         return LDB_ERR_OPERATIONS_ERROR;
1820                 }
1821                 path = url+6;
1822         } else {
1823                 path = url;
1824         }
1825
1826         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1827
1828         /* check for the 'nosync' option */
1829         if (flags & LDB_FLG_NOSYNC) {
1830                 tdb_flags |= TDB_NOSYNC;
1831         }
1832
1833         /* and nommap option */
1834         if (flags & LDB_FLG_NOMMAP) {
1835                 tdb_flags |= TDB_NOMMAP;
1836         }
1837
1838         ltdb = talloc_zero(ldb, struct ltdb_private);
1839         if (!ltdb) {
1840                 ldb_oom(ldb);
1841                 return LDB_ERR_OPERATIONS_ERROR;
1842         }
1843
1844         if (flags & LDB_FLG_RDONLY) {
1845                 /*
1846                  * This is weird, but because we can only have one tdb
1847                  * in this process, and the other one could be
1848                  * read-write, we can't use the tdb readonly.  Plus a
1849                  * read only tdb prohibits the all-record lock.
1850                  */
1851                 open_flags = O_RDWR;
1852
1853                 ltdb->read_only = true;
1854
1855         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1856                 /*
1857                  * This is used by ldbsearch to prevent creation of the database
1858                  * if the name is wrong
1859                  */
1860                 open_flags = O_RDWR;
1861         } else {
1862                 /*
1863                  * This is the normal case
1864                  */
1865                 open_flags = O_CREAT | O_RDWR;
1866         }
1867
1868         /* note that we use quite a large default hash size */
1869         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1870                                    tdb_flags, open_flags,
1871                                    ldb_get_create_perms(ldb), ldb);
1872         if (!ltdb->tdb) {
1873                 ldb_asprintf_errstring(ldb,
1874                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1875                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1876                           "Unable to open tdb '%s': %s", path, strerror(errno));
1877                 talloc_free(ltdb);
1878                 if (errno == EACCES || errno == EPERM) {
1879                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1880                 }
1881                 return LDB_ERR_OPERATIONS_ERROR;
1882         }
1883
1884         if (getenv("LDB_WARN_UNINDEXED")) {
1885                 ltdb->warn_unindexed = true;
1886         }
1887
1888         if (getenv("LDB_WARN_REINDEX")) {
1889                 ltdb->warn_reindex = true;
1890         }
1891
1892         ltdb->sequence_number = 0;
1893
1894         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1895         if (!module) {
1896                 ldb_oom(ldb);
1897                 talloc_free(ltdb);
1898                 return LDB_ERR_OPERATIONS_ERROR;
1899         }
1900         ldb_module_set_private(module, ltdb);
1901         talloc_steal(module, ltdb);
1902
1903         if (ltdb_cache_load(module) != 0) {
1904                 ldb_asprintf_errstring(ldb,
1905                                        "Unable to load ltdb cache records of tdb '%s'", path);
1906                 talloc_free(module);
1907                 return LDB_ERR_OPERATIONS_ERROR;
1908         }
1909
1910         *_module = module;
1911         return LDB_SUCCESS;
1912 }
1913
1914 int ldb_tdb_init(const char *version)
1915 {
1916         LDB_MODULE_CHECK_VERSION(version);
1917         return ldb_register_backend("tdb", ltdb_connect, false);
1918 }