2 ldb database library using mdb back end
4 Copyright (C) Jakub Hrozek 2014
5 Copyright (C) Catalyst.Net Ltd 2017
7 ** NOTE! The following LGPL license applies to the ldb
8 ** library. This does NOT imply that all of Samba is released under the LGPL
11 This library is free software; you can redistribute it and/or
12 modify it under the terms of the GNU Lesser General Public
13 License as published by the Free Software Foundation; either
14 version 3 of the License, or (at your option) any later version.
16 This library is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19 Lesser General Public License for more details.
21 You should have received a copy of the GNU Lesser General Public
22 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "../ldb_tdb/ldb_tdb.h"
27 #include "include/dlinklist.h"
/* URL scheme prefix for this backend and its length without the trailing NUL. */
29 #define MDB_URL_PREFIX "mdb://"
30 #define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1)
/* Byte-count helpers; GIGABYTE is scaled by 16LL when the LMDB map size is set. */
32 #define MEGABYTE (1024*1024)
33 #define GIGABYTE (1024*1024*1024)
/*
 * Map an LMDB error code (lmdb_err) onto an LDB_ERR_* result code.
 *
 * Only part of the switch is visible in this listing: most case labels,
 * and any default handling, are not shown, so the exact input for each
 * return below cannot be confirmed from here.
 */
35 int ldb_mdb_err_map(int lmdb_err)
41 return LDB_ERR_OPERATIONS_ERROR;
42 case MDB_INCOMPATIBLE:
45 return LDB_ERR_UNAVAILABLE;
53 return LDB_ERR_PROTOCOL_ERROR;
56 case MDB_READERS_FULL:
62 return LDB_ERR_ENTRY_ALREADY_EXISTS;
65 return LDB_ERR_NO_SUCH_OBJECT;
67 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
/*
 * ldb_mdb_error() forwards to lmdb_error_at(), adding the call site's
 * __FILE__ and __LINE__.
 */
74 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
/*
 * Translate an LMDB error code with ldb_mdb_err_map(), fetch its text with
 * mdb_strerror() and record an error string on the ldb context.
 * NOTE(review): the format string and the return statement are not visible
 * here; presumably the mapped LDB code (ldb_err) is returned - confirm.
 */
75 static int lmdb_error_at(struct ldb_context *ldb,
80 int ldb_err = ldb_mdb_err_map(ecode);
81 char *reason = mdb_strerror(ecode);
82 ldb_asprintf_errstring(ldb,
/* Report whether any write transaction is open, i.e. the transaction list is non-empty. */
92 static bool lmdb_transaction_active(struct ltdb_private *ltdb)
94 return ltdb->lmdb_private->txlist != NULL;
/*
 * Return the MDB_txn belonging to a transaction record.
 * NOTE(review): the body is not visible in this listing. Callers pass the
 * result of lmdb_private_trans_head() straight in, so presumably a NULL
 * ltx is tolerated - confirm.
 */
97 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
/*
 * Record a newly started transaction: re-parent ltx onto the current list
 * entry with talloc_steal() and link it into lmdb->txlist.
 * NOTE(review): any condition guarding the talloc_steal() is not visible here.
 */
106 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
109 talloc_steal(lmdb->txlist, ltx);
112 DLIST_ADD(lmdb->txlist, ltx);
/*
 * Unlink a completed (committed or aborted) transaction from lmdb->txlist.
 * NOTE(review): whether ltx is also freed is not visible in this listing.
 */
115 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
117 DLIST_REMOVE(lmdb->txlist, ltx);
/*
 * Return the transaction record at the head of the list for this handle.
 * NOTE(review): the body is not visible; presumably it returns lmdb->txlist
 * (possibly NULL when no transaction is open) - confirm.
 */
122 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
124 struct lmdb_trans *ltx;
/*
 * Pick the transaction a read should run in.
 *
 * Order visible here: an already open read transaction (lmdb->read_txn) is
 * returned first; otherwise the transaction at the head of the write list is
 * looked up; otherwise a new MDB_RDONLY transaction is begun and cached in
 * lmdb->read_txn. A failure to begin records an error string on the ldb
 * context.
 * NOTE(review): the tests between these steps and the failure return value
 * are not visible in this listing.
 */
130 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
133 if (lmdb->read_txn != NULL) {
134 return lmdb->read_txn;
137 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
140 ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
143 ldb_asprintf_errstring(lmdb->ldb,
144 "%s failed: %s\n", __FUNCTION__,
/* Cache the new read transaction so later reads reuse it until it is committed. */
147 lmdb->read_txn = txn;
/*
 * Store a key/data pair using the write transaction at the head of the
 * transaction list.
 *
 * flags: TDB_INSERT is translated to MDB_NOOVERWRITE; TDB_MODIFY first
 * checks with mdb_get() that the key already exists. The LMDB status of
 * each step is kept in lmdb->error and the function returns an LDB error
 * code derived from it.
 */
152 static int lmdb_store(struct ltdb_private *ltdb,
154 struct ldb_val data, int flags)
156 struct lmdb_private *lmdb = ltdb->lmdb_private;
/* A store needs an open write transaction; otherwise it is reported as MDB_PANIC. */
163 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
165 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
166 lmdb->error = MDB_PANIC;
167 return ldb_mdb_error(lmdb->ldb, lmdb->error);
/* The database handle is opened with a NULL name and no flags. */
170 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
171 if (lmdb->error != MDB_SUCCESS) {
172 return ldb_mdb_error(lmdb->ldb, lmdb->error);
/* Wrap the caller's buffers in MDB_val descriptors; no copy is made here. */
175 mdb_key.mv_size = key.length;
176 mdb_key.mv_data = key.data;
178 mdb_data.mv_size = data.length;
179 mdb_data.mv_data = data.data;
181 if (flags == TDB_INSERT) {
182 mdb_flags = MDB_NOOVERWRITE;
183 } else if ((flags == TDB_MODIFY)) {
185 * Modifying a record, ensure that it exists.
186 * This mimics the TDB semantics
189 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
190 if (lmdb->error != MDB_SUCCESS) {
/* On a failed existence check, also end any implicit read transaction that no read lock is holding. */
191 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
192 mdb_txn_commit(lmdb->read_txn);
193 lmdb->read_txn = NULL;
195 return ldb_mdb_error(lmdb->ldb, lmdb->error);
202 lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
203 if (lmdb->error != MDB_SUCCESS) {
204 return ldb_mdb_error(lmdb->ldb, lmdb->error);
207 return ldb_mdb_err_map(lmdb->error);
/*
 * Delete the record stored under key, using the write transaction at the
 * head of the transaction list.
 *
 * The LMDB status of each step is kept in lmdb->error; the return value is
 * an LDB error code derived from it.
 */
211 static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
213 struct lmdb_private *lmdb = ltdb->lmdb_private;
/* A delete needs an open write transaction; otherwise it is reported as MDB_PANIC. */
218 txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
220 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
221 lmdb->error = MDB_PANIC;
222 return ldb_mdb_error(lmdb->ldb, lmdb->error);
225 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
226 if (lmdb->error != MDB_SUCCESS) {
227 return ldb_mdb_error(lmdb->ldb, lmdb->error);
230 mdb_key.mv_size = key.length;
231 mdb_key.mv_data = key.data;
/* No data argument is passed to mdb_del (NULL). */
233 lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
234 if (lmdb->error != MDB_SUCCESS) {
235 return ldb_mdb_error(lmdb->ldb, lmdb->error);
237 return ldb_mdb_err_map(lmdb->error);
/*
 * Walk every record in the database with an LMDB cursor and call fn for
 * each key/data pair, passing ctx through.
 *
 * Runs in whichever transaction get_current_txn() supplies. Reaching the end
 * of the data (MDB_NOTFOUND) counts as success. Returns an LDB error code
 * derived from lmdb->error.
 * NOTE(review): what happens with the value returned by fn is not visible
 * in this listing.
 */
240 static int lmdb_traverse_fn(struct ltdb_private *ltdb,
241 ldb_kv_traverse_fn fn,
244 struct lmdb_private *lmdb = ltdb->lmdb_private;
249 MDB_cursor *cursor = NULL;
252 txn = get_current_txn(lmdb);
254 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
255 lmdb->error = MDB_PANIC;
256 return ldb_mdb_error(lmdb->ldb, lmdb->error);
259 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
260 if (lmdb->error != MDB_SUCCESS) {
261 return ldb_mdb_error(lmdb->ldb, lmdb->error);
264 lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
265 if (lmdb->error != MDB_SUCCESS) {
/* Step through the records with MDB_NEXT until the cursor reports something other than success. */
269 while ((lmdb->error = mdb_cursor_get(
271 &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
/* The ldb_val wrappers point directly at the cursor's key and data buffers. */
273 struct ldb_val key = {
274 .length = mdb_key.mv_size,
275 .data = mdb_key.mv_data,
277 struct ldb_val data = {
278 .length = mdb_data.mv_size,
279 .data = mdb_data.mv_data,
282 ret = fn(ltdb, key, data, ctx);
/* Running off the end of the data is the normal way for the loop to finish. */
287 if (lmdb->error == MDB_NOTFOUND) {
288 lmdb->error = MDB_SUCCESS;
291 if (cursor != NULL) {
292 mdb_cursor_close(cursor);
/* End an implicit read transaction when no read lock is holding it open. */
295 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
296 mdb_txn_commit(lmdb->read_txn);
297 lmdb->read_txn = NULL;
300 if (lmdb->error != MDB_SUCCESS) {
301 return ldb_mdb_error(lmdb->ldb, lmdb->error);
303 return ldb_mdb_err_map(lmdb->error);
/*
 * Re-key a record while iterating: delete the entry stored under key and
 * store the same value again under key2.
 *
 * The value is duplicated with talloc first, because the data handed in
 * lives in LMDB-owned memory that the delete alters. Returns an LDB error
 * code (ldb_oom() if the copy cannot be allocated).
 *
 * Only part of this function is visible in this listing; the lines shown
 * are kept in their original order.
 */
306 static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
312 struct lmdb_private *lmdb = ltdb->lmdb_private;
314 int ret = LDB_SUCCESS;
317 * Need to take a copy of the data as the delete operation alters the
318 * data, as it is in private lmdb memory.
320 copy.length = data.length;
321 copy.data = talloc_memdup(ltdb, data.data, data.length);
322 if (copy.data == NULL) {
323 lmdb->error = MDB_PANIC;
324 return ldb_oom(lmdb->ldb);
/*
 * lmdb_delete() and lmdb_store() return LDB codes and leave the LMDB status
 * in lmdb->error. Keep the two apart: test the LDB result in ret and let
 * mdb_strerror()/ldb_mdb_error() below see the real LMDB code.
 */
327 ret = lmdb_delete(ltdb, key);
328 if (ret != LDB_SUCCESS) {
332 "Failed to delete %*.*s "
333 "for rekey as %*.*s: %s",
334 (int)key.length, (int)key.length,
335 (const char *)key.data,
336 (int)key2.length, (int)key2.length,
/* The second key printed is key2, so pair key2.length with key2.data. */
337 (const char *)key2.data,
338 mdb_strerror(lmdb->error));
339 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
342 ret = lmdb_store(ltdb, key2, copy, 0);
343 if (ret != LDB_SUCCESS) {
347 "Failed to rekey %*.*s as %*.*s: %s",
348 (int)key.length, (int)key.length,
349 (const char *)key.data,
350 (int)key2.length, (int)key2.length,
351 (const char *)key2.data,
352 mdb_strerror(lmdb->error));
353 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
/* Release the temporary copy whether or not the re-key succeeded. */
358 if (copy.data != NULL) {
359 TALLOC_FREE(copy.data);
364 * Explicitly invalidate the data, as the delete has done this
371 /* Handles only a single record */
372 static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
373 int (*parser)(struct ldb_val key, struct ldb_val data,
377 struct lmdb_private *lmdb = ltdb->lmdb_private;
384 txn = get_current_txn(lmdb);
386 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
387 lmdb->error = MDB_PANIC;
388 return ldb_mdb_error(lmdb->ldb, lmdb->error);
391 lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
392 if (lmdb->error != MDB_SUCCESS) {
393 return ldb_mdb_error(lmdb->ldb, lmdb->error);
396 mdb_key.mv_size = key.length;
397 mdb_key.mv_data = key.data;
399 lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
400 if (lmdb->error != MDB_SUCCESS) {
401 /* TODO closing a handle should not even be necessary */
402 mdb_dbi_close(lmdb->env, dbi);
403 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
404 mdb_txn_commit(lmdb->read_txn);
405 lmdb->read_txn = NULL;
407 if (lmdb->error == MDB_NOTFOUND) {
408 return LDB_ERR_NO_SUCH_OBJECT;
410 return ldb_mdb_error(lmdb->ldb, lmdb->error);
412 data.data = mdb_data.mv_data;
413 data.length = mdb_data.mv_size;
415 /* TODO closing a handle should not even be necessary */
416 mdb_dbi_close(lmdb->env, dbi);
418 /* We created a read transaction, commit it */
419 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
420 mdb_txn_commit(lmdb->read_txn);
421 lmdb->read_txn = NULL;
423 return parser(key, data, ctx);
/*
 * Take a read lock on the database.
 *
 * When no write transaction is in progress and no read lock is held yet, a
 * transaction is begun on the environment (its remaining arguments are not
 * visible in this listing). On success the read lock count is incremented.
 * Returns an LDB error code derived from lmdb->error.
 */
427 static int lmdb_lock_read(struct ldb_module *module)
429 void *data = ldb_module_get_private(module);
430 struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
431 struct lmdb_private *lmdb = ltdb->lmdb_private;
433 lmdb->error = MDB_SUCCESS;
434 if (ltdb->in_transaction == 0 &&
435 ltdb->read_lock_count == 0) {
436 lmdb->error = mdb_txn_begin(lmdb->env,
441 if (lmdb->error != MDB_SUCCESS) {
442 return ldb_mdb_error(lmdb->ldb, lmdb->error);
/* Nested read locks only bump the counter. */
445 ltdb->read_lock_count++;
446 return ldb_mdb_err_map(lmdb->error);
/*
 * Release a read lock.
 *
 * When this is the last read lock and no write transaction is in progress,
 * the read transaction is committed and cleared before the count drops.
 * NOTE(review): two decrements of read_lock_count are visible; presumably a
 * return (not shown in this listing) separates the two paths - confirm.
 */
449 static int lmdb_unlock_read(struct ldb_module *module)
451 void *data = ldb_module_get_private(module);
452 struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
454 if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
455 struct lmdb_private *lmdb = ltdb->lmdb_private;
456 mdb_txn_commit(lmdb->read_txn);
457 lmdb->read_txn = NULL;
458 ltdb->read_lock_count--;
461 ltdb->read_lock_count--;
465 static int lmdb_transaction_start(struct ltdb_private *ltdb)
467 struct lmdb_private *lmdb = ltdb->lmdb_private;
468 struct lmdb_trans *ltx;
469 struct lmdb_trans *ltx_head;
472 ltx = talloc_zero(lmdb, struct lmdb_trans);
474 return ldb_oom(lmdb->ldb);
477 ltx_head = lmdb_private_trans_head(lmdb);
479 tx_parent = lmdb_trans_get_tx(ltx_head);
481 lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, <x->tx);
482 if (lmdb->error != MDB_SUCCESS) {
483 return ldb_mdb_error(lmdb->ldb, lmdb->error);
486 trans_push(lmdb, ltx);
488 return ldb_mdb_err_map(lmdb->error);
/*
 * Abort the write transaction at the head of the list and unlink it.
 * NOTE(review): the LDB_ERR_OPERATIONS_ERROR return presumably covers the
 * case where no transaction is open; its guard is not visible here.
 */
491 static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
493 struct lmdb_trans *ltx;
494 struct lmdb_private *lmdb = ltdb->lmdb_private;
496 ltx = lmdb_private_trans_head(lmdb);
498 return LDB_ERR_OPERATIONS_ERROR;
501 mdb_txn_abort(ltx->tx);
502 trans_finished(lmdb, ltx);
/* Prepare-commit hook for the key/value interface; this backend has nothing to do at this stage. */
506 static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
508 /* No need to prepare a commit */
/*
 * Commit the write transaction at the head of the list and unlink it.
 * The LMDB result of the commit is stored in lmdb->error.
 * NOTE(review): the LDB_ERR_OPERATIONS_ERROR return presumably covers the
 * case where no transaction is open; its guard and the final return are
 * not visible in this listing.
 */
512 static int lmdb_transaction_commit(struct ltdb_private *ltdb)
514 struct lmdb_trans *ltx;
515 struct lmdb_private *lmdb = ltdb->lmdb_private;
517 ltx = lmdb_private_trans_head(lmdb);
519 return LDB_ERR_OPERATIONS_ERROR;
522 lmdb->error = mdb_txn_commit(ltx->tx);
523 trans_finished(lmdb, ltx);
/* Return the most recently recorded LMDB error, translated to an LDB error code. */
528 static int lmdb_error(struct ltdb_private *ltdb)
530 return ldb_mdb_err_map(ltdb->lmdb_private->error);
/* Return LMDB's text for the most recently recorded LMDB error code. */
533 static const char *lmdb_errorstr(struct ltdb_private *ltdb)
535 return mdb_strerror(ltdb->lmdb_private->error);
/*
 * Return a name string for this database handle.
 * NOTE(review): the body is not visible in this listing.
 */
538 static const char * lmdb_name(struct ltdb_private *ltdb)
/* Change-detection hook for the key/value interface; per the note in its body it always reports a change. */
543 static bool lmdb_changed(struct ltdb_private *ltdb)
546 * lmdb does not provide a quick way to determine if the database
547 * has changed. This function always returns true.
549 * Note that tdb uses a sequence number that allows this function
550 * to be implemented efficiently.
/*
 * Operations table that plugs the LMDB functions in this file into the
 * generic key/value layer. Some members are not visible in this listing.
 */
555 static struct kv_db_ops lmdb_key_value_ops = {
557 .delete = lmdb_delete,
558 .iterate = lmdb_traverse_fn,
559 .update_in_iterate = lmdb_update_in_iterate,
560 .fetch_and_parse = lmdb_parse_record,
561 .lock_read = lmdb_lock_read,
562 .unlock_read = lmdb_unlock_read,
563 .begin_write = lmdb_transaction_start,
564 .prepare_write = lmdb_transaction_prepare_commit,
565 .finish_write = lmdb_transaction_commit,
566 .abort_write = lmdb_transaction_cancel,
568 .errorstr = lmdb_errorstr,
570 .has_changed = lmdb_changed,
571 .transaction_active = lmdb_transaction_active,
/*
 * Extract the filesystem path from a database URL.
 *
 * A URL containing ':' is checked against the "mdb://" prefix, and the path
 * is taken as whatever follows that prefix.
 * NOTE(review): the handling of a prefix mismatch and of URLs without ':'
 * is not visible in this listing.
 */
574 static const char *lmdb_get_path(const char *url)
579 if (strchr(url, ':')) {
580 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
583 path = url + MDB_URL_PREFIX_SIZE;
/*
 * talloc destructor for the lmdb_private structure.
 *
 * Aborts an open read transaction, then aborts and unlinks every write
 * transaction still on the list, and finally closes the LMDB environment.
 * NOTE(review): the action taken when lmdb->env is NULL is not visible in
 * this listing.
 */
591 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
593 struct lmdb_trans *ltx = NULL;
596 * Close the read transaction if it's open
598 if (lmdb->read_txn != NULL) {
599 mdb_txn_abort(lmdb->read_txn);
602 if (lmdb->env == NULL) {
607 * Abort any currently active transactions
/* trans_finished() unlinks the head each time, so re-reading the head walks the whole list. */
609 ltx = lmdb_private_trans_head(lmdb);
610 while (ltx != NULL) {
611 mdb_txn_abort(ltx->tx);
612 trans_finished(lmdb, ltx);
613 ltx = lmdb_private_trans_head(lmdb);
616 mdb_env_close(lmdb->env);
/*
 * Create and open the LMDB environment for path, storing it in lmdb->env.
 *
 * With LDB_FLG_DONT_CREATE_DB the path must already exist, otherwise
 * LDB_ERR_UNAVAILABLE is returned. LDB_FLG_RDONLY opens the environment
 * read-only. Failures record an error string and return an LDB error code.
 */
622 static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
623 struct ldb_context *ldb,
626 struct lmdb_private *lmdb)
629 unsigned int mdb_flags;
631 if (flags & LDB_FLG_DONT_CREATE_DB) {
633 if (stat(path, &st) != 0) {
634 return LDB_ERR_UNAVAILABLE;
638 ret = mdb_env_create(&lmdb->env);
640 ldb_asprintf_errstring(
642 "Could not create MDB environment %s: %s\n",
645 return LDB_ERR_OPERATIONS_ERROR;
648 /* Close when lmdb is released */
649 talloc_set_destructor(lmdb, lmdb_pvt_destructor);
/* The map size is fixed at 16 * GIGABYTE bytes. */
/* NOTE(review): the message on this failure path says "open" although the failing call sets the map size. */
651 ret = mdb_env_set_mapsize(lmdb->env, 16LL * GIGABYTE);
653 ldb_asprintf_errstring(
655 "Could not open MDB environment %s: %s\n",
658 return ldb_mdb_err_map(ret);
/* NOTE(review): the result of mdb_env_set_maxreaders() is not checked. */
661 mdb_env_set_maxreaders(lmdb->env, 100000);
662 /* MDB_NOSUBDIR implies there is a separate file called path and a
663 * separate lockfile called path-lock
665 mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
666 if (flags & LDB_FLG_RDONLY) {
667 mdb_flags |= MDB_RDONLY;
669 ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
671 ldb_asprintf_errstring(ldb,
672 "Could not open DB %s: %s\n",
673 path, mdb_strerror(ret));
675 return ldb_mdb_err_map(ret);
/*
 * Backend entry point: connect an ldb context to an LMDB database.
 *
 * Resolves the path from url, allocates the ltdb_private and lmdb_private
 * state on ldb, opens the environment with lmdb_pvt_open() and hands over
 * to init_store(), which receives options and _module. An invalid URL or a
 * failed allocation returns LDB_ERR_OPERATIONS_ERROR.
 * NOTE(review): the clean-up on the failure paths is not visible in this
 * listing.
 */
682 int lmdb_connect(struct ldb_context *ldb,
685 const char *options[],
686 struct ldb_module **_module)
688 const char *path = NULL;
689 struct lmdb_private *lmdb = NULL;
690 struct ltdb_private *ltdb = NULL;
694 * We hold locks, so we must use a private event context
695 * on each returned handle
697 ldb_set_require_private_event_context(ldb);
699 path = lmdb_get_path(url);
701 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
702 return LDB_ERR_OPERATIONS_ERROR;
705 ltdb = talloc_zero(ldb, struct ltdb_private);
708 return LDB_ERR_OPERATIONS_ERROR;
711 lmdb = talloc_zero(ldb, struct lmdb_private);
715 return LDB_ERR_OPERATIONS_ERROR;
/* Route the generic key/value layer to the LMDB implementations in this file. */
718 ltdb->kv_ops = &lmdb_key_value_ops;
720 ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
721 if (ret != LDB_SUCCESS) {
725 ltdb->lmdb_private = lmdb;
726 if (flags & LDB_FLG_RDONLY) {
727 ltdb->read_only = true;
729 return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);