ctdb-daemon: Use correct tdb flags when enabling robust mutex support
authorAmitay Isaacs <amitay@gmail.com>
Thu, 11 Dec 2014 02:16:47 +0000 (13:16 +1100)
committerStefan Metzmacher <metze@samba.org>
Fri, 19 Dec 2014 12:15:12 +0000 (13:15 +0100)
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11000

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
ctdb/client/ctdb_client.c
ctdb/server/ctdb_lock.c
ctdb/server/ctdb_lock_helper.c
ctdb/server/ctdb_ltdb_server.c

index 07b17d016a488110e835ba6c0df7f4c96e383043..da18826fa59d12e64ef5b7231e54c44047ae77ab 100644 (file)
@@ -1928,7 +1928,7 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32
 
 #ifdef TDB_MUTEX_LOCKING
        if (!persistent && ctdb->tunable.mutex_enabled == 1) {
-               tdb_flags |= TDB_MUTEX_LOCKING;
+               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
        }
 #endif
 
@@ -2055,6 +2055,9 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        TDB_DATA data;
        int ret;
        int32_t res;
+#ifdef TDB_MUTEX_LOCKING
+       uint32_t mutex_enabled = 0;
+#endif
 
        ctdb_db = ctdb_db_handle(ctdb, name);
        if (ctdb_db) {
@@ -2080,8 +2083,18 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
        }
 
 #ifdef TDB_MUTEX_LOCKING
-       if (!persistent && ctdb->tunable.mutex_enabled == 1) {
-               tdb_flags |= TDB_MUTEX_LOCKING;
+       if (!persistent) {
+               ret = ctdb_ctrl_get_tunable(ctdb, timeval_current_ofs(3,0),
+                                           CTDB_CURRENT_NODE,
+                                           "TDBMutexEnabled",
+                                           &mutex_enabled);
+               if (ret != 0) {
+                       DEBUG(DEBUG_WARNING, ("Assuming no mutex support.\n"));
+               }
+
+               if (mutex_enabled == 1) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
        }
 #endif
 
@@ -2105,7 +2118,16 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
                return NULL;
        }
 
-       tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
+       if (persistent) {
+               tdb_flags = TDB_DEFAULT;
+       } else {
+               tdb_flags = TDB_NOSYNC;
+#ifdef TDB_MUTEX_LOCKING
+               if (mutex_enabled) {
+                       tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+               }
+#endif
+       }
        if (ctdb->valgrinding) {
                tdb_flags |= TDB_NOMMAP;
        }
index 22a88b337ed86a436d32fc32724504286b8663ee..7959d40fbfec98e12c78348b74ae64d5a8ff12a2 100644 (file)
@@ -544,11 +544,23 @@ static int db_count_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
 {
        int *count = (int *)private_data;
 
-       (*count)++;
+       (*count) += 2;
 
        return 0;
 }
 
+static int db_flags(struct ctdb_db_context *ctdb_db)
+{
+       int tdb_flags = TDB_DEFAULT;
+
+#ifdef TDB_MUTEX_LOCKING
+       if (!ctdb_db->persistent && ctdb_db->ctdb->tunable.mutex_enabled) {
+               tdb_flags = (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+       }
+#endif
+       return tdb_flags;
+}
+
 struct db_namelist {
        const char **names;
        int n;
@@ -560,7 +572,9 @@ static int db_name_handler(struct ctdb_db_context *ctdb_db, uint32_t priority,
        struct db_namelist *list = (struct db_namelist *)private_data;
 
        list->names[list->n] = talloc_strdup(list->names, ctdb_db->db_path);
-       list->n++;
+       list->names[list->n+1] = talloc_asprintf(list->names, "0x%x",
+                                                db_flags(ctdb_db));
+       list->n += 2;
 
        return 0;
 }
@@ -577,11 +591,11 @@ static bool lock_helper_args(TALLOC_CTX *mem_ctx,
 
        switch (lock_ctx->type) {
        case LOCK_RECORD:
-               nargs = 5;
+               nargs = 6;
                break;
 
        case LOCK_DB:
-               nargs = 4;
+               nargs = 5;
                break;
 
        case LOCK_ALLDB_PRIO:
@@ -612,16 +626,20 @@ static bool lock_helper_args(TALLOC_CTX *mem_ctx,
        case LOCK_RECORD:
                args[2] = talloc_strdup(args, "RECORD");
                args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
                if (lock_ctx->key.dsize == 0) {
-                       args[4] = talloc_strdup(args, "NULL");
+                       args[5] = talloc_strdup(args, "NULL");
                } else {
-                       args[4] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
+                       args[5] = hex_encode_talloc(args, lock_ctx->key.dptr, lock_ctx->key.dsize);
                }
                break;
 
        case LOCK_DB:
                args[2] = talloc_strdup(args, "DB");
                args[3] = talloc_strdup(args, lock_ctx->ctdb_db->db_path);
+               args[4] = talloc_asprintf(args, "0x%x",
+                                         db_flags(lock_ctx->ctdb_db));
                break;
 
        case LOCK_ALLDB_PRIO:
index 2161a9a0c446a2ec8abb5e4da4e4ea01bf92d865..7a09ecf3c70edc9777c066384496f97db6407dd1 100644 (file)
@@ -36,9 +36,9 @@ static void send_result(int fd, char result)
 static void usage(void)
 {
        fprintf(stderr, "\n");
-       fprintf(stderr, "Usage: %s <log-fd> <ctdbd-pid> <output-fd> RECORD <db-path> <db-key>\n",
+       fprintf(stderr, "Usage: %s <log-fd> <ctdbd-pid> <output-fd> RECORD <db-path> <db-flags> <db-key>\n",
                progname);
-       fprintf(stderr, "       %s <log-fd> <ctdbd-pid> <output-fd> DB <db1-path> [<db2-path> ...]\n",
+       fprintf(stderr, "       %s <log-fd> <ctdbd-pid> <output-fd> DB <db1-path> <db1-flags> [<db2-path> <db2-flags>...]\n",
                progname);
 }
 
@@ -59,10 +59,14 @@ static uint8_t *hex_decode_talloc(TALLOC_CTX *mem_ctx,
        return buffer;
 }
 
-static int lock_record(const char *dbpath, const char *dbkey)
+static int lock_record(const char *dbpath, const char *dbflags, const char *dbkey)
 {
        TDB_DATA key;
        struct tdb_context *tdb;
+       int tdb_flags;
+
+       /* No error checking since CTDB always passes sane values */
+       tdb_flags = strtol(dbflags, NULL, 0);
 
        /* Convert hex key to key */
        if (strcmp(dbkey, "NULL") == 0) {
@@ -72,7 +76,7 @@ static int lock_record(const char *dbpath, const char *dbkey)
                key.dptr = hex_decode_talloc(NULL, dbkey, &key.dsize);
        }
 
-       tdb = tdb_open(dbpath, 0, TDB_DEFAULT, O_RDWR, 0600);
+       tdb = tdb_open(dbpath, 0, tdb_flags, O_RDWR, 0600);
        if (tdb == NULL) {
                fprintf(stderr, "%s: Error opening database %s\n", progname, dbpath);
                return 1;
@@ -89,11 +93,15 @@ static int lock_record(const char *dbpath, const char *dbkey)
 }
 
 
-static int lock_db(const char *dbpath)
+static int lock_db(const char *dbpath, const char *dbflags)
 {
        struct tdb_context *tdb;
+       int tdb_flags;
+
+       /* No error checking since CTDB always passes sane values */
+       tdb_flags = strtol(dbflags, NULL, 0);
 
-       tdb = tdb_open(dbpath, 0, TDB_DEFAULT, O_RDWR, 0600);
+       tdb = tdb_open(dbpath, 0, tdb_flags, O_RDWR, 0600);
        if (tdb == NULL) {
                fprintf(stderr, "%s: Error opening database %s\n", progname, dbpath);
                return 1;
@@ -140,21 +148,21 @@ int main(int argc, char *argv[])
        lock_type = argv[4];
 
        if (strcmp(lock_type, "RECORD") == 0) {
-               if (argc != 7) {
+               if (argc != 8) {
                        fprintf(stderr, "%s: Invalid number of arguments (%d)\n",
                                progname, argc);
                        usage();
                        exit(1);
                }
-               result = lock_record(argv[5], argv[6]);
+               result = lock_record(argv[5], argv[6], argv[7]);
 
        } else if (strcmp(lock_type, "DB") == 0) {
                int n;
 
                /* If there are no databases specified, no need for lock */
                if (argc > 5) {
-                       for (n=5; n<argc; n++) {
-                               result = lock_db(argv[n]);
+                       for (n=5; n+1<argc; n+=2) {
+                               result = lock_db(argv[n], argv[n+1]);
                                if (result != 0) {
                                        break;
                                }
index 9ac2217a34a8251d6997e758c64fbc3ae18898fd..174a460178453c1b7bd7e32bcea533ab4c2ace26 100644 (file)
@@ -844,7 +844,7 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
 #ifdef TDB_MUTEX_LOCKING
        if (ctdb->tunable.mutex_enabled && mutexes &&
            tdb_runtime_check_for_robust_mutexes()) {
-               tdb_flags |= TDB_MUTEX_LOCKING;
+               tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
        }
 #endif
 
@@ -1138,7 +1138,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
           that tdb_flags is passed in via the (otherwise unused)
           srvid to the attach control */
 #ifdef TDB_MUTEX_LOCKING
-       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING);
+       tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
 #else
        tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
 #endif