ntdb: make sure file is always a multiple of PAGESIZE (now NTDB_PGSIZE)
authorRusty Russell <rusty@rustcorp.com.au>
Mon, 18 Jun 2012 13:00:29 +0000 (22:30 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Tue, 19 Jun 2012 03:38:06 +0000 (05:38 +0200)
ntdb uses tdb's transaction code, and it has an undocumented but implicit
assumption: that the transaction recovery area is always aligned to the
transaction pagesize.  This means that no block will overlap the recovery
area.

This is maintained by rounding the size of the database up, so do the same
for ntdb.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
lib/ntdb/free.c
lib/ntdb/io.c
lib/ntdb/private.h
lib/ntdb/test/run-30-exhaust-before-expand.c
lib/ntdb/test/run-64-bit-tdb.c
lib/ntdb/transaction.c

index 0fe6c73775473d781f5a9041682ee5239266ee30..f51aa5bc331bfc9bcd9ce45ee789ea9b56fdb517 100644 (file)
@@ -899,9 +899,14 @@ ntdb_off_t ntdb_expand_adjust(ntdb_off_t map_size, ntdb_off_t size)
                new_size = map_size * 1.25;
        }
 
-       /* Round the database up to a multiple of the page size */
        if (new_size < top_size)
                new_size = top_size;
+
+       /* We always make the file a multiple of transaction page
+        * size.  This guarantees that the transaction recovery area
+        * is always aligned, otherwise the transaction code can overwrite
+        * itself. */
+       new_size = (new_size + NTDB_PGSIZE-1) & ~(NTDB_PGSIZE-1);
        return new_size - map_size;
 }
 
@@ -933,10 +938,10 @@ static enum NTDB_ERROR ntdb_expand(struct ntdb_context *ntdb, ntdb_len_t size)
                return NTDB_SUCCESS;
        }
 
+       /* We need room for the record header too. */
+       size = adjust_size(0, sizeof(struct ntdb_used_record) + size);
        /* Overallocate. */
        wanted = ntdb_expand_adjust(old_size, size);
-       /* We need room for the record header too. */
-       wanted = adjust_size(0, sizeof(struct ntdb_used_record) + wanted);
 
        ecode = ntdb->io->expand_file(ntdb, wanted);
        if (ecode != NTDB_SUCCESS) {
index 4580520fa2a22cabc0229a23961af9361884b0c4..e1518062b1b8d09cd51201b5525bec74725bcbc5 100644 (file)
@@ -441,6 +441,7 @@ static enum NTDB_ERROR ntdb_expand_file(struct ntdb_context *ntdb,
        char buf[8192];
        enum NTDB_ERROR ecode;
 
+       assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
        if (ntdb->flags & NTDB_RDONLY) {
                return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
                                  "Expand on read-only database");
index e1c2fdafc463cf922e1f4c0791974e6d35d0e8d6..4e87f15b8f781041cf642df8fd7b30abeeec61f5 100644 (file)
@@ -98,6 +98,10 @@ typedef uint64_t ntdb_off_t;
 #define NTDB_PTR_ERR(p) ((enum NTDB_ERROR)(long)(p))
 #define NTDB_ERR_PTR(err) ((void *)(long)(err))
 
+/* This doesn't really need to be pagesize, but we use it for similar
+ * reasons. */
+#define NTDB_PGSIZE 65536
+
 /* Common case of returning true, false or -ve error. */
 typedef int ntdb_bool_err;
 
index b94bc01bffe4a9b19740b3cee2d95936a174930a..e3831f51a7f09940641f0aeff64acb5535f65757 100644 (file)
@@ -27,11 +27,11 @@ int main(int argc, char *argv[])
                        NTDB_INTERNAL|NTDB_CONVERT, NTDB_CONVERT,
                        NTDB_NOMMAP|NTDB_CONVERT };
 
-       plan_tests(sizeof(flags) / sizeof(flags[0]) * 9 + 1);
+       plan_tests(sizeof(flags) / sizeof(flags[0]) * 11 + 1);
 
        for (i = 0; i < sizeof(flags) / sizeof(flags[0]); i++) {
-               NTDB_DATA k;
-               uint64_t size;
+               NTDB_DATA k, d;
+               uint64_t size, old_size;
                bool was_empty = false;
 
                k.dptr = (void *)&j;
@@ -43,6 +43,8 @@ int main(int argc, char *argv[])
                if (!ntdb)
                        continue;
 
+               old_size = ntdb->file->map_size;
+
                ok1(empty_freetable(ntdb));
                /* Need some hash lock for expand. */
                ok1(ntdb_lock_hashes(ntdb, 0, 1, F_WRLCK, NTDB_LOCK_WAIT) == 0);
@@ -53,8 +55,17 @@ int main(int argc, char *argv[])
                ok1(!empty_freetable(ntdb));
 
                size = ntdb->file->map_size;
-               /* Insert minimal-length records until we expand. */
-               for (j = 0; ntdb->file->map_size == size; j++) {
+
+               /* Create one record to chew up most space. */
+               d.dsize = (size - old_size - 32);
+               d.dptr = malloc(d.dsize);
+               j = 0;
+               ok1(ntdb_store(ntdb, k, d, NTDB_INSERT) == 0);
+               ok1(ntdb->file->map_size == size);
+               free(d.dptr);
+
+               /* Now insert minimal-length records until we expand. */
+               for (j = 1; ntdb->file->map_size == size; j++) {
                        was_empty = empty_freetable(ntdb);
                        if (ntdb_store(ntdb, k, k, NTDB_INSERT) != 0)
                                err(1, "Failed to store record %i", j);
index 6a146cb1cf7361f2c06d1021fe7a90553721a5ab..b36f422a972e65e634d3a64ae11f7283dda3c681 100644 (file)
@@ -2,6 +2,11 @@
 #include "tap-interface.h"
 #include "logging.h"
 
+/* The largest 32-bit value which is still a multiple of NTDB_PGSIZE */
+#define ALMOST_4G ((uint32_t)-NTDB_PGSIZE)
+/* And this pushes it over 32 bits */
+#define A_LITTLE_BIT (NTDB_PGSIZE * 2)
+
 int main(int argc, char *argv[])
 {
        unsigned int i;
@@ -33,13 +38,14 @@ int main(int argc, char *argv[])
                old_size = ntdb->file->map_size;
 
                /* This makes a sparse file */
-               ok1(ftruncate(ntdb->file->fd, 0xFFFFFFF0) == 0);
-               ok1(add_free_record(ntdb, old_size, 0xFFFFFFF0 - old_size,
+               ok1(ftruncate(ntdb->file->fd, ALMOST_4G) == 0);
+               ok1(add_free_record(ntdb, old_size, ALMOST_4G - old_size,
                                    NTDB_LOCK_WAIT, false) == NTDB_SUCCESS);
 
                /* Now add a little record past the 4G barrier. */
-               ok1(ntdb_expand_file(ntdb, 100) == NTDB_SUCCESS);
-               ok1(add_free_record(ntdb, 0xFFFFFFF0, 100, NTDB_LOCK_WAIT, false)
+               ok1(ntdb_expand_file(ntdb, A_LITTLE_BIT) == NTDB_SUCCESS);
+               ok1(add_free_record(ntdb, ALMOST_4G, A_LITTLE_BIT,
+                                   NTDB_LOCK_WAIT, false)
                    == NTDB_SUCCESS);
 
                ok1(ntdb_check(ntdb, NULL, NULL) == NTDB_SUCCESS);
@@ -52,7 +58,7 @@ int main(int argc, char *argv[])
 
                /* Make sure it put it at end as we expected. */
                off = find_and_lock(ntdb, k, F_RDLCK, &h, &rec, NULL);
-               ok1(off >= 0xFFFFFFF0);
+               ok1(off >= ALMOST_4G);
                ntdb_unlock_hashes(ntdb, h.hlock_start, h.hlock_range, F_RDLCK);
 
                ok1(ntdb_fetch(ntdb, k, &d) == 0);
index 05f571e51a84a2a7aa52fa6383c30bab808702f4..c593ee2e71e12f53cb4ab0926017003c8b81f66b 100644 (file)
@@ -25,6 +25,7 @@
 */
 
 #include "private.h"
+#include <assert.h>
 #define SAFE_FREE(x) do { if ((x) != NULL) {free((void *)x); (x)=NULL;} } while(0)
 
 /*
@@ -119,9 +120,6 @@ struct ntdb_transaction {
        ntdb_len_t old_map_size;
 };
 
-/* This doesn't really need to be pagesize, but we use it for similar reasons. */
-#define PAGESIZE 65536
-
 /*
   read while in a transaction. We need to check first if the data is in our list
   of transaction elements, then if not do a real read
@@ -133,8 +131,8 @@ static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t of
        enum NTDB_ERROR ecode;
 
        /* break it down into block sized ops */
-       while (len + (off % PAGESIZE) > PAGESIZE) {
-               ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
+       while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
+               ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
                ecode = transaction_read(ntdb, off, buf, len2);
                if (ecode != NTDB_SUCCESS) {
                        return ecode;
@@ -148,7 +146,7 @@ static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t of
                return NTDB_SUCCESS;
        }
 
-       blk = off / PAGESIZE;
+       blk = off / NTDB_PGSIZE;
 
        /* see if we have it in the block list */
        if (ntdb->transaction->num_blocks <= blk ||
@@ -170,7 +168,7 @@ static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t of
        }
 
        /* now copy it out of this block */
-       memcpy(buf, ntdb->transaction->blocks[blk] + (off % PAGESIZE), len);
+       memcpy(buf, ntdb->transaction->blocks[blk] + (off % NTDB_PGSIZE), len);
        return NTDB_SUCCESS;
 
 fail:
@@ -199,8 +197,8 @@ static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t o
        }
 
        /* break it up into block sized chunks */
-       while (len + (off % PAGESIZE) > PAGESIZE) {
-               ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
+       while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
+               ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
                ecode = transaction_write(ntdb, off, buf, len2);
                if (ecode != NTDB_SUCCESS) {
                        return ecode;
@@ -216,8 +214,8 @@ static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t o
                return NTDB_SUCCESS;
        }
 
-       blk = off / PAGESIZE;
-       off = off % PAGESIZE;
+       blk = off / NTDB_PGSIZE;
+       off = off % NTDB_PGSIZE;
 
        if (ntdb->transaction->num_blocks <= blk) {
                uint8_t **new_blocks;
@@ -245,20 +243,20 @@ static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t o
 
        /* allocate and fill a block? */
        if (ntdb->transaction->blocks[blk] == NULL) {
-               ntdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
+               ntdb->transaction->blocks[blk] = (uint8_t *)calloc(NTDB_PGSIZE, 1);
                if (ntdb->transaction->blocks[blk] == NULL) {
                        ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
                                           "transaction_write:"
                                           " failed to allocate");
                        goto fail;
                }
-               if (ntdb->transaction->old_map_size > blk * PAGESIZE) {
-                       ntdb_len_t len2 = PAGESIZE;
-                       if (len2 + (blk * PAGESIZE) > ntdb->transaction->old_map_size) {
-                               len2 = ntdb->transaction->old_map_size - (blk * PAGESIZE);
+               if (ntdb->transaction->old_map_size > blk * NTDB_PGSIZE) {
+                       ntdb_len_t len2 = NTDB_PGSIZE;
+                       if (len2 + (blk * NTDB_PGSIZE) > ntdb->transaction->old_map_size) {
+                               len2 = ntdb->transaction->old_map_size - (blk * NTDB_PGSIZE);
                        }
                        ecode = ntdb->transaction->io_methods->tread(ntdb,
-                                       blk * PAGESIZE,
+                                       blk * NTDB_PGSIZE,
                                        ntdb->transaction->blocks[blk],
                                        len2);
                        if (ecode != NTDB_SUCCESS) {
@@ -307,8 +305,8 @@ static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off
        size_t blk;
 
        /* break it up into block sized chunks */
-       while (len + (off % PAGESIZE) > PAGESIZE) {
-               ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
+       while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
+               ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
                transaction_write_existing(ntdb, off, buf, len2);
                len -= len2;
                off += len2;
@@ -321,8 +319,8 @@ static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off
                return;
        }
 
-       blk = off / PAGESIZE;
-       off = off % PAGESIZE;
+       blk = off / NTDB_PGSIZE;
+       off = off % NTDB_PGSIZE;
 
        if (ntdb->transaction->num_blocks <= blk ||
            ntdb->transaction->blocks[blk] == NULL) {
@@ -367,6 +365,8 @@ static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
 {
        enum NTDB_ERROR ecode;
 
+       assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
+
        /* add a write to the transaction elements, so subsequent
           reads see the zero data */
        ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
@@ -379,10 +379,10 @@ static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
                                size_t len, bool write_mode)
 {
-       size_t blk = off / PAGESIZE, end_blk;
+       size_t blk = off / NTDB_PGSIZE, end_blk;
 
        /* This is wrong for zero-length blocks, but will fail gracefully */
-       end_blk = (off + len - 1) / PAGESIZE;
+       end_blk = (off + len - 1) / NTDB_PGSIZE;
 
        /* Can only do direct if in single block and we've already copied. */
        if (write_mode) {
@@ -393,7 +393,7 @@ static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
                        ntdb->stats.transaction_write_direct_fail++;
                        return NULL;
                }
-               return ntdb->transaction->blocks[blk] + off % PAGESIZE;
+               return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
        }
 
        ntdb->stats.transaction_read_direct++;
@@ -401,7 +401,7 @@ static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
        if (blk == end_blk
            && blk < ntdb->transaction->num_blocks
            && ntdb->transaction->blocks[blk])
-               return ntdb->transaction->blocks[blk] + off % PAGESIZE;
+               return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
 
        /* Otherwise must be all not copied. */
        while (blk <= end_blk) {
@@ -624,7 +624,7 @@ static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
 
        recovery_size = 0;
        for (i=0;i<ntdb->transaction->num_blocks;i++) {
-               if (i * PAGESIZE >= ntdb->transaction->old_map_size) {
+               if (i * NTDB_PGSIZE >= ntdb->transaction->old_map_size) {
                        break;
                }
                if (ntdb->transaction->blocks[i] == NULL) {
@@ -634,7 +634,7 @@ static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
                if (i == ntdb->transaction->num_blocks-1) {
                        recovery_size += ntdb->transaction->last_block_size;
                } else {
-                       recovery_size += PAGESIZE;
+                       recovery_size += NTDB_PGSIZE;
                }
        }
 
@@ -746,8 +746,8 @@ static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
                        continue;
                }
 
-               offset = i * PAGESIZE;
-               length = PAGESIZE;
+               offset = i * NTDB_PGSIZE;
+               length = NTDB_PGSIZE;
                if (i == ntdb->transaction->num_blocks-1) {
                        length = ntdb->transaction->last_block_size;
                }
@@ -822,8 +822,8 @@ static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
        rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
 
        /* Round up to a page. */
-       rec->max_len = ((sizeof(*rec) + rec->max_len + PAGESIZE-1)
-                       & ~(PAGESIZE-1))
+       rec->max_len = ((sizeof(*rec) + rec->max_len + NTDB_PGSIZE-1)
+                       & ~(NTDB_PGSIZE-1))
                - sizeof(*rec);
 
        off = ntdb->file->map_size;
@@ -1101,8 +1101,8 @@ _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
                        continue;
                }
 
-               offset = i * PAGESIZE;
-               length = PAGESIZE;
+               offset = i * NTDB_PGSIZE;
+               length = NTDB_PGSIZE;
                if (i == ntdb->transaction->num_blocks-1) {
                        length = ntdb->transaction->last_block_size;
                }