f5b5de73272297488c604d6e0733f50e9a5f7861
[metze/samba/wip.git] / lib / ldb / ldb_mdb / ldb_mdb.c
1 /*
2    ldb database library using mdb back end
3
4    Copyright (C) Jakub Hrozek 2014
5    Copyright (C) Catalyst.Net Ltd 2017
6
7      ** NOTE! The following LGPL license applies to the ldb
8      ** library. This does NOT imply that all of Samba is released
9      ** under the LGPL
10
11    This library is free software; you can redistribute it and/or
12    modify it under the terms of the GNU Lesser General Public
13    License as published by the Free Software Foundation; either
14    version 3 of the License, or (at your option) any later version.
15
16    This library is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19    Lesser General Public License for more details.
20
21    You should have received a copy of the GNU Lesser General Public
22    License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 */
24
25 #include "ldb_mdb.h"
26 #include "../ldb_tdb/ldb_tdb.h"
27 #include "include/dlinklist.h"
28
29 #define MDB_URL_PREFIX          "mdb://"
30 #define MDB_URL_PREFIX_SIZE     (sizeof(MDB_URL_PREFIX)-1)
31
32 #define MEGABYTE (1024*1024)
33 #define GIGABYTE (1024*1024*1024)
34
35 int ldb_mdb_err_map(int lmdb_err)
36 {
37         switch (lmdb_err) {
38         case MDB_SUCCESS:
39                 return LDB_SUCCESS;
40         case EIO:
41                 return LDB_ERR_OPERATIONS_ERROR;
42         case MDB_INCOMPATIBLE:
43         case MDB_CORRUPTED:
44         case MDB_INVALID:
45                 return LDB_ERR_UNAVAILABLE;
46         case MDB_BAD_TXN:
47         case MDB_BAD_VALSIZE:
48 #ifdef MDB_BAD_DBI
49         case MDB_BAD_DBI:
50 #endif
51         case MDB_PANIC:
52         case EINVAL:
53                 return LDB_ERR_PROTOCOL_ERROR;
54         case MDB_MAP_FULL:
55         case MDB_DBS_FULL:
56         case MDB_READERS_FULL:
57         case MDB_TLS_FULL:
58         case MDB_TXN_FULL:
59         case EAGAIN:
60                 return LDB_ERR_BUSY;
61         case MDB_KEYEXIST:
62                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
63         case MDB_NOTFOUND:
64         case ENOENT:
65                 return LDB_ERR_NO_SUCH_OBJECT;
66         case EACCES:
67                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
68         default:
69                 break;
70         }
71         return LDB_ERR_OTHER;
72 }
73
74 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
75 static int lmdb_error_at(struct ldb_context *ldb,
76                          int ecode,
77                          const char *file,
78                          int line)
79 {
80         int ldb_err = ldb_mdb_err_map(ecode);
81         char *reason = mdb_strerror(ecode);
82         ldb_asprintf_errstring(ldb,
83                                "(%d) - %s at %s:%d",
84                                ecode,
85                                reason,
86                                file,
87                                line);
88         return ldb_err;
89 }
90
91
92 static bool lmdb_transaction_active(struct ltdb_private *ltdb)
93 {
94         return ltdb->lmdb_private->txlist != NULL;
95 }
96
97 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
98 {
99         if (ltx == NULL) {
100                 return NULL;
101         }
102
103         return ltx->tx;
104 }
105
106 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
107 {
108         if (lmdb->txlist) {
109                 talloc_steal(lmdb->txlist, ltx);
110         }
111
112         DLIST_ADD(lmdb->txlist, ltx);
113 }
114
115 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
116 {
117         DLIST_REMOVE(lmdb->txlist, ltx);
118         talloc_free(ltx);
119 }
120
121
122 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
123 {
124         struct lmdb_trans *ltx;
125
126         ltx = lmdb->txlist;
127         return ltx;
128 }
129
130 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
131 {
132         MDB_txn *txn;
133         if (lmdb->read_txn != NULL) {
134                 return lmdb->read_txn;
135         }
136
137         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
138         if (txn == NULL) {
139                 int ret;
140                 ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
141                 if (ret != 0) {
142                         lmdb->error = ret;
143                         ldb_asprintf_errstring(lmdb->ldb,
144                                                "%s failed: %s\n", __FUNCTION__,
145                                                mdb_strerror(ret));
146                 }
147                 lmdb->read_txn = txn;
148         }
149         return txn;
150 }
151
152 static int lmdb_store(struct ltdb_private *ltdb,
153                       struct ldb_val key,
154                       struct ldb_val data, int flags)
155 {
156         struct lmdb_private *lmdb = ltdb->lmdb_private;
157         MDB_val mdb_key;
158         MDB_val mdb_data;
159         int mdb_flags;
160         MDB_txn *txn = NULL;
161         MDB_dbi dbi = 0;
162
163         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
164         if (txn == NULL) {
165                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
166                 lmdb->error = MDB_PANIC;
167                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
168         }
169
170         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
171         if (lmdb->error != MDB_SUCCESS) {
172                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
173         }
174
175         mdb_key.mv_size = key.length;
176         mdb_key.mv_data = key.data;
177
178         mdb_data.mv_size = data.length;
179         mdb_data.mv_data = data.data;
180
181         if (flags == TDB_INSERT) {
182                 mdb_flags = MDB_NOOVERWRITE;
183         } else if ((flags == TDB_MODIFY)) {
184                 /*
185                  * Modifying a record, ensure that it exists.
186                  * This mimics the TDB semantics
187                  */
188                 MDB_val value;
189                 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
190                 if (lmdb->error != MDB_SUCCESS) {
191                         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
192                                 mdb_txn_commit(lmdb->read_txn);
193                                 lmdb->read_txn = NULL;
194                         }
195                         return ldb_mdb_error(lmdb->ldb, lmdb->error);
196                 }
197                 mdb_flags = 0;
198         } else {
199                 mdb_flags = 0;
200         }
201
202         lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
203         if (lmdb->error != MDB_SUCCESS) {
204                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
205         }
206
207         return ldb_mdb_err_map(lmdb->error);
208 }
209
210
211 static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
212 {
213         struct lmdb_private *lmdb = ltdb->lmdb_private;
214         MDB_val mdb_key;
215         MDB_txn *txn = NULL;
216         MDB_dbi dbi = 0;
217
218         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
219         if (txn == NULL) {
220                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
221                 lmdb->error = MDB_PANIC;
222                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
223         }
224
225         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
226         if (lmdb->error != MDB_SUCCESS) {
227                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
228         }
229
230         mdb_key.mv_size = key.length;
231         mdb_key.mv_data = key.data;
232
233         lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
234         if (lmdb->error != MDB_SUCCESS) {
235                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
236         }
237         return ldb_mdb_err_map(lmdb->error);
238 }
239
240 static int lmdb_traverse_fn(struct ltdb_private *ltdb,
241                             ldb_kv_traverse_fn fn,
242                             void *ctx)
243 {
244         struct lmdb_private *lmdb = ltdb->lmdb_private;
245         MDB_val mdb_key;
246         MDB_val mdb_data;
247         MDB_txn *txn = NULL;
248         MDB_dbi dbi = 0;
249         MDB_cursor *cursor = NULL;
250         int ret;
251
252         txn = get_current_txn(lmdb);
253         if (txn == NULL) {
254                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
255                 lmdb->error = MDB_PANIC;
256                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
257         }
258
259         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
260         if (lmdb->error != MDB_SUCCESS) {
261                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
262         }
263
264         lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
265         if (lmdb->error != MDB_SUCCESS) {
266                 goto done;
267         }
268
269         while ((lmdb->error = mdb_cursor_get(
270                         cursor, &mdb_key,
271                         &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
272
273                 struct ldb_val key = {
274                         .length = mdb_key.mv_size,
275                         .data = mdb_key.mv_data,
276                 };
277                 struct ldb_val data = {
278                         .length = mdb_data.mv_size,
279                         .data = mdb_data.mv_data,
280                 };
281
282                 ret = fn(ltdb, key, data, ctx);
283                 if (ret != 0) {
284                         goto done;
285                 }
286         }
287         if (lmdb->error == MDB_NOTFOUND) {
288                 lmdb->error = MDB_SUCCESS;
289         }
290 done:
291         if (cursor != NULL) {
292                 mdb_cursor_close(cursor);
293         }
294
295         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
296                 mdb_txn_commit(lmdb->read_txn);
297                 lmdb->read_txn = NULL;
298         }
299
300         if (lmdb->error != MDB_SUCCESS) {
301                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
302         }
303         return ldb_mdb_err_map(lmdb->error);
304 }
305
306 static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
307                                   struct ldb_val key,
308                                   struct ldb_val key2,
309                                   struct ldb_val data,
310                                   void *state)
311 {
312         struct lmdb_private *lmdb = ltdb->lmdb_private;
313         struct ldb_val copy;
314         int ret = LDB_SUCCESS;
315
316         /*
317          * Need to take a copy of the data as the delete operation alters the
318          * data, as it is in private lmdb memory.
319          */
320         copy.length = data.length;
321         copy.data = talloc_memdup(ltdb, data.data, data.length);
322         if (copy.data == NULL) {
323                 lmdb->error = MDB_PANIC;
324                 return ldb_oom(lmdb->ldb);
325         }
326
327         lmdb->error = lmdb_delete(ltdb, key);
328         if (lmdb->error != MDB_SUCCESS) {
329                 ldb_debug(
330                         lmdb->ldb,
331                         LDB_DEBUG_ERROR,
332                         "Failed to delete %*.*s "
333                         "for rekey as %*.*s: %s",
334                         (int)key.length, (int)key.length,
335                         (const char *)key.data,
336                         (int)key2.length, (int)key2.length,
337                         (const char *)key.data,
338                         mdb_strerror(lmdb->error));
339                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
340                 goto done;
341         }
342         lmdb->error = lmdb_store(ltdb, key2, copy, 0);
343         if (lmdb->error != MDB_SUCCESS) {
344                 ldb_debug(
345                         lmdb->ldb,
346                         LDB_DEBUG_ERROR,
347                         "Failed to rekey %*.*s as %*.*s: %s",
348                         (int)key.length, (int)key.length,
349                         (const char *)key.data,
350                         (int)key2.length, (int)key2.length,
351                         (const char *)key.data,
352                         mdb_strerror(lmdb->error));
353                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
354                 goto done;
355         }
356
357 done:
358         if (copy.data != NULL) {
359                 TALLOC_FREE(copy.data);
360                 copy.length = 0;
361         }
362
363         /*
364          * Explicity invalidate the data, as the delete has done this
365          */
366         data.length = 0;
367         data.data = NULL;
368
369         return ret;
370 }
371 /* Handles only a single record */
372 static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
373                              int (*parser)(struct ldb_val key, struct ldb_val data,
374                                            void *private_data),
375                              void *ctx)
376 {
377         struct lmdb_private *lmdb = ltdb->lmdb_private;
378         MDB_val mdb_key;
379         MDB_val mdb_data;
380         MDB_txn *txn = NULL;
381         MDB_dbi dbi;
382         struct ldb_val data;
383
384         txn = get_current_txn(lmdb);
385         if (txn == NULL) {
386                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
387                 lmdb->error = MDB_PANIC;
388                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
389         }
390
391         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
392         if (lmdb->error != MDB_SUCCESS) {
393                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
394         }
395
396         mdb_key.mv_size = key.length;
397         mdb_key.mv_data = key.data;
398
399         lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
400         if (lmdb->error != MDB_SUCCESS) {
401                 /* TODO closing a handle should not even be necessary */
402                 mdb_dbi_close(lmdb->env, dbi);
403                 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
404                         mdb_txn_commit(lmdb->read_txn);
405                         lmdb->read_txn = NULL;
406                 }
407                 if (lmdb->error == MDB_NOTFOUND) {
408                         return LDB_ERR_NO_SUCH_OBJECT;
409                 }
410                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
411         }
412         data.data = mdb_data.mv_data;
413         data.length = mdb_data.mv_size;
414
415         /* TODO closing a handle should not even be necessary */
416         mdb_dbi_close(lmdb->env, dbi);
417
418         /* We created a read transaction, commit it */
419         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
420                 mdb_txn_commit(lmdb->read_txn);
421                 lmdb->read_txn = NULL;
422         }
423         return parser(key, data, ctx);
424 }
425
426
427 static int lmdb_lock_read(struct ldb_module *module)
428 {
429         void *data = ldb_module_get_private(module);
430         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
431         struct lmdb_private *lmdb = ltdb->lmdb_private;
432
433         lmdb->error = MDB_SUCCESS;
434         if (ltdb->in_transaction == 0 &&
435             ltdb->read_lock_count == 0) {
436                 lmdb->error = mdb_txn_begin(lmdb->env,
437                                             NULL,
438                                             MDB_RDONLY,
439                                             &lmdb->read_txn);
440         }
441         if (lmdb->error != MDB_SUCCESS) {
442                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
443         }
444
445         ltdb->read_lock_count++;
446         return ldb_mdb_err_map(lmdb->error);
447 }
448
449 static int lmdb_unlock_read(struct ldb_module *module)
450 {
451         void *data = ldb_module_get_private(module);
452         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
453
454         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
455                 struct lmdb_private *lmdb = ltdb->lmdb_private;
456                 mdb_txn_commit(lmdb->read_txn);
457                 lmdb->read_txn = NULL;
458                 ltdb->read_lock_count--;
459                 return LDB_SUCCESS;
460         }
461         ltdb->read_lock_count--;
462         return LDB_SUCCESS;
463 }
464
465 static int lmdb_transaction_start(struct ltdb_private *ltdb)
466 {
467         struct lmdb_private *lmdb = ltdb->lmdb_private;
468         struct lmdb_trans *ltx;
469         struct lmdb_trans *ltx_head;
470         MDB_txn *tx_parent;
471
472         ltx = talloc_zero(lmdb, struct lmdb_trans);
473         if (ltx == NULL) {
474                 return ldb_oom(lmdb->ldb);
475         }
476
477         ltx_head = lmdb_private_trans_head(lmdb);
478
479         tx_parent = lmdb_trans_get_tx(ltx_head);
480
481         lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
482         if (lmdb->error != MDB_SUCCESS) {
483                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
484         }
485
486         trans_push(lmdb, ltx);
487
488         return ldb_mdb_err_map(lmdb->error);
489 }
490
491 static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
492 {
493         struct lmdb_trans *ltx;
494         struct lmdb_private *lmdb = ltdb->lmdb_private;
495
496         ltx = lmdb_private_trans_head(lmdb);
497         if (ltx == NULL) {
498                 return LDB_ERR_OPERATIONS_ERROR;
499         }
500
501         mdb_txn_abort(ltx->tx);
502         trans_finished(lmdb, ltx);
503         return LDB_SUCCESS;
504 }
505
506 static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
507 {
508         /* No need to prepare a commit */
509         return LDB_SUCCESS;
510 }
511
512 static int lmdb_transaction_commit(struct ltdb_private *ltdb)
513 {
514         struct lmdb_trans *ltx;
515         struct lmdb_private *lmdb = ltdb->lmdb_private;
516
517         ltx = lmdb_private_trans_head(lmdb);
518         if (ltx == NULL) {
519                 return LDB_ERR_OPERATIONS_ERROR;
520         }
521
522         lmdb->error = mdb_txn_commit(ltx->tx);
523         trans_finished(lmdb, ltx);
524
525         return lmdb->error;
526 }
527
528 static int lmdb_error(struct ltdb_private *ltdb)
529 {
530         return ldb_mdb_err_map(ltdb->lmdb_private->error);
531 }
532
533 static const char *lmdb_errorstr(struct ltdb_private *ltdb)
534 {
535         return mdb_strerror(ltdb->lmdb_private->error);
536 }
537
538 static const char * lmdb_name(struct ltdb_private *ltdb)
539 {
540         return "lmdb";
541 }
542
543 static bool lmdb_changed(struct ltdb_private *ltdb)
544 {
545         /*
546          * lmdb does no provide a quick way to determine if the database
547          * has changed.  This function always returns true.
548          *
549          * Note that tdb uses a sequence number that allows this function
550          * to be implemented efficiently.
551          */
552         return true;
553 }
554
555 static struct kv_db_ops lmdb_key_value_ops = {
556         .store              = lmdb_store,
557         .delete             = lmdb_delete,
558         .iterate            = lmdb_traverse_fn,
559         .update_in_iterate  = lmdb_update_in_iterate,
560         .fetch_and_parse    = lmdb_parse_record,
561         .lock_read          = lmdb_lock_read,
562         .unlock_read        = lmdb_unlock_read,
563         .begin_write        = lmdb_transaction_start,
564         .prepare_write      = lmdb_transaction_prepare_commit,
565         .finish_write       = lmdb_transaction_commit,
566         .abort_write        = lmdb_transaction_cancel,
567         .error              = lmdb_error,
568         .errorstr           = lmdb_errorstr,
569         .name               = lmdb_name,
570         .has_changed        = lmdb_changed,
571         .transaction_active = lmdb_transaction_active,
572 };
573
574 static const char *lmdb_get_path(const char *url)
575 {
576         const char *path;
577
578         /* parse the url */
579         if (strchr(url, ':')) {
580                 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
581                         return NULL;
582                 }
583                 path = url + MDB_URL_PREFIX_SIZE;
584         } else {
585                 path = url;
586         }
587
588         return path;
589 }
590
591 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
592 {
593         struct lmdb_trans *ltx = NULL;
594
595         /*
596          * Close the read transaction if it's open
597          */
598         if (lmdb->read_txn != NULL) {
599                 mdb_txn_abort(lmdb->read_txn);
600         }
601
602         if (lmdb->env == NULL) {
603                 return 0;
604         }
605
606         /*
607          * Abort any currently active transactions
608          */
609         ltx = lmdb_private_trans_head(lmdb);
610         while (ltx != NULL) {
611                 mdb_txn_abort(ltx->tx);
612                 trans_finished(lmdb, ltx);
613                 ltx = lmdb_private_trans_head(lmdb);
614         }
615
616         mdb_env_close(lmdb->env);
617         lmdb->env = NULL;
618
619         return 0;
620 }
621
622 static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
623                                   struct ldb_context *ldb,
624                                   const char *path,
625                                   unsigned int flags,
626                                   struct lmdb_private *lmdb)
627 {
628         int ret;
629         unsigned int mdb_flags;
630
631         if (flags & LDB_FLG_DONT_CREATE_DB) {
632                 struct stat st;
633                 if (stat(path, &st) != 0) {
634                         return LDB_ERR_UNAVAILABLE;
635                 }
636         }
637
638         ret = mdb_env_create(&lmdb->env);
639         if (ret != 0) {
640                 ldb_asprintf_errstring(
641                         ldb,
642                         "Could not create MDB environment %s: %s\n",
643                         path,
644                         mdb_strerror(ret));
645                 return LDB_ERR_OPERATIONS_ERROR;
646         }
647
648         /* Close when lmdb is released */
649         talloc_set_destructor(lmdb, lmdb_pvt_destructor);
650
651         ret = mdb_env_set_mapsize(lmdb->env, 16LL * GIGABYTE);
652         if (ret != 0) {
653                 ldb_asprintf_errstring(
654                         ldb,
655                         "Could not open MDB environment %s: %s\n",
656                         path,
657                         mdb_strerror(ret));
658                 return ldb_mdb_err_map(ret);
659         }
660
661         mdb_env_set_maxreaders(lmdb->env, 100000);
662         /* MDB_NOSUBDIR implies there is a separate file called path and a
663          * separate lockfile called path-lock
664          */
665         mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
666         if (flags & LDB_FLG_RDONLY) {
667                 mdb_flags |= MDB_RDONLY;
668         }
669         ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
670         if (ret != 0) {
671                 ldb_asprintf_errstring(ldb,
672                                 "Could not open DB %s: %s\n",
673                                 path, mdb_strerror(ret));
674                 talloc_free(lmdb);
675                 return ldb_mdb_err_map(ret);
676         }
677
678         return LDB_SUCCESS;
679
680 }
681
682 int lmdb_connect(struct ldb_context *ldb,
683                  const char *url,
684                  unsigned int flags,
685                  const char *options[],
686                  struct ldb_module **_module)
687 {
688         const char *path = NULL;
689         struct lmdb_private *lmdb = NULL;
690         struct ltdb_private *ltdb = NULL;
691         int ret;
692
693         /*
694          * We hold locks, so we must use a private event context
695          * on each returned handle
696          */
697         ldb_set_require_private_event_context(ldb);
698
699         path = lmdb_get_path(url);
700         if (path == NULL) {
701                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
702                 return LDB_ERR_OPERATIONS_ERROR;
703         }
704
705         ltdb = talloc_zero(ldb, struct ltdb_private);
706         if (!ltdb) {
707                 ldb_oom(ldb);
708                 return LDB_ERR_OPERATIONS_ERROR;
709         }
710
711         lmdb = talloc_zero(ldb, struct lmdb_private);
712         if (lmdb == NULL) {
713                 TALLOC_FREE(ltdb);
714                 ldb_oom(ldb);
715                 return LDB_ERR_OPERATIONS_ERROR;
716         }
717         lmdb->ldb = ldb;
718         ltdb->kv_ops = &lmdb_key_value_ops;
719
720         ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
721         if (ret != LDB_SUCCESS) {
722                 return ret;
723         }
724
725         ltdb->lmdb_private = lmdb;
726         if (flags & LDB_FLG_RDONLY) {
727                 ltdb->read_only = true;
728         }
729         return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);
730 }
731