server: create a server variant ctdb_ltdb_store_server() of ctdb_ltdb_store().
authorMichael Adam <obnox@samba.org>
Thu, 30 Dec 2010 16:44:51 +0000 (17:44 +0100)
committerMichael Adam <obnox@samba.org>
Wed, 9 Mar 2011 23:56:34 +0000 (00:56 +0100)
This is supposed to contain logic for deleting records that are safe
to delete and scheduling records for deletion. It will be called in
server context for non-persistent databases instead of the standard
ctdb_ltdb_store() function.

server/ctdb_ltdb_server.c

index 31a5f4f31e695fbed1f9f89bb0c2c324e41a1393..71aaaf5191ae60d8193fd5a3d0c09971022ad740 100644 (file)
@@ -50,6 +50,68 @@ static int ctdb_fetch_func(struct ctdb_call_info *call)
 }
 
 
+/**
+ * write a record to a normal database
+ *
+ * This is the server-variant of the ctdb_ltdb_store function.
+ * It contains logic to determine whether a record should be
+ * stored or deleted.
+ */
+static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
+                                 TDB_DATA key,
+                                 struct ctdb_ltdb_header *header,
+                                 TDB_DATA data)
+{
+       struct ctdb_context *ctdb = ctdb_db->ctdb;
+       TDB_DATA rec;
+       int ret;
+       bool seqnum_suppressed = false;
+
+       if (ctdb->flags & CTDB_FLAG_TORTURE) {
+               struct ctdb_ltdb_header *h2;
+               rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
+               h2 = (struct ctdb_ltdb_header *)rec.dptr;
+               if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
+                       DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
+                                (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+               }
+               if (rec.dptr) free(rec.dptr);
+       }
+
+       rec.dsize = sizeof(*header) + data.dsize;
+       rec.dptr = talloc_size(ctdb, rec.dsize);
+       CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+       memcpy(rec.dptr, header, sizeof(*header));
+       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+       /* Databases with seqnum updates enabled only get their seqnum
+          changes when/if we modify the data */
+       if (ctdb_db->seqnum_update != NULL) {
+               TDB_DATA old;
+               old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+
+               if ( (old.dsize == rec.dsize)
+               && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dptr+sizeof(struct ctdb_ltdb_header),
+                         rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+                       tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+                       seqnum_suppressed = true;
+               }
+               if (old.dptr) free(old.dptr);
+       }
+       ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to store dynamic data\n"));
+       }
+       if (seqnum_suppressed) {
+               tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
+       }
+
+       talloc_free(rec.dptr);
+
+       return ret;
+}
 
 struct lock_fetch_state {
        struct ctdb_context *ctdb;
@@ -1214,4 +1276,3 @@ int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
        return 0;
 }
 
-