tdb: Remove "header" from tdb_context
[obnox/samba/samba-obnox.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware: when transactions are nested, a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
133         int nesting;
134
135         /* set when a prepare has already occurred */
136         bool prepared;
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         /* Only a commit is allowed on a prepared transaction */
214         if (tdb->transaction->prepared) {
215                 tdb->ecode = TDB_ERR_EINVAL;
216                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
217                 tdb->transaction->transaction_error = 1;
218                 return -1;
219         }
220
221         /* if the write is to a hash head, then update the transaction
222            hash heads */
223         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
224             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
225                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
226                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
227         }
228
229         /* break it up into block sized chunks */
230         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
231                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
232                 if (transaction_write(tdb, off, buf, len2) != 0) {
233                         return -1;
234                 }
235                 len -= len2;
236                 off += len2;
237                 if (buf != NULL) {
238                         buf = (const void *)(len2 + (const char *)buf);
239                 }
240         }
241
242         if (len == 0) {
243                 return 0;
244         }
245
246         blk = off / tdb->transaction->block_size;
247         off = off % tdb->transaction->block_size;
248
249         if (tdb->transaction->num_blocks <= blk) {
250                 uint8_t **new_blocks;
251                 /* expand the blocks array */
252                 if (tdb->transaction->blocks == NULL) {
253                         new_blocks = (uint8_t **)malloc(
254                                 (blk+1)*sizeof(uint8_t *));
255                 } else {
256                         new_blocks = (uint8_t **)realloc(
257                                 tdb->transaction->blocks,
258                                 (blk+1)*sizeof(uint8_t *));
259                 }
260                 if (new_blocks == NULL) {
261                         tdb->ecode = TDB_ERR_OOM;
262                         goto fail;
263                 }
264                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
265                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
266                 tdb->transaction->blocks = new_blocks;
267                 tdb->transaction->num_blocks = blk+1;
268                 tdb->transaction->last_block_size = 0;
269         }
270
271         /* allocate and fill a block? */
272         if (tdb->transaction->blocks[blk] == NULL) {
273                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
274                 if (tdb->transaction->blocks[blk] == NULL) {
275                         tdb->ecode = TDB_ERR_OOM;
276                         tdb->transaction->transaction_error = 1;
277                         return -1;
278                 }
279                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
280                         tdb_len_t len2 = tdb->transaction->block_size;
281                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
282                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
283                         }
284                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
285                                                                    tdb->transaction->blocks[blk],
286                                                                    len2, 0) != 0) {
287                                 SAFE_FREE(tdb->transaction->blocks[blk]);
288                                 tdb->ecode = TDB_ERR_IO;
289                                 goto fail;
290                         }
291                         if (blk == tdb->transaction->num_blocks-1) {
292                                 tdb->transaction->last_block_size = len2;
293                         }
294                 }
295         }
296
297         /* overwrite part of an existing block */
298         if (buf == NULL) {
299                 memset(tdb->transaction->blocks[blk] + off, 0, len);
300         } else {
301                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
302         }
303         if (blk == tdb->transaction->num_blocks-1) {
304                 if (len + off > tdb->transaction->last_block_size) {
305                         tdb->transaction->last_block_size = len + off;
306                 }
307         }
308
309         return 0;
310
311 fail:
312         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
313                  (blk*tdb->transaction->block_size) + off, len));
314         tdb->transaction->transaction_error = 1;
315         return -1;
316 }
317
318
319 /*
320   write while in a transaction - this variant never expands the transaction blocks, it only
321   updates existing blocks. This means it cannot change the recovery size
322 */
323 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
324                                       const void *buf, tdb_len_t len)
325 {
326         uint32_t blk;
327
328         /* break it up into block sized chunks */
329         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
330                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
331                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
332                         return -1;
333                 }
334                 len -= len2;
335                 off += len2;
336                 if (buf != NULL) {
337                         buf = (const void *)(len2 + (const char *)buf);
338                 }
339         }
340
341         if (len == 0) {
342                 return 0;
343         }
344
345         blk = off / tdb->transaction->block_size;
346         off = off % tdb->transaction->block_size;
347
348         if (tdb->transaction->num_blocks <= blk ||
349             tdb->transaction->blocks[blk] == NULL) {
350                 return 0;
351         }
352
353         if (blk == tdb->transaction->num_blocks-1 &&
354             off + len > tdb->transaction->last_block_size) {
355                 if (off >= tdb->transaction->last_block_size) {
356                         return 0;
357                 }
358                 len = tdb->transaction->last_block_size - off;
359         }
360
361         /* overwrite part of an existing block */
362         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
363
364         return 0;
365 }
366
367
368 /*
369   accelerated hash chain head search, using the cached hash heads
370 */
371 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
372 {
373         uint32_t h = *chain;
374         for (;h < tdb->hash_size;h++) {
375                 /* the +1 takes account of the freelist */
376                 if (0 != tdb->transaction->hash_heads[h+1]) {
377                         break;
378                 }
379         }
380         (*chain) = h;
381 }
382
383 /*
384   out of bounds check during a transaction
385 */
386 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
387                            tdb_len_t len, int probe)
388 {
389         if (off + len >= off && off + len <= tdb->map_size) {
390                 return 0;
391         }
392         tdb->ecode = TDB_ERR_IO;
393         return -1;
394 }
395
396 /*
397   transaction version of tdb_expand().
398 */
399 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
400                                    tdb_off_t addition)
401 {
402         /* add a write to the transaction elements, so subsequent
403            reads see the zero data */
404         if (transaction_write(tdb, size, NULL, addition) != 0) {
405                 return -1;
406         }
407
408         tdb->transaction->expanded = true;
409
410         return 0;
411 }
412
413 static const struct tdb_methods transaction_methods = {
414         transaction_read,
415         transaction_write,
416         transaction_next_hash_chain,
417         transaction_oob,
418         transaction_expand_file,
419 };
420
421
422 /*
423   start a tdb transaction. No token is returned, as only a single
424   transaction is allowed to be pending per tdb_context
425 */
426 static int _tdb_transaction_start(struct tdb_context *tdb,
427                                   enum tdb_lock_flags lockflags)
428 {
429         /* some sanity checks */
430         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
431                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
432                 tdb->ecode = TDB_ERR_EINVAL;
433                 return -1;
434         }
435
436         /* cope with nested tdb_transaction_start() calls */
437         if (tdb->transaction != NULL) {
438                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
439                         tdb->ecode = TDB_ERR_NESTING;
440                         return -1;
441                 }
442                 tdb->transaction->nesting++;
443                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
444                          tdb->transaction->nesting));
445                 return 0;
446         }
447
448         if (tdb_have_extra_locks(tdb)) {
449                 /* the caller must not have any locks when starting a
450                    transaction as otherwise we'll be screwed by lack
451                    of nested locks in posix */
452                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
453                 tdb->ecode = TDB_ERR_LOCK;
454                 return -1;
455         }
456
457         if (tdb->travlocks.next != NULL) {
458                 /* you cannot use transactions inside a traverse (although you can use
459                    traverse inside a transaction) as otherwise you can end up with
460                    deadlock */
461                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
462                 tdb->ecode = TDB_ERR_LOCK;
463                 return -1;
464         }
465
466         tdb->transaction = (struct tdb_transaction *)
467                 calloc(sizeof(struct tdb_transaction), 1);
468         if (tdb->transaction == NULL) {
469                 tdb->ecode = TDB_ERR_OOM;
470                 return -1;
471         }
472
473         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
474         tdb->transaction->block_size = tdb->page_size;
475
476         /* get the transaction write lock. This is a blocking lock. As
477            discussed with Volker, there are a number of ways we could
478            make this async, which we will probably do in the future */
479         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
480                 SAFE_FREE(tdb->transaction->blocks);
481                 SAFE_FREE(tdb->transaction);
482                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
483                         tdb->ecode = TDB_ERR_NOLOCK;
484                 }
485                 return -1;
486         }
487
488         /* get a read lock from the freelist to the end of file. This
489            is upgraded to a write lock during the commit */
490         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
491                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
492                 goto fail_allrecord_lock;
493         }
494
495         /* setup a copy of the hash table heads so the hash scan in
496            traverse can be fast */
497         tdb->transaction->hash_heads = (uint32_t *)
498                 calloc(tdb->hash_size+1, sizeof(uint32_t));
499         if (tdb->transaction->hash_heads == NULL) {
500                 tdb->ecode = TDB_ERR_OOM;
501                 goto fail;
502         }
503         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
504                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
505                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
506                 tdb->ecode = TDB_ERR_IO;
507                 goto fail;
508         }
509
510         /* make sure we know about any file expansions already done by
511            anyone else */
512         tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
513         tdb->transaction->old_map_size = tdb->map_size;
514
515         /* finally hook the io methods, replacing them with
516            transaction specific methods */
517         tdb->transaction->io_methods = tdb->methods;
518         tdb->methods = &transaction_methods;
519
520         /* Trace at the end, so we get sequence number correct. */
521         tdb_trace(tdb, "tdb_transaction_start");
522         return 0;
523
524 fail:
525         tdb_allrecord_unlock(tdb, F_RDLCK, false);
526 fail_allrecord_lock:
527         tdb_transaction_unlock(tdb, F_WRLCK);
528         SAFE_FREE(tdb->transaction->blocks);
529         SAFE_FREE(tdb->transaction->hash_heads);
530         SAFE_FREE(tdb->transaction);
531         return -1;
532 }
533
534 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
535 {
536         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
537 }
538
539 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
540 {
541         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
542 }
543
544 /*
545   sync to disk
546 */
547 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
548 {
549         if (tdb->flags & TDB_NOSYNC) {
550                 return 0;
551         }
552
553 #ifdef HAVE_FDATASYNC
554         if (fdatasync(tdb->fd) != 0) {
555 #else
556         if (fsync(tdb->fd) != 0) {
557 #endif
558                 tdb->ecode = TDB_ERR_IO;
559                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
560                 return -1;
561         }
562 #ifdef HAVE_MMAP
563         if (tdb->map_ptr) {
564                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
565                 if (msync(moffset + (char *)tdb->map_ptr,
566                           length + (offset - moffset), MS_SYNC) != 0) {
567                         tdb->ecode = TDB_ERR_IO;
568                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
569                                  strerror(errno)));
570                         return -1;
571                 }
572         }
573 #endif
574         return 0;
575 }
576
577
578 static int _tdb_transaction_cancel(struct tdb_context *tdb)
579 {
580         int i, ret = 0;
581
582         if (tdb->transaction == NULL) {
583                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
584                 return -1;
585         }
586
587         if (tdb->transaction->nesting != 0) {
588                 tdb->transaction->transaction_error = 1;
589                 tdb->transaction->nesting--;
590                 return 0;
591         }
592
593         tdb->map_size = tdb->transaction->old_map_size;
594
595         /* free all the transaction blocks */
596         for (i=0;i<tdb->transaction->num_blocks;i++) {
597                 if (tdb->transaction->blocks[i] != NULL) {
598                         free(tdb->transaction->blocks[i]);
599                 }
600         }
601         SAFE_FREE(tdb->transaction->blocks);
602
603         if (tdb->transaction->magic_offset) {
604                 const struct tdb_methods *methods = tdb->transaction->io_methods;
605                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
606
607                 /* remove the recovery marker */
608                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
609                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
610                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
611                         ret = -1;
612                 }
613         }
614
615         /* This also removes the OPEN_LOCK, if we have it. */
616         tdb_release_transaction_locks(tdb);
617
618         /* restore the normal io methods */
619         tdb->methods = tdb->transaction->io_methods;
620
621         SAFE_FREE(tdb->transaction->hash_heads);
622         SAFE_FREE(tdb->transaction);
623
624         return ret;
625 }
626
627 /*
628   cancel the current transaction
629 */
630 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
631 {
632         tdb_trace(tdb, "tdb_transaction_cancel");
633         return _tdb_transaction_cancel(tdb);
634 }
635
636 /*
637   work out how much space the linearised recovery data will consume
638 */
639 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
640 {
641         tdb_len_t recovery_size = 0;
642         int i;
643
644         recovery_size = sizeof(uint32_t);
645         for (i=0;i<tdb->transaction->num_blocks;i++) {
646                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
647                         break;
648                 }
649                 if (tdb->transaction->blocks[i] == NULL) {
650                         continue;
651                 }
652                 recovery_size += 2*sizeof(tdb_off_t);
653                 if (i == tdb->transaction->num_blocks-1) {
654                         recovery_size += tdb->transaction->last_block_size;
655                 } else {
656                         recovery_size += tdb->transaction->block_size;
657                 }
658         }
659
660         return recovery_size;
661 }
662
663 int tdb_recovery_area(struct tdb_context *tdb,
664                       const struct tdb_methods *methods,
665                       tdb_off_t *recovery_offset,
666                       struct tdb_record *rec)
667 {
668         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
669                 return -1;
670         }
671
672         if (*recovery_offset == 0) {
673                 rec->rec_len = 0;
674                 return 0;
675         }
676
677         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
678                               DOCONV()) == -1) {
679                 return -1;
680         }
681
682         /* ignore invalid recovery regions: can happen in crash */
683         if (rec->magic != TDB_RECOVERY_MAGIC &&
684             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
685                 *recovery_offset = 0;
686                 rec->rec_len = 0;
687         }
688         return 0;
689 }
690
691 /*
692   allocate the recovery area, or use an existing recovery area if it is
693   large enough
694 */
695 static int tdb_recovery_allocate(struct tdb_context *tdb,
696                                  tdb_len_t *recovery_size,
697                                  tdb_off_t *recovery_offset,
698                                  tdb_len_t *recovery_max_size)
699 {
700         struct tdb_record rec;
701         const struct tdb_methods *methods = tdb->transaction->io_methods;
702         tdb_off_t recovery_head, new_end;
703
704         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
705                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
706                 return -1;
707         }
708
709         *recovery_size = tdb_recovery_size(tdb);
710
711         /* Existing recovery area? */
712         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
713                 /* it fits in the existing area */
714                 *recovery_max_size = rec.rec_len;
715                 *recovery_offset = recovery_head;
716                 return 0;
717         }
718
719         /* If recovery area in middle of file, we need a new one. */
720         if (recovery_head == 0
721             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
722                 /* we need to free up the old recovery area, then allocate a
723                    new one at the end of the file. Note that we cannot use
724                    tdb_allocate() to allocate the new one as that might return
725                    us an area that is being currently used (as of the start of
726                    the transaction) */
727                 if (recovery_head) {
728                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
729                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
730                                          "tdb_recovery_allocate: failed to"
731                                          " free previous recovery area\n"));
732                                 return -1;
733                         }
734
735                         /* the tdb_free() call might have increased
736                          * the recovery size */
737                         *recovery_size = tdb_recovery_size(tdb);
738                 }
739
740                 /* New head will be at end of file. */
741                 recovery_head = tdb->map_size;
742         }
743
744         /* Now we know where it will be. */
745         *recovery_offset = recovery_head;
746
747         /* Expand by more than we need, so we don't do it often. */
748         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
749                                                *recovery_size,
750                                                tdb->page_size)
751                 - sizeof(rec);
752
753         new_end = recovery_head + sizeof(rec) + *recovery_max_size;
754
755         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
756                                      new_end - tdb->transaction->old_map_size)
757             == -1) {
758                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
759                 return -1;
760         }
761
762         /* remap the file (if using mmap) */
763         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
764
765         /* we have to reset the old map size so that we don't try to expand the file
766            again in the transaction commit, which would destroy the recovery area */
767         tdb->transaction->old_map_size = tdb->map_size;
768
769         /* write the recovery header offset and sync - we can sync without a race here
770            as the magic ptr in the recovery record has not been set */
771         CONVERT(recovery_head);
772         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
773                                &recovery_head, sizeof(tdb_off_t)) == -1) {
774                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
775                 return -1;
776         }
777         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
778                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
779                 return -1;
780         }
781
782         return 0;
783 }
784
785
786 /*
787   setup the recovery data that will be used on a crash during commit
788 */
789 static int transaction_setup_recovery(struct tdb_context *tdb,
790                                       tdb_off_t *magic_offset)
791 {
792         tdb_len_t recovery_size;
793         unsigned char *data, *p;
794         const struct tdb_methods *methods = tdb->transaction->io_methods;
795         struct tdb_record *rec;
796         tdb_off_t recovery_offset, recovery_max_size;
797         tdb_off_t old_map_size = tdb->transaction->old_map_size;
798         uint32_t magic, tailer;
799         int i;
800
801         /*
802           check that the recovery area has enough space
803         */
804         if (tdb_recovery_allocate(tdb, &recovery_size,
805                                   &recovery_offset, &recovery_max_size) == -1) {
806                 return -1;
807         }
808
809         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
810         if (data == NULL) {
811                 tdb->ecode = TDB_ERR_OOM;
812                 return -1;
813         }
814
815         rec = (struct tdb_record *)data;
816         memset(rec, 0, sizeof(*rec));
817
818         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
819         rec->data_len = recovery_size;
820         rec->rec_len  = recovery_max_size;
821         rec->key_len  = old_map_size;
822         CONVERT(*rec);
823
824         /* build the recovery data into a single blob to allow us to do a single
825            large write, which should be more efficient */
826         p = data + sizeof(*rec);
827         for (i=0;i<tdb->transaction->num_blocks;i++) {
828                 tdb_off_t offset;
829                 tdb_len_t length;
830
831                 if (tdb->transaction->blocks[i] == NULL) {
832                         continue;
833                 }
834
835                 offset = i * tdb->transaction->block_size;
836                 length = tdb->transaction->block_size;
837                 if (i == tdb->transaction->num_blocks-1) {
838                         length = tdb->transaction->last_block_size;
839                 }
840
841                 if (offset >= old_map_size) {
842                         continue;
843                 }
844                 if (offset + length > tdb->transaction->old_map_size) {
845                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
846                         free(data);
847                         tdb->ecode = TDB_ERR_CORRUPT;
848                         return -1;
849                 }
850                 memcpy(p, &offset, 4);
851                 memcpy(p+4, &length, 4);
852                 if (DOCONV()) {
853                         tdb_convert(p, 8);
854                 }
855                 /* the recovery area contains the old data, not the
856                    new data, so we have to call the original tdb_read
857                    method to get it */
858                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
859                         free(data);
860                         tdb->ecode = TDB_ERR_IO;
861                         return -1;
862                 }
863                 p += 8 + length;
864         }
865
866         /* and the tailer */
867         tailer = sizeof(*rec) + recovery_max_size;
868         memcpy(p, &tailer, 4);
869         if (DOCONV()) {
870                 tdb_convert(p, 4);
871         }
872
873         /* write the recovery data to the recovery area */
874         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
875                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
876                 free(data);
877                 tdb->ecode = TDB_ERR_IO;
878                 return -1;
879         }
880         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
881                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
882                 free(data);
883                 tdb->ecode = TDB_ERR_IO;
884                 return -1;
885         }
886
887         /* as we don't have ordered writes, we have to sync the recovery
888            data before we update the magic to indicate that the recovery
889            data is present */
890         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
891                 free(data);
892                 return -1;
893         }
894
895         free(data);
896
897         magic = TDB_RECOVERY_MAGIC;
898         CONVERT(magic);
899
900         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
901
902         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
903                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
904                 tdb->ecode = TDB_ERR_IO;
905                 return -1;
906         }
907         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
908                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
909                 tdb->ecode = TDB_ERR_IO;
910                 return -1;
911         }
912
913         /* ensure the recovery magic marker is on disk */
914         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
915                 return -1;
916         }
917
918         return 0;
919 }
920
921 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
922 {
923         const struct tdb_methods *methods;
924
925         if (tdb->transaction == NULL) {
926                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
927                 return -1;
928         }
929
930         if (tdb->transaction->prepared) {
931                 tdb->ecode = TDB_ERR_EINVAL;
932                 _tdb_transaction_cancel(tdb);
933                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
934                 return -1;
935         }
936
937         if (tdb->transaction->transaction_error) {
938                 tdb->ecode = TDB_ERR_IO;
939                 _tdb_transaction_cancel(tdb);
940                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
941                 return -1;
942         }
943
944
945         if (tdb->transaction->nesting != 0) {
946                 return 0;
947         }
948
949         /* check for a null transaction */
950         if (tdb->transaction->blocks == NULL) {
951                 return 0;
952         }
953
954         methods = tdb->transaction->io_methods;
955
956         /* if there are any locks pending then the caller has not
957            nested their locks properly, so fail the transaction */
958         if (tdb_have_extra_locks(tdb)) {
959                 tdb->ecode = TDB_ERR_LOCK;
960                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
961                 _tdb_transaction_cancel(tdb);
962                 return -1;
963         }
964
965         /* upgrade the main transaction lock region to a write lock */
966         if (tdb_allrecord_upgrade(tdb) == -1) {
967                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
968                 _tdb_transaction_cancel(tdb);
969                 return -1;
970         }
971
972         /* get the open lock - this prevents new users attaching to the database
973            during the commit */
974         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
975                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
976                 _tdb_transaction_cancel(tdb);
977                 return -1;
978         }
979
980         /* write the recovery data to the end of the file */
981         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
982                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
983                 _tdb_transaction_cancel(tdb);
984                 return -1;
985         }
986
987         tdb->transaction->prepared = true;
988
989         /* expand the file to the new size if needed */
990         if (tdb->map_size != tdb->transaction->old_map_size) {
991                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
992                                              tdb->map_size -
993                                              tdb->transaction->old_map_size) == -1) {
994                         tdb->ecode = TDB_ERR_IO;
995                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
996                         _tdb_transaction_cancel(tdb);
997                         return -1;
998                 }
999                 tdb->map_size = tdb->transaction->old_map_size;
1000                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1001         }
1002
1003         /* Keep the open lock until the actual commit */
1004
1005         return 0;
1006 }
1007
1008 /*
1009    prepare to commit the current transaction
1010 */
1011 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1012 {
1013         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1014         return _tdb_transaction_prepare_commit(tdb);
1015 }
1016
1017 /* A repack is worthwhile if the largest is less than half total free. */
1018 static bool repack_worthwhile(struct tdb_context *tdb)
1019 {
1020         tdb_off_t ptr;
1021         struct tdb_record rec;
1022         tdb_len_t total = 0, largest = 0;
1023
1024         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1025                 return false;
1026         }
1027
1028         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1029                 total += rec.rec_len;
1030                 if (rec.rec_len > largest) {
1031                         largest = rec.rec_len;
1032                 }
1033                 ptr = rec.next;
1034         }
1035
1036         return total > largest * 2;
1037 }
1038
1039 /*
1040   commit the current transaction
1041 */
1042 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1043 {
1044         const struct tdb_methods *methods;
1045         int i;
1046         bool need_repack = false;
1047
1048         if (tdb->transaction == NULL) {
1049                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1050                 return -1;
1051         }
1052
1053         tdb_trace(tdb, "tdb_transaction_commit");
1054
1055         if (tdb->transaction->transaction_error) {
1056                 tdb->ecode = TDB_ERR_IO;
1057                 _tdb_transaction_cancel(tdb);
1058                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1059                 return -1;
1060         }
1061
1062
1063         if (tdb->transaction->nesting != 0) {
1064                 tdb->transaction->nesting--;
1065                 return 0;
1066         }
1067
1068         /* check for a null transaction */
1069         if (tdb->transaction->blocks == NULL) {
1070                 _tdb_transaction_cancel(tdb);
1071                 return 0;
1072         }
1073
1074         if (!tdb->transaction->prepared) {
1075                 int ret = _tdb_transaction_prepare_commit(tdb);
1076                 if (ret)
1077                         return ret;
1078         }
1079
1080         methods = tdb->transaction->io_methods;
1081
1082         /* perform all the writes */
1083         for (i=0;i<tdb->transaction->num_blocks;i++) {
1084                 tdb_off_t offset;
1085                 tdb_len_t length;
1086
1087                 if (tdb->transaction->blocks[i] == NULL) {
1088                         continue;
1089                 }
1090
1091                 offset = i * tdb->transaction->block_size;
1092                 length = tdb->transaction->block_size;
1093                 if (i == tdb->transaction->num_blocks-1) {
1094                         length = tdb->transaction->last_block_size;
1095                 }
1096
1097                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1098                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1099
1100                         /* we've overwritten part of the data and
1101                            possibly expanded the file, so we need to
1102                            run the crash recovery code */
1103                         tdb->methods = methods;
1104                         tdb_transaction_recover(tdb);
1105
1106                         _tdb_transaction_cancel(tdb);
1107
1108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1109                         return -1;
1110                 }
1111                 SAFE_FREE(tdb->transaction->blocks[i]);
1112         }
1113
1114         /* Do this before we drop lock or blocks. */
1115         if (tdb->transaction->expanded) {
1116                 need_repack = repack_worthwhile(tdb);
1117         }
1118
1119         SAFE_FREE(tdb->transaction->blocks);
1120         tdb->transaction->num_blocks = 0;
1121
1122         /* ensure the new data is on disk */
1123         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1124                 return -1;
1125         }
1126
1127         /*
1128           TODO: maybe write to some dummy hdr field, or write to magic
1129           offset without mmap, before the last sync, instead of the
1130           utime() call
1131         */
1132
1133         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1134            don't change the mtime of the file, this means the file may
1135            not be backed up (as tdb rounding to block sizes means that
1136            file size changes are quite rare too). The following forces
1137            mtime changes when a transaction completes */
1138 #ifdef HAVE_UTIME
1139         utime(tdb->name, NULL);
1140 #endif
1141
1142         /* use a transaction cancel to free memory and remove the
1143            transaction locks */
1144         _tdb_transaction_cancel(tdb);
1145
1146         if (need_repack) {
1147                 return tdb_repack(tdb);
1148         }
1149
1150         return 0;
1151 }
1152
1153
1154 /*
1155   recover from an aborted transaction. Must be called with exclusive
1156   database write access already established (including the open
1157   lock to prevent new processes attaching)
1158 */
1159 int tdb_transaction_recover(struct tdb_context *tdb)
1160 {
1161         tdb_off_t recovery_head, recovery_eof;
1162         unsigned char *data, *p;
1163         uint32_t zero = 0;
1164         struct tdb_record rec;
1165
1166         /* find the recovery area */
1167         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1168                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1169                 tdb->ecode = TDB_ERR_IO;
1170                 return -1;
1171         }
1172
1173         if (recovery_head == 0) {
1174                 /* we have never allocated a recovery record */
1175                 return 0;
1176         }
1177
1178         /* read the recovery record */
1179         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1180                                    sizeof(rec), DOCONV()) == -1) {
1181                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1182                 tdb->ecode = TDB_ERR_IO;
1183                 return -1;
1184         }
1185
1186         if (rec.magic != TDB_RECOVERY_MAGIC) {
1187                 /* there is no valid recovery data */
1188                 return 0;
1189         }
1190
1191         if (tdb->read_only) {
1192                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1193                 tdb->ecode = TDB_ERR_CORRUPT;
1194                 return -1;
1195         }
1196
1197         recovery_eof = rec.key_len;
1198
1199         data = (unsigned char *)malloc(rec.data_len);
1200         if (data == NULL) {
1201                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1202                 tdb->ecode = TDB_ERR_OOM;
1203                 return -1;
1204         }
1205
1206         /* read the full recovery data */
1207         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1208                                    rec.data_len, 0) == -1) {
1209                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1210                 tdb->ecode = TDB_ERR_IO;
1211                 return -1;
1212         }
1213
1214         /* recover the file data */
1215         p = data;
1216         while (p+8 < data + rec.data_len) {
1217                 uint32_t ofs, len;
1218                 if (DOCONV()) {
1219                         tdb_convert(p, 8);
1220                 }
1221                 memcpy(&ofs, p, 4);
1222                 memcpy(&len, p+4, 4);
1223
1224                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1225                         free(data);
1226                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1227                         tdb->ecode = TDB_ERR_IO;
1228                         return -1;
1229                 }
1230                 p += 8 + len;
1231         }
1232
1233         free(data);
1234
1235         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1236                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1237                 tdb->ecode = TDB_ERR_IO;
1238                 return -1;
1239         }
1240
1241         /* if the recovery area is after the recovered eof then remove it */
1242         if (recovery_eof <= recovery_head) {
1243                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1244                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1245                         tdb->ecode = TDB_ERR_IO;
1246                         return -1;
1247                 }
1248         }
1249
1250         /* remove the recovery magic */
1251         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1252                           &zero) == -1) {
1253                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1254                 tdb->ecode = TDB_ERR_IO;
1255                 return -1;
1256         }
1257
1258         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1259                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1260                 tdb->ecode = TDB_ERR_IO;
1261                 return -1;
1262         }
1263
1264         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
1265                  recovery_eof));
1266
1267         /* all done */
1268         return 0;
1269 }
1270
1271 /* Any I/O failures we say "needs recovery". */
1272 bool tdb_needs_recovery(struct tdb_context *tdb)
1273 {
1274         tdb_off_t recovery_head;
1275         struct tdb_record rec;
1276
1277         /* find the recovery area */
1278         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1279                 return true;
1280         }
1281
1282         if (recovery_head == 0) {
1283                 /* we have never allocated a recovery record */
1284                 return false;
1285         }
1286
1287         /* read the recovery record */
1288         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1289                                    sizeof(rec), DOCONV()) == -1) {
1290                 return true;
1291         }
1292
1293         return (rec.magic == TDB_RECOVERY_MAGIC);
1294 }