ldb_mdb: Wrap mdb_env_open
authorGary Lockyer <gary@catalyst.net.nz>
Tue, 6 Mar 2018 23:05:34 +0000 (12:05 +1300)
committerStefan Metzmacher <metze@samba.org>
Thu, 12 Apr 2018 14:27:17 +0000 (16:27 +0200)
Wrap mdb_env_open to ensure that we only have one MDB_env opened per
database in each process

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/ldb_mdb/ldb_mdb.c

index 422ed71219a7b89129cc527525963b6c513c0bab..1e6c99b3dd0d92eb5a030016ece211d725052fda 100644 (file)
@@ -646,73 +646,154 @@ static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
                trans_finished(lmdb, ltx);
                ltx = lmdb_private_trans_head(lmdb);
        }
-
-       mdb_env_close(lmdb->env);
        lmdb->env = NULL;
 
        return 0;
 }
 
-static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
-                                 struct ldb_context *ldb,
-                                 const char *path,
-                                 unsigned int flags,
-                                 struct lmdb_private *lmdb)
+struct mdb_env_wrap {
+       struct mdb_env_wrap *next, *prev;
+       dev_t device;
+       ino_t inode;
+       MDB_env *env;
+       int pid;
+};
+
+static struct mdb_env_wrap *mdb_list;
+
+/* destroy the last connection to an mdb */
+static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
 {
+       mdb_env_close(w->env);
+       DLIST_REMOVE(mdb_list, w);
+       return 0;
+}
+
+static int lmdb_open_env(TALLOC_CTX *mem_ctx,
+                        MDB_env **env,
+                        struct ldb_context *ldb,
+                        const char *path,
+                        unsigned int flags) {
        int ret;
-       unsigned int mdb_flags;
-       int lmdb_max_key_length;
+       unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
+       /*
+        * MDB_NOSUBDIR implies there is a separate file called path and a
+        * separate lockfile called path-lock
+        */
 
-       if (flags & LDB_FLG_DONT_CREATE_DB) {
-               struct stat st;
-               if (stat(path, &st) != 0) {
-                       return LDB_ERR_UNAVAILABLE;
+       struct mdb_env_wrap *w;
+       struct stat st;
+
+       if (stat(path, &st) == 0) {
+               for (w=mdb_list;w;w=w->next) {
+                       if (st.st_dev == w->device && st.st_ino == w->inode) {
+                               /*
+                                * We must have only one MDB_env per process
+                                */
+                               if (!talloc_reference(mem_ctx, w)) {
+                                       return ldb_oom(ldb);
+                               }
+                               *env = w->env;
+                               return LDB_SUCCESS;
+                       }
                }
        }
 
-       ret = mdb_env_create(&lmdb->env);
+       w = talloc(mem_ctx, struct mdb_env_wrap);
+       if (w == NULL) {
+               return ldb_oom(ldb);
+       }
+
+       ret = mdb_env_create(env);
        if (ret != 0) {
                ldb_asprintf_errstring(
                        ldb,
                        "Could not create MDB environment %s: %s\n",
                        path,
                        mdb_strerror(ret));
-               return LDB_ERR_OPERATIONS_ERROR;
+               return ldb_mdb_err_map(ret);
        }
 
-       /* Close when lmdb is released */
-       talloc_set_destructor(lmdb, lmdb_pvt_destructor);
-
-       ret = mdb_env_set_mapsize(lmdb->env, 16LL * GIGABYTE);
+       /*
+        * Currently we set a 16Gb maximum database size
+        */
+       ret = mdb_env_set_mapsize(*env, 16LL * GIGABYTE);
        if (ret != 0) {
                ldb_asprintf_errstring(
                        ldb,
                        "Could not open MDB environment %s: %s\n",
                        path,
                        mdb_strerror(ret));
+               TALLOC_FREE(w);
                return ldb_mdb_err_map(ret);
        }
 
-       mdb_env_set_maxreaders(lmdb->env, 100000);
-       /* MDB_NOSUBDIR implies there is a separate file called path and a
-        * separate lockfile called path-lock
+       mdb_env_set_maxreaders(*env, 100000);
+       /*
+        * As we ensure that there is only one MDB_env open per database per
+        * process. We can not use the MDB_RDONLY flag, as another ldb may be
+        * opened in read write mode
         */
-       mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
-       if (flags & LDB_FLG_RDONLY) {
-               mdb_flags |= MDB_RDONLY;
-       }
        if (flags & LDB_FLG_NOSYNC) {
                mdb_flags |= MDB_NOSYNC;
        }
-       ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
+       ret = mdb_env_open(*env, path, mdb_flags, 0644);
        if (ret != 0) {
                ldb_asprintf_errstring(ldb,
                                "Could not open DB %s: %s\n",
                                path, mdb_strerror(ret));
-               talloc_free(lmdb);
+               TALLOC_FREE(w);
                return ldb_mdb_err_map(ret);
        }
 
+       if (stat(path, &st) != 0) {
+               ldb_asprintf_errstring(
+                       ldb,
+                       "Could not stat %s:\n",
+                       path);
+               TALLOC_FREE(w);
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+       w->env = *env;
+       w->device = st.st_dev;
+       w->inode  = st.st_ino;
+
+       talloc_set_destructor(w, mdb_env_wrap_destructor);
+
+       DLIST_ADD(mdb_list, w);
+
+       return LDB_SUCCESS;
+
+}
+
+static int lmdb_pvt_open(struct lmdb_private *lmdb,
+                        struct ldb_context *ldb,
+                        const char *path,
+                        unsigned int flags)
+{
+       int ret;
+       int lmdb_max_key_length;
+
+       if (flags & LDB_FLG_DONT_CREATE_DB) {
+               struct stat st;
+               if (stat(path, &st) != 0) {
+                       return LDB_ERR_UNAVAILABLE;
+               }
+       }
+
+       ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
+       if (ret != 0) {
+               ldb_asprintf_errstring(
+                       ldb,
+                       "Could not create MDB environment %s: %s\n",
+                       path,
+                       mdb_strerror(ret));
+               return LDB_ERR_OPERATIONS_ERROR;
+       }
+
+       /* Close when lmdb is released */
+       talloc_set_destructor(lmdb, lmdb_pvt_destructor);
+
        /* Store the original pid during the LMDB open */
        lmdb->pid = getpid();
 
@@ -720,7 +801,6 @@ static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
 
        /* This will never happen, but if it does make sure to freak out */
        if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
-               talloc_free(lmdb);
                return ldb_operr(ldb);
        }
 
@@ -757,17 +837,17 @@ int lmdb_connect(struct ldb_context *ldb,
                 return LDB_ERR_OPERATIONS_ERROR;
         }
 
-       lmdb = talloc_zero(ldb, struct lmdb_private);
+       lmdb = talloc_zero(ltdb, struct lmdb_private);
        if (lmdb == NULL) {
                TALLOC_FREE(ltdb);
-                ldb_oom(ldb);
-                return LDB_ERR_OPERATIONS_ERROR;
+                return ldb_oom(ldb);
        }
        lmdb->ldb = ldb;
        ltdb->kv_ops = &lmdb_key_value_ops;
 
-       ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
+       ret = lmdb_pvt_open(lmdb, ldb, path, flags);
        if (ret != LDB_SUCCESS) {
+               TALLOC_FREE(ltdb);
                return ret;
        }