server: implement a new control SCHEDULE_FOR_DELETION to fill the delete_queue.
authorMichael Adam <obnox@samba.org>
Tue, 21 Dec 2010 13:25:48 +0000 (14:25 +0100)
committerMichael Adam <obnox@samba.org>
Wed, 9 Mar 2011 23:37:03 +0000 (00:37 +0100)
include/ctdb_private.h
server/ctdb_control.c
server/ctdb_vacuum.c

index e2cd9c83a0ca1157cf2f70bdad91ead16793ec6d..c2a448088d93c87131288cad10b4f3446ead1d50 100644 (file)
@@ -1384,4 +1384,17 @@ int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb);
 
 int ctdb_process_deferred_attach(struct ctdb_context *ctdb);
 
+/**
+ * structure to pass to a schedule_for_deletion_control
+ */
+struct ctdb_control_schedule_for_deletion {
+       uint32_t db_id;
+       struct ctdb_ltdb_header hdr;
+       uint32_t keylen;
+       uint8_t key[1]; /* key[] */
+};
+
+int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
+                                          TDB_DATA indata);
+
 #endif
index 69724e396575dbafb8f552edea136759f7261e5f..748907f2a9c3f09e524033d4918bd3d4b43020ce 100644 (file)
@@ -604,6 +604,15 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
                CHECK_CONTROL_DATA_SIZE(0);
                return ctdb_control_get_stat_history(ctdb, c, outdata);
 
+       case CTDB_CONTROL_SCHEDULE_FOR_DELETION: {
+               struct ctdb_control_schedule_for_deletion *d;
+               size_t size = offsetof(struct ctdb_control_schedule_for_deletion, key);
+               CHECK_CONTROL_MIN_DATA_SIZE(size);
+               d = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
+               size += d->keylen;
+               CHECK_CONTROL_DATA_SIZE(size);
+               return ctdb_control_schedule_for_deletion(ctdb, indata);
+       }
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index 94d3416bf3b763cda0e113f794278113b0cf973b..ec3b5f29f2815b2de0786f86a995e59e5e57ee3d 100644 (file)
@@ -1269,3 +1269,67 @@ int ctdb_vacuum_init(struct ctdb_db_context *ctdb_db)
 
        return 0;
 }
+
+/**
+ * Schedule a record for deletetion.
+ * Called from the parent context.
+ */
+int32_t ctdb_control_schedule_for_deletion(struct ctdb_context *ctdb,
+                                          TDB_DATA indata)
+{
+       struct ctdb_control_schedule_for_deletion *dd;
+       struct ctdb_db_context *ctdb_db;
+       int ret;
+       TDB_DATA key;
+       uint32_t hash;
+       struct delete_record_data *kd;
+
+       dd = (struct ctdb_control_schedule_for_deletion *)indata.dptr;
+
+       ctdb_db = find_ctdb_db(ctdb, dd->db_id);
+       if (ctdb_db == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Unknown db id 0x%08x\n",
+                                 dd->db_id));
+               return -1;
+       }
+
+       key.dsize = dd->keylen;
+       key.dptr = dd->key;
+
+       hash = (uint32_t)ctdb_hash(&key);
+
+       DEBUG(DEBUG_INFO, (__location__ " Schedule for deletion: db[%s] "
+                          "db_id[0x%08x] "
+                          "key_hash[0x%08x] "
+                          "lmaster[%u] "
+                          "migrated_with_data[%s]\n",
+                           ctdb_db->db_name, dd->db_id,
+                           hash,
+                           ctdb_lmaster(ctdb_db->ctdb, &key),
+                           dd->hdr.flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA ? "yes" : "no"));
+
+       kd = (struct delete_record_data *)trbt_lookup32(ctdb_db->delete_queue, hash);
+       if (kd != NULL) {
+               if ((kd->key.dsize != key.dsize) ||
+                   (memcmp(kd->key.dptr, key.dptr, key.dsize) != 0))
+               {
+                       DEBUG(DEBUG_INFO,
+                             ("schedule for deletion: Hash collision (0x%08x)."
+                              " Skipping the record.\n", hash));
+                       return 0;
+               } else {
+                       DEBUG(DEBUG_INFO,
+                             ("schedule for deletetion: Overwriting entry for "
+                              "key with hash 0x%08x.\n", hash));
+               }
+       }
+
+       ret = insert_delete_record_data_into_tree(ctdb, ctdb_db,
+                                                 ctdb_db->delete_queue,
+                                                 &dd->hdr, key);
+       if (ret != 0) {
+               return -1;
+       }
+
+       return 0;
+}