s4:ldb Fix ldb_list_find() folowing the change from char * to TDB_DATA
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35 #include "dlinklist.h"
36
37 /*
38   the idxptr code is a bit unusual. The way it works is to replace
39   @IDX elements in records during a transaction with @IDXPTR
40   elements. The @IDXPTR elements don't contain the actual index entry
41   values, but contain a pointer to a linked list of values.
42
43   This means we are storing pointers in a database, which is normally
44   not allowed, but in this case we are storing them only for the
45   duration of a transaction, and re-writing them into the normal @IDX
46   format at the end of the transaction. That means no other processes
47   are ever exposed to the @IDXPTR values.
48
49   The advantage is that the linked list doesn't cause huge
50   fragmentation during a transaction. Without the @IDXPTR method we
51   often ended up with a ldb that was between 10x and 100x larger then
52   it needs to be due to massive fragmentation caused by re-writing
53   @INDEX records many times during indexing.
54  */
55 struct ldb_index_pointer {
56         struct ldb_index_pointer *next, *prev;
57         struct ldb_val value;
58 };
59
60 struct ltdb_idxptr {
61         int num_dns;
62         const char **dn_list;
63         bool repack;
64 };
65
66 /*
67   add to the list of DNs that need to be fixed on transaction end
68  */
69 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
70 {
71         void *data = ldb_module_get_private(module);
72         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
73         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
74                                                const char *, ltdb->idxptr->num_dns+1);
75         if (ltdb->idxptr->dn_list == NULL) {
76                 ltdb->idxptr->num_dns = 0;
77                 return LDB_ERR_OPERATIONS_ERROR;
78         }
79         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
80                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
81         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
82                 return LDB_ERR_OPERATIONS_ERROR;
83         }
84         ltdb->idxptr->num_dns++;
85         return LDB_SUCCESS;
86 }
87
88 /* free an idxptr record */
89 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
90 {
91         struct ldb_val val;
92         struct ldb_index_pointer *ptr;
93
94         if (el->num_values != 1) {
95                 return LDB_ERR_OPERATIONS_ERROR;
96         }
97
98         val = el->values[0];
99         if (val.length != sizeof(void *)) {
100                 return LDB_ERR_OPERATIONS_ERROR;
101         }
102
103         ptr = *(struct ldb_index_pointer **)val.data;
104         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
105                 return LDB_ERR_OPERATIONS_ERROR;
106         }
107
108         while (ptr) {
109                 struct ldb_index_pointer *tmp = ptr;
110                 DLIST_REMOVE(ptr, ptr);
111                 talloc_free(tmp);
112         }
113
114         return LDB_SUCCESS;
115 }
116
117
118 /* convert from the IDXPTR format to a ldb_message_element format */
119 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
120 {
121         struct ldb_val val;
122         struct ldb_index_pointer *ptr, *tmp;
123         int i;
124         struct ldb_val *val2;
125
126         if (el->num_values != 1) {
127                 return LDB_ERR_OPERATIONS_ERROR;
128         }
129
130         val = el->values[0];
131         if (val.length != sizeof(void *)) {
132                 return LDB_ERR_OPERATIONS_ERROR;
133         }
134
135         ptr = *(struct ldb_index_pointer **)val.data;
136         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
137                 return LDB_ERR_OPERATIONS_ERROR;
138         }
139
140         /* count the length of the list */
141         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
142                 i++;
143         }
144
145         /* allocate the new values array */
146         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
147         if (val2 == NULL) {
148                 return LDB_ERR_OPERATIONS_ERROR;
149         }
150         el->values = val2;
151         el->num_values = i;
152
153         /* populate the values array */
154         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
155                 el->values[i].length = tmp->value.length;
156                 /* we need to over-allocate here as there are still some places
157                    in ldb that rely on null termination. */
158                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
159                 if (el->values[i].data == NULL) {
160                         return LDB_ERR_OPERATIONS_ERROR;
161                 }
162                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
163                 el->values[i].data[tmp->value.length] = 0;
164         }
165
166         /* update the name */
167         el->name = LTDB_IDX;
168
169         return LDB_SUCCESS;
170 }
171
172
173 /* convert to the IDXPTR format from a ldb_message_element format */
174 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
175 {
176         struct ldb_index_pointer *ptr, *tmp;
177         int i;
178         struct ldb_val *val2;
179         void *data = ldb_module_get_private(module);
180         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
181
182         ptr = NULL;
183
184         for (i=0;i<el->num_values;i++) {
185                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
186                 if (tmp == NULL) {
187                         return LDB_ERR_OPERATIONS_ERROR;
188                 }
189                 tmp->value = el->values[i];
190                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
191                 if (tmp->value.data == NULL) {
192                         return LDB_ERR_OPERATIONS_ERROR;
193                 }
194                 DLIST_ADD(ptr, tmp);
195         }
196
197         /* allocate the new values array */
198         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
199         if (val2 == NULL) {
200                 return LDB_ERR_OPERATIONS_ERROR;
201         }
202         el->values = val2;
203         el->num_values = 1;
204
205         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
206         el->values[0].length = sizeof(ptr);
207
208         /* update the name */
209         el->name = LTDB_IDXPTR;
210
211         return LDB_SUCCESS;
212 }
213
214
215 /* enable the idxptr mode when transactions start */
216 int ltdb_index_transaction_start(struct ldb_module *module)
217 {
218         void *data = ldb_module_get_private(module);
219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
220         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
221         return LDB_SUCCESS;
222 }
223
224 /*
225   a wrapper around ltdb_search_dn1() which translates pointer based index records
226   and maps them into normal ldb message structures
227  */
228 static int ltdb_search_dn1_index(struct ldb_module *module,
229                                 struct ldb_dn *dn, struct ldb_message *msg)
230 {
231         int ret, i;
232         ret = ltdb_search_dn1(module, dn, msg);
233         if (ret != LDB_SUCCESS) {
234                 return ret;
235         }
236
237         /* if this isn't a @INDEX record then don't munge it */
238         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         for (i=0;i<msg->num_elements;i++) {
243                 struct ldb_message_element *el = &msg->elements[i];
244                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
245                         ret = ltdb_convert_from_idxptr(module, el);
246                         if (ret != LDB_SUCCESS) {
247                                 return ret;
248                         }
249                 }
250         }
251
252         return ret;
253 }
254
255
256
257 /*
258   fixup the idxptr for one DN
259  */
260 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
261 {
262         struct ldb_context *ldb;
263         struct ldb_dn *dn;
264         struct ldb_message *msg = ldb_msg_new(module);
265         int ret;
266
267         ldb = ldb_module_get_ctx(module);
268
269         dn = ldb_dn_new(msg, ldb, strdn);
270         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
271                 ret = ltdb_store(module, msg, TDB_REPLACE);
272         }
273         talloc_free(msg);
274         return ret;
275 }
276
277 /* cleanup the idxptr mode when transaction commits */
278 int ltdb_index_transaction_commit(struct ldb_module *module)
279 {
280         int i;
281         void *data = ldb_module_get_private(module);
282         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
283
284         /* fix all the DNs that we have modified */
285         if (ltdb->idxptr) {
286                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
287                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
288                 }
289
290                 if (ltdb->idxptr->repack) {
291                         tdb_repack(ltdb->tdb);
292                 }
293         }
294
295         talloc_free(ltdb->idxptr);
296         ltdb->idxptr = NULL;
297         return LDB_SUCCESS;
298 }
299
300 /* cleanup the idxptr mode when transaction cancels */
301 int ltdb_index_transaction_cancel(struct ldb_module *module)
302 {
303         void *data = ldb_module_get_private(module);
304         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
305         talloc_free(ltdb->idxptr);
306         ltdb->idxptr = NULL;
307         return LDB_SUCCESS;
308 }
309
310
311
312 /* a wrapper around ltdb_store() for the index code which
313    stores in IDXPTR format when idxptr mode is enabled
314
315    WARNING: This modifies the msg which is passed in
316 */
317 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
318 {
319         void *data = ldb_module_get_private(module);
320         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
321         int ret;
322
323         if (ltdb->idxptr) {
324                 int i;
325                 struct ldb_message *msg2 = ldb_msg_new(module);
326
327                 /* free any old pointer */
328                 ret = ltdb_search_dn1(module, msg->dn, msg2);
329                 if (ret == 0) {
330                         for (i=0;i<msg2->num_elements;i++) {
331                                 struct ldb_message_element *el = &msg2->elements[i];
332                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
333                                         ret = ltdb_free_idxptr(module, el);
334                                         if (ret != LDB_SUCCESS) {
335                                                 return ret;
336                                         }
337                                 }
338                         }
339                 }
340                 talloc_free(msg2);
341
342                 for (i=0;i<msg->num_elements;i++) {
343                         struct ldb_message_element *el = &msg->elements[i];
344                         if (strcmp(el->name, LTDB_IDX) == 0) {
345                                 ret = ltdb_convert_to_idxptr(module, el);
346                                 if (ret != LDB_SUCCESS) {
347                                         return ret;
348                                 }
349                         }
350                 }
351
352                 if (ltdb_idxptr_add(module, msg) != 0) {
353                         return LDB_ERR_OPERATIONS_ERROR;
354                 }
355         }
356
357         ret = ltdb_store(module, msg, flgs);
358         return ret;
359 }
360
361
362 /*
363   find an element in a list, using the given comparison function and
364   assuming that the list is already sorted using comp_fn
365
366   return -1 if not found, or the index of the first occurance of needle if found
367 */
368 static int ldb_list_find(const void *needle,
369                          const void *base, size_t nmemb, size_t size,
370                          comparison_fn_t comp_fn)
371 {
372         const uint8_t *base_p = (const uint8_t *)base;
373         size_t min_i, max_i, test_i;
374
375         if (nmemb == 0) {
376                 return -1;
377         }
378
379         min_i = 0;
380         max_i = nmemb-1;
381
382         while (min_i < max_i) {
383                 int r;
384
385                 test_i = (min_i + max_i) / 2;
386                 r = comp_fn(needle, (void const *)(base_p + (size * test_i)));
387                 if (r == 0) {
388                         /* scan back for first element */
389                         while (test_i > 0 &&
390                                comp_fn(needle, (void const *)(base_p + (size * (test_i-1)))) == 0) {
391                                 test_i--;
392                         }
393                         return test_i;
394                 }
395                 if (r < 0) {
396                         if (test_i == 0) {
397                                 return -1;
398                         }
399                         max_i = test_i - 1;
400                 }
401                 if (r > 0) {
402                         min_i = test_i + 1;
403                 }
404         }
405
406         if (comp_fn(needle, (void const *)(base_p + (size * min_i))) == 0) {
407                 return min_i;
408         }
409
410         return -1;
411 }
412
413 struct dn_list {
414         unsigned int count;
415         char **dn;
416 };
417
418 /*
419   return the dn key to be used for an index
420   caller frees
421 */
422 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
423                                      const char *attr, const struct ldb_val *value,
424                                      const struct ldb_schema_attribute **ap)
425 {
426         struct ldb_dn *ret;
427         struct ldb_val v;
428         const struct ldb_schema_attribute *a;
429         char *attr_folded;
430         int r;
431
432         attr_folded = ldb_attr_casefold(ldb, attr);
433         if (!attr_folded) {
434                 return NULL;
435         }
436
437         a = ldb_schema_attribute_by_name(ldb, attr);
438         if (ap) {
439                 *ap = a;
440         }
441         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
442         if (r != LDB_SUCCESS) {
443                 const char *errstr = ldb_errstring(ldb);
444                 /* canonicalisation can be refused. For example, 
445                    a attribute that takes wildcards will refuse to canonicalise
446                    if the value contains a wildcard */
447                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
448                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
449                 talloc_free(attr_folded);
450                 return NULL;
451         }
452         if (ldb_should_b64_encode(ldb, &v)) {
453                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
454                 if (!vstr) return NULL;
455                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
456                 talloc_free(vstr);
457         } else {
458                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
459         }
460
461         if (v.data != value->data) {
462                 talloc_free(v.data);
463         }
464         talloc_free(attr_folded);
465
466         return ret;
467 }
468
469 /*
470   see if a attribute value is in the list of indexed attributes
471 */
472 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
473                             unsigned int *v_idx, const char *key)
474 {
475         unsigned int i, j;
476         for (i=0;i<msg->num_elements;i++) {
477                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
478                         const struct ldb_message_element *el = &msg->elements[i];
479
480                         if (attr == NULL) {
481                                 /* in this case we are just looking to see if key is present,
482                                    we are not spearching for a specific index */
483                                 return 0;
484                         }
485
486                         for (j=0;j<el->num_values;j++) {
487                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
488                                         if (v_idx) {
489                                                 *v_idx = j;
490                                         }
491                                         return i;
492                                 }
493                         }
494                 }
495         }
496         return -1;
497 }
498
499 /* used in sorting dn lists */
500 static int list_cmp(const char **s1, const char **s2)
501 {
502         return strcmp(*s1, *s2);
503 }
504
505 /*
506   return a list of dn's that might match a simple indexed search or
507  */
508 static int ltdb_index_dn_simple(struct ldb_module *module,
509                                 const struct ldb_parse_tree *tree,
510                                 const struct ldb_message *index_list,
511                                 struct dn_list *list)
512 {
513         struct ldb_context *ldb;
514         struct ldb_dn *dn;
515         int ret;
516         unsigned int i, j;
517         struct ldb_message *msg;
518
519         ldb = ldb_module_get_ctx(module);
520
521         list->count = 0;
522         list->dn = NULL;
523
524         /* if the attribute isn't in the list of indexed attributes then
525            this node needs a full search */
526         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
527                 return LDB_ERR_OPERATIONS_ERROR;
528         }
529
530         /* the attribute is indexed. Pull the list of DNs that match the 
531            search criterion */
532         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
533         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
534
535         msg = talloc(list, struct ldb_message);
536         if (msg == NULL) {
537                 return LDB_ERR_OPERATIONS_ERROR;
538         }
539
540         ret = ltdb_search_dn1_index(module, dn, msg);
541         talloc_free(dn);
542         if (ret != LDB_SUCCESS) {
543                 return ret;
544         }
545
546         for (i=0;i<msg->num_elements;i++) {
547                 struct ldb_message_element *el;
548
549                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
550                         continue;
551                 }
552
553                 el = &msg->elements[i];
554
555                 list->dn = talloc_array(list, char *, el->num_values);
556                 if (!list->dn) {
557                         talloc_free(msg);
558                         return LDB_ERR_OPERATIONS_ERROR;
559                 }
560
561                 for (j=0;j<el->num_values;j++) {
562                         list->dn[list->count] =
563                                 talloc_strdup(list->dn, (char *)el->values[j].data);
564                         if (!list->dn[list->count]) {
565                                 talloc_free(msg);
566                                 return LDB_ERR_OPERATIONS_ERROR;
567                         }
568                         list->count++;
569                 }
570         }
571
572         talloc_free(msg);
573
574         if (list->count > 1) {
575                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
576         }
577
578         return LDB_SUCCESS;
579 }
580
581
582 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
583
584 /*
585   return a list of dn's that might match a leaf indexed search
586  */
587 static int ltdb_index_dn_leaf(struct ldb_module *module,
588                               const struct ldb_parse_tree *tree,
589                               const struct ldb_message *index_list,
590                               struct dn_list *list)
591 {
592         struct ldb_context *ldb;
593         ldb = ldb_module_get_ctx(module);
594
595         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
596                 list->dn = talloc_array(list, char *, 1);
597                 if (list->dn == NULL) {
598                         ldb_oom(ldb);
599                         return LDB_ERR_OPERATIONS_ERROR;
600                 }
601                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
602                 if (list->dn[0] == NULL) {
603                         ldb_oom(ldb);
604                         return LDB_ERR_OPERATIONS_ERROR;
605                 }
606                 list->count = 1;
607                 return LDB_SUCCESS;
608         }
609         return ltdb_index_dn_simple(module, tree, index_list, list);
610 }
611
612
613 /*
614   list intersection
615   list = list & list2
616   relies on the lists being sorted
617 */
618 static int list_intersect(struct ldb_context *ldb,
619                           struct dn_list *list, const struct dn_list *list2)
620 {
621         struct dn_list *list3;
622         unsigned int i;
623
624         if (list->count == 0 || list2->count == 0) {
625                 /* 0 & X == 0 */
626                 return LDB_ERR_NO_SUCH_OBJECT;
627         }
628
629         list3 = talloc(ldb, struct dn_list);
630         if (list3 == NULL) {
631                 return LDB_ERR_OPERATIONS_ERROR;
632         }
633
634         list3->dn = talloc_array(list3, char *, list->count);
635         if (!list3->dn) {
636                 talloc_free(list3);
637                 return LDB_ERR_OPERATIONS_ERROR;
638         }
639         list3->count = 0;
640
641         for (i=0;i<list->count;i++) {
642                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
643                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
644                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
645                         list3->count++;
646                 } else {
647                         talloc_free(list->dn[i]);
648                 }
649         }
650
651         talloc_free(list->dn);
652         list->dn = talloc_move(list, &list3->dn);
653         list->count = list3->count;
654         talloc_free(list3);
655
656         return LDB_ERR_NO_SUCH_OBJECT;
657 }
658
659
660 /*
661   list union
662   list = list | list2
663   relies on the lists being sorted
664 */
665 static int list_union(struct ldb_context *ldb,
666                       struct dn_list *list, const struct dn_list *list2)
667 {
668         unsigned int i;
669         char **d;
670         unsigned int count = list->count;
671
672         if (list->count == 0 && list2->count == 0) {
673                 /* 0 | 0 == 0 */
674                 return LDB_ERR_NO_SUCH_OBJECT;
675         }
676
677         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
678         if (!d) {
679                 return LDB_ERR_OPERATIONS_ERROR;
680         }
681         list->dn = d;
682
683         for (i=0;i<list2->count;i++) {
684                 if (ldb_list_find(list2->dn[i], list->dn, count,
685                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
686                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
687                         if (!list->dn[list->count]) {
688                                 return LDB_ERR_OPERATIONS_ERROR;
689                         }
690                         list->count++;
691                 }
692         }
693
694         if (list->count != count) {
695                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
696         }
697
698         return LDB_ERR_NO_SUCH_OBJECT;
699 }
700
701 static int ltdb_index_dn(struct ldb_module *module,
702                          const struct ldb_parse_tree *tree,
703                          const struct ldb_message *index_list,
704                          struct dn_list *list);
705
706
707 /*
708   OR two index results
709  */
710 static int ltdb_index_dn_or(struct ldb_module *module,
711                             const struct ldb_parse_tree *tree,
712                             const struct ldb_message *index_list,
713                             struct dn_list *list)
714 {
715         struct ldb_context *ldb;
716         unsigned int i;
717         int ret;
718
719         ldb = ldb_module_get_ctx(module);
720
721         ret = LDB_ERR_OPERATIONS_ERROR;
722         list->dn = NULL;
723         list->count = 0;
724
725         for (i=0;i<tree->u.list.num_elements;i++) {
726                 struct dn_list *list2;
727                 int v;
728
729                 list2 = talloc(module, struct dn_list);
730                 if (list2 == NULL) {
731                         return LDB_ERR_OPERATIONS_ERROR;
732                 }
733
734                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
735
736                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
737                         /* 0 || X == X */
738                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
739                                 ret = v;
740                         }
741                         talloc_free(list2);
742                         continue;
743                 }
744
745                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
746                         /* 1 || X == 1 */
747                         talloc_free(list->dn);
748                         talloc_free(list2);
749                         return v;
750                 }
751
752                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
753                         ret = LDB_SUCCESS;
754                         list->dn = talloc_move(list, &list2->dn);
755                         list->count = list2->count;
756                 } else {
757                         if (list_union(ldb, list, list2) == -1) {
758                                 talloc_free(list2);
759                                 return LDB_ERR_OPERATIONS_ERROR;
760                         }
761                         ret = LDB_SUCCESS;
762                 }
763                 talloc_free(list2);
764         }
765
766         if (list->count == 0) {
767                 return LDB_ERR_NO_SUCH_OBJECT;
768         }
769
770         return ret;
771 }
772
773
774 /*
775   NOT an index results
776  */
777 static int ltdb_index_dn_not(struct ldb_module *module,
778                              const struct ldb_parse_tree *tree,
779                              const struct ldb_message *index_list,
780                              struct dn_list *list)
781 {
782         /* the only way to do an indexed not would be if we could
783            negate the not via another not or if we knew the total
784            number of database elements so we could know that the
785            existing expression covered the whole database.
786
787            instead, we just give up, and rely on a full index scan
788            (unless an outer & manages to reduce the list)
789         */
790         return LDB_ERR_OPERATIONS_ERROR;
791 }
792
793
794 static bool ltdb_index_unique(struct ldb_context *ldb,
795                               const char *attr)
796 {
797         const struct ldb_schema_attribute *a;
798         a = ldb_schema_attribute_by_name(ldb, attr);
799         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
800                 return true;
801         }
802         return false;
803 }
804
805 /*
806   AND two index results
807  */
808 static int ltdb_index_dn_and(struct ldb_module *module,
809                              const struct ldb_parse_tree *tree,
810                              const struct ldb_message *index_list,
811                              struct dn_list *list)
812 {
813         struct ldb_context *ldb;
814         unsigned int i;
815         int ret, pass;
816
817         ldb = ldb_module_get_ctx(module);
818
819         ret = LDB_ERR_OPERATIONS_ERROR;
820         list->dn = NULL;
821         list->count = 0;
822
823         for (pass=0;pass<=1;pass++) {
824                 /* in the first pass we only look for unique simple
825                    equality tests, in the hope of avoiding having to look
826                    at any others */
827                 bool only_unique = pass==0?true:false;
828
829                 for (i=0;i<tree->u.list.num_elements;i++) {
830                         struct dn_list *list2;
831                         int v;
832                         bool is_unique = false;
833                         const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
834
835                         if (subtree->operation == LDB_OP_EQUALITY &&
836                             ltdb_index_unique(ldb, subtree->u.equality.attr)) {
837                                 is_unique = true;
838                         }
839                         if (is_unique != only_unique) continue;
840                         
841                         list2 = talloc(module, struct dn_list);
842                         if (list2 == NULL) {
843                                 return LDB_ERR_OPERATIONS_ERROR;
844                         }
845                         
846                         v = ltdb_index_dn(module, subtree, index_list, list2);
847
848                         if (v == LDB_ERR_NO_SUCH_OBJECT) {
849                                 /* 0 && X == 0 */
850                                 talloc_free(list->dn);
851                                 talloc_free(list2);
852                                 return LDB_ERR_NO_SUCH_OBJECT;
853                         }
854                         
855                         if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
856                                 talloc_free(list2);
857                                 continue;
858                         }
859                         
860                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
861                                 ret = LDB_SUCCESS;
862                                 talloc_free(list->dn);
863                                 list->dn = talloc_move(list, &list2->dn);
864                                 list->count = list2->count;
865                         } else {
866                                 if (list_intersect(ldb, list, list2) == -1) {
867                                         talloc_free(list2);
868                                         return LDB_ERR_OPERATIONS_ERROR;
869                                 }
870                         }
871                         
872                         talloc_free(list2);
873                         
874                         if (list->count == 0) {
875                                 talloc_free(list->dn);
876                                 return LDB_ERR_NO_SUCH_OBJECT;
877                         }
878                         
879                         if (list->count == 1) {
880                                 /* it isn't worth loading the next part of the tree */
881                                 return ret;
882                         }
883                 }
884         }       
885         return ret;
886 }
887         
888 /*
889   AND index results and ONE level special index
890  */
891 static int ltdb_index_dn_one(struct ldb_module *module,
892                              struct ldb_dn *parent_dn,
893                              struct dn_list *list)
894 {
895         struct ldb_context *ldb;
896         struct dn_list *list2;
897         struct ldb_message *msg;
898         struct ldb_dn *key;
899         struct ldb_val val;
900         unsigned int i, j;
901         int ret;
902
903         ldb = ldb_module_get_ctx(module);
904
905         list2 = talloc_zero(module, struct dn_list);
906         if (list2 == NULL) {
907                 return LDB_ERR_OPERATIONS_ERROR;
908         }
909
910         /* the attribute is indexed. Pull the list of DNs that match the
911            search criterion */
912         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
913         val.length = strlen((char *)val.data);
914         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
915         if (!key) {
916                 talloc_free(list2);
917                 return LDB_ERR_OPERATIONS_ERROR;
918         }
919
920         msg = talloc(list2, struct ldb_message);
921         if (msg == NULL) {
922                 talloc_free(list2);
923                 return LDB_ERR_OPERATIONS_ERROR;
924         }
925
926         ret = ltdb_search_dn1_index(module, key, msg);
927         talloc_free(key);
928         if (ret != LDB_SUCCESS) {
929                 return ret;
930         }
931
932         for (i = 0; i < msg->num_elements; i++) {
933                 struct ldb_message_element *el;
934
935                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
936                         continue;
937                 }
938
939                 el = &msg->elements[i];
940
941                 list2->dn = talloc_array(list2, char *, el->num_values);
942                 if (!list2->dn) {
943                         talloc_free(list2);
944                         return LDB_ERR_OPERATIONS_ERROR;
945                 }
946
947                 for (j = 0; j < el->num_values; j++) {
948                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
949                         if (!list2->dn[list2->count]) {
950                                 talloc_free(list2);
951                                 return LDB_ERR_OPERATIONS_ERROR;
952                         }
953                         list2->count++;
954                 }
955         }
956
957         if (list2->count == 0) {
958                 talloc_free(list2);
959                 return LDB_ERR_NO_SUCH_OBJECT;
960         }
961
962         if (list2->count > 1) {
963                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
964         }
965
966         if (list->count > 0) {
967                 if (list_intersect(ldb, list, list2) == -1) {
968                         talloc_free(list2);
969                         return LDB_ERR_OPERATIONS_ERROR;
970                 }
971
972                 if (list->count == 0) {
973                         talloc_free(list->dn);
974                         talloc_free(list2);
975                         return LDB_ERR_NO_SUCH_OBJECT;
976                 }
977         } else {
978                 list->dn = talloc_move(list, &list2->dn);
979                 list->count = list2->count;
980         }
981
982         talloc_free(list2);
983
984         return LDB_SUCCESS;
985 }
986
987 /*
988   return a list of dn's that might match a indexed search or
989   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
990  */
991 static int ltdb_index_dn(struct ldb_module *module,
992                          const struct ldb_parse_tree *tree,
993                          const struct ldb_message *index_list,
994                          struct dn_list *list)
995 {
996         int ret = LDB_ERR_OPERATIONS_ERROR;
997
998         switch (tree->operation) {
999         case LDB_OP_AND:
1000                 ret = ltdb_index_dn_and(module, tree, index_list, list);
1001                 break;
1002
1003         case LDB_OP_OR:
1004                 ret = ltdb_index_dn_or(module, tree, index_list, list);
1005                 break;
1006
1007         case LDB_OP_NOT:
1008                 ret = ltdb_index_dn_not(module, tree, index_list, list);
1009                 break;
1010
1011         case LDB_OP_EQUALITY:
1012                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
1013                 break;
1014
1015         case LDB_OP_SUBSTRING:
1016         case LDB_OP_GREATER:
1017         case LDB_OP_LESS:
1018         case LDB_OP_PRESENT:
1019         case LDB_OP_APPROX:
1020         case LDB_OP_EXTENDED:
1021                 /* we can't index with fancy bitops yet */
1022                 ret = LDB_ERR_OPERATIONS_ERROR;
1023                 break;
1024         }
1025
1026         return ret;
1027 }
1028
1029 /*
1030   filter a candidate dn_list from an indexed search into a set of results
1031   extracting just the given attributes
1032 */
1033 static int ltdb_index_filter(const struct dn_list *dn_list,
1034                              struct ltdb_context *ac, 
1035                              uint32_t *match_count)
1036 {
1037         struct ldb_context *ldb;
1038         struct ldb_message *msg;
1039         unsigned int i;
1040
1041         ldb = ldb_module_get_ctx(ac->module);
1042
1043         for (i = 0; i < dn_list->count; i++) {
1044                 struct ldb_dn *dn;
1045                 int ret;
1046
1047                 msg = ldb_msg_new(ac);
1048                 if (!msg) {
1049                         return LDB_ERR_OPERATIONS_ERROR;
1050                 }
1051
1052                 dn = ldb_dn_new(msg, ldb, dn_list->dn[i]);
1053                 if (dn == NULL) {
1054                         talloc_free(msg);
1055                         return LDB_ERR_OPERATIONS_ERROR;
1056                 }
1057
1058                 ret = ltdb_search_dn1(ac->module, dn, msg);
1059                 talloc_free(dn);
1060                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1061                         /* the record has disappeared? yes, this can happen */
1062                         talloc_free(msg);
1063                         continue;
1064                 }
1065
1066                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1067                         /* an internal error */
1068                         talloc_free(msg);
1069                         return LDB_ERR_OPERATIONS_ERROR;
1070                 }
1071
1072                 if (!ldb_match_msg(ldb, msg,
1073                                    ac->tree, ac->base, ac->scope)) {
1074                         talloc_free(msg);
1075                         continue;
1076                 }
1077
1078                 /* filter the attributes that the user wants */
1079                 ret = ltdb_filter_attrs(msg, ac->attrs);
1080
1081                 if (ret == -1) {
1082                         talloc_free(msg);
1083                         return LDB_ERR_OPERATIONS_ERROR;
1084                 }
1085
1086                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1087                 if (ret != LDB_SUCCESS) {
1088                         ac->request_terminated = true;
1089                         return ret;
1090                 }
1091
1092                 (*match_count)++;
1093         }
1094
1095         return LDB_SUCCESS;
1096 }
1097
1098 /*
1099   search the database with a LDAP-like expression using indexes
1100   returns -1 if an indexed search is not possible, in which
1101   case the caller should call ltdb_search_full()
1102 */
1103 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1104 {
1105         struct ldb_context *ldb;
1106         void *data = ldb_module_get_private(ac->module);
1107         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1108         struct dn_list *dn_list;
1109         int ret, idxattr, idxone;
1110
1111         ldb = ldb_module_get_ctx(ac->module);
1112
1113         idxattr = idxone = 0;
1114         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1115         if (ret == 0 ) {
1116                 idxattr = 1;
1117         }
1118
1119         /* We do one level indexing only if requested */
1120         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1121         if (ret == 0 ) {
1122                 idxone = 1;
1123         }
1124
1125         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1126             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1127                 /* no indexes? must do full search */
1128                 return LDB_ERR_OPERATIONS_ERROR;
1129         }
1130
1131         ret = LDB_ERR_OPERATIONS_ERROR;
1132
1133         dn_list = talloc_zero(ac, struct dn_list);
1134         if (dn_list == NULL) {
1135                 return LDB_ERR_OPERATIONS_ERROR;
1136         }
1137
1138         if (ac->scope == LDB_SCOPE_BASE) {
1139                 /* with BASE searches only one DN can match */
1140                 dn_list->dn = talloc_array(dn_list, char *, 1);
1141                 if (dn_list->dn == NULL) {
1142                         ldb_oom(ldb);
1143                         return LDB_ERR_OPERATIONS_ERROR;
1144                 }
1145                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1146                 if (dn_list->dn[0] == NULL) {
1147                         ldb_oom(ldb);
1148                         return LDB_ERR_OPERATIONS_ERROR;
1149                 }
1150                 dn_list->count = 1;
1151                 ret = LDB_SUCCESS;
1152         }
1153
1154         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1155                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1156         }
1157
1158         if (ret == LDB_ERR_OPERATIONS_ERROR &&
1159             ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1160                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1161         }
1162
1163         if (ret == LDB_SUCCESS) {
1164                 /* we've got a candidate list - now filter by the full tree
1165                    and extract the needed attributes */
1166                 ret = ltdb_index_filter(dn_list, ac, match_count);
1167         }
1168
1169         talloc_free(dn_list);
1170
1171         return ret;
1172 }
1173
1174 /*
1175   add a index element where this is the first indexed DN for this value
1176 */
1177 static int ltdb_index_add1_new(struct ldb_context *ldb,
1178                                struct ldb_message *msg,
1179                                const char *dn)
1180 {
1181         struct ldb_message_element *el;
1182
1183         /* add another entry */
1184         el = talloc_realloc(msg, msg->elements,
1185                                struct ldb_message_element, msg->num_elements+1);
1186         if (!el) {
1187                 return LDB_ERR_OPERATIONS_ERROR;
1188         }
1189
1190         msg->elements = el;
1191         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1192         if (!msg->elements[msg->num_elements].name) {
1193                 return LDB_ERR_OPERATIONS_ERROR;
1194         }
1195         msg->elements[msg->num_elements].num_values = 0;
1196         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1197         if (!msg->elements[msg->num_elements].values) {
1198                 return LDB_ERR_OPERATIONS_ERROR;
1199         }
1200         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1201         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1202         msg->elements[msg->num_elements].num_values = 1;
1203         msg->num_elements++;
1204
1205         return LDB_SUCCESS;
1206 }
1207
1208
1209 /*
1210   add a index element where this is not the first indexed DN for this
1211   value
1212 */
1213 static int ltdb_index_add1_add(struct ldb_context *ldb,
1214                                struct ldb_message *msg,
1215                                int idx,
1216                                const char *dn,
1217                                const struct ldb_schema_attribute *a)
1218 {
1219         struct ldb_val *v2;
1220         unsigned int i;
1221
1222         /* for multi-valued attributes we can end up with repeats */
1223         for (i=0;i<msg->elements[idx].num_values;i++) {
1224                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1225                         return LDB_SUCCESS;
1226                 }
1227         }
1228
1229         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1230                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
1231         }
1232
1233         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1234                               struct ldb_val,
1235                               msg->elements[idx].num_values+1);
1236         if (!v2) {
1237                 return LDB_ERR_OPERATIONS_ERROR;
1238         }
1239         msg->elements[idx].values = v2;
1240
1241         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1242         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1243         msg->elements[idx].num_values++;
1244
1245         return LDB_SUCCESS;
1246 }
1247
1248 /*
1249   add an index entry for one message element
1250 */
1251 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1252                            struct ldb_message_element *el, int v_idx)
1253 {
1254         struct ldb_context *ldb;
1255         struct ldb_message *msg;
1256         struct ldb_dn *dn_key;
1257         int ret;
1258         unsigned int i;
1259         const struct ldb_schema_attribute *a;
1260
1261         ldb = ldb_module_get_ctx(module);
1262
1263         msg = talloc(module, struct ldb_message);
1264         if (msg == NULL) {
1265                 errno = ENOMEM;
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1270         if (!dn_key) {
1271                 talloc_free(msg);
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274         talloc_steal(msg, dn_key);
1275
1276         ret = ltdb_search_dn1_index(module, dn_key, msg);
1277         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1278                 talloc_free(msg);
1279                 return ret;
1280         }
1281
1282         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1283                 msg->dn = dn_key;
1284                 msg->num_elements = 0;
1285                 msg->elements = NULL;
1286         }
1287
1288         for (i=0;i<msg->num_elements;i++) {
1289                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1290                         break;
1291                 }
1292         }
1293
1294         if (i == msg->num_elements) {
1295                 ret = ltdb_index_add1_new(ldb, msg, dn);
1296         } else {
1297                 ret = ltdb_index_add1_add(ldb, msg, i, dn, a);
1298         }
1299
1300         if (ret == LDB_SUCCESS) {
1301                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1302         }
1303
1304         talloc_free(msg);
1305
1306         return ret;
1307 }
1308
1309 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1310                            struct ldb_message_element *elements, int num_el)
1311 {
1312         void *data = ldb_module_get_private(module);
1313         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1314         int ret;
1315         unsigned int i, j;
1316
1317         if (dn[0] == '@') {
1318                 return LDB_SUCCESS;
1319         }
1320
1321         if (ltdb->cache->indexlist->num_elements == 0) {
1322                 /* no indexed fields */
1323                 return LDB_SUCCESS;
1324         }
1325
1326         for (i = 0; i < num_el; i++) {
1327                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1328                                        NULL, LTDB_IDXATTR);
1329                 if (ret == -1) {
1330                         continue;
1331                 }
1332                 for (j = 0; j < elements[i].num_values; j++) {
1333                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1334                         if (ret != LDB_SUCCESS) {
1335                                 return ret;
1336                         }
1337                 }
1338         }
1339
1340         return LDB_SUCCESS;
1341 }
1342
1343 /*
1344   add the index entries for a new record
1345 */
1346 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1347 {
1348         const char *dn;
1349         int ret;
1350
1351         dn = ldb_dn_get_linearized(msg->dn);
1352         if (dn == NULL) {
1353                 return LDB_ERR_OPERATIONS_ERROR;
1354         }
1355
1356         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1357
1358         return ret;
1359 }
1360
1361
1362 /*
1363   delete an index entry for one message element
1364 */
1365 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1366                          struct ldb_message_element *el, int v_idx)
1367 {
1368         struct ldb_context *ldb;
1369         struct ldb_message *msg;
1370         struct ldb_dn *dn_key;
1371         int ret, i;
1372         unsigned int j;
1373
1374         ldb = ldb_module_get_ctx(module);
1375
1376         if (dn[0] == '@') {
1377                 return LDB_SUCCESS;
1378         }
1379
1380         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1381         if (!dn_key) {
1382                 return LDB_ERR_OPERATIONS_ERROR;
1383         }
1384
1385         msg = talloc(dn_key, struct ldb_message);
1386         if (msg == NULL) {
1387                 talloc_free(dn_key);
1388                 return LDB_ERR_OPERATIONS_ERROR;
1389         }
1390
1391         ret = ltdb_search_dn1_index(module, dn_key, msg);
1392         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1393                 talloc_free(dn_key);
1394                 return ret;
1395         }
1396
1397         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1398                 /* it wasn't indexed. Did we have an earlier error? If we did then
1399                    its gone now */
1400                 talloc_free(dn_key);
1401                 return LDB_SUCCESS;
1402         }
1403
1404         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1405         if (i == -1) {
1406                 struct ldb_ldif ldif;
1407                 char *ldif_string;
1408                 ldif.changetype = LDB_CHANGETYPE_NONE;
1409                 ldif.msg = msg;
1410                 ldif_string = ldb_ldif_write_string(ldb, NULL, &ldif);
1411                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1412                           "ERROR: dn %s not found in %s", dn,
1413                           ldif_string);
1414                 talloc_free(ldif_string);
1415                 /* it ain't there. hmmm */
1416                 talloc_free(dn_key);
1417                 return LDB_SUCCESS;
1418         }
1419
1420         if (j != msg->elements[i].num_values - 1) {
1421                 memmove(&msg->elements[i].values[j],
1422                         &msg->elements[i].values[j+1],
1423                         (msg->elements[i].num_values-(j+1)) *
1424                         sizeof(msg->elements[i].values[0]));
1425         }
1426         msg->elements[i].num_values--;
1427
1428         if (msg->elements[i].num_values == 0) {
1429                 ret = ltdb_delete_noindex(module, dn_key);
1430         } else {
1431                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1432         }
1433
1434         talloc_free(dn_key);
1435
1436         return ret;
1437 }
1438
1439 /*
1440   delete the index entries for a record
1441   return -1 on failure
1442 */
1443 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1444 {
1445         void *data = ldb_module_get_private(module);
1446         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1447         int ret;
1448         const char *dn;
1449         unsigned int i, j;
1450
1451         /* find the list of indexed fields */
1452         if (ltdb->cache->indexlist->num_elements == 0) {
1453                 /* no indexed fields */
1454                 return LDB_SUCCESS;
1455         }
1456
1457         if (ldb_dn_is_special(msg->dn)) {
1458                 return LDB_SUCCESS;
1459         }
1460
1461         dn = ldb_dn_get_linearized(msg->dn);
1462         if (dn == NULL) {
1463                 return LDB_ERR_OPERATIONS_ERROR;
1464         }
1465
1466         for (i = 0; i < msg->num_elements; i++) {
1467                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1468                                        NULL, LTDB_IDXATTR);
1469                 if (ret == -1) {
1470                         continue;
1471                 }
1472                 for (j = 0; j < msg->elements[i].num_values; j++) {
1473                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1474                         if (ret != LDB_SUCCESS) {
1475                                 return ret;
1476                         }
1477                 }
1478         }
1479
1480         return LDB_SUCCESS;
1481 }
1482
1483 /*
1484   handle special index for one level searches
1485 */
1486 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1487 {
1488         void *data = ldb_module_get_private(module);
1489         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1490         struct ldb_message_element el;
1491         struct ldb_val val;
1492         struct ldb_dn *pdn;
1493         const char *dn;
1494         int ret;
1495
1496         if (ldb_dn_is_special(msg->dn)) {
1497                 return LDB_SUCCESS;
1498         }
1499
1500         /* We index for ONE Level only if requested */
1501         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1502         if (ret != 0) {
1503                 return LDB_SUCCESS;
1504         }
1505
1506         pdn = ldb_dn_get_parent(module, msg->dn);
1507         if (pdn == NULL) {
1508                 return LDB_ERR_OPERATIONS_ERROR;
1509         }
1510
1511         dn = ldb_dn_get_linearized(msg->dn);
1512         if (dn == NULL) {
1513                 talloc_free(pdn);
1514                 return LDB_ERR_OPERATIONS_ERROR;
1515         }
1516
1517         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1518         if (val.data == NULL) {
1519                 talloc_free(pdn);
1520                 return LDB_ERR_OPERATIONS_ERROR;
1521         }
1522
1523         val.length = strlen((char *)val.data);
1524         el.name = LTDB_IDXONE;
1525         el.values = &val;
1526         el.num_values = 1;
1527
1528         if (add) {
1529                 ret = ltdb_index_add1(module, dn, &el, 0);
1530         } else { /* delete */
1531                 ret = ltdb_index_del_value(module, dn, &el, 0);
1532         }
1533
1534         talloc_free(pdn);
1535
1536         return ret;
1537 }
1538
1539
1540 /*
1541   traversal function that deletes all @INDEX records
1542 */
1543 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1544 {
1545         const char *dn = "DN=" LTDB_INDEX ":";
1546         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1547                 return tdb_delete(tdb, key);
1548         }
1549         return 0;
1550 }
1551
1552 /*
1553   traversal function that adds @INDEX records during a re index
1554 */
1555 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1556 {
1557         struct ldb_context *ldb;
1558         struct ldb_module *module = (struct ldb_module *)state;
1559         struct ldb_message *msg;
1560         const char *dn = NULL;
1561         int ret;
1562         TDB_DATA key2;
1563
1564         ldb = ldb_module_get_ctx(module);
1565
1566         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1567             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1568                 return 0;
1569         }
1570
1571         msg = talloc(module, struct ldb_message);
1572         if (msg == NULL) {
1573                 return -1;
1574         }
1575
1576         ret = ltdb_unpack_data(module, &data, msg);
1577         if (ret != 0) {
1578                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1579                           ldb_dn_get_linearized(msg->dn));
1580                 talloc_free(msg);
1581                 return -1;
1582         }
1583
1584         /* check if the DN key has changed, perhaps due to the
1585            case insensitivity of an element changing */
1586         key2 = ltdb_key(module, msg->dn);
1587         if (key2.dptr == NULL) {
1588                 /* probably a corrupt record ... darn */
1589                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1590                                                         ldb_dn_get_linearized(msg->dn));
1591                 talloc_free(msg);
1592                 return 0;
1593         }
1594         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1595                 tdb_delete(tdb, key);
1596                 tdb_store(tdb, key2, data, 0);
1597         }
1598         talloc_free(key2.dptr);
1599
1600         if (msg->dn == NULL) {
1601                 dn = (char *)key.dptr + 3;
1602         } else {
1603                 dn = ldb_dn_get_linearized(msg->dn);
1604         }
1605
1606         ret = ltdb_index_one(module, msg, 1);
1607         if (ret == LDB_SUCCESS) {
1608                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1609         } else {
1610                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1611                         "Adding special ONE LEVEL index failed (%s)!",
1612                         ldb_dn_get_linearized(msg->dn));
1613         }
1614
1615         talloc_free(msg);
1616
1617         if (ret != LDB_SUCCESS) return -1;
1618
1619         return 0;
1620 }
1621
1622 /*
1623   force a complete reindex of the database
1624 */
1625 int ltdb_reindex(struct ldb_module *module)
1626 {
1627         void *data = ldb_module_get_private(module);
1628         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1629         int ret;
1630
1631         if (ltdb_cache_reload(module) != 0) {
1632                 return LDB_ERR_OPERATIONS_ERROR;
1633         }
1634
1635         /* first traverse the database deleting any @INDEX records */
1636         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1637         if (ret == -1) {
1638                 return LDB_ERR_OPERATIONS_ERROR;
1639         }
1640
1641         /* if we don't have indexes we have nothing todo */
1642         if (ltdb->cache->indexlist->num_elements == 0) {
1643                 return LDB_SUCCESS;
1644         }
1645
1646         /* now traverse adding any indexes for normal LDB records */
1647         ret = tdb_traverse(ltdb->tdb, re_index, module);
1648         if (ret == -1) {
1649                 return LDB_ERR_OPERATIONS_ERROR;
1650         }
1651
1652         if (ltdb->idxptr) {
1653                 ltdb->idxptr->repack = true;
1654         }
1655
1656         return LDB_SUCCESS;
1657 }