ldb: Elaborate on ldb_kv_search_indexed() comments
[metze/samba/wip.git] / lib / ldb / ldb_key_value / ldb_kv_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb key value backend - indexing
28  *
29  *  Description: indexing routines for ldb key value backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_kv.h"
148 #include "../ldb_tdb/ldb_tdb.h"
149 #include "ldb_private.h"
150 #include "lib/util/binsearch.h"
151
152 struct dn_list {
153         unsigned int count;
154         struct ldb_val *dn;
155         /*
156          * Do not optimise the intersection of this list,
157          * we must never return an entry not in this
158          * list.  This allows the index for
159          * SCOPE_ONELEVEL to be trusted.
160          */
161         bool strict;
162 };
163
164 struct ldb_kv_idxptr {
165         struct tdb_context *itdb;
166         int error;
167 };
168
169 enum key_truncation {
170         KEY_NOT_TRUNCATED,
171         KEY_TRUNCATED,
172 };
173
174 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
175                                       const struct ldb_message *msg,
176                                       int add);
177 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
178                                    struct ldb_kv_private *ldb_kv,
179                                    struct ldb_dn *base_dn,
180                                    struct dn_list *dn_list,
181                                    enum key_truncation *truncation);
182
183 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ldb_kv,
184                                 struct dn_list *list);
185
186 /* we put a @IDXVERSION attribute on index entries. This
187    allows us to tell if it was written by an older version
188 */
189 #define LDB_KV_INDEXING_VERSION 2
190
191 #define LDB_KV_GUID_INDEXING_VERSION 3
192
193 static unsigned ldb_kv_max_key_length(struct ldb_kv_private *ldb_kv)
194 {
195         if (ldb_kv->max_key_length == 0) {
196                 return UINT_MAX;
197         }
198         return ldb_kv->max_key_length;
199 }
200
201 /* enable the idxptr mode when transactions start */
202 int ldb_kv_index_transaction_start(struct ldb_module *module)
203 {
204         struct ldb_kv_private *ldb_kv = talloc_get_type(
205             ldb_module_get_private(module), struct ldb_kv_private);
206         ldb_kv->idxptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
207         if (ldb_kv->idxptr == NULL) {
208                 return ldb_oom(ldb_module_get_ctx(module));
209         }
210
211         return LDB_SUCCESS;
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
219                                          const struct ldb_val *v2)
220 {
221         if (v1->length > v2->length) {
222                 return -1;
223         }
224         if (v1->length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1->data, v2->data, v1->length);
228 }
229
230 /*
231   see if two ldb_val structures contain exactly the same data
232   return -1 or 1 for a mismatch, 0 for match
233 */
234 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
235                                        const struct ldb_val *v2)
236 {
237         if (v1.length > v2->length) {
238                 return -1;
239         }
240         if (v1.length < v2->length) {
241                 return 1;
242         }
243         return memcmp(v1.data, v2->data, v1.length);
244 }
245
246
247 /*
248   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
249   binary-safe comparison for the 'dn' returns -1 if not found
250
251   This is therefore safe when the value is a GUID in the future
252  */
253 static int ldb_kv_dn_list_find_val(struct ldb_kv_private *ldb_kv,
254                                    const struct dn_list *list,
255                                    const struct ldb_val *v)
256 {
257         unsigned int i;
258         struct ldb_val *exact = NULL, *next = NULL;
259
260         if (ldb_kv->cache->GUID_index_attribute == NULL) {
261                 for (i=0; i<list->count; i++) {
262                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
263                                 return i;
264                         }
265                 }
266                 return -1;
267         }
268
269         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
270                                 *v, ldb_val_equal_exact_ordered,
271                                 exact, next);
272         if (exact == NULL) {
273                 return -1;
274         }
275         /* Not required, but keeps the compiler quiet */
276         if (next != NULL) {
277                 return -1;
278         }
279
280         i = exact - list->dn;
281         return i;
282 }
283
284 /*
285   find a entry in a dn_list. Uses a case sensitive comparison with the dn
286   returns -1 if not found
287  */
288 static int ldb_kv_dn_list_find_msg(struct ldb_kv_private *ldb_kv,
289                                    struct dn_list *list,
290                                    const struct ldb_message *msg)
291 {
292         struct ldb_val v;
293         const struct ldb_val *key_val;
294         if (ldb_kv->cache->GUID_index_attribute == NULL) {
295                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
296                 v.data = discard_const_p(unsigned char, dn_str);
297                 v.length = strlen(dn_str);
298         } else {
299                 key_val = ldb_msg_find_ldb_val(
300                     msg, ldb_kv->cache->GUID_index_attribute);
301                 if (key_val == NULL) {
302                         return -1;
303                 }
304                 v = *key_val;
305         }
306         return ldb_kv_dn_list_find_val(ldb_kv, list, &v);
307 }
308
309 /*
310   this is effectively a cast function, but with lots of paranoia
311   checks and also copes with CPUs that are fussy about pointer
312   alignment
313  */
314 static struct dn_list *ldb_kv_index_idxptr(struct ldb_module *module,
315                                            TDB_DATA rec,
316                                            bool check_parent)
317 {
318         struct dn_list *list;
319         if (rec.dsize != sizeof(void *)) {
320                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
321                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
322                 return NULL;
323         }
324         /* note that we can't just use a cast here, as rec.dptr may
325            not be aligned sufficiently for a pointer. A cast would cause
326            platforms like some ARM CPUs to crash */
327         memcpy(&list, rec.dptr, sizeof(void *));
328         list = talloc_get_type(list, struct dn_list);
329         if (list == NULL) {
330                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
331                                        "Bad type '%s' for idxptr",
332                                        talloc_get_name(list));
333                 return NULL;
334         }
335         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
336                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
337                                        "Bad parent '%s' for idxptr",
338                                        talloc_get_name(talloc_parent(list->dn)));
339                 return NULL;
340         }
341         return list;
342 }
343
344 /*
345   return the @IDX list in an index entry for a dn as a
346   struct dn_list
347  */
348 static int ldb_kv_dn_list_load(struct ldb_module *module,
349                                struct ldb_kv_private *ldb_kv,
350                                struct ldb_dn *dn,
351                                struct dn_list *list)
352 {
353         struct ldb_message *msg;
354         int ret, version;
355         struct ldb_message_element *el;
356         TDB_DATA rec;
357         struct dn_list *list2;
358         TDB_DATA key;
359
360         list->dn = NULL;
361         list->count = 0;
362
363         /* see if we have any in-memory index entries */
364         if (ldb_kv->idxptr == NULL || ldb_kv->idxptr->itdb == NULL) {
365                 goto normal_index;
366         }
367
368         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
369         key.dsize = strlen((char *)key.dptr);
370
371         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
372         if (rec.dptr == NULL) {
373                 goto normal_index;
374         }
375
376         /* we've found an in-memory index entry */
377         list2 = ldb_kv_index_idxptr(module, rec, true);
378         if (list2 == NULL) {
379                 free(rec.dptr);
380                 return LDB_ERR_OPERATIONS_ERROR;
381         }
382         free(rec.dptr);
383
384         *list = *list2;
385         return LDB_SUCCESS;
386
387 normal_index:
388         msg = ldb_msg_new(list);
389         if (msg == NULL) {
390                 return LDB_ERR_OPERATIONS_ERROR;
391         }
392
393         ret = ldb_kv_search_dn1(module,
394                                 dn,
395                                 msg,
396                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
397                                     LDB_UNPACK_DATA_FLAG_NO_DN);
398         if (ret != LDB_SUCCESS) {
399                 talloc_free(msg);
400                 return ret;
401         }
402
403         el = ldb_msg_find_element(msg, LDB_KV_IDX);
404         if (!el) {
405                 talloc_free(msg);
406                 return LDB_SUCCESS;
407         }
408
409         version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
410
411         /*
412          * we avoid copying the strings by stealing the list.  We have
413          * to steal msg onto el->values (which looks odd) because we
414          * asked for the memory to be allocated on msg, not on each
415          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
416          */
417         if (ldb_kv->cache->GUID_index_attribute == NULL) {
418                 /* check indexing version number */
419                 if (version != LDB_KV_INDEXING_VERSION) {
420                         ldb_debug_set(ldb_module_get_ctx(module),
421                                       LDB_DEBUG_ERROR,
422                                       "Wrong DN index version %d "
423                                       "expected %d for %s",
424                                       version, LDB_KV_INDEXING_VERSION,
425                                       ldb_dn_get_linearized(dn));
426                         talloc_free(msg);
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 talloc_steal(el->values, msg);
431                 list->dn = talloc_steal(list, el->values);
432                 list->count = el->num_values;
433         } else {
434                 unsigned int i;
435                 if (version != LDB_KV_GUID_INDEXING_VERSION) {
436                         /* This is quite likely during the DB startup
437                            on first upgrade to using a GUID index */
438                         ldb_debug_set(ldb_module_get_ctx(module),
439                                       LDB_DEBUG_ERROR,
440                                       "Wrong GUID index version %d "
441                                       "expected %d for %s",
442                                       version, LDB_KV_GUID_INDEXING_VERSION,
443                                       ldb_dn_get_linearized(dn));
444                         talloc_free(msg);
445                         return LDB_ERR_OPERATIONS_ERROR;
446                 }
447
448                 if (el->num_values == 0) {
449                         talloc_free(msg);
450                         return LDB_ERR_OPERATIONS_ERROR;
451                 }
452
453                 if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0) {
454                         talloc_free(msg);
455                         return LDB_ERR_OPERATIONS_ERROR;
456                 }
457
458                 list->count = el->values[0].length / LDB_KV_GUID_SIZE;
459                 list->dn = talloc_array(list, struct ldb_val, list->count);
460                 if (list->dn == NULL) {
461                         talloc_free(msg);
462                         return LDB_ERR_OPERATIONS_ERROR;
463                 }
464
465                 /*
466                  * The actual data is on msg, due to
467                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
468                  */
469                 talloc_steal(list->dn, msg);
470                 for (i = 0; i < list->count; i++) {
471                         list->dn[i].data
472                                 = &el->values[0].data[i * LDB_KV_GUID_SIZE];
473                         list->dn[i].length = LDB_KV_GUID_SIZE;
474                 }
475         }
476
477         /* We don't need msg->elements any more */
478         talloc_free(msg->elements);
479         return LDB_SUCCESS;
480 }
481
482 int ldb_kv_key_dn_from_idx(struct ldb_module *module,
483                            struct ldb_kv_private *ldb_kv,
484                            TALLOC_CTX *mem_ctx,
485                            struct ldb_dn *dn,
486                            struct ldb_val *ldb_key)
487 {
488         struct ldb_context *ldb = ldb_module_get_ctx(module);
489         int ret;
490         int index = 0;
491         enum key_truncation truncation = KEY_NOT_TRUNCATED;
492         struct dn_list *list = talloc(mem_ctx, struct dn_list);
493         if (list == NULL) {
494                 ldb_oom(ldb);
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ldb_kv_index_dn_base_dn(module, ldb_kv, dn, list, &truncation);
499         if (ret != LDB_SUCCESS) {
500                 TALLOC_FREE(list);
501                 return ret;
502         }
503
504         if (list->count == 0) {
505                 TALLOC_FREE(list);
506                 return LDB_ERR_NO_SUCH_OBJECT;
507         }
508
509         if (list->count > 1 && truncation == KEY_NOT_TRUNCATED)  {
510                 const char *dn_str = ldb_dn_get_linearized(dn);
511                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
512                                        __location__
513                                        ": Failed to read DN index "
514                                        "against %s for %s: too many "
515                                        "values (%u > 1)",
516                                        ldb_kv->cache->GUID_index_attribute,
517                                        dn_str,
518                                        list->count);
519                 TALLOC_FREE(list);
520                 return LDB_ERR_CONSTRAINT_VIOLATION;
521         }
522
523         if (list->count > 0 && truncation == KEY_TRUNCATED)  {
524                 /*
525                  * DN key has been truncated, need to inspect the actual
526                  * records to locate the actual DN
527                  */
528                 int i;
529                 index = -1;
530                 for (i=0; i < list->count; i++) {
531                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
532                         struct ldb_val key = {
533                                 .data = guid_key,
534                                 .length = sizeof(guid_key)
535                         };
536                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
537                         struct ldb_message *rec = ldb_msg_new(ldb);
538                         if (rec == NULL) {
539                                 TALLOC_FREE(list);
540                                 return LDB_ERR_OPERATIONS_ERROR;
541                         }
542
543                         ret = ldb_kv_idx_to_key(
544                             module, ldb_kv, ldb, &list->dn[i], &key);
545                         if (ret != LDB_SUCCESS) {
546                                 TALLOC_FREE(list);
547                                 TALLOC_FREE(rec);
548                                 return ret;
549                         }
550
551                         ret =
552                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
553                         if (key.data != guid_key) {
554                                 TALLOC_FREE(key.data);
555                         }
556                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
557                                 /*
558                                  * the record has disappeared?
559                                  * yes, this can happen
560                                  */
561                                 TALLOC_FREE(rec);
562                                 continue;
563                         }
564
565                         if (ret != LDB_SUCCESS) {
566                                 /* an internal error */
567                                 TALLOC_FREE(rec);
568                                 TALLOC_FREE(list);
569                                 return LDB_ERR_OPERATIONS_ERROR;
570                         }
571
572                         /*
573                          * We found the actual DN that we wanted from in the
574                          * multiple values that matched the index
575                          * (due to truncation), so return that.
576                          *
577                          */
578                         if (ldb_dn_compare(dn, rec->dn) == 0) {
579                                 index = i;
580                                 TALLOC_FREE(rec);
581                                 break;
582                         }
583                 }
584
585                 /*
586                  * We matched the index but the actual DN we wanted
587                  * was not here.
588                  */
589                 if (index == -1) {
590                         TALLOC_FREE(list);
591                         return LDB_ERR_NO_SUCH_OBJECT;
592                 }
593         }
594
595         /* The ldb_key memory is allocated by the caller */
596         ret = ldb_kv_guid_to_key(module, ldb_kv, &list->dn[index], ldb_key);
597         TALLOC_FREE(list);
598
599         if (ret != LDB_SUCCESS) {
600                 return LDB_ERR_OPERATIONS_ERROR;
601         }
602
603         return LDB_SUCCESS;
604 }
605
606
607
608 /*
609   save a dn_list into a full @IDX style record
610  */
611 static int ldb_kv_dn_list_store_full(struct ldb_module *module,
612                                      struct ldb_kv_private *ldb_kv,
613                                      struct ldb_dn *dn,
614                                      struct dn_list *list)
615 {
616         struct ldb_message *msg;
617         int ret;
618
619         msg = ldb_msg_new(module);
620         if (!msg) {
621                 return ldb_module_oom(module);
622         }
623
624         msg->dn = dn;
625
626         if (list->count == 0) {
627                 ret = ldb_kv_delete_noindex(module, msg);
628                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
629                         ret = LDB_SUCCESS;
630                 }
631                 talloc_free(msg);
632                 return ret;
633         }
634
635         if (ldb_kv->cache->GUID_index_attribute == NULL) {
636                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
637                                       LDB_KV_INDEXING_VERSION);
638                 if (ret != LDB_SUCCESS) {
639                         talloc_free(msg);
640                         return ldb_module_oom(module);
641                 }
642         } else {
643                 ret = ldb_msg_add_fmt(msg, LDB_KV_IDXVERSION, "%u",
644                                       LDB_KV_GUID_INDEXING_VERSION);
645                 if (ret != LDB_SUCCESS) {
646                         talloc_free(msg);
647                         return ldb_module_oom(module);
648                 }
649         }
650
651         if (list->count > 0) {
652                 struct ldb_message_element *el;
653
654                 ret = ldb_msg_add_empty(msg, LDB_KV_IDX, LDB_FLAG_MOD_ADD, &el);
655                 if (ret != LDB_SUCCESS) {
656                         talloc_free(msg);
657                         return ldb_module_oom(module);
658                 }
659
660                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
661                         el->values = list->dn;
662                         el->num_values = list->count;
663                 } else {
664                         struct ldb_val v;
665                         unsigned int i;
666                         el->values = talloc_array(msg,
667                                                   struct ldb_val, 1);
668                         if (el->values == NULL) {
669                                 talloc_free(msg);
670                                 return ldb_module_oom(module);
671                         }
672
673                         v.data = talloc_array_size(el->values,
674                                                    list->count,
675                                                    LDB_KV_GUID_SIZE);
676                         if (v.data == NULL) {
677                                 talloc_free(msg);
678                                 return ldb_module_oom(module);
679                         }
680
681                         v.length = talloc_get_size(v.data);
682
683                         for (i = 0; i < list->count; i++) {
684                                 if (list->dn[i].length !=
685                                     LDB_KV_GUID_SIZE) {
686                                         talloc_free(msg);
687                                         return ldb_module_operr(module);
688                                 }
689                                 memcpy(&v.data[LDB_KV_GUID_SIZE*i],
690                                        list->dn[i].data,
691                                        LDB_KV_GUID_SIZE);
692                         }
693                         el->values[0] = v;
694                         el->num_values = 1;
695                 }
696         }
697
698         ret = ldb_kv_store(module, msg, TDB_REPLACE);
699         talloc_free(msg);
700         return ret;
701 }
702
703 /*
704   save a dn_list into the database, in either @IDX or internal format
705  */
706 static int ldb_kv_dn_list_store(struct ldb_module *module,
707                                 struct ldb_dn *dn,
708                                 struct dn_list *list)
709 {
710         struct ldb_kv_private *ldb_kv = talloc_get_type(
711             ldb_module_get_private(module), struct ldb_kv_private);
712         TDB_DATA rec, key;
713         int ret;
714         struct dn_list *list2;
715
716         if (ldb_kv->idxptr == NULL) {
717                 return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
718         }
719
720         if (ldb_kv->idxptr->itdb == NULL) {
721                 ldb_kv->idxptr->itdb =
722                     tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
723                 if (ldb_kv->idxptr->itdb == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726         }
727
728         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
729         if (key.dptr == NULL) {
730                 return LDB_ERR_OPERATIONS_ERROR;
731         }
732         key.dsize = strlen((char *)key.dptr);
733
734         rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
735         if (rec.dptr != NULL) {
736                 list2 = ldb_kv_index_idxptr(module, rec, false);
737                 if (list2 == NULL) {
738                         free(rec.dptr);
739                         return LDB_ERR_OPERATIONS_ERROR;
740                 }
741                 free(rec.dptr);
742                 list2->dn = talloc_steal(list2, list->dn);
743                 list2->count = list->count;
744                 return LDB_SUCCESS;
745         }
746
747         list2 = talloc(ldb_kv->idxptr, struct dn_list);
748         if (list2 == NULL) {
749                 return LDB_ERR_OPERATIONS_ERROR;
750         }
751         list2->dn = talloc_steal(list2, list->dn);
752         list2->count = list->count;
753
754         rec.dptr = (uint8_t *)&list2;
755         rec.dsize = sizeof(void *);
756
757
758         /*
759          * This is not a store into the main DB, but into an in-memory
760          * TDB, so we don't need a guard on ltdb->read_only
761          */
762         ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
763         if (ret != 0) {
764                 return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
765         }
766         return LDB_SUCCESS;
767 }
768
769 /*
770   traverse function for storing the in-memory index entries on disk
771  */
772 static int ldb_kv_index_traverse_store(struct tdb_context *tdb,
773                                        TDB_DATA key,
774                                        TDB_DATA data,
775                                        void *state)
776 {
777         struct ldb_module *module = state;
778         struct ldb_kv_private *ldb_kv = talloc_get_type(
779             ldb_module_get_private(module), struct ldb_kv_private);
780         struct ldb_dn *dn;
781         struct ldb_context *ldb = ldb_module_get_ctx(module);
782         struct ldb_val v;
783         struct dn_list *list;
784
785         list = ldb_kv_index_idxptr(module, data, true);
786         if (list == NULL) {
787                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
788                 return -1;
789         }
790
791         v.data = key.dptr;
792         v.length = strnlen((char *)key.dptr, key.dsize);
793
794         dn = ldb_dn_from_ldb_val(module, ldb, &v);
795         if (dn == NULL) {
796                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
797                 ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
798                 return -1;
799         }
800
801         ldb_kv->idxptr->error =
802             ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
803         talloc_free(dn);
804         if (ldb_kv->idxptr->error != 0) {
805                 return -1;
806         }
807         return 0;
808 }
809
810 /* cleanup the idxptr mode when transaction commits */
811 int ldb_kv_index_transaction_commit(struct ldb_module *module)
812 {
813         struct ldb_kv_private *ldb_kv = talloc_get_type(
814             ldb_module_get_private(module), struct ldb_kv_private);
815         int ret;
816
817         struct ldb_context *ldb = ldb_module_get_ctx(module);
818
819         ldb_reset_err_string(ldb);
820
821         if (ldb_kv->idxptr->itdb) {
822                 tdb_traverse(
823                     ldb_kv->idxptr->itdb, ldb_kv_index_traverse_store, module);
824                 tdb_close(ldb_kv->idxptr->itdb);
825         }
826
827         ret = ldb_kv->idxptr->error;
828         if (ret != LDB_SUCCESS) {
829                 if (!ldb_errstring(ldb)) {
830                         ldb_set_errstring(ldb, ldb_strerror(ret));
831                 }
832                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
833         }
834
835         talloc_free(ldb_kv->idxptr);
836         ldb_kv->idxptr = NULL;
837         return ret;
838 }
839
840 /* cleanup the idxptr mode when transaction cancels */
841 int ldb_kv_index_transaction_cancel(struct ldb_module *module)
842 {
843         struct ldb_kv_private *ldb_kv = talloc_get_type(
844             ldb_module_get_private(module), struct ldb_kv_private);
845         if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
846                 tdb_close(ldb_kv->idxptr->itdb);
847         }
848         talloc_free(ldb_kv->idxptr);
849         ldb_kv->idxptr = NULL;
850         return LDB_SUCCESS;
851 }
852
853
854 /*
855   return the dn key to be used for an index
856   the caller is responsible for freeing
857 */
858 static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
859                                        struct ldb_kv_private *ldb_kv,
860                                        const char *attr,
861                                        const struct ldb_val *value,
862                                        const struct ldb_schema_attribute **ap,
863                                        enum key_truncation *truncation)
864 {
865         struct ldb_dn *ret;
866         struct ldb_val v;
867         const struct ldb_schema_attribute *a = NULL;
868         char *attr_folded = NULL;
869         const char *attr_for_dn = NULL;
870         int r;
871         bool should_b64_encode;
872
873         unsigned int max_key_length = ldb_kv_max_key_length(ldb_kv);
874         size_t key_len = 0;
875         size_t attr_len = 0;
876         const size_t indx_len = sizeof(LDB_KV_INDEX) - 1;
877         unsigned frmt_len = 0;
878         const size_t additional_key_length = 4;
879         unsigned int num_separators = 3; /* Estimate for overflow check */
880         const size_t min_data = 1;
881         const size_t min_key_length = additional_key_length
882                 + indx_len + num_separators + min_data;
883
884         if (attr[0] == '@') {
885                 attr_for_dn = attr;
886                 v = *value;
887                 if (ap != NULL) {
888                         *ap = NULL;
889                 }
890         } else {
891                 attr_folded = ldb_attr_casefold(ldb, attr);
892                 if (!attr_folded) {
893                         return NULL;
894                 }
895
896                 attr_for_dn = attr_folded;
897
898                 a = ldb_schema_attribute_by_name(ldb, attr);
899                 if (ap) {
900                         *ap = a;
901                 }
902                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
903                 if (r != LDB_SUCCESS) {
904                         const char *errstr = ldb_errstring(ldb);
905                         /* canonicalisation can be refused. For
906                            example, a attribute that takes wildcards
907                            will refuse to canonicalise if the value
908                            contains a wildcard */
909                         ldb_asprintf_errstring(ldb,
910                                                "Failed to create index "
911                                                "key for attribute '%s':%s%s%s",
912                                                attr, ldb_strerror(r),
913                                                (errstr?":":""),
914                                                (errstr?errstr:""));
915                         talloc_free(attr_folded);
916                         return NULL;
917                 }
918         }
919         attr_len = strlen(attr_for_dn);
920
921         /*
922          * Check if there is any hope this will fit into the DB.
923          * Overflow here is not actually critical the code below
924          * checks again to make the printf and the DB does another
925          * check for too long keys
926          */
927         if (max_key_length - attr_len < min_key_length) {
928                 ldb_asprintf_errstring(
929                         ldb,
930                         __location__ ": max_key_length "
931                         "is too small (%u) < (%u)",
932                         max_key_length,
933                         (unsigned)(min_key_length + attr_len));
934                 talloc_free(attr_folded);
935                 return NULL;
936         }
937
938         /*
939          * ltdb_key_dn() makes something 4 bytes longer, it adds a leading
940          * "DN=" and a trailing string terminator
941          */
942         max_key_length -= additional_key_length;
943
944         /*
945          * We do not base 64 encode a DN in a key, it has already been
946          * casefold and lineraized, that is good enough.  That already
947          * avoids embedded NUL etc.
948          */
949         if (ldb_kv->cache->GUID_index_attribute != NULL) {
950                 if (strcmp(attr, LDB_KV_IDXDN) == 0) {
951                         should_b64_encode = false;
952                 } else if (strcmp(attr, LDB_KV_IDXONE) == 0) {
953                         /*
954                          * We can only change the behaviour for IDXONE
955                          * when the GUID index is enabled
956                          */
957                         should_b64_encode = false;
958                 } else {
959                         should_b64_encode
960                                 = ldb_should_b64_encode(ldb, &v);
961                 }
962         } else {
963                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
964         }
965
966         if (should_b64_encode) {
967                 size_t vstr_len = 0;
968                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
969                 if (!vstr) {
970                         talloc_free(attr_folded);
971                         return NULL;
972                 }
973                 vstr_len = strlen(vstr);
974                 /*
975                  * Overflow here is not critical as we only use this
976                  * to choose the printf truncation
977                  */
978                 key_len = num_separators + indx_len + attr_len + vstr_len;
979                 if (key_len > max_key_length) {
980                         size_t excess = key_len - max_key_length;
981                         frmt_len = vstr_len - excess;
982                         *truncation = KEY_TRUNCATED;
983                         /*
984                         * Truncated keys are placed in a separate key space
985                         * from the non truncated keys
986                         * Note: the double hash "##" is not a typo and
987                         * indicates that the following value is base64 encoded
988                         */
989                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
990                                              LDB_KV_INDEX, attr_for_dn,
991                                              frmt_len, vstr);
992                 } else {
993                         frmt_len = vstr_len;
994                         *truncation = KEY_NOT_TRUNCATED;
995                         /*
996                          * Note: the double colon "::" is not a typo and
997                          * indicates that the following value is base64 encoded
998                          */
999                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
1000                                              LDB_KV_INDEX, attr_for_dn,
1001                                              frmt_len, vstr);
1002                 }
1003                 talloc_free(vstr);
1004         } else {
1005                 /* Only need two seperators */
1006                 num_separators = 2;
1007
1008                 /*
1009                  * Overflow here is not critical as we only use this
1010                  * to choose the printf truncation
1011                  */
1012                 key_len = num_separators + indx_len + attr_len + (int)v.length;
1013                 if (key_len > max_key_length) {
1014                         size_t excess = key_len - max_key_length;
1015                         frmt_len = v.length - excess;
1016                         *truncation = KEY_TRUNCATED;
1017                         /*
1018                          * Truncated keys are placed in a separate key space
1019                          * from the non truncated keys
1020                          */
1021                         ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
1022                                              LDB_KV_INDEX, attr_for_dn,
1023                                              frmt_len, (char *)v.data);
1024                 } else {
1025                         frmt_len = v.length;
1026                         *truncation = KEY_NOT_TRUNCATED;
1027                         ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
1028                                              LDB_KV_INDEX, attr_for_dn,
1029                                              frmt_len, (char *)v.data);
1030                 }
1031         }
1032
1033         if (v.data != value->data) {
1034                 talloc_free(v.data);
1035         }
1036         talloc_free(attr_folded);
1037
1038         return ret;
1039 }
1040
1041 /*
1042   see if a attribute value is in the list of indexed attributes
1043 */
1044 static bool ldb_kv_is_indexed(struct ldb_module *module,
1045                               struct ldb_kv_private *ldb_kv,
1046                               const char *attr)
1047 {
1048         struct ldb_context *ldb = ldb_module_get_ctx(module);
1049         unsigned int i;
1050         struct ldb_message_element *el;
1051
1052         if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1053             (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) == 0)) {
1054                 /* Implicity covered, this is the index key */
1055                 return false;
1056         }
1057         if (ldb->schema.index_handler_override) {
1058                 const struct ldb_schema_attribute *a
1059                         = ldb_schema_attribute_by_name(ldb, attr);
1060
1061                 if (a == NULL) {
1062                         return false;
1063                 }
1064
1065                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
1066                         return true;
1067                 } else {
1068                         return false;
1069                 }
1070         }
1071
1072         if (!ldb_kv->cache->attribute_indexes) {
1073                 return false;
1074         }
1075
1076         el = ldb_msg_find_element(ldb_kv->cache->indexlist, LDB_KV_IDXATTR);
1077         if (el == NULL) {
1078                 return false;
1079         }
1080
1081         /* TODO: this is too expensive! At least use a binary search */
1082         for (i=0; i<el->num_values; i++) {
1083                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
1084                         return true;
1085                 }
1086         }
1087         return false;
1088 }
1089
1090 /*
1091   in the following logic functions, the return value is treated as
1092   follows:
1093
1094      LDB_SUCCESS: we found some matching index values
1095
1096      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
1097
1098      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
1099                                we'll need a full search
1100  */
1101
1102 /*
1103   return a list of dn's that might match a simple indexed search (an
1104   equality search only)
1105  */
1106 static int ldb_kv_index_dn_simple(struct ldb_module *module,
1107                                   struct ldb_kv_private *ldb_kv,
1108                                   const struct ldb_parse_tree *tree,
1109                                   struct dn_list *list)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_dn *dn;
1113         int ret;
1114         enum key_truncation truncation = KEY_NOT_TRUNCATED;
1115
1116         ldb = ldb_module_get_ctx(module);
1117
1118         list->count = 0;
1119         list->dn = NULL;
1120
1121         /* if the attribute isn't in the list of indexed attributes then
1122            this node needs a full search */
1123         if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.equality.attr)) {
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* the attribute is indexed. Pull the list of DNs that match the
1128            search criterion */
1129         dn = ldb_kv_index_key(ldb,
1130                               ldb_kv,
1131                               tree->u.equality.attr,
1132                               &tree->u.equality.value,
1133                               NULL,
1134                               &truncation);
1135         /*
1136          * We ignore truncation here and allow multi-valued matches
1137          * as ltdb_search_indexed will filter out the wrong one in
1138          * ltdb_index_filter() which calls ldb_match_message().
1139          */
1140         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
1141
1142         ret = ldb_kv_dn_list_load(module, ldb_kv, dn, list);
1143         talloc_free(dn);
1144         return ret;
1145 }
1146
1147 static bool list_union(struct ldb_context *ldb,
1148                        struct ldb_kv_private *ldb_kv,
1149                        struct dn_list *list,
1150                        struct dn_list *list2);
1151
1152 /*
1153   return a list of dn's that might match a leaf indexed search
1154  */
1155 static int ldb_kv_index_dn_leaf(struct ldb_module *module,
1156                                 struct ldb_kv_private *ldb_kv,
1157                                 const struct ldb_parse_tree *tree,
1158                                 struct dn_list *list)
1159 {
1160         if (ldb_kv->disallow_dn_filter &&
1161             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
1162                 /* in AD mode we do not support "(dn=...)" search filters */
1163                 list->dn = NULL;
1164                 list->count = 0;
1165                 return LDB_SUCCESS;
1166         }
1167         if (tree->u.equality.attr[0] == '@') {
1168                 /* Do not allow a indexed search against an @ */
1169                 list->dn = NULL;
1170                 list->count = 0;
1171                 return LDB_SUCCESS;
1172         }
1173         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
1174                 enum key_truncation truncation = KEY_NOT_TRUNCATED;
1175                 bool valid_dn = false;
1176                 struct ldb_dn *dn
1177                         = ldb_dn_from_ldb_val(list,
1178                                               ldb_module_get_ctx(module),
1179                                               &tree->u.equality.value);
1180                 if (dn == NULL) {
1181                         /* If we can't parse it, no match */
1182                         list->dn = NULL;
1183                         list->count = 0;
1184                         return LDB_SUCCESS;
1185                 }
1186
1187                 valid_dn = ldb_dn_validate(dn);
1188                 if (valid_dn == false) {
1189                         /* If we can't parse it, no match */
1190                         list->dn = NULL;
1191                         list->count = 0;
1192                         return LDB_SUCCESS;
1193                 }
1194
1195                 /*
1196                  * Re-use the same code we use for a SCOPE_BASE
1197                  * search
1198                  *
1199                  * We can't call TALLOC_FREE(dn) as this must belong
1200                  * to list for the memory to remain valid.
1201                  */
1202                 return ldb_kv_index_dn_base_dn(
1203                     module, ldb_kv, dn, list, &truncation);
1204                 /*
1205                  * We ignore truncation here and allow multi-valued matches
1206                  * as ltdb_search_indexed will filter out the wrong one in
1207                  * ltdb_index_filter() which calls ldb_match_message().
1208                  */
1209
1210         } else if ((ldb_kv->cache->GUID_index_attribute != NULL) &&
1211                    (ldb_attr_cmp(tree->u.equality.attr,
1212                                  ldb_kv->cache->GUID_index_attribute) == 0)) {
1213                 int ret;
1214                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1215                 list->dn = talloc_array(list, struct ldb_val, 1);
1216                 if (list->dn == NULL) {
1217                         ldb_module_oom(module);
1218                         return LDB_ERR_OPERATIONS_ERROR;
1219                 }
1220                 /*
1221                  * We need to go via the canonicalise_fn() to
1222                  * ensure we get the index in binary, rather
1223                  * than a string
1224                  */
1225                 ret = ldb_kv->GUID_index_syntax->canonicalise_fn(
1226                     ldb, list->dn, &tree->u.equality.value, &list->dn[0]);
1227                 if (ret != LDB_SUCCESS) {
1228                         return LDB_ERR_OPERATIONS_ERROR;
1229                 }
1230                 list->count = 1;
1231                 return LDB_SUCCESS;
1232         }
1233
1234         return ldb_kv_index_dn_simple(module, ldb_kv, tree, list);
1235 }
1236
1237
1238 /*
1239   list intersection
1240   list = list & list2
1241 */
1242 static bool list_intersect(struct ldb_context *ldb,
1243                            struct ldb_kv_private *ldb_kv,
1244                            struct dn_list *list,
1245                            const struct dn_list *list2)
1246 {
1247         const struct dn_list *short_list, *long_list;
1248         struct dn_list *list3;
1249         unsigned int i;
1250
1251         if (list->count == 0) {
1252                 /* 0 & X == 0 */
1253                 return true;
1254         }
1255         if (list2->count == 0) {
1256                 /* X & 0 == 0 */
1257                 list->count = 0;
1258                 list->dn = NULL;
1259                 return true;
1260         }
1261
1262         /* the indexing code is allowed to return a longer list than
1263            what really matches, as all results are filtered by the
1264            full expression at the end - this shortcut avoids a lot of
1265            work in some cases */
1266         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1267                 return true;
1268         }
1269         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1270                 list->count = list2->count;
1271                 list->dn = list2->dn;
1272                 /* note that list2 may not be the parent of list2->dn,
1273                    as list2->dn may be owned by ltdb->idxptr. In that
1274                    case we expect this reparent call to fail, which is
1275                    OK */
1276                 talloc_reparent(list2, list, list2->dn);
1277                 return true;
1278         }
1279
1280         if (list->count > list2->count) {
1281                 short_list = list2;
1282                 long_list = list;
1283         } else {
1284                 short_list = list;
1285                 long_list = list2;
1286         }
1287
1288         list3 = talloc_zero(list, struct dn_list);
1289         if (list3 == NULL) {
1290                 return false;
1291         }
1292
1293         list3->dn = talloc_array(list3, struct ldb_val,
1294                                  MIN(list->count, list2->count));
1295         if (!list3->dn) {
1296                 talloc_free(list3);
1297                 return false;
1298         }
1299         list3->count = 0;
1300
1301         for (i=0;i<short_list->count;i++) {
1302                 /* For the GUID index case, this is a binary search */
1303                 if (ldb_kv_dn_list_find_val(
1304                         ldb_kv, long_list, &short_list->dn[i]) != -1) {
1305                         list3->dn[list3->count] = short_list->dn[i];
1306                         list3->count++;
1307                 }
1308         }
1309
1310         list->strict |= list2->strict;
1311         list->dn = talloc_steal(list, list3->dn);
1312         list->count = list3->count;
1313         talloc_free(list3);
1314
1315         return true;
1316 }
1317
1318
1319 /*
1320   list union
1321   list = list | list2
1322 */
1323 static bool list_union(struct ldb_context *ldb,
1324                        struct ldb_kv_private *ldb_kv,
1325                        struct dn_list *list,
1326                        struct dn_list *list2)
1327 {
1328         struct ldb_val *dn3;
1329         unsigned int i = 0, j = 0, k = 0;
1330
1331         if (list2->count == 0) {
1332                 /* X | 0 == X */
1333                 return true;
1334         }
1335
1336         if (list->count == 0) {
1337                 /* 0 | X == X */
1338                 list->count = list2->count;
1339                 list->dn = list2->dn;
1340                 /* note that list2 may not be the parent of list2->dn,
1341                    as list2->dn may be owned by ltdb->idxptr. In that
1342                    case we expect this reparent call to fail, which is
1343                    OK */
1344                 talloc_reparent(list2, list, list2->dn);
1345                 return true;
1346         }
1347
1348         /*
1349          * Sort the lists (if not in GUID DN mode) so we can do
1350          * the de-duplication during the merge
1351          *
1352          * NOTE: This can sort the in-memory index values, as list or
1353          * list2 might not be a copy!
1354          */
1355         ldb_kv_dn_list_sort(ldb_kv, list);
1356         ldb_kv_dn_list_sort(ldb_kv, list2);
1357
1358         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1359         if (!dn3) {
1360                 ldb_oom(ldb);
1361                 return false;
1362         }
1363
1364         while (i < list->count || j < list2->count) {
1365                 int cmp;
1366                 if (i >= list->count) {
1367                         cmp = 1;
1368                 } else if (j >= list2->count) {
1369                         cmp = -1;
1370                 } else {
1371                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1372                                                           &list2->dn[j]);
1373                 }
1374
1375                 if (cmp < 0) {
1376                         /* Take list */
1377                         dn3[k] = list->dn[i];
1378                         i++;
1379                         k++;
1380                 } else if (cmp > 0) {
1381                         /* Take list2 */
1382                         dn3[k] = list2->dn[j];
1383                         j++;
1384                         k++;
1385                 } else {
1386                         /* Equal, take list */
1387                         dn3[k] = list->dn[i];
1388                         i++;
1389                         j++;
1390                         k++;
1391                 }
1392         }
1393
1394         list->dn = dn3;
1395         list->count = k;
1396
1397         return true;
1398 }
1399
1400 static int ldb_kv_index_dn(struct ldb_module *module,
1401                            struct ldb_kv_private *ldb_kv,
1402                            const struct ldb_parse_tree *tree,
1403                            struct dn_list *list);
1404
1405 /*
1406   process an OR list (a union)
1407  */
1408 static int ldb_kv_index_dn_or(struct ldb_module *module,
1409                               struct ldb_kv_private *ldb_kv,
1410                               const struct ldb_parse_tree *tree,
1411                               struct dn_list *list)
1412 {
1413         struct ldb_context *ldb;
1414         unsigned int i;
1415
1416         ldb = ldb_module_get_ctx(module);
1417
1418         list->dn = NULL;
1419         list->count = 0;
1420
1421         for (i=0; i<tree->u.list.num_elements; i++) {
1422                 struct dn_list *list2;
1423                 int ret;
1424
1425                 list2 = talloc_zero(list, struct dn_list);
1426                 if (list2 == NULL) {
1427                         return LDB_ERR_OPERATIONS_ERROR;
1428                 }
1429
1430                 ret = ldb_kv_index_dn(
1431                     module, ldb_kv, tree->u.list.elements[i], list2);
1432
1433                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1434                         /* X || 0 == X */
1435                         talloc_free(list2);
1436                         continue;
1437                 }
1438
1439                 if (ret != LDB_SUCCESS) {
1440                         /* X || * == * */
1441                         talloc_free(list2);
1442                         return ret;
1443                 }
1444
1445                 if (!list_union(ldb, ldb_kv, list, list2)) {
1446                         talloc_free(list2);
1447                         return LDB_ERR_OPERATIONS_ERROR;
1448                 }
1449         }
1450
1451         if (list->count == 0) {
1452                 return LDB_ERR_NO_SUCH_OBJECT;
1453         }
1454
1455         return LDB_SUCCESS;
1456 }
1457
1458
1459 /*
1460   NOT an index results
1461  */
1462 static int ldb_kv_index_dn_not(struct ldb_module *module,
1463                                struct ldb_kv_private *ldb_kv,
1464                                const struct ldb_parse_tree *tree,
1465                                struct dn_list *list)
1466 {
1467         /* the only way to do an indexed not would be if we could
1468            negate the not via another not or if we knew the total
1469            number of database elements so we could know that the
1470            existing expression covered the whole database.
1471
1472            instead, we just give up, and rely on a full index scan
1473            (unless an outer & manages to reduce the list)
1474         */
1475         return LDB_ERR_OPERATIONS_ERROR;
1476 }
1477
1478 /*
1479  * These things are unique, so avoid a full scan if this is a search
1480  * by GUID, DN or a unique attribute
1481  */
1482 static bool ldb_kv_index_unique(struct ldb_context *ldb,
1483                                 struct ldb_kv_private *ldb_kv,
1484                                 const char *attr)
1485 {
1486         const struct ldb_schema_attribute *a;
1487         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1488                 if (ldb_attr_cmp(attr, ldb_kv->cache->GUID_index_attribute) ==
1489                     0) {
1490                         return true;
1491                 }
1492         }
1493         if (ldb_attr_dn(attr) == 0) {
1494                 return true;
1495         }
1496
1497         a = ldb_schema_attribute_by_name(ldb, attr);
1498         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1499                 return true;
1500         }
1501         return false;
1502 }
1503
1504 /*
1505   process an AND expression (intersection)
1506  */
1507 static int ldb_kv_index_dn_and(struct ldb_module *module,
1508                                struct ldb_kv_private *ldb_kv,
1509                                const struct ldb_parse_tree *tree,
1510                                struct dn_list *list)
1511 {
1512         struct ldb_context *ldb;
1513         unsigned int i;
1514         bool found;
1515
1516         ldb = ldb_module_get_ctx(module);
1517
1518         list->dn = NULL;
1519         list->count = 0;
1520
1521         /* in the first pass we only look for unique simple
1522            equality tests, in the hope of avoiding having to look
1523            at any others */
1524         for (i=0; i<tree->u.list.num_elements; i++) {
1525                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1526                 int ret;
1527
1528                 if (subtree->operation != LDB_OP_EQUALITY ||
1529                     !ldb_kv_index_unique(
1530                         ldb, ldb_kv, subtree->u.equality.attr)) {
1531                         continue;
1532                 }
1533
1534                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list);
1535                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1536                         /* 0 && X == 0 */
1537                         return LDB_ERR_NO_SUCH_OBJECT;
1538                 }
1539                 if (ret == LDB_SUCCESS) {
1540                         /* a unique index match means we can
1541                          * stop. Note that we don't care if we return
1542                          * a few too many objects, due to later
1543                          * filtering */
1544                         return LDB_SUCCESS;
1545                 }
1546         }
1547
1548         /* now do a full intersection */
1549         found = false;
1550
1551         for (i=0; i<tree->u.list.num_elements; i++) {
1552                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1553                 struct dn_list *list2;
1554                 int ret;
1555
1556                 list2 = talloc_zero(list, struct dn_list);
1557                 if (list2 == NULL) {
1558                         return ldb_module_oom(module);
1559                 }
1560
1561                 ret = ldb_kv_index_dn(module, ldb_kv, subtree, list2);
1562
1563                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1564                         /* X && 0 == 0 */
1565                         list->dn = NULL;
1566                         list->count = 0;
1567                         talloc_free(list2);
1568                         return LDB_ERR_NO_SUCH_OBJECT;
1569                 }
1570
1571                 if (ret != LDB_SUCCESS) {
1572                         /* this didn't adding anything */
1573                         talloc_free(list2);
1574                         continue;
1575                 }
1576
1577                 if (!found) {
1578                         talloc_reparent(list2, list, list->dn);
1579                         list->dn = list2->dn;
1580                         list->count = list2->count;
1581                         found = true;
1582                 } else if (!list_intersect(ldb, ldb_kv, list, list2)) {
1583                         talloc_free(list2);
1584                         return LDB_ERR_OPERATIONS_ERROR;
1585                 }
1586
1587                 if (list->count == 0) {
1588                         list->dn = NULL;
1589                         return LDB_ERR_NO_SUCH_OBJECT;
1590                 }
1591
1592                 if (list->count < 2) {
1593                         /* it isn't worth loading the next part of the tree */
1594                         return LDB_SUCCESS;
1595                 }
1596         }
1597
1598         if (!found) {
1599                 /* none of the attributes were indexed */
1600                 return LDB_ERR_OPERATIONS_ERROR;
1601         }
1602
1603         return LDB_SUCCESS;
1604 }
1605
1606 /*
1607   return a list of matching objects using a one-level index
1608  */
1609 static int ldb_kv_index_dn_attr(struct ldb_module *module,
1610                                 struct ldb_kv_private *ldb_kv,
1611                                 const char *attr,
1612                                 struct ldb_dn *dn,
1613                                 struct dn_list *list,
1614                                 enum key_truncation *truncation)
1615 {
1616         struct ldb_context *ldb;
1617         struct ldb_dn *key;
1618         struct ldb_val val;
1619         int ret;
1620
1621         ldb = ldb_module_get_ctx(module);
1622
1623         /* work out the index key from the parent DN */
1624         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1625         if (val.data == NULL) {
1626                 const char *dn_str = ldb_dn_get_linearized(dn);
1627                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1628                                        __location__
1629                                        ": Failed to get casefold DN "
1630                                        "from: %s",
1631                                        dn_str);
1632                 return LDB_ERR_OPERATIONS_ERROR;
1633         }
1634         val.length = strlen((char *)val.data);
1635         key = ldb_kv_index_key(ldb, ldb_kv, attr, &val, NULL, truncation);
1636         if (!key) {
1637                 ldb_oom(ldb);
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         ret = ldb_kv_dn_list_load(module, ldb_kv, key, list);
1642         talloc_free(key);
1643         if (ret != LDB_SUCCESS) {
1644                 return ret;
1645         }
1646
1647         if (list->count == 0) {
1648                 return LDB_ERR_NO_SUCH_OBJECT;
1649         }
1650
1651         return LDB_SUCCESS;
1652 }
1653
1654 /*
1655   return a list of matching objects using a one-level index
1656  */
1657 static int ldb_kv_index_dn_one(struct ldb_module *module,
1658                                struct ldb_kv_private *ldb_kv,
1659                                struct ldb_dn *parent_dn,
1660                                struct dn_list *list,
1661                                enum key_truncation *truncation)
1662 {
1663         /* Ensure we do not shortcut on intersection for this list */
1664         list->strict = true;
1665         return ldb_kv_index_dn_attr(
1666             module, ldb_kv, LDB_KV_IDXONE, parent_dn, list, truncation);
1667 }
1668
1669 /*
1670   return a list of matching objects using the DN index
1671  */
1672 static int ldb_kv_index_dn_base_dn(struct ldb_module *module,
1673                                    struct ldb_kv_private *ldb_kv,
1674                                    struct ldb_dn *base_dn,
1675                                    struct dn_list *dn_list,
1676                                    enum key_truncation *truncation)
1677 {
1678         const struct ldb_val *guid_val = NULL;
1679         if (ldb_kv->cache->GUID_index_attribute == NULL) {
1680                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1681                 if (dn_list->dn == NULL) {
1682                         return ldb_module_oom(module);
1683                 }
1684                 dn_list->dn[0].data = discard_const_p(unsigned char,
1685                                                       ldb_dn_get_linearized(base_dn));
1686                 if (dn_list->dn[0].data == NULL) {
1687                         talloc_free(dn_list->dn);
1688                         return ldb_module_oom(module);
1689                 }
1690                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1691                 dn_list->count = 1;
1692
1693                 return LDB_SUCCESS;
1694         }
1695
1696         if (ldb_kv->cache->GUID_index_dn_component != NULL) {
1697                 guid_val = ldb_dn_get_extended_component(
1698                     base_dn, ldb_kv->cache->GUID_index_dn_component);
1699         }
1700
1701         if (guid_val != NULL) {
1702                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1703                 if (dn_list->dn == NULL) {
1704                         return ldb_module_oom(module);
1705                 }
1706                 dn_list->dn[0].data = guid_val->data;
1707                 dn_list->dn[0].length = guid_val->length;
1708                 dn_list->count = 1;
1709
1710                 return LDB_SUCCESS;
1711         }
1712
1713         return ldb_kv_index_dn_attr(
1714             module, ldb_kv, LDB_KV_IDXDN, base_dn, dn_list, truncation);
1715 }
1716
1717 /*
1718   return a list of dn's that might match a indexed search or
1719   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1720  */
1721 static int ldb_kv_index_dn(struct ldb_module *module,
1722                            struct ldb_kv_private *ldb_kv,
1723                            const struct ldb_parse_tree *tree,
1724                            struct dn_list *list)
1725 {
1726         int ret = LDB_ERR_OPERATIONS_ERROR;
1727
1728         switch (tree->operation) {
1729         case LDB_OP_AND:
1730                 ret = ldb_kv_index_dn_and(module, ldb_kv, tree, list);
1731                 break;
1732
1733         case LDB_OP_OR:
1734                 ret = ldb_kv_index_dn_or(module, ldb_kv, tree, list);
1735                 break;
1736
1737         case LDB_OP_NOT:
1738                 ret = ldb_kv_index_dn_not(module, ldb_kv, tree, list);
1739                 break;
1740
1741         case LDB_OP_EQUALITY:
1742                 ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
1743                 break;
1744
1745         case LDB_OP_SUBSTRING:
1746         case LDB_OP_GREATER:
1747         case LDB_OP_LESS:
1748         case LDB_OP_PRESENT:
1749         case LDB_OP_APPROX:
1750         case LDB_OP_EXTENDED:
1751                 /* we can't index with fancy bitops yet */
1752                 ret = LDB_ERR_OPERATIONS_ERROR;
1753                 break;
1754         }
1755
1756         return ret;
1757 }
1758
1759 /*
1760   filter a candidate dn_list from an indexed search into a set of results
1761   extracting just the given attributes
1762 */
1763 static int ldb_kv_index_filter(struct ldb_kv_private *ldb_kv,
1764                                const struct dn_list *dn_list,
1765                                struct ldb_kv_context *ac,
1766                                uint32_t *match_count,
1767                                enum key_truncation scope_one_truncation)
1768 {
1769         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1770         struct ldb_message *msg;
1771         struct ldb_message *filtered_msg;
1772         unsigned int i;
1773         unsigned int num_keys = 0;
1774         uint8_t previous_guid_key[LDB_KV_GUID_KEY_SIZE] = {};
1775         struct ldb_val *keys = NULL;
1776
1777         /*
1778          * We have to allocate the key list (rather than just walk the
1779          * caller supplied list) as the callback could change the list
1780          * (by modifying an indexed attribute hosted in the in-memory
1781          * index cache!)
1782          */
1783         keys = talloc_array(ac, struct ldb_val, dn_list->count);
1784         if (keys == NULL) {
1785                 return ldb_module_oom(ac->module);
1786         }
1787
1788         if (ldb_kv->cache->GUID_index_attribute != NULL) {
1789                 /*
1790                  * We speculate that the keys will be GUID based and so
1791                  * pre-fill in enough space for a GUID (avoiding a pile of
1792                  * small allocations)
1793                  */
1794                 struct guid_tdb_key {
1795                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
1796                 } *key_values = NULL;
1797
1798                 key_values = talloc_array(keys,
1799                                           struct guid_tdb_key,
1800                                           dn_list->count);
1801
1802                 if (key_values == NULL) {
1803                         talloc_free(keys);
1804                         return ldb_module_oom(ac->module);
1805                 }
1806                 for (i = 0; i < dn_list->count; i++) {
1807                         keys[i].data = key_values[i].guid_key;
1808                         keys[i].length = sizeof(key_values[i].guid_key);
1809                 }
1810         } else {
1811                 for (i = 0; i < dn_list->count; i++) {
1812                         keys[i].data = NULL;
1813                         keys[i].length = 0;
1814                 }
1815         }
1816
1817         for (i = 0; i < dn_list->count; i++) {
1818                 int ret;
1819
1820                 ret = ldb_kv_idx_to_key(
1821                     ac->module, ldb_kv, keys, &dn_list->dn[i], &keys[num_keys]);
1822                 if (ret != LDB_SUCCESS) {
1823                         talloc_free(keys);
1824                         return ret;
1825                 }
1826
1827                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
1828                         /*
1829                          * If we are in GUID index mode, then the dn_list is
1830                          * sorted.  If we got a duplicate, forget about it, as
1831                          * otherwise we would send the same entry back more
1832                          * than once.
1833                          *
1834                          * This is needed in the truncated DN case, or if a
1835                          * duplicate was forced in via
1836                          * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
1837                          */
1838
1839                         if (memcmp(previous_guid_key,
1840                                    keys[num_keys].data,
1841                                    sizeof(previous_guid_key)) == 0) {
1842                                 continue;
1843                         }
1844
1845                         memcpy(previous_guid_key,
1846                                keys[num_keys].data,
1847                                sizeof(previous_guid_key));
1848                 }
1849                 num_keys++;
1850         }
1851
1852
1853         /*
1854          * Now that the list is a safe copy, send the callbacks
1855          */
1856         for (i = 0; i < num_keys; i++) {
1857                 int ret;
1858                 bool matched;
1859                 msg = ldb_msg_new(ac);
1860                 if (!msg) {
1861                         talloc_free(keys);
1862                         return LDB_ERR_OPERATIONS_ERROR;
1863                 }
1864
1865                 ret =
1866                     ldb_kv_search_key(ac->module,
1867                                       ldb_kv,
1868                                       keys[i],
1869                                       msg,
1870                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
1871                                           LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1872                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1873                         /*
1874                          * the record has disappeared? yes, this can
1875                          * happen if the entry is deleted by something
1876                          * operating in the callback (not another
1877                          * process, as we have a read lock)
1878                          */
1879                         talloc_free(msg);
1880                         continue;
1881                 }
1882
1883                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1884                         /* an internal error */
1885                         talloc_free(keys);
1886                         talloc_free(msg);
1887                         return LDB_ERR_OPERATIONS_ERROR;
1888                 }
1889
1890                 /*
1891                  * We trust the index for LDB_SCOPE_ONELEVEL
1892                  * unless the index key has been truncated.
1893                  *
1894                  * LDB_SCOPE_BASE is not passed in by our only caller.
1895                  */
1896                 if (ac->scope == LDB_SCOPE_ONELEVEL &&
1897                     ldb_kv->cache->one_level_indexes &&
1898                     scope_one_truncation == KEY_NOT_TRUNCATED) {
1899                         ret = ldb_match_message(ldb, msg, ac->tree,
1900                                                 ac->scope, &matched);
1901                 } else {
1902                         ret = ldb_match_msg_error(ldb, msg,
1903                                                   ac->tree, ac->base,
1904                                                   ac->scope, &matched);
1905                 }
1906
1907                 if (ret != LDB_SUCCESS) {
1908                         talloc_free(keys);
1909                         talloc_free(msg);
1910                         return ret;
1911                 }
1912                 if (!matched) {
1913                         talloc_free(msg);
1914                         continue;
1915                 }
1916
1917                 /* filter the attributes that the user wants */
1918                 ret = ldb_kv_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1919
1920                 talloc_free(msg);
1921
1922                 if (ret == -1) {
1923                         talloc_free(keys);
1924                         return LDB_ERR_OPERATIONS_ERROR;
1925                 }
1926
1927                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1928                 if (ret != LDB_SUCCESS) {
1929                         /* Regardless of success or failure, the msg
1930                          * is the callbacks responsiblity, and should
1931                          * not be talloc_free()'ed */
1932                         ac->request_terminated = true;
1933                         talloc_free(keys);
1934                         return ret;
1935                 }
1936
1937                 (*match_count)++;
1938         }
1939
1940         TALLOC_FREE(keys);
1941         return LDB_SUCCESS;
1942 }
1943
1944 /*
1945   sort a DN list
1946  */
1947 static void ldb_kv_dn_list_sort(struct ldb_kv_private *ltdb,
1948                                 struct dn_list *list)
1949 {
1950         if (list->count < 2) {
1951                 return;
1952         }
1953
1954         /* We know the list is sorted when using the GUID index */
1955         if (ltdb->cache->GUID_index_attribute != NULL) {
1956                 return;
1957         }
1958
1959         TYPESAFE_QSORT(list->dn, list->count,
1960                        ldb_val_equal_exact_for_qsort);
1961 }
1962
1963 /*
1964   search the database with a LDAP-like expression using indexes
1965   returns -1 if an indexed search is not possible, in which
1966   case the caller should call ltdb_search_full()
1967 */
1968 int ldb_kv_search_indexed(struct ldb_kv_context *ac, uint32_t *match_count)
1969 {
1970         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1971         struct ldb_kv_private *ldb_kv = talloc_get_type(
1972             ldb_module_get_private(ac->module), struct ldb_kv_private);
1973         struct dn_list *dn_list;
1974         int ret;
1975         enum ldb_scope index_scope;
1976         enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
1977
1978         /* see if indexing is enabled */
1979         if (!ldb_kv->cache->attribute_indexes &&
1980             !ldb_kv->cache->one_level_indexes && ac->scope != LDB_SCOPE_BASE) {
1981                 /* fallback to a full search */
1982                 return LDB_ERR_OPERATIONS_ERROR;
1983         }
1984
1985         dn_list = talloc_zero(ac, struct dn_list);
1986         if (dn_list == NULL) {
1987                 return ldb_module_oom(ac->module);
1988         }
1989
1990         /*
1991          * For the purposes of selecting the switch arm below, if we
1992          * don't have a one-level index then treat it like a subtree
1993          * search
1994          */
1995         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1996             !ldb_kv->cache->one_level_indexes) {
1997                 index_scope = LDB_SCOPE_SUBTREE;
1998         } else {
1999                 index_scope = ac->scope;
2000         }
2001
2002         switch (index_scope) {
2003         case LDB_SCOPE_BASE:
2004                 /*
2005                  * The only caller will have filtered the operation out
2006                  * so we should never get here
2007                  */
2008                 return ldb_operr(ldb);
2009
2010         case LDB_SCOPE_ONELEVEL:
2011
2012                 /*
2013                  * First, load all the one-level child objects (regardless of
2014                  * whether they match the search filter or not). The database
2015                  * maintains a one-level index, so retrieving this is quick.
2016                  */
2017                 ret = ldb_kv_index_dn_one(ac->module,
2018                                           ldb_kv,
2019                                           ac->base,
2020                                           dn_list,
2021                                           &scope_one_truncation);
2022                 if (ret != LDB_SUCCESS) {
2023                         talloc_free(dn_list);
2024                         return ret;
2025                 }
2026
2027                 /*
2028                  * If we have too many children, running ldb_kv_index_filter()
2029                  * over all the child objects can be quite expensive. So next
2030                  * we do a separate indexed query using the search filter.
2031                  *
2032                  * This should be quick, but it may return objects that are not
2033                  * the direct one-level child objects we're interested in.
2034                  *
2035                  * We only do this in the GUID index mode, which is
2036                  * O(n*log(m)) otherwise the intersection below will
2037                  * be too costly at O(n*m).
2038                  *
2039                  * We don't set a heuristic for 'too many' but instead
2040                  * do it always and rely on the index lookup being
2041                  * fast enough in the small case.
2042                  */
2043                 if (ldb_kv->cache->GUID_index_attribute != NULL) {
2044                         struct dn_list *idx_one_tree_list
2045                                 = talloc_zero(ac, struct dn_list);
2046                         if (idx_one_tree_list == NULL) {
2047                                 talloc_free(dn_list);
2048                                 return ldb_module_oom(ac->module);
2049                         }
2050
2051                         if (!ldb_kv->cache->attribute_indexes) {
2052                                 talloc_free(idx_one_tree_list);
2053                                 talloc_free(dn_list);
2054                                 return LDB_ERR_OPERATIONS_ERROR;
2055                         }
2056
2057                         /*
2058                          * Try to do an indexed database search
2059                          */
2060                         ret = ldb_kv_index_dn(
2061                             ac->module, ldb_kv, ac->tree, idx_one_tree_list);
2062
2063                         /*
2064                          * We can stop if we're sure the object doesn't exist
2065                          */
2066                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2067                                 talloc_free(idx_one_tree_list);
2068                                 talloc_free(dn_list);
2069                                 return LDB_ERR_NO_SUCH_OBJECT;
2070                         }
2071
2072                         /*
2073                          * Once we have a successful search result, we
2074                          * intersect it with the one-level children (dn_list).
2075                          * This should give us exactly the result we're after
2076                          * (we still need to run ldb_kv_index_filter() to
2077                          * handle potential index truncation cases).
2078                          *
2079                          * The indexed search may fail because we don't support
2080                          * indexing on that type of search operation, e.g.
2081                          * matching against '*'. In which case we fall through
2082                          * and run ldb_kv_index_filter() over all the one-level
2083                          * children (which is still better than bailing out here
2084                          * and falling back to a full DB scan).
2085                          */
2086                         if (ret == LDB_SUCCESS) {
2087                                 if (!list_intersect(ldb,
2088                                                     ldb_kv,
2089                                                     dn_list,
2090                                                     idx_one_tree_list)) {
2091                                         talloc_free(idx_one_tree_list);
2092                                         talloc_free(dn_list);
2093                                         return LDB_ERR_OPERATIONS_ERROR;
2094                                 }
2095                         }
2096                 }
2097                 break;
2098
2099         case LDB_SCOPE_SUBTREE:
2100         case LDB_SCOPE_DEFAULT:
2101                 if (!ldb_kv->cache->attribute_indexes) {
2102                         talloc_free(dn_list);
2103                         return LDB_ERR_OPERATIONS_ERROR;
2104                 }
2105                 /*
2106                  * Here we load the index for the tree.  We have no
2107                  * index for the subtree.
2108                  */
2109                 ret = ldb_kv_index_dn(ac->module, ldb_kv, ac->tree, dn_list);
2110                 if (ret != LDB_SUCCESS) {
2111                         talloc_free(dn_list);
2112                         return ret;
2113                 }
2114                 break;
2115         }
2116
2117         /*
2118          * It is critical that this function do the re-filter even
2119          * on things found by the index as the index can over-match
2120          * in cases of truncation (as well as when it decides it is
2121          * not worth further filtering)
2122          *
2123          * If this changes, then the index code above would need to
2124          * pass up a flag to say if any index was truncated during
2125          * processing as the truncation here refers only to the
2126          * SCOPE_ONELEVEL index.
2127          */
2128         ret = ldb_kv_index_filter(
2129             ldb_kv, dn_list, ac, match_count, scope_one_truncation);
2130         talloc_free(dn_list);
2131         return ret;
2132 }
2133
2134 /**
2135  * @brief Add a DN in the index list of a given attribute name/value pair
2136  *
2137  * This function will add the DN in the index list for the index for
2138  * the given attribute name and value.
2139  *
2140  * @param[in]  module       A ldb_module structure
2141  *
2142  * @param[in]  dn           The string representation of the DN as it
2143  *                          will be stored in the index entry
2144  *
2145  * @param[in]  el           A ldb_message_element array, one of the entry
2146  *                          referred by the v_idx is the attribute name and
2147  *                          value pair which will be used to construct the
2148  *                          index name
2149  *
2150  * @param[in]  v_idx        The index of element in the el array to use
2151  *
2152  * @return                  An ldb error code
2153  */
2154 static int ldb_kv_index_add1(struct ldb_module *module,
2155                              struct ldb_kv_private *ldb_kv,
2156                              const struct ldb_message *msg,
2157                              struct ldb_message_element *el,
2158                              int v_idx)
2159 {
2160         struct ldb_context *ldb;
2161         struct ldb_dn *dn_key;
2162         int ret;
2163         const struct ldb_schema_attribute *a;
2164         struct dn_list *list;
2165         unsigned alloc_len;
2166         enum key_truncation truncation = KEY_TRUNCATED;
2167
2168
2169         ldb = ldb_module_get_ctx(module);
2170
2171         list = talloc_zero(module, struct dn_list);
2172         if (list == NULL) {
2173                 return LDB_ERR_OPERATIONS_ERROR;
2174         }
2175
2176         dn_key = ldb_kv_index_key(
2177             ldb, ldb_kv, el->name, &el->values[v_idx], &a, &truncation);
2178         if (!dn_key) {
2179                 talloc_free(list);
2180                 return LDB_ERR_OPERATIONS_ERROR;
2181         }
2182         /*
2183          * Samba only maintains unique indexes on the objectSID and objectGUID
2184          * so if a unique index key exceeds the maximum length there is a
2185          * problem.
2186          */
2187         if ((truncation == KEY_TRUNCATED) && (a != NULL &&
2188                 (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2189                 (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX)))) {
2190
2191                 ldb_asprintf_errstring(
2192                     ldb,
2193                     __location__ ": unique index key on %s in %s, "
2194                                  "exceeds maximum key length of %u (encoded).",
2195                     el->name,
2196                     ldb_dn_get_linearized(msg->dn),
2197                     ldb_kv->max_key_length);
2198                 talloc_free(list);
2199                 return LDB_ERR_CONSTRAINT_VIOLATION;
2200         }
2201         talloc_steal(list, dn_key);
2202
2203         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2204         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
2205                 talloc_free(list);
2206                 return ret;
2207         }
2208
2209         /*
2210          * Check for duplicates in the @IDXDN DN -> GUID record
2211          *
2212          * This is very normal, it just means a duplicate DN creation
2213          * was attempted, so don't set the error string or print scary
2214          * messages.
2215          */
2216         if (list->count > 0 &&
2217             ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0 &&
2218             truncation == KEY_NOT_TRUNCATED) {
2219
2220                 talloc_free(list);
2221                 return LDB_ERR_CONSTRAINT_VIOLATION;
2222
2223         } else if (list->count > 0
2224                    && ldb_attr_cmp(el->name, LDB_KV_IDXDN) == 0) {
2225
2226                 /*
2227                  * At least one existing entry in the DN->GUID index, which
2228                  * arises when the DN indexes have been truncated
2229                  *
2230                  * So need to pull the DN's to check if it's really a duplicate
2231                  */
2232                 int i;
2233                 for (i=0; i < list->count; i++) {
2234                         uint8_t guid_key[LDB_KV_GUID_KEY_SIZE];
2235                         struct ldb_val key = {
2236                                 .data = guid_key,
2237                                 .length = sizeof(guid_key)
2238                         };
2239                         const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
2240                         struct ldb_message *rec = ldb_msg_new(ldb);
2241                         if (rec == NULL) {
2242                                 return LDB_ERR_OPERATIONS_ERROR;
2243                         }
2244
2245                         ret = ldb_kv_idx_to_key(
2246                             module, ldb_kv, ldb, &list->dn[i], &key);
2247                         if (ret != LDB_SUCCESS) {
2248                                 TALLOC_FREE(list);
2249                                 TALLOC_FREE(rec);
2250                                 return ret;
2251                         }
2252
2253                         ret =
2254                             ldb_kv_search_key(module, ldb_kv, key, rec, flags);
2255                         if (key.data != guid_key) {
2256                                 TALLOC_FREE(key.data);
2257                         }
2258                         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2259                                 /*
2260                                  * the record has disappeared?
2261                                  * yes, this can happen
2262                                  */
2263                                 talloc_free(rec);
2264                                 continue;
2265                         }
2266
2267                         if (ret != LDB_SUCCESS) {
2268                                 /* an internal error */
2269                                 TALLOC_FREE(rec);
2270                                 TALLOC_FREE(list);
2271                                 return LDB_ERR_OPERATIONS_ERROR;
2272                         }
2273                         /*
2274                          * The DN we are trying to add to the DB and index
2275                          * is already here, so we must deny the addition
2276                          */
2277                         if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
2278                                 TALLOC_FREE(rec);
2279                                 TALLOC_FREE(list);
2280                                 return LDB_ERR_CONSTRAINT_VIOLATION;
2281                         }
2282                 }
2283         }
2284
2285         /*
2286          * Check for duplicates in unique indexes
2287          *
2288          * We don't need to do a loop test like the @IDXDN case
2289          * above as we have a ban on long unique index values
2290          * at the start of this function.
2291          */
2292         if (list->count > 0 &&
2293             ((a != NULL
2294               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
2295                   (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))))) {
2296                 /*
2297                  * We do not want to print info about a possibly
2298                  * confidential DN that the conflict was with in the
2299                  * user-visible error string
2300                  */
2301
2302                 if (ldb_kv->cache->GUID_index_attribute == NULL) {
2303                         ldb_debug(ldb, LDB_DEBUG_WARNING,
2304                                   __location__
2305                                   ": unique index violation on %s in %s, "
2306                                   "conficts with %*.*s in %s",
2307                                   el->name, ldb_dn_get_linearized(msg->dn),
2308                                   (int)list->dn[0].length,
2309                                   (int)list->dn[0].length,
2310                                   list->dn[0].data,
2311                                   ldb_dn_get_linearized(dn_key));
2312                 } else {
2313                         /* This can't fail, gives a default at worst */
2314                         const struct ldb_schema_attribute *attr =
2315                             ldb_schema_attribute_by_name(
2316                                 ldb, ldb_kv->cache->GUID_index_attribute);
2317                         struct ldb_val v;
2318                         ret = attr->syntax->ldif_write_fn(ldb, list,
2319                                                           &list->dn[0], &v);
2320                         if (ret == LDB_SUCCESS) {
2321                                 ldb_debug(ldb,
2322                                           LDB_DEBUG_WARNING,
2323                                           __location__
2324                                           ": unique index violation on %s in "
2325                                           "%s, conficts with %s %*.*s in %s",
2326                                           el->name,
2327                                           ldb_dn_get_linearized(msg->dn),
2328                                           ldb_kv->cache->GUID_index_attribute,
2329                                           (int)v.length,
2330                                           (int)v.length,
2331                                           v.data,
2332                                           ldb_dn_get_linearized(dn_key));
2333                         }
2334                 }
2335                 ldb_asprintf_errstring(ldb,
2336                                        __location__ ": unique index violation "
2337                                        "on %s in %s",
2338                                        el->name,
2339                                        ldb_dn_get_linearized(msg->dn));
2340                 talloc_free(list);
2341                 return LDB_ERR_CONSTRAINT_VIOLATION;
2342         }
2343
2344         /* overallocate the list a bit, to reduce the number of
2345          * realloc trigered copies */
2346         alloc_len = ((list->count+1)+7) & ~7;
2347         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
2348         if (list->dn == NULL) {
2349                 talloc_free(list);
2350                 return LDB_ERR_OPERATIONS_ERROR;
2351         }
2352
2353         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2354                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
2355                 list->dn[list->count].data
2356                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
2357                 if (list->dn[list->count].data == NULL) {
2358                         talloc_free(list);
2359                         return LDB_ERR_OPERATIONS_ERROR;
2360                 }
2361                 list->dn[list->count].length = strlen(dn_str);
2362         } else {
2363                 const struct ldb_val *key_val;
2364                 struct ldb_val *exact = NULL, *next = NULL;
2365                 key_val = ldb_msg_find_ldb_val(
2366                     msg, ldb_kv->cache->GUID_index_attribute);
2367                 if (key_val == NULL) {
2368                         talloc_free(list);
2369                         return ldb_module_operr(module);
2370                 }
2371
2372                 if (key_val->length != LDB_KV_GUID_SIZE) {
2373                         talloc_free(list);
2374                         return ldb_module_operr(module);
2375                 }
2376
2377                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
2378                                         *key_val, ldb_val_equal_exact_ordered,
2379                                         exact, next);
2380
2381                 /*
2382                  * Give a warning rather than fail, this could be a
2383                  * duplicate value in the record allowed by a caller
2384                  * forcing in the value with
2385                  * LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK
2386                  */
2387                 if (exact != NULL && truncation == KEY_NOT_TRUNCATED) {
2388                         /* This can't fail, gives a default at worst */
2389                         const struct ldb_schema_attribute *attr =
2390                             ldb_schema_attribute_by_name(
2391                                 ldb, ldb_kv->cache->GUID_index_attribute);
2392                         struct ldb_val v;
2393                         ret = attr->syntax->ldif_write_fn(ldb, list,
2394                                                           exact, &v);
2395                         if (ret == LDB_SUCCESS) {
2396                                 ldb_debug(ldb,
2397                                           LDB_DEBUG_WARNING,
2398                                           __location__
2399                                           ": duplicate attribute value in %s "
2400                                           "for index on %s, "
2401                                           "duplicate of %s %*.*s in %s",
2402                                           ldb_dn_get_linearized(msg->dn),
2403                                           el->name,
2404                                           ldb_kv->cache->GUID_index_attribute,
2405                                           (int)v.length,
2406                                           (int)v.length,
2407                                           v.data,
2408                                           ldb_dn_get_linearized(dn_key));
2409                         }
2410                 }
2411
2412                 if (next == NULL) {
2413                         next = &list->dn[list->count];
2414                 } else {
2415                         memmove(&next[1], next,
2416                                 sizeof(*next) * (list->count - (next - list->dn)));
2417                 }
2418                 *next = ldb_val_dup(list->dn, key_val);
2419                 if (next->data == NULL) {
2420                         talloc_free(list);
2421                         return ldb_module_operr(module);
2422                 }
2423         }
2424         list->count++;
2425
2426         ret = ldb_kv_dn_list_store(module, dn_key, list);
2427
2428         talloc_free(list);
2429
2430         return ret;
2431 }
2432
2433 /*
2434   add index entries for one elements in a message
2435  */
2436 static int ldb_kv_index_add_el(struct ldb_module *module,
2437                                struct ldb_kv_private *ldb_kv,
2438                                const struct ldb_message *msg,
2439                                struct ldb_message_element *el)
2440 {
2441         unsigned int i;
2442         for (i = 0; i < el->num_values; i++) {
2443                 int ret = ldb_kv_index_add1(module, ldb_kv, msg, el, i);
2444                 if (ret != LDB_SUCCESS) {
2445                         return ret;
2446                 }
2447         }
2448
2449         return LDB_SUCCESS;
2450 }
2451
2452 /*
2453   add index entries for all elements in a message
2454  */
2455 static int ldb_kv_index_add_all(struct ldb_module *module,
2456                                 struct ldb_kv_private *ldb_kv,
2457                                 const struct ldb_message *msg)
2458 {
2459         struct ldb_message_element *elements = msg->elements;
2460         unsigned int i;
2461         const char *dn_str;
2462         int ret;
2463
2464         if (ldb_dn_is_special(msg->dn)) {
2465                 return LDB_SUCCESS;
2466         }
2467
2468         dn_str = ldb_dn_get_linearized(msg->dn);
2469         if (dn_str == NULL) {
2470                 return LDB_ERR_OPERATIONS_ERROR;
2471         }
2472
2473         ret = ldb_kv_write_index_dn_guid(module, msg, 1);
2474         if (ret != LDB_SUCCESS) {
2475                 return ret;
2476         }
2477
2478         if (!ldb_kv->cache->attribute_indexes) {
2479                 /* no indexed fields */
2480                 return LDB_SUCCESS;
2481         }
2482
2483         for (i = 0; i < msg->num_elements; i++) {
2484                 if (!ldb_kv_is_indexed(module, ldb_kv, elements[i].name)) {
2485                         continue;
2486                 }
2487                 ret = ldb_kv_index_add_el(module, ldb_kv, msg, &elements[i]);
2488                 if (ret != LDB_SUCCESS) {
2489                         struct ldb_context *ldb = ldb_module_get_ctx(module);
2490                         ldb_asprintf_errstring(ldb,
2491                                                __location__ ": Failed to re-index %s in %s - %s",
2492                                                elements[i].name, dn_str,
2493                                                ldb_errstring(ldb));
2494                         return ret;
2495                 }
2496         }
2497
2498         return LDB_SUCCESS;
2499 }
2500
2501
2502 /*
2503   insert a DN index for a message
2504 */
2505 static int ldb_kv_modify_index_dn(struct ldb_module *module,
2506                                   struct ldb_kv_private *ldb_kv,
2507                                   const struct ldb_message *msg,
2508                                   struct ldb_dn *dn,
2509                                   const char *index,
2510                                   int add)
2511 {
2512         struct ldb_message_element el;
2513         struct ldb_val val;
2514         int ret;
2515
2516         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
2517         if (val.data == NULL) {
2518                 const char *dn_str = ldb_dn_get_linearized(dn);
2519                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2520                                        __location__ ": Failed to modify %s "
2521                                                     "against %s in %s: failed "
2522                                                     "to get casefold DN",
2523                                        index,
2524                                        ldb_kv->cache->GUID_index_attribute,
2525                                        dn_str);
2526                 return LDB_ERR_OPERATIONS_ERROR;
2527         }
2528
2529         val.length = strlen((char *)val.data);
2530         el.name = index;
2531         el.values = &val;
2532         el.num_values = 1;
2533
2534         if (add) {
2535                 ret = ldb_kv_index_add1(module, ldb_kv, msg, &el, 0);
2536         } else { /* delete */
2537                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, &el, 0);
2538         }
2539
2540         if (ret != LDB_SUCCESS) {
2541                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2542                 const char *dn_str = ldb_dn_get_linearized(dn);
2543                 ldb_asprintf_errstring(ldb,
2544                                        __location__ ": Failed to modify %s "
2545                                                     "against %s in %s - %s",
2546                                        index,
2547                                        ldb_kv->cache->GUID_index_attribute,
2548                                        dn_str,
2549                                        ldb_errstring(ldb));
2550                 return ret;
2551         }
2552         return ret;
2553 }
2554
2555 /*
2556   insert a one level index for a message
2557 */
2558 static int ldb_kv_index_onelevel(struct ldb_module *module,
2559                                  const struct ldb_message *msg,
2560                                  int add)
2561 {
2562         struct ldb_kv_private *ldb_kv = talloc_get_type(
2563             ldb_module_get_private(module), struct ldb_kv_private);
2564         struct ldb_dn *pdn;
2565         int ret;
2566
2567         /* We index for ONE Level only if requested */
2568         if (!ldb_kv->cache->one_level_indexes) {
2569                 return LDB_SUCCESS;
2570         }
2571
2572         pdn = ldb_dn_get_parent(module, msg->dn);
2573         if (pdn == NULL) {
2574                 return LDB_ERR_OPERATIONS_ERROR;
2575         }
2576         ret =
2577             ldb_kv_modify_index_dn(module, ldb_kv, msg, pdn, LDB_KV_IDXONE, add);
2578
2579         talloc_free(pdn);
2580
2581         return ret;
2582 }
2583
2584 /*
2585   insert a one level index for a message
2586 */
2587 static int ldb_kv_write_index_dn_guid(struct ldb_module *module,
2588                                       const struct ldb_message *msg,
2589                                       int add)
2590 {
2591         int ret;
2592         struct ldb_kv_private *ldb_kv = talloc_get_type(
2593             ldb_module_get_private(module), struct ldb_kv_private);
2594
2595         /* We index for DN only if using a GUID index */
2596         if (ldb_kv->cache->GUID_index_attribute == NULL) {
2597                 return LDB_SUCCESS;
2598         }
2599
2600         ret = ldb_kv_modify_index_dn(
2601             module, ldb_kv, msg, msg->dn, LDB_KV_IDXDN, add);
2602
2603         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2604                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2605                                        "Entry %s already exists",
2606                                        ldb_dn_get_linearized(msg->dn));
2607                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2608         }
2609         return ret;
2610 }
2611
2612 /*
2613   add the index entries for a new element in a record
2614   The caller guarantees that these element values are not yet indexed
2615 */
2616 int ldb_kv_index_add_element(struct ldb_module *module,
2617                              struct ldb_kv_private *ldb_kv,
2618                              const struct ldb_message *msg,
2619                              struct ldb_message_element *el)
2620 {
2621         if (ldb_dn_is_special(msg->dn)) {
2622                 return LDB_SUCCESS;
2623         }
2624         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2625                 return LDB_SUCCESS;
2626         }
2627         return ldb_kv_index_add_el(module, ldb_kv, msg, el);
2628 }
2629
2630 /*
2631   add the index entries for a new record
2632 */
2633 int ldb_kv_index_add_new(struct ldb_module *module,
2634                          struct ldb_kv_private *ldb_kv,
2635                          const struct ldb_message *msg)
2636 {
2637         int ret;
2638
2639         if (ldb_dn_is_special(msg->dn)) {
2640                 return LDB_SUCCESS;
2641         }
2642
2643         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
2644         if (ret != LDB_SUCCESS) {
2645                 /*
2646                  * Because we can't trust the caller to be doing
2647                  * transactions properly, clean up any index for this
2648                  * entry rather than relying on a transaction
2649                  * cleanup
2650                  */
2651
2652                 ldb_kv_index_delete(module, msg);
2653                 return ret;
2654         }
2655
2656         ret = ldb_kv_index_onelevel(module, msg, 1);
2657         if (ret != LDB_SUCCESS) {
2658                 /*
2659                  * Because we can't trust the caller to be doing
2660                  * transactions properly, clean up any index for this
2661                  * entry rather than relying on a transaction
2662                  * cleanup
2663                  */
2664                 ldb_kv_index_delete(module, msg);
2665                 return ret;
2666         }
2667         return ret;
2668 }
2669
2670
2671 /*
2672   delete an index entry for one message element
2673 */
2674 int ldb_kv_index_del_value(struct ldb_module *module,
2675                            struct ldb_kv_private *ldb_kv,
2676                            const struct ldb_message *msg,
2677                            struct ldb_message_element *el,
2678                            unsigned int v_idx)
2679 {
2680         struct ldb_context *ldb;
2681         struct ldb_dn *dn_key;
2682         const char *dn_str;
2683         int ret, i;
2684         unsigned int j;
2685         struct dn_list *list;
2686         struct ldb_dn *dn = msg->dn;
2687         enum key_truncation truncation = KEY_NOT_TRUNCATED;
2688
2689         ldb = ldb_module_get_ctx(module);
2690
2691         dn_str = ldb_dn_get_linearized(dn);
2692         if (dn_str == NULL) {
2693                 return LDB_ERR_OPERATIONS_ERROR;
2694         }
2695
2696         if (dn_str[0] == '@') {
2697                 return LDB_SUCCESS;
2698         }
2699
2700         dn_key = ldb_kv_index_key(
2701             ldb, ldb_kv, el->name, &el->values[v_idx], NULL, &truncation);
2702         /*
2703          * We ignore key truncation in ltdb_index_add1() so
2704          * match that by ignoring it here as well
2705          *
2706          * Multiple values are legitimate and accepted
2707          */
2708         if (!dn_key) {
2709                 return LDB_ERR_OPERATIONS_ERROR;
2710         }
2711
2712         list = talloc_zero(dn_key, struct dn_list);
2713         if (list == NULL) {
2714                 talloc_free(dn_key);
2715                 return LDB_ERR_OPERATIONS_ERROR;
2716         }
2717
2718         ret = ldb_kv_dn_list_load(module, ldb_kv, dn_key, list);
2719         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2720                 /* it wasn't indexed. Did we have an earlier error? If we did then
2721                    its gone now */
2722                 talloc_free(dn_key);
2723                 return LDB_SUCCESS;
2724         }
2725
2726         if (ret != LDB_SUCCESS) {
2727                 talloc_free(dn_key);
2728                 return ret;
2729         }
2730
2731         /*
2732          * Find one of the values matching this message to remove
2733          */
2734         i = ldb_kv_dn_list_find_msg(ldb_kv, list, msg);
2735         if (i == -1) {
2736                 /* nothing to delete */
2737                 talloc_free(dn_key);
2738                 return LDB_SUCCESS;
2739         }
2740
2741         j = (unsigned int) i;
2742         if (j != list->count - 1) {
2743                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2744         }
2745         list->count--;
2746         if (list->count == 0) {
2747                 talloc_free(list->dn);
2748                 list->dn = NULL;
2749         } else {
2750                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2751         }
2752
2753         ret = ldb_kv_dn_list_store(module, dn_key, list);
2754
2755         talloc_free(dn_key);
2756
2757         return ret;
2758 }
2759
2760 /*
2761   delete the index entries for a element
2762   return -1 on failure
2763 */
2764 int ldb_kv_index_del_element(struct ldb_module *module,
2765                              struct ldb_kv_private *ldb_kv,
2766                              const struct ldb_message *msg,
2767                              struct ldb_message_element *el)
2768 {
2769         const char *dn_str;
2770         int ret;
2771         unsigned int i;
2772
2773         if (!ldb_kv->cache->attribute_indexes) {
2774                 /* no indexed fields */
2775                 return LDB_SUCCESS;
2776         }
2777
2778         dn_str = ldb_dn_get_linearized(msg->dn);
2779         if (dn_str == NULL) {
2780                 return LDB_ERR_OPERATIONS_ERROR;
2781         }
2782
2783         if (dn_str[0] == '@') {
2784                 return LDB_SUCCESS;
2785         }
2786
2787         if (!ldb_kv_is_indexed(module, ldb_kv, el->name)) {
2788                 return LDB_SUCCESS;
2789         }
2790         for (i = 0; i < el->num_values; i++) {
2791                 ret = ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
2792                 if (ret != LDB_SUCCESS) {
2793                         return ret;
2794                 }
2795         }
2796
2797         return LDB_SUCCESS;
2798 }
2799
2800 /*
2801   delete the index entries for a record
2802   return -1 on failure
2803 */
2804 int ldb_kv_index_delete(struct ldb_module *module,
2805                         const struct ldb_message *msg)
2806 {
2807         struct ldb_kv_private *ldb_kv = talloc_get_type(
2808             ldb_module_get_private(module), struct ldb_kv_private);
2809         int ret;
2810         unsigned int i;
2811
2812         if (ldb_dn_is_special(msg->dn)) {
2813                 return LDB_SUCCESS;
2814         }
2815
2816         ret = ldb_kv_index_onelevel(module, msg, 0);
2817         if (ret != LDB_SUCCESS) {
2818                 return ret;
2819         }
2820
2821         ret = ldb_kv_write_index_dn_guid(module, msg, 0);
2822         if (ret != LDB_SUCCESS) {
2823                 return ret;
2824         }
2825
2826         if (!ldb_kv->cache->attribute_indexes) {
2827                 /* no indexed fields */
2828                 return LDB_SUCCESS;
2829         }
2830
2831         for (i = 0; i < msg->num_elements; i++) {
2832                 ret = ldb_kv_index_del_element(
2833                     module, ldb_kv, msg, &msg->elements[i]);
2834                 if (ret != LDB_SUCCESS) {
2835                         return ret;
2836                 }
2837         }
2838
2839         return LDB_SUCCESS;
2840 }
2841
2842
2843 /*
2844   traversal function that deletes all @INDEX records in the in-memory
2845   TDB.
2846
2847   This does not touch the actual DB, that is done at transaction
2848   commit, which in turn greatly reduces DB churn as we will likely
2849   be able to do a direct update into the old record.
2850 */
2851 static int delete_index(struct ldb_kv_private *ldb_kv,
2852                         struct ldb_val key,
2853                         struct ldb_val data,
2854                         void *state)
2855 {
2856         struct ldb_module *module = state;
2857         const char *dnstr = "DN=" LDB_KV_INDEX ":";
2858         struct dn_list list;
2859         struct ldb_dn *dn;
2860         struct ldb_val v;
2861         int ret;
2862
2863         if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
2864                 return 0;
2865         }
2866         /* we need to put a empty list in the internal tdb for this
2867          * index entry */
2868         list.dn = NULL;
2869         list.count = 0;
2870
2871         /* the offset of 3 is to remove the DN= prefix. */
2872         v.data = key.data + 3;
2873         v.length = strnlen((char *)key.data, key.length) - 3;
2874
2875         dn = ldb_dn_from_ldb_val(ldb_kv, ldb_module_get_ctx(module), &v);
2876
2877         /*
2878          * This does not actually touch the DB quite yet, just
2879          * the in-memory index cache
2880          */
2881         ret = ldb_kv_dn_list_store(module, dn, &list);
2882         if (ret != LDB_SUCCESS) {
2883                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2884                                        "Unable to store null index for %s\n",
2885                                                 ldb_dn_get_linearized(dn));
2886                 talloc_free(dn);
2887                 return -1;
2888         }
2889         talloc_free(dn);
2890         return 0;
2891 }
2892
2893 /*
2894   traversal function that adds @INDEX records during a re index TODO wrong comment
2895 */
2896 static int re_key(struct ldb_kv_private *ldb_kv,
2897                   struct ldb_val key,
2898                   struct ldb_val val,
2899                   void *state)
2900 {
2901         struct ldb_context *ldb;
2902         struct ldb_kv_reindex_context *ctx =
2903             (struct ldb_kv_reindex_context *)state;
2904         struct ldb_module *module = ctx->module;
2905         struct ldb_message *msg;
2906         unsigned int nb_elements_in_db;
2907         int ret;
2908         struct ldb_val key2;
2909         bool is_record;
2910
2911         ldb = ldb_module_get_ctx(module);
2912
2913         if (key.length > 4 &&
2914             memcmp(key.data, "DN=@", 4) == 0) {
2915                 return 0;
2916         }
2917
2918         is_record = ldb_kv_key_is_record(key);
2919         if (is_record == false) {
2920                 return 0;
2921         }
2922
2923         msg = ldb_msg_new(module);
2924         if (msg == NULL) {
2925                 return -1;
2926         }
2927
2928         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2929                                                    msg,
2930                                                    NULL, 0,
2931                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2932                                                    &nb_elements_in_db);
2933         if (ret != 0) {
2934                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2935                                                 ldb_dn_get_linearized(msg->dn));
2936                 ctx->error = ret;
2937                 talloc_free(msg);
2938                 return -1;
2939         }
2940
2941         if (msg->dn == NULL) {
2942                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2943                           "Refusing to re-index as GUID "
2944                           "key %*.*s with no DN\n",
2945                           (int)key.length, (int)key.length,
2946                           (char *)key.data);
2947                 talloc_free(msg);
2948                 return -1;
2949         }
2950
2951         /* check if the DN key has changed, perhaps due to the case
2952            insensitivity of an element changing, or a change from DN
2953            to GUID keys */
2954         key2 = ldb_kv_key_msg(module, msg, msg);
2955         if (key2.data == NULL) {
2956                 /* probably a corrupt record ... darn */
2957                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2958                                                 ldb_dn_get_linearized(msg->dn));
2959                 talloc_free(msg);
2960                 return 0;
2961         }
2962         if (key.length != key2.length ||
2963             (memcmp(key.data, key2.data, key.length) != 0)) {
2964                 ldb_kv->kv_ops->update_in_iterate(
2965                     ldb_kv, key, key2, val, ctx);
2966         }
2967         talloc_free(key2.data);
2968
2969         talloc_free(msg);
2970
2971         ctx->count++;
2972         if (ctx->count % 10000 == 0) {
2973                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2974                           "Reindexing: re-keyed %u records so far",
2975                           ctx->count);
2976         }
2977
2978         return 0;
2979 }
2980
2981 /*
2982   traversal function that adds @INDEX records during a re index
2983 */
2984 static int re_index(struct ldb_kv_private *ldb_kv,
2985                     struct ldb_val key,
2986                     struct ldb_val val,
2987                     void *state)
2988 {
2989         struct ldb_context *ldb;
2990         struct ldb_kv_reindex_context *ctx =
2991             (struct ldb_kv_reindex_context *)state;
2992         struct ldb_module *module = ctx->module;
2993         struct ldb_message *msg;
2994         unsigned int nb_elements_in_db;
2995         int ret;
2996         bool is_record;
2997
2998         ldb = ldb_module_get_ctx(module);
2999
3000         if (key.length > 4 &&
3001             memcmp(key.data, "DN=@", 4) == 0) {
3002                 return 0;
3003         }
3004
3005         is_record = ldb_kv_key_is_record(key);
3006         if (is_record == false) {
3007                 return 0;
3008         }
3009
3010         msg = ldb_msg_new(module);
3011         if (msg == NULL) {
3012                 return -1;
3013         }
3014
3015         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
3016                                                    msg,
3017                                                    NULL, 0,
3018                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
3019                                                    &nb_elements_in_db);
3020         if (ret != 0) {
3021                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
3022                                                 ldb_dn_get_linearized(msg->dn));
3023                 ctx->error = ret;
3024                 talloc_free(msg);
3025                 return -1;
3026         }
3027
3028         if (msg->dn == NULL) {
3029                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3030                           "Refusing to re-index as GUID "
3031                           "key %*.*s with no DN\n",
3032                           (int)key.length, (int)key.length,
3033                           (char *)key.data);
3034                 talloc_free(msg);
3035                 return -1;
3036         }
3037
3038         ret = ldb_kv_index_onelevel(module, msg, 1);
3039         if (ret != LDB_SUCCESS) {
3040                 ldb_debug(ldb, LDB_DEBUG_ERROR,
3041                           "Adding special ONE LEVEL index failed (%s)!",
3042                                                 ldb_dn_get_linearized(msg->dn));
3043                 talloc_free(msg);
3044                 return -1;
3045         }
3046
3047         ret = ldb_kv_index_add_all(module, ldb_kv, msg);
3048
3049         if (ret != LDB_SUCCESS) {
3050                 ctx->error = ret;
3051                 talloc_free(msg);
3052                 return -1;
3053         }
3054
3055         talloc_free(msg);
3056
3057         ctx->count++;
3058         if (ctx->count % 10000 == 0) {
3059                 ldb_debug(ldb, LDB_DEBUG_WARNING,
3060                           "Reindexing: re-indexed %u records so far",
3061                           ctx->count);
3062         }
3063
3064         return 0;
3065 }
3066
3067 /*
3068   force a complete reindex of the database
3069 */
3070 int ldb_kv_reindex(struct ldb_module *module)
3071 {
3072         struct ldb_kv_private *ldb_kv = talloc_get_type(
3073             ldb_module_get_private(module), struct ldb_kv_private);
3074         int ret;
3075         struct ldb_kv_reindex_context ctx;
3076
3077         /*
3078          * Only triggered after a modification, but make clear we do
3079          * not re-index a read-only DB
3080          */
3081         if (ldb_kv->read_only) {
3082                 return LDB_ERR_UNWILLING_TO_PERFORM;
3083         }
3084
3085         if (ldb_kv_cache_reload(module) != 0) {
3086                 return LDB_ERR_OPERATIONS_ERROR;
3087         }
3088
3089         /*
3090          * Ensure we read (and so remove) the entries from the real
3091          * DB, no values stored so far are any use as we want to do a
3092          * re-index
3093          */
3094         ldb_kv_index_transaction_cancel(module);
3095
3096         ret = ldb_kv_index_transaction_start(module);
3097         if (ret != LDB_SUCCESS) {
3098                 return ret;
3099         }
3100
3101         /* first traverse the database deleting any @INDEX records by
3102          * putting NULL entries in the in-memory tdb
3103          */
3104         ret = ldb_kv->kv_ops->iterate(ldb_kv, delete_index, module);
3105         if (ret < 0) {
3106                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3107                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
3108                                        ldb_errstring(ldb));
3109                 return LDB_ERR_OPERATIONS_ERROR;
3110         }
3111
3112         ctx.module = module;
3113         ctx.error = 0;
3114         ctx.count = 0;
3115
3116         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_key, &ctx);
3117         if (ret < 0) {
3118                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3119                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
3120                                        ldb_errstring(ldb));
3121                 return LDB_ERR_OPERATIONS_ERROR;
3122         }
3123
3124         if (ctx.error != LDB_SUCCESS) {
3125                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3126                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3127                 return ctx.error;
3128         }
3129
3130         ctx.error = 0;
3131         ctx.count = 0;
3132
3133         /* now traverse adding any indexes for normal LDB records */
3134         ret = ldb_kv->kv_ops->iterate(ldb_kv, re_index, &ctx);
3135         if (ret < 0) {
3136                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3137                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
3138                                        ldb_errstring(ldb));
3139                 return LDB_ERR_OPERATIONS_ERROR;
3140         }
3141
3142         if (ctx.error != LDB_SUCCESS) {
3143                 struct ldb_context *ldb = ldb_module_get_ctx(module);
3144                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
3145                 return ctx.error;
3146         }
3147
3148         if (ctx.count > 10000) {
3149                 ldb_debug(ldb_module_get_ctx(module),
3150                           LDB_DEBUG_WARNING,
3151                           "Reindexing: re_index successful on %s, "
3152                           "final index write-out will be in transaction commit",
3153                           ldb_kv->kv_ops->name(ldb_kv));
3154         }
3155         return LDB_SUCCESS;
3156 }