server: Use the ctdb_ltdb_store_server() in the ctdb daemon for non-persistent dbs
[sahlberg/ctdb.git] / common / ctdb_ltdb.c
index a6bf26817aae9a6142d3076acd88010ad453c2a0..b5e586568cd379fc1735f2e3161f74dfb3ba3a02 100644 (file)
@@ -18,7 +18,7 @@
 */
 
 #include "includes.h"
-#include "lib/events/events.h"
+#include "lib/tevent/tevent.h"
 #include "lib/tdb/include/tdb.h"
 #include "system/network.h"
 #include "system/filesys.h"
@@ -62,11 +62,9 @@ static void ltdb_initial_header(struct ctdb_db_context *ctdb_db,
                                TDB_DATA key,
                                struct ctdb_ltdb_header *header)
 {
-       header->rsn = 0;
+       ZERO_STRUCTP(header);
        /* initial dmaster is the lmaster */
        header->dmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
-       header->laccessor = header->dmaster;
-       header->lacount = 0;
 }
 
 
@@ -129,6 +127,11 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
        struct ctdb_context *ctdb = ctdb_db->ctdb;
        TDB_DATA rec;
        int ret;
+       bool seqnum_suppressed = false;
+
+       if (ctdb_db->ctdb_ltdb_store_fn) {
+               return ctdb_db->ctdb_ltdb_store_fn(ctdb_db, key, header, data);
+       }
 
        if (ctdb->flags & CTDB_FLAG_TORTURE) {
                struct ctdb_ltdb_header *h2;
@@ -148,70 +151,29 @@ int ctdb_ltdb_store(struct ctdb_db_context *ctdb_db, TDB_DATA key,
        memcpy(rec.dptr, header, sizeof(*header));
        memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
 
+       /* Databases with seqnum updates enabled only get their seqnum
+          changes when/if we modify the data */
+       if (ctdb_db->seqnum_update != NULL) {
+               TDB_DATA old;
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+
+               if ( (old.dsize == rec.dsize)
+               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+                       tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+                       seqnum_suppressed = true;
+               }
+               if (old.dptr) free(old.dptr);
+       }
        ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
        }
-
-       talloc_free(rec.dptr);
-
-       return ret;
-}
-
-/*
-  write a record to a persistent database
-  this is done by a child process
-*/
-int ctdb_ltdb_persistent_store(struct ctdb_db_context *ctdb_db, TDB_DATA key, 
-                   struct ctdb_ltdb_header *header, TDB_DATA data)
-{
-       struct ctdb_context *ctdb = ctdb_db->ctdb;
-       TDB_DATA rec;
-       int ret;
-
-       if (ctdb->flags & CTDB_FLAG_TORTURE) {
-               struct ctdb_ltdb_header *h2;
-               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
-               h2 = (struct ctdb_ltdb_header *)rec.dptr;
-               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
-                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
-                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
-               }
-               if (rec.dptr) free(rec.dptr);
-       }
-
-       rec.dsize = sizeof(*header) + data.dsize;
-       rec.dptr = talloc_size(ctdb, rec.dsize);
-       CTDB_NO_MEMORY(ctdb, rec.dptr);
-
-       memcpy(rec.dptr, header, sizeof(*header));
-       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
-
-       /* if this is a persistent database without NOSYNC then we
-          will do this via a transaction */
-       if (!(ctdb_db->client_tdb_flags & TDB_NOSYNC)) {
-               ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to start local transaction\n"));
-                       goto failed;
-               }
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to store persistent data\n"));
-                       tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-                       goto failed;
-               }
-               ret = tdb_transaction_commit(ctdb_db->ltdb->tdb);
-               if (ret != 0) {
-                       DEBUG(DEBUG_ERR, (__location__ " Failed to commit persistent store transaction.\n"));
-                       tdb_transaction_cancel(ctdb_db->ltdb->tdb);
-                       goto failed;
-               }
-       } else {
-               ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+       if (seqnum_suppressed) {
+               tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
        }
 
-failed:
        talloc_free(rec.dptr);
 
        return ret;
@@ -232,7 +194,26 @@ int ctdb_ltdb_unlock(struct ctdb_db_context *ctdb_db, TDB_DATA key)
 {
        int ret = tdb_chainunlock(ctdb_db->ltdb->tdb, key);
        if (ret != 0) {
-               DEBUG(DEBUG_ERR,("tdb_chainunlock failed\n"));
+               DEBUG(DEBUG_ERR,("tdb_chainunlock failed on db %s [%s]\n", ctdb_db->db_name, tdb_errorstr(ctdb_db->ltdb->tdb)));
        }
        return ret;
 }
+
+
+/*
+  delete a record from a normal database
+*/
+int ctdb_ltdb_delete(struct ctdb_db_context *ctdb_db, TDB_DATA key)
+{
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+
+       if (ctdb_db->persistent != 0) {
+               DEBUG(DEBUG_ERR,("Trying to delete emty record in persistent database\n"));
+               return 0;
+       }
+       if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
+               DEBUG(DEBUG_ERR,("Failed to delete empty record."));
+               return -1;
+       }
+       return 0;
+}