tdb: cleanup: tdb_have_extra_locks() helper
[sahlberg/ctdb.git] / lib / tdb / common / transaction.c
index f8a788f8573620d942f71fb736293ae57489afd8..36b5c035b7f8280acaed3da6902749228590dd36 100644 (file)
@@ -20,8 +20,7 @@
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
-   License along with this library; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
 */
 
 #include "tdb_private.h"
@@ -77,7 +76,7 @@
     to reduce this to 3 or even 2 with some more work.
 
   - check for a valid recovery record on open of the tdb, while the
-    global lock is held. Automatically recover from the transaction
+    open lock is held. Automatically recover from the transaction
     recovery area if needed, then continue with the open as
     usual. This allows for smooth crash recovery with no administrator
     intervention.
     still available, but no transaction recovery area is used and no
     fsync/msync calls are made.
 
+  - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
+    tdb_add_flags() transaction nesting is enabled.
+    It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
+    The default is that transaction nesting is allowed.
+    Note: this default may change in future versions of tdb.
+
+    Beware. when transactions are nested a transaction successfully
+    completed with tdb_transaction_commit() can be silently unrolled later.
+
+  - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
+    tdb_add_flags() transaction nesting is disabled.
+    It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
+    An attempt create a nested transaction will fail with TDB_ERR_NESTING.
+    The default is that transaction nesting is allowed.
+    Note: this default may change in future versions of tdb.
 */
 
-struct tdb_transaction_el {
-       struct tdb_transaction_el *next, *prev;
-       tdb_off_t offset;
-       tdb_len_t length;
-       unsigned char *data;
-};
 
 /*
   hold the context of any current transaction
@@ -101,17 +109,17 @@ struct tdb_transaction_el {
 struct tdb_transaction {
        /* we keep a mirrored copy of the tdb hash heads here so
           tdb_next_hash_chain() can operate efficiently */
-       u32 *hash_heads;
+       uint32_t *hash_heads;
 
        /* the original io methods - used to do IOs to the real db */
        const struct tdb_methods *io_methods;
 
-       /* the list of transaction elements. We use a doubly linked
-          list with a last pointer to allow us to keep the list
-          ordered, with first element at the front of the list. It
-          needs to be doubly linked as the read/write traversals need
-          to be backwards, while the commit needs to be forwards */
-       struct tdb_transaction_el *elements, *elements_last;
+       /* the list of transaction blocks. When a block is first
+          written to, it gets created in this list */
+       uint8_t **blocks;
+       uint32_t num_blocks;
+       uint32_t block_size;      /* bytes in each block */
+       uint32_t last_block_size; /* number of valid bytes in the last block */
 
        /* non-zero when an internal transaction error has
           occurred. All write operations will then fail until the
@@ -123,8 +131,18 @@ struct tdb_transaction {
           but don't create a new transaction */
        int nesting;
 
+       /* set when a prepare has already occurred */
+       bool prepared;
+       tdb_off_t magic_offset;
+
+       /* set when the OPEN_LOCK has been taken */
+       bool open_lock_taken;
+
        /* old file size before transaction */
        tdb_len_t old_map_size;
+
+       /* we should re-pack on commit */
+       bool need_repack;
 };
 
 
@@ -135,52 +153,48 @@ struct tdb_transaction {
 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
                            tdb_len_t len, int cv)
 {
-       struct tdb_transaction_el *el;
-
-       /* we need to walk the list backwards to get the most recent data */
-       for (el=tdb->transaction->elements_last;el;el=el->prev) {
-               tdb_len_t partial;
+       uint32_t blk;
 
-               if (off+len <= el->offset) {
-                       continue;
-               }
-               if (off >= el->offset + el->length) {
-                       continue;
+       /* break it down into block sized ops */
+       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+               tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+               if (transaction_read(tdb, off, buf, len2, cv) != 0) {
+                       return -1;
                }
+               len -= len2;
+               off += len2;
+               buf = (void *)(len2 + (char *)buf);
+       }
 
-               /* an overlapping read - needs to be split into up to
-                  2 reads and a memcpy */
-               if (off < el->offset) {
-                       partial = el->offset - off;
-                       if (transaction_read(tdb, off, buf, partial, cv) != 0) {
-                               goto fail;
-                       }
-                       len -= partial;
-                       off += partial;
-                       buf = (void *)(partial + (char *)buf);
-               }
-               if (off + len <= el->offset + el->length) {
-                       partial = len;
-               } else {
-                       partial = el->offset + el->length - off;
-               }
-               memcpy(buf, el->data + (off - el->offset), partial);
-               if (cv) {
-                       tdb_convert(buf, len);
-               }
-               len -= partial;
-               off += partial;
-               buf = (void *)(partial + (char *)buf);
-               
-               if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
+       if (len == 0) {
+               return 0;
+       }
+
+       blk = off / tdb->transaction->block_size;
+
+       /* see if we have it in the block list */
+       if (tdb->transaction->num_blocks <= blk ||
+           tdb->transaction->blocks[blk] == NULL) {
+               /* nope, do a real read */
+               if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
                        goto fail;
                }
-
                return 0;
        }
 
-       /* its not in the transaction elements - do a real read */
-       return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
+       /* it is in the block list. Now check for the last block */
+       if (blk == tdb->transaction->num_blocks-1) {
+               if (len > tdb->transaction->last_block_size) {
+                       goto fail;
+               }
+       }
+       
+       /* now copy it out of this block */
+       memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
+       if (cv) {
+               tdb_convert(buf, len);
+       }
+       return 0;
 
 fail:
        TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
@@ -196,130 +210,169 @@ fail:
 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
                             const void *buf, tdb_len_t len)
 {
-       struct tdb_transaction_el *el, *best_el=NULL;
+       uint32_t blk;
 
-       if (len == 0) {
-               return 0;
+       /* Only a commit is allowed on a prepared transaction */
+       if (tdb->transaction->prepared) {
+               tdb->ecode = TDB_ERR_EINVAL;
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
+               tdb->transaction->transaction_error = 1;
+               return -1;
        }
-       
+
        /* if the write is to a hash head, then update the transaction
           hash heads */
        if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
            off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
-               u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
+               uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
                memcpy(&tdb->transaction->hash_heads[chain], buf, len);
        }
 
-       /* first see if we can replace an existing entry */
-       for (el=tdb->transaction->elements_last;el;el=el->prev) {
-               tdb_len_t partial;
-
-               if (best_el == NULL && off == el->offset+el->length) {
-                       best_el = el;
-               }
-
-               if (off+len <= el->offset) {
-                       continue;
+       /* break it up into block sized chunks */
+       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+               tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+               if (transaction_write(tdb, off, buf, len2) != 0) {
+                       return -1;
                }
-               if (off >= el->offset + el->length) {
-                       continue;
+               len -= len2;
+               off += len2;
+               if (buf != NULL) {
+                       buf = (const void *)(len2 + (const char *)buf);
                }
+       }
 
-               /* an overlapping write - needs to be split into up to
-                  2 writes and a memcpy */
-               if (off < el->offset) {
-                       partial = el->offset - off;
-                       if (transaction_write(tdb, off, buf, partial) != 0) {
-                               goto fail;
-                       }
-                       len -= partial;
-                       off += partial;
-                       buf = (const void *)(partial + (const char *)buf);
-               }
-               if (off + len <= el->offset + el->length) {
-                       partial = len;
+       if (len == 0) {
+               return 0;
+       }
+
+       blk = off / tdb->transaction->block_size;
+       off = off % tdb->transaction->block_size;
+
+       if (tdb->transaction->num_blocks <= blk) {
+               uint8_t **new_blocks;
+               /* expand the blocks array */
+               if (tdb->transaction->blocks == NULL) {
+                       new_blocks = (uint8_t **)malloc(
+                               (blk+1)*sizeof(uint8_t *));
                } else {
-                       partial = el->offset + el->length - off;
+                       new_blocks = (uint8_t **)realloc(
+                               tdb->transaction->blocks,
+                               (blk+1)*sizeof(uint8_t *));
                }
-               memcpy(el->data + (off - el->offset), buf, partial);
-               len -= partial;
-               off += partial;
-               buf = (const void *)(partial + (const char *)buf);
-               
-               if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
+               if (new_blocks == NULL) {
+                       tdb->ecode = TDB_ERR_OOM;
                        goto fail;
                }
-
-               return 0;
+               memset(&new_blocks[tdb->transaction->num_blocks], 0, 
+                      (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
+               tdb->transaction->blocks = new_blocks;
+               tdb->transaction->num_blocks = blk+1;
+               tdb->transaction->last_block_size = 0;
        }
 
-       /* see if we can append the new entry to an existing entry */
-       if (best_el && best_el->offset + best_el->length == off && 
-           (off+len < tdb->transaction->old_map_size ||
-            off > tdb->transaction->old_map_size)) {
-               unsigned char *data = best_el->data;
-               el = best_el;
-               el->data = (unsigned char *)realloc(el->data,
-                                                   el->length + len);
-               if (el->data == NULL) {
+       /* allocate and fill a block? */
+       if (tdb->transaction->blocks[blk] == NULL) {
+               tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
+               if (tdb->transaction->blocks[blk] == NULL) {
                        tdb->ecode = TDB_ERR_OOM;
                        tdb->transaction->transaction_error = 1;
-                       el->data = data;
-                       return -1;
+                       return -1;                      
                }
-               if (buf) {
-                       memcpy(el->data + el->length, buf, len);
-               } else {
-                       memset(el->data + el->length, TDB_PAD_BYTE, len);
+               if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
+                       tdb_len_t len2 = tdb->transaction->block_size;
+                       if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
+                               len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
+                       }
+                       if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
+                                                                  tdb->transaction->blocks[blk], 
+                                                                  len2, 0) != 0) {
+                               SAFE_FREE(tdb->transaction->blocks[blk]);                               
+                               tdb->ecode = TDB_ERR_IO;
+                               goto fail;
+                       }
+                       if (blk == tdb->transaction->num_blocks-1) {
+                               tdb->transaction->last_block_size = len2;
+                       }                       
                }
-               el->length += len;
-               return 0;
        }
-
-       /* add a new entry at the end of the list */
-       el = (struct tdb_transaction_el *)malloc(sizeof(*el));
-       if (el == NULL) {
-               tdb->ecode = TDB_ERR_OOM;
-               tdb->transaction->transaction_error = 1;                
-               return -1;
-       }
-       el->next = NULL;
-       el->prev = tdb->transaction->elements_last;
-       el->offset = off;
-       el->length = len;
-       el->data = (unsigned char *)malloc(len);
-       if (el->data == NULL) {
-               free(el);
-               tdb->ecode = TDB_ERR_OOM;
-               tdb->transaction->transaction_error = 1;                
-               return -1;
-       }
-       if (buf) {
-               memcpy(el->data, buf, len);
+       
+       /* overwrite part of an existing block */
+       if (buf == NULL) {
+               memset(tdb->transaction->blocks[blk] + off, 0, len);
        } else {
-               memset(el->data, TDB_PAD_BYTE, len);
+               memcpy(tdb->transaction->blocks[blk] + off, buf, len);
        }
-       if (el->prev) {
-               el->prev->next = el;
-       } else {
-               tdb->transaction->elements = el;
+       if (blk == tdb->transaction->num_blocks-1) {
+               if (len + off > tdb->transaction->last_block_size) {
+                       tdb->transaction->last_block_size = len + off;
+               }
        }
-       tdb->transaction->elements_last = el;
+
        return 0;
 
 fail:
-       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
-       tdb->ecode = TDB_ERR_IO;
+       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
+                (blk*tdb->transaction->block_size) + off, len));
        tdb->transaction->transaction_error = 1;
        return -1;
 }
 
+
+/*
+  write while in a transaction - this varient never expands the transaction blocks, it only
+  updates existing blocks. This means it cannot change the recovery size
+*/
+static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
+                                     const void *buf, tdb_len_t len)
+{
+       uint32_t blk;
+
+       /* break it up into block sized chunks */
+       while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
+               tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
+               if (transaction_write_existing(tdb, off, buf, len2) != 0) {
+                       return -1;
+               }
+               len -= len2;
+               off += len2;
+               if (buf != NULL) {
+                       buf = (const void *)(len2 + (const char *)buf);
+               }
+       }
+
+       if (len == 0) {
+               return 0;
+       }
+
+       blk = off / tdb->transaction->block_size;
+       off = off % tdb->transaction->block_size;
+
+       if (tdb->transaction->num_blocks <= blk ||
+           tdb->transaction->blocks[blk] == NULL) {
+               return 0;
+       }
+
+       if (blk == tdb->transaction->num_blocks-1 &&
+           off + len > tdb->transaction->last_block_size) {
+               if (off >= tdb->transaction->last_block_size) {
+                       return 0;
+               }
+               len = tdb->transaction->last_block_size - off;
+       }
+
+       /* overwrite part of an existing block */
+       memcpy(tdb->transaction->blocks[blk] + off, buf, len);
+
+       return 0;
+}
+
+
 /*
   accelerated hash chain head search, using the cached hash heads
 */
-static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
+static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
 {
-       u32 h = *chain;
+       uint32_t h = *chain;
        for (;h < tdb->header.hash_size;h++) {
                /* the +1 takes account of the freelist */
                if (0 != tdb->transaction->hash_heads[h+1]) {
@@ -337,7 +390,8 @@ static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
        if (len <= tdb->map_size) {
                return 0;
        }
-       return TDB_ERRCODE(TDB_ERR_IO, -1);
+       tdb->ecode = TDB_ERR_IO;
+       return -1;
 }
 
 /*
@@ -352,14 +406,23 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
                return -1;
        }
 
+       tdb->transaction->need_repack = true;
+
        return 0;
 }
 
 /*
   brlock during a transaction - ignore them
 */
-static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
-                             int rw_type, int lck_type, int probe, size_t len)
+static int transaction_brlock(struct tdb_context *tdb,
+                             int rw_type, tdb_off_t offset, size_t len,
+                             enum tdb_lock_flags flags)
+{
+       return 0;
+}
+
+static int transaction_brunlock(struct tdb_context *tdb,
+                               int rw_type, tdb_off_t offset, size_t len)
 {
        return 0;
 }
@@ -370,7 +433,8 @@ static const struct tdb_methods transaction_methods = {
        transaction_next_hash_chain,
        transaction_oob,
        transaction_expand_file,
-       transaction_brlock
+       transaction_brlock,
+       transaction_brunlock
 };
 
 
@@ -389,13 +453,17 @@ int tdb_transaction_start(struct tdb_context *tdb)
 
        /* cope with nested tdb_transaction_start() calls */
        if (tdb->transaction != NULL) {
+               if (!(tdb->flags & TDB_ALLOW_NESTING)) {
+                       tdb->ecode = TDB_ERR_NESTING;
+                       return -1;
+               }
                tdb->transaction->nesting++;
                TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
                         tdb->transaction->nesting));
                return 0;
        }
 
-       if (tdb->num_locks != 0 || tdb->global_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                /* the caller must not have any locks when starting a
                   transaction as otherwise we'll be screwed by lack
                   of nested locks in posix */
@@ -420,17 +488,21 @@ int tdb_transaction_start(struct tdb_context *tdb)
                return -1;
        }
 
+       /* a page at a time seems like a reasonable compromise between compactness and efficiency */
+       tdb->transaction->block_size = tdb->page_size;
+
        /* get the transaction write lock. This is a blocking lock. As
           discussed with Volker, there are a number of ways we could
           make this async, which we will probably do in the future */
        if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
+               SAFE_FREE(tdb->transaction->blocks);
                SAFE_FREE(tdb->transaction);
                return -1;
        }
        
        /* get a read lock from the freelist to the end of file. This
           is upgraded to a write lock during the commit */
-       if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
+       if (tdb_brlock(tdb, F_RDLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
                tdb->ecode = TDB_ERR_LOCK;
                goto fail;
@@ -438,8 +510,8 @@ int tdb_transaction_start(struct tdb_context *tdb)
 
        /* setup a copy of the hash table heads so the hash scan in
           traverse can be fast */
-       tdb->transaction->hash_heads = (u32 *)
-               calloc(tdb->header.hash_size+1, sizeof(u32));
+       tdb->transaction->hash_heads = (uint32_t *)
+               calloc(tdb->header.hash_size+1, sizeof(uint32_t));
        if (tdb->transaction->hash_heads == NULL) {
                tdb->ecode = TDB_ERR_OOM;
                goto fail;
@@ -461,21 +533,14 @@ int tdb_transaction_start(struct tdb_context *tdb)
        tdb->transaction->io_methods = tdb->methods;
        tdb->methods = &transaction_methods;
 
-       /* by calling this transaction write here, we ensure that we don't grow the
-          transaction linked list due to hash table updates */
-       if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
-                             TDB_HASHTABLE_SIZE(tdb)) != 0) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
-               tdb->ecode = TDB_ERR_IO;
-               tdb->methods = tdb->transaction->io_methods;
-               goto fail;
-       }
-
+       /* Trace at the end, so we get sequence number correct. */
+       tdb_trace(tdb, "tdb_transaction_start");
        return 0;
        
 fail:
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_brunlock(tdb, F_RDLCK, FREELIST_TOP, 0);
+       tdb_transaction_unlock(tdb, F_WRLCK);
+       SAFE_FREE(tdb->transaction->blocks);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
        return -1;
@@ -483,10 +548,40 @@ fail:
 
 
 /*
-  cancel the current transaction
+  sync to disk
 */
-int tdb_transaction_cancel(struct tdb_context *tdb)
+static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
 {      
+       if (tdb->flags & TDB_NOSYNC) {
+               return 0;
+       }
+
+       if (fdatasync(tdb->fd) != 0) {
+               tdb->ecode = TDB_ERR_IO;
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
+               return -1;
+       }
+#ifdef HAVE_MMAP
+       if (tdb->map_ptr) {
+               tdb_off_t moffset = offset & ~(tdb->page_size-1);
+               if (msync(moffset + (char *)tdb->map_ptr, 
+                         length + (offset - moffset), MS_SYNC) != 0) {
+                       tdb->ecode = TDB_ERR_IO;
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
+                                strerror(errno)));
+                       return -1;
+               }
+       }
+#endif
+       return 0;
+}
+
+
+/* ltype is F_WRLCK after prepare. */
+static int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype)
+{      
+       int i, ret = 0;
+
        if (tdb->transaction == NULL) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
                return -1;
@@ -500,26 +595,43 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
 
        tdb->map_size = tdb->transaction->old_map_size;
 
-       /* free all the transaction elements */
-       while (tdb->transaction->elements) {
-               struct tdb_transaction_el *el = tdb->transaction->elements;
-               tdb->transaction->elements = el->next;
-               free(el->data);
-               free(el);
+       /* free all the transaction blocks */
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               if (tdb->transaction->blocks[i] != NULL) {
+                       free(tdb->transaction->blocks[i]);
+               }
+       }
+       SAFE_FREE(tdb->transaction->blocks);
+
+       if (tdb->transaction->magic_offset) {
+               const struct tdb_methods *methods = tdb->transaction->io_methods;
+               const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
+
+               /* remove the recovery marker */
+               if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
+               transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
+                       ret = -1;
+               }
+       }
+
+       if (tdb->transaction->open_lock_taken) {
+               tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
+               tdb->transaction->open_lock_taken = false;
        }
 
        /* remove any global lock created during the transaction */
-       if (tdb->global_lock.count != 0) {
-               tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
-               tdb->global_lock.count = 0;
+       if (tdb->allrecord_lock.count != 0) {
+               tdb_brunlock(tdb, tdb->allrecord_lock.ltype,
+                            FREELIST_TOP, 4*tdb->header.hash_size);
+               tdb->allrecord_lock.count = 0;
        }
 
        /* remove any locks created during the transaction */
        if (tdb->num_locks != 0) {
-               int i;
                for (i=0;i<tdb->num_lockrecs;i++) {
-                       tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
-                                  F_UNLCK,F_SETLKW, 0, 1);
+                       tdb_brunlock(tdb, tdb->lockrecs[i].ltype,
+                                    tdb->lockrecs[i].off, 1);
                }
                tdb->num_locks = 0;
                tdb->num_lockrecs = 0;
@@ -529,55 +641,49 @@ int tdb_transaction_cancel(struct tdb_context *tdb)
        /* restore the normal io methods */
        tdb->methods = tdb->transaction->io_methods;
 
-       tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
-       tdb_transaction_unlock(tdb);
+       tdb_brunlock(tdb, ltype, FREELIST_TOP, 0);
+       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
        
-       return 0;
+       return ret;
 }
 
 /*
-  sync to disk
+  cancel the current transaction
 */
-static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
-{      
-       if (fsync(tdb->fd) != 0) {
-               tdb->ecode = TDB_ERR_IO;
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
-               return -1;
-       }
-#ifdef MS_SYNC
-       if (tdb->map_ptr) {
-               tdb_off_t moffset = offset & ~(tdb->page_size-1);
-               if (msync(moffset + (char *)tdb->map_ptr, 
-                         length + (offset - moffset), MS_SYNC) != 0) {
-                       tdb->ecode = TDB_ERR_IO;
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
-                                strerror(errno)));
-                       return -1;
-               }
-       }
-#endif
-       return 0;
+int tdb_transaction_cancel(struct tdb_context *tdb)
+{
+       int ltype = F_RDLCK;
+       tdb_trace(tdb, "tdb_transaction_cancel");
+       if (tdb->transaction && tdb->transaction->prepared)
+               ltype = F_WRLCK;
+       return _tdb_transaction_cancel(tdb, ltype);
 }
 
-
 /*
   work out how much space the linearised recovery data will consume
 */
 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
 {
-       struct tdb_transaction_el *el;
        tdb_len_t recovery_size = 0;
+       int i;
 
-       recovery_size = sizeof(u32);
-       for (el=tdb->transaction->elements;el;el=el->next) {
-               if (el->offset >= tdb->transaction->old_map_size) {
+       recovery_size = sizeof(uint32_t);
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
+                       break;
+               }
+               if (tdb->transaction->blocks[i] == NULL) {
                        continue;
                }
-               recovery_size += 2*sizeof(tdb_off_t) + el->length;
-       }
+               recovery_size += 2*sizeof(tdb_off_t);
+               if (i == tdb->transaction->num_blocks-1) {
+                       recovery_size += tdb->transaction->last_block_size;
+               } else {
+                       recovery_size += tdb->transaction->block_size;
+               }
+       }       
 
        return recovery_size;
 }
@@ -591,7 +697,7 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
                                 tdb_off_t *recovery_offset,
                                 tdb_len_t *recovery_max_size)
 {
-       struct list_struct rec;
+       struct tdb_record rec;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
        tdb_off_t recovery_head;
 
@@ -602,10 +708,16 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 
        rec.rec_len = 0;
 
-       if (recovery_head != 0 && 
-           methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
-               return -1;
+       if (recovery_head != 0) {
+               if (methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
+                       return -1;
+               }
+               /* ignore invalid recovery regions: can happen in crash */
+               if (rec.magic != TDB_RECOVERY_MAGIC &&
+                   rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
+                       recovery_head = 0;
+               }
        }
 
        *recovery_size = tdb_recovery_size(tdb);
@@ -659,6 +771,10 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
                return -1;
        }
+       if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
+               return -1;
+       }
 
        return 0;
 }
@@ -670,14 +786,14 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 static int transaction_setup_recovery(struct tdb_context *tdb, 
                                      tdb_off_t *magic_offset)
 {
-       struct tdb_transaction_el *el;
        tdb_len_t recovery_size;
        unsigned char *data, *p;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
-       struct list_struct *rec;
+       struct tdb_record *rec;
        tdb_off_t recovery_offset, recovery_max_size;
        tdb_off_t old_map_size = tdb->transaction->old_map_size;
-       u32 magic, tailer;
+       uint32_t magic, tailer;
+       int i;
 
        /*
          check that the recovery area has enough space
@@ -693,10 +809,10 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
                return -1;
        }
 
-       rec = (struct list_struct *)data;
+       rec = (struct tdb_record *)data;
        memset(rec, 0, sizeof(*rec));
 
-       rec->magic    = 0;
+       rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
        rec->data_len = recovery_size;
        rec->rec_len  = recovery_max_size;
        rec->key_len  = old_map_size;
@@ -705,30 +821,43 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
        p = data + sizeof(*rec);
-       for (el=tdb->transaction->elements;el;el=el->next) {
-               if (el->offset >= old_map_size) {
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               tdb_off_t offset;
+               tdb_len_t length;
+
+               if (tdb->transaction->blocks[i] == NULL) {
+                       continue;
+               }
+
+               offset = i * tdb->transaction->block_size;
+               length = tdb->transaction->block_size;
+               if (i == tdb->transaction->num_blocks-1) {
+                       length = tdb->transaction->last_block_size;
+               }
+               
+               if (offset >= old_map_size) {
                        continue;
                }
-               if (el->offset + el->length > tdb->transaction->old_map_size) {
+               if (offset + length > tdb->transaction->old_map_size) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
                        free(data);
                        tdb->ecode = TDB_ERR_CORRUPT;
                        return -1;
                }
-               memcpy(p, &el->offset, 4);
-               memcpy(p+4, &el->length, 4);
+               memcpy(p, &offset, 4);
+               memcpy(p+4, &length, 4);
                if (DOCONV()) {
                        tdb_convert(p, 8);
                }
                /* the recovery area contains the old data, not the
                   new data, so we have to call the original tdb_read
                   method to get it */
-               if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
+               if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
                        free(data);
                        tdb->ecode = TDB_ERR_IO;
                        return -1;
                }
-               p += 8 + el->length;
+               p += 8 + length;
        }
 
        /* and the tailer */
@@ -743,6 +872,12 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
                tdb->ecode = TDB_ERR_IO;
                return -1;
        }
+       if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
+               free(data);
+               tdb->ecode = TDB_ERR_IO;
+               return -1;
+       }
 
        /* as we don't have ordered writes, we have to sync the recovery
           data before we update the magic to indicate that the recovery
@@ -757,13 +892,18 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        magic = TDB_RECOVERY_MAGIC;
        CONVERT(magic);
 
-       *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
+       *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
 
        if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
                tdb->ecode = TDB_ERR_IO;
                return -1;
        }
+       if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
+               tdb->ecode = TDB_ERR_IO;
+               return -1;
+       }
 
        /* ensure the recovery magic marker is on disk */
        if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
@@ -773,35 +913,36 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        return 0;
 }
 
-/*
-  commit the current transaction
-*/
-int tdb_transaction_commit(struct tdb_context *tdb)
+static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 {      
        const struct tdb_methods *methods;
-       tdb_off_t magic_offset = 0;
-       u32 zero = 0;
 
        if (tdb->transaction == NULL) {
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
+               return -1;
+       }
+
+       if (tdb->transaction->prepared) {
+               tdb->ecode = TDB_ERR_EINVAL;
+               _tdb_transaction_cancel(tdb, F_WRLCK);
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
                return -1;
        }
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               tdb_transaction_cancel(tdb);
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
+               _tdb_transaction_cancel(tdb, F_RDLCK);
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
                return -1;
        }
 
+
        if (tdb->transaction->nesting != 0) {
-               tdb->transaction->nesting--;
                return 0;
        }               
 
        /* check for a null transaction */
-       if (tdb->transaction->elements == NULL) {
-               tdb_transaction_cancel(tdb);
+       if (tdb->transaction->blocks == NULL) {
                return 0;
        }
 
@@ -809,60 +950,130 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        
        /* if there are any locks pending then the caller has not
           nested their locks properly, so fail the transaction */
-       if (tdb->num_locks || tdb->global_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                tdb->ecode = TDB_ERR_LOCK;
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
-               tdb_transaction_cancel(tdb);
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                return -1;
        }
 
        /* upgrade the main transaction lock region to a write lock */
        if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
                tdb->ecode = TDB_ERR_LOCK;
-               tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_RDLCK);
                return -1;
        }
 
-       /* get the global lock - this prevents new users attaching to the database
+       /* get the open lock - this prevents new users attaching to the database
           during the commit */
-       if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
+       if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
                tdb->ecode = TDB_ERR_LOCK;
-               tdb_transaction_cancel(tdb);
+               _tdb_transaction_cancel(tdb, F_WRLCK);
                return -1;
        }
 
+       tdb->transaction->open_lock_taken = true;
+
        if (!(tdb->flags & TDB_NOSYNC)) {
                /* write the recovery data to the end of the file */
-               if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-                       tdb_transaction_cancel(tdb);
+               if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
                        return -1;
                }
        }
 
+       tdb->transaction->prepared = true;
+
        /* expand the file to the new size if needed */
        if (tdb->map_size != tdb->transaction->old_map_size) {
                if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
                                             tdb->map_size - 
                                             tdb->transaction->old_map_size) == -1) {
                        tdb->ecode = TDB_ERR_IO;
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-                       tdb_transaction_cancel(tdb);
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
                        return -1;
                }
                tdb->map_size = tdb->transaction->old_map_size;
                methods->tdb_oob(tdb, tdb->map_size + 1, 1);
        }
 
+       /* Keep the open lock until the actual commit */
+
+       return 0;
+}
+
+/*
+   prepare to commit the current transaction
+*/
+int tdb_transaction_prepare_commit(struct tdb_context *tdb)
+{      
+       tdb_trace(tdb, "tdb_transaction_prepare_commit");
+       return _tdb_transaction_prepare_commit(tdb);
+}
+
+/*
+  commit the current transaction
+*/
+int tdb_transaction_commit(struct tdb_context *tdb)
+{      
+       const struct tdb_methods *methods;
+       int i;
+       bool need_repack;
+
+       if (tdb->transaction == NULL) {
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
+               return -1;
+       }
+
+       tdb_trace(tdb, "tdb_transaction_commit");
+
+       if (tdb->transaction->transaction_error) {
+               tdb->ecode = TDB_ERR_IO;
+               _tdb_transaction_cancel(tdb, F_RDLCK);
+               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
+               return -1;
+       }
+
+
+       if (tdb->transaction->nesting != 0) {
+               tdb->transaction->nesting--;
+               return 0;
+       }
+
+       /* check for a null transaction */
+       if (tdb->transaction->blocks == NULL) {
+               _tdb_transaction_cancel(tdb, F_RDLCK);
+               return 0;
+       }
+
+       if (!tdb->transaction->prepared) {
+               int ret = _tdb_transaction_prepare_commit(tdb);
+               if (ret)
+                       return ret;
+       }
+
+       methods = tdb->transaction->io_methods;
+
        /* perform all the writes */
-       while (tdb->transaction->elements) {
-               struct tdb_transaction_el *el = tdb->transaction->elements;
+       for (i=0;i<tdb->transaction->num_blocks;i++) {
+               tdb_off_t offset;
+               tdb_len_t length;
+
+               if (tdb->transaction->blocks[i] == NULL) {
+                       continue;
+               }
+
+               offset = i * tdb->transaction->block_size;
+               length = tdb->transaction->block_size;
+               if (i == tdb->transaction->num_blocks-1) {
+                       length = tdb->transaction->last_block_size;
+               }
 
-               if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
+               if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
                        
                        /* we've overwritten part of the data and
@@ -871,37 +1082,22 @@ int tdb_transaction_commit(struct tdb_context *tdb)
                        tdb->methods = methods;
                        tdb_transaction_recover(tdb); 
 
-                       tdb_transaction_cancel(tdb);
-                       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+                       _tdb_transaction_cancel(tdb, F_WRLCK);
 
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
                        return -1;
                }
-               tdb->transaction->elements = el->next;
-               free(el->data); 
-               free(el);
+               SAFE_FREE(tdb->transaction->blocks[i]);
        } 
 
-       if (!(tdb->flags & TDB_NOSYNC)) {
-               /* ensure the new data is on disk */
-               if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
-                       return -1;
-               }
+       SAFE_FREE(tdb->transaction->blocks);
+       tdb->transaction->num_blocks = 0;
 
-               /* remove the recovery marker */
-               if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
-                       return -1;
-               }
-
-               /* ensure the recovery marker has been removed on disk */
-               if (transaction_sync(tdb, magic_offset, 4) == -1) {
-                       return -1;
-               }
+       /* ensure the new data is on disk */
+       if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
+               return -1;
        }
 
-       tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-
        /*
          TODO: maybe write to some dummy hdr field, or write to magic
          offset without mmap, before the last sync, instead of the
@@ -917,24 +1113,31 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        utime(tdb->name, NULL);
 #endif
 
+       need_repack = tdb->transaction->need_repack;
+
        /* use a transaction cancel to free memory and remove the
           transaction locks */
-       tdb_transaction_cancel(tdb);
+       _tdb_transaction_cancel(tdb, F_WRLCK);
+
+       if (need_repack) {
+               return tdb_repack(tdb);
+       }
+
        return 0;
 }
 
 
 /*
   recover from an aborted transaction. Must be called with exclusive
-  database write access already established (including the global
+  database write access already established (including the open
   lock to prevent new processes attaching)
 */
 int tdb_transaction_recover(struct tdb_context *tdb)
 {
        tdb_off_t recovery_head, recovery_eof;
        unsigned char *data, *p;
-       u32 zero = 0;
-       struct list_struct rec;
+       uint32_t zero = 0;
+       struct tdb_record rec;
 
        /* find the recovery area */
        if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
@@ -987,7 +1190,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        /* recover the file data */
        p = data;
        while (p+8 < data + rec.data_len) {
-               u32 ofs, len;
+               uint32_t ofs, len;
                if (DOCONV()) {
                        tdb_convert(p, 8);
                }
@@ -1021,7 +1224,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        }
 
        /* remove the recovery magic */
-       if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
+       if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
                          &zero) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
                tdb->ecode = TDB_ERR_IO;