ldb: Intersect the index from SCOPE_ONELEVEL with the index for the search expression
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 /*
35
36 LDB Index design and choice of TDB key:
37 =======================================
38
39 LDB has index records held as LDB objects with a special record like:
40
41 dn: @INDEX:attr:value
42
43 value may be base64 encoded, if it is deemed not printable:
44
45 dn: @INDEX:attr::base64-value
46
47 In each record, there is two possible formats:
48
49 The original format is:
50 -----------------------
51
52 dn: @INDEX:NAME:DNSUPDATEPROXY
53 @IDXVERSION: 2
54 @IDX: CN=DnsUpdateProxy,CN=Users,DC=addom,DC=samba,DC=example,DC=com
55
56 In this format, @IDX is multi-valued, one entry for each match
57
58 The corrosponding entry is stored in a TDB record with key:
59
60 DN=CN=DNSUPDATEPROXY,CN=USERS,DC=ADDOM,DC=SAMBA,DC=EXAMPLE,DC=COM
61
62 (This allows a scope BASE search to directly find the record via
63 a simple casefold of the DN).
64
65 The original mixed-case DN is stored in the entry iself.
66
67
68 The new 'GUID index' format is:
69 -------------------------------
70
71 dn: @INDEX:NAME:DNSUPDATEPROXY
72 @IDXVERSION: 3
73 @IDX: <binary GUID>[<binary GUID>[...]]
74
75 The binary guid is 16 bytes, as bytes and not expanded as hexidecimal
76 or pretty-printed.  The GUID is chosen from the message to be stored
77 by the @IDXGUID attribute on @INDEXLIST.
78
79 If there are multiple values the @IDX value simply becomes longer,
80 in multiples of 16.
81
82 The corrosponding entry is stored in a TDB record with key:
83
84 GUID=<binary GUID>
85
86 This allows a very quick translation between the fixed-length index
87 values and the TDB key, while seperating entries from other data
88 in the TDB, should they be unlucky enough to start with the bytes of
89 the 'DN=' prefix.
90
91 Additionally, this allows a scope BASE search to directly find the
92 record via a simple match on a GUID= extended DN, controlled via
93 @IDX_DN_GUID on @INDEXLIST
94
95 Exception for special @ DNs:
96
97 @BASEINFO, @INDEXLIST and all other special DNs are stored as per the
98 original format, as they are never referenced in an index and are used
99 to bootstrap the database.
100
101
102 Control points for choice of index mode
103 ---------------------------------------
104
105 The choice of index and TDB key mode is made based (for example, from
106 Samba) on entries in the @INDEXLIST DN:
107
108 dn: @INDEXLIST
109 @IDXGUID: objectGUID
110 @IDX_DN_GUID: GUID
111
112 By default, the original DN format is used.
113
114
115 Control points for choosing indexed attributes
116 ----------------------------------------------
117
118 @IDXATTR controls if an attribute is indexed
119
120 dn: @INDEXLIST
121 @IDXATTR: samAccountName
122 @IDXATTR: nETBIOSName
123
124
125 C Override functions
126 --------------------
127
128 void ldb_schema_set_override_GUID_index(struct ldb_context *ldb,
129                                         const char *GUID_index_attribute,
130                                         const char *GUID_index_dn_component)
131
132 This is used, particularly in combination with the below, instead of
133 the @IDXGUID and @IDX_DN_GUID values in @INDEXLIST.
134
135 void ldb_schema_set_override_indexlist(struct ldb_context *ldb,
136                                        bool one_level_indexes);
137 void ldb_schema_attribute_set_override_handler(struct ldb_context *ldb,
138                                                ldb_attribute_handler_override_fn_t override,
139                                                void *private_data);
140
141 When the above two functions are called in combination, the @INDEXLIST
142 values are not read from the DB, so
143 ldb_schema_set_override_GUID_index() must be called.
144
145 */
146
147 #include "ldb_tdb.h"
148 #include "ldb_private.h"
149 #include "lib/util/binsearch.h"
150
151 struct dn_list {
152         unsigned int count;
153         struct ldb_val *dn;
154         /*
155          * Do not optimise the intersection of this list,
156          * we must never return an entry not in this
157          * list.  This allows the index for
158          * SCOPE_ONELEVEL to be trusted.
159          */
160         bool strict;
161 };
162
163 struct ltdb_idxptr {
164         struct tdb_context *itdb;
165         int error;
166 };
167
168 static int ltdb_write_index_dn_guid(struct ldb_module *module,
169                                     const struct ldb_message *msg,
170                                     int add);
171 static int ltdb_index_dn_base_dn(struct ldb_module *module,
172                                  struct ltdb_private *ltdb,
173                                  struct ldb_dn *base_dn,
174                                  struct dn_list *dn_list);
175
176 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
177                               struct dn_list *list);
178
179 /* we put a @IDXVERSION attribute on index entries. This
180    allows us to tell if it was written by an older version
181 */
182 #define LTDB_INDEXING_VERSION 2
183
184 #define LTDB_GUID_INDEXING_VERSION 3
185
186 /* enable the idxptr mode when transactions start */
187 int ltdb_index_transaction_start(struct ldb_module *module)
188 {
189         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
190         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
191         if (ltdb->idxptr == NULL) {
192                 return ldb_oom(ldb_module_get_ctx(module));
193         }
194
195         return LDB_SUCCESS;
196 }
197
198 /*
199   see if two ldb_val structures contain exactly the same data
200   return -1 or 1 for a mismatch, 0 for match
201 */
202 static int ldb_val_equal_exact_for_qsort(const struct ldb_val *v1,
203                                          const struct ldb_val *v2)
204 {
205         if (v1->length > v2->length) {
206                 return -1;
207         }
208         if (v1->length < v2->length) {
209                 return 1;
210         }
211         return memcmp(v1->data, v2->data, v1->length);
212 }
213
214 /*
215   see if two ldb_val structures contain exactly the same data
216   return -1 or 1 for a mismatch, 0 for match
217 */
218 static int ldb_val_equal_exact_ordered(const struct ldb_val v1,
219                                        const struct ldb_val *v2)
220 {
221         if (v1.length > v2->length) {
222                 return -1;
223         }
224         if (v1.length < v2->length) {
225                 return 1;
226         }
227         return memcmp(v1.data, v2->data, v1.length);
228 }
229
230
231 /*
232   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
233   binary-safe comparison for the 'dn' returns -1 if not found
234
235   This is therefore safe when the value is a GUID in the future
236  */
237 static int ltdb_dn_list_find_val(struct ltdb_private *ltdb,
238                                  const struct dn_list *list,
239                                  const struct ldb_val *v)
240 {
241         unsigned int i;
242         struct ldb_val *exact = NULL, *next = NULL;
243
244         if (ltdb->cache->GUID_index_attribute == NULL) {
245                 for (i=0; i<list->count; i++) {
246                         if (ldb_val_equal_exact(&list->dn[i], v) == 1) {
247                                 return i;
248                         }
249                 }
250                 return -1;
251         }
252
253         BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
254                                 *v, ldb_val_equal_exact_ordered,
255                                 exact, next);
256         if (exact == NULL) {
257                 return -1;
258         }
259         /* Not required, but keeps the compiler quiet */
260         if (next != NULL) {
261                 return -1;
262         }
263
264         i = exact - list->dn;
265         return i;
266 }
267
268 /*
269   find a entry in a dn_list. Uses a case sensitive comparison with the dn
270   returns -1 if not found
271  */
272 static int ltdb_dn_list_find_msg(struct ltdb_private *ltdb,
273                                  struct dn_list *list,
274                                  const struct ldb_message *msg)
275 {
276         struct ldb_val v;
277         const struct ldb_val *key_val;
278         if (ltdb->cache->GUID_index_attribute == NULL) {
279                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
280                 v.data = discard_const_p(unsigned char, dn_str);
281                 v.length = strlen(dn_str);
282         } else {
283                 key_val = ldb_msg_find_ldb_val(msg,
284                                                ltdb->cache->GUID_index_attribute);
285                 if (key_val == NULL) {
286                         return -1;
287                 }
288                 v = *key_val;
289         }
290         return ltdb_dn_list_find_val(ltdb, list, &v);
291 }
292
293 /*
294   this is effectively a cast function, but with lots of paranoia
295   checks and also copes with CPUs that are fussy about pointer
296   alignment
297  */
298 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
299 {
300         struct dn_list *list;
301         if (rec.dsize != sizeof(void *)) {
302                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
303                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
304                 return NULL;
305         }
306         /* note that we can't just use a cast here, as rec.dptr may
307            not be aligned sufficiently for a pointer. A cast would cause
308            platforms like some ARM CPUs to crash */
309         memcpy(&list, rec.dptr, sizeof(void *));
310         list = talloc_get_type(list, struct dn_list);
311         if (list == NULL) {
312                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
313                                        "Bad type '%s' for idxptr",
314                                        talloc_get_name(list));
315                 return NULL;
316         }
317         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
318                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
319                                        "Bad parent '%s' for idxptr",
320                                        talloc_get_name(talloc_parent(list->dn)));
321                 return NULL;
322         }
323         return list;
324 }
325
326 /*
327   return the @IDX list in an index entry for a dn as a
328   struct dn_list
329  */
330 static int ltdb_dn_list_load(struct ldb_module *module,
331                              struct ltdb_private *ltdb,
332                              struct ldb_dn *dn, struct dn_list *list)
333 {
334         struct ldb_message *msg;
335         int ret, version;
336         struct ldb_message_element *el;
337         TDB_DATA rec;
338         struct dn_list *list2;
339         TDB_DATA key;
340
341         list->dn = NULL;
342         list->count = 0;
343
344         /* see if we have any in-memory index entries */
345         if (ltdb->idxptr == NULL ||
346             ltdb->idxptr->itdb == NULL) {
347                 goto normal_index;
348         }
349
350         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
351         key.dsize = strlen((char *)key.dptr);
352
353         rec = tdb_fetch(ltdb->idxptr->itdb, key);
354         if (rec.dptr == NULL) {
355                 goto normal_index;
356         }
357
358         /* we've found an in-memory index entry */
359         list2 = ltdb_index_idxptr(module, rec, true);
360         if (list2 == NULL) {
361                 free(rec.dptr);
362                 return LDB_ERR_OPERATIONS_ERROR;
363         }
364         free(rec.dptr);
365
366         *list = *list2;
367         return LDB_SUCCESS;
368
369 normal_index:
370         msg = ldb_msg_new(list);
371         if (msg == NULL) {
372                 return LDB_ERR_OPERATIONS_ERROR;
373         }
374
375         ret = ltdb_search_dn1(module, dn, msg,
376                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
377                               |LDB_UNPACK_DATA_FLAG_NO_DN);
378         if (ret != LDB_SUCCESS) {
379                 talloc_free(msg);
380                 return ret;
381         }
382
383         el = ldb_msg_find_element(msg, LTDB_IDX);
384         if (!el) {
385                 talloc_free(msg);
386                 return LDB_SUCCESS;
387         }
388
389         version = ldb_msg_find_attr_as_int(msg, LTDB_IDXVERSION, 0);
390
391         /*
392          * we avoid copying the strings by stealing the list.  We have
393          * to steal msg onto el->values (which looks odd) because we
394          * asked for the memory to be allocated on msg, not on each
395          * value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
396          */
397         if (ltdb->cache->GUID_index_attribute == NULL) {
398                 /* check indexing version number */
399                 if (version != LTDB_INDEXING_VERSION) {
400                         ldb_debug_set(ldb_module_get_ctx(module),
401                                       LDB_DEBUG_ERROR,
402                                       "Wrong DN index version %d "
403                                       "expected %d for %s",
404                                       version, LTDB_INDEXING_VERSION,
405                                       ldb_dn_get_linearized(dn));
406                         return LDB_ERR_OPERATIONS_ERROR;
407                 }
408
409                 talloc_steal(el->values, msg);
410                 list->dn = talloc_steal(list, el->values);
411                 list->count = el->num_values;
412         } else {
413                 unsigned int i;
414                 if (version != LTDB_GUID_INDEXING_VERSION) {
415                         /* This is quite likely during the DB startup
416                            on first upgrade to using a GUID index */
417                         ldb_debug_set(ldb_module_get_ctx(module),
418                                       LDB_DEBUG_ERROR,
419                                       "Wrong GUID index version %d "
420                                       "expected %d for %s",
421                                       version, LTDB_GUID_INDEXING_VERSION,
422                                       ldb_dn_get_linearized(dn));
423                         return LDB_ERR_OPERATIONS_ERROR;
424                 }
425
426                 if (el->num_values != 1) {
427                         return LDB_ERR_OPERATIONS_ERROR;
428                 }
429
430                 if ((el->values[0].length % LTDB_GUID_SIZE) != 0) {
431                         return LDB_ERR_OPERATIONS_ERROR;
432                 }
433
434                 list->count = el->values[0].length / LTDB_GUID_SIZE;
435                 list->dn = talloc_array(list, struct ldb_val, list->count);
436
437                 /*
438                  * The actual data is on msg, due to
439                  * LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
440                  */
441                 talloc_steal(list->dn, msg);
442                 for (i = 0; i < list->count; i++) {
443                         list->dn[i].data
444                                 = &el->values[0].data[i * LTDB_GUID_SIZE];
445                         list->dn[i].length = LTDB_GUID_SIZE;
446                 }
447         }
448
449         /* We don't need msg->elements any more */
450         talloc_free(msg->elements);
451         return LDB_SUCCESS;
452 }
453
454 int ltdb_key_dn_from_idx(struct ldb_module *module,
455                          struct ltdb_private *ltdb,
456                          TALLOC_CTX *mem_ctx,
457                          struct ldb_dn *dn,
458                          TDB_DATA *tdb_key)
459 {
460         struct ldb_context *ldb = ldb_module_get_ctx(module);
461         int ret;
462         struct dn_list *list = talloc(mem_ctx, struct dn_list);
463         if (list == NULL) {
464                 ldb_oom(ldb);
465                 return LDB_ERR_OPERATIONS_ERROR;
466         }
467
468         ret = ltdb_index_dn_base_dn(module, ltdb, dn, list);
469         if (ret != LDB_SUCCESS) {
470                 TALLOC_FREE(list);
471                 return ret;
472         }
473
474         if (list->count == 0) {
475                 TALLOC_FREE(list);
476                 return LDB_ERR_NO_SUCH_OBJECT;
477         }
478         if (list->count > 1) {
479                 const char *dn_str = ldb_dn_get_linearized(dn);
480                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
481                                        __location__
482                                        ": Failed to read DN index "
483                                        "against %s for %s: too many "
484                                        "values (%u > 1)",
485                                        ltdb->cache->GUID_index_attribute,
486                                        dn_str, list->count);
487                 TALLOC_FREE(list);
488                 return LDB_ERR_CONSTRAINT_VIOLATION;
489         }
490
491         /* The tdb_key memory is allocated by the caller */
492         ret = ltdb_guid_to_key(module, ltdb,
493                                &list->dn[0], tdb_key);
494         TALLOC_FREE(list);
495
496         if (ret != LDB_SUCCESS) {
497                 return LDB_ERR_OPERATIONS_ERROR;
498         }
499
500         return LDB_SUCCESS;
501 }
502
503
504
505 /*
506   save a dn_list into a full @IDX style record
507  */
508 static int ltdb_dn_list_store_full(struct ldb_module *module,
509                                    struct ltdb_private *ltdb,
510                                    struct ldb_dn *dn,
511                                    struct dn_list *list)
512 {
513         struct ldb_message *msg;
514         int ret;
515
516         msg = ldb_msg_new(module);
517         if (!msg) {
518                 return ldb_module_oom(module);
519         }
520
521         msg->dn = dn;
522
523         if (list->count == 0) {
524                 ret = ltdb_delete_noindex(module, msg);
525                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
526                         talloc_free(msg);
527                         return LDB_SUCCESS;
528                 }
529                 return ret;
530         }
531
532         if (ltdb->cache->GUID_index_attribute == NULL) {
533                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
534                                       LTDB_INDEXING_VERSION);
535                 if (ret != LDB_SUCCESS) {
536                         talloc_free(msg);
537                         return ldb_module_oom(module);
538                 }
539         } else {
540                 ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u",
541                                       LTDB_GUID_INDEXING_VERSION);
542                 if (ret != LDB_SUCCESS) {
543                         talloc_free(msg);
544                         return ldb_module_oom(module);
545                 }
546         }
547
548         if (list->count > 0) {
549                 struct ldb_message_element *el;
550
551                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
552                 if (ret != LDB_SUCCESS) {
553                         talloc_free(msg);
554                         return ldb_module_oom(module);
555                 }
556
557                 if (ltdb->cache->GUID_index_attribute == NULL) {
558                         el->values = list->dn;
559                         el->num_values = list->count;
560                 } else {
561                         struct ldb_val v;
562                         unsigned int i;
563                         el->values = talloc_array(msg,
564                                                   struct ldb_val, 1);
565                         if (el->values == NULL) {
566                                 talloc_free(msg);
567                                 return ldb_module_oom(module);
568                         }
569
570                         v.data = talloc_array_size(el->values,
571                                                    list->count,
572                                                    LTDB_GUID_SIZE);
573                         if (v.data == NULL) {
574                                 talloc_free(msg);
575                                 return ldb_module_oom(module);
576                         }
577
578                         v.length = talloc_get_size(v.data);
579
580                         for (i = 0; i < list->count; i++) {
581                                 if (list->dn[i].length !=
582                                     LTDB_GUID_SIZE) {
583                                         talloc_free(msg);
584                                         return ldb_module_operr(module);
585                                 }
586                                 memcpy(&v.data[LTDB_GUID_SIZE*i],
587                                        list->dn[i].data,
588                                        LTDB_GUID_SIZE);
589                         }
590                         el->values[0] = v;
591                         el->num_values = 1;
592                 }
593         }
594
595         ret = ltdb_store(module, msg, TDB_REPLACE);
596         talloc_free(msg);
597         return ret;
598 }
599
600 /*
601   save a dn_list into the database, in either @IDX or internal format
602  */
603 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn,
604                               struct dn_list *list)
605 {
606         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
607         TDB_DATA rec, key;
608         int ret;
609         struct dn_list *list2;
610
611         if (ltdb->idxptr == NULL) {
612                 return ltdb_dn_list_store_full(module, ltdb,
613                                                dn, list);
614         }
615
616         if (ltdb->idxptr->itdb == NULL) {
617                 ltdb->idxptr->itdb = tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
618                 if (ltdb->idxptr->itdb == NULL) {
619                         return LDB_ERR_OPERATIONS_ERROR;
620                 }
621         }
622
623         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
624         key.dsize = strlen((char *)key.dptr);
625
626         rec = tdb_fetch(ltdb->idxptr->itdb, key);
627         if (rec.dptr != NULL) {
628                 list2 = ltdb_index_idxptr(module, rec, false);
629                 if (list2 == NULL) {
630                         free(rec.dptr);
631                         return LDB_ERR_OPERATIONS_ERROR;
632                 }
633                 free(rec.dptr);
634                 list2->dn = talloc_steal(list2, list->dn);
635                 list2->count = list->count;
636                 return LDB_SUCCESS;
637         }
638
639         list2 = talloc(ltdb->idxptr, struct dn_list);
640         if (list2 == NULL) {
641                 return LDB_ERR_OPERATIONS_ERROR;
642         }
643         list2->dn = talloc_steal(list2, list->dn);
644         list2->count = list->count;
645
646         rec.dptr = (uint8_t *)&list2;
647         rec.dsize = sizeof(void *);
648
649
650         /*
651          * This is not a store into the main DB, but into an in-memory
652          * TDB, so we don't need a guard on ltdb->read_only
653          */
654         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
655         if (ret != 0) {
656                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
657         }
658         return LDB_SUCCESS;
659 }
660
661 /*
662   traverse function for storing the in-memory index entries on disk
663  */
664 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
665 {
666         struct ldb_module *module = state;
667         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
668         struct ldb_dn *dn;
669         struct ldb_context *ldb = ldb_module_get_ctx(module);
670         struct ldb_val v;
671         struct dn_list *list;
672
673         list = ltdb_index_idxptr(module, data, true);
674         if (list == NULL) {
675                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
676                 return -1;
677         }
678
679         v.data = key.dptr;
680         v.length = strnlen((char *)key.dptr, key.dsize);
681
682         dn = ldb_dn_from_ldb_val(module, ldb, &v);
683         if (dn == NULL) {
684                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
685                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
686                 return -1;
687         }
688
689         ltdb->idxptr->error = ltdb_dn_list_store_full(module, ltdb,
690                                                       dn, list);
691         talloc_free(dn);
692         if (ltdb->idxptr->error != 0) {
693                 return -1;
694         }
695         return 0;
696 }
697
698 /* cleanup the idxptr mode when transaction commits */
699 int ltdb_index_transaction_commit(struct ldb_module *module)
700 {
701         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
702         int ret;
703
704         struct ldb_context *ldb = ldb_module_get_ctx(module);
705
706         ldb_reset_err_string(ldb);
707
708         if (ltdb->idxptr->itdb) {
709                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
710                 tdb_close(ltdb->idxptr->itdb);
711         }
712
713         ret = ltdb->idxptr->error;
714         if (ret != LDB_SUCCESS) {
715                 if (!ldb_errstring(ldb)) {
716                         ldb_set_errstring(ldb, ldb_strerror(ret));
717                 }
718                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
719         }
720
721         talloc_free(ltdb->idxptr);
722         ltdb->idxptr = NULL;
723         return ret;
724 }
725
726 /* cleanup the idxptr mode when transaction cancels */
727 int ltdb_index_transaction_cancel(struct ldb_module *module)
728 {
729         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
730         if (ltdb->idxptr && ltdb->idxptr->itdb) {
731                 tdb_close(ltdb->idxptr->itdb);
732         }
733         talloc_free(ltdb->idxptr);
734         ltdb->idxptr = NULL;
735         return LDB_SUCCESS;
736 }
737
738
739 /*
740   return the dn key to be used for an index
741   the caller is responsible for freeing
742 */
743 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
744                                      struct ltdb_private *ltdb,
745                                      const char *attr, const struct ldb_val *value,
746                                      const struct ldb_schema_attribute **ap)
747 {
748         struct ldb_dn *ret;
749         struct ldb_val v;
750         const struct ldb_schema_attribute *a = NULL;
751         char *attr_folded = NULL;
752         const char *attr_for_dn = NULL;
753         int r;
754         bool should_b64_encode;
755
756         if (attr[0] == '@') {
757                 attr_for_dn = attr;
758                 v = *value;
759                 if (ap != NULL) {
760                         *ap = NULL;
761                 }
762         } else {
763                 attr_folded = ldb_attr_casefold(ldb, attr);
764                 if (!attr_folded) {
765                         return NULL;
766                 }
767
768                 attr_for_dn = attr_folded;
769
770                 a = ldb_schema_attribute_by_name(ldb, attr);
771                 if (ap) {
772                         *ap = a;
773                 }
774                 r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
775                 if (r != LDB_SUCCESS) {
776                         const char *errstr = ldb_errstring(ldb);
777                         /* canonicalisation can be refused. For
778                            example, a attribute that takes wildcards
779                            will refuse to canonicalise if the value
780                            contains a wildcard */
781                         ldb_asprintf_errstring(ldb,
782                                                "Failed to create index "
783                                                "key for attribute '%s':%s%s%s",
784                                                attr, ldb_strerror(r),
785                                                (errstr?":":""),
786                                                (errstr?errstr:""));
787                         talloc_free(attr_folded);
788                         return NULL;
789                 }
790         }
791
792         /*
793          * We do not base 64 encode a DN in a key, it has already been
794          * casefold and lineraized, that is good enough.  That already
795          * avoids embedded NUL etc.
796          */
797         if (ltdb->cache->GUID_index_attribute != NULL) {
798                 if (strcmp(attr, LTDB_IDXDN) == 0) {
799                         should_b64_encode = false;
800                 } else if (strcmp(attr, LTDB_IDXONE) == 0) {
801                         /*
802                          * We can only change the behaviour for IDXONE
803                          * when the GUID index is enabled
804                          */
805                         should_b64_encode = false;
806                 } else {
807                         should_b64_encode
808                                 = ldb_should_b64_encode(ldb, &v);
809                 }
810         } else {
811                 should_b64_encode = ldb_should_b64_encode(ldb, &v);
812         }
813
814         if (should_b64_encode) {
815                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
816                 if (!vstr) {
817                         talloc_free(attr_folded);
818                         return NULL;
819                 }
820                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX,
821                                      attr_for_dn, vstr);
822                 talloc_free(vstr);
823         } else {
824                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX,
825                                      attr_for_dn,
826                                      (int)v.length, (char *)v.data);
827         }
828
829         if (v.data != value->data) {
830                 talloc_free(v.data);
831         }
832         talloc_free(attr_folded);
833
834         return ret;
835 }
836
837 /*
838   see if a attribute value is in the list of indexed attributes
839 */
840 static bool ltdb_is_indexed(struct ldb_module *module,
841                             struct ltdb_private *ltdb,
842                             const char *attr)
843 {
844         struct ldb_context *ldb = ldb_module_get_ctx(module);
845         unsigned int i;
846         struct ldb_message_element *el;
847
848         if ((ltdb->cache->GUID_index_attribute != NULL) &&
849             (ldb_attr_cmp(attr,
850                           ltdb->cache->GUID_index_attribute) == 0)) {
851                 /* Implicity covered, this is the index key */
852                 return false;
853         }
854         if (ldb->schema.index_handler_override) {
855                 const struct ldb_schema_attribute *a
856                         = ldb_schema_attribute_by_name(ldb, attr);
857
858                 if (a == NULL) {
859                         return false;
860                 }
861
862                 if (a->flags & LDB_ATTR_FLAG_INDEXED) {
863                         return true;
864                 } else {
865                         return false;
866                 }
867         }
868
869         if (!ltdb->cache->attribute_indexes) {
870                 return false;
871         }
872
873         el = ldb_msg_find_element(ltdb->cache->indexlist, LTDB_IDXATTR);
874         if (el == NULL) {
875                 return false;
876         }
877
878         /* TODO: this is too expensive! At least use a binary search */
879         for (i=0; i<el->num_values; i++) {
880                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
881                         return true;
882                 }
883         }
884         return false;
885 }
886
887 /*
888   in the following logic functions, the return value is treated as
889   follows:
890
891      LDB_SUCCESS: we found some matching index values
892
893      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
894
895      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
896                                we'll need a full search
897  */
898
899 /*
900   return a list of dn's that might match a simple indexed search (an
901   equality search only)
902  */
903 static int ltdb_index_dn_simple(struct ldb_module *module,
904                                 struct ltdb_private *ltdb,
905                                 const struct ldb_parse_tree *tree,
906                                 struct dn_list *list)
907 {
908         struct ldb_context *ldb;
909         struct ldb_dn *dn;
910         int ret;
911
912         ldb = ldb_module_get_ctx(module);
913
914         list->count = 0;
915         list->dn = NULL;
916
917         /* if the attribute isn't in the list of indexed attributes then
918            this node needs a full search */
919         if (!ltdb_is_indexed(module, ltdb, tree->u.equality.attr)) {
920                 return LDB_ERR_OPERATIONS_ERROR;
921         }
922
923         /* the attribute is indexed. Pull the list of DNs that match the
924            search criterion */
925         dn = ltdb_index_key(ldb, ltdb,
926                             tree->u.equality.attr,
927                             &tree->u.equality.value, NULL);
928         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
929
930         ret = ltdb_dn_list_load(module, ltdb, dn, list);
931         talloc_free(dn);
932         return ret;
933 }
934
935
936 static bool list_union(struct ldb_context *ldb,
937                        struct ltdb_private *ltdb,
938                        struct dn_list *list, struct dn_list *list2);
939
940 /*
941   return a list of dn's that might match a leaf indexed search
942  */
943 static int ltdb_index_dn_leaf(struct ldb_module *module,
944                               struct ltdb_private *ltdb,
945                               const struct ldb_parse_tree *tree,
946                               struct dn_list *list)
947 {
948         if (ltdb->disallow_dn_filter &&
949             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
950                 /* in AD mode we do not support "(dn=...)" search filters */
951                 list->dn = NULL;
952                 list->count = 0;
953                 return LDB_SUCCESS;
954         }
955         if (tree->u.equality.attr[0] == '@') {
956                 /* Do not allow a indexed search against an @ */
957                 list->dn = NULL;
958                 list->count = 0;
959                 return LDB_SUCCESS;
960         }
961         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
962                 struct ldb_dn *dn
963                         = ldb_dn_from_ldb_val(list,
964                                               ldb_module_get_ctx(module),
965                                               &tree->u.equality.value);
966                 if (dn == NULL) {
967                         /* If we can't parse it, no match */
968                         list->dn = NULL;
969                         list->count = 0;
970                         return LDB_SUCCESS;
971                 }
972
973                 /*
974                  * Re-use the same code we use for a SCOPE_BASE
975                  * search
976                  *
977                  * We can't call TALLOC_FREE(dn) as this must belong
978                  * to list for the memory to remain valid.
979                  */
980                 return ltdb_index_dn_base_dn(module, ltdb, dn, list);
981
982         } else if ((ltdb->cache->GUID_index_attribute != NULL) &&
983                    (ldb_attr_cmp(tree->u.equality.attr,
984                                  ltdb->cache->GUID_index_attribute) == 0)) {
985                 int ret;
986                 struct ldb_context *ldb = ldb_module_get_ctx(module);
987                 list->dn = talloc_array(list, struct ldb_val, 1);
988                 if (list->dn == NULL) {
989                         ldb_module_oom(module);
990                         return LDB_ERR_OPERATIONS_ERROR;
991                 }
992                 /*
993                  * We need to go via the canonicalise_fn() to
994                  * ensure we get the index in binary, rather
995                  * than a string
996                  */
997                 ret = ltdb->GUID_index_syntax->canonicalise_fn(ldb,
998                                                                list->dn,
999                                                                &tree->u.equality.value,
1000                                                                &list->dn[0]);
1001                 if (ret != LDB_SUCCESS) {
1002                         return LDB_ERR_OPERATIONS_ERROR;
1003                 }
1004                 list->count = 1;
1005                 return LDB_SUCCESS;
1006         }
1007
1008         return ltdb_index_dn_simple(module, ltdb, tree, list);
1009 }
1010
1011
1012 /*
1013   list intersection
1014   list = list & list2
1015 */
1016 static bool list_intersect(struct ldb_context *ldb,
1017                            struct ltdb_private *ltdb,
1018                            struct dn_list *list, const struct dn_list *list2)
1019 {
1020         const struct dn_list *short_list, *long_list;
1021         struct dn_list *list3;
1022         unsigned int i;
1023
1024         if (list->count == 0) {
1025                 /* 0 & X == 0 */
1026                 return true;
1027         }
1028         if (list2->count == 0) {
1029                 /* X & 0 == 0 */
1030                 list->count = 0;
1031                 list->dn = NULL;
1032                 return true;
1033         }
1034
1035         /* the indexing code is allowed to return a longer list than
1036            what really matches, as all results are filtered by the
1037            full expression at the end - this shortcut avoids a lot of
1038            work in some cases */
1039         if (list->count < 2 && list2->count > 10 && list2->strict == false) {
1040                 return true;
1041         }
1042         if (list2->count < 2 && list->count > 10 && list->strict == false) {
1043                 list->count = list2->count;
1044                 list->dn = list2->dn;
1045                 /* note that list2 may not be the parent of list2->dn,
1046                    as list2->dn may be owned by ltdb->idxptr. In that
1047                    case we expect this reparent call to fail, which is
1048                    OK */
1049                 talloc_reparent(list2, list, list2->dn);
1050                 return true;
1051         }
1052
1053         if (list->count > list2->count) {
1054                 short_list = list2;
1055                 long_list = list;
1056         } else {
1057                 short_list = list;
1058                 long_list = list2;
1059         }
1060
1061         list3 = talloc_zero(list, struct dn_list);
1062         if (list3 == NULL) {
1063                 return false;
1064         }
1065
1066         list3->dn = talloc_array(list3, struct ldb_val,
1067                                  MIN(list->count, list2->count));
1068         if (!list3->dn) {
1069                 talloc_free(list3);
1070                 return false;
1071         }
1072         list3->count = 0;
1073
1074         for (i=0;i<short_list->count;i++) {
1075                 /* For the GUID index case, this is a binary search */
1076                 if (ltdb_dn_list_find_val(ltdb, long_list,
1077                                           &short_list->dn[i]) != -1) {
1078                         list3->dn[list3->count] = short_list->dn[i];
1079                         list3->count++;
1080                 }
1081         }
1082
1083         list->strict |= list2->strict;
1084         list->dn = talloc_steal(list, list3->dn);
1085         list->count = list3->count;
1086         talloc_free(list3);
1087
1088         return true;
1089 }
1090
1091
1092 /*
1093   list union
1094   list = list | list2
1095 */
1096 static bool list_union(struct ldb_context *ldb,
1097                        struct ltdb_private *ltdb,
1098                        struct dn_list *list, struct dn_list *list2)
1099 {
1100         struct ldb_val *dn3;
1101         unsigned int i = 0, j = 0, k = 0;
1102
1103         if (list2->count == 0) {
1104                 /* X | 0 == X */
1105                 return true;
1106         }
1107
1108         if (list->count == 0) {
1109                 /* 0 | X == X */
1110                 list->count = list2->count;
1111                 list->dn = list2->dn;
1112                 /* note that list2 may not be the parent of list2->dn,
1113                    as list2->dn may be owned by ltdb->idxptr. In that
1114                    case we expect this reparent call to fail, which is
1115                    OK */
1116                 talloc_reparent(list2, list, list2->dn);
1117                 return true;
1118         }
1119
1120         /*
1121          * Sort the lists (if not in GUID DN mode) so we can do
1122          * the de-duplication during the merge
1123          */
1124         ltdb_dn_list_sort(ltdb, list);
1125         ltdb_dn_list_sort(ltdb, list2);
1126
1127         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
1128         if (!dn3) {
1129                 ldb_oom(ldb);
1130                 return false;
1131         }
1132
1133         while (i < list->count || j < list2->count) {
1134                 int cmp;
1135                 if (i >= list->count) {
1136                         cmp = 1;
1137                 } else if (j >= list2->count) {
1138                         cmp = -1;
1139                 } else {
1140                         cmp = ldb_val_equal_exact_ordered(list->dn[i],
1141                                                           &list2->dn[j]);
1142                 }
1143
1144                 if (cmp < 0) {
1145                         /* Take list */
1146                         dn3[k] = list->dn[i];
1147                         i++;
1148                         k++;
1149                 } else if (cmp > 0) {
1150                         /* Take list2 */
1151                         dn3[k] = list2->dn[j];
1152                         j++;
1153                         k++;
1154                 } else {
1155                         /* Equal, take list */
1156                         dn3[k] = list->dn[i];
1157                         i++;
1158                         j++;
1159                         k++;
1160                 }
1161         }
1162
1163         list->dn = dn3;
1164         list->count = k;
1165
1166         return true;
1167 }
1168
1169 static int ltdb_index_dn(struct ldb_module *module,
1170                          struct ltdb_private *ltdb,
1171                          const struct ldb_parse_tree *tree,
1172                          struct dn_list *list);
1173
1174
1175 /*
1176   process an OR list (a union)
1177  */
1178 static int ltdb_index_dn_or(struct ldb_module *module,
1179                             struct ltdb_private *ltdb,
1180                             const struct ldb_parse_tree *tree,
1181                             struct dn_list *list)
1182 {
1183         struct ldb_context *ldb;
1184         unsigned int i;
1185
1186         ldb = ldb_module_get_ctx(module);
1187
1188         list->dn = NULL;
1189         list->count = 0;
1190
1191         for (i=0; i<tree->u.list.num_elements; i++) {
1192                 struct dn_list *list2;
1193                 int ret;
1194
1195                 list2 = talloc_zero(list, struct dn_list);
1196                 if (list2 == NULL) {
1197                         return LDB_ERR_OPERATIONS_ERROR;
1198                 }
1199
1200                 ret = ltdb_index_dn(module, ltdb,
1201                                     tree->u.list.elements[i], list2);
1202
1203                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1204                         /* X || 0 == X */
1205                         talloc_free(list2);
1206                         continue;
1207                 }
1208
1209                 if (ret != LDB_SUCCESS) {
1210                         /* X || * == * */
1211                         talloc_free(list2);
1212                         return ret;
1213                 }
1214
1215                 if (!list_union(ldb, ltdb, list, list2)) {
1216                         talloc_free(list2);
1217                         return LDB_ERR_OPERATIONS_ERROR;
1218                 }
1219         }
1220
1221         if (list->count == 0) {
1222                 return LDB_ERR_NO_SUCH_OBJECT;
1223         }
1224
1225         return LDB_SUCCESS;
1226 }
1227
1228
1229 /*
1230   NOT an index results
1231  */
1232 static int ltdb_index_dn_not(struct ldb_module *module,
1233                              struct ltdb_private *ltdb,
1234                              const struct ldb_parse_tree *tree,
1235                              struct dn_list *list)
1236 {
1237         /* the only way to do an indexed not would be if we could
1238            negate the not via another not or if we knew the total
1239            number of database elements so we could know that the
1240            existing expression covered the whole database.
1241
1242            instead, we just give up, and rely on a full index scan
1243            (unless an outer & manages to reduce the list)
1244         */
1245         return LDB_ERR_OPERATIONS_ERROR;
1246 }
1247
1248 /*
1249  * These things are unique, so avoid a full scan if this is a search
1250  * by GUID, DN or a unique attribute
1251  */
1252 static bool ltdb_index_unique(struct ldb_context *ldb,
1253                               struct ltdb_private *ltdb,
1254                               const char *attr)
1255 {
1256         const struct ldb_schema_attribute *a;
1257         if (ltdb->cache->GUID_index_attribute != NULL) {
1258                 if (ldb_attr_cmp(attr, ltdb->cache->GUID_index_attribute) == 0) {
1259                         return true;
1260                 }
1261         }
1262         if (ldb_attr_dn(attr) == 0) {
1263                 return true;
1264         }
1265
1266         a = ldb_schema_attribute_by_name(ldb, attr);
1267         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1268                 return true;
1269         }
1270         return false;
1271 }
1272
1273 /*
1274   process an AND expression (intersection)
1275  */
1276 static int ltdb_index_dn_and(struct ldb_module *module,
1277                              struct ltdb_private *ltdb,
1278                              const struct ldb_parse_tree *tree,
1279                              struct dn_list *list)
1280 {
1281         struct ldb_context *ldb;
1282         unsigned int i;
1283         bool found;
1284
1285         ldb = ldb_module_get_ctx(module);
1286
1287         list->dn = NULL;
1288         list->count = 0;
1289
1290         /* in the first pass we only look for unique simple
1291            equality tests, in the hope of avoiding having to look
1292            at any others */
1293         for (i=0; i<tree->u.list.num_elements; i++) {
1294                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1295                 int ret;
1296
1297                 if (subtree->operation != LDB_OP_EQUALITY ||
1298                     !ltdb_index_unique(ldb, ltdb,
1299                                        subtree->u.equality.attr)) {
1300                         continue;
1301                 }
1302
1303                 ret = ltdb_index_dn(module, ltdb, subtree, list);
1304                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1305                         /* 0 && X == 0 */
1306                         return LDB_ERR_NO_SUCH_OBJECT;
1307                 }
1308                 if (ret == LDB_SUCCESS) {
1309                         /* a unique index match means we can
1310                          * stop. Note that we don't care if we return
1311                          * a few too many objects, due to later
1312                          * filtering */
1313                         return LDB_SUCCESS;
1314                 }
1315         }
1316
1317         /* now do a full intersection */
1318         found = false;
1319
1320         for (i=0; i<tree->u.list.num_elements; i++) {
1321                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
1322                 struct dn_list *list2;
1323                 int ret;
1324
1325                 list2 = talloc_zero(list, struct dn_list);
1326                 if (list2 == NULL) {
1327                         return ldb_module_oom(module);
1328                 }
1329
1330                 ret = ltdb_index_dn(module, ltdb, subtree, list2);
1331
1332                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1333                         /* X && 0 == 0 */
1334                         list->dn = NULL;
1335                         list->count = 0;
1336                         talloc_free(list2);
1337                         return LDB_ERR_NO_SUCH_OBJECT;
1338                 }
1339
1340                 if (ret != LDB_SUCCESS) {
1341                         /* this didn't adding anything */
1342                         talloc_free(list2);
1343                         continue;
1344                 }
1345
1346                 if (!found) {
1347                         talloc_reparent(list2, list, list->dn);
1348                         list->dn = list2->dn;
1349                         list->count = list2->count;
1350                         found = true;
1351                 } else if (!list_intersect(ldb, ltdb,
1352                                            list, list2)) {
1353                         talloc_free(list2);
1354                         return LDB_ERR_OPERATIONS_ERROR;
1355                 }
1356
1357                 if (list->count == 0) {
1358                         list->dn = NULL;
1359                         return LDB_ERR_NO_SUCH_OBJECT;
1360                 }
1361
1362                 if (list->count < 2) {
1363                         /* it isn't worth loading the next part of the tree */
1364                         return LDB_SUCCESS;
1365                 }
1366         }
1367
1368         if (!found) {
1369                 /* none of the attributes were indexed */
1370                 return LDB_ERR_OPERATIONS_ERROR;
1371         }
1372
1373         return LDB_SUCCESS;
1374 }
1375
1376 /*
1377   return a list of matching objects using a one-level index
1378  */
1379 static int ltdb_index_dn_attr(struct ldb_module *module,
1380                               struct ltdb_private *ltdb,
1381                               const char *attr,
1382                               struct ldb_dn *dn,
1383                               struct dn_list *list)
1384 {
1385         struct ldb_context *ldb;
1386         struct ldb_dn *key;
1387         struct ldb_val val;
1388         int ret;
1389
1390         ldb = ldb_module_get_ctx(module);
1391
1392         /* work out the index key from the parent DN */
1393         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1394         val.length = strlen((char *)val.data);
1395         key = ltdb_index_key(ldb, ltdb, attr, &val, NULL);
1396         if (!key) {
1397                 ldb_oom(ldb);
1398                 return LDB_ERR_OPERATIONS_ERROR;
1399         }
1400
1401         ret = ltdb_dn_list_load(module, ltdb, key, list);
1402         talloc_free(key);
1403         if (ret != LDB_SUCCESS) {
1404                 return ret;
1405         }
1406
1407         if (list->count == 0) {
1408                 return LDB_ERR_NO_SUCH_OBJECT;
1409         }
1410
1411         return LDB_SUCCESS;
1412 }
1413
1414 /*
1415   return a list of matching objects using a one-level index
1416  */
1417 static int ltdb_index_dn_one(struct ldb_module *module,
1418                              struct ltdb_private *ltdb,
1419                              struct ldb_dn *parent_dn,
1420                              struct dn_list *list)
1421 {
1422         /* Ensure we do not shortcut on intersection for this list */
1423         list->strict = true;
1424         return ltdb_index_dn_attr(module, ltdb,
1425                                   LTDB_IDXONE, parent_dn, list);
1426 }
1427
1428 /*
1429   return a list of matching objects using the DN index
1430  */
1431 static int ltdb_index_dn_base_dn(struct ldb_module *module,
1432                                  struct ltdb_private *ltdb,
1433                                  struct ldb_dn *base_dn,
1434                                  struct dn_list *dn_list)
1435 {
1436         const struct ldb_val *guid_val = NULL;
1437         if (ltdb->cache->GUID_index_attribute == NULL) {
1438                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1439                 if (dn_list->dn == NULL) {
1440                         return ldb_module_oom(module);
1441                 }
1442                 dn_list->dn[0].data = discard_const_p(unsigned char,
1443                                                       ldb_dn_get_linearized(base_dn));
1444                 if (dn_list->dn[0].data == NULL) {
1445                         return ldb_module_oom(module);
1446                 }
1447                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1448                 dn_list->count = 1;
1449
1450                 return LDB_SUCCESS;
1451         }
1452
1453         if (ltdb->cache->GUID_index_dn_component != NULL) {
1454                 guid_val = ldb_dn_get_extended_component(base_dn,
1455                                                          ltdb->cache->GUID_index_dn_component);
1456         }
1457
1458         if (guid_val != NULL) {
1459                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1460                 if (dn_list->dn == NULL) {
1461                         return ldb_module_oom(module);
1462                 }
1463                 dn_list->dn[0].data = guid_val->data;
1464                 dn_list->dn[0].length = guid_val->length;
1465                 dn_list->count = 1;
1466
1467                 return LDB_SUCCESS;
1468         }
1469
1470         return ltdb_index_dn_attr(module, ltdb,
1471                                   LTDB_IDXDN, base_dn, dn_list);
1472 }
1473
1474 /*
1475   return a list of dn's that might match a indexed search or
1476   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
1477  */
1478 static int ltdb_index_dn(struct ldb_module *module,
1479                          struct ltdb_private *ltdb,
1480                          const struct ldb_parse_tree *tree,
1481                          struct dn_list *list)
1482 {
1483         int ret = LDB_ERR_OPERATIONS_ERROR;
1484
1485         switch (tree->operation) {
1486         case LDB_OP_AND:
1487                 ret = ltdb_index_dn_and(module, ltdb, tree, list);
1488                 break;
1489
1490         case LDB_OP_OR:
1491                 ret = ltdb_index_dn_or(module, ltdb, tree, list);
1492                 break;
1493
1494         case LDB_OP_NOT:
1495                 ret = ltdb_index_dn_not(module, ltdb, tree, list);
1496                 break;
1497
1498         case LDB_OP_EQUALITY:
1499                 ret = ltdb_index_dn_leaf(module, ltdb, tree, list);
1500                 break;
1501
1502         case LDB_OP_SUBSTRING:
1503         case LDB_OP_GREATER:
1504         case LDB_OP_LESS:
1505         case LDB_OP_PRESENT:
1506         case LDB_OP_APPROX:
1507         case LDB_OP_EXTENDED:
1508                 /* we can't index with fancy bitops yet */
1509                 ret = LDB_ERR_OPERATIONS_ERROR;
1510                 break;
1511         }
1512
1513         return ret;
1514 }
1515
1516 /*
1517   filter a candidate dn_list from an indexed search into a set of results
1518   extracting just the given attributes
1519 */
1520 static int ltdb_index_filter(struct ltdb_private *ltdb,
1521                              const struct dn_list *dn_list,
1522                              struct ltdb_context *ac,
1523                              uint32_t *match_count)
1524 {
1525         struct ldb_context *ldb;
1526         struct ldb_message *msg;
1527         struct ldb_message *filtered_msg;
1528         unsigned int i;
1529
1530         ldb = ldb_module_get_ctx(ac->module);
1531
1532         for (i = 0; i < dn_list->count; i++) {
1533                 uint8_t guid_key[LTDB_GUID_KEY_SIZE];
1534                 TDB_DATA tdb_key = {
1535                         .dptr = guid_key,
1536                         .dsize = sizeof(guid_key)
1537                 };
1538                 int ret;
1539                 bool matched;
1540
1541                 msg = ldb_msg_new(ac);
1542                 if (!msg) {
1543                         return LDB_ERR_OPERATIONS_ERROR;
1544                 }
1545
1546                 ret = ltdb_idx_to_key(ac->module, ltdb,
1547                                       ac, &dn_list->dn[i],
1548                                       &tdb_key);
1549                 if (ret != LDB_SUCCESS) {
1550                         return ret;
1551                 }
1552
1553                 ret = ltdb_search_key(ac->module, ltdb,
1554                                       tdb_key, msg,
1555                                       LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC|
1556                                       LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC);
1557                 if (tdb_key.dptr != guid_key) {
1558                         TALLOC_FREE(tdb_key.dptr);
1559                 }
1560                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1561                         /* the record has disappeared? yes, this can happen */
1562                         talloc_free(msg);
1563                         continue;
1564                 }
1565
1566                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1567                         /* an internal error */
1568                         talloc_free(msg);
1569                         return LDB_ERR_OPERATIONS_ERROR;
1570                 }
1571
1572                 /* We trust the index for SCOPE_ONELEVEL and SCOPE_BASE */
1573                 if ((ac->scope == LDB_SCOPE_ONELEVEL
1574                      && ltdb->cache->one_level_indexes)
1575                     || ac->scope == LDB_SCOPE_BASE) {
1576                         ret = ldb_match_message(ldb, msg, ac->tree,
1577                                                 ac->scope, &matched);
1578                 } else {
1579                         ret = ldb_match_msg_error(ldb, msg,
1580                                                   ac->tree, ac->base,
1581                                                   ac->scope, &matched);
1582                 }
1583
1584                 if (ret != LDB_SUCCESS) {
1585                         talloc_free(msg);
1586                         return ret;
1587                 }
1588                 if (!matched) {
1589                         talloc_free(msg);
1590                         continue;
1591                 }
1592
1593                 /* filter the attributes that the user wants */
1594                 ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
1595
1596                 talloc_free(msg);
1597
1598                 if (ret == -1) {
1599                         return LDB_ERR_OPERATIONS_ERROR;
1600                 }
1601
1602                 ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
1603                 if (ret != LDB_SUCCESS) {
1604                         /* Regardless of success or failure, the msg
1605                          * is the callbacks responsiblity, and should
1606                          * not be talloc_free()'ed */
1607                         ac->request_terminated = true;
1608                         return ret;
1609                 }
1610
1611                 (*match_count)++;
1612         }
1613
1614         return LDB_SUCCESS;
1615 }
1616
1617 /*
1618   sort a DN list
1619  */
1620 static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
1621                               struct dn_list *list)
1622 {
1623         if (list->count < 2) {
1624                 return;
1625         }
1626
1627         /* We know the list is sorted when using the GUID index */
1628         if (ltdb->cache->GUID_index_attribute != NULL) {
1629                 return;
1630         }
1631
1632         TYPESAFE_QSORT(list->dn, list->count,
1633                        ldb_val_equal_exact_for_qsort);
1634 }
1635
1636 /*
1637   search the database with a LDAP-like expression using indexes
1638   returns -1 if an indexed search is not possible, in which
1639   case the caller should call ltdb_search_full()
1640 */
1641 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1642 {
1643         struct ldb_context *ldb = ldb_module_get_ctx(ac->module);
1644         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1645         struct dn_list *dn_list;
1646         int ret;
1647         enum ldb_scope index_scope;
1648
1649         /* see if indexing is enabled */
1650         if (!ltdb->cache->attribute_indexes &&
1651             !ltdb->cache->one_level_indexes &&
1652             ac->scope != LDB_SCOPE_BASE) {
1653                 /* fallback to a full search */
1654                 return LDB_ERR_OPERATIONS_ERROR;
1655         }
1656
1657         dn_list = talloc_zero(ac, struct dn_list);
1658         if (dn_list == NULL) {
1659                 return ldb_module_oom(ac->module);
1660         }
1661
1662         /*
1663          * For the purposes of selecting the switch arm below, if we
1664          * don't have a one-level index then treat it like a subtree
1665          * search
1666          */
1667         if (ac->scope == LDB_SCOPE_ONELEVEL &&
1668             !ltdb->cache->one_level_indexes) {
1669                 index_scope = LDB_SCOPE_SUBTREE;
1670         } else {
1671                 index_scope = ac->scope;
1672         }
1673
1674         switch (index_scope) {
1675         case LDB_SCOPE_BASE:
1676                 /*
1677                  * If we ever start to also load the index values for
1678                  * the tree, we must ensure we strictly intersect with
1679                  * this list, as we trust the BASE index
1680                  */
1681                 ret = ltdb_index_dn_base_dn(ac->module, ltdb,
1682                                             ac->base, dn_list);
1683                 if (ret != LDB_SUCCESS) {
1684                         talloc_free(dn_list);
1685                         return ret;
1686                 }
1687                 break;
1688
1689         case LDB_SCOPE_ONELEVEL:
1690                 /*
1691                  * If we ever start to also load the index values for
1692                  * the tree, we must ensure we strictly intersect with
1693                  * this list, as we trust the ONELEVEL index
1694                  */
1695                 ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list);
1696                 if (ret != LDB_SUCCESS) {
1697                         talloc_free(dn_list);
1698                         return ret;
1699                 }
1700
1701                 /*
1702                  * If we have too many matches, running the filter
1703                  * tree over the SCOPE_ONELEVEL can be quite expensive
1704                  * so we now check the filter tree index as well.
1705                  *
1706                  * We only do this in the GUID index mode, which is
1707                  * O(n*log(m)) otherwise the intersection below will
1708                  * be too costly at O(n*m).
1709                  *
1710                  * We don't set a heuristic for 'too many' but instead
1711                  * do it always and rely on the index lookup being
1712                  * fast enough in the small case.
1713                  */
1714                 if (ltdb->cache->GUID_index_attribute != NULL) {
1715                         struct dn_list *idx_one_tree_list
1716                                 = talloc_zero(ac, struct dn_list);
1717                         if (idx_one_tree_list == NULL) {
1718                                 return ldb_module_oom(ac->module);
1719                         }
1720
1721                         if (!ltdb->cache->attribute_indexes) {
1722                                 talloc_free(idx_one_tree_list);
1723                                 talloc_free(dn_list);
1724                                 return LDB_ERR_OPERATIONS_ERROR;
1725                         }
1726                         /*
1727                          * Here we load the index for the tree.
1728                          */
1729                         ret = ltdb_index_dn(ac->module, ltdb, ac->tree,
1730                                             idx_one_tree_list);
1731                         if (ret != LDB_SUCCESS) {
1732                                 talloc_free(idx_one_tree_list);
1733                                 talloc_free(dn_list);
1734                                 return ret;
1735                         }
1736
1737                         if (!list_intersect(ldb, ltdb,
1738                                             dn_list, idx_one_tree_list)) {
1739                                 talloc_free(idx_one_tree_list);
1740                                 talloc_free(dn_list);
1741                                 return LDB_ERR_OPERATIONS_ERROR;
1742                         }
1743                 }
1744                 break;
1745
1746         case LDB_SCOPE_SUBTREE:
1747         case LDB_SCOPE_DEFAULT:
1748                 if (!ltdb->cache->attribute_indexes) {
1749                         talloc_free(dn_list);
1750                         return LDB_ERR_OPERATIONS_ERROR;
1751                 }
1752                 /*
1753                  * Here we load the index for the tree.  We have no
1754                  * index for the subtree.
1755                  */
1756                 ret = ltdb_index_dn(ac->module, ltdb, ac->tree, dn_list);
1757                 if (ret != LDB_SUCCESS) {
1758                         talloc_free(dn_list);
1759                         return ret;
1760                 }
1761                 break;
1762         }
1763
1764         ret = ltdb_index_filter(ltdb, dn_list, ac, match_count);
1765         talloc_free(dn_list);
1766         return ret;
1767 }
1768
1769 /**
1770  * @brief Add a DN in the index list of a given attribute name/value pair
1771  *
1772  * This function will add the DN in the index list for the index for
1773  * the given attribute name and value.
1774  *
1775  * @param[in]  module       A ldb_module structure
1776  *
1777  * @param[in]  dn           The string representation of the DN as it
1778  *                          will be stored in the index entry
1779  *
1780  * @param[in]  el           A ldb_message_element array, one of the entry
1781  *                          referred by the v_idx is the attribute name and
1782  *                          value pair which will be used to construct the
1783  *                          index name
1784  *
1785  * @param[in]  v_idx        The index of element in the el array to use
1786  *
1787  * @return                  An ldb error code
1788  */
1789 static int ltdb_index_add1(struct ldb_module *module,
1790                            struct ltdb_private *ltdb,
1791                            const struct ldb_message *msg,
1792                            struct ldb_message_element *el, int v_idx)
1793 {
1794         struct ldb_context *ldb;
1795         struct ldb_dn *dn_key;
1796         int ret;
1797         const struct ldb_schema_attribute *a;
1798         struct dn_list *list;
1799         unsigned alloc_len;
1800
1801         ldb = ldb_module_get_ctx(module);
1802
1803         list = talloc_zero(module, struct dn_list);
1804         if (list == NULL) {
1805                 return LDB_ERR_OPERATIONS_ERROR;
1806         }
1807
1808         dn_key = ltdb_index_key(ldb, ltdb,
1809                                 el->name, &el->values[v_idx], &a);
1810         if (!dn_key) {
1811                 talloc_free(list);
1812                 return LDB_ERR_OPERATIONS_ERROR;
1813         }
1814         talloc_steal(list, dn_key);
1815
1816         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
1817         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1818                 talloc_free(list);
1819                 return ret;
1820         }
1821
1822         /*
1823          * Check for duplicates in unique indexes and for the @IDXDN
1824          * DN -> GUID record
1825          */
1826         if (list->count > 0 &&
1827             ((a != NULL
1828               && (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX ||
1829                  (el->flags & LDB_FLAG_INTERNAL_FORCE_UNIQUE_INDEX))) ||
1830              ldb_attr_cmp(el->name, LTDB_IDXDN) == 0)) {
1831                 /*
1832                  * We do not want to print info about a possibly
1833                  * confidential DN that the conflict was with in the
1834                  * user-visible error string
1835                  */
1836                 ldb_debug(ldb, LDB_DEBUG_WARNING,
1837                           __location__ ": unique index violation on %s in %s, "
1838                           "conficts with %*.*s in %s",
1839                           el->name, ldb_dn_get_linearized(msg->dn),
1840                           (int)list->dn[0].length,
1841                           (int)list->dn[0].length,
1842                           list->dn[0].data,
1843                           ldb_dn_get_linearized(dn_key));
1844                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1845                                        el->name,
1846                                        ldb_dn_get_linearized(msg->dn));
1847                 talloc_free(list);
1848                 return LDB_ERR_CONSTRAINT_VIOLATION;
1849         }
1850
1851         /* overallocate the list a bit, to reduce the number of
1852          * realloc trigered copies */
1853         alloc_len = ((list->count+1)+7) & ~7;
1854         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1855         if (list->dn == NULL) {
1856                 talloc_free(list);
1857                 return LDB_ERR_OPERATIONS_ERROR;
1858         }
1859
1860         if (ltdb->cache->GUID_index_attribute == NULL) {
1861                 const char *dn_str = ldb_dn_get_linearized(msg->dn);
1862                 list->dn[list->count].data
1863                         = (uint8_t *)talloc_strdup(list->dn, dn_str);
1864                 if (list->dn[list->count].data == NULL) {
1865                         talloc_free(list);
1866                         return LDB_ERR_OPERATIONS_ERROR;
1867                 }
1868                 list->dn[list->count].length = strlen(dn_str);
1869         } else {
1870                 const struct ldb_val *key_val;
1871                 struct ldb_val *exact = NULL, *next = NULL;
1872                 key_val = ldb_msg_find_ldb_val(msg,
1873                                                ltdb->cache->GUID_index_attribute);
1874                 if (key_val == NULL) {
1875                         talloc_free(list);
1876                         return ldb_module_operr(module);
1877                 }
1878
1879                 if (key_val->length != LTDB_GUID_SIZE) {
1880                         talloc_free(list);
1881                         return ldb_module_operr(module);
1882                 }
1883
1884                 BINARY_ARRAY_SEARCH_GTE(list->dn, list->count,
1885                                         *key_val, ldb_val_equal_exact_ordered,
1886                                         exact, next);
1887                 if (exact != NULL) {
1888                         talloc_free(list);
1889                         return LDB_ERR_OPERATIONS_ERROR;
1890                 }
1891
1892                 if (next == NULL) {
1893                         next = &list->dn[list->count];
1894                 } else {
1895                         memmove(&next[1], next,
1896                                 sizeof(*next) * (list->count - (next - list->dn)));
1897                 }
1898                 *next = ldb_val_dup(list->dn, key_val);
1899                 if (next->data == NULL) {
1900                         talloc_free(list);
1901                         return ldb_module_operr(module);
1902                 }
1903         }
1904         list->count++;
1905
1906         ret = ltdb_dn_list_store(module, dn_key, list);
1907
1908         talloc_free(list);
1909
1910         return ret;
1911 }
1912
1913 /*
1914   add index entries for one elements in a message
1915  */
1916 static int ltdb_index_add_el(struct ldb_module *module,
1917                              struct ltdb_private *ltdb,
1918                              const struct ldb_message *msg,
1919                              struct ldb_message_element *el)
1920 {
1921         unsigned int i;
1922         for (i = 0; i < el->num_values; i++) {
1923                 int ret = ltdb_index_add1(module, ltdb,
1924                                           msg, el, i);
1925                 if (ret != LDB_SUCCESS) {
1926                         return ret;
1927                 }
1928         }
1929
1930         return LDB_SUCCESS;
1931 }
1932
1933 /*
1934   add index entries for all elements in a message
1935  */
1936 static int ltdb_index_add_all(struct ldb_module *module,
1937                               struct ltdb_private *ltdb,
1938                               const struct ldb_message *msg)
1939 {
1940         struct ldb_message_element *elements = msg->elements;
1941         unsigned int i;
1942         const char *dn_str;
1943         int ret;
1944
1945         if (ldb_dn_is_special(msg->dn)) {
1946                 return LDB_SUCCESS;
1947         }
1948
1949         dn_str = ldb_dn_get_linearized(msg->dn);
1950         if (dn_str == NULL) {
1951                 return LDB_ERR_OPERATIONS_ERROR;
1952         }
1953
1954         ret = ltdb_write_index_dn_guid(module, msg, 1);
1955         if (ret != LDB_SUCCESS) {
1956                 return ret;
1957         }
1958
1959         if (!ltdb->cache->attribute_indexes) {
1960                 /* no indexed fields */
1961                 return LDB_SUCCESS;
1962         }
1963
1964         for (i = 0; i < msg->num_elements; i++) {
1965                 if (!ltdb_is_indexed(module, ltdb, elements[i].name)) {
1966                         continue;
1967                 }
1968                 ret = ltdb_index_add_el(module, ltdb,
1969                                         msg, &elements[i]);
1970                 if (ret != LDB_SUCCESS) {
1971                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1972                         ldb_asprintf_errstring(ldb,
1973                                                __location__ ": Failed to re-index %s in %s - %s",
1974                                                elements[i].name, dn_str,
1975                                                ldb_errstring(ldb));
1976                         return ret;
1977                 }
1978         }
1979
1980         return LDB_SUCCESS;
1981 }
1982
1983
1984 /*
1985   insert a DN index for a message
1986 */
1987 static int ltdb_modify_index_dn(struct ldb_module *module,
1988                                 struct ltdb_private *ltdb,
1989                                 const struct ldb_message *msg,
1990                                 struct ldb_dn *dn,
1991                                 const char *index, int add)
1992 {
1993         struct ldb_message_element el;
1994         struct ldb_val val;
1995         int ret;
1996
1997         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
1998         if (val.data == NULL) {
1999                 const char *dn_str = ldb_dn_get_linearized(dn);
2000                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2001                                        __location__
2002                                        ": Failed to modify %s "
2003                                        "against %s in %s: failed "
2004                                        "to get casefold DN",
2005                                        index,
2006                                        ltdb->cache->GUID_index_attribute,
2007                                        dn_str);
2008                 return LDB_ERR_OPERATIONS_ERROR;
2009         }
2010
2011         val.length = strlen((char *)val.data);
2012         el.name = index;
2013         el.values = &val;
2014         el.num_values = 1;
2015
2016         if (add) {
2017                 ret = ltdb_index_add1(module, ltdb, msg, &el, 0);
2018         } else { /* delete */
2019                 ret = ltdb_index_del_value(module, ltdb, msg, &el, 0);
2020         }
2021
2022         if (ret != LDB_SUCCESS) {
2023                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2024                 const char *dn_str = ldb_dn_get_linearized(dn);
2025                 ldb_asprintf_errstring(ldb,
2026                                        __location__
2027                                        ": Failed to modify %s "
2028                                        "against %s in %s - %s",
2029                                        index,
2030                                        ltdb->cache->GUID_index_attribute,
2031                                        dn_str, ldb_errstring(ldb));
2032                 return ret;
2033         }
2034         return ret;
2035 }
2036
2037 /*
2038   insert a one level index for a message
2039 */
2040 static int ltdb_index_onelevel(struct ldb_module *module,
2041                                const struct ldb_message *msg, int add)
2042 {
2043         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2044                                                     struct ltdb_private);
2045         struct ldb_dn *pdn;
2046         int ret;
2047
2048         /* We index for ONE Level only if requested */
2049         if (!ltdb->cache->one_level_indexes) {
2050                 return LDB_SUCCESS;
2051         }
2052
2053         pdn = ldb_dn_get_parent(module, msg->dn);
2054         if (pdn == NULL) {
2055                 return LDB_ERR_OPERATIONS_ERROR;
2056         }
2057         ret = ltdb_modify_index_dn(module, ltdb,
2058                                    msg, pdn, LTDB_IDXONE, add);
2059
2060         talloc_free(pdn);
2061
2062         return ret;
2063 }
2064
2065 /*
2066   insert a one level index for a message
2067 */
2068 static int ltdb_write_index_dn_guid(struct ldb_module *module,
2069                                     const struct ldb_message *msg,
2070                                     int add)
2071 {
2072         int ret;
2073         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2074                                                     struct ltdb_private);
2075
2076         /* We index for DN only if using a GUID index */
2077         if (ltdb->cache->GUID_index_attribute == NULL) {
2078                 return LDB_SUCCESS;
2079         }
2080
2081         ret = ltdb_modify_index_dn(module, ltdb, msg, msg->dn,
2082                                    LTDB_IDXDN, add);
2083
2084         if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
2085                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2086                                        "Entry %s already exists",
2087                                        ldb_dn_get_linearized(msg->dn));
2088                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
2089         }
2090         return ret;
2091 }
2092
2093 /*
2094   add the index entries for a new element in a record
2095   The caller guarantees that these element values are not yet indexed
2096 */
2097 int ltdb_index_add_element(struct ldb_module *module,
2098                            struct ltdb_private *ltdb,
2099                            const struct ldb_message *msg,
2100                            struct ldb_message_element *el)
2101 {
2102         if (ldb_dn_is_special(msg->dn)) {
2103                 return LDB_SUCCESS;
2104         }
2105         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2106                 return LDB_SUCCESS;
2107         }
2108         return ltdb_index_add_el(module, ltdb, msg, el);
2109 }
2110
2111 /*
2112   add the index entries for a new record
2113 */
2114 int ltdb_index_add_new(struct ldb_module *module,
2115                        struct ltdb_private *ltdb,
2116                        const struct ldb_message *msg)
2117 {
2118         int ret;
2119
2120         if (ldb_dn_is_special(msg->dn)) {
2121                 return LDB_SUCCESS;
2122         }
2123
2124         ret = ltdb_index_add_all(module, ltdb, msg);
2125         if (ret != LDB_SUCCESS) {
2126                 /*
2127                  * Because we can't trust the caller to be doing
2128                  * transactions properly, clean up any index for this
2129                  * entry rather than relying on a transaction
2130                  * cleanup
2131                  */
2132
2133                 ltdb_index_delete(module, msg);
2134                 return ret;
2135         }
2136
2137         ret = ltdb_index_onelevel(module, msg, 1);
2138         if (ret != LDB_SUCCESS) {
2139                 /*
2140                  * Because we can't trust the caller to be doing
2141                  * transactions properly, clean up any index for this
2142                  * entry rather than relying on a transaction
2143                  * cleanup
2144                  */
2145                 ltdb_index_delete(module, msg);
2146                 return ret;
2147         }
2148         return ret;
2149 }
2150
2151
2152 /*
2153   delete an index entry for one message element
2154 */
2155 int ltdb_index_del_value(struct ldb_module *module,
2156                          struct ltdb_private *ltdb,
2157                          const struct ldb_message *msg,
2158                          struct ldb_message_element *el, unsigned int v_idx)
2159 {
2160         struct ldb_context *ldb;
2161         struct ldb_dn *dn_key;
2162         const char *dn_str;
2163         int ret, i;
2164         unsigned int j;
2165         struct dn_list *list;
2166         struct ldb_dn *dn = msg->dn;
2167
2168         ldb = ldb_module_get_ctx(module);
2169
2170         dn_str = ldb_dn_get_linearized(dn);
2171         if (dn_str == NULL) {
2172                 return LDB_ERR_OPERATIONS_ERROR;
2173         }
2174
2175         if (dn_str[0] == '@') {
2176                 return LDB_SUCCESS;
2177         }
2178
2179         dn_key = ltdb_index_key(ldb, ltdb,
2180                                 el->name, &el->values[v_idx], NULL);
2181         if (!dn_key) {
2182                 return LDB_ERR_OPERATIONS_ERROR;
2183         }
2184
2185         list = talloc_zero(dn_key, struct dn_list);
2186         if (list == NULL) {
2187                 talloc_free(dn_key);
2188                 return LDB_ERR_OPERATIONS_ERROR;
2189         }
2190
2191         ret = ltdb_dn_list_load(module, ltdb, dn_key, list);
2192         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
2193                 /* it wasn't indexed. Did we have an earlier error? If we did then
2194                    its gone now */
2195                 talloc_free(dn_key);
2196                 return LDB_SUCCESS;
2197         }
2198
2199         if (ret != LDB_SUCCESS) {
2200                 talloc_free(dn_key);
2201                 return ret;
2202         }
2203
2204         i = ltdb_dn_list_find_msg(ltdb, list, msg);
2205         if (i == -1) {
2206                 /* nothing to delete */
2207                 talloc_free(dn_key);
2208                 return LDB_SUCCESS;
2209         }
2210
2211         j = (unsigned int) i;
2212         if (j != list->count - 1) {
2213                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
2214         }
2215         list->count--;
2216         if (list->count == 0) {
2217                 talloc_free(list->dn);
2218                 list->dn = NULL;
2219         } else {
2220                 list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
2221         }
2222
2223         ret = ltdb_dn_list_store(module, dn_key, list);
2224
2225         talloc_free(dn_key);
2226
2227         return ret;
2228 }
2229
2230 /*
2231   delete the index entries for a element
2232   return -1 on failure
2233 */
2234 int ltdb_index_del_element(struct ldb_module *module,
2235                            struct ltdb_private *ltdb,
2236                            const struct ldb_message *msg,
2237                            struct ldb_message_element *el)
2238 {
2239         const char *dn_str;
2240         int ret;
2241         unsigned int i;
2242
2243         if (!ltdb->cache->attribute_indexes) {
2244                 /* no indexed fields */
2245                 return LDB_SUCCESS;
2246         }
2247
2248         dn_str = ldb_dn_get_linearized(msg->dn);
2249         if (dn_str == NULL) {
2250                 return LDB_ERR_OPERATIONS_ERROR;
2251         }
2252
2253         if (dn_str[0] == '@') {
2254                 return LDB_SUCCESS;
2255         }
2256
2257         if (!ltdb_is_indexed(module, ltdb, el->name)) {
2258                 return LDB_SUCCESS;
2259         }
2260         for (i = 0; i < el->num_values; i++) {
2261                 ret = ltdb_index_del_value(module, ltdb, msg, el, i);
2262                 if (ret != LDB_SUCCESS) {
2263                         return ret;
2264                 }
2265         }
2266
2267         return LDB_SUCCESS;
2268 }
2269
2270 /*
2271   delete the index entries for a record
2272   return -1 on failure
2273 */
2274 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
2275 {
2276         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2277         int ret;
2278         unsigned int i;
2279
2280         if (ldb_dn_is_special(msg->dn)) {
2281                 return LDB_SUCCESS;
2282         }
2283
2284         ret = ltdb_index_onelevel(module, msg, 0);
2285         if (ret != LDB_SUCCESS) {
2286                 return ret;
2287         }
2288
2289         ret = ltdb_write_index_dn_guid(module, msg, 0);
2290         if (ret != LDB_SUCCESS) {
2291                 return ret;
2292         }
2293
2294         if (!ltdb->cache->attribute_indexes) {
2295                 /* no indexed fields */
2296                 return LDB_SUCCESS;
2297         }
2298
2299         for (i = 0; i < msg->num_elements; i++) {
2300                 ret = ltdb_index_del_element(module, ltdb,
2301                                              msg, &msg->elements[i]);
2302                 if (ret != LDB_SUCCESS) {
2303                         return ret;
2304                 }
2305         }
2306
2307         return LDB_SUCCESS;
2308 }
2309
2310
2311 /*
2312   traversal function that deletes all @INDEX records in the in-memory
2313   TDB.
2314
2315   This does not touch the actual DB, that is done at transaction
2316   commit, which in turn greatly reduces DB churn as we will likely
2317   be able to do a direct update into the old record.
2318 */
2319 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2320 {
2321         struct ldb_module *module = state;
2322         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2323         const char *dnstr = "DN=" LTDB_INDEX ":";
2324         struct dn_list list;
2325         struct ldb_dn *dn;
2326         struct ldb_val v;
2327         int ret;
2328
2329         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
2330                 return 0;
2331         }
2332         /* we need to put a empty list in the internal tdb for this
2333          * index entry */
2334         list.dn = NULL;
2335         list.count = 0;
2336
2337         /* the offset of 3 is to remove the DN= prefix. */
2338         v.data = key.dptr + 3;
2339         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
2340
2341         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
2342
2343         /*
2344          * This does not actually touch the DB quite yet, just
2345          * the in-memory index cache
2346          */
2347         ret = ltdb_dn_list_store(module, dn, &list);
2348         if (ret != LDB_SUCCESS) {
2349                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
2350                                        "Unable to store null index for %s\n",
2351                                                 ldb_dn_get_linearized(dn));
2352                 talloc_free(dn);
2353                 return -1;
2354         }
2355         talloc_free(dn);
2356         return 0;
2357 }
2358
2359 struct ltdb_reindex_context {
2360         struct ldb_module *module;
2361         int error;
2362         uint32_t count;
2363 };
2364
2365 /*
2366   traversal function that adds @INDEX records during a re index
2367 */
2368 static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2369 {
2370         struct ldb_context *ldb;
2371         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2372         struct ldb_module *module = ctx->module;
2373         struct ldb_message *msg;
2374         unsigned int nb_elements_in_db;
2375         const struct ldb_val val = {
2376                 .data = data.dptr,
2377                 .length = data.dsize,
2378         };
2379         int ret;
2380         TDB_DATA key2;
2381         bool is_record;
2382         
2383         ldb = ldb_module_get_ctx(module);
2384
2385         if (key.dsize > 4 &&
2386             memcmp(key.dptr, "DN=@", 4) == 0) {
2387                 return 0;
2388         }
2389
2390         is_record = ltdb_key_is_record(key);
2391         if (is_record == false) {
2392                 return 0;
2393         }
2394         
2395         msg = ldb_msg_new(module);
2396         if (msg == NULL) {
2397                 return -1;
2398         }
2399
2400         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2401                                                    msg,
2402                                                    NULL, 0,
2403                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2404                                                    &nb_elements_in_db);
2405         if (ret != 0) {
2406                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2407                                                 ldb_dn_get_linearized(msg->dn));
2408                 ctx->error = ret;
2409                 talloc_free(msg);
2410                 return -1;
2411         }
2412
2413         if (msg->dn == NULL) {
2414                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2415                           "Refusing to re-index as GUID "
2416                           "key %*.*s with no DN\n",
2417                           (int)key.dsize, (int)key.dsize,
2418                           (char *)key.dptr);
2419                 talloc_free(msg);
2420                 return -1;
2421         }
2422         
2423         /* check if the DN key has changed, perhaps due to the case
2424            insensitivity of an element changing, or a change from DN
2425            to GUID keys */
2426         key2 = ltdb_key_msg(module, msg, msg);
2427         if (key2.dptr == NULL) {
2428                 /* probably a corrupt record ... darn */
2429                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
2430                                                 ldb_dn_get_linearized(msg->dn));
2431                 talloc_free(msg);
2432                 return 0;
2433         }
2434         if (key.dsize != key2.dsize ||
2435             (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
2436                 int tdb_ret;
2437                 tdb_ret = tdb_delete(tdb, key);
2438                 if (tdb_ret != 0) {
2439                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2440                                   "Failed to delete %*.*s "
2441                                   "for rekey as %*.*s: %s",
2442                                   (int)key.dsize, (int)key.dsize,
2443                                   (const char *)key.dptr,
2444                                   (int)key2.dsize, (int)key2.dsize,
2445                                   (const char *)key.dptr,
2446                                   tdb_errorstr(tdb));
2447                         ctx->error = ltdb_err_map(tdb_error(tdb));
2448                         return -1;
2449                 }
2450                 tdb_ret = tdb_store(tdb, key2, data, 0);
2451                 if (tdb_ret != 0) {
2452                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2453                                   "Failed to rekey %*.*s as %*.*s: %s",
2454                                   (int)key.dsize, (int)key.dsize,
2455                                   (const char *)key.dptr,
2456                                   (int)key2.dsize, (int)key2.dsize,
2457                                   (const char *)key.dptr,
2458                                   tdb_errorstr(tdb));
2459                         ctx->error = ltdb_err_map(tdb_error(tdb));
2460                         return -1;
2461                 }
2462         }
2463         talloc_free(key2.dptr);
2464
2465         talloc_free(msg);
2466
2467         ctx->count++;
2468         if (ctx->count % 10000 == 0) {
2469                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2470                           "Reindexing: re-keyed %u records so far",
2471                           ctx->count);
2472         }
2473
2474         return 0;
2475 }
2476
2477 /*
2478   traversal function that adds @INDEX records during a re index
2479 */
2480 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
2481 {
2482         struct ldb_context *ldb;
2483         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
2484         struct ldb_module *module = ctx->module;
2485         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
2486                                                     struct ltdb_private);
2487         struct ldb_message *msg;
2488         unsigned int nb_elements_in_db;
2489         const struct ldb_val val = {
2490                 .data = data.dptr,
2491                 .length = data.dsize,
2492         };
2493         int ret;
2494         bool is_record;
2495         
2496         ldb = ldb_module_get_ctx(module);
2497
2498         if (key.dsize > 4 &&
2499             memcmp(key.dptr, "DN=@", 4) == 0) {
2500                 return 0;
2501         }
2502
2503         is_record = ltdb_key_is_record(key);
2504         if (is_record == false) {
2505                 return 0;
2506         }
2507         
2508         msg = ldb_msg_new(module);
2509         if (msg == NULL) {
2510                 return -1;
2511         }
2512
2513         ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
2514                                                    msg,
2515                                                    NULL, 0,
2516                                                    LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
2517                                                    &nb_elements_in_db);
2518         if (ret != 0) {
2519                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
2520                                                 ldb_dn_get_linearized(msg->dn));
2521                 ctx->error = ret;
2522                 talloc_free(msg);
2523                 return -1;
2524         }
2525
2526         if (msg->dn == NULL) {
2527                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2528                           "Refusing to re-index as GUID "
2529                           "key %*.*s with no DN\n",
2530                           (int)key.dsize, (int)key.dsize,
2531                           (char *)key.dptr);
2532                 talloc_free(msg);
2533                 return -1;
2534         }
2535
2536         ret = ltdb_index_onelevel(module, msg, 1);
2537         if (ret != LDB_SUCCESS) {
2538                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2539                           "Adding special ONE LEVEL index failed (%s)!",
2540                                                 ldb_dn_get_linearized(msg->dn));
2541                 talloc_free(msg);
2542                 return -1;
2543         }
2544
2545         ret = ltdb_index_add_all(module, ltdb, msg);
2546
2547         if (ret != LDB_SUCCESS) {
2548                 ctx->error = ret;
2549                 talloc_free(msg);
2550                 return -1;
2551         }
2552
2553         talloc_free(msg);
2554
2555         ctx->count++;
2556         if (ctx->count % 10000 == 0) {
2557                 ldb_debug(ldb, LDB_DEBUG_WARNING,
2558                           "Reindexing: re-indexed %u records so far",
2559                           ctx->count);
2560         }
2561
2562         return 0;
2563 }
2564
2565 /*
2566   force a complete reindex of the database
2567 */
2568 int ltdb_reindex(struct ldb_module *module)
2569 {
2570         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
2571         int ret;
2572         struct ltdb_reindex_context ctx;
2573
2574         /*
2575          * Only triggered after a modification, but make clear we do
2576          * not re-index a read-only DB
2577          */
2578         if (ltdb->read_only) {
2579                 return LDB_ERR_UNWILLING_TO_PERFORM;
2580         }
2581
2582         if (ltdb_cache_reload(module) != 0) {
2583                 return LDB_ERR_OPERATIONS_ERROR;
2584         }
2585
2586         /*
2587          * Ensure we read (and so remove) the entries from the real
2588          * DB, no values stored so far are any use as we want to do a
2589          * re-index
2590          */
2591         ltdb_index_transaction_cancel(module);
2592
2593         ret = ltdb_index_transaction_start(module);
2594         if (ret != LDB_SUCCESS) {
2595                 return ret;
2596         }
2597
2598         /* first traverse the database deleting any @INDEX records by
2599          * putting NULL entries in the in-memory tdb
2600          */
2601         ret = tdb_traverse(ltdb->tdb, delete_index, module);
2602         if (ret < 0) {
2603                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2604                 ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
2605                                        ldb_errstring(ldb));
2606                 return LDB_ERR_OPERATIONS_ERROR;
2607         }
2608
2609         ctx.module = module;
2610         ctx.error = 0;
2611         ctx.count = 0;
2612
2613         /* now traverse adding any indexes for normal LDB records */
2614         ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
2615         if (ret < 0) {
2616                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2617                 ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
2618                                        ldb_errstring(ldb));
2619                 return LDB_ERR_OPERATIONS_ERROR;
2620         }
2621
2622         if (ctx.error != LDB_SUCCESS) {
2623                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2624                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2625                 return ctx.error;
2626         }
2627
2628         ctx.error = 0;
2629         ctx.count = 0;
2630
2631         /* now traverse adding any indexes for normal LDB records */
2632         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
2633         if (ret < 0) {
2634                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2635                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
2636                                        ldb_errstring(ldb));
2637                 return LDB_ERR_OPERATIONS_ERROR;
2638         }
2639
2640         if (ctx.error != LDB_SUCCESS) {
2641                 struct ldb_context *ldb = ldb_module_get_ctx(module);
2642                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
2643                 return ctx.error;
2644         }
2645
2646         if (ctx.count > 10000) {
2647                 ldb_debug(ldb_module_get_ctx(module),
2648                           LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
2649                           "final index write-out will be in transaction commit",
2650                           tdb_name(ltdb->tdb));
2651         }
2652         return LDB_SUCCESS;
2653 }