r19832: better prototypes for the linearization functions:
[kamenim/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_index.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5
6      ** NOTE! The following LGPL license applies to the ldb
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9    
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 2 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, write to the Free Software
22    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb tdb backend - indexing
29  *
30  *  Description: indexing routines for ldb tdb backend
31  *
32  *  Author: Andrew Tridgell
33  */
34
35 #include "includes.h"
36 #include "ldb/include/includes.h"
37
38 #include "ldb/ldb_tdb/ldb_tdb.h"
39
40 /*
41   find an element in a list, using the given comparison function and
42   assuming that the list is already sorted using comp_fn
43
44   return -1 if not found, or the index of the first occurance of needle if found
45 */
46 static int ldb_list_find(const void *needle, 
47                          const void *base, size_t nmemb, size_t size, 
48                          comparison_fn_t comp_fn)
49 {
50         const char *base_p = base;
51         size_t min_i, max_i, test_i;
52
53         if (nmemb == 0) {
54                 return -1;
55         }
56
57         min_i = 0;
58         max_i = nmemb-1;
59
60         while (min_i < max_i) {
61                 int r;
62
63                 test_i = (min_i + max_i) / 2;
64                 /* the following cast looks strange, but is
65                  correct. The key to understanding it is that base_p
66                  is a pointer to an array of pointers, so we have to
67                  dereference it after casting to void **. The strange
68                  const in the middle gives us the right type of pointer
69                  after the dereference  (tridge) */
70                 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
71                 if (r == 0) {
72                         /* scan back for first element */
73                         while (test_i > 0 &&
74                                comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
75                                 test_i--;
76                         }
77                         return test_i;
78                 }
79                 if (r < 0) {
80                         if (test_i == 0) {
81                                 return -1;
82                         }
83                         max_i = test_i - 1;
84                 }
85                 if (r > 0) {
86                         min_i = test_i + 1;
87                 }
88         }
89
90         if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
91                 return min_i;
92         }
93
94         return -1;
95 }
96
97 struct dn_list {
98         unsigned int count;
99         char **dn;
100 };
101
102 /*
103   return the dn key to be used for an index
104   caller frees
105 */
106 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
107                         const char *attr, const struct ldb_val *value)
108 {
109         struct ldb_dn *ret;
110         char *dn;
111         struct ldb_val v;
112         const struct ldb_attrib_handler *h;
113         char *attr_folded;
114
115         attr_folded = ldb_attr_casefold(ldb, attr);
116         if (!attr_folded) {
117                 return NULL;
118         }
119
120         h = ldb_attrib_handler(ldb, attr);
121         if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
122                 /* canonicalisation can be refused. For example, 
123                    a attribute that takes wildcards will refuse to canonicalise
124                    if the value contains a wildcard */
125                 talloc_free(attr_folded);
126                 return NULL;
127         }
128         if (ldb_should_b64_encode(&v)) {
129                 char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
130                 if (!vstr) return NULL;
131                 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
132                 talloc_free(vstr);
133                 if (v.data != value->data) {
134                         talloc_free(v.data);
135                 }
136                 talloc_free(attr_folded);
137                 if (dn == NULL) return NULL;
138                 goto done;
139         }
140
141         dn = talloc_asprintf(ldb, "%s:%s:%.*s", 
142                               LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
143
144         if (v.data != value->data) {
145                 talloc_free(v.data);
146         }
147         talloc_free(attr_folded);
148
149 done:
150         ret = ldb_dn_new(ldb, ldb, dn);
151         talloc_free(dn);
152         return ret;
153 }
154
155 /*
156   see if a attribute value is in the list of indexed attributes
157 */
158 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
159                             unsigned int *v_idx, const char *key)
160 {
161         unsigned int i, j;
162         for (i=0;i<msg->num_elements;i++) {
163                 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
164                         const struct ldb_message_element *el = 
165                                 &msg->elements[i];
166                         for (j=0;j<el->num_values;j++) {
167                                 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
168                                         if (v_idx) {
169                                                 *v_idx = j;
170                                         }
171                                         return i;
172                                 }
173                         }
174                 }
175         }
176         return -1;
177 }
178
179 /* used in sorting dn lists */
180 static int list_cmp(const char **s1, const char **s2)
181 {
182         return strcmp(*s1, *s2);
183 }
184
185 /*
186   return a list of dn's that might match a simple indexed search or
187  */
188 static int ltdb_index_dn_simple(struct ldb_module *module, 
189                                 const struct ldb_parse_tree *tree,
190                                 const struct ldb_message *index_list,
191                                 struct dn_list *list)
192 {
193         struct ldb_context *ldb = module->ldb;
194         struct ldb_dn *dn;
195         int ret;
196         unsigned int i, j;
197         struct ldb_message *msg;
198
199         list->count = 0;
200         list->dn = NULL;
201
202         /* if the attribute isn't in the list of indexed attributes then
203            this node needs a full search */
204         if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
205                 return -1;
206         }
207
208         /* the attribute is indexed. Pull the list of DNs that match the 
209            search criterion */
210         dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
211         if (!dn) return -1;
212
213         msg = talloc(list, struct ldb_message);
214         if (msg == NULL) {
215                 return -1;
216         }
217
218         ret = ltdb_search_dn1(module, dn, msg);
219         talloc_free(dn);
220         if (ret == 0 || ret == -1) {
221                 return ret;
222         }
223
224         for (i=0;i<msg->num_elements;i++) {
225                 struct ldb_message_element *el;
226
227                 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
228                         continue;
229                 }
230
231                 el = &msg->elements[i];
232
233                 list->dn = talloc_array(list, char *, el->num_values);
234                 if (!list->dn) {
235                         talloc_free(msg);
236                         return -1;
237                 }
238
239                 for (j=0;j<el->num_values;j++) {
240                         list->dn[list->count] = 
241                                 talloc_strdup(list->dn, (char *)el->values[j].data);
242                         if (!list->dn[list->count]) {
243                                 talloc_free(msg);
244                                 return -1;
245                         }
246                         list->count++;
247                 }
248         }
249
250         talloc_free(msg);
251
252         if (list->count > 1) {
253                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
254         }
255
256         return 1;
257 }
258
259
260 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
261
262 /*
263   return a list of dn's that might match a simple indexed search on
264   the special objectclass attribute
265  */
266 static int ltdb_index_dn_objectclass(struct ldb_module *module, 
267                                      const struct ldb_parse_tree *tree,
268                                      const struct ldb_message *index_list,
269                                      struct dn_list *list)
270 {
271         struct ldb_context *ldb = module->ldb;
272         unsigned int i;
273         int ret;
274         const char *target = (const char *)tree->u.equality.value.data;
275         const char **subclasses;
276
277         list->count = 0;
278         list->dn = NULL;
279
280         ret = ltdb_index_dn_simple(module, tree, index_list, list);
281
282         subclasses = ldb_subclass_list(module->ldb, target);
283
284         if (subclasses == NULL) {
285                 return ret;
286         }
287
288         for (i=0;subclasses[i];i++) {
289                 struct ldb_parse_tree tree2;
290                 struct dn_list *list2;
291                 tree2.operation = LDB_OP_EQUALITY;
292                 tree2.u.equality.attr = LTDB_OBJECTCLASS;
293                 if (!tree2.u.equality.attr) {
294                         return -1;
295                 }
296                 tree2.u.equality.value.data = 
297                         (uint8_t *)talloc_strdup(list, subclasses[i]);
298                 if (tree2.u.equality.value.data == NULL) {
299                         return -1;                      
300                 }
301                 tree2.u.equality.value.length = strlen(subclasses[i]);
302                 list2 = talloc(list, struct dn_list);
303                 if (list2 == NULL) {
304                         talloc_free(tree2.u.equality.value.data);
305                         return -1;
306                 }
307                 if (ltdb_index_dn_objectclass(module, &tree2, 
308                                               index_list, list2) == 1) {
309                         if (list->count == 0) {
310                                 *list = *list2;
311                                 ret = 1;
312                         } else {
313                                 list_union(ldb, list, list2);
314                                 talloc_free(list2);
315                         }
316                 }
317                 talloc_free(tree2.u.equality.value.data);
318         }
319
320         return ret;
321 }
322
323 /*
324   return a list of dn's that might match a leaf indexed search
325  */
326 static int ltdb_index_dn_leaf(struct ldb_module *module, 
327                               const struct ldb_parse_tree *tree,
328                               const struct ldb_message *index_list,
329                               struct dn_list *list)
330 {
331         if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
332                 return ltdb_index_dn_objectclass(module, tree, index_list, list);
333         }
334         if (ldb_attr_dn(tree->u.equality.attr) == 0) {
335                 list->dn = talloc_array(list, char *, 1);
336                 if (list->dn == NULL) {
337                         ldb_oom(module->ldb);
338                         return -1;
339                 }
340                 list->dn[0] = talloc_strdup(list, (char *)tree->u.equality.value.data);
341                 if (list->dn[0] == NULL) {
342                         ldb_oom(module->ldb);
343                         return -1;
344                 }
345                 list->count = 1;
346                 return 1;
347         }
348         return ltdb_index_dn_simple(module, tree, index_list, list);
349 }
350
351
352 /*
353   list intersection
354   list = list & list2
355   relies on the lists being sorted
356 */
357 static int list_intersect(struct ldb_context *ldb,
358                           struct dn_list *list, const struct dn_list *list2)
359 {
360         struct dn_list *list3;
361         unsigned int i;
362
363         if (list->count == 0 || list2->count == 0) {
364                 /* 0 & X == 0 */
365                 return 0;
366         }
367
368         list3 = talloc(ldb, struct dn_list);
369         if (list3 == NULL) {
370                 return -1;
371         }
372
373         list3->dn = talloc_array(list3, char *, list->count);
374         if (!list3->dn) {
375                 talloc_free(list3);
376                 return -1;
377         }
378         list3->count = 0;
379
380         for (i=0;i<list->count;i++) {
381                 if (ldb_list_find(list->dn[i], list2->dn, list2->count, 
382                               sizeof(char *), (comparison_fn_t)strcmp) != -1) {
383                         list3->dn[list3->count] = talloc_move(list3->dn, &list->dn[i]);
384                         list3->count++;
385                 } else {
386                         talloc_free(list->dn[i]);
387                 }               
388         }
389
390         talloc_free(list->dn);
391         list->dn = talloc_move(list, &list3->dn);
392         list->count = list3->count;
393         talloc_free(list3);
394
395         return 0;
396 }
397
398
399 /*
400   list union
401   list = list | list2
402   relies on the lists being sorted
403 */
404 static int list_union(struct ldb_context *ldb, 
405                       struct dn_list *list, const struct dn_list *list2)
406 {
407         unsigned int i;
408         char **d;
409         unsigned int count = list->count;
410
411         if (list->count == 0 && list2->count == 0) {
412                 /* 0 | 0 == 0 */
413                 return 0;
414         }
415
416         d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
417         if (!d) {
418                 return -1;
419         }
420         list->dn = d;
421
422         for (i=0;i<list2->count;i++) {
423                 if (ldb_list_find(list2->dn[i], list->dn, count, 
424                               sizeof(char *), (comparison_fn_t)strcmp) == -1) {
425                         list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
426                         if (!list->dn[list->count]) {
427                                 return -1;
428                         }
429                         list->count++;
430                 }               
431         }
432
433         if (list->count != count) {
434                 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
435         }
436
437         return 0;
438 }
439
440 static int ltdb_index_dn(struct ldb_module *module, 
441                          const struct ldb_parse_tree *tree,
442                          const struct ldb_message *index_list,
443                          struct dn_list *list);
444
445
446 /*
447   OR two index results
448  */
449 static int ltdb_index_dn_or(struct ldb_module *module, 
450                             const struct ldb_parse_tree *tree,
451                             const struct ldb_message *index_list,
452                             struct dn_list *list)
453 {
454         struct ldb_context *ldb = module->ldb;
455         unsigned int i;
456         int ret;
457         
458         ret = -1;
459         list->dn = NULL;
460         list->count = 0;
461
462         for (i=0;i<tree->u.list.num_elements;i++) {
463                 struct dn_list *list2;
464                 int v;
465
466                 list2 = talloc(module, struct dn_list);
467                 if (list2 == NULL) {
468                         return -1;
469                 }
470
471                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
472
473                 if (v == 0) {
474                         /* 0 || X == X */
475                         if (ret == -1) {
476                                 ret = 0;
477                         }
478                         talloc_free(list2);
479                         continue;
480                 }
481
482                 if (v == -1) {
483                         /* 1 || X == 1 */
484                         talloc_free(list->dn);
485                         talloc_free(list2);
486                         return -1;
487                 }
488
489                 if (ret == -1) {
490                         ret = 1;
491                         list->dn = talloc_move(list, &list2->dn);
492                         list->count = list2->count;
493                 } else {
494                         if (list_union(ldb, list, list2) == -1) {
495                                 talloc_free(list2);
496                                 return -1;
497                         }
498                         ret = 1;
499                 }
500                 talloc_free(list2);
501         }
502
503         if (list->count == 0) {
504                 return 0;
505         }
506
507         return ret;
508 }
509
510
511 /*
512   NOT an index results
513  */
514 static int ltdb_index_dn_not(struct ldb_module *module, 
515                              const struct ldb_parse_tree *tree,
516                              const struct ldb_message *index_list,
517                              struct dn_list *list)
518 {
519         /* the only way to do an indexed not would be if we could
520            negate the not via another not or if we knew the total
521            number of database elements so we could know that the
522            existing expression covered the whole database. 
523            
524            instead, we just give up, and rely on a full index scan
525            (unless an outer & manages to reduce the list)
526         */
527         return -1;
528 }
529
530 /*
531   AND two index results
532  */
533 static int ltdb_index_dn_and(struct ldb_module *module, 
534                              const struct ldb_parse_tree *tree,
535                              const struct ldb_message *index_list,
536                              struct dn_list *list)
537 {
538         struct ldb_context *ldb = module->ldb;
539         unsigned int i;
540         int ret;
541         
542         ret = -1;
543         list->dn = NULL;
544         list->count = 0;
545
546         for (i=0;i<tree->u.list.num_elements;i++) {
547                 struct dn_list *list2;
548                 int v;
549
550                 list2 = talloc(module, struct dn_list);
551                 if (list2 == NULL) {
552                         return -1;
553                 }
554
555                 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
556
557                 if (v == 0) {
558                         /* 0 && X == 0 */
559                         talloc_free(list->dn);
560                         talloc_free(list2);
561                         return 0;
562                 }
563
564                 if (v == -1) {
565                         talloc_free(list2);
566                         continue;
567                 }
568
569                 if (ret == -1) {
570                         ret = 1;
571                         talloc_free(list->dn);
572                         list->dn = talloc_move(list, &list2->dn);
573                         list->count = list2->count;
574                 } else {
575                         if (list_intersect(ldb, list, list2) == -1) {
576                                 talloc_free(list2);
577                                 return -1;
578                         }
579                 }
580
581                 talloc_free(list2);
582
583                 if (list->count == 0) {
584                         talloc_free(list->dn);
585                         return 0;
586                 }
587         }
588
589         return ret;
590 }
591
592 /*
593   return a list of dn's that might match a indexed search or
594   -1 if an error. return 0 for no matches, or 1 for matches
595  */
596 static int ltdb_index_dn(struct ldb_module *module, 
597                          const struct ldb_parse_tree *tree,
598                          const struct ldb_message *index_list,
599                          struct dn_list *list)
600 {
601         int ret = -1;
602
603         switch (tree->operation) {
604         case LDB_OP_AND:
605                 ret = ltdb_index_dn_and(module, tree, index_list, list);
606                 break;
607
608         case LDB_OP_OR:
609                 ret = ltdb_index_dn_or(module, tree, index_list, list);
610                 break;
611
612         case LDB_OP_NOT:
613                 ret = ltdb_index_dn_not(module, tree, index_list, list);
614                 break;
615
616         case LDB_OP_EQUALITY:
617                 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
618                 break;
619
620         case LDB_OP_SUBSTRING:
621         case LDB_OP_GREATER:
622         case LDB_OP_LESS:
623         case LDB_OP_PRESENT:
624         case LDB_OP_APPROX:
625         case LDB_OP_EXTENDED:
626                 /* we can't index with fancy bitops yet */
627                 ret = -1;
628                 break;
629         }
630
631         return ret;
632 }
633
634 /*
635   filter a candidate dn_list from an indexed search into a set of results
636   extracting just the given attributes
637 */
638 static int ltdb_index_filter(const struct dn_list *dn_list, 
639                              struct ldb_handle *handle)
640 {
641         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
642         struct ldb_reply *ares = NULL;
643         unsigned int i;
644
645         for (i = 0; i < dn_list->count; i++) {
646                 struct ldb_dn *dn;
647                 int ret;
648
649                 ares = talloc_zero(ac, struct ldb_reply);
650                 if (!ares) {
651                         handle->status = LDB_ERR_OPERATIONS_ERROR;
652                         handle->state = LDB_ASYNC_DONE;
653                         return LDB_ERR_OPERATIONS_ERROR;
654                 }
655
656                 ares->message = ldb_msg_new(ares);
657                 if (!ares->message) {
658                         handle->status = LDB_ERR_OPERATIONS_ERROR;
659                         handle->state = LDB_ASYNC_DONE;
660                         talloc_free(ares);
661                         return LDB_ERR_OPERATIONS_ERROR;
662                 }
663
664
665                 dn = ldb_dn_new(ares->message, ac->module->ldb, dn_list->dn[i]);
666                 if (dn == NULL) {
667                         talloc_free(ares);
668                         return LDB_ERR_OPERATIONS_ERROR;
669                 }
670
671                 ret = ltdb_search_dn1(ac->module, dn, ares->message);
672                 talloc_free(dn);
673                 if (ret == 0) {
674                         /* the record has disappeared? yes, this can happen */
675                         talloc_free(ares);
676                         continue;
677                 }
678
679                 if (ret == -1) {
680                         /* an internal error */
681                         talloc_free(ares);
682                         return LDB_ERR_OPERATIONS_ERROR;
683                 }
684
685                 if (!ldb_match_msg(ac->module->ldb, ares->message, ac->tree, ac->base, ac->scope)) {
686                         talloc_free(ares);
687                         continue;
688                 }
689
690                 /* filter the attributes that the user wants */
691                 ret = ltdb_filter_attrs(ares->message, ac->attrs);
692
693                 if (ret == -1) {
694                         handle->status = LDB_ERR_OPERATIONS_ERROR;
695                         handle->state = LDB_ASYNC_DONE;
696                         talloc_free(ares);
697                         return LDB_ERR_OPERATIONS_ERROR;
698                 }
699
700                 ares->type = LDB_REPLY_ENTRY;
701                 handle->state = LDB_ASYNC_PENDING;
702                 handle->status = ac->callback(ac->module->ldb, ac->context, ares);
703
704                 if (handle->status != LDB_SUCCESS) {
705                         handle->state = LDB_ASYNC_DONE;
706                         return handle->status;
707                 }
708         }
709
710         return LDB_SUCCESS;
711 }
712
713 /*
714   search the database with a LDAP-like expression using indexes
715   returns -1 if an indexed search is not possible, in which
716   case the caller should call ltdb_search_full() 
717 */
718 int ltdb_search_indexed(struct ldb_handle *handle)
719 {
720         struct ltdb_context *ac = talloc_get_type(handle->private_data, struct ltdb_context);
721         struct ltdb_private *ltdb = talloc_get_type(ac->module->private_data, struct ltdb_private);
722         struct dn_list *dn_list;
723         int ret;
724
725         if (ltdb->cache->indexlist->num_elements == 0 && 
726             ac->scope != LDB_SCOPE_BASE) {
727                 /* no index list? must do full search */
728                 return -1;
729         }
730
731         dn_list = talloc(handle, struct dn_list);
732         if (dn_list == NULL) {
733                 return -1;
734         }
735
736         if (ac->scope == LDB_SCOPE_BASE) {
737                 /* with BASE searches only one DN can match */
738                 dn_list->dn = talloc_array(dn_list, char *, 1);
739                 if (dn_list->dn == NULL) {
740                         ldb_oom(ac->module->ldb);
741                         return -1;
742                 }
743                 dn_list->dn[0] = ldb_dn_alloc_linearized(dn_list, ac->base);
744                 if (dn_list->dn[0] == NULL) {
745                         ldb_oom(ac->module->ldb);
746                         return -1;
747                 }
748                 dn_list->count = 1;
749                 ret = 1;
750         } else {
751                 ret = ltdb_index_dn(ac->module, ac->tree, ltdb->cache->indexlist, dn_list);
752         }
753
754         if (ret == 1) {
755                 /* we've got a candidate list - now filter by the full tree
756                    and extract the needed attributes */
757                 ret = ltdb_index_filter(dn_list, handle);
758                 handle->status = ret;
759                 handle->state = LDB_ASYNC_DONE;
760         }
761
762         talloc_free(dn_list);
763
764         return ret;
765 }
766
767 /*
768   add a index element where this is the first indexed DN for this value
769 */
770 static int ltdb_index_add1_new(struct ldb_context *ldb, 
771                                struct ldb_message *msg,
772                                struct ldb_message_element *el,
773                                const char *dn)
774 {
775         struct ldb_message_element *el2;
776
777         /* add another entry */
778         el2 = talloc_realloc(msg, msg->elements, 
779                                struct ldb_message_element, msg->num_elements+1);
780         if (!el2) {
781                 return -1;
782         }
783
784         msg->elements = el2;
785         msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
786         if (!msg->elements[msg->num_elements].name) {
787                 return -1;
788         }
789         msg->elements[msg->num_elements].num_values = 0;
790         msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
791         if (!msg->elements[msg->num_elements].values) {
792                 return -1;
793         }
794         msg->elements[msg->num_elements].values[0].length = strlen(dn);
795         msg->elements[msg->num_elements].values[0].data = discard_const_p(uint8_t, dn);
796         msg->elements[msg->num_elements].num_values = 1;
797         msg->num_elements++;
798
799         return 0;
800 }
801
802
803 /*
804   add a index element where this is not the first indexed DN for this
805   value
806 */
807 static int ltdb_index_add1_add(struct ldb_context *ldb, 
808                                struct ldb_message *msg,
809                                struct ldb_message_element *el,
810                                int idx,
811                                const char *dn)
812 {
813         struct ldb_val *v2;
814         unsigned int i;
815
816         /* for multi-valued attributes we can end up with repeats */
817         for (i=0;i<msg->elements[idx].num_values;i++) {
818                 if (strcmp(dn, (char *)msg->elements[idx].values[i].data) == 0) {
819                         return 0;
820                 }
821         }
822
823         v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
824                               struct ldb_val, 
825                               msg->elements[idx].num_values+1);
826         if (!v2) {
827                 return -1;
828         }
829         msg->elements[idx].values = v2;
830
831         msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
832         msg->elements[idx].values[msg->elements[idx].num_values].data = discard_const_p(uint8_t, dn);
833         msg->elements[idx].num_values++;
834
835         return 0;
836 }
837
838 /*
839   add an index entry for one message element
840 */
841 static int ltdb_index_add1(struct ldb_module *module, const char *dn, 
842                            struct ldb_message_element *el, int v_idx)
843 {
844         struct ldb_context *ldb = module->ldb;
845         struct ldb_message *msg;
846         struct ldb_dn *dn_key;
847         int ret;
848         unsigned int i;
849
850         msg = talloc(module, struct ldb_message);
851         if (msg == NULL) {
852                 errno = ENOMEM;
853                 return -1;
854         }
855
856         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
857         if (!dn_key) {
858                 talloc_free(msg);
859                 errno = ENOMEM;
860                 return -1;
861         }
862         talloc_steal(msg, dn_key);
863
864         ret = ltdb_search_dn1(module, dn_key, msg);
865         if (ret == -1) {
866                 talloc_free(msg);
867                 return -1;
868         }
869
870         if (ret == 0) {
871                 msg->dn = dn_key;
872                 msg->num_elements = 0;
873                 msg->elements = NULL;
874         }
875
876         for (i=0;i<msg->num_elements;i++) {
877                 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
878                         break;
879                 }
880         }
881
882         if (i == msg->num_elements) {
883                 ret = ltdb_index_add1_new(ldb, msg, el, dn);
884         } else {
885                 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
886         }
887
888         if (ret == 0) {
889                 ret = ltdb_store(module, msg, TDB_REPLACE);
890         }
891
892         talloc_free(msg);
893
894         return ret;
895 }
896
897 static int ltdb_index_add0(struct ldb_module *module, const char *dn,
898                            struct ldb_message_element *elements, int num_el)
899 {
900         struct ltdb_private *ltdb = module->private_data;
901         int ret;
902         unsigned int i, j;
903
904         if (dn[0] == '@') {
905                 return 0;
906         }
907
908         if (ltdb->cache->indexlist->num_elements == 0) {
909                 /* no indexed fields */
910                 return 0;
911         }
912
913         for (i = 0; i < num_el; i++) {
914                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name, 
915                                        NULL, LTDB_IDXATTR);
916                 if (ret == -1) {
917                         continue;
918                 }
919                 for (j = 0; j < elements[i].num_values; j++) {
920                         ret = ltdb_index_add1(module, dn, &elements[i], j);
921                         if (ret == -1) {
922                                 return -1;
923                         }
924                 }
925         }
926
927         return 0;
928 }
929
930 /*
931   add the index entries for a new record
932   return -1 on failure
933 */
934 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
935 {
936         const char *dn;
937         int ret;
938
939         dn = ldb_dn_get_linearized(msg->dn);
940         if (dn == NULL) {
941                 return -1;
942         }
943
944         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
945
946         return ret;
947 }
948
949
950 /*
951   delete an index entry for one message element
952 */
953 int ltdb_index_del_value(struct ldb_module *module, const char *dn, 
954                          struct ldb_message_element *el, int v_idx)
955 {
956         struct ldb_context *ldb = module->ldb;
957         struct ldb_message *msg;
958         struct ldb_dn *dn_key;
959         int ret, i;
960         unsigned int j;
961
962         if (dn[0] == '@') {
963                 return 0;
964         }
965
966         dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
967         if (!dn_key) {
968                 return -1;
969         }
970
971         msg = talloc(dn_key, struct ldb_message);
972         if (msg == NULL) {
973                 talloc_free(dn_key);
974                 return -1;
975         }
976
977         ret = ltdb_search_dn1(module, dn_key, msg);
978         if (ret == -1) {
979                 talloc_free(dn_key);
980                 return -1;
981         }
982
983         if (ret == 0) {
984                 /* it wasn't indexed. Did we have an earlier error? If we did then
985                    its gone now */
986                 talloc_free(dn_key);
987                 return 0;
988         }
989
990         i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
991         if (i == -1) {
992                 ldb_debug(ldb, LDB_DEBUG_ERROR,
993                                 "ERROR: dn %s not found in %s\n", dn,
994                                 ldb_dn_get_linearized(dn_key));
995                 /* it ain't there. hmmm */
996                 talloc_free(dn_key);
997                 return 0;
998         }
999
1000         if (j != msg->elements[i].num_values - 1) {
1001                 memmove(&msg->elements[i].values[j], 
1002                         &msg->elements[i].values[j+1], 
1003                         (msg->elements[i].num_values-(j+1)) * 
1004                         sizeof(msg->elements[i].values[0]));
1005         }
1006         msg->elements[i].num_values--;
1007
1008         if (msg->elements[i].num_values == 0) {
1009                 ret = ltdb_delete_noindex(module, dn_key);
1010         } else {
1011                 ret = ltdb_store(module, msg, TDB_REPLACE);
1012         }
1013
1014         talloc_free(dn_key);
1015
1016         return ret;
1017 }
1018
1019 /*
1020   delete the index entries for a record
1021   return -1 on failure
1022 */
1023 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
1024 {
1025         struct ltdb_private *ltdb = module->private_data;
1026         int ret;
1027         const char *dn;
1028         unsigned int i, j;
1029
1030         /* find the list of indexed fields */   
1031         if (ltdb->cache->indexlist->num_elements == 0) {
1032                 /* no indexed fields */
1033                 return 0;
1034         }
1035
1036         if (ldb_dn_is_special(msg->dn)) {
1037                 return 0;
1038         }
1039
1040         dn = ldb_dn_get_linearized(msg->dn);
1041         if (dn == NULL) {
1042                 return -1;
1043         }
1044
1045         for (i = 0; i < msg->num_elements; i++) {
1046                 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name, 
1047                                        NULL, LTDB_IDXATTR);
1048                 if (ret == -1) {
1049                         continue;
1050                 }
1051                 for (j = 0; j < msg->elements[i].num_values; j++) {
1052                         ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1053                         if (ret == -1) {
1054                                 return -1;
1055                         }
1056                 }
1057         }
1058
1059         return 0;
1060 }
1061
1062
1063 /*
1064   traversal function that deletes all @INDEX records
1065 */
1066 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1067 {
1068         const char *dn = "DN=" LTDB_INDEX ":";
1069         if (strncmp((char *)key.dptr, dn, strlen(dn)) == 0) {
1070                 return tdb_delete(tdb, key);
1071         }
1072         return 0;
1073 }
1074
1075 /*
1076   traversal function that adds @INDEX records during a re index
1077 */
1078 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1079 {
1080         struct ldb_module *module = state;
1081         struct ldb_message *msg;
1082         const char *dn = NULL;
1083         int ret;
1084         TDB_DATA key2;
1085
1086         if (strncmp((char *)key.dptr, "DN=@", 4) == 0 ||
1087             strncmp((char *)key.dptr, "DN=", 3) != 0) {
1088                 return 0;
1089         }
1090
1091         msg = talloc(module, struct ldb_message);
1092         if (msg == NULL) {
1093                 return -1;
1094         }
1095
1096         ret = ltdb_unpack_data(module, &data, msg);
1097         if (ret != 0) {
1098                 talloc_free(msg);
1099                 return -1;
1100         }
1101
1102         /* check if the DN key has changed, perhaps due to the 
1103            case insensitivity of an element changing */
1104         key2 = ltdb_key(module, msg->dn);
1105         if (key2.dptr == NULL) {
1106                 /* probably a corrupt record ... darn */
1107                 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1108                                                         ldb_dn_get_linearized(msg->dn));
1109                 talloc_free(msg);
1110                 return 0;
1111         }
1112         if (strcmp((char *)key2.dptr, (char *)key.dptr) != 0) {
1113                 tdb_delete(tdb, key);
1114                 tdb_store(tdb, key2, data, 0);
1115         }
1116         talloc_free(key2.dptr);
1117
1118         if (msg->dn == NULL) {
1119                 dn = (char *)key.dptr + 3;
1120         } else {
1121                 dn = ldb_dn_get_linearized(msg->dn);
1122         }
1123
1124         ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1125
1126         talloc_free(msg);
1127
1128         return ret;
1129 }
1130
1131 /*
1132   force a complete reindex of the database
1133 */
1134 int ltdb_reindex(struct ldb_module *module)
1135 {
1136         struct ltdb_private *ltdb = module->private_data;
1137         int ret;
1138
1139         if (ltdb_cache_reload(module) != 0) {
1140                 return -1;
1141         }
1142
1143         /* first traverse the database deleting any @INDEX records */
1144         ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1145         if (ret == -1) {
1146                 return -1;
1147         }
1148
1149         /* now traverse adding any indexes for normal LDB records */
1150         ret = tdb_traverse(ltdb->tdb, re_index, module);
1151         if (ret == -1) {
1152                 return -1;
1153         }
1154
1155         return 0;
1156 }