ldb_mdb: Wrap mdb_env_open
[metze/samba/wip.git] / lib / ldb / ldb_mdb / ldb_mdb.c
1 /*
2    ldb database library using mdb back end
3
4    Copyright (C) Jakub Hrozek 2014
5    Copyright (C) Catalyst.Net Ltd 2017
6
7      ** NOTE! The following LGPL license applies to the ldb
8      ** library. This does NOT imply that all of Samba is released
9      ** under the LGPL
10
11    This library is free software; you can redistribute it and/or
12    modify it under the terms of the GNU Lesser General Public
13    License as published by the Free Software Foundation; either
14    version 3 of the License, or (at your option) any later version.
15
16    This library is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19    Lesser General Public License for more details.
20
21    You should have received a copy of the GNU Lesser General Public
22    License along with this library; if not, see <http://www.gnu.org/licenses/>.
23 */
24
25 #include "ldb_mdb.h"
26 #include "../ldb_tdb/ldb_tdb.h"
27 #include "include/dlinklist.h"
28
29 #define MDB_URL_PREFIX          "mdb://"
30 #define MDB_URL_PREFIX_SIZE     (sizeof(MDB_URL_PREFIX)-1)
31
32 #define LDB_MDB_MAX_KEY_LENGTH 511
33
34 #define MEGABYTE (1024*1024)
35 #define GIGABYTE (1024*1024*1024)
36
37 int ldb_mdb_err_map(int lmdb_err)
38 {
39         switch (lmdb_err) {
40         case MDB_SUCCESS:
41                 return LDB_SUCCESS;
42         case EIO:
43                 return LDB_ERR_OPERATIONS_ERROR;
44         case MDB_INCOMPATIBLE:
45         case MDB_CORRUPTED:
46         case MDB_INVALID:
47                 return LDB_ERR_UNAVAILABLE;
48         case MDB_BAD_TXN:
49         case MDB_BAD_VALSIZE:
50 #ifdef MDB_BAD_DBI
51         case MDB_BAD_DBI:
52 #endif
53         case MDB_PANIC:
54         case EINVAL:
55                 return LDB_ERR_PROTOCOL_ERROR;
56         case MDB_MAP_FULL:
57         case MDB_DBS_FULL:
58         case MDB_READERS_FULL:
59         case MDB_TLS_FULL:
60         case MDB_TXN_FULL:
61         case EAGAIN:
62                 return LDB_ERR_BUSY;
63         case MDB_KEYEXIST:
64                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
65         case MDB_NOTFOUND:
66         case ENOENT:
67                 return LDB_ERR_NO_SUCH_OBJECT;
68         case EACCES:
69                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
70         default:
71                 break;
72         }
73         return LDB_ERR_OTHER;
74 }
75
76 #define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
77 static int lmdb_error_at(struct ldb_context *ldb,
78                          int ecode,
79                          const char *file,
80                          int line)
81 {
82         int ldb_err = ldb_mdb_err_map(ecode);
83         char *reason = mdb_strerror(ecode);
84         ldb_asprintf_errstring(ldb,
85                                "(%d) - %s at %s:%d",
86                                ecode,
87                                reason,
88                                file,
89                                line);
90         return ldb_err;
91 }
92
93
94 static bool lmdb_transaction_active(struct ltdb_private *ltdb)
95 {
96         return ltdb->lmdb_private->txlist != NULL;
97 }
98
99 static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
100 {
101         if (ltx == NULL) {
102                 return NULL;
103         }
104
105         return ltx->tx;
106 }
107
108 static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
109 {
110         if (lmdb->txlist) {
111                 talloc_steal(lmdb->txlist, ltx);
112         }
113
114         DLIST_ADD(lmdb->txlist, ltx);
115 }
116
117 static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
118 {
119         DLIST_REMOVE(lmdb->txlist, ltx);
120         talloc_free(ltx);
121 }
122
123
124 static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
125 {
126         struct lmdb_trans *ltx;
127
128         ltx = lmdb->txlist;
129         return ltx;
130 }
131
132 static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
133 {
134         MDB_txn *txn;
135         if (lmdb->read_txn != NULL) {
136                 return lmdb->read_txn;
137         }
138
139         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
140         if (txn == NULL) {
141                 int ret;
142                 ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
143                 if (ret != 0) {
144                         lmdb->error = ret;
145                         ldb_asprintf_errstring(lmdb->ldb,
146                                                "%s failed: %s\n", __FUNCTION__,
147                                                mdb_strerror(ret));
148                 }
149                 lmdb->read_txn = txn;
150         }
151         return txn;
152 }
153
154 static int lmdb_store(struct ltdb_private *ltdb,
155                       struct ldb_val key,
156                       struct ldb_val data, int flags)
157 {
158         struct lmdb_private *lmdb = ltdb->lmdb_private;
159         MDB_val mdb_key;
160         MDB_val mdb_data;
161         int mdb_flags;
162         MDB_txn *txn = NULL;
163         MDB_dbi dbi = 0;
164
165         if (ltdb->read_only) {
166                 return LDB_ERR_UNWILLING_TO_PERFORM;
167         }
168
169         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
170         if (txn == NULL) {
171                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
172                 lmdb->error = MDB_PANIC;
173                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
174         }
175
176         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
177         if (lmdb->error != MDB_SUCCESS) {
178                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
179         }
180
181         mdb_key.mv_size = key.length;
182         mdb_key.mv_data = key.data;
183
184         mdb_data.mv_size = data.length;
185         mdb_data.mv_data = data.data;
186
187         if (flags == TDB_INSERT) {
188                 mdb_flags = MDB_NOOVERWRITE;
189         } else if ((flags == TDB_MODIFY)) {
190                 /*
191                  * Modifying a record, ensure that it exists.
192                  * This mimics the TDB semantics
193                  */
194                 MDB_val value;
195                 lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
196                 if (lmdb->error != MDB_SUCCESS) {
197                         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
198                                 mdb_txn_commit(lmdb->read_txn);
199                                 lmdb->read_txn = NULL;
200                         }
201                         return ldb_mdb_error(lmdb->ldb, lmdb->error);
202                 }
203                 mdb_flags = 0;
204         } else {
205                 mdb_flags = 0;
206         }
207
208         lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
209         if (lmdb->error != MDB_SUCCESS) {
210                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
211         }
212
213         return ldb_mdb_err_map(lmdb->error);
214 }
215
216
217 static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
218 {
219         struct lmdb_private *lmdb = ltdb->lmdb_private;
220         MDB_val mdb_key;
221         MDB_txn *txn = NULL;
222         MDB_dbi dbi = 0;
223
224         if (ltdb->read_only) {
225                 return LDB_ERR_UNWILLING_TO_PERFORM;
226         }
227
228         txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
229         if (txn == NULL) {
230                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
231                 lmdb->error = MDB_PANIC;
232                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
233         }
234
235         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
236         if (lmdb->error != MDB_SUCCESS) {
237                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
238         }
239
240         mdb_key.mv_size = key.length;
241         mdb_key.mv_data = key.data;
242
243         lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
244         if (lmdb->error != MDB_SUCCESS) {
245                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
246         }
247         return ldb_mdb_err_map(lmdb->error);
248 }
249
250 static int lmdb_traverse_fn(struct ltdb_private *ltdb,
251                             ldb_kv_traverse_fn fn,
252                             void *ctx)
253 {
254         struct lmdb_private *lmdb = ltdb->lmdb_private;
255         MDB_val mdb_key;
256         MDB_val mdb_data;
257         MDB_txn *txn = NULL;
258         MDB_dbi dbi = 0;
259         MDB_cursor *cursor = NULL;
260         int ret;
261
262         txn = get_current_txn(lmdb);
263         if (txn == NULL) {
264                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
265                 lmdb->error = MDB_PANIC;
266                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
267         }
268
269         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
270         if (lmdb->error != MDB_SUCCESS) {
271                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
272         }
273
274         lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
275         if (lmdb->error != MDB_SUCCESS) {
276                 goto done;
277         }
278
279         while ((lmdb->error = mdb_cursor_get(
280                         cursor, &mdb_key,
281                         &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
282
283                 struct ldb_val key = {
284                         .length = mdb_key.mv_size,
285                         .data = mdb_key.mv_data,
286                 };
287                 struct ldb_val data = {
288                         .length = mdb_data.mv_size,
289                         .data = mdb_data.mv_data,
290                 };
291
292                 ret = fn(ltdb, key, data, ctx);
293                 if (ret != 0) {
294                         goto done;
295                 }
296         }
297         if (lmdb->error == MDB_NOTFOUND) {
298                 lmdb->error = MDB_SUCCESS;
299         }
300 done:
301         if (cursor != NULL) {
302                 mdb_cursor_close(cursor);
303         }
304
305         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
306                 mdb_txn_commit(lmdb->read_txn);
307                 lmdb->read_txn = NULL;
308         }
309
310         if (lmdb->error != MDB_SUCCESS) {
311                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
312         }
313         return ldb_mdb_err_map(lmdb->error);
314 }
315
316 static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
317                                   struct ldb_val key,
318                                   struct ldb_val key2,
319                                   struct ldb_val data,
320                                   void *state)
321 {
322         struct lmdb_private *lmdb = ltdb->lmdb_private;
323         struct ldb_val copy;
324         int ret = LDB_SUCCESS;
325
326         /*
327          * Need to take a copy of the data as the delete operation alters the
328          * data, as it is in private lmdb memory.
329          */
330         copy.length = data.length;
331         copy.data = talloc_memdup(ltdb, data.data, data.length);
332         if (copy.data == NULL) {
333                 lmdb->error = MDB_PANIC;
334                 return ldb_oom(lmdb->ldb);
335         }
336
337         lmdb->error = lmdb_delete(ltdb, key);
338         if (lmdb->error != MDB_SUCCESS) {
339                 ldb_debug(
340                         lmdb->ldb,
341                         LDB_DEBUG_ERROR,
342                         "Failed to delete %*.*s "
343                         "for rekey as %*.*s: %s",
344                         (int)key.length, (int)key.length,
345                         (const char *)key.data,
346                         (int)key2.length, (int)key2.length,
347                         (const char *)key.data,
348                         mdb_strerror(lmdb->error));
349                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
350                 goto done;
351         }
352         lmdb->error = lmdb_store(ltdb, key2, copy, 0);
353         if (lmdb->error != MDB_SUCCESS) {
354                 ldb_debug(
355                         lmdb->ldb,
356                         LDB_DEBUG_ERROR,
357                         "Failed to rekey %*.*s as %*.*s: %s",
358                         (int)key.length, (int)key.length,
359                         (const char *)key.data,
360                         (int)key2.length, (int)key2.length,
361                         (const char *)key.data,
362                         mdb_strerror(lmdb->error));
363                 ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
364                 goto done;
365         }
366
367 done:
368         if (copy.data != NULL) {
369                 TALLOC_FREE(copy.data);
370                 copy.length = 0;
371         }
372
373         /*
374          * Explicity invalidate the data, as the delete has done this
375          */
376         data.length = 0;
377         data.data = NULL;
378
379         return ret;
380 }
381 /* Handles only a single record */
382 static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
383                              int (*parser)(struct ldb_val key, struct ldb_val data,
384                                            void *private_data),
385                              void *ctx)
386 {
387         struct lmdb_private *lmdb = ltdb->lmdb_private;
388         MDB_val mdb_key;
389         MDB_val mdb_data;
390         MDB_txn *txn = NULL;
391         MDB_dbi dbi;
392         struct ldb_val data;
393
394         txn = get_current_txn(lmdb);
395         if (txn == NULL) {
396                 ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
397                 lmdb->error = MDB_PANIC;
398                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
399         }
400
401         lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
402         if (lmdb->error != MDB_SUCCESS) {
403                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
404         }
405
406         mdb_key.mv_size = key.length;
407         mdb_key.mv_data = key.data;
408
409         lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
410         if (lmdb->error != MDB_SUCCESS) {
411                 /* TODO closing a handle should not even be necessary */
412                 mdb_dbi_close(lmdb->env, dbi);
413                 if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
414                         mdb_txn_commit(lmdb->read_txn);
415                         lmdb->read_txn = NULL;
416                 }
417                 if (lmdb->error == MDB_NOTFOUND) {
418                         return LDB_ERR_NO_SUCH_OBJECT;
419                 }
420                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
421         }
422         data.data = mdb_data.mv_data;
423         data.length = mdb_data.mv_size;
424
425         /* TODO closing a handle should not even be necessary */
426         mdb_dbi_close(lmdb->env, dbi);
427
428         /* We created a read transaction, commit it */
429         if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
430                 mdb_txn_commit(lmdb->read_txn);
431                 lmdb->read_txn = NULL;
432         }
433         return parser(key, data, ctx);
434 }
435
436
437 static int lmdb_lock_read(struct ldb_module *module)
438 {
439         void *data = ldb_module_get_private(module);
440         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
441         struct lmdb_private *lmdb = ltdb->lmdb_private;
442
443         lmdb->error = MDB_SUCCESS;
444         if (ltdb->in_transaction == 0 &&
445             ltdb->read_lock_count == 0) {
446                 lmdb->error = mdb_txn_begin(lmdb->env,
447                                             NULL,
448                                             MDB_RDONLY,
449                                             &lmdb->read_txn);
450         }
451         if (lmdb->error != MDB_SUCCESS) {
452                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
453         }
454
455         ltdb->read_lock_count++;
456         return ldb_mdb_err_map(lmdb->error);
457 }
458
459 static int lmdb_unlock_read(struct ldb_module *module)
460 {
461         void *data = ldb_module_get_private(module);
462         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
463
464         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
465                 struct lmdb_private *lmdb = ltdb->lmdb_private;
466                 mdb_txn_commit(lmdb->read_txn);
467                 lmdb->read_txn = NULL;
468                 ltdb->read_lock_count--;
469                 return LDB_SUCCESS;
470         }
471         ltdb->read_lock_count--;
472         return LDB_SUCCESS;
473 }
474
475 static int lmdb_transaction_start(struct ltdb_private *ltdb)
476 {
477         struct lmdb_private *lmdb = ltdb->lmdb_private;
478         struct lmdb_trans *ltx;
479         struct lmdb_trans *ltx_head;
480         MDB_txn *tx_parent;
481
482         /* Do not take out the transaction lock on a read-only DB */
483         if (ltdb->read_only) {
484                 return LDB_ERR_UNWILLING_TO_PERFORM;
485         }
486
487         ltx = talloc_zero(lmdb, struct lmdb_trans);
488         if (ltx == NULL) {
489                 return ldb_oom(lmdb->ldb);
490         }
491
492         ltx_head = lmdb_private_trans_head(lmdb);
493
494         tx_parent = lmdb_trans_get_tx(ltx_head);
495
496         lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
497         if (lmdb->error != MDB_SUCCESS) {
498                 return ldb_mdb_error(lmdb->ldb, lmdb->error);
499         }
500
501         trans_push(lmdb, ltx);
502
503         return ldb_mdb_err_map(lmdb->error);
504 }
505
506 static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
507 {
508         struct lmdb_trans *ltx;
509         struct lmdb_private *lmdb = ltdb->lmdb_private;
510
511         ltx = lmdb_private_trans_head(lmdb);
512         if (ltx == NULL) {
513                 return LDB_ERR_OPERATIONS_ERROR;
514         }
515
516         mdb_txn_abort(ltx->tx);
517         trans_finished(lmdb, ltx);
518         return LDB_SUCCESS;
519 }
520
521 static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
522 {
523         /* No need to prepare a commit */
524         return LDB_SUCCESS;
525 }
526
527 static int lmdb_transaction_commit(struct ltdb_private *ltdb)
528 {
529         struct lmdb_trans *ltx;
530         struct lmdb_private *lmdb = ltdb->lmdb_private;
531
532         ltx = lmdb_private_trans_head(lmdb);
533         if (ltx == NULL) {
534                 return LDB_ERR_OPERATIONS_ERROR;
535         }
536
537         lmdb->error = mdb_txn_commit(ltx->tx);
538         trans_finished(lmdb, ltx);
539
540         return lmdb->error;
541 }
542
543 static int lmdb_error(struct ltdb_private *ltdb)
544 {
545         return ldb_mdb_err_map(ltdb->lmdb_private->error);
546 }
547
548 static const char *lmdb_errorstr(struct ltdb_private *ltdb)
549 {
550         return mdb_strerror(ltdb->lmdb_private->error);
551 }
552
553 static const char * lmdb_name(struct ltdb_private *ltdb)
554 {
555         return "lmdb";
556 }
557
558 static bool lmdb_changed(struct ltdb_private *ltdb)
559 {
560         /*
561          * lmdb does no provide a quick way to determine if the database
562          * has changed.  This function always returns true.
563          *
564          * Note that tdb uses a sequence number that allows this function
565          * to be implemented efficiently.
566          */
567         return true;
568 }
569
570 static struct kv_db_ops lmdb_key_value_ops = {
571         .store              = lmdb_store,
572         .delete             = lmdb_delete,
573         .iterate            = lmdb_traverse_fn,
574         .update_in_iterate  = lmdb_update_in_iterate,
575         .fetch_and_parse    = lmdb_parse_record,
576         .lock_read          = lmdb_lock_read,
577         .unlock_read        = lmdb_unlock_read,
578         .begin_write        = lmdb_transaction_start,
579         .prepare_write      = lmdb_transaction_prepare_commit,
580         .finish_write       = lmdb_transaction_commit,
581         .abort_write        = lmdb_transaction_cancel,
582         .error              = lmdb_error,
583         .errorstr           = lmdb_errorstr,
584         .name               = lmdb_name,
585         .has_changed        = lmdb_changed,
586         .transaction_active = lmdb_transaction_active,
587 };
588
589 static const char *lmdb_get_path(const char *url)
590 {
591         const char *path;
592
593         /* parse the url */
594         if (strchr(url, ':')) {
595                 if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
596                         return NULL;
597                 }
598                 path = url + MDB_URL_PREFIX_SIZE;
599         } else {
600                 path = url;
601         }
602
603         return path;
604 }
605
606 static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
607 {
608         struct lmdb_trans *ltx = NULL;
609
610         /* Check if this is a forked child */
611         if (getpid() != lmdb->pid) {
612                 int fd = 0;
613                 /*
614                  * We cannot call mdb_env_close or commit any transactions,
615                  * otherwise they might appear finished in the parent.
616                  *
617                  */
618
619                 if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
620                         close(fd);
621                 }
622
623                 /* Remove the pointer, so that no access should occur */
624                 lmdb->env = NULL;
625
626                 return 0;
627         }
628
629         /*
630          * Close the read transaction if it's open
631          */
632         if (lmdb->read_txn != NULL) {
633                 mdb_txn_abort(lmdb->read_txn);
634         }
635
636         if (lmdb->env == NULL) {
637                 return 0;
638         }
639
640         /*
641          * Abort any currently active transactions
642          */
643         ltx = lmdb_private_trans_head(lmdb);
644         while (ltx != NULL) {
645                 mdb_txn_abort(ltx->tx);
646                 trans_finished(lmdb, ltx);
647                 ltx = lmdb_private_trans_head(lmdb);
648         }
649         lmdb->env = NULL;
650
651         return 0;
652 }
653
654 struct mdb_env_wrap {
655         struct mdb_env_wrap *next, *prev;
656         dev_t device;
657         ino_t inode;
658         MDB_env *env;
659         int pid;
660 };
661
662 static struct mdb_env_wrap *mdb_list;
663
664 /* destroy the last connection to an mdb */
665 static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
666 {
667         mdb_env_close(w->env);
668         DLIST_REMOVE(mdb_list, w);
669         return 0;
670 }
671
672 static int lmdb_open_env(TALLOC_CTX *mem_ctx,
673                          MDB_env **env,
674                          struct ldb_context *ldb,
675                          const char *path,
676                          unsigned int flags) {
677         int ret;
678         unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
679         /*
680          * MDB_NOSUBDIR implies there is a separate file called path and a
681          * separate lockfile called path-lock
682          */
683
684         struct mdb_env_wrap *w;
685         struct stat st;
686
687         if (stat(path, &st) == 0) {
688                 for (w=mdb_list;w;w=w->next) {
689                         if (st.st_dev == w->device && st.st_ino == w->inode) {
690                                 /*
691                                  * We must have only one MDB_env per process
692                                  */
693                                 if (!talloc_reference(mem_ctx, w)) {
694                                         return ldb_oom(ldb);
695                                 }
696                                 *env = w->env;
697                                 return LDB_SUCCESS;
698                         }
699                 }
700         }
701
702         w = talloc(mem_ctx, struct mdb_env_wrap);
703         if (w == NULL) {
704                 return ldb_oom(ldb);
705         }
706
707         ret = mdb_env_create(env);
708         if (ret != 0) {
709                 ldb_asprintf_errstring(
710                         ldb,
711                         "Could not create MDB environment %s: %s\n",
712                         path,
713                         mdb_strerror(ret));
714                 return ldb_mdb_err_map(ret);
715         }
716
717         /*
718          * Currently we set a 16Gb maximum database size
719          */
720         ret = mdb_env_set_mapsize(*env, 16LL * GIGABYTE);
721         if (ret != 0) {
722                 ldb_asprintf_errstring(
723                         ldb,
724                         "Could not open MDB environment %s: %s\n",
725                         path,
726                         mdb_strerror(ret));
727                 TALLOC_FREE(w);
728                 return ldb_mdb_err_map(ret);
729         }
730
731         mdb_env_set_maxreaders(*env, 100000);
732         /*
733          * As we ensure that there is only one MDB_env open per database per
734          * process. We can not use the MDB_RDONLY flag, as another ldb may be
735          * opened in read write mode
736          */
737         if (flags & LDB_FLG_NOSYNC) {
738                 mdb_flags |= MDB_NOSYNC;
739         }
740         ret = mdb_env_open(*env, path, mdb_flags, 0644);
741         if (ret != 0) {
742                 ldb_asprintf_errstring(ldb,
743                                 "Could not open DB %s: %s\n",
744                                 path, mdb_strerror(ret));
745                 TALLOC_FREE(w);
746                 return ldb_mdb_err_map(ret);
747         }
748
749         if (stat(path, &st) != 0) {
750                 ldb_asprintf_errstring(
751                         ldb,
752                         "Could not stat %s:\n",
753                         path);
754                 TALLOC_FREE(w);
755                 return LDB_ERR_OPERATIONS_ERROR;
756         }
757         w->env = *env;
758         w->device = st.st_dev;
759         w->inode  = st.st_ino;
760
761         talloc_set_destructor(w, mdb_env_wrap_destructor);
762
763         DLIST_ADD(mdb_list, w);
764
765         return LDB_SUCCESS;
766
767 }
768
769 static int lmdb_pvt_open(struct lmdb_private *lmdb,
770                          struct ldb_context *ldb,
771                          const char *path,
772                          unsigned int flags)
773 {
774         int ret;
775         int lmdb_max_key_length;
776
777         if (flags & LDB_FLG_DONT_CREATE_DB) {
778                 struct stat st;
779                 if (stat(path, &st) != 0) {
780                         return LDB_ERR_UNAVAILABLE;
781                 }
782         }
783
784         ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
785         if (ret != 0) {
786                 ldb_asprintf_errstring(
787                         ldb,
788                         "Could not create MDB environment %s: %s\n",
789                         path,
790                         mdb_strerror(ret));
791                 return LDB_ERR_OPERATIONS_ERROR;
792         }
793
794         /* Close when lmdb is released */
795         talloc_set_destructor(lmdb, lmdb_pvt_destructor);
796
797         /* Store the original pid during the LMDB open */
798         lmdb->pid = getpid();
799
800         lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env);
801
802         /* This will never happen, but if it does make sure to freak out */
803         if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
804                 return ldb_operr(ldb);
805         }
806
807         return LDB_SUCCESS;
808
809 }
810
811 int lmdb_connect(struct ldb_context *ldb,
812                  const char *url,
813                  unsigned int flags,
814                  const char *options[],
815                  struct ldb_module **_module)
816 {
817         const char *path = NULL;
818         struct lmdb_private *lmdb = NULL;
819         struct ltdb_private *ltdb = NULL;
820         int ret;
821
822         /*
823          * We hold locks, so we must use a private event context
824          * on each returned handle
825          */
826         ldb_set_require_private_event_context(ldb);
827
828         path = lmdb_get_path(url);
829         if (path == NULL) {
830                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
831                 return LDB_ERR_OPERATIONS_ERROR;
832         }
833
834         ltdb = talloc_zero(ldb, struct ltdb_private);
835         if (!ltdb) {
836                 ldb_oom(ldb);
837                 return LDB_ERR_OPERATIONS_ERROR;
838         }
839
840         lmdb = talloc_zero(ltdb, struct lmdb_private);
841         if (lmdb == NULL) {
842                 TALLOC_FREE(ltdb);
843                 return ldb_oom(ldb);
844         }
845         lmdb->ldb = ldb;
846         ltdb->kv_ops = &lmdb_key_value_ops;
847
848         ret = lmdb_pvt_open(lmdb, ldb, path, flags);
849         if (ret != LDB_SUCCESS) {
850                 TALLOC_FREE(ltdb);
851                 return ret;
852         }
853
854         ltdb->lmdb_private = lmdb;
855         if (flags & LDB_FLG_RDONLY) {
856                 ltdb->read_only = true;
857         }
858
859         /*
860          * This maximum length becomes encoded in the index values so
861          * must never change even if LMDB starts to allow longer keys.
862          * The override option is max_key_len_for_self_test, and is
863          * used for testing only.
864          */
865         ltdb->max_key_length = LDB_MDB_MAX_KEY_LENGTH;
866
867         return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);
868 }
869