ctdb-daemon: Add implementation of VACUUM_FETCH control
authorAmitay Isaacs <amitay@gmail.com>
Fri, 16 Feb 2018 04:30:13 +0000 (15:30 +1100)
committerAmitay Isaacs <amitay@samba.org>
Thu, 24 Oct 2019 04:06:42 +0000 (04:06 +0000)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/include/ctdb_private.h
ctdb/server/ctdb_control.c
ctdb/server/ctdb_freeze.c
ctdb/server/ctdb_ltdb_server.c
ctdb/server/ctdb_vacuum.c

index 1f168dae2b8ea0cac9797be8f8bf0d9c044bab64..69afef731edff8d6edea80158a464bc44a29439a 100644 (file)
@@ -359,6 +359,7 @@ struct ctdb_db_context {
        struct revokechild_handle *revokechild_active;
        struct ctdb_persistent_state *persistent_state;
        struct trbt_tree *delete_queue;
+       struct trbt_tree *fetch_queue;
        struct trbt_tree *sticky_records; 
        int (*ctdb_ltdb_store_fn)(struct ctdb_db_context *ctdb_db,
                                  TDB_DATA key,
@@ -998,6 +999,8 @@ void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
                                         const struct ctdb_ltdb_header *hdr,
                                         const TDB_DATA key);
 
+int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata);
+
 /* from eventscript.c */
 
 int ctdb_start_eventd(struct ctdb_context *ctdb);
index 6c91e211660302435feafc4a0d8619ef4a571575..0174f303f1410048fdbd43ab30a491ef47a5b41e 100644 (file)
@@ -729,6 +729,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
        case CTDB_CONTROL_TUNNEL_DEREGISTER:
                return ctdb_control_tunnel_deregister(ctdb, client_id, srvid);
 
+       case CTDB_CONTROL_VACUUM_FETCH:
+               return ctdb_control_vacuum_fetch(ctdb, indata);
+
        default:
                DEBUG(DEBUG_CRIT,(__location__ " Unknown CTDB control opcode %u\n", opcode));
                return -1;
index b4b99a0e5c9a4167b345f704e513f112df83e78f..06aeacfd9398d8c42402f427734bc631bb713e3f 100644 (file)
@@ -869,10 +869,17 @@ int32_t ctdb_control_wipe_database(struct ctdb_context *ctdb, TDB_DATA indata)
 
        if (ctdb_db_volatile(ctdb_db)) {
                talloc_free(ctdb_db->delete_queue);
+               talloc_free(ctdb_db->fetch_queue);
                ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
                if (ctdb_db->delete_queue == NULL) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to re-create "
-                                         "the vacuum tree.\n"));
+                                         "the delete queue.\n"));
+                       return -1;
+               }
+               ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
+               if (ctdb_db->fetch_queue == NULL) {
+                       DEBUG(DEBUG_ERR, (__location__ " Failed to re-create "
+                                         "the fetch queue.\n"));
                        return -1;
                }
        }
index 022baf62d923144256f086915b0e8960b8d9e721..1ccf60832e1448733bd5bc70f698043e16901858 100644 (file)
@@ -770,6 +770,11 @@ static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
                        CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
                }
 
+               ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
+               if (ctdb_db->fetch_queue == NULL) {
+                       CTDB_NO_MEMORY(ctdb, ctdb_db->fetch_queue);
+               }
+
                ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
        }
 
@@ -1272,6 +1277,7 @@ int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
        /* Disable vacuuming and drop all vacuuming data */
        talloc_free(ctdb_db->vacuum_handle);
        talloc_free(ctdb_db->delete_queue);
+       talloc_free(ctdb_db->fetch_queue);
 
        /* Terminate any deferred fetch */
        talloc_free(ctdb_db->deferred_fetch);
index c1015275fb01a9901f65df3ec6b827da8425333e..09762c49795ac4125aca06b71bc15d89618f1b63 100644 (file)
@@ -41,6 +41,8 @@
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "protocol/protocol_api.h"
+
 #define TIMELIMIT() timeval_current_ofs(10, 0)
 
 enum vacuum_child_status { VACUUM_RUNNING, VACUUM_OK, VACUUM_ERROR, VACUUM_TIMEOUT};
@@ -117,6 +119,11 @@ struct delete_records_list {
        struct vacuum_data *vdata;
 };
 
+struct fetch_record_data {
+       TDB_DATA key;
+       uint8_t keydata[1];
+};
+
 static int insert_record_into_delete_queue(struct ctdb_db_context *ctdb_db,
                                           const struct ctdb_ltdb_header *hdr,
                                           TDB_DATA key);
@@ -1574,3 +1581,62 @@ void ctdb_local_remove_from_delete_queue(struct ctdb_db_context *ctdb_db,
 
        return;
 }
+
+static int vacuum_fetch_parser(uint32_t reqid,
+                              struct ctdb_ltdb_header *header,
+                              TDB_DATA key, TDB_DATA data,
+                              void *private_data)
+{
+       struct ctdb_db_context *ctdb_db = talloc_get_type_abort(
+               private_data, struct ctdb_db_context);
+       struct fetch_record_data *rd;
+       size_t len;
+       uint32_t hash;
+
+       len = offsetof(struct fetch_record_data, keydata) + key.dsize;
+
+       rd = (struct fetch_record_data *)talloc_size(ctdb_db->fetch_queue,
+                                                    len);
+       if (rd == NULL) {
+               DEBUG(DEBUG_ERR, (__location__ " Memory error\n"));
+               return -1;
+       }
+       talloc_set_name_const(rd, "struct fetch_record_data");
+
+       rd->key.dsize = key.dsize;
+       rd->key.dptr = rd->keydata;
+       memcpy(rd->keydata, key.dptr, key.dsize);
+
+       hash = ctdb_hash(&key);
+
+       trbt_insert32(ctdb_db->fetch_queue, hash, rd);
+
+       return 0;
+}
+
+int32_t ctdb_control_vacuum_fetch(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+       struct ctdb_rec_buffer *recbuf;
+       struct ctdb_db_context *ctdb_db;
+       size_t npull;
+       int ret;
+
+       ret = ctdb_rec_buffer_pull(indata.dptr, indata.dsize, ctdb, &recbuf,
+                                  &npull);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("Invalid data in vacuum_fetch\n"));
+               return -1;
+       }
+
+       ctdb_db = find_ctdb_db(ctdb, recbuf->db_id);
+       if (ctdb_db == NULL) {
+               talloc_free(recbuf);
+               DEBUG(DEBUG_ERR, (__location__ " Unknown db 0x%08x\n",
+                                 recbuf->db_id));
+               return -1;
+       }
+
+       ret = ctdb_rec_buffer_traverse(recbuf, vacuum_fetch_parser, ctdb_db);
+       talloc_free(recbuf);
+       return ret;
+}