2 ldb database library using mdb back end
4 Copyright (C) Jakub Hrozek 2014
5 Copyright (C) Catalyst.Net Ltd 2017
7 ** NOTE! The following LGPL license applies to the ldb
8 ** library. This does NOT imply that all of Samba is released
11 This library is free software; you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public
13 License as published by the Free Software Foundation; either
14 version 3 of the License, or (at your option) any later version.
16 This library is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 Lesser General Public License for more details.
21 You should have received a copy of the GNU Lesser General Public
22 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "../ldb_tdb/ldb_tdb.h"
27 #include "include/dlinklist.h"
29 #define MDB_URL_PREFIX "mdb://"
30 #define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1)
32 #define LDB_MDB_MAX_KEY_LENGTH 511
34 #define MEGABYTE (1024*1024)
35 #define GIGABYTE (1024*1024*1024)
/*
 * Translate an LMDB error code (lmdb_err) into an ldb result code.
 *
 * NOTE(review): only part of the switch is visible in this extract, so the
 * case labels that lead to each return below cannot all be confirmed here.
 */
37 int ldb_mdb_err_map(int lmdb_err)
43 return LDB_ERR_OPERATIONS_ERROR;
44 case MDB_INCOMPATIBLE:
47 return LDB_ERR_UNAVAILABLE;
55 return LDB_ERR_PROTOCOL_ERROR;
58 case MDB_READERS_FULL:
64 return LDB_ERR_ENTRY_ALREADY_EXISTS;
67 return LDB_ERR_NO_SUCH_OBJECT;
69 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
/*
 * ldb_mdb_error() forwards to lmdb_error_at(), supplying the call site's
 * __FILE__ and __LINE__.
 */
76 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
/*
 * Map the LMDB error code to an ldb error code with ldb_mdb_err_map(), fetch
 * its text with mdb_strerror(), and record an error string on the ldb
 * context through ldb_asprintf_errstring().
 *
 * NOTE(review): the remaining parameters, the format string and the return
 * statement are not visible in this extract.
 */
77 static int lmdb_error_at(struct ldb_context *ldb,
82 int ldb_err = ldb_mdb_err_map(ecode);
83 char *reason = mdb_strerror(ecode);
84 ldb_asprintf_errstring(ldb,
/*
 * Report whether a write transaction is open: true when the LMDB private
 * state's transaction list (txlist) is non-empty.
 */
94 static bool lmdb_transaction_active(struct ltdb_private *ltdb)
96 return ltdb->lmdb_private->txlist != NULL;
/*
 * Return the MDB_txn associated with a struct lmdb_trans.
 * NOTE(review): the body is not visible in this extract; callers pass the
 * result of lmdb_private_trans_head(), which may be an empty list — confirm
 * how a NULL ltx is handled.
 */
99 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
/*
 * Register a newly started transaction: re-parent ltx (talloc_steal) onto
 * the current txlist value and link it into lmdb->txlist with DLIST_ADD.
 * NOTE(review): any condition guarding the talloc_steal() is not visible in
 * this extract.
 */
108 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
111 talloc_steal(lmdb->txlist, ltx);
114 DLIST_ADD(lmdb->txlist, ltx);
/*
 * Unlink a completed (committed or aborted) transaction from lmdb->txlist.
 * NOTE(review): whether ltx is also freed here is not visible in this
 * extract.
 */
117 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
119 DLIST_REMOVE(lmdb->txlist, ltx);
/*
 * Return an entry of the transaction list held in lmdb.
 * NOTE(review): judging by the name this is the list head (the innermost
 * open transaction); the statements that select it are not visible here.
 */
124 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
126 struct lmdb_trans *ltx;
/*
 * Select the transaction to use for a read operation.
 *
 * Visible steps, in order:
 *  - if a read transaction is already cached in lmdb->read_txn, return it;
 *  - otherwise fetch the transaction of the head of the write-transaction
 *    list;
 *  - begin a new MDB_RDONLY transaction and cache it in lmdb->read_txn,
 *    recording an error string if mdb_txn_begin() fails.
 *
 * NOTE(review): the conditions separating the last two steps and the return
 * statements are not visible in this extract.
 */
132 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
135 if (lmdb->read_txn != NULL) {
136 return lmdb->read_txn;
139 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
142 ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
145 ldb_asprintf_errstring(lmdb->ldb,
146 "%s failed: %s\n", __FUNCTION__,
149 lmdb->read_txn = txn;
/*
 * Store a key/data pair in the database.
 *
 * The write is refused with LDB_ERR_UNWILLING_TO_PERFORM on a read-only
 * database. It uses the transaction at the head of the write-transaction
 * list. flags uses the TDB constants: TDB_INSERT maps to MDB_NOOVERWRITE,
 * and TDB_MODIFY first looks the key up with mdb_get() so that modifying a
 * missing record fails.
 *
 * On failure the LMDB code is left in lmdb->error and the ldb error from
 * ldb_mdb_error() is returned; the final return maps lmdb->error through
 * ldb_mdb_err_map().
 */
154 static int lmdb_store(struct ltdb_private *ltdb,
156 struct ldb_val data, int flags)
158 struct lmdb_private *lmdb = ltdb->lmdb_private;
165 if (ltdb->read_only) {
166 return LDB_ERR_UNWILLING_TO_PERFORM;
169 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
/* NOTE(review): the test guarding this "No transaction" path is not visible. */
171 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
172 lmdb->error = MDB_PANIC;
173 return ldb_mdb_error(lmdb->ldb, lmdb->error);
/* Open the unnamed (default) database within this transaction. */
176 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
177 if (lmdb->error != MDB_SUCCESS) {
178 return ldb_mdb_error(lmdb->ldb, lmdb->error);
181 mdb_key.mv_size = key.length;
182 mdb_key.mv_data = key.data;
184 mdb_data.mv_size = data.length;
185 mdb_data.mv_data = data.data;
187 if (flags == TDB_INSERT) {
188 mdb_flags = MDB_NOOVERWRITE;
189 } else if ((flags == TDB_MODIFY)) {
191 * Modifying a record, ensure that it exists.
192 * This mimics the TDB semantics
195 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
196 if (lmdb->error != MDB_SUCCESS) {
/* Release a read transaction that is open with no read lock held. */
197 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
198 mdb_txn_commit(lmdb->read_txn);
199 lmdb->read_txn = NULL;
201 return ldb_mdb_error(lmdb->ldb, lmdb->error);
208 lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
209 if (lmdb->error != MDB_SUCCESS) {
210 return ldb_mdb_error(lmdb->ldb, lmdb->error);
213 return ldb_mdb_err_map(lmdb->error);
/*
 * Delete the record stored under key.
 *
 * The delete is refused with LDB_ERR_UNWILLING_TO_PERFORM on a read-only
 * database. It uses the transaction at the head of the write-transaction
 * list and the unnamed (default) database. Any LMDB failure is stored in
 * lmdb->error and returned as an ldb error code.
 */
217 static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
219 struct lmdb_private *lmdb = ltdb->lmdb_private;
224 if (ltdb->read_only) {
225 return LDB_ERR_UNWILLING_TO_PERFORM;
228 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
/* NOTE(review): the test guarding this "No transaction" path is not visible. */
230 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
231 lmdb->error = MDB_PANIC;
232 return ldb_mdb_error(lmdb->ldb, lmdb->error);
235 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
236 if (lmdb->error != MDB_SUCCESS) {
237 return ldb_mdb_error(lmdb->ldb, lmdb->error);
240 mdb_key.mv_size = key.length;
241 mdb_key.mv_data = key.data;
/* The NULL data argument means the delete is not matched against a value. */
243 lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
244 if (lmdb->error != MDB_SUCCESS) {
245 return ldb_mdb_error(lmdb->ldb, lmdb->error);
247 return ldb_mdb_err_map(lmdb->error);
/*
 * Iterate over the records of the database, calling fn(ltdb, key, data, ctx)
 * for each one.
 *
 * The iteration runs in the transaction chosen by get_current_txn() and
 * walks a cursor with MDB_NEXT. MDB_NOTFOUND from the cursor is the normal
 * end of iteration and is turned into MDB_SUCCESS. The cursor is closed
 * afterwards, and a read transaction that is open with no read lock held is
 * committed.
 *
 * NOTE(review): how fn's return value (ret) affects the loop is not visible
 * in this extract.
 */
250 static int lmdb_traverse_fn(struct ltdb_private *ltdb,
251 ldb_kv_traverse_fn fn,
254 struct lmdb_private *lmdb = ltdb->lmdb_private;
259 MDB_cursor *cursor = NULL;
262 txn = get_current_txn(lmdb);
264 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
265 lmdb->error = MDB_PANIC;
266 return ldb_mdb_error(lmdb->ldb, lmdb->error);
269 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
270 if (lmdb->error != MDB_SUCCESS) {
271 return ldb_mdb_error(lmdb->ldb, lmdb->error);
274 lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
275 if (lmdb->error != MDB_SUCCESS) {
279 while ((lmdb->error = mdb_cursor_get(
281 &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
/* Wrap the LMDB-owned key and value as ldb_val for the callback (no copy). */
283 struct ldb_val key = {
284 .length = mdb_key.mv_size,
285 .data = mdb_key.mv_data,
287 struct ldb_val data = {
288 .length = mdb_data.mv_size,
289 .data = mdb_data.mv_data,
292 ret = fn(ltdb, key, data, ctx);
/* Running off the end of the cursor is not an error. */
297 if (lmdb->error == MDB_NOTFOUND) {
298 lmdb->error = MDB_SUCCESS;
301 if (cursor != NULL) {
302 mdb_cursor_close(cursor);
305 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
306 mdb_txn_commit(lmdb->read_txn);
307 lmdb->read_txn = NULL;
310 if (lmdb->error != MDB_SUCCESS) {
311 return ldb_mdb_error(lmdb->ldb, lmdb->error);
313 return ldb_mdb_err_map(lmdb->error);
/*
 * Re-key a record during iteration: copy the value, delete the entry stored
 * under key, then store the copy under key2.
 *
 * The value is duplicated first because data points into LMDB-owned memory
 * that the delete may alter. The copy is freed before returning. Failures
 * are logged naming both keys and returned as an ldb error code.
 *
 * NOTE(review): lmdb_delete() and lmdb_store() return ldb error codes (via
 * ldb_mdb_error()/ldb_mdb_err_map()), yet their result is stored in
 * lmdb->error and passed to mdb_strerror() below — confirm this is intended.
 */
316 static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
322 struct lmdb_private *lmdb = ltdb->lmdb_private;
324 int ret = LDB_SUCCESS;
327 * Need to take a copy of the data as the delete operation alters the
328 * data, as it is in private lmdb memory.
330 copy.length = data.length;
331 copy.data = talloc_memdup(ltdb, data.data, data.length);
332 if (copy.data == NULL) {
333 lmdb->error = MDB_PANIC;
334 return ldb_oom(lmdb->ldb);
337 lmdb->error = lmdb_delete(ltdb, key);
338 if (lmdb->error != MDB_SUCCESS) {
342 "Failed to delete %*.*s "
343 "for rekey as %*.*s: %s",
344 (int)key.length, (int)key.length,
345 (const char *)key.data,
/* key2's length must be paired with key2's data, not key's. */
346 (int)key2.length, (int)key2.length,
347 (const char *)key2.data,
348 mdb_strerror(lmdb->error));
349 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
352 lmdb->error = lmdb_store(ltdb, key2, copy, 0);
353 if (lmdb->error != MDB_SUCCESS) {
357 "Failed to rekey %*.*s as %*.*s: %s",
358 (int)key.length, (int)key.length,
359 (const char *)key.data,
/* key2's length must be paired with key2's data, not key's. */
360 (int)key2.length, (int)key2.length,
361 (const char *)key2.data,
362 mdb_strerror(lmdb->error));
363 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
368 if (copy.data != NULL) {
369 TALLOC_FREE(copy.data);
374 * Explicitly invalidate the data, as the delete has done this
/*
 * Look up key and hand the stored value to parser(key, data, ctx),
 * returning the parser's result.
 *
 * The lookup runs in the transaction chosen by get_current_txn(). A missing
 * key yields LDB_ERR_NO_SUCH_OBJECT without setting an ldb error string;
 * other LMDB failures go through ldb_mdb_error(). On both the success and
 * failure paths the dbi handle is closed and a read transaction that is open
 * with no read lock held is committed.
 *
 * NOTE(review): on success the read transaction is committed before parser()
 * runs, while data still points at the memory returned by mdb_get() —
 * confirm that memory stays valid for the parser.
 */
381 /* Handles only a single record */
382 static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
383 int (*parser)(struct ldb_val key, struct ldb_val data,
387 struct lmdb_private *lmdb = ltdb->lmdb_private;
394 txn = get_current_txn(lmdb);
396 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
397 lmdb->error = MDB_PANIC;
398 return ldb_mdb_error(lmdb->ldb, lmdb->error);
401 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
402 if (lmdb->error != MDB_SUCCESS) {
403 return ldb_mdb_error(lmdb->ldb, lmdb->error);
406 mdb_key.mv_size = key.length;
407 mdb_key.mv_data = key.data;
409 lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
410 if (lmdb->error != MDB_SUCCESS) {
411 /* TODO closing a handle should not even be necessary */
412 mdb_dbi_close(lmdb->env, dbi);
413 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
414 mdb_txn_commit(lmdb->read_txn);
415 lmdb->read_txn = NULL;
417 if (lmdb->error == MDB_NOTFOUND) {
418 return LDB_ERR_NO_SUCH_OBJECT;
420 return ldb_mdb_error(lmdb->ldb, lmdb->error);
422 data.data = mdb_data.mv_data;
423 data.length = mdb_data.mv_size;
425 /* TODO closing a handle should not even be necessary */
426 mdb_dbi_close(lmdb->env, dbi);
428 /* We created a read transaction, commit it */
429 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
430 mdb_txn_commit(lmdb->read_txn);
431 lmdb->read_txn = NULL;
433 return parser(key, data, ctx);
/*
 * Take a read lock on the database behind module.
 *
 * A new LMDB transaction is begun only when no write transaction is in
 * progress and no read lock is already held; in every successful case the
 * read lock count is incremented.
 *
 * NOTE(review): the remaining mdb_txn_begin() arguments (presumably
 * MDB_RDONLY and &lmdb->read_txn) are not visible in this extract.
 */
437 static int lmdb_lock_read(struct ldb_module *module)
439 void *data = ldb_module_get_private(module);
440 struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
441 struct lmdb_private *lmdb = ltdb->lmdb_private;
443 lmdb->error = MDB_SUCCESS;
444 if (ltdb->in_transaction == 0 &&
445 ltdb->read_lock_count == 0) {
446 lmdb->error = mdb_txn_begin(lmdb->env,
451 if (lmdb->error != MDB_SUCCESS) {
452 return ldb_mdb_error(lmdb->ldb, lmdb->error);
455 ltdb->read_lock_count++;
456 return ldb_mdb_err_map(lmdb->error);
/*
 * Release a read lock on the database behind module.
 *
 * When no write transaction is in progress and this is the last outstanding
 * read lock, the read transaction is committed and cleared. The read lock
 * count is decremented on both visible paths.
 *
 * NOTE(review): the return statements (and whatever separates the two
 * decrements below) are not visible in this extract.
 */
459 static int lmdb_unlock_read(struct ldb_module *module)
461 void *data = ldb_module_get_private(module);
462 struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
464 if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
465 struct lmdb_private *lmdb = ltdb->lmdb_private;
466 mdb_txn_commit(lmdb->read_txn);
467 lmdb->read_txn = NULL;
468 ltdb->read_lock_count--;
471 ltdb->read_lock_count--;
/*
 * Start a write transaction.
 *
 * The request is refused with LDB_ERR_UNWILLING_TO_PERFORM on a read-only
 * database. A struct lmdb_trans is allocated under lmdb, and the LMDB
 * transaction is begun as a child of the transaction currently at the head
 * of the list (tx_parent). On success the new transaction is pushed onto
 * the list with trans_push().
 *
 * NOTE(review): on mdb_txn_begin() failure the visible lines return without
 * freeing ltx; it stays parented to lmdb — confirm whether a free is among
 * the lines missing from this extract.
 */
475 static int lmdb_transaction_start(struct ltdb_private *ltdb)
477 struct lmdb_private *lmdb = ltdb->lmdb_private;
478 struct lmdb_trans *ltx;
479 struct lmdb_trans *ltx_head;
482 /* Do not take out the transaction lock on a read-only DB */
483 if (ltdb->read_only) {
484 return LDB_ERR_UNWILLING_TO_PERFORM;
487 ltx = talloc_zero(lmdb, struct lmdb_trans);
489 return ldb_oom(lmdb->ldb);
492 ltx_head = lmdb_private_trans_head(lmdb);
494 tx_parent = lmdb_trans_get_tx(ltx_head);
/* mdb_txn_begin() writes the new transaction handle through its last argument. */
496 lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
497 if (lmdb->error != MDB_SUCCESS) {
498 return ldb_mdb_error(lmdb->ldb, lmdb->error);
501 trans_push(lmdb, ltx);
503 return ldb_mdb_err_map(lmdb->error);
/*
 * Abort the write transaction at the head of the transaction list and
 * remove it from the list.
 *
 * NOTE(review): the test guarding the LDB_ERR_OPERATIONS_ERROR return
 * (presumably "no transaction open") and the final return are not visible
 * in this extract.
 */
506 static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
508 struct lmdb_trans *ltx;
509 struct lmdb_private *lmdb = ltdb->lmdb_private;
511 ltx = lmdb_private_trans_head(lmdb);
513 return LDB_ERR_OPERATIONS_ERROR;
516 mdb_txn_abort(ltx->tx);
517 trans_finished(lmdb, ltx);
/*
 * Prepare-commit hook for the key-value operations table. LMDB needs no
 * separate prepare step, so the visible body does no work.
 */
521 static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
523 /* No need to prepare a commit */
/*
 * Commit the write transaction at the head of the transaction list, record
 * the LMDB result in lmdb->error, and remove the transaction from the list
 * whether or not the commit succeeded.
 *
 * NOTE(review): the test guarding the LDB_ERR_OPERATIONS_ERROR return
 * (presumably "no transaction open") and the final return are not visible
 * in this extract.
 */
527 static int lmdb_transaction_commit(struct ltdb_private *ltdb)
529 struct lmdb_trans *ltx;
530 struct lmdb_private *lmdb = ltdb->lmdb_private;
532 ltx = lmdb_private_trans_head(lmdb);
534 return LDB_ERR_OPERATIONS_ERROR;
537 lmdb->error = mdb_txn_commit(ltx->tx);
538 trans_finished(lmdb, ltx);
/* Return the last recorded LMDB error, mapped to an ldb error code. */
543 static int lmdb_error(struct ltdb_private *ltdb)
545 return ldb_mdb_err_map(ltdb->lmdb_private->error);
/* Return LMDB's message text for the last recorded LMDB error. */
548 static const char *lmdb_errorstr(struct ltdb_private *ltdb)
550 return mdb_strerror(ltdb->lmdb_private->error);
/*
 * Return a name string for this backend or database.
 * NOTE(review): the body is not visible in this extract, so which name is
 * returned cannot be confirmed from here.
 */
553 static const char * lmdb_name(struct ltdb_private *ltdb)
/*
 * Report whether the database has changed. Per the comment in the body, this
 * implementation always returns true.
 */
558 static bool lmdb_changed(struct ltdb_private *ltdb)
561 * lmdb does not provide a quick way to determine if the database
562 * has changed. This function always returns true.
564 * Note that tdb uses a sequence number that allows this function
565 * to be implemented efficiently.
/*
 * Operations table (struct kv_db_ops) that plugs the LMDB functions in this
 * file into the generic key-value backend; lmdb_connect() installs it as
 * ltdb->kv_ops.
 *
 * NOTE(review): some initialisers are missing from this extract.
 */
570 static struct kv_db_ops lmdb_key_value_ops = {
572 .delete = lmdb_delete,
573 .iterate = lmdb_traverse_fn,
574 .update_in_iterate = lmdb_update_in_iterate,
575 .fetch_and_parse = lmdb_parse_record,
576 .lock_read = lmdb_lock_read,
577 .unlock_read = lmdb_unlock_read,
578 .begin_write = lmdb_transaction_start,
579 .prepare_write = lmdb_transaction_prepare_commit,
580 .finish_write = lmdb_transaction_commit,
581 .abort_write = lmdb_transaction_cancel,
583 .errorstr = lmdb_errorstr,
585 .has_changed = lmdb_changed,
586 .transaction_active = lmdb_transaction_active,
/*
 * Extract the filesystem path from an ldb URL.
 *
 * If url contains a ':' it is checked against MDB_URL_PREFIX ("mdb://"),
 * and the path is the text following that prefix.
 *
 * NOTE(review): the handling of a prefix mismatch and of a url without ':'
 * is not visible in this extract; lmdb_connect() treats a failed lookup as
 * an invalid URL.
 */
589 static const char *lmdb_get_path(const char *url)
594 if (strchr(url, ':')) {
595 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
598 path = url + MDB_URL_PREFIX_SIZE;
/*
 * talloc destructor for struct lmdb_private (installed by lmdb_pvt_open()).
 *
 * In a forked child (pid differs from the one recorded at open) the
 * environment must not be closed and no transaction may be finished, since
 * that could become visible in the parent; the child works from the
 * environment's file descriptor instead. Otherwise any open read transaction
 * is aborted, followed by every transaction still on the list.
 *
 * NOTE(review): what is done with the fd in the child, and the return
 * values, are not visible in this extract.
 */
606 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
608 struct lmdb_trans *ltx = NULL;
610 /* Check if this is a forked child */
611 if (getpid() != lmdb->pid) {
614 * We cannot call mdb_env_close or commit any transactions,
615 * otherwise they might appear finished in the parent.
619 if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
623 /* Remove the pointer, so that no access should occur */
630 * Close the read transaction if it's open
632 if (lmdb->read_txn != NULL) {
633 mdb_txn_abort(lmdb->read_txn);
636 if (lmdb->env == NULL) {
641 * Abort any currently active transactions
/* Pop and abort list heads until the transaction list is empty. */
643 ltx = lmdb_private_trans_head(lmdb);
644 while (ltx != NULL) {
645 mdb_txn_abort(ltx->tx);
646 trans_finished(lmdb, ltx);
647 ltx = lmdb_private_trans_head(lmdb);
/*
 * List node wrapping one open LMDB environment. lmdb_open_env() identifies
 * an already-open database by the device and inode fields and reuses its
 * node.
 *
 * NOTE(review): the member declarations other than the list links are not
 * visible in this extract (env, device and inode are used elsewhere).
 */
654 struct mdb_env_wrap {
655 struct mdb_env_wrap *next, *prev;
/* File-scope list of the environments currently open in this process. */
662 static struct mdb_env_wrap *mdb_list;
/*
 * talloc destructor for an environment wrapper: closes the LMDB environment
 * and unlinks the wrapper from mdb_list.
 */
664 /* destroy the last connection to an mdb */
665 static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
667 mdb_env_close(w->env);
668 DLIST_REMOVE(mdb_list, w);
/*
 * Open the LMDB environment for path, or reuse one already open in this
 * process.
 *
 * If path exists and its device/inode match an entry on mdb_list, that
 * wrapper is shared via talloc_reference() instead of opening a second
 * MDB_env. Otherwise a new wrapper is allocated under mem_ctx and the
 * environment is created, given a 16 GiB map size and 100000 reader slots,
 * and opened with MDB_NOSUBDIR|MDB_NOTLS (plus MDB_NOSYNC when
 * LDB_FLG_NOSYNC is set) and mode 0644. The file is then stat()ed to record
 * its device/inode, the destructor is installed and the wrapper is added to
 * mdb_list.
 *
 * Errors from LMDB are returned as ldb error codes via ldb_mdb_err_map();
 * a failed stat() after opening returns LDB_ERR_OPERATIONS_ERROR.
 *
 * NOTE(review): some parameters, the failure tests and the cleanup on the
 * error paths are not visible in this extract.
 */
672 static int lmdb_open_env(TALLOC_CTX *mem_ctx,
674 struct ldb_context *ldb,
676 unsigned int flags) {
678 unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
680 * MDB_NOSUBDIR implies there is a separate file called path and a
681 * separate lockfile called path-lock
684 struct mdb_env_wrap *w;
687 if (stat(path, &st) == 0) {
688 for (w=mdb_list;w;w=w->next) {
689 if (st.st_dev == w->device && st.st_ino == w->inode) {
691 * We must have only one MDB_env per process
693 if (!talloc_reference(mem_ctx, w)) {
702 w = talloc(mem_ctx, struct mdb_env_wrap);
707 ret = mdb_env_create(env);
709 ldb_asprintf_errstring(
711 "Could not create MDB environment %s: %s\n",
714 return ldb_mdb_err_map(ret);
718 * Currently we set a 16Gb maximum database size
720 ret = mdb_env_set_mapsize(*env, 16LL * GIGABYTE);
722 ldb_asprintf_errstring(
724 "Could not open MDB environment %s: %s\n",
728 return ldb_mdb_err_map(ret);
/* NOTE(review): the result of mdb_env_set_maxreaders() is not checked. */
731 mdb_env_set_maxreaders(*env, 100000);
733 * As we ensure that there is only one MDB_env open per database per
734 * process. We can not use the MDB_RDONLY flag, as another ldb may be
735 * opened in read write mode
737 if (flags & LDB_FLG_NOSYNC) {
738 mdb_flags |= MDB_NOSYNC;
740 ret = mdb_env_open(*env, path, mdb_flags, 0644);
742 ldb_asprintf_errstring(ldb,
743 "Could not open DB %s: %s\n",
744 path, mdb_strerror(ret));
746 return ldb_mdb_err_map(ret);
/* Record the file's identity so later opens of the same file reuse w. */
749 if (stat(path, &st) != 0) {
750 ldb_asprintf_errstring(
752 "Could not stat %s:\n",
755 return LDB_ERR_OPERATIONS_ERROR;
758 w->device = st.st_dev;
759 w->inode = st.st_ino;
761 talloc_set_destructor(w, mdb_env_wrap_destructor);
763 DLIST_ADD(mdb_list, w);
/*
 * Initialise the LMDB private state for the database at path.
 *
 * With LDB_FLG_DONT_CREATE_DB, a path that cannot be stat()ed yields
 * LDB_ERR_UNAVAILABLE. The environment is opened through lmdb_open_env(),
 * the destructor is installed, and the opening process id is recorded so
 * the destructor can detect a forked child. The function fails with
 * ldb_operr() if LMDB's maximum key size is smaller than
 * LDB_MDB_MAX_KEY_LENGTH.
 *
 * NOTE(review): a failure from lmdb_open_env() is reported as
 * LDB_ERR_OPERATIONS_ERROR rather than the returned code; the guarding test
 * and the final return are not visible in this extract.
 */
769 static int lmdb_pvt_open(struct lmdb_private *lmdb,
770 struct ldb_context *ldb,
775 int lmdb_max_key_length;
777 if (flags & LDB_FLG_DONT_CREATE_DB) {
779 if (stat(path, &st) != 0) {
780 return LDB_ERR_UNAVAILABLE;
784 ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
786 ldb_asprintf_errstring(
788 "Could not create MDB environment %s: %s\n",
791 return LDB_ERR_OPERATIONS_ERROR;
794 /* Close when lmdb is released */
795 talloc_set_destructor(lmdb, lmdb_pvt_destructor);
797 /* Store the original pid during the LMDB open */
798 lmdb->pid = getpid();
800 lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env);
802 /* This will never happen, but if it does make sure to freak out */
803 if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
804 return ldb_operr(ldb);
/*
 * Connect an ldb context to an LMDB-backed database (backend entry point).
 *
 * Steps visible here: require a private event context, derive the path from
 * url (an invalid URL is logged and returns LDB_ERR_OPERATIONS_ERROR),
 * allocate the ltdb and lmdb private structures, install the LMDB
 * operations table, open the database with lmdb_pvt_open(), mark the handle
 * read-only when LDB_FLG_RDONLY is set, fix the maximum key length at
 * LDB_MDB_MAX_KEY_LENGTH, and finish through init_store(), which fills in
 * *_module.
 *
 * NOTE(review): some parameters, the allocation-failure tests and the
 * cleanup after a failed lmdb_pvt_open() are not visible in this extract.
 */
811 int lmdb_connect(struct ldb_context *ldb,
814 const char *options[],
815 struct ldb_module **_module)
817 const char *path = NULL;
818 struct lmdb_private *lmdb = NULL;
819 struct ltdb_private *ltdb = NULL;
823 * We hold locks, so we must use a private event context
824 * on each returned handle
826 ldb_set_require_private_event_context(ldb);
828 path = lmdb_get_path(url);
830 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
831 return LDB_ERR_OPERATIONS_ERROR;
834 ltdb = talloc_zero(ldb, struct ltdb_private);
837 return LDB_ERR_OPERATIONS_ERROR;
/* lmdb is a talloc child of ltdb, so freeing ltdb releases it too. */
840 lmdb = talloc_zero(ltdb, struct lmdb_private);
846 ltdb->kv_ops = &lmdb_key_value_ops;
848 ret = lmdb_pvt_open(lmdb, ldb, path, flags);
849 if (ret != LDB_SUCCESS) {
854 ltdb->lmdb_private = lmdb;
855 if (flags & LDB_FLG_RDONLY) {
856 ltdb->read_only = true;
860 * This maximum length becomes encoded in the index values so
861 * must never change even if LMDB starts to allow longer keys.
862 * The override option is max_key_len_for_self_test, and is
863 * used for testing only.
865 ltdb->max_key_length = LDB_MDB_MAX_KEY_LENGTH;
867 return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);