4 Copyright (C) Andrew Tridgell 2004
6 ** NOTE! The following LGPL license applies to the ldb
7 ** library. This does NOT imply that all of Samba is released
10 This library is free software; you can redistribute it and/or
11 modify it under the terms of the GNU Lesser General Public
12 License as published by the Free Software Foundation; either
13 version 2 of the License, or (at your option) any later version.
15 This library is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 Lesser General Public License for more details.
20 You should have received a copy of the GNU Lesser General Public
21 License along with this library; if not, write to the Free Software
22 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
28 * Component: ldb tdb backend - indexing
30 * Description: indexing routines for ldb tdb backend
32 * Author: Andrew Tridgell
36 #include "ldb/include/ldb.h"
37 #include "ldb/include/ldb_private.h"
38 #include "ldb/ldb_tdb/ldb_tdb.h"
41 find an element in a list, using the given comparison function and
42 assuming that the list is already sorted using comp_fn
44 return -1 if not found, or the index of the first occurrence of needle if found
46 static int ldb_list_find(const void *needle,
47 const void *base, size_t nmemb, size_t size,
48 comparison_fn_t comp_fn)
50 const char *base_p = base;
51 size_t min_i, max_i, test_i;
60 while (min_i < max_i) {
63 test_i = (min_i + max_i) / 2;
64 r = comp_fn(needle, *(void * const *)(base_p + (size * test_i)));
66 /* scan back for first element */
68 comp_fn(needle, *(void * const *)(base_p + (size * (test_i-1)))) == 0) {
84 if (comp_fn(needle, *(void * const *)(base_p + (size * min_i))) == 0) {
97 return the dn key to be used for an index
100 static struct ldb_dn *ldb_dn_key(struct ldb_context *ldb,
101 const char *attr, const struct ldb_val *value)
106 const struct ldb_attrib_handler *h;
109 attr_folded = ldb_casefold(ldb, attr);
114 h = ldb_attrib_handler(ldb, attr);
115 if (h->canonicalise_fn(ldb, ldb, value, &v) != 0) {
116 /* canonicalisation can be refused. For example,
117 an attribute that takes wildcards will refuse to canonicalise
118 if the value contains a wildcard */
119 talloc_free(attr_folded);
122 if (ldb_should_b64_encode(&v)) {
123 char *vstr = ldb_base64_encode(ldb, v.data, v.length);
124 if (!vstr) return NULL;
125 dn = talloc_asprintf(ldb, "%s:%s::%s", LTDB_INDEX, attr_folded, vstr);
127 if (v.data != value->data) {
130 talloc_free(attr_folded);
131 if (dn == NULL) return NULL;
135 dn = talloc_asprintf(ldb, "%s:%s:%.*s",
136 LTDB_INDEX, attr_folded, (int)v.length, (char *)v.data);
138 if (v.data != value->data) {
141 talloc_free(attr_folded);
144 ret = ldb_dn_explode(ldb, dn);
150 see if an attribute value is in the list of indexed attributes
152 static int ldb_msg_find_idx(const struct ldb_message *msg, const char *attr,
153 unsigned int *v_idx, const char *key)
156 for (i=0;i<msg->num_elements;i++) {
157 if (ldb_attr_cmp(msg->elements[i].name, key) == 0) {
158 const struct ldb_message_element *el =
160 for (j=0;j<el->num_values;j++) {
161 if (ldb_attr_cmp((char *)el->values[j].data, attr) == 0) {
173 /* used in sorting dn lists */
174 static int list_cmp(const char **s1, const char **s2)
176 return strcmp(*s1, *s2);
180 return a list of dn's that might match a simple indexed search or
182 static int ltdb_index_dn_simple(struct ldb_module *module,
183 struct ldb_parse_tree *tree,
184 const struct ldb_message *index_list,
185 struct dn_list *list)
187 struct ldb_context *ldb = module->ldb;
191 struct ldb_message *msg;
196 /* if the attribute isn't in the list of indexed attributes then
197 this node needs a full search */
198 if (ldb_msg_find_idx(index_list, tree->u.equality.attr, NULL, LTDB_IDXATTR) == -1) {
202 /* the attribute is indexed. Pull the list of DNs that match the
204 dn = ldb_dn_key(ldb, tree->u.equality.attr, &tree->u.equality.value);
207 msg = talloc(list, struct ldb_message);
212 ret = ltdb_search_dn1(module, dn, msg);
214 if (ret == 0 || ret == -1) {
218 for (i=0;i<msg->num_elements;i++) {
219 struct ldb_message_element *el;
221 if (strcmp(msg->elements[i].name, LTDB_IDX) != 0) {
225 el = &msg->elements[i];
227 list->dn = talloc_array(list, char *, el->num_values);
232 for (j=0;j<el->num_values;j++) {
233 list->dn[list->count] =
234 talloc_strdup(list->dn, (char *)el->values[j].data);
235 if (!list->dn[list->count]) {
244 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t) list_cmp);
250 static int list_union(struct ldb_context *, struct dn_list *, const struct dn_list *);
253 return a list of dn's that might match a simple indexed search on
254 the special objectclass attribute
256 static int ltdb_index_dn_objectclass(struct ldb_module *module,
257 struct ldb_parse_tree *tree,
258 const struct ldb_message *index_list,
259 struct dn_list *list)
261 struct ldb_context *ldb = module->ldb;
264 const char *target = tree->u.equality.value.data;
265 const char **subclasses;
270 ret = ltdb_index_dn_simple(module, tree, index_list, list);
272 subclasses = ldb_subclass_list(module->ldb, target);
274 if (subclasses == NULL) {
278 for (i=0;subclasses[i];i++) {
279 struct ldb_parse_tree tree2;
280 struct dn_list *list2;
281 tree2.operation = LDB_OP_EQUALITY;
282 tree2.u.equality.attr = talloc_strdup(list, LTDB_OBJECTCLASS);
283 if (!tree2.u.equality.attr) {
286 tree2.u.equality.value.data = talloc_strdup(tree2.u.equality.attr, subclasses[i]);
287 if (tree2.u.equality.value.data == NULL) {
290 tree2.u.equality.value.length = strlen(subclasses[i]);
291 list2 = talloc(list, struct dn_list);
295 if (ltdb_index_dn_objectclass(module, &tree2,
296 index_list, list2) == 1) {
297 if (list->count == 0) {
301 list_union(ldb, list, list2);
305 talloc_free(tree2.u.equality.attr);
312 return a list of dn's that might match a leaf indexed search
314 static int ltdb_index_dn_leaf(struct ldb_module *module,
315 struct ldb_parse_tree *tree,
316 const struct ldb_message *index_list,
317 struct dn_list *list)
319 if (ldb_attr_cmp(tree->u.equality.attr, LTDB_OBJECTCLASS) == 0) {
320 return ltdb_index_dn_objectclass(module, tree, index_list, list);
322 if (ldb_attr_cmp(tree->u.equality.attr, "distinguishedName") == 0 ||
323 ldb_attr_cmp(tree->u.equality.attr, "dn") == 0) {
324 char *dn = talloc_strdup(list, (char *)tree->u.equality.value.data);
329 return ltdb_index_dn_simple(module, tree, index_list, list);
336 relies on the lists being sorted
338 static int list_intersect(struct ldb_context *ldb,
339 struct dn_list *list, const struct dn_list *list2)
341 struct dn_list *list3;
344 if (list->count == 0 || list2->count == 0) {
349 list3 = talloc(ldb, struct dn_list);
354 list3->dn = talloc_array(list3, char *, list->count);
361 for (i=0;i<list->count;i++) {
362 if (ldb_list_find(list->dn[i], list2->dn, list2->count,
363 sizeof(char *), (comparison_fn_t)strcmp) != -1) {
364 list3->dn[list3->count] = talloc_steal(list3->dn, list->dn[i]);
367 talloc_free(list->dn[i]);
371 talloc_free(list->dn);
372 list->dn = talloc_steal(list, list3->dn);
373 list->count = list3->count;
383 relies on the lists being sorted
385 static int list_union(struct ldb_context *ldb,
386 struct dn_list *list, const struct dn_list *list2)
390 unsigned int count = list->count;
392 if (list->count == 0 && list2->count == 0) {
397 d = talloc_realloc(list, list->dn, char *, list->count + list2->count);
403 for (i=0;i<list2->count;i++) {
404 if (ldb_list_find(list2->dn[i], list->dn, count,
405 sizeof(char *), (comparison_fn_t)strcmp) == -1) {
406 list->dn[list->count] = talloc_strdup(list->dn, list2->dn[i]);
407 if (!list->dn[list->count]) {
414 if (list->count != count) {
415 qsort(list->dn, list->count, sizeof(char *), (comparison_fn_t)list_cmp);
421 static int ltdb_index_dn(struct ldb_module *module,
422 struct ldb_parse_tree *tree,
423 const struct ldb_message *index_list,
424 struct dn_list *list);
430 static int ltdb_index_dn_or(struct ldb_module *module,
431 struct ldb_parse_tree *tree,
432 const struct ldb_message *index_list,
433 struct dn_list *list)
435 struct ldb_context *ldb = module->ldb;
443 for (i=0;i<tree->u.list.num_elements;i++) {
444 struct dn_list *list2;
447 list2 = talloc(module, struct dn_list);
452 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
465 talloc_free(list->dn);
472 list->dn = talloc_steal(list, list2->dn);
473 list->count = list2->count;
475 if (list_union(ldb, list, list2) == -1) {
484 if (list->count == 0) {
495 static int ltdb_index_dn_not(struct ldb_module *module,
496 struct ldb_parse_tree *tree,
497 const struct ldb_message *index_list,
498 struct dn_list *list)
500 /* the only way to do an indexed not would be if we could
501 negate the not via another not or if we knew the total
502 number of database elements so we could know that the
503 existing expression covered the whole database.
505 instead, we just give up, and rely on a full index scan
506 (unless an outer & manages to reduce the list)
512 AND two index results
514 static int ltdb_index_dn_and(struct ldb_module *module,
515 struct ldb_parse_tree *tree,
516 const struct ldb_message *index_list,
517 struct dn_list *list)
519 struct ldb_context *ldb = module->ldb;
527 for (i=0;i<tree->u.list.num_elements;i++) {
528 struct dn_list *list2;
531 list2 = talloc(module, struct dn_list);
536 v = ltdb_index_dn(module, tree->u.list.elements[i], index_list, list2);
540 talloc_free(list->dn);
552 talloc_free(list->dn);
553 list->dn = talloc_steal(list, list2->dn);
554 list->count = list2->count;
556 if (list_intersect(ldb, list, list2) == -1) {
564 if (list->count == 0) {
565 talloc_free(list->dn);
574 return a list of dn's that might match an indexed search or
575 -1 if an error. return 0 for no matches, or 1 for matches
577 static int ltdb_index_dn(struct ldb_module *module,
578 struct ldb_parse_tree *tree,
579 const struct ldb_message *index_list,
580 struct dn_list *list)
584 switch (tree->operation) {
586 ret = ltdb_index_dn_and(module, tree, index_list, list);
590 ret = ltdb_index_dn_or(module, tree, index_list, list);
594 ret = ltdb_index_dn_not(module, tree, index_list, list);
597 case LDB_OP_EQUALITY:
598 ret = ltdb_index_dn_leaf(module, tree, index_list, list);
601 case LDB_OP_SUBSTRING:
606 case LDB_OP_EXTENDED:
607 /* we can't index with fancy bitops yet */
616 filter a candidate dn_list from an indexed search into a set of results
617 extracting just the given attributes
619 static int ldb_index_filter(struct ldb_module *module, struct ldb_parse_tree *tree,
620 const struct ldb_dn *base,
621 enum ldb_scope scope,
622 const struct dn_list *dn_list,
623 const char * const attrs[], struct ldb_message ***res)
628 for (i = 0; i < dn_list->count; i++) {
629 struct ldb_message *msg;
633 msg = talloc(module, struct ldb_message);
638 dn = ldb_dn_explode(msg, dn_list->dn[i]);
644 ret = ltdb_search_dn1(module, dn, msg);
647 /* the record has disappeared? yes, this can happen */
653 /* an internal error */
659 if (ldb_match_msg(module->ldb, msg, tree, base, scope) == 1) {
660 ret = ltdb_add_attr_results(module, msg, attrs, &count, res);
672 search the database with a LDAP-like expression using indexes
673 returns -1 if an indexed search is not possible, in which
674 case the caller should call ltdb_search_full()
676 int ltdb_search_indexed(struct ldb_module *module,
677 const struct ldb_dn *base,
678 enum ldb_scope scope,
679 struct ldb_parse_tree *tree,
680 const char * const attrs[], struct ldb_message ***res)
682 struct ltdb_private *ltdb = module->private_data;
683 struct dn_list *dn_list;
686 if (ltdb->cache->indexlist->num_elements == 0 &&
687 scope != LDB_SCOPE_BASE) {
688 /* no index list? must do full search */
692 dn_list = talloc(module, struct dn_list);
693 if (dn_list == NULL) {
697 if (scope == LDB_SCOPE_BASE) {
698 /* with BASE searches only one DN can match */
699 char *dn = ldb_dn_linearize(dn_list, base);
707 ret = ltdb_index_dn(module, tree, ltdb->cache->indexlist, dn_list);
711 /* we've got a candidate list - now filter by the full tree
712 and extract the needed attributes */
713 ret = ldb_index_filter(module, tree, base, scope, dn_list,
717 talloc_free(dn_list);
723 add an index element where this is the first indexed DN for this value
725 static int ltdb_index_add1_new(struct ldb_context *ldb,
726 struct ldb_message *msg,
727 struct ldb_message_element *el,
730 struct ldb_message_element *el2;
732 /* add another entry */
733 el2 = talloc_realloc(msg, msg->elements,
734 struct ldb_message_element, msg->num_elements+1);
740 msg->elements[msg->num_elements].name = talloc_strdup(msg->elements, LTDB_IDX);
741 if (!msg->elements[msg->num_elements].name) {
744 msg->elements[msg->num_elements].num_values = 0;
745 msg->elements[msg->num_elements].values = talloc(msg->elements, struct ldb_val);
746 if (!msg->elements[msg->num_elements].values) {
749 msg->elements[msg->num_elements].values[0].length = strlen(dn);
750 msg->elements[msg->num_elements].values[0].data = dn;
751 msg->elements[msg->num_elements].num_values = 1;
759 add an index element where this is not the first indexed DN for this
762 static int ltdb_index_add1_add(struct ldb_context *ldb,
763 struct ldb_message *msg,
764 struct ldb_message_element *el,
771 /* for multi-valued attributes we can end up with repeats */
772 for (i=0;i<msg->elements[idx].num_values;i++) {
773 if (strcmp(dn, msg->elements[idx].values[i].data) == 0) {
778 v2 = talloc_realloc(msg->elements, msg->elements[idx].values,
780 msg->elements[idx].num_values+1);
784 msg->elements[idx].values = v2;
786 msg->elements[idx].values[msg->elements[idx].num_values].length = strlen(dn);
787 msg->elements[idx].values[msg->elements[idx].num_values].data = dn;
788 msg->elements[idx].num_values++;
794 add an index entry for one message element
796 static int ltdb_index_add1(struct ldb_module *module, char *dn,
797 struct ldb_message_element *el, int v_idx)
799 struct ldb_context *ldb = module->ldb;
800 struct ldb_message *msg;
801 struct ldb_dn *dn_key;
805 msg = talloc(module, struct ldb_message);
811 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
817 talloc_steal(msg, dn_key);
819 ret = ltdb_search_dn1(module, dn_key, msg);
827 msg->num_elements = 0;
828 msg->elements = NULL;
831 for (i=0;i<msg->num_elements;i++) {
832 if (strcmp(LTDB_IDX, msg->elements[i].name) == 0) {
837 if (i == msg->num_elements) {
838 ret = ltdb_index_add1_new(ldb, msg, el, dn);
840 ret = ltdb_index_add1_add(ldb, msg, el, i, dn);
844 ret = ltdb_store(module, msg, TDB_REPLACE);
852 static int ltdb_index_add0(struct ldb_module *module, char *dn,
853 struct ldb_message_element *elements, int num_el)
855 struct ltdb_private *ltdb = module->private_data;
863 if (ltdb->cache->indexlist->num_elements == 0) {
864 /* no indexed fields */
868 for (i = 0; i < num_el; i++) {
869 ret = ldb_msg_find_idx(ltdb->cache->indexlist, elements[i].name,
874 for (j = 0; j < elements[i].num_values; j++) {
875 ret = ltdb_index_add1(module, dn, &elements[i], j);
887 add the index entries for a new record
890 int ltdb_index_add(struct ldb_module *module, const struct ldb_message *msg)
892 struct ltdb_private *ltdb = module->private_data;
896 dn = ldb_dn_linearize(ltdb, msg->dn);
901 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
910 delete an index entry for one message element
912 int ltdb_index_del_value(struct ldb_module *module, const char *dn,
913 struct ldb_message_element *el, int v_idx)
915 struct ldb_context *ldb = module->ldb;
916 struct ldb_message *msg;
917 struct ldb_dn *dn_key;
925 dn_key = ldb_dn_key(ldb, el->name, &el->values[v_idx]);
930 msg = talloc(dn_key, struct ldb_message);
936 ret = ltdb_search_dn1(module, dn_key, msg);
943 /* it wasn't indexed. Did we have an earlier error? If we did then
949 i = ldb_msg_find_idx(msg, dn, &j, LTDB_IDX);
951 ldb_debug(ldb, LDB_DEBUG_ERROR,
952 "ERROR: dn %s not found in %s\n", dn,
953 ldb_dn_linearize(dn_key, dn_key));
954 /* it ain't there. hmmm */
959 if (j != msg->elements[i].num_values - 1) {
960 memmove(&msg->elements[i].values[j],
961 &msg->elements[i].values[j+1],
962 (msg->elements[i].num_values-(j+1)) *
963 sizeof(msg->elements[i].values[0]));
965 msg->elements[i].num_values--;
967 if (msg->elements[i].num_values == 0) {
968 ret = ltdb_delete_noindex(module, dn_key);
970 ret = ltdb_store(module, msg, TDB_REPLACE);
979 delete the index entries for a record
982 int ltdb_index_del(struct ldb_module *module, const struct ldb_message *msg)
984 struct ltdb_private *ltdb = module->private_data;
989 if (ldb_dn_is_special(msg->dn)) {
993 dn = ldb_dn_linearize(ltdb, msg->dn);
998 /* find the list of indexed fields */
999 if (ltdb->cache->indexlist->num_elements == 0) {
1000 /* no indexed fields */
1004 for (i = 0; i < msg->num_elements; i++) {
1005 ret = ldb_msg_find_idx(ltdb->cache->indexlist, msg->elements[i].name,
1006 NULL, LTDB_IDXATTR);
1010 for (j = 0; j < msg->elements[i].num_values; j++) {
1011 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
1025 traversal function that deletes all @INDEX records
1027 static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1029 const char *dn = "DN=" LTDB_INDEX ":";
1030 if (strncmp(key.dptr, dn, strlen(dn)) == 0) {
1031 return tdb_delete(tdb, key);
1037 traversal function that adds @INDEX records during a reindex
1039 static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
1041 struct ldb_module *module = state;
1042 struct ldb_message *msg;
1047 if (strncmp(key.dptr, "DN=@", 4) == 0 ||
1048 strncmp(key.dptr, "DN=", 3) != 0) {
1052 msg = talloc(module, struct ldb_message);
1057 ret = ltdb_unpack_data(module, &data, msg);
1063 /* check if the DN key has changed, perhaps due to the
1064 case insensitivity of an element changing */
1065 key2 = ltdb_key(module, msg->dn);
1066 if (key2.dptr == NULL) {
1067 /* probably a corrupt record ... darn */
1068 ldb_debug(module->ldb, LDB_DEBUG_ERROR, "Invalid DN in re_index: %s\n",
1069 ldb_dn_linearize(msg, msg->dn));
1073 if (strcmp(key2.dptr, key.dptr) != 0) {
1074 tdb_delete(tdb, key);
1075 tdb_store(tdb, key2, data, 0);
1077 talloc_free(key2.dptr);
1079 if (msg->dn == NULL) {
1082 dn = ldb_dn_linearize(msg->dn, msg->dn);
1085 ret = ltdb_index_add0(module, dn, msg->elements, msg->num_elements);
1093 force a complete reindex of the database
1095 int ltdb_reindex(struct ldb_module *module)
1097 struct ltdb_private *ltdb = module->private_data;
1100 if (ltdb_cache_reload(module) != 0) {
1104 /* first traverse the database deleting any @INDEX records */
1105 ret = tdb_traverse(ltdb->tdb, delete_index, NULL);
1111 /* now traverse adding any indexes for normal LDB records */
1112 ret = tdb_traverse(ltdb->tdb, re_index, module);