libctdb: always check header hasn't changed on local tdb
authorRusty Russell <rusty@rustcorp.com.au>
Tue, 8 Jun 2010 08:40:36 +0000 (18:10 +0930)
committerRusty Russell <rusty@rustcorp.com.au>
Tue, 8 Jun 2010 08:40:36 +0000 (18:10 +0930)
The code on which this is based could alter the header: a normal client
can't.  If we use this differently later we can change this.  For the
moment it's a nice extra check.

We optimize out the record write altogether when the record hasn't
changed, rather than just suppressing the seqnum update.

Signed-off-by: Rusty Russell <rusty@rustcorp.com.au>
libctdb/ctdb.c
libctdb/local_tdb.c

index 4a74bf7aed9381343b77eb7c398c0629c427cd1f..8b248f18ec8e7983ed5eabfa9dae6d99f5a99554 100644 (file)
@@ -826,5 +826,30 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
                return false;
        }
 
-       return ctdb_local_store(ctdb_db->tdb, lock->key, lock->hdr, data) == 0;
+       switch (ctdb_local_store(ctdb_db->tdb, lock->key, lock->hdr, data)) {
+       case 0:
+               DEBUG(ctdb_db->ctdb, LOG_DEBUG,
+                     "ctdb_writerecord: optimized away noop write.");
+               /* fall thru */
+       case 1:
+               return true;
+
+       default:
+               switch (errno) {
+               case ENOMEM:
+                       DEBUG(ctdb_db->ctdb, LOG_CRIT,
+                             "ctdb_writerecord: out of memory.");
+                       break;
+               case EINVAL:
+                       DEBUG(ctdb_db->ctdb, LOG_ALERT,
+                             "ctdb_writerecord: record changed under lock?");
+                       break;
+               default:
+                       /* FIXME: replace with proper tdb logging. */
+                       DEBUG(ctdb_db->ctdb, LOG_CRIT,
+                             "ctdb_writerecord: tdb error.");
+                       break;
+               }
+               return false;
+       }
 }
index dbf06ede173a5735a34f6b831a87c7feb0b3b0d8..9a19df7c08bd073b57474a0d69b45fe581d8d457 100644 (file)
@@ -23,6 +23,7 @@
 #include <stdlib.h>
 #include <stdbool.h>
 #include <string.h>
+#include <errno.h>
 #include <ctdb_protocol.h> // For struct ctdb_ltdb_header
 #include "local_tdb.h"
 
@@ -51,44 +52,50 @@ struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb,
 
 
 /*
-  write a record to a normal database
+  write a record to a normal database: 1 on success, 0 if noop, -1 on fail.
+  errno = EIO => tdb error.
 */
 int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key,
                     struct ctdb_ltdb_header *header, TDB_DATA data)
 {
        TDB_DATA rec;
        int ret;
-       bool seqnum_suppressed = false;
+       TDB_DATA old;
 
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = malloc(rec.dsize);
-       if (!rec.dptr) {
+       old = tdb_fetch(tdb, key);
+       if (old.dsize < sizeof(*header)) {
+               errno = EIO;
                return -1;
        }
 
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+       /* Debugging check: we have lock and should not change hdr. */
+       if (memcmp(old.dptr, header, sizeof(*header)) != 0) {
+               free(old.dptr);
+               errno = EINVAL;
+               return -1;
+       }
 
-       /* Databases with seqnum updates enabled only get their seqnum
-          changes when/if we modify the data */
-       if (tdb_get_flags(tdb) & TDB_SEQNUM) {
-               TDB_DATA old;
-               old = tdb_fetch(tdb, key);
+       /* Optimize out the nothing-changed case. */
+       if (old.dsize == sizeof(*header) + data.dsize
+           && memcmp(old.dptr+sizeof(*header), data.dptr, data.dsize) == 0) {
+               free(old.dptr);
+               return 0;
+       }
 
-               if ( (old.dsize == rec.dsize)
-               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dptr+sizeof(struct ctdb_ltdb_header),
-                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
-                       tdb_remove_flags(tdb, TDB_SEQNUM);
-                       seqnum_suppressed = true;
-               }
+       rec.dsize = sizeof(*header) + data.dsize;
+       rec.dptr = malloc(rec.dsize);
+       if (!rec.dptr) {
                free(old.dptr);
+               errno = ENOMEM;
+               return -1;
        }
+
        ret = tdb_store(tdb, key, rec, TDB_REPLACE);
-       if (seqnum_suppressed) {
-               tdb_add_flags(tdb, TDB_SEQNUM);
-       }
+       free(old.dptr);
        free(rec.dptr);
-
-       return ret;
+       if (ret != 0) {
+               errno = EIO;
+               return -1;
+       }
+       return 1;
 }