vacuum: introduce the RECEIVE_RECORDS control
author: Michael Adam <obnox@samba.org>
Thu, 20 Dec 2012 23:24:47 +0000 (00:24 +0100)
committer: Michael Adam <obnox@samba.org>
Fri, 26 Apr 2013 14:51:59 +0000 (16:51 +0200)
This is in preparation for turning the vacuuming on the lmaster
into a two-phase process:

- First the node sends the list of records to be vacuumed
  to all other nodes with this new RECEIVE_RECORDS control.
  The remote nodes should store the lmaster's empty current copy.
- Only those records that could be stored on all other nodes
  are processed further. They are sent to all other nodes with
  the TRY_DELETE_RECORDS control as before for deletion.

Signed-off-by: Michael Adam <obnox@samba.org>
Reviewed-By: Amitay Isaacs <amitay@gmail.com>
(cherry picked from commit e397702e271af38204fd99733bbeba7c1db3a999)

Conflicts:

include/ctdb_protocol.h
server/ctdb_control.c

include/ctdb_private.h
server/ctdb_control.c
server/ctdb_recover.c

index 3b290612d7a54c0d1cca9368af90a978fc218d83..a2af9bb52b6d621e6fe73d4122b3af6715e76670 100644 (file)
@@ -659,6 +659,8 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS          = 0,
                    CTDB_CONTROL_SCHEDULE_FOR_DELETION   = 128,
                    /* 129 & 130: skipped (master) */
                    CTDB_CONTROL_TRAVERSE_START_EXT      = 131,
+                   /* 132, 133, 134, 135 skipped (master) */
+                   CTDB_CONTROL_RECEIVE_RECORDS         = 136,
 };
 
 /*
@@ -1460,6 +1462,7 @@ int32_t ctdb_control_get_tunable(struct ctdb_context *ctdb, TDB_DATA indata,
 int32_t ctdb_control_set_tunable(struct ctdb_context *ctdb, TDB_DATA indata);
 int32_t ctdb_control_list_tunables(struct ctdb_context *ctdb, TDB_DATA *outdata);
 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
+int32_t ctdb_control_receive_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
 int32_t ctdb_control_add_public_address(struct ctdb_context *ctdb, TDB_DATA indata);
 int32_t ctdb_control_del_public_address(struct ctdb_context *ctdb, TDB_DATA indata);
 
index 64ea8272395c7a482abf6c9ad95a8bd9f8cbf27f..0a3c7612d20ba107bb004207b42fe75cd5c35224 100644 (file)
@@ -595,6 +595,10 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                CHECK_CONTROL_DATA_SIZE(size);
                return ctdb_control_schedule_for_deletion(ctdb, indata);
        }
+
+       case CTDB_CONTROL_RECEIVE_RECORDS:
+               return ctdb_control_receive_records(ctdb, indata, outdata);
+
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 99a601dab20e45fe3171a9547f135830784e7046..9d360c7634f4cabeddc92112d59d0bd8be6ee2e0 100644 (file)
@@ -1078,6 +1078,188 @@ int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA inda
        return 0;
 }
 
+/**
+ * Store a record as part of the vacuum process:
+ * This is called from the RECEIVE_RECORDS control which
+ * the lmaster uses to send the current empty copy
+ * to all nodes for storing, before it lets the other
+ * nodes delete the records in the second phase with
+ * the TRY_DELETE_RECORDS control.
+ *
+ * Only store if we are not lmaster or dmaster, and our
+ * rsn is <= the provided rsn. Use non-blocking locks.
+ *
+ * return 0 if the record was successfully stored.
+ * return !0 if the record was not stored (refused or failed).
+ */
+static int store_tdb_record(struct ctdb_context *ctdb,
+                           struct ctdb_db_context *ctdb_db,
+                           struct ctdb_rec_data *rec)
+{
+       TDB_DATA key, data, data2;
+       struct ctdb_ltdb_header *hdr, *hdr2;
+       int ret = -1; /* only a successful tdb_store() turns this into 0 */
+
+       key.dsize = rec->keylen;
+       key.dptr = &rec->data[0];
+       data.dsize = rec->datalen;
+       data.dptr = &rec->data[rec->keylen];
+
+       if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
+               DEBUG(DEBUG_INFO, (__location__ " Called store_tdb_record "
+                                  "where we are lmaster\n"));
+               return -1;
+       }
+
+       if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+               DEBUG(DEBUG_ERR, (__location__ " Bad record size\n"));
+               return -1;
+       }
+
+       hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+       /* use a non-blocking lock */
+       if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Failed to lock chain\n"));
+               return -1;
+       }
+
+       data2 = tdb_fetch(ctdb_db->ltdb->tdb, key);
+       if (data2.dptr == NULL || data2.dsize < sizeof(struct ctdb_ltdb_header)) {
+               if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
+                       DEBUG(DEBUG_ERR, (__location__ " Failed to store record\n"));
+                       goto done;
+               }
+               DEBUG(DEBUG_INFO, (__location__ " Stored record\n"));
+               ret = 0;
+               goto done;
+       }
+
+       /* header of OUR stored copy, not of the record we were sent */
+       hdr2 = (struct ctdb_ltdb_header *)data2.dptr;
+
+       if (hdr2->rsn > hdr->rsn) {
+               DEBUG(DEBUG_INFO, (__location__ " Skipping record with "
+                                  "rsn=%llu - called with rsn=%llu\n",
+                                  (unsigned long long)hdr2->rsn,
+                                  (unsigned long long)hdr->rsn));
+               goto done;
+       }
+
+       if (hdr2->dmaster == ctdb->pnn) {
+               DEBUG(DEBUG_INFO, (__location__ " Attempted to store record "
+                                  "where we are the dmaster\n"));
+               goto done;
+       }
+
+       if (tdb_store(ctdb_db->ltdb->tdb, key, data, 0) != 0) {
+               DEBUG(DEBUG_INFO,(__location__ " Failed to store record\n"));
+               goto done;
+       }
+
+       ret = 0;
+done:
+       tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+       free(data2.dptr);
+       return ret;
+}
+
+
+
+/**
+ * Try to store all these records as part of the vacuuming process
+ * and return the records we failed to store.
+ *
+ * indata holds a ctdb_marshall_buffer with the records to store. On
+ * success outdata receives a ctdb_marshall_buffer (talloc child of
+ * outdata) with a copy of every record store_tdb_record() refused.
+ *
+ * return 0 on success, -1 on bad input, unknown db or out of memory.
+ *
+ * NOTE(review): the record lengths read from indata are not checked
+ * against indata.dsize - confirm that the sender can be trusted.
+ */
+int32_t ctdb_control_receive_records(struct ctdb_context *ctdb,
+                                    TDB_DATA indata, TDB_DATA *outdata)
+{
+       struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
+       struct ctdb_db_context *ctdb_db;
+       uint32_t i;
+       struct ctdb_rec_data *rec;
+       struct ctdb_marshall_buffer *records;
+
+       if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
+               DEBUG(DEBUG_ERR,
+                     (__location__ " invalid data in receive_records\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, reply->db_id);
+       if (!ctdb_db) {
+               DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
+                                 reply->db_id));
+               return -1;
+       }
+
+       DEBUG(DEBUG_DEBUG, ("starting receive_records of %u records for "
+                           "dbid 0x%x\n", reply->count, reply->db_id));
+
+       /* create a blob to send back the records we could not store */
+       records = (struct ctdb_marshall_buffer *)
+                       talloc_zero_size(outdata,
+                               offsetof(struct ctdb_marshall_buffer, data));
+       if (records == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Out of memory\n"));
+               return -1;
+       }
+       records->db_id = ctdb_db->db_id;
+
+       rec = (struct ctdb_rec_data *)&reply->data[0];
+       for (i=0; i<reply->count; i++) {
+               TDB_DATA key;
+
+               key.dptr = &rec->data[0];
+               key.dsize = rec->keylen;
+
+               if (rec->datalen < sizeof(struct ctdb_ltdb_header)) {
+                       DEBUG(DEBUG_CRIT, (__location__ " bad ltdb record in indata\n"));
+                       return -1;
+               }
+
+               /*
+                * If we can not store the record we must add it to the reply
+                * so the lmaster knows it may not purge this record.
+                */
+               if (store_tdb_record(ctdb, ctdb_db, rec) != 0) {
+                       size_t old_size;
+
+                       DEBUG(DEBUG_INFO, (__location__ " Failed to store "
+                                          "record with hash 0x%08x in vacuum "
+                                          "via RECEIVE_RECORDS\n",
+                                          ctdb_hash(&key)));
+
+                       old_size = talloc_get_size(records);
+                       records = talloc_realloc_size(outdata, records,
+                                                     old_size + rec->length);
+                       if (records == NULL) {
+                               DEBUG(DEBUG_ERR, (__location__ " Failed to expand\n"));
+                               return -1;
+                       }
+                       records->count++;
+                       memcpy(old_size+(uint8_t *)records, rec, rec->length);
+               }
+
+               /* records are packed back to back: step to the next one */
+               rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
+       }
+
+       outdata->dptr = (uint8_t *)records;
+       outdata->dsize = talloc_get_size(records);
+
+       return 0;
+}
+
+
 /*
   report capabilities
  */