ldb_mdb: Don't allow modify operations on a read only db
[metze/samba/wip.git] / lib / ldb / ldb_mdb / ldb_mdb.c
1 /*
2    ldb database library using mdb back end
3
4    Copyright (C) Jakub Hrozek 2014
5    Copyright (C) Catalyst.Net Ltd 2017
6
7      ** NOTE! The following LGPL license applies to the ldb
8      ** library. This does NOT imply that all of Samba is released
9      ** under the LGPL
10
11    This library is free software; you can redistribute it and/or
12    modify it under the terms of the GNU Lesser General Public
13    License as published by the Free Software Foundation; either
14    version 3 of the License, or (at your option) any later version.
15
16    This library is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19    Lesser General Public License for more details.
20
21    You should have received a copy of the GNU Lesser General Public
22    License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 */
24
25 #include "ldb_mdb.h"
26 #include "../ldb_tdb/ldb_tdb.h"
27 #include "include/dlinklist.h"
28
29 #define MDB_URL_PREFIX          "mdb://"
30 #define MDB_URL_PREFIX_SIZE     (sizeof(MDB_URL_PREFIX)-1)
31
32 #define MEGABYTE (1024*1024)
33 #define GIGABYTE (1024*1024*1024)
34
35 int ldb_mdb_err_map(int lmdb_err)
36 {
37         switch (lmdb_err) {
38         case MDB_SUCCESS:
39                 return LDB_SUCCESS;
40         case EIO:
41                 return LDB_ERR_OPERATIONS_ERROR;
42         case MDB_INCOMPATIBLE:
43         case MDB_CORRUPTED:
44         case MDB_INVALID:
45                 return LDB_ERR_UNAVAILABLE;
46         case MDB_BAD_TXN:
47         case MDB_BAD_VALSIZE:
48 #ifdef MDB_BAD_DBI
49         case MDB_BAD_DBI:
50 #endif
51         case MDB_PANIC:
52         case EINVAL:
53                 return LDB_ERR_PROTOCOL_ERROR;
54         case MDB_MAP_FULL:
55         case MDB_DBS_FULL:
56         case MDB_READERS_FULL:
57         case MDB_TLS_FULL:
58         case MDB_TXN_FULL:
59         case EAGAIN:
60                 return LDB_ERR_BUSY;
61         case MDB_KEYEXIST:
62                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
63         case MDB_NOTFOUND:
64         case ENOENT:
65                 return LDB_ERR_NO_SUCH_OBJECT;
66         case EACCES:
67                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
68         default:
69                 break;
70         }
71         return LDB_ERR_OTHER;
72 }
73
74 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
75 static int lmdb_error_at(struct ldb_context *ldb,
76                          int ecode,
77                          const char *file,
78                          int line)
79 {
80         int ldb_err = ldb_mdb_err_map(ecode);
81         char *reason = mdb_strerror(ecode);
82         ldb_asprintf_errstring(ldb,
83                                "(%d) - %s at %s:%d",
84                                ecode,
85                                reason,
86                                file,
87                                line);
88         return ldb_err;
89 }
90
91
92 static bool lmdb_transaction_active(struct ltdb_private *ltdb)
93 {
94         return ltdb->lmdb_private->txlist != NULL;
95 }
96
97 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
98 {
99         if (ltx == NULL) {
100                 return NULL;
101         }
102
103         return ltx->tx;
104 }
105
106 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
107 {
108         if (lmdb->txlist) {
109                 talloc_steal(lmdb->txlist, ltx);
110         }
111
112         DLIST_ADD(lmdb->txlist, ltx);
113 }
114
115 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
116 {
117         DLIST_REMOVE(lmdb->txlist, ltx);
118         talloc_free(ltx);
119 }
120
121
122 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
123 {
124         struct lmdb_trans *ltx;
125
126         ltx = lmdb->txlist;
127         return ltx;
128 }
129
130 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
131 {
132         MDB_txn *txn;
133         if (lmdb->read_txn != NULL) {
134                 return lmdb->read_txn;
135         }
136
137         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
138         if (txn == NULL) {
139                 int ret;
140                 ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
141                 if (ret != 0) {
142                         lmdb->error = ret;
143                         ldb_asprintf_errstring(lmdb->ldb,
144                                                "%s failed: %s\n", __FUNCTION__,
145                                                mdb_strerror(ret));
146                 }
147                 lmdb->read_txn = txn;
148         }
149         return txn;
150 }
151
152 static int lmdb_store(struct ltdb_private *ltdb,
153                       struct ldb_val key,
154                       struct ldb_val data, int flags)
155 {
156         struct lmdb_private *lmdb = ltdb->lmdb_private;
157         MDB_val mdb_key;
158         MDB_val mdb_data;
159         int mdb_flags;
160         MDB_txn *txn = NULL;
161         MDB_dbi dbi = 0;
162
163         if (ltdb->read_only) {
164                 return LDB_ERR_UNWILLING_TO_PERFORM;
165         }
166
167         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
168         if (txn == NULL) {
169                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
170                 lmdb->error = MDB_PANIC;
171                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
172         }
173
174         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
175         if (lmdb->error != MDB_SUCCESS) {
176                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
177         }
178
179         mdb_key.mv_size = key.length;
180         mdb_key.mv_data = key.data;
181
182         mdb_data.mv_size = data.length;
183         mdb_data.mv_data = data.data;
184
185         if (flags == TDB_INSERT) {
186                 mdb_flags = MDB_NOOVERWRITE;
187         } else if ((flags == TDB_MODIFY)) {
188                 /*
189                  * Modifying a record, ensure that it exists.
190                  * This mimics the TDB semantics
191                  */
192                 MDB_val value;
193                 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
194                 if (lmdb->error != MDB_SUCCESS) {
195                         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
196                                 mdb_txn_commit(lmdb->read_txn);
197                                 lmdb->read_txn = NULL;
198                         }
199                         return ldb_mdb_error(lmdb->ldb, lmdb->error);
200                 }
201                 mdb_flags = 0;
202         } else {
203                 mdb_flags = 0;
204         }
205
206         lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
207         if (lmdb->error != MDB_SUCCESS) {
208                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
209         }
210
211         return ldb_mdb_err_map(lmdb->error);
212 }
213
214
215 static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
216 {
217         struct lmdb_private *lmdb = ltdb->lmdb_private;
218         MDB_val mdb_key;
219         MDB_txn *txn = NULL;
220         MDB_dbi dbi = 0;
221
222         if (ltdb->read_only) {
223                 return LDB_ERR_UNWILLING_TO_PERFORM;
224         }
225
226         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
227         if (txn == NULL) {
228                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
229                 lmdb->error = MDB_PANIC;
230                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
231         }
232
233         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
234         if (lmdb->error != MDB_SUCCESS) {
235                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
236         }
237
238         mdb_key.mv_size = key.length;
239         mdb_key.mv_data = key.data;
240
241         lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
242         if (lmdb->error != MDB_SUCCESS) {
243                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
244         }
245         return ldb_mdb_err_map(lmdb->error);
246 }
247
248 static int lmdb_traverse_fn(struct ltdb_private *ltdb,
249                             ldb_kv_traverse_fn fn,
250                             void *ctx)
251 {
252         struct lmdb_private *lmdb = ltdb->lmdb_private;
253         MDB_val mdb_key;
254         MDB_val mdb_data;
255         MDB_txn *txn = NULL;
256         MDB_dbi dbi = 0;
257         MDB_cursor *cursor = NULL;
258         int ret;
259
260         txn = get_current_txn(lmdb);
261         if (txn == NULL) {
262                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
263                 lmdb->error = MDB_PANIC;
264                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
265         }
266
267         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
268         if (lmdb->error != MDB_SUCCESS) {
269                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
270         }
271
272         lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
273         if (lmdb->error != MDB_SUCCESS) {
274                 goto done;
275         }
276
277         while ((lmdb->error = mdb_cursor_get(
278                         cursor, &mdb_key,
279                         &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
280
281                 struct ldb_val key = {
282                         .length = mdb_key.mv_size,
283                         .data = mdb_key.mv_data,
284                 };
285                 struct ldb_val data = {
286                         .length = mdb_data.mv_size,
287                         .data = mdb_data.mv_data,
288                 };
289
290                 ret = fn(ltdb, key, data, ctx);
291                 if (ret != 0) {
292                         goto done;
293                 }
294         }
295         if (lmdb->error == MDB_NOTFOUND) {
296                 lmdb->error = MDB_SUCCESS;
297         }
298 done:
299         if (cursor != NULL) {
300                 mdb_cursor_close(cursor);
301         }
302
303         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
304                 mdb_txn_commit(lmdb->read_txn);
305                 lmdb->read_txn = NULL;
306         }
307
308         if (lmdb->error != MDB_SUCCESS) {
309                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
310         }
311         return ldb_mdb_err_map(lmdb->error);
312 }
313
314 static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
315                                   struct ldb_val key,
316                                   struct ldb_val key2,
317                                   struct ldb_val data,
318                                   void *state)
319 {
320         struct lmdb_private *lmdb = ltdb->lmdb_private;
321         struct ldb_val copy;
322         int ret = LDB_SUCCESS;
323
324         /*
325          * Need to take a copy of the data as the delete operation alters the
326          * data, as it is in private lmdb memory.
327          */
328         copy.length = data.length;
329         copy.data = talloc_memdup(ltdb, data.data, data.length);
330         if (copy.data == NULL) {
331                 lmdb->error = MDB_PANIC;
332                 return ldb_oom(lmdb->ldb);
333         }
334
335         lmdb->error = lmdb_delete(ltdb, key);
336         if (lmdb->error != MDB_SUCCESS) {
337                 ldb_debug(
338                         lmdb->ldb,
339                         LDB_DEBUG_ERROR,
340                         "Failed to delete %*.*s "
341                         "for rekey as %*.*s: %s",
342                         (int)key.length, (int)key.length,
343                         (const char *)key.data,
344                         (int)key2.length, (int)key2.length,
345                         (const char *)key.data,
346                         mdb_strerror(lmdb->error));
347                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
348                 goto done;
349         }
350         lmdb->error = lmdb_store(ltdb, key2, copy, 0);
351         if (lmdb->error != MDB_SUCCESS) {
352                 ldb_debug(
353                         lmdb->ldb,
354                         LDB_DEBUG_ERROR,
355                         "Failed to rekey %*.*s as %*.*s: %s",
356                         (int)key.length, (int)key.length,
357                         (const char *)key.data,
358                         (int)key2.length, (int)key2.length,
359                         (const char *)key.data,
360                         mdb_strerror(lmdb->error));
361                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
362                 goto done;
363         }
364
365 done:
366         if (copy.data != NULL) {
367                 TALLOC_FREE(copy.data);
368                 copy.length = 0;
369         }
370
371         /*
372          * Explicity invalidate the data, as the delete has done this
373          */
374         data.length = 0;
375         data.data = NULL;
376
377         return ret;
378 }
379 /* Handles only a single record */
380 static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
381                              int (*parser)(struct ldb_val key, struct ldb_val data,
382                                            void *private_data),
383                              void *ctx)
384 {
385         struct lmdb_private *lmdb = ltdb->lmdb_private;
386         MDB_val mdb_key;
387         MDB_val mdb_data;
388         MDB_txn *txn = NULL;
389         MDB_dbi dbi;
390         struct ldb_val data;
391
392         txn = get_current_txn(lmdb);
393         if (txn == NULL) {
394                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
395                 lmdb->error = MDB_PANIC;
396                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
397         }
398
399         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
400         if (lmdb->error != MDB_SUCCESS) {
401                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
402         }
403
404         mdb_key.mv_size = key.length;
405         mdb_key.mv_data = key.data;
406
407         lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
408         if (lmdb->error != MDB_SUCCESS) {
409                 /* TODO closing a handle should not even be necessary */
410                 mdb_dbi_close(lmdb->env, dbi);
411                 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
412                         mdb_txn_commit(lmdb->read_txn);
413                         lmdb->read_txn = NULL;
414                 }
415                 if (lmdb->error == MDB_NOTFOUND) {
416                         return LDB_ERR_NO_SUCH_OBJECT;
417                 }
418                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
419         }
420         data.data = mdb_data.mv_data;
421         data.length = mdb_data.mv_size;
422
423         /* TODO closing a handle should not even be necessary */
424         mdb_dbi_close(lmdb->env, dbi);
425
426         /* We created a read transaction, commit it */
427         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
428                 mdb_txn_commit(lmdb->read_txn);
429                 lmdb->read_txn = NULL;
430         }
431         return parser(key, data, ctx);
432 }
433
434
435 static int lmdb_lock_read(struct ldb_module *module)
436 {
437         void *data = ldb_module_get_private(module);
438         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
439         struct lmdb_private *lmdb = ltdb->lmdb_private;
440
441         lmdb->error = MDB_SUCCESS;
442         if (ltdb->in_transaction == 0 &&
443             ltdb->read_lock_count == 0) {
444                 lmdb->error = mdb_txn_begin(lmdb->env,
445                                             NULL,
446                                             MDB_RDONLY,
447                                             &lmdb->read_txn);
448         }
449         if (lmdb->error != MDB_SUCCESS) {
450                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
451         }
452
453         ltdb->read_lock_count++;
454         return ldb_mdb_err_map(lmdb->error);
455 }
456
457 static int lmdb_unlock_read(struct ldb_module *module)
458 {
459         void *data = ldb_module_get_private(module);
460         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
461
462         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
463                 struct lmdb_private *lmdb = ltdb->lmdb_private;
464                 mdb_txn_commit(lmdb->read_txn);
465                 lmdb->read_txn = NULL;
466                 ltdb->read_lock_count--;
467                 return LDB_SUCCESS;
468         }
469         ltdb->read_lock_count--;
470         return LDB_SUCCESS;
471 }
472
473 static int lmdb_transaction_start(struct ltdb_private *ltdb)
474 {
475         struct lmdb_private *lmdb = ltdb->lmdb_private;
476         struct lmdb_trans *ltx;
477         struct lmdb_trans *ltx_head;
478         MDB_txn *tx_parent;
479
480         /* Do not take out the transaction lock on a read-only DB */
481         if (ltdb->read_only) {
482                 return LDB_ERR_UNWILLING_TO_PERFORM;
483         }
484
485         ltx = talloc_zero(lmdb, struct lmdb_trans);
486         if (ltx == NULL) {
487                 return ldb_oom(lmdb->ldb);
488         }
489
490         ltx_head = lmdb_private_trans_head(lmdb);
491
492         tx_parent = lmdb_trans_get_tx(ltx_head);
493
494         lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
495         if (lmdb->error != MDB_SUCCESS) {
496                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
497         }
498
499         trans_push(lmdb, ltx);
500
501         return ldb_mdb_err_map(lmdb->error);
502 }
503
504 static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
505 {
506         struct lmdb_trans *ltx;
507         struct lmdb_private *lmdb = ltdb->lmdb_private;
508
509         ltx = lmdb_private_trans_head(lmdb);
510         if (ltx == NULL) {
511                 return LDB_ERR_OPERATIONS_ERROR;
512         }
513
514         mdb_txn_abort(ltx->tx);
515         trans_finished(lmdb, ltx);
516         return LDB_SUCCESS;
517 }
518
519 static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
520 {
521         /* No need to prepare a commit */
522         return LDB_SUCCESS;
523 }
524
525 static int lmdb_transaction_commit(struct ltdb_private *ltdb)
526 {
527         struct lmdb_trans *ltx;
528         struct lmdb_private *lmdb = ltdb->lmdb_private;
529
530         ltx = lmdb_private_trans_head(lmdb);
531         if (ltx == NULL) {
532                 return LDB_ERR_OPERATIONS_ERROR;
533         }
534
535         lmdb->error = mdb_txn_commit(ltx->tx);
536         trans_finished(lmdb, ltx);
537
538         return lmdb->error;
539 }
540
541 static int lmdb_error(struct ltdb_private *ltdb)
542 {
543         return ldb_mdb_err_map(ltdb->lmdb_private->error);
544 }
545
546 static const char *lmdb_errorstr(struct ltdb_private *ltdb)
547 {
548         return mdb_strerror(ltdb->lmdb_private->error);
549 }
550
551 static const char * lmdb_name(struct ltdb_private *ltdb)
552 {
553         return "lmdb";
554 }
555
556 static bool lmdb_changed(struct ltdb_private *ltdb)
557 {
558         /*
559          * lmdb does no provide a quick way to determine if the database
560          * has changed.  This function always returns true.
561          *
562          * Note that tdb uses a sequence number that allows this function
563          * to be implemented efficiently.
564          */
565         return true;
566 }
567
568 static struct kv_db_ops lmdb_key_value_ops = {
569         .store              = lmdb_store,
570         .delete             = lmdb_delete,
571         .iterate            = lmdb_traverse_fn,
572         .update_in_iterate  = lmdb_update_in_iterate,
573         .fetch_and_parse    = lmdb_parse_record,
574         .lock_read          = lmdb_lock_read,
575         .unlock_read        = lmdb_unlock_read,
576         .begin_write        = lmdb_transaction_start,
577         .prepare_write      = lmdb_transaction_prepare_commit,
578         .finish_write       = lmdb_transaction_commit,
579         .abort_write        = lmdb_transaction_cancel,
580         .error              = lmdb_error,
581         .errorstr           = lmdb_errorstr,
582         .name               = lmdb_name,
583         .has_changed        = lmdb_changed,
584         .transaction_active = lmdb_transaction_active,
585 };
586
587 static const char *lmdb_get_path(const char *url)
588 {
589         const char *path;
590
591         /* parse the url */
592         if (strchr(url, ':')) {
593                 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
594                         return NULL;
595                 }
596                 path = url + MDB_URL_PREFIX_SIZE;
597         } else {
598                 path = url;
599         }
600
601         return path;
602 }
603
604 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
605 {
606         struct lmdb_trans *ltx = NULL;
607
608         /* Check if this is a forked child */
609         if (getpid() != lmdb->pid) {
610                 int fd = 0;
611                 /*
612                  * We cannot call mdb_env_close or commit any transactions,
613                  * otherwise they might appear finished in the parent.
614                  *
615                  */
616
617                 if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
618                         close(fd);
619                 }
620
621                 /* Remove the pointer, so that no access should occur */
622                 lmdb->env = NULL;
623
624                 return 0;
625         }
626
627         /*
628          * Close the read transaction if it's open
629          */
630         if (lmdb->read_txn != NULL) {
631                 mdb_txn_abort(lmdb->read_txn);
632         }
633
634         if (lmdb->env == NULL) {
635                 return 0;
636         }
637
638         /*
639          * Abort any currently active transactions
640          */
641         ltx = lmdb_private_trans_head(lmdb);
642         while (ltx != NULL) {
643                 mdb_txn_abort(ltx->tx);
644                 trans_finished(lmdb, ltx);
645                 ltx = lmdb_private_trans_head(lmdb);
646         }
647
648         mdb_env_close(lmdb->env);
649         lmdb->env = NULL;
650
651         return 0;
652 }
653
654 static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
655                                   struct ldb_context *ldb,
656                                   const char *path,
657                                   unsigned int flags,
658                                   struct lmdb_private *lmdb)
659 {
660         int ret;
661         unsigned int mdb_flags;
662
663         if (flags & LDB_FLG_DONT_CREATE_DB) {
664                 struct stat st;
665                 if (stat(path, &st) != 0) {
666                         return LDB_ERR_UNAVAILABLE;
667                 }
668         }
669
670         ret = mdb_env_create(&lmdb->env);
671         if (ret != 0) {
672                 ldb_asprintf_errstring(
673                         ldb,
674                         "Could not create MDB environment %s: %s\n",
675                         path,
676                         mdb_strerror(ret));
677                 return LDB_ERR_OPERATIONS_ERROR;
678         }
679
680         /* Close when lmdb is released */
681         talloc_set_destructor(lmdb, lmdb_pvt_destructor);
682
683         ret = mdb_env_set_mapsize(lmdb->env, 16LL * GIGABYTE);
684         if (ret != 0) {
685                 ldb_asprintf_errstring(
686                         ldb,
687                         "Could not open MDB environment %s: %s\n",
688                         path,
689                         mdb_strerror(ret));
690                 return ldb_mdb_err_map(ret);
691         }
692
693         mdb_env_set_maxreaders(lmdb->env, 100000);
694         /* MDB_NOSUBDIR implies there is a separate file called path and a
695          * separate lockfile called path-lock
696          */
697         mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
698         if (flags & LDB_FLG_RDONLY) {
699                 mdb_flags |= MDB_RDONLY;
700         }
701         if (flags & LDB_FLG_NOSYNC) {
702                 mdb_flags |= MDB_NOSYNC;
703         }
704         ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
705         if (ret != 0) {
706                 ldb_asprintf_errstring(ldb,
707                                 "Could not open DB %s: %s\n",
708                                 path, mdb_strerror(ret));
709                 talloc_free(lmdb);
710                 return ldb_mdb_err_map(ret);
711         }
712
713         /* Store the original pid during the LMDB open */
714         lmdb->pid = getpid();
715
716         return LDB_SUCCESS;
717
718 }
719
720 int lmdb_connect(struct ldb_context *ldb,
721                  const char *url,
722                  unsigned int flags,
723                  const char *options[],
724                  struct ldb_module **_module)
725 {
726         const char *path = NULL;
727         struct lmdb_private *lmdb = NULL;
728         struct ltdb_private *ltdb = NULL;
729         int ret;
730
731         /*
732          * We hold locks, so we must use a private event context
733          * on each returned handle
734          */
735         ldb_set_require_private_event_context(ldb);
736
737         path = lmdb_get_path(url);
738         if (path == NULL) {
739                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
740                 return LDB_ERR_OPERATIONS_ERROR;
741         }
742
743         ltdb = talloc_zero(ldb, struct ltdb_private);
744         if (!ltdb) {
745                 ldb_oom(ldb);
746                 return LDB_ERR_OPERATIONS_ERROR;
747         }
748
749         lmdb = talloc_zero(ldb, struct lmdb_private);
750         if (lmdb == NULL) {
751                 TALLOC_FREE(ltdb);
752                 ldb_oom(ldb);
753                 return LDB_ERR_OPERATIONS_ERROR;
754         }
755         lmdb->ldb = ldb;
756         ltdb->kv_ops = &lmdb_key_value_ops;
757
758         ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
759         if (ret != LDB_SUCCESS) {
760                 return ret;
761         }
762
763         ltdb->lmdb_private = lmdb;
764         if (flags & LDB_FLG_RDONLY) {
765                 ltdb->read_only = true;
766         }
767         return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);
768 }
769