2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 #include "tdb_private.h"
31 - only allow a single transaction at a time per database. This makes
32 using the transaction API simpler, as otherwise the caller would
33 have to cope with temporary failures in transactions that conflict
34 with other current transactions
36 - keep the transaction recovery information in the same file as the
37 database, using a special 'transaction recovery' record pointed at
38 by the header. This removes the need for extra journal files as
39 used by some other databases
41 - dynamically allocate the transaction recovery record, re-using it
42 for subsequent transactions. If a larger record is needed then
43 tdb_free() the old record to place it on the normal tdb freelist
44 before allocating the new record
46 - during transactions, keep a linked list of all writes that have
47 been performed by intercepting all tdb_write() calls. The hooked
48 transaction versions of tdb_read() and tdb_write() check this
49 linked list and try to use the elements of the list in preference
52 - don't allow any locks to be held when a transaction starts,
53 otherwise we can end up with deadlock (plus lack of lock nesting
54 in posix locks would mean the lock is lost)
56 - if the caller gains a lock during the transaction but doesn't
57 release it then fail the commit
59 - allow for nested calls to tdb_transaction_start(), re-using the
60 existing transaction record. If the inner transaction is cancelled
61 then a subsequent commit will fail
63 - keep a mirrored copy of the tdb hash chain heads to allow for the
64 fast hash heads scan on traverse, updating the mirrored copy in
65 the transaction version of tdb_write
67 - allow callers to mix transaction and non-transaction use of tdb,
68 although once a transaction is started then an exclusive lock is
69 gained until the transaction is committed or cancelled
71 - the commit strategy involves first saving away all modified data
72 into a linearised buffer in the transaction recovery area, then
73 marking the transaction recovery area with a magic value to
74 indicate a valid recovery record. In total 4 fsync/msync calls are
75 needed per commit to prevent race conditions. It might be possible
76 to reduce this to 3 or even 2 with some more work.
78 - check for a valid recovery record on open of the tdb, while the
79 global lock is held. Automatically recover from the transaction
80 recovery area if needed, then continue with the open as
81 usual. This allows for smooth crash recovery with no administrator
84 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85 still available, but no transaction recovery area is used and no
86 fsync/msync calls are made.
92 hold the context of any current transaction
/*
 * State kept for the transaction currently open on a tdb context.
 *
 * NOTE(review): the embedded original line numbers jump in this listing
 * (96 to 99, 103 to 106, 116 to 119), so the member declarations that some
 * of the comments below introduce are not visible here. Other code in this
 * file reaches hash_heads, blocks, num_blocks and nesting through
 * tdb->transaction, so those members presumably sit on the missing lines -
 * confirm against the complete file.
 */
94 struct tdb_transaction {
95 /* we keep a mirrored copy of the tdb hash heads here so
96 tdb_next_hash_chain() can operate efficiently */
99 /* the original io methods - used to do IOs to the real db */
100 const struct tdb_methods *io_methods;
102 /* the list of transaction blocks. When a block is first
103 written to, it gets created in this list */
106 uint32_t block_size; /* bytes in each block */
107 uint32_t last_block_size; /* number of valid bytes in the last block */
109 /* non-zero when an internal transaction error has
110 occurred. All write operations will then fail until the
111 transaction is ended */
112 int transaction_error;
114 /* when inside a transaction we need to keep track of any
115 nested tdb_transaction_start() calls, as these are allowed,
116 but don't create a new transaction */
119 /* old file size before transaction */
120 tdb_len_t old_map_size;
125 read while in a transaction. We need to check first if the data is in our list
126 of transaction elements, then if not do a real read
/*
 * Transaction-aware read of len bytes at file offset off into buf.
 *
 * Visible behaviour:
 *  - a read that crosses a block boundary is split and handled recursively,
 *    one block-sized piece at a time;
 *  - when the block index is beyond num_blocks, or the block pointer is
 *    NULL, the request is passed to the saved io_methods->tdb_read together
 *    with cv;
 *  - otherwise the bytes are copied out of the staged in-memory block; for
 *    the final block the length is first compared with last_block_size;
 *  - tdb_convert() is applied to the copied bytes (NOTE(review): presumably
 *    only when cv is set - the guarding line is not visible, confirm).
 * The failure lines at the end log the offset and length, set tdb->ecode to
 * TDB_ERR_IO and set transaction_error.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 136 to
 * 141, 154 to 160), so returns, braces and some statements are not visible;
 * only the lines shown are described.
 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
129 tdb_len_t len, int cv)
133 /* break it down into block sized ops */
134 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
141 buf = (void *)(len2 + (char *)buf);
148 blk = off / tdb->transaction->block_size;
150 /* see if we have it in the block list */
151 if (tdb->transaction->num_blocks <= blk ||
152 tdb->transaction->blocks[blk] == NULL) {
153 /* nope, do a real read */
154 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
160 /* it is in the block list. Now check for the last block */
161 if (blk == tdb->transaction->num_blocks-1) {
162 if (len > tdb->transaction->last_block_size) {
167 /* now copy it out of this block */
168 memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
170 tdb_convert(buf, len);
175 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176 tdb->ecode = TDB_ERR_IO;
177 tdb->transaction->transaction_error = 1;
183 write while in a transaction
/*
 * Transaction-aware write of len bytes from buf at file offset off. In the
 * visible lines no write reaches the file; the data is staged in per-block
 * buffers held in tdb->transaction->blocks.
 *
 * Visible steps:
 *  - a write of exactly sizeof(tdb_off_t) that falls inside the hash table
 *    region starting at FREELIST_TOP is also copied into the mirrored
 *    hash_heads array;
 *  - a write that crosses a block boundary is split and handled recursively,
 *    one block-sized piece at a time;
 *  - off is then reduced to the offset inside the block (blk keeps the block
 *    index);
 *  - the blocks pointer array is grown (malloc or realloc) to blk+1 entries,
 *    the new slots are zeroed and last_block_size is reset to 0;
 *  - a block is created with calloc on first touch and, when it starts below
 *    old_map_size, pre-filled from the file through the saved
 *    io_methods->tdb_read, clamped so it does not read past old_map_size;
 *    if that block is the final one, last_block_size becomes the pre-filled
 *    length;
 *  - the payload is stored with memcpy, or the range is zeroed with memset
 *    (NOTE(review): presumably memset is chosen when buf is NULL, since
 *    transaction_expand_file passes a NULL buffer - the selecting line is
 *    not visible, confirm);
 *  - last_block_size is raised when the write extends the final block.
 * The failure lines at the end log the absolute offset and the length and
 * set transaction_error.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 201 to
 * 207, 254 to 256), so returns, braces and some statements are not visible.
 */
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
186 const void *buf, tdb_len_t len)
190 /* if the write is to a hash head, then update the transaction
192 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
198 /* break it up into block sized chunks */
199 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201 if (transaction_write(tdb, off, buf, len2) != 0) {
207 buf = (const void *)(len2 + (const char *)buf);
215 blk = off / tdb->transaction->block_size;
216 off = off % tdb->transaction->block_size;
218 if (tdb->transaction->num_blocks <= blk) {
219 uint8_t **new_blocks;
220 /* expand the blocks array */
221 if (tdb->transaction->blocks == NULL) {
222 new_blocks = (uint8_t **)malloc(
223 (blk+1)*sizeof(uint8_t *));
225 new_blocks = (uint8_t **)realloc(
226 tdb->transaction->blocks,
227 (blk+1)*sizeof(uint8_t *));
229 if (new_blocks == NULL) {
230 tdb->ecode = TDB_ERR_OOM;
233 memset(&new_blocks[tdb->transaction->num_blocks], 0,
234 (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235 tdb->transaction->blocks = new_blocks;
236 tdb->transaction->num_blocks = blk+1;
237 tdb->transaction->last_block_size = 0;
240 /* allocate and fill a block? */
241 if (tdb->transaction->blocks[blk] == NULL) {
242 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243 if (tdb->transaction->blocks[blk] == NULL) {
244 tdb->ecode = TDB_ERR_OOM;
245 tdb->transaction->transaction_error = 1;
248 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249 tdb_len_t len2 = tdb->transaction->block_size;
250 if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
253 if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
254 tdb->transaction->blocks[blk],
256 SAFE_FREE(tdb->transaction->blocks[blk]);
257 tdb->ecode = TDB_ERR_IO;
260 if (blk == tdb->transaction->num_blocks-1) {
261 tdb->transaction->last_block_size = len2;
266 /* overwrite part of an existing block */
268 memset(tdb->transaction->blocks[blk] + off, 0, len);
270 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
272 if (blk == tdb->transaction->num_blocks-1) {
273 if (len + off > tdb->transaction->last_block_size) {
274 tdb->transaction->last_block_size = len + off;
281 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
282 (blk*tdb->transaction->block_size) + off, len));
283 tdb->transaction->transaction_error = 1;
289 write while in a transaction - this variant never expands the transaction blocks, it only
290 updates existing blocks. This means it cannot change the recovery size
/*
 * Update bytes that are already staged in the transaction, without creating
 * new blocks or growing the blocks array.
 *
 * Visible behaviour:
 *  - a write that crosses a block boundary is split and handled recursively,
 *    one block-sized piece at a time;
 *  - off is reduced to the offset inside block blk;
 *  - the block is looked up; the branch taken when the index is beyond
 *    num_blocks or the pointer is NULL is not visible in this listing
 *    (NOTE(review): presumably it returns without writing - confirm);
 *  - for the final block, len is trimmed so the copy stops at
 *    last_block_size;
 *  - the remaining bytes are copied into the staged block with memcpy.
 *
 * NOTE(review): the trim computes last_block_size - off with no visible
 * check that off is below last_block_size (original lines 322 to 324 are
 * consecutive). If tdb_len_t is unsigned, an off at or past last_block_size
 * would wrap to a very large len for the memcpy - confirm and guard.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 300 to
 * 306, 318 to 322), so returns, braces and some statements are not visible.
 */
292 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
293 const void *buf, tdb_len_t len)
297 /* break it up into block sized chunks */
298 while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
306 buf = (const void *)(len2 + (const char *)buf);
314 blk = off / tdb->transaction->block_size;
315 off = off % tdb->transaction->block_size;
317 if (tdb->transaction->num_blocks <= blk ||
318 tdb->transaction->blocks[blk] == NULL) {
322 if (blk == tdb->transaction->num_blocks-1 &&
323 off + len > tdb->transaction->last_block_size) {
324 len = tdb->transaction->last_block_size - off;
327 /* overwrite part of an existing block */
328 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
335 accelerated hash chain head search, using the cached hash heads
/*
 * Scan the mirrored hash_heads array for a non-zero chain head, walking h
 * upwards while it is below tdb->header.hash_size. Index h+1 is used because
 * slot 0 of the mirror corresponds to the freelist.
 *
 * NOTE(review): the initialisation of h and the store back through chain
 * are on lines missing from this listing (original numbers jump 337 to 340
 * and stop at 342); presumably h starts at *chain and the found index is
 * written back - confirm.
 */
337 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
340 for (;h < tdb->header.hash_size;h++) {
341 /* the +1 takes account of the freelist */
342 if (0 != tdb->transaction->hash_heads[h+1]) {
350 out of bounds check during a transaction
/*
 * Bounds check used while a transaction is active: len is compared only
 * against the in-memory tdb->map_size, and TDB_ERRCODE(TDB_ERR_IO, -1) is
 * returned on the out-of-bounds path. The probe argument is not used in the
 * visible lines.
 *
 * NOTE(review): the body of the in-bounds branch is on lines missing from
 * this listing (original numbers jump 354 to 357); presumably it returns
 * success - confirm.
 */
352 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
354 if (len <= tdb->map_size) {
357 return TDB_ERRCODE(TDB_ERR_IO, -1);
361 transaction version of tdb_expand().
/*
 * File expansion while a transaction is active. The visible line stages the
 * new region by calling transaction_write() at offset size with a NULL
 * buffer and a length of addition, so that later reads inside the
 * transaction see zero data, as the in-line comment states.
 *
 * NOTE(review): the rest of the parameter list (which declares addition)
 * and the return statements are on lines missing from this listing
 * (original numbers jump 363 to 366 and stop at 368).
 */
363 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
366 /* add a write to the transaction elements, so subsequent
367 reads see the zero data */
368 if (transaction_write(tdb, size, NULL, addition) != 0) {
376 brlock during a transaction - ignore them
/*
 * Byte-range lock hook used while a transaction is active.
 *
 * NOTE(review): only the signature is visible in this listing; the body is
 * on missing lines. The file's own description of this function says brlock
 * requests are ignored during a transaction - presumably it reports success
 * without locking; confirm against the complete file.
 */
378 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
379 int rw_type, int lck_type, int probe, size_t len)
/*
 * Method table that tdb_transaction_start() installs as tdb->methods for the
 * lifetime of a transaction.
 *
 * NOTE(review): only two initialisers are visible in this listing (original
 * numbers jump 384 to 387 to 389); the remaining entries and the closing
 * brace are on missing lines, so the slot order cannot be checked here.
 */
384 static const struct tdb_methods transaction_methods = {
387 transaction_next_hash_chain,
389 transaction_expand_file,
395 start a tdb transaction. No token is returned, as only a single
396 transaction is allowed to be pending per tdb_context
/*
 * Begin a transaction on tdb, or record a nested start when one is already
 * open.
 *
 * Visible sequence:
 *  - refuse read-only, TDB_INTERNAL and traverse_read contexts, setting
 *    ecode to TDB_ERR_EINVAL;
 *  - if tdb->transaction already exists, only increment its nesting counter
 *    and log the new depth;
 *  - refuse to start while any lock is held (num_locks or global_lock.count)
 *    or while a traverse is active (travlocks.next), setting ecode to
 *    TDB_ERR_LOCK;
 *  - calloc the tdb_transaction and set block_size to tdb->page_size;
 *  - take the transaction write lock (blocking), then a read lock starting
 *    at FREELIST_TOP that is later upgraded during commit;
 *  - allocate hash_size+1 entries for hash_heads and fill them by reading
 *    TDB_HASHTABLE_SIZE(tdb) bytes from FREELIST_TOP;
 *  - call tdb_oob() with map_size + 1 in probe mode, then remember map_size
 *    as old_map_size;
 *  - save the current tdb->methods in io_methods and install
 *    transaction_methods.
 * The cleanup lines at the end release the FREELIST_TOP range lock and the
 * transaction lock and free blocks, hash_heads and the transaction
 * structure.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 403 to
 * 407, 483 to 488), so returns, gotos, labels and braces are not visible.
 */
398 int tdb_transaction_start(struct tdb_context *tdb)
400 /* some sanity checks */
401 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
402 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
403 tdb->ecode = TDB_ERR_EINVAL;
407 /* cope with nested tdb_transaction_start() calls */
408 if (tdb->transaction != NULL) {
409 tdb->transaction->nesting++;
410 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
411 tdb->transaction->nesting));
415 if (tdb->num_locks != 0 || tdb->global_lock.count) {
416 /* the caller must not have any locks when starting a
417 transaction as otherwise we'll be screwed by lack
418 of nested locks in posix */
419 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
420 tdb->ecode = TDB_ERR_LOCK;
424 if (tdb->travlocks.next != NULL) {
425 /* you cannot use transactions inside a traverse (although you can use
426 traverse inside a transaction) as otherwise you can end up with
428 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
429 tdb->ecode = TDB_ERR_LOCK;
433 tdb->transaction = (struct tdb_transaction *)
434 calloc(sizeof(struct tdb_transaction), 1);
435 if (tdb->transaction == NULL) {
436 tdb->ecode = TDB_ERR_OOM;
440 /* a page at a time seems like a reasonable compromise between compactness and efficiency */
441 tdb->transaction->block_size = tdb->page_size;
443 /* get the transaction write lock. This is a blocking lock. As
444 discussed with Volker, there are a number of ways we could
445 make this async, which we will probably do in the future */
446 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
447 SAFE_FREE(tdb->transaction->blocks);
448 SAFE_FREE(tdb->transaction);
452 /* get a read lock from the freelist to the end of file. This
453 is upgraded to a write lock during the commit */
454 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
455 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
456 tdb->ecode = TDB_ERR_LOCK;
460 /* setup a copy of the hash table heads so the hash scan in
461 traverse can be fast */
462 tdb->transaction->hash_heads = (uint32_t *)
463 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
464 if (tdb->transaction->hash_heads == NULL) {
465 tdb->ecode = TDB_ERR_OOM;
468 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
469 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
470 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
471 tdb->ecode = TDB_ERR_IO;
475 /* make sure we know about any file expansions already done by
477 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
478 tdb->transaction->old_map_size = tdb->map_size;
480 /* finally hook the io methods, replacing them with
481 transaction specific methods */
482 tdb->transaction->io_methods = tdb->methods;
483 tdb->methods = &transaction_methods;
488 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
489 tdb_transaction_unlock(tdb);
490 SAFE_FREE(tdb->transaction->blocks);
491 SAFE_FREE(tdb->transaction->hash_heads);
492 SAFE_FREE(tdb->transaction);
498 cancel the current transaction
/*
 * Abandon the current transaction and discard everything staged in it.
 *
 * Visible behaviour:
 *  - log an error when no transaction is open;
 *  - for a nested call (nesting != 0) only set transaction_error and
 *    decrement nesting; tdb_transaction_commit() checks that flag, so the
 *    outer commit will not go through;
 *  - otherwise restore map_size to old_map_size, free every staged block
 *    and the blocks array, release any global lock and any per-chain locks
 *    still recorded on the context (resetting their counters and freeing
 *    lockrecs), restore the saved io methods, drop the FREELIST_TOP range
 *    lock and the transaction lock, and free hash_heads and the transaction
 *    structure.
 * The return values of the unlock calls are not checked in the visible
 * lines.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 505 to
 * 509, 511 to 515), so returns and braces are not visible.
 */
500 int tdb_transaction_cancel(struct tdb_context *tdb)
504 if (tdb->transaction == NULL) {
505 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
509 if (tdb->transaction->nesting != 0) {
510 tdb->transaction->transaction_error = 1;
511 tdb->transaction->nesting--;
515 tdb->map_size = tdb->transaction->old_map_size;
517 /* free all the transaction blocks */
518 for (i=0;i<tdb->transaction->num_blocks;i++) {
519 if (tdb->transaction->blocks[i] != NULL) {
520 free(tdb->transaction->blocks[i]);
523 SAFE_FREE(tdb->transaction->blocks);
525 /* remove any global lock created during the transaction */
526 if (tdb->global_lock.count != 0) {
/* NOTE(review): the literal 4 here and in the per-chain unlock below is
   presumably sizeof(tdb_off_t), the size of one hash head - confirm */
527 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
528 tdb->global_lock.count = 0;
531 /* remove any locks created during the transaction */
532 if (tdb->num_locks != 0) {
533 for (i=0;i<tdb->num_lockrecs;i++) {
534 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
535 F_UNLCK,F_SETLKW, 0, 1);
538 tdb->num_lockrecs = 0;
539 SAFE_FREE(tdb->lockrecs);
542 /* restore the normal io methods */
543 tdb->methods = tdb->transaction->io_methods;
545 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
546 tdb_transaction_unlock(tdb);
547 SAFE_FREE(tdb->transaction->hash_heads);
548 SAFE_FREE(tdb->transaction);
/*
 * Flush file data to stable storage: fsync() on tdb->fd, then msync() with
 * MS_SYNC over the mapped range that covers offset .. offset+length. Either
 * failure sets tdb->ecode to TDB_ERR_IO and logs a fatal message. Callers in
 * this file test the result against -1.
 *
 * NOTE(review): the condition guarding the msync (original numbers jump 560
 * to 565) is not visible; presumably it runs only when the file is mapped
 * (tdb->map_ptr set) - confirm.
 */
556 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
558 if (fsync(tdb->fd) != 0) {
559 tdb->ecode = TDB_ERR_IO;
560 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
/* round offset down to a page boundary; this mask form is only correct when
   page_size is a power of two */
565 tdb_off_t moffset = offset & ~(tdb->page_size-1);
566 if (msync(moffset + (char *)tdb->map_ptr,
567 length + (offset - moffset), MS_SYNC) != 0) {
568 tdb->ecode = TDB_ERR_IO;
569 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
580 work out how much space the linearised recovery data will consume
/*
 * Return the number of bytes the linearised recovery data needs for the
 * current transaction.
 *
 * The total starts at sizeof(uint32_t) (NOTE(review): presumably the 4-byte
 * tailer that transaction_setup_recovery() appends - confirm). For each
 * counted block it adds 2*sizeof(tdb_off_t) for the offset/length pair plus
 * the block's data length: last_block_size for the final block, block_size
 * otherwise.
 *
 * NOTE(review): the bodies of the two tests on block position and on a NULL
 * block pointer are on lines missing from this listing (original numbers
 * jump 589 to 592 to 595); presumably blocks at or beyond old_map_size end
 * the scan and NULL blocks are skipped - confirm.
 */
582 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
584 tdb_len_t recovery_size = 0;
587 recovery_size = sizeof(uint32_t);
588 for (i=0;i<tdb->transaction->num_blocks;i++) {
589 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
592 if (tdb->transaction->blocks[i] == NULL) {
595 recovery_size += 2*sizeof(tdb_off_t);
596 if (i == tdb->transaction->num_blocks-1) {
597 recovery_size += tdb->transaction->last_block_size;
599 recovery_size += tdb->transaction->block_size;
603 return recovery_size;
607 allocate the recovery area, or use an existing recovery area if it is
/*
 * Find or create the on-disk recovery area for the current transaction.
 *
 * Outputs: *recovery_size receives tdb_recovery_size(tdb); *recovery_offset
 * and *recovery_max_size describe the area that was chosen.
 *
 * Visible steps:
 *  - read the recovery head offset from TDB_RECOVERY_HEAD and, when it is
 *    non-zero, the record header stored there (through the saved io
 *    methods);
 *  - reuse the existing area when the needed size fits in rec.rec_len;
 *  - otherwise tdb_free() the old area, recompute the needed size (the free
 *    may have changed it), round so that header plus data is a multiple of
 *    page_size, and place the new area at the current map_size by calling
 *    the original tdb_expand_file;
 *  - re-probe with tdb_oob(), set old_map_size to the new map_size so the
 *    commit does not expand the file again, then store the new head offset
 *    both in the file and in any staged copy of that location
 *    (transaction_write_existing).
 * transaction_setup_recovery() tests the result against -1.
 *
 * NOTE(review): original line numbers jump in this listing (e.g. 620 to
 * 626, 637 to 641), so returns and braces are not visible.
 */
610 static int tdb_recovery_allocate(struct tdb_context *tdb,
611 tdb_len_t *recovery_size,
612 tdb_off_t *recovery_offset,
613 tdb_len_t *recovery_max_size)
615 struct list_struct rec;
616 const struct tdb_methods *methods = tdb->transaction->io_methods;
617 tdb_off_t recovery_head;
619 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
620 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
626 if (recovery_head != 0 &&
627 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
628 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
632 *recovery_size = tdb_recovery_size(tdb);
634 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
635 /* it fits in the existing area */
636 *recovery_max_size = rec.rec_len;
637 *recovery_offset = recovery_head;
641 /* we need to free up the old recovery area, then allocate a
642 new one at the end of the file. Note that we cannot use
643 tdb_allocate() to allocate the new one as that might return
644 us an area that is being currently used (as of the start of
646 if (recovery_head != 0) {
647 if (tdb_free(tdb, recovery_head, &rec) == -1) {
648 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
653 /* the tdb_free() call might have increased the recovery size */
654 *recovery_size = tdb_recovery_size(tdb);
656 /* round up to a multiple of page size */
657 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
658 *recovery_offset = tdb->map_size;
659 recovery_head = *recovery_offset;
661 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
662 (tdb->map_size - tdb->transaction->old_map_size) +
663 sizeof(rec) + *recovery_max_size) == -1) {
664 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
668 /* remap the file (if using mmap) */
669 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
671 /* we have to reset the old map size so that we don't try to expand the file
672 again in the transaction commit, which would destroy the recovery area */
673 tdb->transaction->old_map_size = tdb->map_size;
675 /* write the recovery header offset and sync - we can sync without a race here
676 as the magic ptr in the recovery record has not been set */
677 CONVERT(recovery_head);
678 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
679 &recovery_head, sizeof(tdb_off_t)) == -1) {
680 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
683 if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
684 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
693 setup the recovery data that will be used on a crash during commit
/*
 * Build and write the recovery record that allows an interrupted commit to
 * be undone.
 *
 * Layout assembled in the malloc'd buffer: a list_struct header
 * (data_len = recovery_size, rec_len = recovery_max_size, key_len = the
 * pre-transaction file size), then for each staged block a 4-byte offset, a
 * 4-byte length and the ORIGINAL file bytes for that range read through the
 * saved io methods, and finally a 4-byte tailer holding
 * sizeof(*rec) + recovery_max_size.
 *
 * The buffer is written at recovery_offset through the original tdb_write
 * and mirrored into any staged copy with transaction_write_existing(), then
 * synced. Only after that is TDB_RECOVERY_MAGIC written to the header's
 * magic field (offset returned through *magic_offset), mirrored and synced.
 * tdb_transaction_commit() tests the result against -1.
 *
 * NOTE(review): old_map_size is captured before tdb_recovery_allocate(),
 * which can update tdb->transaction->old_map_size; the loop compares offset
 * with the captured value but checks the region boundary against the live
 * value - confirm this difference is intended.
 * NOTE(review): original line numbers jump in this listing (e.g. 744 to
 * 747, 757 to 761), so returns, frees, pointer advances and braces are not
 * visible.
 */
695 static int transaction_setup_recovery(struct tdb_context *tdb,
696 tdb_off_t *magic_offset)
698 tdb_len_t recovery_size;
699 unsigned char *data, *p;
700 const struct tdb_methods *methods = tdb->transaction->io_methods;
701 struct list_struct *rec;
702 tdb_off_t recovery_offset, recovery_max_size;
703 tdb_off_t old_map_size = tdb->transaction->old_map_size;
704 uint32_t magic, tailer;
708 check that the recovery area has enough space
710 if (tdb_recovery_allocate(tdb, &recovery_size,
711 &recovery_offset, &recovery_max_size) == -1) {
715 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
717 tdb->ecode = TDB_ERR_OOM;
721 rec = (struct list_struct *)data;
722 memset(rec, 0, sizeof(*rec));
725 rec->data_len = recovery_size;
726 rec->rec_len = recovery_max_size;
727 rec->key_len = old_map_size;
730 /* build the recovery data into a single blob to allow us to do a single
731 large write, which should be more efficient */
732 p = data + sizeof(*rec);
733 for (i=0;i<tdb->transaction->num_blocks;i++) {
737 if (tdb->transaction->blocks[i] == NULL) {
741 offset = i * tdb->transaction->block_size;
742 length = tdb->transaction->block_size;
743 if (i == tdb->transaction->num_blocks-1) {
744 length = tdb->transaction->last_block_size;
747 if (offset >= old_map_size) {
750 if (offset + length > tdb->transaction->old_map_size) {
751 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
753 tdb->ecode = TDB_ERR_CORRUPT;
756 memcpy(p, &offset, 4);
757 memcpy(p+4, &length, 4);
761 /* the recovery area contains the old data, not the
762 new data, so we have to call the original tdb_read
764 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
766 tdb->ecode = TDB_ERR_IO;
773 tailer = sizeof(*rec) + recovery_max_size;
774 memcpy(p, &tailer, 4);
777 /* write the recovery data to the recovery area */
778 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
779 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
781 tdb->ecode = TDB_ERR_IO;
784 if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
785 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
787 tdb->ecode = TDB_ERR_IO;
791 /* as we don't have ordered writes, we have to sync the recovery
792 data before we update the magic to indicate that the recovery
794 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
801 magic = TDB_RECOVERY_MAGIC;
804 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
806 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
807 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
808 tdb->ecode = TDB_ERR_IO;
811 if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
812 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
813 tdb->ecode = TDB_ERR_IO;
817 /* ensure the recovery magic marker is on disk */
818 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
826 commit the current transaction
/*
 * Commit the current transaction: make every staged block durable in the
 * file, or cancel the transaction on failure.
 *
 * Visible sequence:
 *  - log an error when no transaction is open;
 *  - when transaction_error is set, set ecode to TDB_ERR_IO and cancel;
 *  - for a nested commit (nesting != 0) only decrement nesting;
 *  - a transaction with no staged blocks is simply cancelled;
 *  - fail with TDB_ERR_LOCK and cancel when the caller still holds locks;
 *  - upgrade the FREELIST_TOP range lock to a write lock and take the
 *    GLOBAL_LOCK write lock;
 *  - unless TDB_NOSYNC is set, write the recovery record via
 *    transaction_setup_recovery();
 *  - when map_size differs from old_map_size, expand the real file with the
 *    original tdb_expand_file and re-probe with tdb_oob();
 *  - write each non-NULL staged block to the file through the original
 *    tdb_write (last_block_size bytes for the final block), freeing each
 *    block afterwards; on a write failure restore the original methods, run
 *    tdb_transaction_recover(), cancel and release GLOBAL_LOCK;
 *  - unless TDB_NOSYNC is set, sync the data, clear the recovery magic and
 *    sync again;
 *  - release GLOBAL_LOCK, call utime() on tdb->name to force an mtime
 *    change, and use tdb_transaction_cancel() to free memory and drop the
 *    remaining locks.
 *
 * NOTE(review): the log message on the lock-upgrade failure path names
 * tdb_transaction_start although it is emitted from this function; it is a
 * runtime string and is left unchanged here.
 * NOTE(review): original line numbers jump in this listing (e.g. 836 to
 * 840, 950 to 954), so returns, local declarations and braces are not
 * visible.
 */
828 int tdb_transaction_commit(struct tdb_context *tdb)
830 const struct tdb_methods *methods;
831 tdb_off_t magic_offset = 0;
835 if (tdb->transaction == NULL) {
836 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
840 if (tdb->transaction->transaction_error) {
841 tdb->ecode = TDB_ERR_IO;
842 tdb_transaction_cancel(tdb);
843 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
848 if (tdb->transaction->nesting != 0) {
849 tdb->transaction->nesting--;
853 /* check for a null transaction */
854 if (tdb->transaction->blocks == NULL) {
855 tdb_transaction_cancel(tdb);
859 methods = tdb->transaction->io_methods;
861 /* if there are any locks pending then the caller has not
862 nested their locks properly, so fail the transaction */
863 if (tdb->num_locks || tdb->global_lock.count) {
864 tdb->ecode = TDB_ERR_LOCK;
865 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
866 tdb_transaction_cancel(tdb);
870 /* upgrade the main transaction lock region to a write lock */
871 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
872 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
873 tdb->ecode = TDB_ERR_LOCK;
874 tdb_transaction_cancel(tdb);
878 /* get the global lock - this prevents new users attaching to the database
880 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
881 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
882 tdb->ecode = TDB_ERR_LOCK;
883 tdb_transaction_cancel(tdb);
887 if (!(tdb->flags & TDB_NOSYNC)) {
888 /* write the recovery data to the end of the file */
889 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
890 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
891 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
892 tdb_transaction_cancel(tdb);
897 /* expand the file to the new size if needed */
898 if (tdb->map_size != tdb->transaction->old_map_size) {
899 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
901 tdb->transaction->old_map_size) == -1) {
902 tdb->ecode = TDB_ERR_IO;
903 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
904 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
905 tdb_transaction_cancel(tdb);
908 tdb->map_size = tdb->transaction->old_map_size;
909 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
912 /* perform all the writes */
913 for (i=0;i<tdb->transaction->num_blocks;i++) {
917 if (tdb->transaction->blocks[i] == NULL) {
921 offset = i * tdb->transaction->block_size;
922 length = tdb->transaction->block_size;
923 if (i == tdb->transaction->num_blocks-1) {
924 length = tdb->transaction->last_block_size;
927 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
928 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
930 /* we've overwritten part of the data and
931 possibly expanded the file, so we need to
932 run the crash recovery code */
/* the original io methods are put back first so that the recovery code
   below reads and writes the real file rather than the staged blocks */
933 tdb->methods = methods;
934 tdb_transaction_recover(tdb);
936 tdb_transaction_cancel(tdb);
937 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
939 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
942 SAFE_FREE(tdb->transaction->blocks[i]);
945 SAFE_FREE(tdb->transaction->blocks);
946 tdb->transaction->num_blocks = 0;
948 if (!(tdb->flags & TDB_NOSYNC)) {
949 /* ensure the new data is on disk */
950 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
954 /* remove the recovery marker */
955 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
956 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
960 /* ensure the recovery marker has been removed on disk */
961 if (transaction_sync(tdb, magic_offset, 4) == -1) {
966 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
969 TODO: maybe write to some dummy hdr field, or write to magic
970 offset without mmap, before the last sync, instead of the
974 /* on some systems (like Linux 2.6.x) changes via mmap/msync
975 don't change the mtime of the file, this means the file may
976 not be backed up (as tdb rounding to block sizes means that
977 file size changes are quite rare too). The following forces
978 mtime changes when a transaction completes */
980 utime(tdb->name, NULL);
983 /* use a transaction cancel to free memory and remove the
985 tdb_transaction_cancel(tdb);
992 recover from an aborted transaction. Must be called with exclusive
993 database write access already established (including the global
994 lock to prevent new processes attaching)
/*
 * Replay the recovery record, restoring the file to its state before an
 * interrupted commit.
 *
 * Visible sequence:
 *  - read the recovery head from TDB_RECOVERY_HEAD; a zero head means no
 *    recovery area was ever allocated;
 *  - read the record header; anything other than TDB_RECOVERY_MAGIC in
 *    rec.magic means there is no valid recovery data;
 *  - refuse to recover a read-only database (ecode TDB_ERR_CORRUPT);
 *  - take the old end of file from rec.key_len, malloc rec.data_len bytes
 *    and read the recovery payload that follows the header;
 *  - walk the payload, writing each saved range back through
 *    tdb->methods->tdb_write, then sync;
 *  - when the recovery area lies at or after the recovered end of file,
 *    clear TDB_RECOVERY_HEAD;
 *  - clear the magic field, ftruncate() the file to the old size, set
 *    map_size to match, sync again and log the recovered size.
 * Each visible failure path logs a fatal message and sets tdb->ecode.
 *
 * NOTE(review): rec.data_len and the per-entry lengths come from the file
 * and no validation of them is visible before the malloc and the writes -
 * confirm whether the missing lines bound them.
 * NOTE(review): original line numbers jump in this listing (e.g. 1006 to
 * 1010, 1053 to 1059), so returns, frees, pointer advances and braces are
 * not visible.
 */
996 int tdb_transaction_recover(struct tdb_context *tdb)
998 tdb_off_t recovery_head, recovery_eof;
999 unsigned char *data, *p;
1001 struct list_struct rec;
1003 /* find the recovery area */
1004 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1005 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1006 tdb->ecode = TDB_ERR_IO;
1010 if (recovery_head == 0) {
1011 /* we have never allocated a recovery record */
1015 /* read the recovery record */
1016 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1017 sizeof(rec), DOCONV()) == -1) {
1018 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1019 tdb->ecode = TDB_ERR_IO;
1023 if (rec.magic != TDB_RECOVERY_MAGIC) {
1024 /* there is no valid recovery data */
1028 if (tdb->read_only) {
1029 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1030 tdb->ecode = TDB_ERR_CORRUPT;
1034 recovery_eof = rec.key_len;
1036 data = (unsigned char *)malloc(rec.data_len);
1038 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1039 tdb->ecode = TDB_ERR_OOM;
1043 /* read the full recovery data */
1044 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1045 rec.data_len, 0) == -1) {
1046 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1047 tdb->ecode = TDB_ERR_IO;
1051 /* recover the file data */
/* each entry starts with 8 header bytes: the length is read from p+4 and
   the saved data starts at p+8 (the read of the offset at p is presumably
   on a line missing from this listing) */
1053 while (p+8 < data + rec.data_len) {
1059 memcpy(&len, p+4, 4);
1061 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1063 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1064 tdb->ecode = TDB_ERR_IO;
1072 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1073 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1074 tdb->ecode = TDB_ERR_IO;
1078 /* if the recovery area is after the recovered eof then remove it */
1079 if (recovery_eof <= recovery_head) {
1080 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1081 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1082 tdb->ecode = TDB_ERR_IO;
1087 /* remove the recovery magic */
1088 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1090 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1091 tdb->ecode = TDB_ERR_IO;
1095 /* reduce the file size to the old size */
1097 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1098 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1099 tdb->ecode = TDB_ERR_IO;
1102 tdb->map_size = recovery_eof;
1105 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1106 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1107 tdb->ecode = TDB_ERR_IO;
1111 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",