tmp_ctx = talloc_new(ctdb);
/* send the CTDB_REPLY_DMASTER */
- len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize;
+ len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize + sizeof(uint32_t);
r = ctdb_transport_allocate(ctdb, tmp_ctx, CTDB_REPLY_DMASTER, len,
struct ctdb_reply_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->db_id = ctdb_db->db_id;
memcpy(&r->data[0], key.dptr, key.dsize);
memcpy(&r->data[key.dsize], data.dptr, data.dsize);
+ memcpy(&r->data[key.dsize+data.dsize], &header->flags, sizeof(uint32_t));
ctdb_queue_packet(ctdb, &r->hdr);
return;
}
- len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize;
+ len = offsetof(struct ctdb_req_dmaster, data) + key->dsize + data->dsize
+ + sizeof(uint32_t);
r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_DMASTER, len,
struct ctdb_req_dmaster);
CTDB_NO_MEMORY_FATAL(ctdb, r);
r->datalen = data->dsize;
memcpy(&r->data[0], key->dptr, key->dsize);
memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+ memcpy(&r->data[key->dsize + data->dsize], &header->flags, sizeof(uint32_t));
header->dmaster = c->hdr.srcnode;
if (ctdb_ltdb_store(ctdb_db, *key, header, *data) != 0) {
must be called with the chainlock held. This function releases the chainlock
*/
-static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
+static void ctdb_become_dmaster(struct ctdb_db_context *ctdb_db,
struct ctdb_req_header *hdr,
TDB_DATA key, TDB_DATA data,
- uint64_t rsn)
+ uint64_t rsn, uint32_t record_flags)
{
struct ctdb_call_state *state;
struct ctdb_context *ctdb = ctdb_db->ctdb;
TDB_DATA key, data, data2;
struct ctdb_ltdb_header header;
struct ctdb_db_context *ctdb_db;
+ uint32_t record_flags = 0;
+ size_t len;
int ret;
key.dptr = c->data;
key.dsize = c->keylen;
data.dptr = c->data + c->keylen;
data.dsize = c->datalen;
+ len = offsetof(struct ctdb_req_dmaster, data) + key.dsize + data.dsize
+ + sizeof(uint32_t);
+ if (len <= c->hdr.length) {
+ record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+ }
ctdb_db = find_ctdb_db(ctdb, c->db_id);
if (!ctdb_db) {
/* use the rsn from the sending node */
header.rsn = c->rsn;
+ /* store the record flags from the sending node */
+ header.flags = record_flags;
+
/* check if the new dmaster is the lmaster, in which case we
skip the dmaster reply */
if (c->dmaster == ctdb->pnn) {
- ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+ ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
} else {
ctdb_send_dmaster_reply(ctdb_db, &header, key, data, c->dmaster, hdr->reqid);
struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
struct ctdb_db_context *ctdb_db;
TDB_DATA key, data;
+ uint32_t record_flags = 0;
+ size_t len;
int ret;
ctdb_db = find_ctdb_db(ctdb, c->db_id);
key.dsize = c->keylen;
data.dptr = &c->data[key.dsize];
data.dsize = c->datalen;
+ len = offsetof(struct ctdb_reply_dmaster, data) + key.dsize + data.dsize
+ + sizeof(uint32_t);
+ if (len <= c->hdr.length) {
+ record_flags = *(uint32_t *)&c->data[c->keylen + c->datalen];
+ }
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr,
ctdb_call_input_pkt, ctdb, False);
return;
}
- ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn);
+ ctdb_become_dmaster(ctdb_db, hdr, key, data, c->rsn, record_flags);
}