a3848eddb2b73c32bf1d9dd200ab7ef5cddaf29d
[ddiss/samba.git] / lib / ldb / ldb_tdb / ldb_index.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004-2009
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 /*
25  *  Name: ldb
26  *
27  *  Component: ldb tdb backend - indexing
28  *
29  *  Description: indexing routines for ldb tdb backend
30  *
31  *  Author: Andrew Tridgell
32  */
33
34 #include "ldb_tdb.h"
35
36 struct dn_list {
37         unsigned int count;
38         struct ldb_val *dn;
39 };
40
41 struct ltdb_idxptr {
42         struct tdb_context *itdb;
43         int error;
44 };
45
46 /* we put a @IDXVERSION attribute on index entries. This
47    allows us to tell if it was written by an older version
48 */
49 #define LTDB_INDEXING_VERSION 2
50
51 /* enable the idxptr mode when transactions start */
52 int ltdb_index_transaction_start(struct ldb_module *module)
53 {
54         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
55         ltdb->idxptr = talloc_zero(ltdb, struct ltdb_idxptr);
56         return LDB_SUCCESS;
57 }
58
59 /* compare two DN entries in a dn_list. Take account of possible
60  * differences in string termination */
61 static int dn_list_cmp(const struct ldb_val *v1, const struct ldb_val *v2)
62 {
63         if (v1->length > v2->length && v1->data[v2->length] != 0) {
64                 return -1;
65         }
66         if (v1->length < v2->length && v2->data[v1->length] != 0) {
67                 return 1;
68         }
69         return strncmp((char *)v1->data, (char *)v2->data, v1->length);
70 }
71
72
73 /*
74   find a entry in a dn_list, using a ldb_val. Uses a case sensitive
75   comparison with the dn returns -1 if not found
76  */
77 static int ltdb_dn_list_find_val(const struct dn_list *list, const struct ldb_val *v)
78 {
79         unsigned int i;
80         for (i=0; i<list->count; i++) {
81                 if (dn_list_cmp(&list->dn[i], v) == 0) return i;
82         }
83         return -1;
84 }
85
86 /*
87   find a entry in a dn_list. Uses a case sensitive comparison with the dn
88   returns -1 if not found
89  */
90 static int ltdb_dn_list_find_str(struct dn_list *list, const char *dn)
91 {
92         struct ldb_val v;
93         v.data = discard_const_p(unsigned char, dn);
94         v.length = strlen(dn);
95         return ltdb_dn_list_find_val(list, &v);
96 }
97
98 /*
99   this is effectively a cast function, but with lots of paranoia
100   checks and also copes with CPUs that are fussy about pointer
101   alignment
102  */
103 static struct dn_list *ltdb_index_idxptr(struct ldb_module *module, TDB_DATA rec, bool check_parent)
104 {
105         struct dn_list *list;
106         if (rec.dsize != sizeof(void *)) {
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
108                                        "Bad data size for idxptr %u", (unsigned)rec.dsize);
109                 return NULL;
110         }
111         /* note that we can't just use a cast here, as rec.dptr may
112            not be aligned sufficiently for a pointer. A cast would cause
113            platforms like some ARM CPUs to crash */
114         memcpy(&list, rec.dptr, sizeof(void *));
115         list = talloc_get_type(list, struct dn_list);
116         if (list == NULL) {
117                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
118                                        "Bad type '%s' for idxptr", 
119                                        talloc_get_name(list));
120                 return NULL;
121         }
122         if (check_parent && list->dn && talloc_parent(list->dn) != list) {
123                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
124                                        "Bad parent '%s' for idxptr", 
125                                        talloc_get_name(talloc_parent(list->dn)));
126                 return NULL;
127         }
128         return list;
129 }
130
131 /*
132   return the @IDX list in an index entry for a dn as a 
133   struct dn_list
134  */
135 static int ltdb_dn_list_load(struct ldb_module *module,
136                              struct ldb_dn *dn, struct dn_list *list)
137 {
138         struct ldb_message *msg;
139         int ret;
140         struct ldb_message_element *el;
141         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
142         TDB_DATA rec;
143         struct dn_list *list2;
144         TDB_DATA key;
145
146         list->dn = NULL;
147         list->count = 0;
148
149         /* see if we have any in-memory index entries */
150         if (ltdb->idxptr == NULL ||
151             ltdb->idxptr->itdb == NULL) {
152                 goto normal_index;
153         }
154
155         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
156         key.dsize = strlen((char *)key.dptr);
157
158         rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
159         if (rec.dptr == NULL) {
160                 goto normal_index;
161         }
162
163         /* we've found an in-memory index entry */
164         list2 = ltdb_index_idxptr(module, rec, true);
165         if (list2 == NULL) {
166                 free(rec.dptr);
167                 return LDB_ERR_OPERATIONS_ERROR;
168         }
169         free(rec.dptr);
170
171         *list = *list2;
172         return LDB_SUCCESS;
173
174 normal_index:
175         msg = ldb_msg_new(list);
176         if (msg == NULL) {
177                 return LDB_ERR_OPERATIONS_ERROR;
178         }
179
180         ret = ltdb_search_dn1(module, dn, msg);
181         if (ret != LDB_SUCCESS) {
182                 talloc_free(msg);
183                 return ret;
184         }
185
186         /* TODO: check indexing version number */
187
188         el = ldb_msg_find_element(msg, LTDB_IDX);
189         if (!el) {
190                 talloc_free(msg);
191                 return LDB_SUCCESS;
192         }
193
194         /* we avoid copying the strings by stealing the list */
195         list->dn = talloc_steal(list, el->values);
196         list->count = el->num_values;
197
198         return LDB_SUCCESS;
199 }
200
201
202 /*
203   save a dn_list into a full @IDX style record
204  */
205 static int ltdb_dn_list_store_full(struct ldb_module *module, struct ldb_dn *dn, 
206                                    struct dn_list *list)
207 {
208         struct ldb_message *msg;
209         int ret;
210
211         if (list->count == 0) {
212                 ret = ltdb_delete_noindex(module, dn);
213                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
214                         return LDB_SUCCESS;
215                 }
216                 return ret;
217         }
218
219         msg = ldb_msg_new(module);
220         if (!msg) {
221                 return ldb_module_oom(module);
222         }
223
224         ret = ldb_msg_add_fmt(msg, LTDB_IDXVERSION, "%u", LTDB_INDEXING_VERSION);
225         if (ret != LDB_SUCCESS) {
226                 talloc_free(msg);
227                 return ldb_module_oom(module);
228         }
229
230         msg->dn = dn;
231         if (list->count > 0) {
232                 struct ldb_message_element *el;
233
234                 ret = ldb_msg_add_empty(msg, LTDB_IDX, LDB_FLAG_MOD_ADD, &el);
235                 if (ret != LDB_SUCCESS) {
236                         talloc_free(msg);
237                         return ldb_module_oom(module);
238                 }
239                 el->values = list->dn;
240                 el->num_values = list->count;
241         }
242
243         ret = ltdb_store(module, msg, TDB_REPLACE);
244         talloc_free(msg);
245         return ret;
246 }
247
248 /*
249   save a dn_list into the database, in either @IDX or internal format
250  */
251 static int ltdb_dn_list_store(struct ldb_module *module, struct ldb_dn *dn, 
252                               struct dn_list *list)
253 {
254         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
255         TDB_DATA rec, key;
256         int ret;
257         struct dn_list *list2;
258
259         if (ltdb->idxptr == NULL) {
260                 return ltdb_dn_list_store_full(module, dn, list);
261         }
262
263         if (ltdb->idxptr->itdb == NULL) {
264                 ltdb->idxptr->itdb = tdb_open_compat(NULL, 1000, TDB_INTERNAL, O_RDWR, 0, NULL, NULL);
265                 if (ltdb->idxptr->itdb == NULL) {
266                         return LDB_ERR_OPERATIONS_ERROR;
267                 }
268         }
269
270         key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
271         key.dsize = strlen((char *)key.dptr);
272
273         rec = tdb_fetch_compat(ltdb->idxptr->itdb, key);
274         if (rec.dptr != NULL) {
275                 list2 = ltdb_index_idxptr(module, rec, false);
276                 if (list2 == NULL) {
277                         free(rec.dptr);
278                         return LDB_ERR_OPERATIONS_ERROR;
279                 }
280                 free(rec.dptr);
281                 list2->dn = talloc_steal(list2, list->dn);
282                 list2->count = list->count;
283                 return LDB_SUCCESS;
284         }
285
286         list2 = talloc(ltdb->idxptr, struct dn_list);
287         if (list2 == NULL) {
288                 return LDB_ERR_OPERATIONS_ERROR;
289         }
290         list2->dn = talloc_steal(list2, list->dn);
291         list2->count = list->count;
292
293         rec.dptr = (uint8_t *)&list2;
294         rec.dsize = sizeof(void *);
295
296         ret = tdb_store(ltdb->idxptr->itdb, key, rec, TDB_INSERT);
297         if (ret != 0) {
298                 return ltdb_err_map(tdb_error(ltdb->idxptr->itdb));
299         }
300         return LDB_SUCCESS;
301 }
302
303 /*
304   traverse function for storing the in-memory index entries on disk
305  */
306 static int ltdb_index_traverse_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
307 {
308         struct ldb_module *module = state;
309         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
310         struct ldb_dn *dn;
311         struct ldb_context *ldb = ldb_module_get_ctx(module);
312         struct ldb_val v;
313         struct dn_list *list;
314
315         list = ltdb_index_idxptr(module, data, true);
316         if (list == NULL) {
317                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
318                 return -1;
319         }
320
321         v.data = key.dptr;
322         v.length = strnlen((char *)key.dptr, key.dsize);
323
324         dn = ldb_dn_from_ldb_val(module, ldb, &v);
325         if (dn == NULL) {
326                 ldb_asprintf_errstring(ldb, "Failed to parse index key %*.*s as an LDB DN", (int)v.length, (int)v.length, (const char *)v.data);
327                 ltdb->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
328                 return -1;
329         }
330
331         ltdb->idxptr->error = ltdb_dn_list_store_full(module, dn, list);
332         talloc_free(dn);
333         if (ltdb->idxptr->error != 0) {
334                 return -1;
335         }
336         return 0;
337 }
338
339 /* cleanup the idxptr mode when transaction commits */
340 int ltdb_index_transaction_commit(struct ldb_module *module)
341 {
342         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
343         int ret;
344
345         struct ldb_context *ldb = ldb_module_get_ctx(module);
346
347         ldb_reset_err_string(ldb);
348
349         if (ltdb->idxptr->itdb) {
350                 tdb_traverse(ltdb->idxptr->itdb, ltdb_index_traverse_store, module);
351                 tdb_close(ltdb->idxptr->itdb);
352         }
353
354         ret = ltdb->idxptr->error;
355         if (ret != LDB_SUCCESS) {
356                 if (!ldb_errstring(ldb)) {
357                         ldb_set_errstring(ldb, ldb_strerror(ret));
358                 }
359                 ldb_asprintf_errstring(ldb, "Failed to store index records in transaction commit: %s", ldb_errstring(ldb));
360         }
361
362         talloc_free(ltdb->idxptr);
363         ltdb->idxptr = NULL;
364         return ret;
365 }
366
367 /* cleanup the idxptr mode when transaction cancels */
368 int ltdb_index_transaction_cancel(struct ldb_module *module)
369 {
370         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
371         if (ltdb->idxptr && ltdb->idxptr->itdb) {
372                 tdb_close(ltdb->idxptr->itdb);
373         }
374         talloc_free(ltdb->idxptr);
375         ltdb->idxptr = NULL;
376         return LDB_SUCCESS;
377 }
378
379
380 /*
381   return the dn key to be used for an index
382   the caller is responsible for freeing
383 */
384 static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
385                                      const char *attr, const struct ldb_val *value,
386                                      const struct ldb_schema_attribute **ap)
387 {
388         struct ldb_dn *ret;
389         struct ldb_val v;
390         const struct ldb_schema_attribute *a;
391         char *attr_folded;
392         int r;
393
394         attr_folded = ldb_attr_casefold(ldb, attr);
395         if (!attr_folded) {
396                 return NULL;
397         }
398
399         a = ldb_schema_attribute_by_name(ldb, attr);
400         if (ap) {
401                 *ap = a;
402         }
403         r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
404         if (r != LDB_SUCCESS) {
405                 const char *errstr = ldb_errstring(ldb);
406                 /* canonicalisation can be refused. For example, 
407                    a attribute that takes wildcards will refuse to canonicalise
408                    if the value contains a wildcard */
409                 ldb_asprintf_errstring(ldb, "Failed to create index key for attribute '%s':%s%s%s",
410                                        attr, ldb_strerror(r), (errstr?":":""), (errstr?errstr:""));
411                 talloc_free(attr_folded);
412                 return NULL;
413         }
414         if (ldb_should_b64_encode(ldb, &v)) {
415                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
416                 if (!vstr) {
417                         talloc_free(attr_folded);
418                         return NULL;
419                 }
420                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
421                 talloc_free(vstr);
422         } else {
423                 ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
424         }
425
426         if (v.data != value->data) {
427                 talloc_free(v.data);
428         }
429         talloc_free(attr_folded);
430
431         return ret;
432 }
433
434 /*
435   see if a attribute value is in the list of indexed attributes
436 */
437 static bool ltdb_is_indexed(const struct ldb_message *index_list, const char *attr)
438 {
439         unsigned int i;
440         struct ldb_message_element *el;
441
442         el = ldb_msg_find_element(index_list, LTDB_IDXATTR);
443         if (el == NULL) {
444                 return false;
445         }
446
447         /* TODO: this is too expensive! At least use a binary search */
448         for (i=0; i<el->num_values; i++) {
449                 if (ldb_attr_cmp((char *)el->values[i].data, attr) == 0) {
450                         return true;
451                 }
452         }
453         return false;
454 }
455
456 /*
457   in the following logic functions, the return value is treated as
458   follows:
459
460      LDB_SUCCESS: we found some matching index values
461
462      LDB_ERR_NO_SUCH_OBJECT: we know for sure that no object matches
463
464      LDB_ERR_OPERATIONS_ERROR: indexing could not answer the call,
465                                we'll need a full search
466  */
467
468 /*
469   return a list of dn's that might match a simple indexed search (an
470   equality search only)
471  */
472 static int ltdb_index_dn_simple(struct ldb_module *module,
473                                 const struct ldb_parse_tree *tree,
474                                 const struct ldb_message *index_list,
475                                 struct dn_list *list)
476 {
477         struct ldb_context *ldb;
478         struct ldb_dn *dn;
479         int ret;
480
481         ldb = ldb_module_get_ctx(module);
482
483         list->count = 0;
484         list->dn = NULL;
485
486         /* if the attribute isn't in the list of indexed attributes then
487            this node needs a full search */
488         if (!ltdb_is_indexed(index_list, tree->u.equality.attr)) {
489                 return LDB_ERR_OPERATIONS_ERROR;
490         }
491
492         /* the attribute is indexed. Pull the list of DNs that match the 
493            search criterion */
494         dn = ltdb_index_key(ldb, tree->u.equality.attr, &tree->u.equality.value, NULL);
495         if (!dn) return LDB_ERR_OPERATIONS_ERROR;
496
497         ret = ltdb_dn_list_load(module, dn, list);
498         talloc_free(dn);
499         return ret;
500 }
501
502
503 static bool list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
504
505 /*
506   return a list of dn's that might match a leaf indexed search
507  */
508 static int ltdb_index_dn_leaf(struct ldb_module *module,
509                               const struct ldb_parse_tree *tree,
510                               const struct ldb_message *index_list,
511                               struct dn_list *list)
512 {
513         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
514                                                     struct ltdb_private);
515         if (ltdb->disallow_dn_filter &&
516             (ldb_attr_cmp(tree->u.equality.attr, "dn") == 0)) {
517                 /* in AD mode we do not support "(dn=...)" search filters */
518                 list->dn = NULL;
519                 list->count = 0;
520                 return LDB_SUCCESS;
521         }
522         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
523                 list->dn = talloc_array(list, struct ldb_val, 1);
524                 if (list->dn == NULL) {
525                         ldb_module_oom(module);
526                         return LDB_ERR_OPERATIONS_ERROR;
527                 }
528                 list->dn[0] = tree->u.equality.value;
529                 list->count = 1;
530                 return LDB_SUCCESS;
531         }
532         return ltdb_index_dn_simple(module, tree, index_list, list);
533 }
534
535
536 /*
537   list intersection
538   list = list & list2
539 */
540 static bool list_intersect(struct ldb_context *ldb,
541                            struct dn_list *list, const struct dn_list *list2)
542 {
543         struct dn_list *list3;
544         unsigned int i;
545
546         if (list->count == 0) {
547                 /* 0 & X == 0 */
548                 return true;
549         }
550         if (list2->count == 0) {
551                 /* X & 0 == 0 */
552                 list->count = 0;
553                 list->dn = NULL;
554                 return true;
555         }
556
557         /* the indexing code is allowed to return a longer list than
558            what really matches, as all results are filtered by the
559            full expression at the end - this shortcut avoids a lot of
560            work in some cases */
561         if (list->count < 2 && list2->count > 10) {
562                 return true;
563         }
564         if (list2->count < 2 && list->count > 10) {
565                 list->count = list2->count;
566                 list->dn = list2->dn;
567                 /* note that list2 may not be the parent of list2->dn,
568                    as list2->dn may be owned by ltdb->idxptr. In that
569                    case we expect this reparent call to fail, which is
570                    OK */
571                 talloc_reparent(list2, list, list2->dn);
572                 return true;
573         }
574
575         list3 = talloc_zero(list, struct dn_list);
576         if (list3 == NULL) {
577                 return false;
578         }
579
580         list3->dn = talloc_array(list3, struct ldb_val, list->count);
581         if (!list3->dn) {
582                 talloc_free(list3);
583                 return false;
584         }
585         list3->count = 0;
586
587         for (i=0;i<list->count;i++) {
588                 if (ltdb_dn_list_find_val(list2, &list->dn[i]) != -1) {
589                         list3->dn[list3->count] = list->dn[i];
590                         list3->count++;
591                 }
592         }
593
594         list->dn = talloc_steal(list, list3->dn);
595         list->count = list3->count;
596         talloc_free(list3);
597
598         return true;
599 }
600
601
602 /*
603   list union
604   list = list | list2
605 */
606 static bool list_union(struct ldb_context *ldb,
607                        struct dn_list *list, const struct dn_list *list2)
608 {
609         struct ldb_val *dn3;
610
611         if (list2->count == 0) {
612                 /* X | 0 == X */
613                 return true;
614         }
615
616         if (list->count == 0) {
617                 /* 0 | X == X */
618                 list->count = list2->count;
619                 list->dn = list2->dn;
620                 /* note that list2 may not be the parent of list2->dn,
621                    as list2->dn may be owned by ltdb->idxptr. In that
622                    case we expect this reparent call to fail, which is
623                    OK */
624                 talloc_reparent(list2, list, list2->dn);
625                 return true;
626         }
627
628         dn3 = talloc_array(list, struct ldb_val, list->count + list2->count);
629         if (!dn3) {
630                 ldb_oom(ldb);
631                 return false;
632         }
633
634         /* we allow for duplicates here, and get rid of them later */
635         memcpy(dn3, list->dn, sizeof(list->dn[0])*list->count);
636         memcpy(dn3+list->count, list2->dn, sizeof(list2->dn[0])*list2->count);
637
638         list->dn = dn3;
639         list->count += list2->count;
640
641         return true;
642 }
643
644 static int ltdb_index_dn(struct ldb_module *module,
645                          const struct ldb_parse_tree *tree,
646                          const struct ldb_message *index_list,
647                          struct dn_list *list);
648
649
650 /*
651   process an OR list (a union)
652  */
653 static int ltdb_index_dn_or(struct ldb_module *module,
654                             const struct ldb_parse_tree *tree,
655                             const struct ldb_message *index_list,
656                             struct dn_list *list)
657 {
658         struct ldb_context *ldb;
659         unsigned int i;
660
661         ldb = ldb_module_get_ctx(module);
662
663         list->dn = NULL;
664         list->count = 0;
665
666         for (i=0; i<tree->u.list.num_elements; i++) {
667                 struct dn_list *list2;
668                 int ret;
669
670                 list2 = talloc_zero(list, struct dn_list);
671                 if (list2 == NULL) {
672                         return LDB_ERR_OPERATIONS_ERROR;
673                 }
674
675                 ret = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
676
677                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
678                         /* X || 0 == X */
679                         talloc_free(list2);
680                         continue;
681                 }
682
683                 if (ret != LDB_SUCCESS) {
684                         /* X || * == * */
685                         talloc_free(list2);
686                         return ret;
687                 }
688
689                 if (!list_union(ldb, list, list2)) {
690                         talloc_free(list2);
691                         return LDB_ERR_OPERATIONS_ERROR;
692                 }
693         }
694
695         if (list->count == 0) {
696                 return LDB_ERR_NO_SUCH_OBJECT;
697         }
698
699         return LDB_SUCCESS;
700 }
701
702
703 /*
704   NOT an index results
705  */
706 static int ltdb_index_dn_not(struct ldb_module *module,
707                              const struct ldb_parse_tree *tree,
708                              const struct ldb_message *index_list,
709                              struct dn_list *list)
710 {
711         /* the only way to do an indexed not would be if we could
712            negate the not via another not or if we knew the total
713            number of database elements so we could know that the
714            existing expression covered the whole database.
715
716            instead, we just give up, and rely on a full index scan
717            (unless an outer & manages to reduce the list)
718         */
719         return LDB_ERR_OPERATIONS_ERROR;
720 }
721
722
723 static bool ltdb_index_unique(struct ldb_context *ldb,
724                               const char *attr)
725 {
726         const struct ldb_schema_attribute *a;
727         a = ldb_schema_attribute_by_name(ldb, attr);
728         if (a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
729                 return true;
730         }
731         return false;
732 }
733
734 /*
735   process an AND expression (intersection)
736  */
737 static int ltdb_index_dn_and(struct ldb_module *module,
738                              const struct ldb_parse_tree *tree,
739                              const struct ldb_message *index_list,
740                              struct dn_list *list)
741 {
742         struct ldb_context *ldb;
743         unsigned int i;
744         bool found;
745
746         ldb = ldb_module_get_ctx(module);
747
748         list->dn = NULL;
749         list->count = 0;
750
751         /* in the first pass we only look for unique simple
752            equality tests, in the hope of avoiding having to look
753            at any others */
754         for (i=0; i<tree->u.list.num_elements; i++) {
755                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
756                 int ret;
757
758                 if (subtree->operation != LDB_OP_EQUALITY ||
759                     !ltdb_index_unique(ldb, subtree->u.equality.attr)) {
760                         continue;
761                 }
762                 
763                 ret = ltdb_index_dn(module, subtree, index_list, list);
764                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
765                         /* 0 && X == 0 */
766                         return LDB_ERR_NO_SUCH_OBJECT;
767                 }
768                 if (ret == LDB_SUCCESS) {
769                         /* a unique index match means we can
770                          * stop. Note that we don't care if we return
771                          * a few too many objects, due to later
772                          * filtering */
773                         return LDB_SUCCESS;
774                 }
775         }       
776
777         /* now do a full intersection */
778         found = false;
779
780         for (i=0; i<tree->u.list.num_elements; i++) {
781                 const struct ldb_parse_tree *subtree = tree->u.list.elements[i];
782                 struct dn_list *list2;
783                 int ret;
784
785                 list2 = talloc_zero(list, struct dn_list);
786                 if (list2 == NULL) {
787                         return ldb_module_oom(module);
788                 }
789                         
790                 ret = ltdb_index_dn(module, subtree, index_list, list2);
791
792                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
793                         /* X && 0 == 0 */
794                         list->dn = NULL;
795                         list->count = 0;
796                         talloc_free(list2);
797                         return LDB_ERR_NO_SUCH_OBJECT;
798                 }
799                 
800                 if (ret != LDB_SUCCESS) {
801                         /* this didn't adding anything */
802                         talloc_free(list2);
803                         continue;
804                 }
805
806                 if (!found) {
807                         talloc_reparent(list2, list, list->dn);
808                         list->dn = list2->dn;
809                         list->count = list2->count;
810                         found = true;
811                 } else if (!list_intersect(ldb, list, list2)) {
812                         talloc_free(list2);
813                         return LDB_ERR_OPERATIONS_ERROR;
814                 }
815                         
816                 if (list->count == 0) {
817                         list->dn = NULL;
818                         return LDB_ERR_NO_SUCH_OBJECT;
819                 }
820                         
821                 if (list->count < 2) {
822                         /* it isn't worth loading the next part of the tree */
823                         return LDB_SUCCESS;
824                 }
825         }       
826
827         if (!found) {
828                 /* none of the attributes were indexed */
829                 return LDB_ERR_OPERATIONS_ERROR;
830         }
831
832         return LDB_SUCCESS;
833 }
834         
835 /*
836   return a list of matching objects using a one-level index
837  */
838 static int ltdb_index_dn_one(struct ldb_module *module,
839                              struct ldb_dn *parent_dn,
840                              struct dn_list *list)
841 {
842         struct ldb_context *ldb;
843         struct ldb_dn *key;
844         struct ldb_val val;
845         int ret;
846
847         ldb = ldb_module_get_ctx(module);
848
849         /* work out the index key from the parent DN */
850         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(parent_dn));
851         val.length = strlen((char *)val.data);
852         key = ltdb_index_key(ldb, LTDB_IDXONE, &val, NULL);
853         if (!key) {
854                 ldb_oom(ldb);
855                 return LDB_ERR_OPERATIONS_ERROR;
856         }
857
858         ret = ltdb_dn_list_load(module, key, list);
859         talloc_free(key);
860         if (ret != LDB_SUCCESS) {
861                 return ret;
862         }
863
864         if (list->count == 0) {
865                 return LDB_ERR_NO_SUCH_OBJECT;
866         }
867
868         return LDB_SUCCESS;
869 }
870
871 /*
872   return a list of dn's that might match a indexed search or
873   an error. return LDB_ERR_NO_SUCH_OBJECT for no matches, or LDB_SUCCESS for matches
874  */
875 static int ltdb_index_dn(struct ldb_module *module,
876                          const struct ldb_parse_tree *tree,
877                          const struct ldb_message *index_list,
878                          struct dn_list *list)
879 {
880         int ret = LDB_ERR_OPERATIONS_ERROR;
881
882         switch (tree->operation) {
883         case LDB_OP_AND:
884                 ret = ltdb_index_dn_and(module, tree, index_list, list);
885                 break;
886
887         case LDB_OP_OR:
888                 ret = ltdb_index_dn_or(module, tree, index_list, list);
889                 break;
890
891         case LDB_OP_NOT:
892                 ret = ltdb_index_dn_not(module, tree, index_list, list);
893                 break;
894
895         case LDB_OP_EQUALITY:
896                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
897                 break;
898
899         case LDB_OP_SUBSTRING:
900         case LDB_OP_GREATER:
901         case LDB_OP_LESS:
902         case LDB_OP_PRESENT:
903         case LDB_OP_APPROX:
904         case LDB_OP_EXTENDED:
905                 /* we can't index with fancy bitops yet */
906                 ret = LDB_ERR_OPERATIONS_ERROR;
907                 break;
908         }
909
910         return ret;
911 }
912
913 /*
914   filter a candidate dn_list from an indexed search into a set of results
915   extracting just the given attributes
916 */
917 static int ltdb_index_filter(const struct dn_list *dn_list,
918                              struct ltdb_context *ac, 
919                              uint32_t *match_count)
920 {
921         struct ldb_context *ldb;
922         struct ldb_message *msg;
923         unsigned int i;
924
925         ldb = ldb_module_get_ctx(ac->module);
926
927         for (i = 0; i < dn_list->count; i++) {
928                 struct ldb_dn *dn;
929                 int ret;
930                 bool matched;
931
932                 msg = ldb_msg_new(ac);
933                 if (!msg) {
934                         return LDB_ERR_OPERATIONS_ERROR;
935                 }
936
937                 dn = ldb_dn_from_ldb_val(msg, ldb, &dn_list->dn[i]);
938                 if (dn == NULL) {
939                         talloc_free(msg);
940                         return LDB_ERR_OPERATIONS_ERROR;
941                 }
942
943                 ret = ltdb_search_dn1(ac->module, dn, msg);
944                 talloc_free(dn);
945                 if (ret == LDB_ERR_NO_SUCH_OBJECT) {
946                         /* the record has disappeared? yes, this can happen */
947                         talloc_free(msg);
948                         continue;
949                 }
950
951                 if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
952                         /* an internal error */
953                         talloc_free(msg);
954                         return LDB_ERR_OPERATIONS_ERROR;
955                 }
956
957                 ret = ldb_match_msg_error(ldb, msg,
958                                           ac->tree, ac->base, ac->scope, &matched);
959                 if (ret != LDB_SUCCESS) {
960                         talloc_free(msg);
961                         return ret;
962                 }
963                 if (!matched) {
964                         talloc_free(msg);
965                         continue;
966                 }
967
968                 /* filter the attributes that the user wants */
969                 ret = ltdb_filter_attrs(msg, ac->attrs);
970
971                 if (ret == -1) {
972                         talloc_free(msg);
973                         return LDB_ERR_OPERATIONS_ERROR;
974                 }
975
976                 ret = ldb_module_send_entry(ac->req, msg, NULL);
977                 if (ret != LDB_SUCCESS) {
978                         /* Regardless of success or failure, the msg
979                          * is the callbacks responsiblity, and should
980                          * not be talloc_free()'ed */
981                         ac->request_terminated = true;
982                         return ret;
983                 }
984
985                 (*match_count)++;
986         }
987
988         return LDB_SUCCESS;
989 }
990
991 /*
992   remove any duplicated entries in a indexed result
993  */
994 static void ltdb_dn_list_remove_duplicates(struct dn_list *list)
995 {
996         unsigned int i, new_count;
997
998         if (list->count < 2) {
999                 return;
1000         }
1001
1002         TYPESAFE_QSORT(list->dn, list->count, dn_list_cmp);
1003
1004         new_count = 1;
1005         for (i=1; i<list->count; i++) {
1006                 if (dn_list_cmp(&list->dn[i], &list->dn[new_count-1]) != 0) {
1007                         if (new_count != i) {
1008                                 list->dn[new_count] = list->dn[i];
1009                         }
1010                         new_count++;
1011                 }
1012         }
1013         
1014         list->count = new_count;
1015 }
1016
1017 /*
1018   search the database with a LDAP-like expression using indexes
1019   returns -1 if an indexed search is not possible, in which
1020   case the caller should call ltdb_search_full()
1021 */
1022 int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
1023 {
1024         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(ac->module), struct ltdb_private);
1025         struct dn_list *dn_list;
1026         int ret;
1027
1028         /* see if indexing is enabled */
1029         if (!ltdb->cache->attribute_indexes && 
1030             !ltdb->cache->one_level_indexes &&
1031             ac->scope != LDB_SCOPE_BASE) {
1032                 /* fallback to a full search */
1033                 return LDB_ERR_OPERATIONS_ERROR;
1034         }
1035
1036         dn_list = talloc_zero(ac, struct dn_list);
1037         if (dn_list == NULL) {
1038                 return ldb_module_oom(ac->module);
1039         }
1040
1041         switch (ac->scope) {
1042         case LDB_SCOPE_BASE:
1043                 dn_list->dn = talloc_array(dn_list, struct ldb_val, 1);
1044                 if (dn_list->dn == NULL) {
1045                         talloc_free(dn_list);
1046                         return ldb_module_oom(ac->module);
1047                 }
1048                 dn_list->dn[0].data = discard_const_p(unsigned char, ldb_dn_get_linearized(ac->base));
1049                 if (dn_list->dn[0].data == NULL) {
1050                         talloc_free(dn_list);
1051                         return ldb_module_oom(ac->module);
1052                 }
1053                 dn_list->dn[0].length = strlen((char *)dn_list->dn[0].data);
1054                 dn_list->count = 1;
1055                 break;          
1056
1057         case LDB_SCOPE_ONELEVEL:
1058                 if (!ltdb->cache->one_level_indexes) {
1059                         talloc_free(dn_list);
1060                         return LDB_ERR_OPERATIONS_ERROR;
1061                 }
1062                 ret = ltdb_index_dn_one(ac->module, ac->base, dn_list);
1063                 if (ret != LDB_SUCCESS) {
1064                         talloc_free(dn_list);
1065                         return ret;
1066                 }
1067                 break;
1068
1069         case LDB_SCOPE_SUBTREE:
1070         case LDB_SCOPE_DEFAULT:
1071                 if (!ltdb->cache->attribute_indexes) {
1072                         talloc_free(dn_list);
1073                         return LDB_ERR_OPERATIONS_ERROR;
1074                 }
1075                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
1076                 if (ret != LDB_SUCCESS) {
1077                         talloc_free(dn_list);
1078                         return ret;
1079                 }
1080                 ltdb_dn_list_remove_duplicates(dn_list);
1081                 break;
1082         }
1083
1084         ret = ltdb_index_filter(dn_list, ac, match_count);
1085         talloc_free(dn_list);
1086         return ret;
1087 }
1088
1089 /*
1090   add an index entry for one message element
1091 */
1092 static int ltdb_index_add1(struct ldb_module *module, const char *dn,
1093                            struct ldb_message_element *el, int v_idx)
1094 {
1095         struct ldb_context *ldb;
1096         struct ldb_dn *dn_key;
1097         int ret;
1098         const struct ldb_schema_attribute *a;
1099         struct dn_list *list;
1100         unsigned alloc_len;
1101
1102         ldb = ldb_module_get_ctx(module);
1103
1104         list = talloc_zero(module, struct dn_list);
1105         if (list == NULL) {
1106                 return LDB_ERR_OPERATIONS_ERROR;
1107         }
1108
1109         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], &a);
1110         if (!dn_key) {
1111                 talloc_free(list);
1112                 return LDB_ERR_OPERATIONS_ERROR;
1113         }
1114         talloc_steal(list, dn_key);
1115
1116         ret = ltdb_dn_list_load(module, dn_key, list);
1117         if (ret != LDB_SUCCESS && ret != LDB_ERR_NO_SUCH_OBJECT) {
1118                 talloc_free(list);
1119                 return ret;
1120         }
1121
1122         if (ltdb_dn_list_find_str(list, dn) != -1) {
1123                 talloc_free(list);
1124                 return LDB_SUCCESS;
1125         }
1126
1127         if (list->count > 0 &&
1128             a->flags & LDB_ATTR_FLAG_UNIQUE_INDEX) {
1129                 talloc_free(list);
1130                 ldb_asprintf_errstring(ldb, __location__ ": unique index violation on %s in %s",
1131                                        el->name, dn);
1132                 return LDB_ERR_ENTRY_ALREADY_EXISTS;            
1133         }
1134
1135         /* overallocate the list a bit, to reduce the number of
1136          * realloc trigered copies */    
1137         alloc_len = ((list->count+1)+7) & ~7;
1138         list->dn = talloc_realloc(list, list->dn, struct ldb_val, alloc_len);
1139         if (list->dn == NULL) {
1140                 talloc_free(list);
1141                 return LDB_ERR_OPERATIONS_ERROR;
1142         }
1143         list->dn[list->count].data = (uint8_t *)talloc_strdup(list->dn, dn);
1144         list->dn[list->count].length = strlen(dn);
1145         list->count++;
1146
1147         ret = ltdb_dn_list_store(module, dn_key, list);
1148
1149         talloc_free(list);
1150
1151         return ret;
1152 }
1153
1154 /*
1155   add index entries for one elements in a message
1156  */
1157 static int ltdb_index_add_el(struct ldb_module *module, const char *dn,
1158                              struct ldb_message_element *el)
1159 {
1160         unsigned int i;
1161         for (i = 0; i < el->num_values; i++) {
1162                 int ret = ltdb_index_add1(module, dn, el, i);
1163                 if (ret != LDB_SUCCESS) {
1164                         return ret;
1165                 }
1166         }
1167
1168         return LDB_SUCCESS;
1169 }
1170
1171 /*
1172   add index entries for all elements in a message
1173  */
1174 static int ltdb_index_add_all(struct ldb_module *module, const char *dn,
1175                               struct ldb_message_element *elements, int num_el)
1176 {
1177         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1178         unsigned int i;
1179
1180         if (dn[0] == '@') {
1181                 return LDB_SUCCESS;
1182         }
1183
1184         if (ltdb->cache->indexlist->num_elements == 0) {
1185                 /* no indexed fields */
1186                 return LDB_SUCCESS;
1187         }
1188
1189         for (i = 0; i < num_el; i++) {
1190                 int ret;
1191                 if (!ltdb_is_indexed(ltdb->cache->indexlist, elements[i].name)) {
1192                         continue;
1193                 }
1194                 ret = ltdb_index_add_el(module, dn, &elements[i]);
1195                 if (ret != LDB_SUCCESS) {
1196                         struct ldb_context *ldb = ldb_module_get_ctx(module);
1197                         ldb_asprintf_errstring(ldb,
1198                                                __location__ ": Failed to re-index %s in %s - %s",
1199                                                elements[i].name, dn, ldb_errstring(ldb));
1200                         return ret;
1201                 }
1202         }
1203
1204         return LDB_SUCCESS;
1205 }
1206
1207
1208 /*
1209   insert a one level index for a message
1210 */
1211 static int ltdb_index_onelevel(struct ldb_module *module, const struct ldb_message *msg, int add)
1212 {
1213         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1214         struct ldb_message_element el;
1215         struct ldb_val val;
1216         struct ldb_dn *pdn;
1217         const char *dn;
1218         int ret;
1219
1220         /* We index for ONE Level only if requested */
1221         if (!ltdb->cache->one_level_indexes) {
1222                 return LDB_SUCCESS;
1223         }
1224
1225         pdn = ldb_dn_get_parent(module, msg->dn);
1226         if (pdn == NULL) {
1227                 return LDB_ERR_OPERATIONS_ERROR;
1228         }
1229
1230         dn = ldb_dn_get_linearized(msg->dn);
1231         if (dn == NULL) {
1232                 talloc_free(pdn);
1233                 return LDB_ERR_OPERATIONS_ERROR;
1234         }
1235
1236         val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(pdn));
1237         if (val.data == NULL) {
1238                 talloc_free(pdn);
1239                 return LDB_ERR_OPERATIONS_ERROR;
1240         }
1241
1242         val.length = strlen((char *)val.data);
1243         el.name = LTDB_IDXONE;
1244         el.values = &val;
1245         el.num_values = 1;
1246
1247         if (add) {
1248                 ret = ltdb_index_add1(module, dn, &el, 0);
1249         } else { /* delete */
1250                 ret = ltdb_index_del_value(module, msg->dn, &el, 0);
1251         }
1252
1253         talloc_free(pdn);
1254
1255         return ret;
1256 }
1257
1258 /*
1259   add the index entries for a new element in a record
1260   The caller guarantees that these element values are not yet indexed
1261 */
1262 int ltdb_index_add_element(struct ldb_module *module, struct ldb_dn *dn, 
1263                            struct ldb_message_element *el)
1264 {
1265         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1266         if (ldb_dn_is_special(dn)) {
1267                 return LDB_SUCCESS;
1268         }
1269         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1270                 return LDB_SUCCESS;
1271         }
1272         return ltdb_index_add_el(module, ldb_dn_get_linearized(dn), el);
1273 }
1274
1275 /*
1276   add the index entries for a new record
1277 */
1278 int ltdb_index_add_new(struct ldb_module *module, const struct ldb_message *msg)
1279 {
1280         const char *dn;
1281         int ret;
1282
1283         if (ldb_dn_is_special(msg->dn)) {
1284                 return LDB_SUCCESS;
1285         }
1286
1287         dn = ldb_dn_get_linearized(msg->dn);
1288         if (dn == NULL) {
1289                 return LDB_ERR_OPERATIONS_ERROR;
1290         }
1291
1292         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1293         if (ret != LDB_SUCCESS) {
1294                 return ret;
1295         }
1296
1297         return ltdb_index_onelevel(module, msg, 1);
1298 }
1299
1300
1301 /*
1302   delete an index entry for one message element
1303 */
1304 int ltdb_index_del_value(struct ldb_module *module, struct ldb_dn *dn,
1305                          struct ldb_message_element *el, unsigned int v_idx)
1306 {
1307         struct ldb_context *ldb;
1308         struct ldb_dn *dn_key;
1309         const char *dn_str;
1310         int ret, i;
1311         unsigned int j;
1312         struct dn_list *list;
1313
1314         ldb = ldb_module_get_ctx(module);
1315
1316         dn_str = ldb_dn_get_linearized(dn);
1317         if (dn_str == NULL) {
1318                 return LDB_ERR_OPERATIONS_ERROR;
1319         }
1320
1321         if (dn_str[0] == '@') {
1322                 return LDB_SUCCESS;
1323         }
1324
1325         dn_key = ltdb_index_key(ldb, el->name, &el->values[v_idx], NULL);
1326         if (!dn_key) {
1327                 return LDB_ERR_OPERATIONS_ERROR;
1328         }
1329
1330         list = talloc_zero(dn_key, struct dn_list);
1331         if (list == NULL) {
1332                 talloc_free(dn_key);
1333                 return LDB_ERR_OPERATIONS_ERROR;
1334         }
1335
1336         ret = ltdb_dn_list_load(module, dn_key, list);
1337         if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1338                 /* it wasn't indexed. Did we have an earlier error? If we did then
1339                    its gone now */
1340                 talloc_free(dn_key);
1341                 return LDB_SUCCESS;
1342         }
1343
1344         if (ret != LDB_SUCCESS) {
1345                 talloc_free(dn_key);
1346                 return ret;
1347         }
1348
1349         i = ltdb_dn_list_find_str(list, dn_str);
1350         if (i == -1) {
1351                 /* nothing to delete */
1352                 talloc_free(dn_key);
1353                 return LDB_SUCCESS;             
1354         }
1355
1356         j = (unsigned int) i;
1357         if (j != list->count - 1) {
1358                 memmove(&list->dn[j], &list->dn[j+1], sizeof(list->dn[0])*(list->count - (j+1)));
1359         }
1360         list->count--;
1361         list->dn = talloc_realloc(list, list->dn, struct ldb_val, list->count);
1362
1363         ret = ltdb_dn_list_store(module, dn_key, list);
1364
1365         talloc_free(dn_key);
1366
1367         return ret;
1368 }
1369
1370 /*
1371   delete the index entries for a element
1372   return -1 on failure
1373 */
1374 int ltdb_index_del_element(struct ldb_module *module, struct ldb_dn *dn,
1375                            struct ldb_message_element *el)
1376 {
1377         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1378         const char *dn_str;
1379         int ret;
1380         unsigned int i;
1381
1382         if (!ltdb->cache->attribute_indexes) {
1383                 /* no indexed fields */
1384                 return LDB_SUCCESS;
1385         }
1386
1387         dn_str = ldb_dn_get_linearized(dn);
1388         if (dn_str == NULL) {
1389                 return LDB_ERR_OPERATIONS_ERROR;
1390         }
1391
1392         if (dn_str[0] == '@') {
1393                 return LDB_SUCCESS;
1394         }
1395
1396         if (!ltdb_is_indexed(ltdb->cache->indexlist, el->name)) {
1397                 return LDB_SUCCESS;
1398         }
1399         for (i = 0; i < el->num_values; i++) {
1400                 ret = ltdb_index_del_value(module, dn, el, i);
1401                 if (ret != LDB_SUCCESS) {
1402                         return ret;
1403                 }
1404         }
1405
1406         return LDB_SUCCESS;
1407 }
1408
1409 /*
1410   delete the index entries for a record
1411   return -1 on failure
1412 */
1413 int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
1414 {
1415         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1416         int ret;
1417         unsigned int i;
1418
1419         if (ldb_dn_is_special(msg->dn)) {
1420                 return LDB_SUCCESS;
1421         }
1422
1423         ret = ltdb_index_onelevel(module, msg, 0);
1424         if (ret != LDB_SUCCESS) {
1425                 return ret;
1426         }
1427
1428         if (!ltdb->cache->attribute_indexes) {
1429                 /* no indexed fields */
1430                 return LDB_SUCCESS;
1431         }
1432
1433         for (i = 0; i < msg->num_elements; i++) {
1434                 ret = ltdb_index_del_element(module, msg->dn, &msg->elements[i]);
1435                 if (ret != LDB_SUCCESS) {
1436                         return ret;
1437                 }
1438         }
1439
1440         return LDB_SUCCESS;
1441 }
1442
1443
1444 /*
1445   traversal function that deletes all @INDEX records
1446 */
1447 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1448 {
1449         struct ldb_module *module = state;
1450         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1451         const char *dnstr = "DN=" LTDB_INDEX ":";
1452         struct dn_list list;
1453         struct ldb_dn *dn;
1454         struct ldb_val v;
1455         int ret;
1456
1457         if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
1458                 return 0;
1459         }
1460         /* we need to put a empty list in the internal tdb for this
1461          * index entry */
1462         list.dn = NULL;
1463         list.count = 0;
1464
1465         /* the offset of 3 is to remove the DN= prefix. */
1466         v.data = key.dptr + 3;
1467         v.length = strnlen((char *)key.dptr, key.dsize) - 3;
1468
1469         dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
1470         ret = ltdb_dn_list_store(module, dn, &list);
1471         if (ret != LDB_SUCCESS) {
1472                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1473                                        "Unable to store null index for %s\n",
1474                                                 ldb_dn_get_linearized(dn));
1475                 talloc_free(dn);
1476                 return -1;
1477         }
1478         talloc_free(dn);
1479         return 0;
1480 }
1481
1482 struct ltdb_reindex_context {
1483         struct ldb_module *module;
1484         int error;
1485 };
1486
1487 /*
1488   traversal function that adds @INDEX records during a re index
1489 */
1490 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1491 {
1492         struct ldb_context *ldb;
1493         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1494         struct ldb_module *module = ctx->module;
1495         struct ldb_message *msg;
1496         const char *dn = NULL;
1497         int ret;
1498         TDB_DATA key2;
1499
1500         ldb = ldb_module_get_ctx(module);
1501
1502         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1503             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1504                 return 0;
1505         }
1506
1507         msg = ldb_msg_new(module);
1508         if (msg == NULL) {
1509                 return -1;
1510         }
1511
1512         ret = ltdb_unpack_data(module, &data, msg);
1513         if (ret != 0) {
1514                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %s\n",
1515                                                 ldb_dn_get_linearized(msg->dn));
1516                 talloc_free(msg);
1517                 return -1;
1518         }
1519
1520         /* check if the DN key has changed, perhaps due to the
1521            case insensitivity of an element changing */
1522         key2 = ltdb_key(module, msg->dn);
1523         if (key2.dptr == NULL) {
1524                 /* probably a corrupt record ... darn */
1525                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s",
1526                                                 ldb_dn_get_linearized(msg->dn));
1527                 talloc_free(msg);
1528                 return 0;
1529         }
1530         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1531                 tdb_delete(tdb, key);
1532                 tdb_store(tdb, key2, data, 0);
1533         }
1534         talloc_free(key2.dptr);
1535
1536         if (msg->dn == NULL) {
1537                 dn = (char *)key.dptr + 3;
1538         } else {
1539                 dn = ldb_dn_get_linearized(msg->dn);
1540         }
1541
1542         ret = ltdb_index_onelevel(module, msg, 1);
1543         if (ret != LDB_SUCCESS) {
1544                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1545                           "Adding special ONE LEVEL index failed (%s)!",
1546                                                 ldb_dn_get_linearized(msg->dn));
1547                 talloc_free(msg);
1548                 return -1;
1549         }
1550
1551         ret = ltdb_index_add_all(module, dn, msg->elements, msg->num_elements);
1552
1553         if (ret != LDB_SUCCESS) {
1554                 ctx->error = ret;
1555                 talloc_free(msg);
1556                 return -1;
1557         }
1558
1559         talloc_free(msg);
1560
1561         return 0;
1562 }
1563
1564 /*
1565   force a complete reindex of the database
1566 */
1567 int ltdb_reindex(struct ldb_module *module)
1568 {
1569         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
1570         int ret;
1571         struct ltdb_reindex_context ctx;
1572
1573         if (ltdb_cache_reload(module) != 0) {
1574                 return LDB_ERR_OPERATIONS_ERROR;
1575         }
1576
1577         /* first traverse the database deleting any @INDEX records by
1578          * putting NULL entries in the in-memory tdb
1579          */
1580         ret = tdb_traverse(ltdb->tdb, delete_index, module);
1581         if (ret < 0) {
1582                 return LDB_ERR_OPERATIONS_ERROR;
1583         }
1584
1585         /* if we don't have indexes we have nothing todo */
1586         if (ltdb->cache->indexlist->num_elements == 0) {
1587                 return LDB_SUCCESS;
1588         }
1589
1590         ctx.module = module;
1591         ctx.error = 0;
1592
1593         /* now traverse adding any indexes for normal LDB records */
1594         ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
1595         if (ret < 0) {
1596                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1597                 ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s", ldb_errstring(ldb));
1598                 return LDB_ERR_OPERATIONS_ERROR;
1599         }
1600
1601         if (ctx.error != LDB_SUCCESS) {
1602                 struct ldb_context *ldb = ldb_module_get_ctx(module);
1603                 ldb_asprintf_errstring(ldb, "reindexing failed: %s", ldb_errstring(ldb));
1604                 return ctx.error;
1605         }
1606
1607         return LDB_SUCCESS;
1608 }