lib/ldb: switch ldb_tdb to schema-based attribute comparison
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
index 0d4be49123501d4de36d0a016130595d6aafc0a3..753d2b4c1dfd22d89e8d1981bd0bb5dd579bc322 100644 (file)
  */
 
 #include "ldb_tdb.h"
-#include <lib/tdb_compat/tdb_compat.h>
+#include "ldb_private.h"
+#include <tdb.h>
 
+/*
+  prevent memory errors on callbacks
+*/
+struct ltdb_req_spy {
+       struct ltdb_context *ctx;
+};
 
 /*
   map a tdb error code to a ldb error code
@@ -68,13 +75,9 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
        case TDB_ERR_IO:
                return LDB_ERR_PROTOCOL_ERROR;
        case TDB_ERR_LOCK:
-#ifndef BUILD_TDB2
        case TDB_ERR_NOLOCK:
-#endif
                return LDB_ERR_BUSY;
-#ifndef BUILD_TDB2
        case TDB_ERR_LOCK_TIMEOUT:
-#endif
                return LDB_ERR_TIME_LIMIT_EXCEEDED;
        case TDB_ERR_EXISTS:
                return LDB_ERR_ENTRY_ALREADY_EXISTS;
@@ -226,7 +229,13 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
        if (ldb_dn_is_special(dn) &&
            (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
-            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
+            ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
+       {
+               if (ltdb->warn_reindex) {
+                       ldb_debug(ldb_module_get_ctx(module),
+                               LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
+                               tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+               }
                ret = ltdb_reindex(module);
        }
 
@@ -262,7 +271,8 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
                return LDB_ERR_OTHER;
        }
 
-       ret = ltdb_pack_data(module, msg, &tdb_data);
+       ret = ldb_pack_data(ldb_module_get_ctx(module),
+                           msg, (struct ldb_val *)&tdb_data);
        if (ret == -1) {
                talloc_free(tdb_key.dptr);
                return LDB_ERR_OTHER;
@@ -311,11 +321,12 @@ static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
 }
 
 static int ltdb_add_internal(struct ldb_module *module,
-                            const struct ldb_message *msg)
+                            const struct ldb_message *msg,
+                            bool check_single_value)
 {
        struct ldb_context *ldb = ldb_module_get_ctx(module);
        int ret = LDB_SUCCESS;
-       unsigned int i;
+       unsigned int i, j;
 
        for (i=0;i<msg->num_elements;i++) {
                struct ldb_message_element *el = &msg->elements[i];
@@ -326,11 +337,29 @@ static int ltdb_add_internal(struct ldb_module *module,
                                               el->name, ldb_dn_get_linearized(msg->dn));
                        return LDB_ERR_CONSTRAINT_VIOLATION;
                }
-               if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
+               if (check_single_value &&
+                               el->num_values > 1 &&
+                               ldb_tdb_single_valued(a, el)) {
                        ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
                                               el->name, ldb_dn_get_linearized(msg->dn));
                        return LDB_ERR_CONSTRAINT_VIOLATION;
                }
+
+               /* TODO: This is O(n^2) - replace with more efficient check */
+               for (j=0; j<el->num_values; j++) {
+                       struct ldb_val *found_val;
+                       ret = ldb_msg_find_val_schema(ldb, a, el,
+                                                     &el->values[j], &found_val);
+                       if (ret != LDB_SUCCESS) {
+                               return ret;
+                       }
+                       if (found_val != &el->values[j]) {
+                               ldb_asprintf_errstring(ldb,
+                                                      "attribute '%s': value #%u on '%s' provided more than once",
+                                                      el->name, j, ldb_dn_get_linearized(msg->dn));
+                               return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
+                       }
+               }
        }
 
        ret = ltdb_store(module, msg, TDB_INSERT);
@@ -373,7 +402,7 @@ static int ltdb_add(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_add_internal(module, req->op.add.message);
+       ret = ltdb_add_internal(module, req->op.add.message, true);
 
        return ret;
 }
@@ -591,13 +620,9 @@ static int msg_delete_element(struct ldb_module *module,
 
        for (i=0;i<el->num_values;i++) {
                bool matched;
-               if (a->syntax->operator_fn) {
-                       ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
-                                                    &el->values[i], val, &matched);
-                       if (ret != LDB_SUCCESS) return ret;
-               } else {
-                       matched = (a->syntax->comparison_fn(ldb, ldb,
-                                                           &el->values[i], val) == 0);
+               ret = ldb_val_equal_schema(ldb, a, &el->values[i], val, &matched);
+               if (ret != LDB_SUCCESS) {
+                       return ret;
                }
                if (matched) {
                        if (el->num_values == 1) {
@@ -659,7 +684,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                return LDB_ERR_OTHER;
        }
 
-       tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
+       tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
        if (!tdb_data.dptr) {
                talloc_free(tdb_key.dptr);
                return ltdb_err_map(tdb_error(ltdb->tdb));
@@ -672,7 +697,7 @@ int ltdb_modify_internal(struct ldb_module *module,
                goto done;
        }
 
-       ret = ltdb_unpack_data(module, &tdb_data, msg2);
+       ret = ldb_unpack_data(ldb_module_get_ctx(module), (struct ldb_val *)&tdb_data, msg2);
        free(tdb_data.dptr);
        if (ret == -1) {
                ret = LDB_ERR_OTHER;
@@ -756,8 +781,15 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                                /* Check that values don't exist yet on multi-
                                   valued attributes or aren't provided twice */
+                               /* TODO: This is O(n^2) - replace with more efficient check */
                                for (j = 0; j < el->num_values; j++) {
-                                       if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
+                                       struct ldb_val *matched_val;
+                                       ret = ldb_msg_find_val_schema(ldb, a, el2,
+                                                                     &el->values[j], &matched_val);
+                                       if (ret != LDB_SUCCESS) {
+                                               return ret;
+                                       }
+                                       if (matched_val != NULL) {
                                                if (control_permissive) {
                                                        /* remove this one as if it was never added */
                                                        el->num_values--;
@@ -775,7 +807,12 @@ int ltdb_modify_internal(struct ldb_module *module,
                                                ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
                                                goto done;
                                        }
-                                       if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
+                                       ret = ldb_msg_find_val_schema(ldb, a, el,
+                                                                     &el->values[j], &matched_val);
+                                       if (ret != LDB_SUCCESS) {
+                                               return ret;
+                                       }
+                                       if (matched_val != &el->values[j]) {
                                                ldb_asprintf_errstring(ldb,
                                                                       "attribute '%s': value #%u on '%s' provided more than once",
                                                                       el->name, j, ldb_dn_get_linearized(msg2->dn));
@@ -822,7 +859,13 @@ int ltdb_modify_internal(struct ldb_module *module,
 
                        /* TODO: This is O(n^2) - replace with more efficient check */
                        for (j=0; j<el->num_values; j++) {
-                               if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
+                               struct ldb_val *matched_val;
+                               ret = ldb_msg_find_val_schema(ldb, a, el,
+                                                             &el->values[j], &matched_val);
+                               if (ret != LDB_SUCCESS) {
+                                       return ret;
+                               }
+                               if (matched_val != &el->values[j]) {
                                        ldb_asprintf_errstring(ldb,
                                                               "attribute '%s': value #%u on '%s' provided more than once",
                                                               el->name, j, ldb_dn_get_linearized(msg2->dn));
@@ -836,11 +879,18 @@ int ltdb_modify_internal(struct ldb_module *module,
                        if (idx != -1) {
                                j = (unsigned int) idx;
                                el2 = &(msg2->elements[j]);
-                               if (ldb_msg_element_compare(el, el2) == 0) {
-                                       /* we are replacing with the same values */
+
+                               /* we consider two elements to be
+                                * equal only if the order
+                                * matches. This allows dbcheck to
+                                * fix the ordering on attributes
+                                * where order matters, such as
+                                * objectClass
+                                */
+                               if (ldb_msg_element_equal_ordered(el, el2)) {
                                        continue;
                                }
-                       
+
                                /* Delete the attribute if it exists in the DB */
                                if (msg_delete_attribute(module, ldb, msg2,
                                                         el->name) != 0) {
@@ -961,9 +1011,12 @@ static int ltdb_modify(struct ltdb_context *ctx)
 static int ltdb_rename(struct ltdb_context *ctx)
 {
        struct ldb_module *module = ctx->module;
+       void *data = ldb_module_get_private(module);
+       struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
        struct ldb_request *req = ctx->req;
        struct ldb_message *msg;
        int ret = LDB_SUCCESS;
+       TDB_DATA tdb_key, tdb_key_old;
 
        ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
@@ -976,29 +1029,68 @@ static int ltdb_rename(struct ltdb_context *ctx)
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       /* in case any attribute of the message was indexed, we need
-          to fetch the old record */
+       /* we need to fetch the old record to re-add under the new name */
        ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
        if (ret != LDB_SUCCESS) {
                /* not finding the old record is an error */
                return ret;
        }
 
+       /* We need to, before changing the DB, check if the new DN
+        * exists, so we can return this error to the caller with an
+        * unmodified DB */
+       tdb_key = ltdb_key(module, req->op.rename.newdn);
+       if (!tdb_key.dptr) {
+               talloc_free(msg);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       tdb_key_old = ltdb_key(module, req->op.rename.olddn);
+       if (!tdb_key_old.dptr) {
+               talloc_free(msg);
+               talloc_free(tdb_key.dptr);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
+       if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
+               if (tdb_exists(ltdb->tdb, tdb_key)) {
+                       talloc_free(tdb_key_old.dptr);
+                       talloc_free(tdb_key.dptr);
+                       ldb_asprintf_errstring(ldb_module_get_ctx(module),
+                                              "Entry %s already exists",
+                                              ldb_dn_get_linearized(msg->dn));
+                       /* finding the new record already in the DB is an error */
+                       talloc_free(msg);
+                       return LDB_ERR_ENTRY_ALREADY_EXISTS;
+               }
+       }
+       talloc_free(tdb_key_old.dptr);
+       talloc_free(tdb_key.dptr);
+
        /* Always delete first then add, to avoid conflicts with
         * unique indexes. We rely on the transaction to make this
         * atomic
         */
        ret = ltdb_delete_internal(module, msg->dn);
        if (ret != LDB_SUCCESS) {
+               talloc_free(msg);
                return ret;
        }
 
        msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
        if (msg->dn == NULL) {
+               talloc_free(msg);
                return LDB_ERR_OPERATIONS_ERROR;
        }
 
-       ret = ltdb_add_internal(module, msg);
+       /* We don't check single value as we can have more than 1 with
+        * deleted attributes. We could go through all elements but that's
+        * maybe not the most efficient way
+        */
+       ret = ltdb_add_internal(module, msg, false);
+
+       talloc_free(msg);
 
        return ret;
 }
@@ -1214,9 +1306,10 @@ static void ltdb_timeout(struct tevent_context *ev,
                ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
        }
 
-       if (!ctx->request_terminated) {
+       if (ctx->spy) {
                /* neutralize the spy */
                ctx->spy->ctx = NULL;
+               ctx->spy = NULL;
        }
        talloc_free(ctx);
 }
@@ -1311,9 +1404,10 @@ static void ltdb_callback(struct tevent_context *ev,
        }
 
 done:
-       if (!ctx->request_terminated) {
+       if (ctx->spy) {
                /* neutralize the spy */
                ctx->spy->ctx = NULL;
+               ctx->spy = NULL;
        }
        talloc_free(ctx);
 }
@@ -1323,7 +1417,9 @@ static int ltdb_request_destructor(void *ptr)
        struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
 
        if (spy->ctx != NULL) {
+               spy->ctx->spy = NULL;
                spy->ctx->request_terminated = true;
+               spy->ctx = NULL;
        }
 
        return 0;
@@ -1401,14 +1497,8 @@ static int ltdb_handle_request(struct ldb_module *module,
 
 static int ltdb_init_rootdse(struct ldb_module *module)
 {
-       struct ldb_context *ldb;
-       int ret;
-
-       ldb = ldb_module_get_ctx(module);
-
-       ret = ldb_mod_register_control(module,
-                                      LDB_CONTROL_PERMISSIVE_MODIFY_OID);
        /* ignore errors on this - we expect it for non-sam databases */
+       ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
 
        /* there can be no module beyond the backend, just return */
        return LDB_SUCCESS;
@@ -1482,6 +1572,8 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                                   tdb_flags, open_flags,
                                   ldb_get_create_perms(ldb), ldb);
        if (!ltdb->tdb) {
+               ldb_asprintf_errstring(ldb,
+                                      "Unable to open tdb '%s'", path);
                ldb_debug(ldb, LDB_DEBUG_ERROR,
                          "Unable to open tdb '%s'", path);
                talloc_free(ltdb);
@@ -1492,10 +1584,15 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
                ltdb->warn_unindexed = true;
        }
 
+       if (getenv("LDB_WARN_REINDEX")) {
+               ltdb->warn_reindex = true;
+       }
+
        ltdb->sequence_number = 0;
 
        module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
        if (!module) {
+               ldb_oom(ldb);
                talloc_free(ltdb);
                return LDB_ERR_OPERATIONS_ERROR;
        }
@@ -1503,8 +1600,9 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
        talloc_steal(module, ltdb);
 
        if (ltdb_cache_load(module) != 0) {
+               ldb_asprintf_errstring(ldb,
+                                      "Unable to load ltdb cache records of tdb '%s'", path);
                talloc_free(module);
-               talloc_free(ltdb);
                return LDB_ERR_OPERATIONS_ERROR;
        }