c4c23022f811a57a39fcaf8f37ec5b1a8a53d34b
[metze/samba/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_includes.h"
35
36 #include "ldb_tdb.h"
37
38 /*
39   the idxptr code is a bit unusual. The way it works is to replace
40   @IDX elements in records during a transaction with @IDXPTR
41   elements. The @IDXPTR elements don't contain the actual index entry
42   values, but contain a pointer to a linked list of values.
43
44   This means we are storing pointers in a database, which is normally
45   not allowed, but in this case we are storing them only for the
46   duration of a transaction, and re-writing them into the normal @IDX
47   format at the end of the transaction. That means no other processes
48   are ever exposed to the @IDXPTR values.
49
50   The advantage is that the linked list doesn't cause huge
51   fragmentation during a transaction. Without the @IDXPTR method we
52   often ended up with a ldb that was between 10x and 100x larger then
53   it needs to be due to massive fragmentation caused by re-writing
54   @INDEX records many times during indexing.
55  */
56 struct ldb_index_pointer {
57         struct ldb_index_pointer *next, *prev;
58         struct ldb_val value;
59 };
60
61 struct ltdb_idxptr {
62         int num_dns;
63         const char **dn_list;
64         bool repack;
65 };
66
67 /*
68   add to the list of DNs that need to be fixed on transaction end
69  */
70 static int ltdb_idxptr_add(struct ldb_module *module, const struct ldb_message *msg)
71 {
72         struct ltdb_private *ltdb =
73                 talloc_get_type(module->private_data, struct ltdb_private);
74         ltdb->idxptr->dn_list = talloc_realloc(ltdb->idxptr, ltdb->idxptr->dn_list, 
75                                                const char *, ltdb->idxptr->num_dns+1);
76         if (ltdb->idxptr->dn_list == NULL) {
77                 ltdb->idxptr->num_dns = 0;
78                 return LDB_ERR_OPERATIONS_ERROR;
79         }
80         ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] =
81                 talloc_strdup(ltdb->idxptr->dn_list, ldb_dn_get_linearized(msg->dn));
82         if (ltdb->idxptr->dn_list[ltdb->idxptr->num_dns] == NULL) {
83                 return LDB_ERR_OPERATIONS_ERROR;
84         }
85         ltdb->idxptr->num_dns++;
86         return LDB_SUCCESS;
87 }
88
89 /* free an idxptr record */
90 static int ltdb_free_idxptr(struct ldb_module *module, struct ldb_message_element *el)
91 {
92         struct ldb_val val;
93         struct ldb_index_pointer *ptr;
94
95         if (el->num_values != 1) {
96                 return LDB_ERR_OPERATIONS_ERROR;
97         }
98
99         val = el->values[0];
100         if (val.length != sizeof(void *)) {
101                 return LDB_ERR_OPERATIONS_ERROR;
102         }
103
104         ptr = *(struct ldb_index_pointer **)val.data;
105         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
106                 return LDB_ERR_OPERATIONS_ERROR;
107         }
108
109         while (ptr) {
110                 struct ldb_index_pointer *tmp = ptr;
111                 DLIST_REMOVE(ptr, ptr);
112                 talloc_free(tmp);
113         }
114
115         return LDB_SUCCESS;
116 }
117
118
119 /* convert from the IDXPTR format to a ldb_message_element format */
120 static int ltdb_convert_from_idxptr(struct ldb_module *module, struct ldb_message_element *el)
121 {
122         struct ldb_val val;
123         struct ldb_index_pointer *ptr, *tmp;
124         int i;
125         struct ldb_val *val2;
126
127         if (el->num_values != 1) {
128                 return LDB_ERR_OPERATIONS_ERROR;
129         }
130
131         val = el->values[0];
132         if (val.length != sizeof(void *)) {
133                 return LDB_ERR_OPERATIONS_ERROR;
134         }
135
136         ptr = *(struct ldb_index_pointer **)val.data;
137         if (talloc_get_type(ptr, struct ldb_index_pointer) != ptr) {
138                 return LDB_ERR_OPERATIONS_ERROR;
139         }
140
141         /* count the length of the list */
142         for (i=0, tmp = ptr; tmp; tmp=tmp->next) {
143                 i++;
144         }
145
146         /* allocate the new values array */
147         val2 = talloc_realloc(NULL, el->values, struct ldb_val, i);
148         if (val2 == NULL) {
149                 return LDB_ERR_OPERATIONS_ERROR;
150         }
151         el->values = val2;
152         el->num_values = i;
153
154         /* populate the values array */
155         for (i=0, tmp = ptr; tmp; tmp=tmp->next, i++) {
156                 el->values[i].length = tmp->value.length;
157                 /* we need to over-allocate here as there are still some places
158                    in ldb that rely on null termination. */
159                 el->values[i].data = talloc_size(el->values, tmp->value.length+1);
160                 if (el->values[i].data == NULL) {
161                         return LDB_ERR_OPERATIONS_ERROR;
162                 }
163                 memcpy(el->values[i].data, tmp->value.data, tmp->value.length);
164                 el->values[i].data[tmp->value.length] = 0;
165         }
166
167         /* update the name */
168         el->name = LTDB_IDX;
169
170         return LDB_SUCCESS;
171 }
172
173
174 /* convert to the IDXPTR format from a ldb_message_element format */
175 static int ltdb_convert_to_idxptr(struct ldb_module *module, struct ldb_message_element *el)
176 {
177         struct ldb_index_pointer *ptr, *tmp;
178         int i;
179         struct ldb_val *val2;
180         struct ltdb_private *ltdb =
181                 talloc_get_type(module->private_data, struct ltdb_private);
182
183         ptr = NULL;
184
185         for (i=0;i<el->num_values;i++) {
186                 tmp = talloc(ltdb->idxptr, struct ldb_index_pointer);
187                 if (tmp == NULL) {
188                         return LDB_ERR_OPERATIONS_ERROR;
189                 }
190                 tmp->value = el->values[i];
191                 tmp->value.data = talloc_memdup(tmp, tmp->value.data, tmp->value.length);
192                 if (tmp->value.data == NULL) {
193                         return LDB_ERR_OPERATIONS_ERROR;
194                 }
195                 DLIST_ADD(ptr, tmp);
196         }
197
198         /* allocate the new values array */
199         val2 = talloc_realloc(NULL, el->values, struct ldb_val, 1);
200         if (val2 == NULL) {
201                 return LDB_ERR_OPERATIONS_ERROR;
202         }
203         el->values = val2;
204         el->num_values = 1;
205
206         el->values[0].data = talloc_memdup(el->values, &ptr, sizeof(ptr));
207         el->values[0].length = sizeof(ptr);
208
209         /* update the name */
210         el->name = LTDB_IDXPTR;
211
212         return LDB_SUCCESS;
213 }
214
215
216 /* enable the idxptr mode when transactions start */
217 int ltdb_index_transaction_start(struct ldb_module *module)
218 {
219         struct ltdb_private *ltdb =
220                 talloc_get_type(module->private_data, struct ltdb_private);
221         ltdb->idxptr = talloc_zero(module, struct ltdb_idxptr);
222         return LDB_SUCCESS;
223 }
224
225 /*
226   a wrapper around ltdb_search_dn1() which translates pointer based index records
227   and maps them into normal ldb message structures
228  */
229 static int ltdb_search_dn1_index(struct ldb_module *module,
230                                 struct ldb_dn *dn, struct ldb_message *msg)
231 {
232         int ret, i;
233         ret = ltdb_search_dn1(module, dn, msg);
234         if (ret != LDB_SUCCESS) {
235                 return ret;
236         }
237
238         /* if this isn't a @INDEX record then don't munge it */
239         if (strncmp(ldb_dn_get_linearized(msg->dn), LTDB_INDEX ":", strlen(LTDB_INDEX) + 1) != 0) {
240                 return LDB_ERR_OPERATIONS_ERROR;
241         }
242
243         for (i=0;i<msg->num_elements;i++) {
244                 struct ldb_message_element *el = &msg->elements[i];
245                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
246                         ret = ltdb_convert_from_idxptr(module, el);
247                         if (ret != LDB_SUCCESS) {
248                                 return ret;
249                         }
250                 }
251         }
252
253         return ret;
254 }
255
256
257
258 /*
259   fixup the idxptr for one DN
260  */
261 static int ltdb_idxptr_fix_dn(struct ldb_module *module, const char *strdn)
262 {
263         struct ldb_dn *dn;
264         struct ldb_message *msg = ldb_msg_new(module);
265         int ret;
266
267         dn = ldb_dn_new(msg, module->ldb, strdn);
268         if (ltdb_search_dn1_index(module, dn, msg) == LDB_SUCCESS) {
269                 ret = ltdb_store(module, msg, TDB_REPLACE);
270         }
271         talloc_free(msg);
272         return ret;
273 }
274
275 /* cleanup the idxptr mode when transaction commits */
276 int ltdb_index_transaction_commit(struct ldb_module *module)
277 {
278         int i;
279         struct ltdb_private *ltdb =
280                 talloc_get_type(module->private_data, struct ltdb_private);
281
282         /* fix all the DNs that we have modified */
283         if (ltdb->idxptr) {
284                 for (i=0;i<ltdb->idxptr->num_dns;i++) {
285                         ltdb_idxptr_fix_dn(module, ltdb->idxptr->dn_list[i]);
286                 }
287
288                 if (ltdb->idxptr->repack) {
289                         tdb_repack(ltdb->tdb);
290                 }
291         }
292
293         talloc_free(ltdb->idxptr);
294         ltdb->idxptr = NULL;
295         return LDB_SUCCESS;
296 }
297
298 /* cleanup the idxptr mode when transaction cancels */
299 int ltdb_index_transaction_cancel(struct ldb_module *module)
300 {
301         struct ltdb_private *ltdb =
302                 talloc_get_type(module->private_data, struct ltdb_private);
303         talloc_free(ltdb->idxptr);
304         ltdb->idxptr = NULL;
305         return LDB_SUCCESS;
306 }
307
308
309
310 /* a wrapper around ltdb_store() for the index code which
311    stores in IDXPTR format when idxptr mode is enabled
312
313    WARNING: This modifies the msg which is passed in
314 */
315 int ltdb_store_idxptr(struct ldb_module *module, const struct ldb_message *msg, int flgs)
316 {
317         struct ltdb_private *ltdb =
318                 talloc_get_type(module->private_data, struct ltdb_private);
319         int ret;
320
321         if (ltdb->idxptr) {
322                 int i;
323                 struct ldb_message *msg2 = ldb_msg_new(module);
324
325                 /* free any old pointer */
326                 ret = ltdb_search_dn1(module, msg->dn, msg2);
327                 if (ret == 0) {
328                         for (i=0;i<msg2->num_elements;i++) {
329                                 struct ldb_message_element *el = &msg2->elements[i];
330                                 if (strcmp(el->name, LTDB_IDXPTR) == 0) {
331                                         ret = ltdb_free_idxptr(module, el);
332                                         if (ret != LDB_SUCCESS) {
333                                                 return ret;
334                                         }
335                                 }
336                         }
337                 }
338                 talloc_free(msg2);
339
340                 for (i=0;i<msg->num_elements;i++) {
341                         struct ldb_message_element *el = &msg->elements[i];
342                         if (strcmp(el->name, LTDB_IDX) == 0) {
343                                 ret = ltdb_convert_to_idxptr(module, el);
344                                 if (ret != LDB_SUCCESS) {
345                                         return ret;
346                                 }
347                         }
348                 }
349
350                 if (ltdb_idxptr_add(module, msg) != 0) {
351                         return LDB_ERR_OPERATIONS_ERROR;
352                 }
353         }
354
355         ret = ltdb_store(module, msg, flgs);
356         return ret;
357 }
358
359
360 /*
361   find an element in a list, using the given comparison function and
362   assuming that the list is already sorted using comp_fn
363
364   return -1 if not found, or the index of the first occurance of needle if found
365 */
366 static int ldb_list_find(const void *needle,
367                          const void *base, size_t nmemb, size_t size,
368                          comparison_fn_t comp_fn)
369 {
370         const char *base_p = (const char *)base;
371         size_t min_i, max_i, test_i;
372
373         if (nmemb == 0) {
374                 return -1;
375         }
376
377         min_i = 0;
378         max_i = nmemb-1;
379
380         while (min_i < max_i) {
381                 int r;
382
383                 test_i = (min_i + max_i) / 2;
384                 /* the following cast looks strange, but is
385                  correct. The key to understanding it is that base_p
386                  is a pointer to an array of pointers, so we have to
387                  dereference it after casting to void **. The strange
388                  const in the middle gives us the right type of pointer
389                  after the dereference  (tridge) */
390                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
391                 if (r == 0) {
392                         /* scan back for first element */
393                         while (test_i > 0 &&
394                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
395                                 test_i--;
396                         }
397                         return test_i;
398                 }
399                 if (r < 0) {
400                         if (test_i == 0) {
401                                 return -1;
402                         }
403                         max_i = test_i - 1;
404                 }
405                 if (r > 0) {
406                         min_i = test_i + 1;
407                 }
408         }
409
410         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
411                 return min_i;
412         }
413
414         return -1;
415 }
416
417 struct dn_list {
418         unsigned int count;
419         char **dn;
420 };
421
422 /*
423   return the dn key to be used for an index
424   caller frees
425 */
426 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
427                                      const char *attr, const struct ldb_val *value)
428 {
429         struct ldb_dn *ret;
430         struct ldb_val v;
431         const struct ldb_schema_attribute *a;
432         char *attr_folded;
433         int r;
434
435         attr_folded = ldb_attr_casefold(ldb, attr);
436         if (!attr_folded) {
437                 return NULL;
438         }
439
440         a = ldb_schema_attribute_by_name(ldb, attr);
441         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
442         if (r != LDB_SUCCESS) {
443                 const char *errstr = ldb_errstring(ldb);
444                 /* canonicalisation can be refused. For example, 
445                    a attribute that takes wildcards will refuse to canonicalise
446                    if the value contains a wildcard */
447                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
448                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
449                 talloc_free(attr_folded);
450                 return NULL;
451         }
452         if (ldb_should_b64_encode(&v)) {
453                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
454                 if (!vstr) return NULL;
455                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
456                 talloc_free(vstr);
457         } else {
458                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
459         }
460
461         if (v.data != value->data) {
462                 talloc_free(v.data);
463         }
464         talloc_free(attr_folded);
465
466         return ret;
467 }
468
469 /*
470   see if a attribute value is in the list of indexed attributes
471 */
472 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
473                             unsigned int *v_idx, const char *key)
474 {
475         unsigned int i, j;
476         for (i=0;i<msg->num_elements;i++) {
477                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
478                         const struct ldb_message_element *el = &msg->elements[i];
479
480                         if (attr == NULL) {
481                                 /* in this case we are just looking to see if key is present,
482                                    we are not spearching for a specific index */
483                                 return 0;
484                         }
485
486                         for (j=0;j<el->num_values;j++) {
487                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
488                                         if (v_idx) {
489                                                 *v_idx = j;
490                                         }
491                                         return i;
492                                 }
493                         }
494                 }
495         }
496         return -1;
497 }
498
499 /* used in sorting dn lists */
500 static int list_cmp(const char **s1, const char **s2)
501 {
502         return strcmp(*s1, *s2);
503 }
504
505 /*
506   return a list of dn's that might match a simple indexed search or
507  */
508 static int ltdb_index_dn_simple(struct ldb_module *module,
509                                 const struct ldb_parse_tree *tree,
510                                 const struct ldb_message *index_list,
511                                 struct dn_list *list)
512 {
513         struct ldb_context *ldb = module->ldb;
514         struct ldb_dn *dn;
515         int ret;
516         unsigned int i, j;
517         struct ldb_message *msg;
518
519         list->count = 0;
520         list->dn = NULL;
521
522         /* if the attribute isn't in the list of indexed attributes then
523            this node needs a full search */
524         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
525                 return LDB_ERR_OPERATIONS_ERROR;
526         }
527
528         /* the attribute is indexed. Pull the list of DNs that match the 
529            search criterion */
530         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
531         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
532
533         msg = talloc(list, struct ldb_message);
534         if (msg == NULL) {
535                 return LDB_ERR_OPERATIONS_ERROR;
536         }
537
538         ret = ltdb_search_dn1_index(module, dn, msg);
539         talloc_free(dn);
540         if (ret != LDB_SUCCESS) {
541                 return ret;
542         }
543
544         for (i=0;i<msg->num_elements;i++) {
545                 struct ldb_message_element *el;
546
547                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
548                         continue;
549                 }
550
551                 el = &msg->elements[i];
552
553                 list->dn = talloc_array(list, char *, el->num_values);
554                 if (!list->dn) {
555                         talloc_free(msg);
556                         return LDB_ERR_OPERATIONS_ERROR;
557                 }
558
559                 for (j=0;j<el->num_values;j++) {
560                         list->dn[list->count] =
561                                 talloc_strdup(list->dn, (char *)el->values[j].data);
562                         if (!list->dn[list->count]) {
563                                 talloc_free(msg);
564                                 return LDB_ERR_OPERATIONS_ERROR;
565                         }
566                         list->count++;
567                 }
568         }
569
570         talloc_free(msg);
571
572         if (list->count > 1) {
573                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
574         }
575
576         return LDB_SUCCESS;
577 }
578
579
580 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
581
582 /*
583   return a list of dn's that might match a leaf indexed search
584  */
585 static int ltdb_index_dn_leaf(struct ldb_module *module,
586                               const struct ldb_parse_tree *tree,
587                               const struct ldb_message *index_list,
588                               struct dn_list *list)
589 {
590         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
591                 list->dn = talloc_array(list, char *, 1);
592                 if (list->dn == NULL) {
593                         ldb_oom(module->ldb);
594                         return LDB_ERR_OPERATIONS_ERROR;
595                 }
596                 list->dn[0] = talloc_strdup(list->dn, (char *)tree->u.equality.value.data);
597                 if (list->dn[0] == NULL) {
598                         ldb_oom(module->ldb);
599                         return LDB_ERR_OPERATIONS_ERROR;
600                 }
601                 list->count = 1;
602                 return LDB_SUCCESS;
603         }
604         return ltdb_index_dn_simple(module, tree, index_list, list);
605 }
606
607
608 /*
609   list intersection
610   list = list & list2
611   relies on the lists being sorted
612 */
613 static int list_intersect(struct ldb_context *ldb,
614                           struct dn_list *list, const struct dn_list *list2)
615 {
616         struct dn_list *list3;
617         unsigned int i;
618
619         if (list->count == 0 || list2->count == 0) {
620                 /* 0 & X == 0 */
621                 return LDB_ERR_NO_SUCH_OBJECT;
622         }
623
624         list3 = talloc(ldb, struct dn_list);
625         if (list3 == NULL) {
626                 return LDB_ERR_OPERATIONS_ERROR;
627         }
628
629         list3->dn = talloc_array(list3, char *, list->count);
630         if (!list3->dn) {
631                 talloc_free(list3);
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634         list3->count = 0;
635
636         for (i=0;i<list->count;i++) {
637                 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
638                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
639                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
640                         list3->count++;
641                 } else {
642                         talloc_free(list->dn[i]);
643                 }
644         }
645
646         talloc_free(list->dn);
647         list->dn = talloc_move(list, &list3->dn);
648         list->count = list3->count;
649         talloc_free(list3);
650
651         return LDB_ERR_NO_SUCH_OBJECT;
652 }
653
654
655 /*
656   list union
657   list = list | list2
658   relies on the lists being sorted
659 */
660 static int list_union(struct ldb_context *ldb,
661                       struct dn_list *list, const struct dn_list *list2)
662 {
663         unsigned int i;
664         char **d;
665         unsigned int count = list->count;
666
667         if (list->count == 0 && list2->count == 0) {
668                 /* 0 | 0 == 0 */
669                 return LDB_ERR_NO_SUCH_OBJECT;
670         }
671
672         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
673         if (!d) {
674                 return LDB_ERR_OPERATIONS_ERROR;
675         }
676         list->dn = d;
677
678         for (i=0;i<list2->count;i++) {
679                 if (ldb_list_find(list2->dn[i], list->dn, count,
680                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
681                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
682                         if (!list->dn[list->count]) {
683                                 return LDB_ERR_OPERATIONS_ERROR;
684                         }
685                         list->count++;
686                 }
687         }
688
689         if (list->count != count) {
690                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
691         }
692
693         return LDB_ERR_NO_SUCH_OBJECT;
694 }
695
696 static int ltdb_index_dn(struct ldb_module *module,
697                          const struct ldb_parse_tree *tree,
698                          const struct ldb_message *index_list,
699                          struct dn_list *list);
700
701
702 /*
703   OR two index results
704  */
705 static int ltdb_index_dn_or(struct ldb_module *module,
706                             const struct ldb_parse_tree *tree,
707                             const struct ldb_message *index_list,
708                             struct dn_list *list)
709 {
710         struct ldb_context *ldb = module->ldb;
711         unsigned int i;
712         int ret;
713
714         ret = LDB_ERR_OPERATIONS_ERROR;
715         list->dn = NULL;
716         list->count = 0;
717
718         for (i=0;i<tree->u.list.num_elements;i++) {
719                 struct dn_list *list2;
720                 int v;
721
722                 list2 = talloc(module, struct dn_list);
723                 if (list2 == NULL) {
724                         return LDB_ERR_OPERATIONS_ERROR;
725                 }
726
727                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
728
729                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
730                         /* 0 || X == X */
731                         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
732                                 ret = v;
733                         }
734                         talloc_free(list2);
735                         continue;
736                 }
737
738                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
739                         /* 1 || X == 1 */
740                         talloc_free(list->dn);
741                         talloc_free(list2);
742                         return v;
743                 }
744
745                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
746                         ret = LDB_SUCCESS;
747                         list->dn = talloc_move(list, &list2->dn);
748                         list->count = list2->count;
749                 } else {
750                         if (list_union(ldb, list, list2) == -1) {
751                                 talloc_free(list2);
752                                 return LDB_ERR_OPERATIONS_ERROR;
753                         }
754                         ret = LDB_SUCCESS;
755                 }
756                 talloc_free(list2);
757         }
758
759         if (list->count == 0) {
760                 return LDB_ERR_NO_SUCH_OBJECT;
761         }
762
763         return ret;
764 }
765
766
767 /*
768   NOT an index results
769  */
770 static int ltdb_index_dn_not(struct ldb_module *module,
771                              const struct ldb_parse_tree *tree,
772                              const struct ldb_message *index_list,
773                              struct dn_list *list)
774 {
775         /* the only way to do an indexed not would be if we could
776            negate the not via another not or if we knew the total
777            number of database elements so we could know that the
778            existing expression covered the whole database.
779
780            instead, we just give up, and rely on a full index scan
781            (unless an outer & manages to reduce the list)
782         */
783         return LDB_ERR_OPERATIONS_ERROR;
784 }
785
786 /*
787   AND two index results
788  */
789 static int ltdb_index_dn_and(struct ldb_module *module,
790                              const struct ldb_parse_tree *tree,
791                              const struct ldb_message *index_list,
792                              struct dn_list *list)
793 {
794         struct ldb_context *ldb = module->ldb;
795         unsigned int i;
796         int ret;
797
798         ret = LDB_ERR_OPERATIONS_ERROR;
799         list->dn = NULL;
800         list->count = 0;
801
802         for (i=0;i<tree->u.list.num_elements;i++) {
803                 struct dn_list *list2;
804                 int v;
805
806                 list2 = talloc(module, struct dn_list);
807                 if (list2 == NULL) {
808                         return LDB_ERR_OPERATIONS_ERROR;
809                 }
810
811                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
812
813                 if (v == LDB_ERR_NO_SUCH_OBJECT) {
814                         /* 0 && X == 0 */
815                         talloc_free(list->dn);
816                         talloc_free(list2);
817                         return LDB_ERR_NO_SUCH_OBJECT;
818                 }
819
820                 if (v != LDB_SUCCESS && v != LDB_ERR_NO_SUCH_OBJECT) {
821                         talloc_free(list2);
822                         continue;
823                 }
824
825                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
826                         ret = LDB_SUCCESS;
827                         talloc_free(list->dn);
828                         list->dn = talloc_move(list, &list2->dn);
829                         list->count = list2->count;
830                 } else {
831                         if (list_intersect(ldb, list, list2) == -1) {
832                                 talloc_free(list2);
833                                 return LDB_ERR_OPERATIONS_ERROR;
834                         }
835                 }
836
837                 talloc_free(list2);
838
839                 if (list->count == 0) {
840                         talloc_free(list->dn);
841                         return LDB_ERR_NO_SUCH_OBJECT;
842                 }
843         }
844
845         return ret;
846 }
847
848 /*
849   AND index results and ONE level special index
850  */
851 static int ltdb_index_dn_one(struct ldb_module *module,
852                              struct ldb_dn *parent_dn,
853                              struct dn_list *list)
854 {
855         struct ldb_context *ldb = module->ldb;
856         struct dn_list *list2;
857         struct ldb_message *msg;
858         struct ldb_dn *key;
859         struct ldb_val val;
860         unsigned int i, j;
861         int ret;
862
863         list2 = talloc_zero(module, struct dn_list);
864         if (list2 == NULL) {
865                 return LDB_ERR_OPERATIONS_ERROR;
866         }
867
868         /* the attribute is indexed. Pull the list of DNs that match the
869            search criterion */
870         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
871         val.length = strlen((char *)val.data);
872         key = ltdb_index_key(ldb, LTDB_IDXONE, &val);
873         if (!key) {
874                 talloc_free(list2);
875                 return LDB_ERR_OPERATIONS_ERROR;
876         }
877
878         msg = talloc(list2, struct ldb_message);
879         if (msg == NULL) {
880                 talloc_free(list2);
881                 return LDB_ERR_OPERATIONS_ERROR;
882         }
883
884         ret = ltdb_search_dn1_index(module, key, msg);
885         talloc_free(key);
886         if (ret != LDB_SUCCESS) {
887                 return ret;
888         }
889
890         for (i = 0; i < msg->num_elements; i++) {
891                 struct ldb_message_element *el;
892
893                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
894                         continue;
895                 }
896
897                 el = &msg->elements[i];
898
899                 list2->dn = talloc_array(list2, char *, el->num_values);
900                 if (!list2->dn) {
901                         talloc_free(list2);
902                         return LDB_ERR_OPERATIONS_ERROR;
903                 }
904
905                 for (j = 0; j < el->num_values; j++) {
906                         list2->dn[list2->count] = talloc_strdup(list2->dn, (char *)el->values[j].data);
907                         if (!list2->dn[list2->count]) {
908                                 talloc_free(list2);
909                                 return LDB_ERR_OPERATIONS_ERROR;
910                         }
911                         list2->count++;
912                 }
913         }
914
915         if (list2->count == 0) {
916                 talloc_free(list2);
917                 return LDB_ERR_NO_SUCH_OBJECT;
918         }
919
920         if (list2->count > 1) {
921                 qsort(list2->dn, list2->count, sizeof(char *), (comparison_fn_t) list_cmp);
922         }
923
924         if (list->count > 0) {
925                 if (list_intersect(ldb, list, list2) == -1) {
926                         talloc_free(list2);
927                         return LDB_ERR_OPERATIONS_ERROR;
928                 }
929
930                 if (list->count == 0) {
931                         talloc_free(list->dn);
932                         talloc_free(list2);
933                         return LDB_ERR_NO_SUCH_OBJECT;
934                 }
935         } else {
936                 list->dn = talloc_move(list, &list2->dn);
937                 list->count = list2->count;
938         }
939
940         talloc_free(list2);
941
942         return LDB_SUCCESS;
943 }
944
945 /*
946   return a list of dn's that might match a indexed search or
947   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
948  */
949 static int ltdb_index_dn(struct ldb_module *module,
950                          const struct ldb_parse_tree *tree,
951                          const struct ldb_message *index_list,
952                          struct dn_list *list)
953 {
954         int ret = LDB_ERR_OPERATIONS_ERROR;
955
956         switch (tree->operation) {
957         case LDB_OP_AND:
958                 ret = ltdb_index_dn_and(module, tree, index_list, list);
959                 break;
960
961         case LDB_OP_OR:
962                 ret = ltdb_index_dn_or(module, tree, index_list, list);
963                 break;
964
965         case LDB_OP_NOT:
966                 ret = ltdb_index_dn_not(module, tree, index_list, list);
967                 break;
968
969         case LDB_OP_EQUALITY:
970                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
971                 break;
972
973         case LDB_OP_SUBSTRING:
974         case LDB_OP_GREATER:
975         case LDB_OP_LESS:
976         case LDB_OP_PRESENT:
977         case LDB_OP_APPROX:
978         case LDB_OP_EXTENDED:
979                 /* we can't index with fancy bitops yet */
980                 ret = LDB_ERR_OPERATIONS_ERROR;
981                 break;
982         }
983
984         return ret;
985 }
986
987 /*
988   filter a candidate dn_list from an indexed search into a set of results
989   extracting just the given attributes
990 */
991 static int ltdb_index_filter(const struct dn_list *dn_list,
992                              struct ltdb_context *ac)
993 {
994         struct ldb_message *msg;
995         unsigned int i;
996
997         for (i = 0; i < dn_list->count; i++) {
998                 struct ldb_dn *dn;
999                 int ret;
1000
1001                 msg = ldb_msg_new(ac);
1002                 if (!msg) {
1003                         return LDB_ERR_OPERATIONS_ERROR;
1004                 }
1005
1006                 dn = ldb_dn_new(msg, ac->module->ldb, dn_list->dn[i]);
1007                 if (dn == NULL) {
1008                         talloc_free(msg);
1009                         return LDB_ERR_OPERATIONS_ERROR;
1010                 }
1011
1012                 ret = ltdb_search_dn1(ac->module, dn, msg);
1013                 talloc_free(dn);
1014                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1015                         /* the record has disappeared? yes, this can happen */
1016                         talloc_free(msg);
1017                         continue;
1018                 }
1019
1020                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1021                         /* an internal error */
1022                         talloc_free(msg);
1023                         return LDB_ERR_OPERATIONS_ERROR;
1024                 }
1025
1026                 if (!ldb_match_msg(ac->module->ldb, msg,
1027                                    ac->tree, ac->base, ac->scope)) {
1028                         talloc_free(msg);
1029                         continue;
1030                 }
1031
1032                 /* filter the attributes that the user wants */
1033                 ret = ltdb_filter_attrs(msg, ac->attrs);
1034
1035                 if (ret == -1) {
1036                         talloc_free(msg);
1037                         return LDB_ERR_OPERATIONS_ERROR;
1038                 }
1039
1040                 ret = ldb_module_send_entry(ac->req, msg, NULL);
1041                 if (ret != LDB_SUCCESS) {
1042                         ac->callback_failed = true;
1043                         return ret;
1044                 }
1045         }
1046
1047         return LDB_SUCCESS;
1048 }
1049
1050 /*
1051   search the database with a LDAP-like expression using indexes
1052   returns -1 if an indexed search is not possible, in which
1053   case the caller should call ltdb_search_full()
1054 */
1055 int ltdb_search_indexed(struct ltdb_context *ac)
1056 {
1057         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
1058         struct dn_list *dn_list;
1059         int ret, idxattr, idxone;
1060
1061         idxattr = idxone = 0;
1062         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXATTR);
1063         if (ret == 0 ) {
1064                 idxattr = 1;
1065         }
1066
1067         /* We do one level indexing only if requested */
1068         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1069         if (ret == 0 ) {
1070                 idxone = 1;
1071         }
1072
1073         if ((ac->scope == LDB_SCOPE_ONELEVEL && (idxattr+idxone == 0)) ||
1074             (ac->scope == LDB_SCOPE_SUBTREE && idxattr == 0)) {
1075                 /* no indexes? must do full search */
1076                 return LDB_ERR_OPERATIONS_ERROR;
1077         }
1078
1079         ret = LDB_ERR_OPERATIONS_ERROR;
1080
1081         dn_list = talloc_zero(ac, struct dn_list);
1082         if (dn_list == NULL) {
1083                 return LDB_ERR_OPERATIONS_ERROR;
1084         }
1085
1086         if (ac->scope == LDB_SCOPE_BASE) {
1087                 /* with BASE searches only one DN can match */
1088                 dn_list->dn = talloc_array(dn_list, char *, 1);
1089                 if (dn_list->dn == NULL) {
1090                         ldb_oom(ac->module->ldb);
1091                         return LDB_ERR_OPERATIONS_ERROR;
1092                 }
1093                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
1094                 if (dn_list->dn[0] == NULL) {
1095                         ldb_oom(ac->module->ldb);
1096                         return LDB_ERR_OPERATIONS_ERROR;
1097                 }
1098                 dn_list->count = 1;
1099                 ret = LDB_SUCCESS;
1100         }
1101
1102         if (ac->scope != LDB_SCOPE_BASE && idxattr == 1) {
1103                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1104
1105                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1106                         talloc_free(dn_list);
1107                         return ret;
1108                 }
1109         }
1110
1111         if (ac->scope == LDB_SCOPE_ONELEVEL && idxone == 1) {
1112                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1113         }
1114
1115         if (ret == LDB_SUCCESS) {
1116                 /* we've got a candidate list - now filter by the full tree
1117                    and extract the needed attributes */
1118                 ret = ltdb_index_filter(dn_list, ac);
1119         }
1120
1121         talloc_free(dn_list);
1122
1123         return ret;
1124 }
1125
1126 /*
1127   add a index element where this is the first indexed DN for this value
1128 */
1129 static int ltdb_index_add1_new(struct ldb_context *ldb,
1130                                struct ldb_message *msg,
1131                                const char *dn)
1132 {
1133         struct ldb_message_element *el;
1134
1135         /* add another entry */
1136         el = talloc_realloc(msg, msg->elements,
1137                                struct ldb_message_element, msg->num_elements+1);
1138         if (!el) {
1139                 return LDB_ERR_OPERATIONS_ERROR;
1140         }
1141
1142         msg->elements = el;
1143         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
1144         if (!msg->elements[msg->num_elements].name) {
1145                 return LDB_ERR_OPERATIONS_ERROR;
1146         }
1147         msg->elements[msg->num_elements].num_values = 0;
1148         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
1149         if (!msg->elements[msg->num_elements].values) {
1150                 return LDB_ERR_OPERATIONS_ERROR;
1151         }
1152         msg->elements[msg->num_elements].values[0].length = strlen(dn);
1153         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
1154         msg->elements[msg->num_elements].num_values = 1;
1155         msg->num_elements++;
1156
1157         return LDB_SUCCESS;
1158 }
1159
1160
1161 /*
1162   add a index element where this is not the first indexed DN for this
1163   value
1164 */
1165 static int ltdb_index_add1_add(struct ldb_context *ldb,
1166                                struct ldb_message *msg,
1167                                int idx,
1168                                const char *dn)
1169 {
1170         struct ldb_val *v2;
1171         unsigned int i;
1172
1173         /* for multi-valued attributes we can end up with repeats */
1174         for (i=0;i<msg->elements[idx].num_values;i++) {
1175                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
1176                         return LDB_SUCCESS;
1177                 }
1178         }
1179
1180         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
1181                               struct ldb_val,
1182                               msg->elements[idx].num_values+1);
1183         if (!v2) {
1184                 return LDB_ERR_OPERATIONS_ERROR;
1185         }
1186         msg->elements[idx].values = v2;
1187
1188         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
1189         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
1190         msg->elements[idx].num_values++;
1191
1192         return LDB_SUCCESS;
1193 }
1194
1195 /*
1196   add an index entry for one message element
1197 */
1198 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1199                            struct ldb_message_element *el, int v_idx)
1200 {
1201         struct ldb_context *ldb = module->ldb;
1202         struct ldb_message *msg;
1203         struct ldb_dn *dn_key;
1204         int ret;
1205         unsigned int i;
1206
1207         msg = talloc(module, struct ldb_message);
1208         if (msg == NULL) {
1209                 errno = ENOMEM;
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1214         if (!dn_key) {
1215                 talloc_free(msg);
1216                 return LDB_ERR_OPERATIONS_ERROR;
1217         }
1218         talloc_steal(msg, dn_key);
1219
1220         ret = ltdb_search_dn1_index(module, dn_key, msg);
1221         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1222                 talloc_free(msg);
1223                 return ret;
1224         }
1225
1226         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1227                 msg->dn = dn_key;
1228                 msg->num_elements = 0;
1229                 msg->elements = NULL;
1230         }
1231
1232         for (i=0;i<msg->num_elements;i++) {
1233                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
1234                         break;
1235                 }
1236         }
1237
1238         if (i == msg->num_elements) {
1239                 ret = ltdb_index_add1_new(ldb, msg, dn);
1240         } else {
1241                 ret = ltdb_index_add1_add(ldb, msg, i, dn);
1242         }
1243
1244         if (ret == LDB_SUCCESS) {
1245                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1246         }
1247
1248         talloc_free(msg);
1249
1250         return ret;
1251 }
1252
1253 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
1254                            struct ldb_message_element *elements, int num_el)
1255 {
1256         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1257         int ret;
1258         unsigned int i, j;
1259
1260         if (dn[0] == '@') {
1261                 return LDB_SUCCESS;
1262         }
1263
1264         if (ltdb->cache->indexlist->num_elements == 0) {
1265                 /* no indexed fields */
1266                 return LDB_SUCCESS;
1267         }
1268
1269         for (i = 0; i < num_el; i++) {
1270                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
1271                                        NULL, LTDB_IDXATTR);
1272                 if (ret == -1) {
1273                         continue;
1274                 }
1275                 for (j = 0; j < elements[i].num_values; j++) {
1276                         ret = ltdb_index_add1(module, dn, &elements[i], j);
1277                         if (ret != LDB_SUCCESS) {
1278                                 return ret;
1279                         }
1280                 }
1281         }
1282
1283         return LDB_SUCCESS;
1284 }
1285
1286 /*
1287   add the index entries for a new record
1288 */
1289 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
1290 {
1291         const char *dn;
1292         int ret;
1293
1294         dn = ldb_dn_get_linearized(msg->dn);
1295         if (dn == NULL) {
1296                 return LDB_ERR_OPERATIONS_ERROR;
1297         }
1298
1299         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1300
1301         return ret;
1302 }
1303
1304
1305 /*
1306   delete an index entry for one message element
1307 */
1308 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
1309                          struct ldb_message_element *el, int v_idx)
1310 {
1311         struct ldb_context *ldb = module->ldb;
1312         struct ldb_message *msg;
1313         struct ldb_dn *dn_key;
1314         int ret, i;
1315         unsigned int j;
1316
1317         if (dn[0] == '@') {
1318                 return LDB_SUCCESS;
1319         }
1320
1321         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx]);
1322         if (!dn_key) {
1323                 return LDB_ERR_OPERATIONS_ERROR;
1324         }
1325
1326         msg = talloc(dn_key, struct ldb_message);
1327         if (msg == NULL) {
1328                 talloc_free(dn_key);
1329                 return LDB_ERR_OPERATIONS_ERROR;
1330         }
1331
1332         ret = ltdb_search_dn1_index(module, dn_key, msg);
1333         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1334                 talloc_free(dn_key);
1335                 return ret;
1336         }
1337
1338         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1339                 /* it wasn't indexed. Did we have an earlier error? If we did then
1340                    its gone now */
1341                 talloc_free(dn_key);
1342                 return LDB_SUCCESS;
1343         }
1344
1345         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
1346         if (i == -1) {
1347                 struct ldb_ldif ldif;
1348
1349                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1350                                 "ERROR: dn %s not found in %s\n", dn,
1351                                 ldb_dn_get_linearized(dn_key));
1352                 ldif.changetype = LDB_CHANGETYPE_NONE;
1353                 ldif.msg = msg;
1354                 ldb_ldif_write_file(module->ldb, stdout, &ldif);
1355                 sleep(100);
1356                 /* it ain't there. hmmm */
1357                 talloc_free(dn_key);
1358                 return LDB_SUCCESS;
1359         }
1360
1361         if (j != msg->elements[i].num_values - 1) {
1362                 memmove(&msg->elements[i].values[j],
1363                         &msg->elements[i].values[j+1],
1364                         (msg->elements[i].num_values-(j+1)) *
1365                         sizeof(msg->elements[i].values[0]));
1366         }
1367         msg->elements[i].num_values--;
1368
1369         if (msg->elements[i].num_values == 0) {
1370                 ret = ltdb_delete_noindex(module, dn_key);
1371         } else {
1372                 ret = ltdb_store_idxptr(module, msg, TDB_REPLACE);
1373         }
1374
1375         talloc_free(dn_key);
1376
1377         return ret;
1378 }
1379
1380 /*
1381   delete the index entries for a record
1382   return -1 on failure
1383 */
1384 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1385 {
1386         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1387         int ret;
1388         const char *dn;
1389         unsigned int i, j;
1390
1391         /* find the list of indexed fields */
1392         if (ltdb->cache->indexlist->num_elements == 0) {
1393                 /* no indexed fields */
1394                 return LDB_SUCCESS;
1395         }
1396
1397         if (ldb_dn_is_special(msg->dn)) {
1398                 return LDB_SUCCESS;
1399         }
1400
1401         dn = ldb_dn_get_linearized(msg->dn);
1402         if (dn == NULL) {
1403                 return LDB_ERR_OPERATIONS_ERROR;
1404         }
1405
1406         for (i = 0; i < msg->num_elements; i++) {
1407                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1408                                        NULL, LTDB_IDXATTR);
1409                 if (ret == -1) {
1410                         continue;
1411                 }
1412                 for (j = 0; j < msg->elements[i].num_values; j++) {
1413                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1414                         if (ret != LDB_SUCCESS) {
1415                                 return ret;
1416                         }
1417                 }
1418         }
1419
1420         return LDB_SUCCESS;
1421 }
1422
1423 /*
1424   handle special index for one level searches
1425 */
1426 int ltdb_index_one(struct ldb_module *module, const struct ldb_message *msg, int add)
1427 {
1428         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1429         struct ldb_message_element el;
1430         struct ldb_val val;
1431         struct ldb_dn *pdn;
1432         const char *dn;
1433         int ret;
1434
1435         /* We index for ONE Level only if requested */
1436         ret = ldb_msg_find_idx(ltdb->cache->indexlist, NULL, NULL, LTDB_IDXONE);
1437         if (ret != 0) {
1438                 return LDB_SUCCESS;
1439         }
1440
1441         pdn = ldb_dn_get_parent(module, msg->dn);
1442         if (pdn == NULL) {
1443                 return LDB_ERR_OPERATIONS_ERROR;
1444         }
1445
1446         dn = ldb_dn_get_linearized(msg->dn);
1447         if (dn == NULL) {
1448                 talloc_free(pdn);
1449                 return LDB_ERR_OPERATIONS_ERROR;
1450         }
1451
1452         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1453         if (val.data == NULL) {
1454                 talloc_free(pdn);
1455                 return LDB_ERR_OPERATIONS_ERROR;
1456         }
1457
1458         val.length = strlen((char *)val.data);
1459         el.name = LTDB_IDXONE;
1460         el.values = &val;
1461         el.num_values = 1;
1462
1463         if (add) {
1464                 ret = ltdb_index_add1(module, dn, &el, 0);
1465         } else { /* delete */
1466                 ret = ltdb_index_del_value(module, dn, &el, 0);
1467         }
1468
1469         talloc_free(pdn);
1470
1471         return ret;
1472 }
1473
1474
1475 /*
1476   traversal function that deletes all @INDEX records
1477 */
1478 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1479 {
1480         const char *dn = "DN=" LTDB_INDEX ":";
1481         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1482                 return tdb_delete(tdb, key);
1483         }
1484         return 0;
1485 }
1486
1487 /*
1488   traversal function that adds @INDEX records during a re index
1489 */
1490 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1491 {
1492         struct ldb_module *module = (struct ldb_module *)state;
1493         struct ldb_message *msg;
1494         const char *dn = NULL;
1495         int ret;
1496         TDB_DATA key2;
1497
1498         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1499             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1500                 return 0;
1501         }
1502
1503         msg = talloc(module, struct ldb_message);
1504         if (msg == NULL) {
1505                 return -1;
1506         }
1507
1508         ret = ltdb_unpack_data(module, &data, msg);
1509         if (ret != 0) {
1510                 talloc_free(msg);
1511                 return -1;
1512         }
1513
1514         /* check if the DN key has changed, perhaps due to the
1515            case insensitivity of an element changing */
1516         key2 = ltdb_key(module, msg->dn);
1517         if (key2.dptr == NULL) {
1518                 /* probably a corrupt record ... darn */
1519                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1520                                                         ldb_dn_get_linearized(msg->dn));
1521                 talloc_free(msg);
1522                 return 0;
1523         }
1524         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1525                 tdb_delete(tdb, key);
1526                 tdb_store(tdb, key2, data, 0);
1527         }
1528         talloc_free(key2.dptr);
1529
1530         if (msg->dn == NULL) {
1531                 dn = (char *)key.dptr + 3;
1532         } else {
1533                 dn = ldb_dn_get_linearized(msg->dn);
1534         }
1535
1536         ret = ltdb_index_one(module, msg, 1);
1537         if (ret == LDB_SUCCESS) {
1538                 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1539         } else {
1540                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1541                         "Adding special ONE LEVEL index failed (%s)!\n",
1542                         ldb_dn_get_linearized(msg->dn));
1543         }
1544
1545         talloc_free(msg);
1546
1547         if (ret != LDB_SUCCESS) return -1;
1548
1549         return 0;
1550 }
1551
1552 /*
1553   force a complete reindex of the database
1554 */
1555 int ltdb_reindex(struct ldb_module *module)
1556 {
1557         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
1558         int ret;
1559
1560         if (ltdb_cache_reload(module) != 0) {
1561                 return LDB_ERR_OPERATIONS_ERROR;
1562         }
1563
1564         /* first traverse the database deleting any @INDEX records */
1565         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1566         if (ret == -1) {
1567                 return LDB_ERR_OPERATIONS_ERROR;
1568         }
1569
1570         /* if we don't have indexes we have nothing todo */
1571         if (ltdb->cache->indexlist->num_elements == 0) {
1572                 return LDB_SUCCESS;
1573         }
1574
1575         /* now traverse adding any indexes for normal LDB records */
1576         ret = tdb_traverse(ltdb->tdb, re_index, module);
1577         if (ret == -1) {
1578                 return LDB_ERR_OPERATIONS_ERROR;
1579         }
1580
1581         if (ltdb->idxptr) {
1582                 ltdb->idxptr->repack = true;
1583         }
1584
1585         return LDB_SUCCESS;
1586 }