trans_finished(lmdb, ltx);
ltx = lmdb_private_trans_head(lmdb);
}
-
- mdb_env_close(lmdb->env);
lmdb->env = NULL;
return 0;
}
-static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
- struct ldb_context *ldb,
- const char *path,
- unsigned int flags,
- struct lmdb_private *lmdb)
+struct mdb_env_wrap {
+ struct mdb_env_wrap *next, *prev;
+ dev_t device;
+ ino_t inode;
+ MDB_env *env;
+ int pid;
+};
+
+static struct mdb_env_wrap *mdb_list;
+
+/* destroy the last connection to an mdb */
+static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
{
+ mdb_env_close(w->env);
+ DLIST_REMOVE(mdb_list, w);
+ return 0;
+}
+
+static int lmdb_open_env(TALLOC_CTX *mem_ctx,
+ MDB_env **env,
+ struct ldb_context *ldb,
+ const char *path,
+ unsigned int flags) {
int ret;
- unsigned int mdb_flags;
- int lmdb_max_key_length;
+ unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
+ /*
+ * MDB_NOSUBDIR implies there is a separate file called path and a
+ * separate lockfile called path-lock
+ */
- if (flags & LDB_FLG_DONT_CREATE_DB) {
- struct stat st;
- if (stat(path, &st) != 0) {
- return LDB_ERR_UNAVAILABLE;
+ struct mdb_env_wrap *w;
+ struct stat st;
+
+ if (stat(path, &st) == 0) {
+ for (w=mdb_list;w;w=w->next) {
+ if (st.st_dev == w->device && st.st_ino == w->inode) {
+ /*
+ * We must have only one MDB_env per process
+ */
+ if (!talloc_reference(mem_ctx, w)) {
+ return ldb_oom(ldb);
+ }
+ *env = w->env;
+ return LDB_SUCCESS;
+ }
}
}
- ret = mdb_env_create(&lmdb->env);
+ w = talloc(mem_ctx, struct mdb_env_wrap);
+ if (w == NULL) {
+ return ldb_oom(ldb);
+ }
+
+ ret = mdb_env_create(env);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not create MDB environment %s: %s\n",
path,
mdb_strerror(ret));
- return LDB_ERR_OPERATIONS_ERROR;
+ return ldb_mdb_err_map(ret);
}
- /* Close when lmdb is released */
- talloc_set_destructor(lmdb, lmdb_pvt_destructor);
-
- ret = mdb_env_set_mapsize(lmdb->env, 16LL * GIGABYTE);
+ /*
+ * Currently we set a 16Gb maximum database size
+ */
+ ret = mdb_env_set_mapsize(*env, 16LL * GIGABYTE);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not open MDB environment %s: %s\n",
path,
mdb_strerror(ret));
+ TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
- mdb_env_set_maxreaders(lmdb->env, 100000);
- /* MDB_NOSUBDIR implies there is a separate file called path and a
- * separate lockfile called path-lock
+ mdb_env_set_maxreaders(*env, 100000);
+ /*
+ * As we ensure that there is only one MDB_env open per database per
+ * process. We can not use the MDB_RDONLY flag, as another ldb may be
+ * opened in read write mode
*/
- mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
- if (flags & LDB_FLG_RDONLY) {
- mdb_flags |= MDB_RDONLY;
- }
if (flags & LDB_FLG_NOSYNC) {
mdb_flags |= MDB_NOSYNC;
}
- ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
+ ret = mdb_env_open(*env, path, mdb_flags, 0644);
if (ret != 0) {
ldb_asprintf_errstring(ldb,
"Could not open DB %s: %s\n",
path, mdb_strerror(ret));
- talloc_free(lmdb);
+ TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
+ if (stat(path, &st) != 0) {
+ ldb_asprintf_errstring(
+ ldb,
+ "Could not stat %s:\n",
+ path);
+ TALLOC_FREE(w);
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+ w->env = *env;
+ w->device = st.st_dev;
+ w->inode = st.st_ino;
+
+ talloc_set_destructor(w, mdb_env_wrap_destructor);
+
+ DLIST_ADD(mdb_list, w);
+
+ return LDB_SUCCESS;
+
+}
+
+static int lmdb_pvt_open(struct lmdb_private *lmdb,
+ struct ldb_context *ldb,
+ const char *path,
+ unsigned int flags)
+{
+ int ret;
+ int lmdb_max_key_length;
+
+ if (flags & LDB_FLG_DONT_CREATE_DB) {
+ struct stat st;
+ if (stat(path, &st) != 0) {
+ return LDB_ERR_UNAVAILABLE;
+ }
+ }
+
+ ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
+ if (ret != 0) {
+ ldb_asprintf_errstring(
+ ldb,
+ "Could not create MDB environment %s: %s\n",
+ path,
+ mdb_strerror(ret));
+ return LDB_ERR_OPERATIONS_ERROR;
+ }
+
+ /* Close when lmdb is released */
+ talloc_set_destructor(lmdb, lmdb_pvt_destructor);
+
/* Store the original pid during the LMDB open */
lmdb->pid = getpid();
/* This will never happen, but if it does make sure to freak out */
if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
- talloc_free(lmdb);
return ldb_operr(ldb);
}
return LDB_ERR_OPERATIONS_ERROR;
}
- lmdb = talloc_zero(ldb, struct lmdb_private);
+ lmdb = talloc_zero(ltdb, struct lmdb_private);
if (lmdb == NULL) {
TALLOC_FREE(ltdb);
- ldb_oom(ldb);
- return LDB_ERR_OPERATIONS_ERROR;
+ return ldb_oom(ldb);
}
lmdb->ldb = ldb;
ltdb->kv_ops = &lmdb_key_value_ops;
- ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
+ ret = lmdb_pvt_open(lmdb, ldb, path, flags);
if (ret != LDB_SUCCESS) {
+ TALLOC_FREE(ltdb);
return ret;
}