ldb: Lock the whole backend database for the duration of a search
authorAndrew Bartlett <abartlet@samba.org>
Thu, 15 Jun 2017 01:56:46 +0000 (13:56 +1200)
committerStefan Metzmacher <metze@samba.org>
Sun, 2 Jul 2017 15:35:19 +0000 (17:35 +0200)
We must hold locks not just for the duration of each search, but for the whole search
as our module stack may make multiple search requests to build up the whole result.

This is explains a number of replication and read corruption issues in Samba

Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
lib/ldb/common/ldb.c
lib/ldb/tests/ldb_mod_op_test.c

index 700d89c65d8e3eb5f8c7a529b647b4dde18c857c..a4d9977d1b4f5fb6646bd412f7d44295ba60ca2f 100644 (file)
@@ -966,6 +966,146 @@ static int ldb_msg_check_element_flags(struct ldb_context *ldb,
        return LDB_SUCCESS;
 }
 
+/*
+ * This context allows us to make the unlock be a talloc destructor
+ *
+ * This ensures that a request started, but not waited on, will still
+ * unlock.
+ */
+struct ldb_db_lock_context {
+       struct ldb_request *req;
+       struct ldb_context *ldb;
+};
+
+/*
+ * We have to have a the unlock on a destructor so that we unlock the
+ * DB if a caller calls talloc_free(req).  We trust that the ldb
+ * context has not already gone away.
+ */
+static int ldb_db_lock_destructor(struct ldb_db_lock_context *lock_context)
+{
+       int ret;
+       struct ldb_module *next_module;
+       FIRST_OP_NOERR(lock_context->ldb, read_unlock);
+       if (next_module != NULL) {
+               ret = next_module->ops->read_unlock(next_module);
+       } else {
+               ret = LDB_SUCCESS;
+       }
+
+       if (ret != LDB_SUCCESS) {
+               ldb_debug(lock_context->ldb,
+                         LDB_DEBUG_FATAL,
+                         "Failed to unlock db: %s / %s",
+                         ldb_errstring(lock_context->ldb),
+                         ldb_strerror(ret));
+       }
+       return 0;
+}
+
+static int ldb_lock_backend_callback(struct ldb_request *req,
+                                    struct ldb_reply *ares)
+{
+       struct ldb_db_lock_context *lock_context;
+       int ret;
+
+       lock_context = talloc_get_type(req->context,
+                                      struct ldb_db_lock_context);
+
+       if (!ares) {
+               return ldb_module_done(lock_context->req, NULL, NULL,
+                                       LDB_ERR_OPERATIONS_ERROR);
+       }
+       if (ares->error != LDB_SUCCESS || ares->type == LDB_REPLY_DONE) {
+               ret = ldb_module_done(lock_context->req, ares->controls,
+                                     ares->response, ares->error);
+               /*
+                * If this is a LDB_REPLY_DONE or an error, unlock the
+                * DB by calling the destructor on this context
+                */
+               talloc_free(lock_context);
+               return ret;
+       }
+
+       /* Otherwise pass on the callback */
+       switch (ares->type) {
+       case LDB_REPLY_ENTRY:
+               return ldb_module_send_entry(lock_context->req, ares->message,
+                                            ares->controls);
+
+       case LDB_REPLY_REFERRAL:
+               return ldb_module_send_referral(lock_context->req,
+                                               ares->referral);
+       default:
+               /* Can't happen */
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+}
+
+/*
+ * Do an ldb_search() with a lock held, but release it if the request
+ * is freed with talloc_free()
+ */
+static int lock_search(struct ldb_module *lock_module, struct ldb_request *req)
+{
+       /* Used in FIRST_OP_NOERR to find where to send the lock request */
+       struct ldb_module *next_module = NULL;
+       struct ldb_request *down_req = NULL;
+       struct ldb_db_lock_context *lock_context;
+       struct ldb_context *ldb = ldb_module_get_ctx(lock_module);
+       int ret;
+
+       lock_context = talloc(req, struct ldb_db_lock_context);
+       if (lock_context == NULL) {
+               return ldb_oom(ldb);
+       }
+
+       lock_context->ldb = ldb;
+       lock_context->req = req;
+
+       ret = ldb_build_search_req_ex(&down_req, ldb, req,
+                                     req->op.search.base,
+                                     req->op.search.scope,
+                                     req->op.search.tree,
+                                     req->op.search.attrs,
+                                     req->controls,
+                                     lock_context,
+                                     ldb_lock_backend_callback,
+                                     req);
+       LDB_REQ_SET_LOCATION(down_req);
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       /* call DB lock */
+       FIRST_OP_NOERR(ldb, read_lock);
+       if (next_module != NULL) {
+               ret = next_module->ops->read_lock(next_module);
+       } else {
+               ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
+       }
+
+       if (ret == LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION) {
+               /* We might be talking LDAP */
+               ldb_reset_err_string(ldb);
+               ret = 0;
+               TALLOC_FREE(lock_context);
+
+               return ldb_next_request(lock_module, req);
+       } else if ((ret != LDB_SUCCESS) && (ldb->err_string == NULL)) {
+               /* if no error string was setup by the backend */
+               ldb_asprintf_errstring(ldb, "Failed to get DB lock: %s (%d)",
+                                      ldb_strerror(ret), ret);
+       } else {
+               talloc_set_destructor(lock_context, ldb_db_lock_destructor);
+       }
+
+       if (ret != LDB_SUCCESS) {
+               return ret;
+       }
+
+       return ldb_next_request(lock_module, down_req);
+}
 
 /*
   start an ldb request
@@ -991,15 +1131,32 @@ int ldb_request(struct ldb_context *ldb, struct ldb_request *req)
        /* call the first module in the chain */
        switch (req->operation) {
        case LDB_SEARCH:
+       {
+               /*
+                * A fake module to allow ldb_next_request() to be
+                * re-used and to keep the locking out of this function.
+                */
+               static const struct ldb_module_ops lock_module_ops = {
+                       .name = "lock_searches",
+                       .search = lock_search
+               };
+               struct ldb_module lock_module = {
+                       .ldb = ldb,
+                       .next = ldb->modules,
+                       .ops = &lock_module_ops
+               };
+               next_module = &lock_module;
+
                /* due to "ldb_build_search_req" base DN always != NULL */
                if (!ldb_dn_validate(req->op.search.base)) {
                        ldb_asprintf_errstring(ldb, "ldb_search: invalid basedn '%s'",
                                               ldb_dn_get_linearized(req->op.search.base));
                        return LDB_ERR_INVALID_DN_SYNTAX;
                }
-               FIRST_OP(ldb, search);
+
                ret = next_module->ops->search(next_module, req);
                break;
+       }
        case LDB_ADD:
                if (!ldb_dn_validate(req->op.add.message->dn)) {
                        ldb_asprintf_errstring(ldb, "ldb_add: invalid dn '%s'",
index 2840357a894c9417385123d3a71854650ac51da8..6357f83dd024a4f363782f9b24af0361494afce3 100644 (file)
@@ -1991,14 +1991,8 @@ static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req
        ldb_request_done(req, LDB_SUCCESS);
        assert_int_equal(ret, 0);
 
-       /*
-        * We got the result because of this
-        * tests wants to demonstrate.
-        *
-        * Once the bug is fixed, it should
-        * change to assert_int_equal(res_count, 0);
-        */
-       assert_int_equal(res_count, 1);
+       /* We should not have got the result */
+       assert_int_equal(res_count, 0);
 
        return ret;
 }