call: transfer the record flags in the ctdb call packets.
authorMichael Adam <obnox@samba.org>
Fri, 10 Dec 2010 13:02:33 +0000 (14:02 +0100)
committerMichael Adam <obnox@samba.org>
Wed, 9 Mar 2011 23:01:08 +0000 (00:01 +0100)
This way, the MIGRATED_WITH_DATA information can be transported
along with the records. This is important for vacuuming to function
properly.

The record flags are appended to the data section of the ctdb_req_dmaster
and ctdb_reply_dmaster structs.

Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

server/ctdb_call.c

index 813e3cf9fe6441ff50407863fd3af1ccfa9ba093..b02b843147b33783147fd95d5fccf251dca2fee2 100644 (file)
@@ -160,7 +160,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        tmp_ctx = talloc_new(ctdb);
 
        /* send the CTDB_REPLY_DMASTER */
-       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
+       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
                                    struct ctdb_reply_dmaster);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
@@ -173,6 +173,7 @@ static void ctdb_send_dmaster_reply(struct ctdb_db_context *ctdb_db,
        r->db_id         = ctdb_db->db_id;
        memcpy(&r->data[0], key.dptr, key.dsize);
        memcpy(&r->data[key.dsize], data.dptr, data.dsize);
+       memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
 
        ctdb_queue_packet(ctdb, &r->hdr);
 
@@ -211,7 +212,8 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
                return;
        }
        
-       len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
+       len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
+                       + sizeof(uint32_t);
        r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len, 
                                    struct ctdb_req_dmaster);
        CTDB_NO_MEMORY_FATAL(ctdb, r);
@@ -224,6 +226,7 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
        r->datalen       = data->dsize;
        memcpy(&r->data[0], key->dptr, key->dsize);
        memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+       memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
 
        header->dmaster = c->hdr.srcnode;
        if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
@@ -241,10 +244,10 @@ static void ctdb_call_send_dmaster(struct ctdb_db_context *ctdb_db,
 
   must be called with the chainlock held. This function releases the chainlock
 */
-static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db, 
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
                                struct ctdb_req_header *hdr,
                                TDB_DATA key, TDB_DATA data,
-                               uint64_t rsn)
+                               uint64_t rsn, uint32_t record_flags)
 {
        struct ctdb_call_state *state;
        struct ctdb_context *ctdb = ctdb_db->ctdb;
@@ -328,12 +331,19 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        TDB_DATA key, data, data2;
        struct ctdb_ltdb_header header;
        struct ctdb_db_context *ctdb_db;
+       uint32_t record_flags = 0;
+       size_t len;
        int ret;
 
        key.dptr = c->data;
        key.dsize = c->keylen;
        data.dptr = c->data + c->keylen;
        data.dsize = c->datalen;
+       len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
+                       + sizeof(uint32_t);
+       if (len <= c->hdr.length) {
+               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+       }
 
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
        if (!ctdb_db) {
@@ -390,10 +400,13 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
        /* use the rsn from the sending node */
        header.rsn = c->rsn;
 
+       /* store the record flags from the sending node */
+       header.flags = record_flags;
+
        /* check if the new dmaster is the lmaster, in which case we
           skip the dmaster reply */
        if (c->dmaster == ctdb->pnn) {
-               ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+               ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
        } else {
                ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
 
@@ -566,6 +579,8 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
        struct ctdb_db_context *ctdb_db;
        TDB_DATA key, data;
+       uint32_t record_flags = 0;
+       size_t len;
        int ret;
 
        ctdb_db = find_ctdb_db(ctdb, c->db_id);
@@ -578,6 +593,11 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
        key.dsize = c->keylen;
        data.dptr = &c->data[key.dsize];
        data.dsize = c->datalen;
+       len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
+               + sizeof(uint32_t);
+       if (len <= c->hdr.length) {
+               record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+       }
 
        ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
                                     ctdb_call_input_pkt, ctdb, False);
@@ -589,7 +609,7 @@ void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
                return;
        }
 
-       ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+       ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
 }