ldb: Explain why an entry can vanish from the index
[samba.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 enum key_truncation {
169         KEY_NOT_TRUNCATED,
170         KEY_TRUNCATED,
171 };
172
173 static int ltdb_write_index_dn_guid(struct ldb_module *module,
174                                     const struct ldb_message *msg,
175                                     int add);
176 static int ltdb_index_dn_base_dn(struct ldb_module *module,
177                                  struct ltdb_private *ltdb,
178                                  struct ldb_dn *base_dn,
179                                  struct dn_list *dn_list,
180                                  enum key_truncation *truncation);
181
182 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
183                               struct dn_list *list);
184
185 /* we put a @IDXVERSION attribute on index entries. This
186    allows us to tell if it was written by an older version
187 */
188 #define LTDB_INDEXING_VERSION 2
189
190 #define LTDB_GUID_INDEXING_VERSION 3
191
192 static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
193         if (ltdb->max_key_length == 0){
194                 return UINT_MAX;
195         }
196         return ltdb->max_key_length;
197 }
198
199 /* enable the idxptr mode when transactions start */
200 int ltdb_index_transaction_start(struct ldb_module *module)
201 {
202         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
203         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
204         if (ltdb->idxptr == NULL) {
205                 return ldb_oom(ldb_module_get_ctx(module));
206         }
207
208         return LDB_SUCCESS;
209 }
210
211 /*
212   see if two ldb_val structures contain exactly the same data
213   return -1 or 1 for a mismatch, 0 for match
214 */
215 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
216                                          const struct ldb_val *v2)
217 {
218         if (v1->length > v2->length) {
219                 return -1;
220         }
221         if (v1->length < v2->length) {
222                 return 1;
223         }
224         return memcmp(v1->data, v2->data, v1->length);
225 }
226
227 /*
228   see if two ldb_val structures contain exactly the same data
229   return -1 or 1 for a mismatch, 0 for match
230 */
231 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
232                                        const struct ldb_val *v2)
233 {
234         if (v1.length > v2->length) {
235                 return -1;
236         }
237         if (v1.length < v2->length) {
238                 return 1;
239         }
240         return memcmp(v1.data, v2->data, v1.length);
241 }
242
243
244 /*
245   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
246   binary-safe comparison for the 'dn' returns -1 if not found
247
248   This is therefore safe when the value is a GUID in the future
249  */
250 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
251                                  const struct dn_list *list,
252                                  const struct ldb_val *v)
253 {
254         unsigned int i;
255         struct ldb_val *exact = NULL, *next = NULL;
256
257         if (ltdb->cache->GUID_index_attribute == NULL) {
258                 for (i=0; i<list->count; i++) {
259                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
260                                 return i;
261                         }
262                 }
263                 return -1;
264         }
265
266         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
267                                 *v, ldb_val_equal_exact_ordered,
268                                 exact, next);
269         if (exact == NULL) {
270                 return -1;
271         }
272         /* Not required, but keeps the compiler quiet */
273         if (next != NULL) {
274                 return -1;
275         }
276
277         i = exact - list->dn;
278         return i;
279 }
280
281 /*
282   find a entry in a dn_list. Uses a case sensitive comparison with the dn
283   returns -1 if not found
284  */
285 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
286                                  struct dn_list *list,
287                                  const struct ldb_message *msg)
288 {
289         struct ldb_val v;
290         const struct ldb_val *key_val;
291         if (ltdb->cache->GUID_index_attribute == NULL) {
292                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
293                 v.data = discard_const_p(unsigned char, dn_str);
294                 v.length = strlen(dn_str);
295         } else {
296                 key_val = ldb_msg_find_ldb_val(msg,
297                                                ltdb->cache->GUID_index_attribute);
298                 if (key_val == NULL) {
299                         return -1;
300                 }
301                 v = *key_val;
302         }
303         return ltdb_dn_list_find_val(ltdb, list, &v);
304 }
305
306 /*
307   this is effectively a cast function, but with lots of paranoia
308   checks and also copes with CPUs that are fussy about pointer
309   alignment
310  */
311 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
312 {
313         struct dn_list *list;
314         if (rec.dsize != sizeof(void *)) {
315                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
316                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
317                 return NULL;
318         }
319         /* note that we can't just use a cast here, as rec.dptr may
320            not be aligned sufficiently for a pointer. A cast would cause
321            platforms like some ARM CPUs to crash */
322         memcpy(&list, rec.dptr, sizeof(void *));
323         list = talloc_get_type(list, struct dn_list);
324         if (list == NULL) {
325                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
326                                        "Bad type '%s' for idxptr",
327                                        talloc_get_name(list));
328                 return NULL;
329         }
330         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
331                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
332                                        "Bad parent '%s' for idxptr",
333                                        talloc_get_name(talloc_parent(list->dn)));
334                 return NULL;
335         }
336         return list;
337 }
338
339 /*
340   return the @IDX list in an index entry for a dn as a
341   struct dn_list
342  */
343 static int ltdb_dn_list_load(struct ldb_module *module,
344                              struct ltdb_private *ltdb,
345                              struct ldb_dn *dn, struct dn_list *list)
346 {
347         struct ldb_message *msg;
348         int ret, version;
349         struct ldb_message_element *el;
350         TDB_DATA rec;
351         struct dn_list *list2;
352         TDB_DATA key;
353
354         list->dn = NULL;
355         list->count = 0;
356
357         /* see if we have any in-memory index entries */
358         if (ltdb->idxptr == NULL ||
359             ltdb->idxptr->itdb == NULL) {
360                 goto normal_index;
361         }
362
363         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
364         key.dsize = strlen((char *)key.dptr);
365
366         rec = tdb_fetch(ltdb->idxptr->itdb, key);
367         if (rec.dptr == NULL) {
368                 goto normal_index;
369         }
370
371         /* we've found an in-memory index entry */
372         list2 = ltdb_index_idxptr(module, rec, true);
373         if (list2 == NULL) {
374                 free(rec.dptr);
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377         free(rec.dptr);
378
379         *list = *list2;
380         return LDB_SUCCESS;
381
382 normal_index:
383         msg = ldb_msg_new(list);
384         if (msg == NULL) {
385                 return LDB_ERR_OPERATIONS_ERROR;
386         }
387
388         ret = ltdb_search_dn1(module, dn, msg,
389                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
390                               |LDB_UNPACK_DATA_FLAG_NO_DN);
391         if (ret != LDB_SUCCESS) {
392                 talloc_free(msg);
393                 return ret;
394         }
395
396         el = ldb_msg_find_element(msg, LTDB_IDX);
397         if (!el) {
398                 talloc_free(msg);
399                 return LDB_SUCCESS;
400         }
401
402         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
403
404         /*
405          * we avoid copying the strings by stealing the list.  We have
406          * to steal msg onto el->values (which looks odd) because we
407          * asked for the memory to be allocated on msg, not on each
408          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
409          */
410         if (ltdb->cache->GUID_index_attribute == NULL) {
411                 /* check indexing version number */
412                 if (version != LTDB_INDEXING_VERSION) {
413                         ldb_debug_set(ldb_module_get_ctx(module),
414                                       LDB_DEBUG_ERROR,
415                                       "Wrong DN index version %d "
416                                       "expected %d for %s",
417                                       version, LTDB_INDEXING_VERSION,
418                                       ldb_dn_get_linearized(dn));
419                         return LDB_ERR_OPERATIONS_ERROR;
420                 }
421
422                 talloc_steal(el->values, msg);
423                 list->dn = talloc_steal(list, el->values);
424                 list->count = el->num_values;
425         } else {
426                 unsigned int i;
427                 if (version != LTDB_GUID_INDEXING_VERSION) {
428                         /* This is quite likely during the DB startup
429                            on first upgrade to using a GUID index */
430                         ldb_debug_set(ldb_module_get_ctx(module),
431                                       LDB_DEBUG_ERROR,
432                                       "Wrong GUID index version %d "
433                                       "expected %d for %s",
434                                       version, LTDB_GUID_INDEXING_VERSION,
435                                       ldb_dn_get_linearized(dn));
436                         return LDB_ERR_OPERATIONS_ERROR;
437                 }
438
439                 if (el->num_values == 0) {
440                         return LDB_ERR_OPERATIONS_ERROR;
441                 }
442
443                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
444                         return LDB_ERR_OPERATIONS_ERROR;
445                 }
446
447                 list->count = el->values[0].length / LTDB_GUID_SIZE;
448                 list->dn = talloc_array(list, struct ldb_val, list->count);
449
450                 /*
451                  * The actual data is on msg, due to
452                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
453                  */
454                 talloc_steal(list->dn, msg);
455                 for (i = 0; i < list->count; i++) {
456                         list->dn[i].data
457                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
458                         list->dn[i].length = LTDB_GUID_SIZE;
459                 }
460         }
461
462         /* We don't need msg->elements any more */
463         talloc_free(msg->elements);
464         return LDB_SUCCESS;
465 }
466
467 int ltdb_key_dn_from_idx(struct ldb_module *module,
468                          struct ltdb_private *ltdb,
469                          TALLOC_CTX *mem_ctx,
470                          struct ldb_dn *dn,
471                          TDB_DATA *tdb_key)
472 {
473         struct ldb_context *ldb = ldb_module_get_ctx(module);
474         int ret;
475         int index = 0;
476         enum key_truncation truncation = KEY_NOT_TRUNCATED;
477         struct dn_list *list = talloc(mem_ctx, struct dn_list);
478         if (list == NULL) {
479                 ldb_oom(ldb);
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483
484         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
485         if (ret != LDB_SUCCESS) {
486                 TALLOC_FREE(list);
487                 return ret;
488         }
489
490         if (list->count == 0) {
491                 TALLOC_FREE(list);
492                 return LDB_ERR_NO_SUCH_OBJECT;
493         }
494
495         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
496                 const char *dn_str = ldb_dn_get_linearized(dn);
497                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
498                                        __location__
499                                        ": Failed to read DN index "
500                                        "against %s for %s: too many "
501                                        "values (%u > 1)",
502                                        ltdb->cache->GUID_index_attribute,
503                                        dn_str, list->count);
504                 TALLOC_FREE(list);
505                 return LDB_ERR_CONSTRAINT_VIOLATION;
506         }
507
508         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
509                 /*
510                  * DN key has been truncated, need to inspect the actual
511                  * records to locate the actual DN
512                  */
513                 int i;
514                 index = -1;
515                 for (i=0; i < list->count; i++) {
516                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
517                         TDB_DATA key = {
518                                 .dptr = guid_key,
519                                 .dsize = sizeof(guid_key)
520                         };
521                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
522                         struct ldb_message *rec = ldb_msg_new(ldb);
523                         if (rec == NULL) {
524                                 return LDB_ERR_OPERATIONS_ERROR;
525                         }
526
527                         ret = ltdb_idx_to_key(module, ltdb,
528                                               ldb, &list->dn[i],
529                                               &key);
530                         if (ret != LDB_SUCCESS) {
531                                 TALLOC_FREE(list);
532                                 TALLOC_FREE(rec);
533                                 return ret;
534                         }
535
536                         ret = ltdb_search_key(module, ltdb, key,
537                                               rec, flags);
538                         if (key.dptr != guid_key) {
539                                 TALLOC_FREE(key.dptr);
540                         }
541                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
542                                 /*
543                                  * the record has disappeared?
544                                  * yes, this can happen
545                                  */
546                                 TALLOC_FREE(rec);
547                                 continue;
548                         }
549
550                         if (ret != LDB_SUCCESS) {
551                                 /* an internal error */
552                                 TALLOC_FREE(rec);
553                                 TALLOC_FREE(list);
554                                 return LDB_ERR_OPERATIONS_ERROR;
555                         }
556
557                         /*
558                          * We found the actual DN that we wanted from in the
559                          * multiple values that matched the index
560                          * (due to truncation), so return that.
561                          *
562                          */
563                         if (ldb_dn_compare(dn, rec->dn) == 0) {
564                                 index = i;
565                                 TALLOC_FREE(rec);
566                                 break;
567                         }
568                 }
569
570                 /*
571                  * We matched the index but the actual DN we wanted
572                  * was not here.
573                  */
574                 if (index == -1) {
575                         TALLOC_FREE(list);
576                         return LDB_ERR_NO_SUCH_OBJECT;
577                 }
578         }
579
580         /* The tdb_key memory is allocated by the caller */
581         ret = ltdb_guid_to_key(module, ltdb,
582                                &list->dn[index], tdb_key);
583         TALLOC_FREE(list);
584
585         if (ret != LDB_SUCCESS) {
586                 return LDB_ERR_OPERATIONS_ERROR;
587         }
588
589         return LDB_SUCCESS;
590 }
591
592
593
594 /*
595   save a dn_list into a full @IDX style record
596  */
597 static int ltdb_dn_list_store_full(struct ldb_module *module,
598                                    struct ltdb_private *ltdb,
599                                    struct ldb_dn *dn,
600                                    struct dn_list *list)
601 {
602         struct ldb_message *msg;
603         int ret;
604
605         msg = ldb_msg_new(module);
606         if (!msg) {
607                 return ldb_module_oom(module);
608         }
609
610         msg->dn = dn;
611
612         if (list->count == 0) {
613                 ret = ltdb_delete_noindex(module, msg);
614                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
615                         talloc_free(msg);
616                         return LDB_SUCCESS;
617                 }
618                 return ret;
619         }
620
621         if (ltdb->cache->GUID_index_attribute == NULL) {
622                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
623                                       LTDB_INDEXING_VERSION);
624                 if (ret != LDB_SUCCESS) {
625                         talloc_free(msg);
626                         return ldb_module_oom(module);
627                 }
628         } else {
629                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
630                                       LTDB_GUID_INDEXING_VERSION);
631                 if (ret != LDB_SUCCESS) {
632                         talloc_free(msg);
633                         return ldb_module_oom(module);
634                 }
635         }
636
637         if (list->count > 0) {
638                 struct ldb_message_element *el;
639
640                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
641                 if (ret != LDB_SUCCESS) {
642                         talloc_free(msg);
643                         return ldb_module_oom(module);
644                 }
645
646                 if (ltdb->cache->GUID_index_attribute == NULL) {
647                         el->values = list->dn;
648                         el->num_values = list->count;
649                 } else {
650                         struct ldb_val v;
651                         unsigned int i;
652                         el->values = talloc_array(msg,
653                                                   struct ldb_val, 1);
654                         if (el->values == NULL) {
655                                 talloc_free(msg);
656                                 return ldb_module_oom(module);
657                         }
658
659                         v.data = talloc_array_size(el->values,
660                                                    list->count,
661                                                    LTDB_GUID_SIZE);
662                         if (v.data == NULL) {
663                                 talloc_free(msg);
664                                 return ldb_module_oom(module);
665                         }
666
667                         v.length = talloc_get_size(v.data);
668
669                         for (i = 0; i < list->count; i++) {
670                                 if (list->dn[i].length !=
671                                     LTDB_GUID_SIZE) {
672                                         talloc_free(msg);
673                                         return ldb_module_operr(module);
674                                 }
675                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
676                                        list->dn[i].data,
677                                        LTDB_GUID_SIZE);
678                         }
679                         el->values[0] = v;
680                         el->num_values = 1;
681                 }
682         }
683
684         ret = ltdb_store(module, msg, TDB_REPLACE);
685         talloc_free(msg);
686         return ret;
687 }
688
689 /*
690   save a dn_list into the database, in either @IDX or internal format
691  */
692 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
693                               struct dn_list *list)
694 {
695         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
696         TDB_DATA rec, key;
697         int ret;
698         struct dn_list *list2;
699
700         if (ltdb->idxptr == NULL) {
701                 return ltdb_dn_list_store_full(module, ltdb,
702                                                dn, list);
703         }
704
705         if (ltdb->idxptr->itdb == NULL) {
706                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
707                 if (ltdb->idxptr->itdb == NULL) {
708                         return LDB_ERR_OPERATIONS_ERROR;
709                 }
710         }
711
712         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
713         key.dsize = strlen((char *)key.dptr);
714
715         rec = tdb_fetch(ltdb->idxptr->itdb, key);
716         if (rec.dptr != NULL) {
717                 list2 = ltdb_index_idxptr(module, rec, false);
718                 if (list2 == NULL) {
719                         free(rec.dptr);
720                         return LDB_ERR_OPERATIONS_ERROR;
721                 }
722                 free(rec.dptr);
723                 list2->dn = talloc_steal(list2, list->dn);
724                 list2->count = list->count;
725                 return LDB_SUCCESS;
726         }
727
728         list2 = talloc(ltdb->idxptr, struct dn_list);
729         if (list2 == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         list2->dn = talloc_steal(list2, list->dn);
733         list2->count = list->count;
734
735         rec.dptr = (uint8_t *)&list2;
736         rec.dsize = sizeof(void *);
737
738
739         /*
740          * This is not a store into the main DB, but into an in-memory
741          * TDB, so we don't need a guard on ltdb->read_only
742          */
743         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
744         if (ret != 0) {
745                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
746         }
747         return LDB_SUCCESS;
748 }
749
750 /*
751   traverse function for storing the in-memory index entries on disk
752  */
753 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
754 {
755         struct ldb_module *module = state;
756         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
757         struct ldb_dn *dn;
758         struct ldb_context *ldb = ldb_module_get_ctx(module);
759         struct ldb_val v;
760         struct dn_list *list;
761
762         list = ltdb_index_idxptr(module, data, true);
763         if (list == NULL) {
764                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
765                 return -1;
766         }
767
768         v.data = key.dptr;
769         v.length = strnlen((char *)key.dptr, key.dsize);
770
771         dn = ldb_dn_from_ldb_val(module, ldb, &v);
772         if (dn == NULL) {
773                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
774                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
775                 return -1;
776         }
777
778         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
779                                                       dn, list);
780         talloc_free(dn);
781         if (ltdb->idxptr->error != 0) {
782                 return -1;
783         }
784         return 0;
785 }
786
787 /* cleanup the idxptr mode when transaction commits */
788 int ltdb_index_transaction_commit(struct ldb_module *module)
789 {
790         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
791         int ret;
792
793         struct ldb_context *ldb = ldb_module_get_ctx(module);
794
795         ldb_reset_err_string(ldb);
796
797         if (ltdb->idxptr->itdb) {
798                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
799                 tdb_close(ltdb->idxptr->itdb);
800         }
801
802         ret = ltdb->idxptr->error;
803         if (ret != LDB_SUCCESS) {
804                 if (!ldb_errstring(ldb)) {
805                         ldb_set_errstring(ldb, ldb_strerror(ret));
806                 }
807                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
808         }
809
810         talloc_free(ltdb->idxptr);
811         ltdb->idxptr = NULL;
812         return ret;
813 }
814
815 /* cleanup the idxptr mode when transaction cancels */
816 int ltdb_index_transaction_cancel(struct ldb_module *module)
817 {
818         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
819         if (ltdb->idxptr && ltdb->idxptr->itdb) {
820                 tdb_close(ltdb->idxptr->itdb);
821         }
822         talloc_free(ltdb->idxptr);
823         ltdb->idxptr = NULL;
824         return LDB_SUCCESS;
825 }
826
827
828 /*
829   return the dn key to be used for an index
830   the caller is responsible for freeing
831 */
832 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
833                                      struct ltdb_private *ltdb,
834                                      const char *attr, const struct ldb_val *value,
835                                      const struct ldb_schema_attribute **ap,
836                                      enum key_truncation *truncation)
837 {
838         struct ldb_dn *ret;
839         struct ldb_val v;
840         const struct ldb_schema_attribute *a = NULL;
841         char *attr_folded = NULL;
842         const char *attr_for_dn = NULL;
843         int r;
844         bool should_b64_encode;
845
846         unsigned int max_key_length = ltdb_max_key_length(ltdb);
847         size_t key_len = 0;
848         size_t attr_len = 0;
849         const size_t indx_len = sizeof(LTDB_INDEX) - 1;
850         unsigned frmt_len = 0;
851         const size_t additional_key_length = 4;
852         unsigned int num_separators = 3; /* Estimate for overflow check */
853         const size_t min_data = 1;
854         const size_t min_key_length = additional_key_length
855                 + indx_len + num_separators + min_data;
856
857         if (attr[0] == '@') {
858                 attr_for_dn = attr;
859                 v = *value;
860                 if (ap != NULL) {
861                         *ap = NULL;
862                 }
863         } else {
864                 attr_folded = ldb_attr_casefold(ldb, attr);
865                 if (!attr_folded) {
866                         return NULL;
867                 }
868
869                 attr_for_dn = attr_folded;
870
871                 a = ldb_schema_attribute_by_name(ldb, attr);
872                 if (ap) {
873                         *ap = a;
874                 }
875                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
876                 if (r != LDB_SUCCESS) {
877                         const char *errstr = ldb_errstring(ldb);
878                         /* canonicalisation can be refused. For
879                            example, a attribute that takes wildcards
880                            will refuse to canonicalise if the value
881                            contains a wildcard */
882                         ldb_asprintf_errstring(ldb,
883                                                "Failed to create index "
884                                                "key for attribute '%s':%s%s%s",
885                                                attr, ldb_strerror(r),
886                                                (errstr?":":""),
887                                                (errstr?errstr:""));
888                         talloc_free(attr_folded);
889                         return NULL;
890                 }
891         }
892         attr_len = strlen(attr_for_dn);
893
894         /*
895          * Check if there is any hope this will fit into the DB.
896          * Overflow here is not actually critical the code below
897          * checks again to make the printf and the DB does another
898          * check for too long keys
899          */
900         if (max_key_length - attr_len < min_key_length) {
901                 ldb_asprintf_errstring(
902                         ldb,
903                         __location__ ": max_key_length "
904                         "is too small (%u) < (%u)",
905                         max_key_length,
906                         (unsigned)(min_key_length + attr_len));
907                 talloc_free(attr_folded);
908                 return NULL;
909         }
910
911         /*
912          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
913          * "DN=" and a trailing string terminator
914          */
915         max_key_length -= additional_key_length;
916
917         /*
918          * We do not base 64 encode a DN in a key, it has already been
919          * casefold and lineraized, that is good enough.  That already
920          * avoids embedded NUL etc.
921          */
922         if (ltdb->cache->GUID_index_attribute != NULL) {
923                 if (strcmp(attr, LTDB_IDXDN) == 0) {
924                         should_b64_encode = false;
925                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
926                         /*
927                          * We can only change the behaviour for IDXONE
928                          * when the GUID index is enabled
929                          */
930                         should_b64_encode = false;
931                 } else {
932                         should_b64_encode
933                                 = ldb_should_b64_encode(ldb, &v);
934                 }
935         } else {
936                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
937         }
938
939         if (should_b64_encode) {
940                 size_t vstr_len = 0;
941                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
942                 if (!vstr) {
943                         talloc_free(attr_folded);
944                         return NULL;
945                 }
946                 vstr_len = strlen(vstr);
947                 /* 
948                  * Overflow here is not critical as we only use this
949                  * to choose the printf truncation
950                  */
951                 key_len = num_separators + indx_len + attr_len + vstr_len;
952                 if (key_len > max_key_length) {
953                         size_t excess = key_len - max_key_length;
954                         frmt_len = vstr_len - excess;
955                         *truncation = KEY_TRUNCATED;
956                         /*
957                         * Truncated keys are placed in a separate key space
958                         * from the non truncated keys
959                         * Note: the double hash "##" is not a typo and
960                         * indicates that the following value is base64 encoded
961                         */
962                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
963                                              LTDB_INDEX, attr_for_dn,
964                                              frmt_len, vstr);
965                 } else {
966                         frmt_len = vstr_len;
967                         *truncation = KEY_NOT_TRUNCATED;
968                         /*
969                          * Note: the double colon "::" is not a typo and
970                          * indicates that the following value is base64 encoded
971                          */
972                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
973                                              LTDB_INDEX, attr_for_dn,
974                                              frmt_len, vstr);
975                 }
976                 talloc_free(vstr);
977         } else {
978                 /* Only need two seperators */
979                 num_separators = 2;
980
981                 /*
982                  * Overflow here is not critical as we only use this
983                  * to choose the printf truncation
984                  */
985                 key_len = num_separators + indx_len + attr_len + (int)v.length;
986                 if (key_len > max_key_length) {
987                         size_t excess = key_len - max_key_length;
988                         frmt_len = v.length - excess;
989                         *truncation = KEY_TRUNCATED;
990                         /*
991                          * Truncated keys are placed in a separate key space
992                          * from the non truncated keys
993                          */
994                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
995                                              LTDB_INDEX, attr_for_dn,
996                                              frmt_len, (char *)v.data);
997                 } else {
998                         frmt_len = v.length;
999                         *truncation = KEY_NOT_TRUNCATED;
1000                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1001                                              LTDB_INDEX, attr_for_dn,
1002                                              frmt_len, (char *)v.data);
1003                 }
1004         }
1005
1006         if (v.data != value->data) {
1007                 talloc_free(v.data);
1008         }
1009         talloc_free(attr_folded);
1010
1011         return ret;
1012 }
1013
1014 /*
1015   see if a attribute value is in the list of indexed attributes
1016 */
1017 static bool ltdb_is_indexed(struct ldb_module *module,
1018                             struct ltdb_private *ltdb,
1019                             const char *attr)
1020 {
1021         struct ldb_context *ldb = ldb_module_get_ctx(module);
1022         unsigned int i;
1023         struct ldb_message_element *el;
1024
1025         if ((ltdb->cache->GUID_index_attribute != NULL) &&
1026             (ldb_attr_cmp(attr,
1027                           ltdb->cache->GUID_index_attribute) == 0)) {
1028                 /* Implicity covered, this is the index key */
1029                 return false;
1030         }
1031         if (ldb->schema.index_handler_override) {
1032                 const struct ldb_schema_attribute *a
1033                         = ldb_schema_attribute_by_name(ldb, attr);
1034
1035                 if (a == NULL) {
1036                         return false;
1037                 }
1038
1039                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1040                         return true;
1041                 } else {
1042                         return false;
1043                 }
1044         }
1045
1046         if (!ltdb->cache->attribute_indexes) {
1047                 return false;
1048         }
1049
1050         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
1051         if (el == NULL) {
1052                 return false;
1053         }
1054
1055         /* TODO: this is too expensive! At least use a binary search */
1056         for (i=0; i<el->num_values; i++) {
1057                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1058                         return true;
1059                 }
1060         }
1061         return false;
1062 }
1063
1064 /*
1065   in the following logic functions, the return value is treated as
1066   follows:
1067
1068      LDB_SUCCESS: we found some matching index values
1069
1070      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1071
1072      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1073                                we'll need a full search
1074  */
1075
1076 /*
1077   return a list of dn's that might match a simple indexed search (an
1078   equality search only)
1079  */
1080 static int ltdb_index_dn_simple(struct ldb_module *module,
1081                                 struct ltdb_private *ltdb,
1082                                 const struct ldb_parse_tree *tree,
1083                                 struct dn_list *list)
1084 {
1085         struct ldb_context *ldb;
1086         struct ldb_dn *dn;
1087         int ret;
1088         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1089
1090         ldb = ldb_module_get_ctx(module);
1091
1092         list->count = 0;
1093         list->dn = NULL;
1094
1095         /* if the attribute isn't in the list of indexed attributes then
1096            this node needs a full search */
1097         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
1098                 return LDB_ERR_OPERATIONS_ERROR;
1099         }
1100
1101         /* the attribute is indexed. Pull the list of DNs that match the
1102            search criterion */
1103         dn = ltdb_index_key(ldb, ltdb,
1104                             tree->u.equality.attr,
1105                             &tree->u.equality.value, NULL, &truncation);
1106         /*
1107          * We ignore truncation here and allow multi-valued matches
1108          * as ltdb_search_indexed will filter out the wrong one in
1109          * ltdb_index_filter() which calls ldb_match_message().
1110          */
1111         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1112
1113         ret = ltdb_dn_list_load(module, ltdb, dn, list);
1114         talloc_free(dn);
1115         return ret;
1116 }
1117
1118
1119 static bool list_union(struct ldb_context *ldb,
1120                        struct ltdb_private *ltdb,
1121                        struct dn_list *list, struct dn_list *list2);
1122
1123 /*
1124   return a list of dn's that might match a leaf indexed search
1125  */
1126 static int ltdb_index_dn_leaf(struct ldb_module *module,
1127                               struct ltdb_private *ltdb,
1128                               const struct ldb_parse_tree *tree,
1129                               struct dn_list *list)
1130 {
1131         if (ltdb->disallow_dn_filter &&
1132             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1133                 /* in AD mode we do not support "(dn=...)" search filters */
1134                 list->dn = NULL;
1135                 list->count = 0;
1136                 return LDB_SUCCESS;
1137         }
1138         if (tree->u.equality.attr[0] == '@') {
1139                 /* Do not allow a indexed search against an @ */
1140                 list->dn = NULL;
1141                 list->count = 0;
1142                 return LDB_SUCCESS;
1143         }
1144         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1145                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1146                 struct ldb_dn *dn
1147                         = ldb_dn_from_ldb_val(list,
1148                                               ldb_module_get_ctx(module),
1149                                               &tree->u.equality.value);
1150                 if (dn == NULL) {
1151                         /* If we can't parse it, no match */
1152                         list->dn = NULL;
1153                         list->count = 0;
1154                         return LDB_SUCCESS;
1155                 }
1156
1157                 /*
1158                  * Re-use the same code we use for a SCOPE_BASE
1159                  * search
1160                  *
1161                  * We can't call TALLOC_FREE(dn) as this must belong
1162                  * to list for the memory to remain valid.
1163                  */
1164                 return ltdb_index_dn_base_dn(module, ltdb, dn, list,
1165                                              &truncation);
1166                 /*
1167                  * We ignore truncation here and allow multi-valued matches
1168                  * as ltdb_search_indexed will filter out the wrong one in
1169                  * ltdb_index_filter() which calls ldb_match_message().
1170                  */
1171
1172         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
1173                    (ldb_attr_cmp(tree->u.equality.attr,
1174                                  ltdb->cache->GUID_index_attribute) == 0)) {
1175                 int ret;
1176                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1177                 list->dn = talloc_array(list, struct ldb_val, 1);
1178                 if (list->dn == NULL) {
1179                         ldb_module_oom(module);
1180                         return LDB_ERR_OPERATIONS_ERROR;
1181                 }
1182                 /*
1183                  * We need to go via the canonicalise_fn() to
1184                  * ensure we get the index in binary, rather
1185                  * than a string
1186                  */
1187                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
1188                                                                list->dn,
1189                                                                &tree->u.equality.value,
1190                                                                &list->dn[0]);
1191                 if (ret != LDB_SUCCESS) {
1192                         return LDB_ERR_OPERATIONS_ERROR;
1193                 }
1194                 list->count = 1;
1195                 return LDB_SUCCESS;
1196         }
1197
1198         return ltdb_index_dn_simple(module, ltdb, tree, list);
1199 }
1200
1201
1202 /*
1203   list intersection
1204   list = list & list2
1205 */
1206 static bool list_intersect(struct ldb_context *ldb,
1207                            struct ltdb_private *ltdb,
1208                            struct dn_list *list, const struct dn_list *list2)
1209 {
1210         const struct dn_list *short_list, *long_list;
1211         struct dn_list *list3;
1212         unsigned int i;
1213
1214         if (list->count == 0) {
1215                 /* 0 & X == 0 */
1216                 return true;
1217         }
1218         if (list2->count == 0) {
1219                 /* X & 0 == 0 */
1220                 list->count = 0;
1221                 list->dn = NULL;
1222                 return true;
1223         }
1224
1225         /* the indexing code is allowed to return a longer list than
1226            what really matches, as all results are filtered by the
1227            full expression at the end - this shortcut avoids a lot of
1228            work in some cases */
1229         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1230                 return true;
1231         }
1232         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1233                 list->count = list2->count;
1234                 list->dn = list2->dn;
1235                 /* note that list2 may not be the parent of list2->dn,
1236                    as list2->dn may be owned by ltdb->idxptr. In that
1237                    case we expect this reparent call to fail, which is
1238                    OK */
1239                 talloc_reparent(list2, list, list2->dn);
1240                 return true;
1241         }
1242
1243         if (list->count > list2->count) {
1244                 short_list = list2;
1245                 long_list = list;
1246         } else {
1247                 short_list = list;
1248                 long_list = list2;
1249         }
1250
1251         list3 = talloc_zero(list, struct dn_list);
1252         if (list3 == NULL) {
1253                 return false;
1254         }
1255
1256         list3->dn = talloc_array(list3, struct ldb_val,
1257                                  MIN(list->count, list2->count));
1258         if (!list3->dn) {
1259                 talloc_free(list3);
1260                 return false;
1261         }
1262         list3->count = 0;
1263
1264         for (i=0;i<short_list->count;i++) {
1265                 /* For the GUID index case, this is a binary search */
1266                 if (ltdb_dn_list_find_val(ltdb, long_list,
1267                                           &short_list->dn[i]) != -1) {
1268                         list3->dn[list3->count] = short_list->dn[i];
1269                         list3->count++;
1270                 }
1271         }
1272
1273         list->strict |= list2->strict;
1274         list->dn = talloc_steal(list, list3->dn);
1275         list->count = list3->count;
1276         talloc_free(list3);
1277
1278         return true;
1279 }
1280
1281
1282 /*
1283   list union
1284   list = list | list2
1285 */
1286 static bool list_union(struct ldb_context *ldb,
1287                        struct ltdb_private *ltdb,
1288                        struct dn_list *list, struct dn_list *list2)
1289 {
1290         struct ldb_val *dn3;
1291         unsigned int i = 0, j = 0, k = 0;
1292
1293         if (list2->count == 0) {
1294                 /* X | 0 == X */
1295                 return true;
1296         }
1297
1298         if (list->count == 0) {
1299                 /* 0 | X == X */
1300                 list->count = list2->count;
1301                 list->dn = list2->dn;
1302                 /* note that list2 may not be the parent of list2->dn,
1303                    as list2->dn may be owned by ltdb->idxptr. In that
1304                    case we expect this reparent call to fail, which is
1305                    OK */
1306                 talloc_reparent(list2, list, list2->dn);
1307                 return true;
1308         }
1309
1310         /*
1311          * Sort the lists (if not in GUID DN mode) so we can do
1312          * the de-duplication during the merge
1313          *
1314          * NOTE: This can sort the in-memory index values, as list or
1315          * list2 might not be a copy!
1316          */
1317         ltdb_dn_list_sort(ltdb, list);
1318         ltdb_dn_list_sort(ltdb, list2);
1319
1320         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1321         if (!dn3) {
1322                 ldb_oom(ldb);
1323                 return false;
1324         }
1325
1326         while (i < list->count || j < list2->count) {
1327                 int cmp;
1328                 if (i >= list->count) {
1329                         cmp = 1;
1330                 } else if (j >= list2->count) {
1331                         cmp = -1;
1332                 } else {
1333                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1334                                                           &list2->dn[j]);
1335                 }
1336
1337                 if (cmp < 0) {
1338                         /* Take list */
1339                         dn3[k] = list->dn[i];
1340                         i++;
1341                         k++;
1342                 } else if (cmp > 0) {
1343                         /* Take list2 */
1344                         dn3[k] = list2->dn[j];
1345                         j++;
1346                         k++;
1347                 } else {
1348                         /* Equal, take list */
1349                         dn3[k] = list->dn[i];
1350                         i++;
1351                         j++;
1352                         k++;
1353                 }
1354         }
1355
1356         list->dn = dn3;
1357         list->count = k;
1358
1359         return true;
1360 }
1361
1362 static int ltdb_index_dn(struct ldb_module *module,
1363                          struct ltdb_private *ltdb,
1364                          const struct ldb_parse_tree *tree,
1365                          struct dn_list *list);
1366
1367
1368 /*
1369   process an OR list (a union)
1370  */
1371 static int ltdb_index_dn_or(struct ldb_module *module,
1372                             struct ltdb_private *ltdb,
1373                             const struct ldb_parse_tree *tree,
1374                             struct dn_list *list)
1375 {
1376         struct ldb_context *ldb;
1377         unsigned int i;
1378
1379         ldb = ldb_module_get_ctx(module);
1380
1381         list->dn = NULL;
1382         list->count = 0;
1383
1384         for (i=0; i<tree->u.list.num_elements; i++) {
1385                 struct dn_list *list2;
1386                 int ret;
1387
1388                 list2 = talloc_zero(list, struct dn_list);
1389                 if (list2 == NULL) {
1390                         return LDB_ERR_OPERATIONS_ERROR;
1391                 }
1392
1393                 ret = ltdb_index_dn(module, ltdb,
1394                                     tree->u.list.elements[i], list2);
1395
1396                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1397                         /* X || 0 == X */
1398                         talloc_free(list2);
1399                         continue;
1400                 }
1401
1402                 if (ret != LDB_SUCCESS) {
1403                         /* X || * == * */
1404                         talloc_free(list2);
1405                         return ret;
1406                 }
1407
1408                 if (!list_union(ldb, ltdb, list, list2)) {
1409                         talloc_free(list2);
1410                         return LDB_ERR_OPERATIONS_ERROR;
1411                 }
1412         }
1413
1414         if (list->count == 0) {
1415                 return LDB_ERR_NO_SUCH_OBJECT;
1416         }
1417
1418         return LDB_SUCCESS;
1419 }
1420
1421
1422 /*
1423   NOT an index results
1424  */
1425 static int ltdb_index_dn_not(struct ldb_module *module,
1426                              struct ltdb_private *ltdb,
1427                              const struct ldb_parse_tree *tree,
1428                              struct dn_list *list)
1429 {
1430         /* the only way to do an indexed not would be if we could
1431            negate the not via another not or if we knew the total
1432            number of database elements so we could know that the
1433            existing expression covered the whole database.
1434
1435            instead, we just give up, and rely on a full index scan
1436            (unless an outer & manages to reduce the list)
1437         */
1438         return LDB_ERR_OPERATIONS_ERROR;
1439 }
1440
1441 /*
1442  * These things are unique, so avoid a full scan if this is a search
1443  * by GUID, DN or a unique attribute
1444  */
1445 static bool ltdb_index_unique(struct ldb_context *ldb,
1446                               struct ltdb_private *ltdb,
1447                               const char *attr)
1448 {
1449         const struct ldb_schema_attribute *a;
1450         if (ltdb->cache->GUID_index_attribute != NULL) {
1451                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1452                         return true;
1453                 }
1454         }
1455         if (ldb_attr_dn(attr) == 0) {
1456                 return true;
1457         }
1458
1459         a = ldb_schema_attribute_by_name(ldb, attr);
1460         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1461                 return true;
1462         }
1463         return false;
1464 }
1465
1466 /*
1467   process an AND expression (intersection)
1468  */
1469 static int ltdb_index_dn_and(struct ldb_module *module,
1470                              struct ltdb_private *ltdb,
1471                              const struct ldb_parse_tree *tree,
1472                              struct dn_list *list)
1473 {
1474         struct ldb_context *ldb;
1475         unsigned int i;
1476         bool found;
1477
1478         ldb = ldb_module_get_ctx(module);
1479
1480         list->dn = NULL;
1481         list->count = 0;
1482
1483         /* in the first pass we only look for unique simple
1484            equality tests, in the hope of avoiding having to look
1485            at any others */
1486         for (i=0; i<tree->u.list.num_elements; i++) {
1487                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1488                 int ret;
1489
1490                 if (subtree->operation != LDB_OP_EQUALITY ||
1491                     !ltdb_index_unique(ldb, ltdb,
1492                                        subtree->u.equality.attr)) {
1493                         continue;
1494                 }
1495
1496                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1497                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1498                         /* 0 && X == 0 */
1499                         return LDB_ERR_NO_SUCH_OBJECT;
1500                 }
1501                 if (ret == LDB_SUCCESS) {
1502                         /* a unique index match means we can
1503                          * stop. Note that we don't care if we return
1504                          * a few too many objects, due to later
1505                          * filtering */
1506                         return LDB_SUCCESS;
1507                 }
1508         }
1509
1510         /* now do a full intersection */
1511         found = false;
1512
1513         for (i=0; i<tree->u.list.num_elements; i++) {
1514                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1515                 struct dn_list *list2;
1516                 int ret;
1517
1518                 list2 = talloc_zero(list, struct dn_list);
1519                 if (list2 == NULL) {
1520                         return ldb_module_oom(module);
1521                 }
1522
1523                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1524
1525                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1526                         /* X && 0 == 0 */
1527                         list->dn = NULL;
1528                         list->count = 0;
1529                         talloc_free(list2);
1530                         return LDB_ERR_NO_SUCH_OBJECT;
1531                 }
1532
1533                 if (ret != LDB_SUCCESS) {
1534                         /* this didn't adding anything */
1535                         talloc_free(list2);
1536                         continue;
1537                 }
1538
1539                 if (!found) {
1540                         talloc_reparent(list2, list, list->dn);
1541                         list->dn = list2->dn;
1542                         list->count = list2->count;
1543                         found = true;
1544                 } else if (!list_intersect(ldb, ltdb,
1545                                            list, list2)) {
1546                         talloc_free(list2);
1547                         return LDB_ERR_OPERATIONS_ERROR;
1548                 }
1549
1550                 if (list->count == 0) {
1551                         list->dn = NULL;
1552                         return LDB_ERR_NO_SUCH_OBJECT;
1553                 }
1554
1555                 if (list->count < 2) {
1556                         /* it isn't worth loading the next part of the tree */
1557                         return LDB_SUCCESS;
1558                 }
1559         }
1560
1561         if (!found) {
1562                 /* none of the attributes were indexed */
1563                 return LDB_ERR_OPERATIONS_ERROR;
1564         }
1565
1566         return LDB_SUCCESS;
1567 }
1568
1569 /*
1570   return a list of matching objects using a one-level index
1571  */
1572 static int ltdb_index_dn_attr(struct ldb_module *module,
1573                               struct ltdb_private *ltdb,
1574                               const char *attr,
1575                               struct ldb_dn *dn,
1576                               struct dn_list *list,
1577                               enum key_truncation *truncation)
1578 {
1579         struct ldb_context *ldb;
1580         struct ldb_dn *key;
1581         struct ldb_val val;
1582         int ret;
1583
1584         ldb = ldb_module_get_ctx(module);
1585
1586         /* work out the index key from the parent DN */
1587         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1588         val.length = strlen((char *)val.data);
1589         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
1590         if (!key) {
1591                 ldb_oom(ldb);
1592                 return LDB_ERR_OPERATIONS_ERROR;
1593         }
1594
1595         ret = ltdb_dn_list_load(module, ltdb, key, list);
1596         talloc_free(key);
1597         if (ret != LDB_SUCCESS) {
1598                 return ret;
1599         }
1600
1601         if (list->count == 0) {
1602                 return LDB_ERR_NO_SUCH_OBJECT;
1603         }
1604
1605         return LDB_SUCCESS;
1606 }
1607
1608 /*
1609   return a list of matching objects using a one-level index
1610  */
1611 static int ltdb_index_dn_one(struct ldb_module *module,
1612                              struct ltdb_private *ltdb,
1613                              struct ldb_dn *parent_dn,
1614                              struct dn_list *list,
1615                              enum key_truncation *truncation)
1616 {
1617         /* Ensure we do not shortcut on intersection for this list */
1618         list->strict = true;
1619         return ltdb_index_dn_attr(module, ltdb,
1620                                   LTDB_IDXONE, parent_dn, list, truncation);
1621
1622 }
1623
1624 /*
1625   return a list of matching objects using the DN index
1626  */
1627 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1628                                  struct ltdb_private *ltdb,
1629                                  struct ldb_dn *base_dn,
1630                                  struct dn_list *dn_list,
1631                                  enum key_truncation *truncation)
1632 {
1633         const struct ldb_val *guid_val = NULL;
1634         if (ltdb->cache->GUID_index_attribute == NULL) {
1635                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1636                 if (dn_list->dn == NULL) {
1637                         return ldb_module_oom(module);
1638                 }
1639                 dn_list->dn[0].data = discard_const_p(unsigned char,
1640                                                       ldb_dn_get_linearized(base_dn));
1641                 if (dn_list->dn[0].data == NULL) {
1642                         return ldb_module_oom(module);
1643                 }
1644                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1645                 dn_list->count = 1;
1646
1647                 return LDB_SUCCESS;
1648         }
1649
1650         if (ltdb->cache->GUID_index_dn_component != NULL) {
1651                 guid_val = ldb_dn_get_extended_component(base_dn,
1652                                                          ltdb->cache->GUID_index_dn_component);
1653         }
1654
1655         if (guid_val != NULL) {
1656                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1657                 if (dn_list->dn == NULL) {
1658                         return ldb_module_oom(module);
1659                 }
1660                 dn_list->dn[0].data = guid_val->data;
1661                 dn_list->dn[0].length = guid_val->length;
1662                 dn_list->count = 1;
1663
1664                 return LDB_SUCCESS;
1665         }
1666
1667         return ltdb_index_dn_attr(module, ltdb,
1668                                   LTDB_IDXDN, base_dn, dn_list, truncation);
1669 }
1670
1671 /*
1672   return a list of dn's that might match a indexed search or
1673   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1674  */
1675 static int ltdb_index_dn(struct ldb_module *module,
1676                          struct ltdb_private *ltdb,
1677                          const struct ldb_parse_tree *tree,
1678                          struct dn_list *list)
1679 {
1680         int ret = LDB_ERR_OPERATIONS_ERROR;
1681
1682         switch (tree->operation) {
1683         case LDB_OP_AND:
1684                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1685                 break;
1686
1687         case LDB_OP_OR:
1688                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1689                 break;
1690
1691         case LDB_OP_NOT:
1692                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1693                 break;
1694
1695         case LDB_OP_EQUALITY:
1696                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1697                 break;
1698
1699         case LDB_OP_SUBSTRING:
1700         case LDB_OP_GREATER:
1701         case LDB_OP_LESS:
1702         case LDB_OP_PRESENT:
1703         case LDB_OP_APPROX:
1704         case LDB_OP_EXTENDED:
1705                 /* we can't index with fancy bitops yet */
1706                 ret = LDB_ERR_OPERATIONS_ERROR;
1707                 break;
1708         }
1709
1710         return ret;
1711 }
1712
1713 /*
1714   filter a candidate dn_list from an indexed search into a set of results
1715   extracting just the given attributes
1716 */
1717 static int ltdb_index_filter(struct ltdb_private *ltdb,
1718                              const struct dn_list *dn_list,
1719                              struct ltdb_context *ac,
1720                              uint32_t *match_count,
1721                              enum key_truncation scope_one_truncation)
1722 {
1723         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1724         struct ldb_message *msg;
1725         struct ldb_message *filtered_msg;
1726         unsigned int i;
1727         unsigned int num_keys = 0;
1728         uint8_t previous_guid_key[LTDB_GUID_KEY_SIZE] = {};
1729         TDB_DATA *keys = NULL;
1730
1731         /*
1732          * We have to allocate the key list (rather than just walk the
1733          * caller supplied list) as the callback could change the list
1734          * (by modifying an indexed attribute hosted in the in-memory
1735          * index cache!)
1736          */
1737         keys = talloc_array(ac, TDB_DATA, dn_list->count);
1738         if (keys == NULL) {
1739                 return ldb_module_oom(ac->module);
1740         }
1741
1742         if (ltdb->cache->GUID_index_attribute != NULL) {
1743                 /*
1744                  * We speculate that the keys will be GUID based and so
1745                  * pre-fill in enough space for a GUID (avoiding a pile of
1746                  * small allocations)
1747                  */
1748                 struct guid_tdb_key {
1749                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1750                 } *key_values = NULL;
1751
1752                 key_values = talloc_array(keys,
1753                                           struct guid_tdb_key,
1754                                           dn_list->count);
1755
1756                 for (i = 0; i < dn_list->count; i++) {
1757                         keys[i].dptr = key_values[i].guid_key;
1758                         keys[i].dsize = sizeof(key_values[i].guid_key);
1759                 }
1760                 if (key_values == NULL) {
1761                         return ldb_module_oom(ac->module);
1762                 }
1763         } else {
1764                 for (i = 0; i < dn_list->count; i++) {
1765                         keys[i].dptr = NULL;
1766                         keys[i].dsize = 0;
1767                 }
1768         }
1769
1770         for (i = 0; i < dn_list->count; i++) {
1771                 int ret;
1772
1773                 ret = ltdb_idx_to_key(ac->module,
1774                                       ltdb,
1775                                       keys,
1776                                       &dn_list->dn[i],
1777                                       &keys[num_keys]);
1778                 if (ret != LDB_SUCCESS) {
1779                         return ret;
1780                 }
1781
1782                 if (ltdb->cache->GUID_index_attribute != NULL) {
1783                         /*
1784                          * If we are in GUID index mode, then the dn_list is
1785                          * sorted.  If we got a duplicate, forget about it, as
1786                          * otherwise we would send the same entry back more
1787                          * than once.
1788                          *
1789                          * This is needed in the truncated DN case, or if a
1790                          * duplicate was forced in via
1791                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1792                          */
1793
1794                         if (memcmp(previous_guid_key,
1795                                    keys[num_keys].dptr,
1796                                    sizeof(previous_guid_key)) == 0) {
1797                                 continue;
1798                         }
1799
1800                         memcpy(previous_guid_key,
1801                                keys[num_keys].dptr,
1802                                sizeof(previous_guid_key));
1803                 }
1804                 num_keys++;
1805         }
1806
1807
1808         /*
1809          * Now that the list is a safe copy, send the callbacks
1810          */
1811         for (i = 0; i < num_keys; i++) {
1812                 int ret;
1813                 bool matched;
1814                 msg = ldb_msg_new(ac);
1815                 if (!msg) {
1816                         return LDB_ERR_OPERATIONS_ERROR;
1817                 }
1818
1819                 ret = ltdb_search_key(ac->module, ltdb,
1820                                       keys[i], msg,
1821                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1822                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1823                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1824                         /*
1825                          * the record has disappeared? yes, this can
1826                          * happen if the entry is deleted by something
1827                          * operating in the callback (not another
1828                          * process, as we have a read lock)
1829                          */
1830                         talloc_free(msg);
1831                         continue;
1832                 }
1833
1834                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1835                         /* an internal error */
1836                         talloc_free(msg);
1837                         return LDB_ERR_OPERATIONS_ERROR;
1838                 }
1839
1840                 /*
1841                  * We trust the index for LDB_SCOPE_ONELEVEL
1842                  * unless the index key has been truncated.
1843                  *
1844                  * LDB_SCOPE_BASE is not passed in by our only caller.
1845                  */
1846                 if (ac->scope == LDB_SCOPE_ONELEVEL
1847                     && ltdb->cache->one_level_indexes
1848                     && scope_one_truncation == KEY_NOT_TRUNCATED) {
1849                         ret = ldb_match_message(ldb, msg, ac->tree,
1850                                                 ac->scope, &matched);
1851                 } else {
1852                         ret = ldb_match_msg_error(ldb, msg,
1853                                                   ac->tree, ac->base,
1854                                                   ac->scope, &matched);
1855                 }
1856
1857                 if (ret != LDB_SUCCESS) {
1858                         talloc_free(msg);
1859                         return ret;
1860                 }
1861                 if (!matched) {
1862                         talloc_free(msg);
1863                         continue;
1864                 }
1865
1866                 /* filter the attributes that the user wants */
1867                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1868
1869                 talloc_free(msg);
1870
1871                 if (ret == -1) {
1872                         return LDB_ERR_OPERATIONS_ERROR;
1873                 }
1874
1875                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1876                 if (ret != LDB_SUCCESS) {
1877                         /* Regardless of success or failure, the msg
1878                          * is the callbacks responsiblity, and should
1879                          * not be talloc_free()'ed */
1880                         ac->request_terminated = true;
1881                         return ret;
1882                 }
1883
1884                 (*match_count)++;
1885         }
1886
1887         TALLOC_FREE(keys);
1888         return LDB_SUCCESS;
1889 }
1890
1891 /*
1892   sort a DN list
1893  */
1894 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1895                               struct dn_list *list)
1896 {
1897         if (list->count < 2) {
1898                 return;
1899         }
1900
1901         /* We know the list is sorted when using the GUID index */
1902         if (ltdb->cache->GUID_index_attribute != NULL) {
1903                 return;
1904         }
1905
1906         TYPESAFE_QSORT(list->dn, list->count,
1907                        ldb_val_equal_exact_for_qsort);
1908 }
1909
1910 /*
1911   search the database with a LDAP-like expression using indexes
1912   returns -1 if an indexed search is not possible, in which
1913   case the caller should call ltdb_search_full()
1914 */
1915 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1916 {
1917         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1918         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1919         struct dn_list *dn_list;
1920         int ret;
1921         enum ldb_scope index_scope;
1922         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1923
1924         /* see if indexing is enabled */
1925         if (!ltdb->cache->attribute_indexes &&
1926             !ltdb->cache->one_level_indexes &&
1927             ac->scope != LDB_SCOPE_BASE) {
1928                 /* fallback to a full search */
1929                 return LDB_ERR_OPERATIONS_ERROR;
1930         }
1931
1932         dn_list = talloc_zero(ac, struct dn_list);
1933         if (dn_list == NULL) {
1934                 return ldb_module_oom(ac->module);
1935         }
1936
1937         /*
1938          * For the purposes of selecting the switch arm below, if we
1939          * don't have a one-level index then treat it like a subtree
1940          * search
1941          */
1942         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1943             !ltdb->cache->one_level_indexes) {
1944                 index_scope = LDB_SCOPE_SUBTREE;
1945         } else {
1946                 index_scope = ac->scope;
1947         }
1948
1949         switch (index_scope) {
1950         case LDB_SCOPE_BASE:
1951                 /*
1952                  * The only caller will have filtered the operation out
1953                  * so we should never get here
1954                  */
1955                 return ldb_operr(ldb);
1956
1957         case LDB_SCOPE_ONELEVEL:
1958                 /*
1959                  * If we ever start to also load the index values for
1960                  * the tree, we must ensure we strictly intersect with
1961                  * this list, as we trust the ONELEVEL index
1962                  */
1963                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
1964                                         &scope_one_truncation);
1965                 if (ret != LDB_SUCCESS) {
1966                         talloc_free(dn_list);
1967                         return ret;
1968                 }
1969
1970                 /*
1971                  * If we have too many matches, running the filter
1972                  * tree over the SCOPE_ONELEVEL can be quite expensive
1973                  * so we now check the filter tree index as well.
1974                  *
1975                  * We only do this in the GUID index mode, which is
1976                  * O(n*log(m)) otherwise the intersection below will
1977                  * be too costly at O(n*m).
1978                  *
1979                  * We don't set a heuristic for 'too many' but instead
1980                  * do it always and rely on the index lookup being
1981                  * fast enough in the small case.
1982                  */
1983                 if (ltdb->cache->GUID_index_attribute != NULL) {
1984                         struct dn_list *idx_one_tree_list
1985                                 = talloc_zero(ac, struct dn_list);
1986                         if (idx_one_tree_list == NULL) {
1987                                 return ldb_module_oom(ac->module);
1988                         }
1989
1990                         if (!ltdb->cache->attribute_indexes) {
1991                                 talloc_free(idx_one_tree_list);
1992                                 talloc_free(dn_list);
1993                                 return LDB_ERR_OPERATIONS_ERROR;
1994                         }
1995                         /*
1996                          * Here we load the index for the tree.
1997                          */
1998                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
1999                                             idx_one_tree_list);
2000                         if (ret != LDB_SUCCESS) {
2001                                 talloc_free(idx_one_tree_list);
2002                                 talloc_free(dn_list);
2003                                 return ret;
2004                         }
2005
2006                         if (!list_intersect(ldb, ltdb,
2007                                             dn_list, idx_one_tree_list)) {
2008                                 talloc_free(idx_one_tree_list);
2009                                 talloc_free(dn_list);
2010                                 return LDB_ERR_OPERATIONS_ERROR;
2011                         }
2012                 }
2013                 break;
2014
2015         case LDB_SCOPE_SUBTREE:
2016         case LDB_SCOPE_DEFAULT:
2017                 if (!ltdb->cache->attribute_indexes) {
2018                         talloc_free(dn_list);
2019                         return LDB_ERR_OPERATIONS_ERROR;
2020                 }
2021                 /*
2022                  * Here we load the index for the tree.  We have no
2023                  * index for the subtree.
2024                  */
2025                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
2026                 if (ret != LDB_SUCCESS) {
2027                         talloc_free(dn_list);
2028                         return ret;
2029                 }
2030                 break;
2031         }
2032
2033         /*
2034          * It is critical that this function do the re-filter even
2035          * on things found by the index as the index can over-match
2036          * in cases of truncation (as well as when it decides it is
2037          * not worth further filtering)
2038          *
2039          * If this changes, then the index code above would need to
2040          * pass up a flag to say if any index was truncated during
2041          * processing as the truncation here refers only to the
2042          * SCOPE_ONELEVEL index.
2043          */
2044         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
2045                                 scope_one_truncation);
2046         talloc_free(dn_list);
2047         return ret;
2048 }
2049
2050 /**
2051  * @brief Add a DN in the index list of a given attribute name/value pair
2052  *
2053  * This function will add the DN in the index list for the index for
2054  * the given attribute name and value.
2055  *
2056  * @param[in]  module       A ldb_module structure
2057  *
2058  * @param[in]  dn           The string representation of the DN as it
2059  *                          will be stored in the index entry
2060  *
2061  * @param[in]  el           A ldb_message_element array, one of the entry
2062  *                          referred by the v_idx is the attribute name and
2063  *                          value pair which will be used to construct the
2064  *                          index name
2065  *
2066  * @param[in]  v_idx        The index of element in the el array to use
2067  *
2068  * @return                  An ldb error code
2069  */
2070 static int ltdb_index_add1(struct ldb_module *module,
2071                            struct ltdb_private *ltdb,
2072                            const struct ldb_message *msg,
2073                            struct ldb_message_element *el, int v_idx)
2074 {
2075         struct ldb_context *ldb;
2076         struct ldb_dn *dn_key;
2077         int ret;
2078         const struct ldb_schema_attribute *a;
2079         struct dn_list *list;
2080         unsigned alloc_len;
2081         enum key_truncation truncation = KEY_TRUNCATED;
2082
2083
2084         ldb = ldb_module_get_ctx(module);
2085
2086         list = talloc_zero(module, struct dn_list);
2087         if (list == NULL) {
2088                 return LDB_ERR_OPERATIONS_ERROR;
2089         }
2090
2091         dn_key = ltdb_index_key(ldb, ltdb,
2092                                 el->name, &el->values[v_idx], &a, &truncation);
2093         if (!dn_key) {
2094                 talloc_free(list);
2095                 return LDB_ERR_OPERATIONS_ERROR;
2096         }
2097         /*
2098          * Samba only maintains unique indexes on the objectSID and objectGUID
2099          * so if a unique index key exceeds the maximum length there is a
2100          * problem.
2101          */
2102         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2103                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2104                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2105
2106                 ldb_asprintf_errstring(
2107                         ldb,
2108                         __location__ ": unique index key on %s in %s, "
2109                         "exceeds maximum key length of %u (encoded).",
2110                         el->name,
2111                         ldb_dn_get_linearized(msg->dn),
2112                         ltdb->max_key_length);
2113                 talloc_free(list);
2114                 return LDB_ERR_CONSTRAINT_VIOLATION;
2115         }
2116         talloc_steal(list, dn_key);
2117
2118         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2119         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2120                 talloc_free(list);
2121                 return ret;
2122         }
2123
2124         /*
2125          * Check for duplicates in the @IDXDN DN -> GUID record
2126          *
2127          * This is very normal, it just means a duplicate DN creation
2128          * was attempted, so don't set the error string or print scary
2129          * messages.
2130          */
2131         if (list->count > 0 &&
2132             ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
2133             truncation == KEY_NOT_TRUNCATED) {
2134
2135                 talloc_free(list);
2136                 return LDB_ERR_CONSTRAINT_VIOLATION;
2137
2138         } else if (list->count > 0
2139                    && ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
2140
2141                 /*
2142                  * At least one existing entry in the DN->GUID index, which
2143                  * arises when the DN indexes have been truncated
2144                  *
2145                  * So need to pull the DN's to check if it's really a duplicate
2146                  */
2147                 int i;
2148                 for (i=0; i < list->count; i++) {
2149                         uint8_t guid_key[LTDB_GUID_KEY_SIZE];
2150                         TDB_DATA key = {
2151                                 .dptr = guid_key,
2152                                 .dsize = sizeof(guid_key)
2153                         };
2154                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2155                         struct ldb_message *rec = ldb_msg_new(ldb);
2156                         if (rec == NULL) {
2157                                 return LDB_ERR_OPERATIONS_ERROR;
2158                         }
2159
2160                         ret = ltdb_idx_to_key(module, ltdb,
2161                                               ldb, &list->dn[i],
2162                                               &key);
2163                         if (ret != LDB_SUCCESS) {
2164                                 TALLOC_FREE(list);
2165                                 TALLOC_FREE(rec);
2166                                 return ret;
2167                         }
2168
2169                         ret = ltdb_search_key(module, ltdb, key,
2170                                               rec, flags);
2171                         if (key.dptr != guid_key) {
2172                                 TALLOC_FREE(key.dptr);
2173                         }
2174                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2175                                 /*
2176                                  * the record has disappeared?
2177                                  * yes, this can happen
2178                                  */
2179                                 talloc_free(rec);
2180                                 continue;
2181                         }
2182
2183                         if (ret != LDB_SUCCESS) {
2184                                 /* an internal error */
2185                                 TALLOC_FREE(rec);
2186                                 TALLOC_FREE(list);
2187                                 return LDB_ERR_OPERATIONS_ERROR;
2188                         }
2189                         /*
2190                          * The DN we are trying to add to the DB and index
2191                          * is already here, so we must deny the addition
2192                          */
2193                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2194                                 TALLOC_FREE(rec);
2195                                 TALLOC_FREE(list);
2196                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2197                         }
2198                 }
2199         }
2200
2201         /*
2202          * Check for duplicates in unique indexes
2203          *
2204          * We don't need to do a loop test like the @IDXDN case
2205          * above as we have a ban on long unique index values
2206          * at the start of this function.
2207          */
2208         if (list->count > 0 &&
2209             ((a != NULL
2210               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2211                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2212                 /*
2213                  * We do not want to print info about a possibly
2214                  * confidential DN that the conflict was with in the
2215                  * user-visible error string
2216                  */
2217
2218                 if (ltdb->cache->GUID_index_attribute == NULL) {
2219                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2220                                   __location__
2221                                   ": unique index violation on %s in %s, "
2222                                   "conficts with %*.*s in %s",
2223                                   el->name, ldb_dn_get_linearized(msg->dn),
2224                                   (int)list->dn[0].length,
2225                                   (int)list->dn[0].length,
2226                                   list->dn[0].data,
2227                                   ldb_dn_get_linearized(dn_key));
2228                 } else {
2229                         /* This can't fail, gives a default at worst */
2230                         const struct ldb_schema_attribute *attr
2231                                 = ldb_schema_attribute_by_name(
2232                                         ldb,
2233                                         ltdb->cache->GUID_index_attribute);
2234                         struct ldb_val v;
2235                         ret = attr->syntax->ldif_write_fn(ldb, list,
2236                                                           &list->dn[0], &v);
2237                         if (ret == LDB_SUCCESS) {
2238                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2239                                           __location__
2240                                           ": unique index violation on %s in "
2241                                           "%s, conficts with %s %*.*s in %s",
2242                                           el->name,
2243                                           ldb_dn_get_linearized(msg->dn),
2244                                           ltdb->cache->GUID_index_attribute,
2245                                           (int)v.length,
2246                                           (int)v.length,
2247                                           v.data,
2248                                           ldb_dn_get_linearized(dn_key));
2249                         }
2250                 }
2251                 ldb_asprintf_errstring(ldb,
2252                                        __location__ ": unique index violation "
2253                                        "on %s in %s",
2254                                        el->name,
2255                                        ldb_dn_get_linearized(msg->dn));
2256                 talloc_free(list);
2257                 return LDB_ERR_CONSTRAINT_VIOLATION;
2258         }
2259
2260         /* overallocate the list a bit, to reduce the number of
2261          * realloc trigered copies */
2262         alloc_len = ((list->count+1)+7) & ~7;
2263         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2264         if (list->dn == NULL) {
2265                 talloc_free(list);
2266                 return LDB_ERR_OPERATIONS_ERROR;
2267         }
2268
2269         if (ltdb->cache->GUID_index_attribute == NULL) {
2270                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2271                 list->dn[list->count].data
2272                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2273                 if (list->dn[list->count].data == NULL) {
2274                         talloc_free(list);
2275                         return LDB_ERR_OPERATIONS_ERROR;
2276                 }
2277                 list->dn[list->count].length = strlen(dn_str);
2278         } else {
2279                 const struct ldb_val *key_val;
2280                 struct ldb_val *exact = NULL, *next = NULL;
2281                 key_val = ldb_msg_find_ldb_val(msg,
2282                                                ltdb->cache->GUID_index_attribute);
2283                 if (key_val == NULL) {
2284                         talloc_free(list);
2285                         return ldb_module_operr(module);
2286                 }
2287
2288                 if (key_val->length != LTDB_GUID_SIZE) {
2289                         talloc_free(list);
2290                         return ldb_module_operr(module);
2291                 }
2292
2293                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2294                                         *key_val, ldb_val_equal_exact_ordered,
2295                                         exact, next);
2296
2297                 /*
2298                  * Give a warning rather than fail, this could be a
2299                  * duplicate value in the record allowed by a caller
2300                  * forcing in the value with
2301                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2302                  */
2303                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2304                         /* This can't fail, gives a default at worst */
2305                         const struct ldb_schema_attribute *attr
2306                                 = ldb_schema_attribute_by_name(
2307                                         ldb,
2308                                         ltdb->cache->GUID_index_attribute);
2309                         struct ldb_val v;
2310                         ret = attr->syntax->ldif_write_fn(ldb, list,
2311                                                           exact, &v);
2312                         if (ret == LDB_SUCCESS) {
2313                                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2314                                           __location__
2315                                           ": duplicate attribute value in %s "
2316                                           "for index on %s, "
2317                                           "duplicate of %s %*.*s in %s",
2318                                           ldb_dn_get_linearized(msg->dn),
2319                                           el->name,
2320                                           ltdb->cache->GUID_index_attribute,
2321                                           (int)v.length,
2322                                           (int)v.length,
2323                                           v.data,
2324                                           ldb_dn_get_linearized(dn_key));
2325                         }
2326                 }
2327
2328                 if (next == NULL) {
2329                         next = &list->dn[list->count];
2330                 } else {
2331                         memmove(&next[1], next,
2332                                 sizeof(*next) * (list->count - (next - list->dn)));
2333                 }
2334                 *next = ldb_val_dup(list->dn, key_val);
2335                 if (next->data == NULL) {
2336                         talloc_free(list);
2337                         return ldb_module_operr(module);
2338                 }
2339         }
2340         list->count++;
2341
2342         ret = ltdb_dn_list_store(module, dn_key, list);
2343
2344         talloc_free(list);
2345
2346         return ret;
2347 }
2348
2349 /*
2350   add index entries for one elements in a message
2351  */
2352 static int ltdb_index_add_el(struct ldb_module *module,
2353                              struct ltdb_private *ltdb,
2354                              const struct ldb_message *msg,
2355                              struct ldb_message_element *el)
2356 {
2357         unsigned int i;
2358         for (i = 0; i < el->num_values; i++) {
2359                 int ret = ltdb_index_add1(module, ltdb,
2360                                           msg, el, i);
2361                 if (ret != LDB_SUCCESS) {
2362                         return ret;
2363                 }
2364         }
2365
2366         return LDB_SUCCESS;
2367 }
2368
2369 /*
2370   add index entries for all elements in a message
2371  */
2372 static int ltdb_index_add_all(struct ldb_module *module,
2373                               struct ltdb_private *ltdb,
2374                               const struct ldb_message *msg)
2375 {
2376         struct ldb_message_element *elements = msg->elements;
2377         unsigned int i;
2378         const char *dn_str;
2379         int ret;
2380
2381         if (ldb_dn_is_special(msg->dn)) {
2382                 return LDB_SUCCESS;
2383         }
2384
2385         dn_str = ldb_dn_get_linearized(msg->dn);
2386         if (dn_str == NULL) {
2387                 return LDB_ERR_OPERATIONS_ERROR;
2388         }
2389
2390         ret = ltdb_write_index_dn_guid(module, msg, 1);
2391         if (ret != LDB_SUCCESS) {
2392                 return ret;
2393         }
2394
2395         if (!ltdb->cache->attribute_indexes) {
2396                 /* no indexed fields */
2397                 return LDB_SUCCESS;
2398         }
2399
2400         for (i = 0; i < msg->num_elements; i++) {
2401                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
2402                         continue;
2403                 }
2404                 ret = ltdb_index_add_el(module, ltdb,
2405                                         msg, &elements[i]);
2406                 if (ret != LDB_SUCCESS) {
2407                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2408                         ldb_asprintf_errstring(ldb,
2409                                                __location__ ": Failed to re-index %s in %s - %s",
2410                                                elements[i].name, dn_str,
2411                                                ldb_errstring(ldb));
2412                         return ret;
2413                 }
2414         }
2415
2416         return LDB_SUCCESS;
2417 }
2418
2419
2420 /*
2421   insert a DN index for a message
2422 */
2423 static int ltdb_modify_index_dn(struct ldb_module *module,
2424                                 struct ltdb_private *ltdb,
2425                                 const struct ldb_message *msg,
2426                                 struct ldb_dn *dn,
2427                                 const char *index, int add)
2428 {
2429         struct ldb_message_element el;
2430         struct ldb_val val;
2431         int ret;
2432
2433         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2434         if (val.data == NULL) {
2435                 const char *dn_str = ldb_dn_get_linearized(dn);
2436                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2437                                        __location__
2438                                        ": Failed to modify %s "
2439                                        "against %s in %s: failed "
2440                                        "to get casefold DN",
2441                                        index,
2442                                        ltdb->cache->GUID_index_attribute,
2443                                        dn_str);
2444                 return LDB_ERR_OPERATIONS_ERROR;
2445         }
2446
2447         val.length = strlen((char *)val.data);
2448         el.name = index;
2449         el.values = &val;
2450         el.num_values = 1;
2451
2452         if (add) {
2453                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2454         } else { /* delete */
2455                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2456         }
2457
2458         if (ret != LDB_SUCCESS) {
2459                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2460                 const char *dn_str = ldb_dn_get_linearized(dn);
2461                 ldb_asprintf_errstring(ldb,
2462                                        __location__
2463                                        ": Failed to modify %s "
2464                                        "against %s in %s - %s",
2465                                        index,
2466                                        ltdb->cache->GUID_index_attribute,
2467                                        dn_str, ldb_errstring(ldb));
2468                 return ret;
2469         }
2470         return ret;
2471 }
2472
2473 /*
2474   insert a one level index for a message
2475 */
2476 static int ltdb_index_onelevel(struct ldb_module *module,
2477                                const struct ldb_message *msg, int add)
2478 {
2479         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2480                                                     struct ltdb_private);
2481         struct ldb_dn *pdn;
2482         int ret;
2483
2484         /* We index for ONE Level only if requested */
2485         if (!ltdb->cache->one_level_indexes) {
2486                 return LDB_SUCCESS;
2487         }
2488
2489         pdn = ldb_dn_get_parent(module, msg->dn);
2490         if (pdn == NULL) {
2491                 return LDB_ERR_OPERATIONS_ERROR;
2492         }
2493         ret = ltdb_modify_index_dn(module, ltdb,
2494                                    msg, pdn, LTDB_IDXONE, add);
2495
2496         talloc_free(pdn);
2497
2498         return ret;
2499 }
2500
2501 /*
2502   insert a one level index for a message
2503 */
2504 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2505                                     const struct ldb_message *msg,
2506                                     int add)
2507 {
2508         int ret;
2509         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2510                                                     struct ltdb_private);
2511
2512         /* We index for DN only if using a GUID index */
2513         if (ltdb->cache->GUID_index_attribute == NULL) {
2514                 return LDB_SUCCESS;
2515         }
2516
2517         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2518                                    LTDB_IDXDN, add);
2519
2520         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2521                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2522                                        "Entry %s already exists",
2523                                        ldb_dn_get_linearized(msg->dn));
2524                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2525         }
2526         return ret;
2527 }
2528
2529 /*
2530   add the index entries for a new element in a record
2531   The caller guarantees that these element values are not yet indexed
2532 */
2533 int ltdb_index_add_element(struct ldb_module *module,
2534                            struct ltdb_private *ltdb,
2535                            const struct ldb_message *msg,
2536                            struct ldb_message_element *el)
2537 {
2538         if (ldb_dn_is_special(msg->dn)) {
2539                 return LDB_SUCCESS;
2540         }
2541         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2542                 return LDB_SUCCESS;
2543         }
2544         return ltdb_index_add_el(module, ltdb, msg, el);
2545 }
2546
2547 /*
2548   add the index entries for a new record
2549 */
2550 int ltdb_index_add_new(struct ldb_module *module,
2551                        struct ltdb_private *ltdb,
2552                        const struct ldb_message *msg)
2553 {
2554         int ret;
2555
2556         if (ldb_dn_is_special(msg->dn)) {
2557                 return LDB_SUCCESS;
2558         }
2559
2560         ret = ltdb_index_add_all(module, ltdb, msg);
2561         if (ret != LDB_SUCCESS) {
2562                 /*
2563                  * Because we can't trust the caller to be doing
2564                  * transactions properly, clean up any index for this
2565                  * entry rather than relying on a transaction
2566                  * cleanup
2567                  */
2568
2569                 ltdb_index_delete(module, msg);
2570                 return ret;
2571         }
2572
2573         ret = ltdb_index_onelevel(module, msg, 1);
2574         if (ret != LDB_SUCCESS) {
2575                 /*
2576                  * Because we can't trust the caller to be doing
2577                  * transactions properly, clean up any index for this
2578                  * entry rather than relying on a transaction
2579                  * cleanup
2580                  */
2581                 ltdb_index_delete(module, msg);
2582                 return ret;
2583         }
2584         return ret;
2585 }
2586
2587
2588 /*
2589   delete an index entry for one message element
2590 */
2591 int ltdb_index_del_value(struct ldb_module *module,
2592                          struct ltdb_private *ltdb,
2593                          const struct ldb_message *msg,
2594                          struct ldb_message_element *el, unsigned int v_idx)
2595 {
2596         struct ldb_context *ldb;
2597         struct ldb_dn *dn_key;
2598         const char *dn_str;
2599         int ret, i;
2600         unsigned int j;
2601         struct dn_list *list;
2602         struct ldb_dn *dn = msg->dn;
2603         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2604
2605         ldb = ldb_module_get_ctx(module);
2606
2607         dn_str = ldb_dn_get_linearized(dn);
2608         if (dn_str == NULL) {
2609                 return LDB_ERR_OPERATIONS_ERROR;
2610         }
2611
2612         if (dn_str[0] == '@') {
2613                 return LDB_SUCCESS;
2614         }
2615
2616         dn_key = ltdb_index_key(ldb, ltdb,
2617                                 el->name, &el->values[v_idx],
2618                                 NULL, &truncation);
2619         /*
2620          * We ignore key truncation in ltdb_index_add1() so
2621          * match that by ignoring it here as well
2622          *
2623          * Multiple values are legitimate and accepted
2624          */
2625         if (!dn_key) {
2626                 return LDB_ERR_OPERATIONS_ERROR;
2627         }
2628
2629         list = talloc_zero(dn_key, struct dn_list);
2630         if (list == NULL) {
2631                 talloc_free(dn_key);
2632                 return LDB_ERR_OPERATIONS_ERROR;
2633         }
2634
2635         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2636         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2637                 /* it wasn't indexed. Did we have an earlier error? If we did then
2638                    its gone now */
2639                 talloc_free(dn_key);
2640                 return LDB_SUCCESS;
2641         }
2642
2643         if (ret != LDB_SUCCESS) {
2644                 talloc_free(dn_key);
2645                 return ret;
2646         }
2647
2648         /*
2649          * Find one of the values matching this message to remove
2650          */
2651         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2652         if (i == -1) {
2653                 /* nothing to delete */
2654                 talloc_free(dn_key);
2655                 return LDB_SUCCESS;
2656         }
2657
2658         j = (unsigned int) i;
2659         if (j != list->count - 1) {
2660                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2661         }
2662         list->count--;
2663         if (list->count == 0) {
2664                 talloc_free(list->dn);
2665                 list->dn = NULL;
2666         } else {
2667                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2668         }
2669
2670         ret = ltdb_dn_list_store(module, dn_key, list);
2671
2672         talloc_free(dn_key);
2673
2674         return ret;
2675 }
2676
2677 /*
2678   delete the index entries for a element
2679   return -1 on failure
2680 */
2681 int ltdb_index_del_element(struct ldb_module *module,
2682                            struct ltdb_private *ltdb,
2683                            const struct ldb_message *msg,
2684                            struct ldb_message_element *el)
2685 {
2686         const char *dn_str;
2687         int ret;
2688         unsigned int i;
2689
2690         if (!ltdb->cache->attribute_indexes) {
2691                 /* no indexed fields */
2692                 return LDB_SUCCESS;
2693         }
2694
2695         dn_str = ldb_dn_get_linearized(msg->dn);
2696         if (dn_str == NULL) {
2697                 return LDB_ERR_OPERATIONS_ERROR;
2698         }
2699
2700         if (dn_str[0] == '@') {
2701                 return LDB_SUCCESS;
2702         }
2703
2704         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2705                 return LDB_SUCCESS;
2706         }
2707         for (i = 0; i < el->num_values; i++) {
2708                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2709                 if (ret != LDB_SUCCESS) {
2710                         return ret;
2711                 }
2712         }
2713
2714         return LDB_SUCCESS;
2715 }
2716
2717 /*
2718   delete the index entries for a record
2719   return -1 on failure
2720 */
2721 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2722 {
2723         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2724         int ret;
2725         unsigned int i;
2726
2727         if (ldb_dn_is_special(msg->dn)) {
2728                 return LDB_SUCCESS;
2729         }
2730
2731         ret = ltdb_index_onelevel(module, msg, 0);
2732         if (ret != LDB_SUCCESS) {
2733                 return ret;
2734         }
2735
2736         ret = ltdb_write_index_dn_guid(module, msg, 0);
2737         if (ret != LDB_SUCCESS) {
2738                 return ret;
2739         }
2740
2741         if (!ltdb->cache->attribute_indexes) {
2742                 /* no indexed fields */
2743                 return LDB_SUCCESS;
2744         }
2745
2746         for (i = 0; i < msg->num_elements; i++) {
2747                 ret = ltdb_index_del_element(module, ltdb,
2748                                              msg, &msg->elements[i]);
2749                 if (ret != LDB_SUCCESS) {
2750                         return ret;
2751                 }
2752         }
2753
2754         return LDB_SUCCESS;
2755 }
2756
2757
2758 /*
2759   traversal function that deletes all @INDEX records in the in-memory
2760   TDB.
2761
2762   This does not touch the actual DB, that is done at transaction
2763   commit, which in turn greatly reduces DB churn as we will likely
2764   be able to do a direct update into the old record.
2765 */
2766 static int delete_index(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, void *state)
2767 {
2768         struct ldb_module *module = state;
2769         const char *dnstr = "DN=" LTDB_INDEX ":";
2770         struct dn_list list;
2771         struct ldb_dn *dn;
2772         struct ldb_val v;
2773         int ret;
2774
2775         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2776                 return 0;
2777         }
2778         /* we need to put a empty list in the internal tdb for this
2779          * index entry */
2780         list.dn = NULL;
2781         list.count = 0;
2782
2783         /* the offset of 3 is to remove the DN= prefix. */
2784         v.data = key.data + 3;
2785         v.length = strnlen((char *)key.data, key.length) - 3;
2786
2787         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2788
2789         /*
2790          * This does not actually touch the DB quite yet, just
2791          * the in-memory index cache
2792          */
2793         ret = ltdb_dn_list_store(module, dn, &list);
2794         if (ret != LDB_SUCCESS) {
2795                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2796                                        "Unable to store null index for %s\n",
2797                                                 ldb_dn_get_linearized(dn));
2798                 talloc_free(dn);
2799                 return -1;
2800         }
2801         talloc_free(dn);
2802         return 0;
2803 }
2804
2805 /*
2806   traversal function that adds @INDEX records during a re index TODO wrong comment
2807 */
2808 static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2809 {
2810         struct ldb_context *ldb;
2811         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2812         struct ldb_module *module = ctx->module;
2813         struct ldb_message *msg;
2814         unsigned int nb_elements_in_db;
2815         int ret;
2816         TDB_DATA key2;
2817         bool is_record;
2818         TDB_DATA key = {
2819                 .dptr = ldb_key.data,
2820                 .dsize = ldb_key.length
2821         };
2822         
2823         ldb = ldb_module_get_ctx(module);
2824
2825         if (key.dsize > 4 &&
2826             memcmp(key.dptr, "DN=@", 4) == 0) {
2827                 return 0;
2828         }
2829
2830         is_record = ltdb_key_is_record(key);
2831         if (is_record == false) {
2832                 return 0;
2833         }
2834         
2835         msg = ldb_msg_new(module);
2836         if (msg == NULL) {
2837                 return -1;
2838         }
2839
2840         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2841                                                    msg,
2842                                                    NULL, 0,
2843                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2844                                                    &nb_elements_in_db);
2845         if (ret != 0) {
2846                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2847                                                 ldb_dn_get_linearized(msg->dn));
2848                 ctx->error = ret;
2849                 talloc_free(msg);
2850                 return -1;
2851         }
2852
2853         if (msg->dn == NULL) {
2854                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2855                           "Refusing to re-index as GUID "
2856                           "key %*.*s with no DN\n",
2857                           (int)key.dsize, (int)key.dsize,
2858                           (char *)key.dptr);
2859                 talloc_free(msg);
2860                 return -1;
2861         }
2862         
2863         /* check if the DN key has changed, perhaps due to the case
2864            insensitivity of an element changing, or a change from DN
2865            to GUID keys */
2866         key2 = ltdb_key_msg(module, msg, msg);
2867         if (key2.dptr == NULL) {
2868                 /* probably a corrupt record ... darn */
2869                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2870                                                 ldb_dn_get_linearized(msg->dn));
2871                 talloc_free(msg);
2872                 return 0;
2873         }
2874         if (key.dsize != key2.dsize ||
2875             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2876                 struct ldb_val ldb_key2 = {
2877                         .data = key2.dptr,
2878                         .length = key2.dsize
2879                 };
2880                 ltdb->kv_ops->update_in_iterate(ltdb, ldb_key, ldb_key2, val, ctx);
2881         }
2882         talloc_free(key2.dptr);
2883
2884         talloc_free(msg);
2885
2886         ctx->count++;
2887         if (ctx->count % 10000 == 0) {
2888                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2889                           "Reindexing: re-keyed %u records so far",
2890                           ctx->count);
2891         }
2892
2893         return 0;
2894 }
2895
2896 /*
2897   traversal function that adds @INDEX records during a re index
2898 */
2899 static int re_index(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
2900 {
2901         struct ldb_context *ldb;
2902         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2903         struct ldb_module *module = ctx->module;
2904         struct ldb_message *msg;
2905         unsigned int nb_elements_in_db;
2906         TDB_DATA key = {
2907                 .dptr = ldb_key.data,
2908                 .dsize = ldb_key.length
2909         };
2910         int ret;
2911         bool is_record;
2912         
2913         ldb = ldb_module_get_ctx(module);
2914
2915         if (key.dsize > 4 &&
2916             memcmp(key.dptr, "DN=@", 4) == 0) {
2917                 return 0;
2918         }
2919
2920         is_record = ltdb_key_is_record(key);
2921         if (is_record == false) {
2922                 return 0;
2923         }
2924         
2925         msg = ldb_msg_new(module);
2926         if (msg == NULL) {
2927                 return -1;
2928         }
2929
2930         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2931                                                    msg,
2932                                                    NULL, 0,
2933                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2934                                                    &nb_elements_in_db);
2935         if (ret != 0) {
2936                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2937                                                 ldb_dn_get_linearized(msg->dn));
2938                 ctx->error = ret;
2939                 talloc_free(msg);
2940                 return -1;
2941         }
2942
2943         if (msg->dn == NULL) {
2944                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2945                           "Refusing to re-index as GUID "
2946                           "key %*.*s with no DN\n",
2947                           (int)key.dsize, (int)key.dsize,
2948                           (char *)key.dptr);
2949                 talloc_free(msg);
2950                 return -1;
2951         }
2952
2953         ret = ltdb_index_onelevel(module, msg, 1);
2954         if (ret != LDB_SUCCESS) {
2955                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2956                           "Adding special ONE LEVEL index failed (%s)!",
2957                                                 ldb_dn_get_linearized(msg->dn));
2958                 talloc_free(msg);
2959                 return -1;
2960         }
2961
2962         ret = ltdb_index_add_all(module, ltdb, msg);
2963
2964         if (ret != LDB_SUCCESS) {
2965                 ctx->error = ret;
2966                 talloc_free(msg);
2967                 return -1;
2968         }
2969
2970         talloc_free(msg);
2971
2972         ctx->count++;
2973         if (ctx->count % 10000 == 0) {
2974                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2975                           "Reindexing: re-indexed %u records so far",
2976                           ctx->count);
2977         }
2978
2979         return 0;
2980 }
2981
2982 /*
2983   force a complete reindex of the database
2984 */
2985 int ltdb_reindex(struct ldb_module *module)
2986 {
2987         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2988         int ret;
2989         struct ltdb_reindex_context ctx;
2990
2991         /*
2992          * Only triggered after a modification, but make clear we do
2993          * not re-index a read-only DB
2994          */
2995         if (ltdb->read_only) {
2996                 return LDB_ERR_UNWILLING_TO_PERFORM;
2997         }
2998
2999         if (ltdb_cache_reload(module) != 0) {
3000                 return LDB_ERR_OPERATIONS_ERROR;
3001         }
3002
3003         /*
3004          * Ensure we read (and so remove) the entries from the real
3005          * DB, no values stored so far are any use as we want to do a
3006          * re-index
3007          */
3008         ltdb_index_transaction_cancel(module);
3009
3010         ret = ltdb_index_transaction_start(module);
3011         if (ret != LDB_SUCCESS) {
3012                 return ret;
3013         }
3014
3015         /* first traverse the database deleting any @INDEX records by
3016          * putting NULL entries in the in-memory tdb
3017          */
3018         ret = ltdb->kv_ops->iterate(ltdb, delete_index, module);
3019         if (ret < 0) {
3020                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3021                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3022                                        ldb_errstring(ldb));
3023                 return LDB_ERR_OPERATIONS_ERROR;
3024         }
3025
3026         ctx.module = module;
3027         ctx.error = 0;
3028         ctx.count = 0;
3029
3030         ret = ltdb->kv_ops->iterate(ltdb, re_key, &ctx);
3031         if (ret < 0) {
3032                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3033                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3034                                        ldb_errstring(ldb));
3035                 return LDB_ERR_OPERATIONS_ERROR;
3036         }
3037
3038         if (ctx.error != LDB_SUCCESS) {
3039                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3040                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3041                 return ctx.error;
3042         }
3043
3044         ctx.error = 0;
3045         ctx.count = 0;
3046
3047         /* now traverse adding any indexes for normal LDB records */
3048         ret = ltdb->kv_ops->iterate(ltdb, re_index, &ctx);
3049         if (ret < 0) {
3050                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3051                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3052                                        ldb_errstring(ldb));
3053                 return LDB_ERR_OPERATIONS_ERROR;
3054         }
3055
3056         if (ctx.error != LDB_SUCCESS) {
3057                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3058                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3059                 return ctx.error;
3060         }
3061
3062         if (ctx.count > 10000) {
3063                 ldb_debug(ldb_module_get_ctx(module),
3064                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
3065                           "final index write-out will be in transaction commit",
3066                           ltdb->kv_ops->name(ltdb));
3067         }
3068         return LDB_SUCCESS;
3069 }