ctdb-vacuum: Add processing of fetch queue
author Amitay Isaacs <amitay@gmail.com>
Fri, 16 Feb 2018 06:00:40 +0000 (17:00 +1100)
committer Amitay Isaacs <amitay@samba.org>
Thu, 24 Oct 2019 04:06:42 +0000 (04:06 +0000)
Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/server/ctdb_vacuum.c

index 09762c49795ac4125aca06b71bc15d89618f1b63..bdb00c3df0b8d2b53fc2c0627300860baa3d5024 100644 (file)
@@ -317,6 +317,181 @@ static int delete_marshall_traverse(void *param, void *data)
        return 0;
 }
 
+/* Bookkeeping shared by all migration requests of one fetch queue run. */
+struct fetch_queue_state {
+       /* Database whose fetch queue is being processed. */
+       struct ctdb_db_context *ctdb_db;
+       /* Number of migration calls sent whose callback has not yet run. */
+       int count;
+};
+
+/* Per-record state handed to fetch_record_migrate_callback(). */
+struct fetch_record_migrate_state {
+       /* Shared run state; its count is decremented when the call completes. */
+       struct fetch_queue_state *fetch_queue;
+       /* Private copy of the record key; the buffer is allocated off this struct. */
+       TDB_DATA key;
+};
+
+/**
+ * Completion callback for a vacuum migration call.
+ *
+ * Collects the call result and decrements the count of outstanding
+ * requests.  If the migration succeeded and the record header can be
+ * read under a non-blocking chain lock, the header and key are passed
+ * to ctdb_local_schedule_for_deletion().  The per-record state is
+ * freed on every path.
+ */
+static void fetch_record_migrate_callback(struct ctdb_client_call_state *state)
+{
+       struct fetch_record_migrate_state *mig = talloc_get_type_abort(
+               state->async.private_data, struct fetch_record_migrate_state);
+       struct fetch_queue_state *queue = mig->fetch_queue;
+       struct ctdb_db_context *db = queue->ctdb_db;
+       struct ctdb_call call = { 0 };
+       struct ctdb_ltdb_header header;
+       int rc;
+
+       /* The request is finished whether or not it succeeded */
+       rc = ctdb_call_recv(state, &call);
+       queue->count--;
+       if (rc != 0) {
+               D_ERR("Failed to migrate record for vacuuming\n");
+               goto out;
+       }
+
+       /* Only look at the record if the chain lock is free right now */
+       rc = tdb_chainlock_nonblock(db->ltdb->tdb, mig->key);
+       if (rc != 0) {
+               goto out;
+       }
+
+       rc = tdb_parse_record(db->ltdb->tdb,
+                             mig->key,
+                             vacuum_record_parser,
+                             &header);
+
+       tdb_chainunlock(db->ltdb->tdb, mig->key);
+
+       if (rc == 0) {
+               D_INFO("Vacuum Fetch record, key=%.*s\n",
+                      (int)mig->key.dsize,
+                      mig->key.dptr);
+
+               /* Result deliberately ignored */
+               (void) ctdb_local_schedule_for_deletion(db,
+                                                       &header,
+                                                       mig->key);
+       }
+
+out:
+       talloc_free(mig);
+}
+
+/**
+ * tdb_parse_record() callback: copy the ctdb ltdb header found at the
+ * start of the record data into the header supplied via private_data.
+ *
+ * Returns 0 on success, -1 if the data is too short to hold a header.
+ * The key is not used.
+ */
+static int fetch_record_parser(TDB_DATA key, TDB_DATA data, void *private_data)
+{
+       struct ctdb_ltdb_header *out = private_data;
+
+       if (data.dsize >= sizeof(*out)) {
+               memcpy(out, data.dptr, sizeof(*out));
+               return 0;
+       }
+
+       return -1;
+}
+
+/**
+ * Traverse callback for the fetch queue.
+ *
+ * For a queued record whose dmaster is not this node, send an
+ * immediate vacuum migration call and count it as outstanding in the
+ * fetch_queue_state passed in param.  Always returns 0.
+ */
+static int fetch_queue_traverse(void *param, void *data)
+{
+       struct fetch_queue_state *queue = (struct fetch_queue_state *)param;
+       struct fetch_record_data *rec = talloc_get_type_abort(
+               data, struct fetch_record_data);
+       struct ctdb_db_context *db = queue->ctdb_db;
+       struct ctdb_call call = {
+               .call_id = CTDB_NULL_FUNC,
+               .flags = CTDB_IMMEDIATE_MIGRATION |
+                        CTDB_CALL_FLAG_VACUUM_MIGRATION,
+       };
+       struct fetch_record_migrate_state *mig;
+       struct ctdb_client_call_state *req;
+       struct ctdb_ltdb_header hdr;
+       int rc;
+
+       /* Give up silently if the non-blocking lock attempt fails */
+       rc = tdb_chainlock_nonblock(db->ltdb->tdb, rec->key);
+       if (rc != 0) {
+               return 0;
+       }
+
+       rc = tdb_parse_record(db->ltdb->tdb,
+                             rec->key,
+                             fetch_record_parser,
+                             &hdr);
+
+       tdb_chainunlock(db->ltdb->tdb, rec->key);
+
+       /*
+        * Skip records that could not be read and records that are
+        * already migrated to this node.  hdr is only examined when
+        * parsing succeeded.
+        */
+       if (rc != 0 || hdr.dmaster == db->ctdb->pnn) {
+               D_INFO("Skipped Fetch record, key=%.*s\n",
+                      (int)rec->key.dsize,
+                      rec->key.dptr);
+               return 0;
+       }
+
+       mig = talloc_zero(db, struct fetch_record_migrate_state);
+       if (mig == NULL) {
+               D_ERR("Failed to setup fetch record migrate state\n");
+               return 0;
+       }
+
+       mig->fetch_queue = queue;
+
+       /* Keep a private copy of the key, owned by the per-record state */
+       mig->key.dptr = talloc_memdup(mig, rec->key.dptr, rec->key.dsize);
+       mig->key.dsize = rec->key.dsize;
+       if (mig->key.dptr == NULL) {
+               D_ERR("Memory error in fetch_queue_traverse\n");
+               talloc_free(mig);
+               return 0;
+       }
+
+       call.key = mig->key;
+
+       req = ctdb_call_send(db, &call);
+       if (req == NULL) {
+               DEBUG(DEBUG_ERR, ("Failed to setup vacuum fetch call\n"));
+               talloc_free(mig);
+               return 0;
+       }
+
+       req->async.fn = fetch_record_migrate_callback;
+       req->async.private_data = mig;
+
+       /* One more request whose callback has yet to run */
+       queue->count++;
+
+       return 0;
+}
+
+/**
+ * Process the in-memory fetch queue of a database.
+ *
+ * Each queued record is handed to fetch_queue_traverse(), which sends
+ * a migration request for it; the migration callback schedules
+ * migrated records for deletion.  Does not return until every request
+ * that was sent has completed.
+ */
+static void ctdb_process_fetch_queue(struct ctdb_db_context *ctdb_db)
+{
+       struct fetch_queue_state state = {
+               .ctdb_db = ctdb_db,
+               .count = 0,
+       };
+       int rc;
+
+       rc = trbt_traversearray32(ctdb_db->fetch_queue, 1,
+                                 fetch_queue_traverse, &state);
+       if (rc != 0) {
+               DEBUG(DEBUG_ERR, (__location__ " Error traversing "
+                     "the fetch queue.\n"));
+       }
+
+       /*
+        * Run the event loop until all migration callbacks have fired:
+        * they reference "state", which lives on this stack frame.
+        */
+       while (state.count > 0) {
+               tevent_loop_once(ctdb_db->ctdb->ev);
+       }
+}
+
 /**
  * traverse function for the traversal of the delete_queue,
  * the fast-path vacuuming list.
@@ -998,8 +1173,10 @@ fail:
 /**
  * Vacuum a DB:
  *  - Always do the fast vacuuming run, which traverses
- *    the in-memory delete queue: these records have been
- *    scheduled for deletion.
+ *    - the in-memory fetch queue: these records have been
+ *      scheduled for migration
+ *    - the in-memory delete queue: these records have been
+ *      scheduled for deletion.
  *  - Only if explicitly requested, the database is traversed
  *    in order to use the traditional heuristics on empty records
  *    to trigger deletion.
@@ -1070,6 +1247,8 @@ static int ctdb_vacuum_db(struct ctdb_db_context *ctdb_db,
                ctdb_vacuum_traverse_db(ctdb_db, vdata);
        }
 
+       ctdb_process_fetch_queue(ctdb_db);
+
        ctdb_process_delete_queue(ctdb_db, vdata);
 
        ctdb_process_vacuum_fetch_lists(ctdb_db, vdata);
@@ -1310,10 +1489,17 @@ static void ctdb_vacuum_event(struct tevent_context *ev,
        ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
        if (ctdb_db->delete_queue == NULL) {
                /* fatal here? ... */
-               ctdb_fatal(ctdb, "Out of memory when re-creating vacuum tree "
+               ctdb_fatal(ctdb, "Out of memory when re-creating delete queue "
                                 "in parent context. Shutting down\n");
        }
 
+       talloc_free(ctdb_db->fetch_queue);
+       ctdb_db->fetch_queue = trbt_create(ctdb_db, 0);
+       if (ctdb_db->fetch_queue == NULL) {
+               ctdb_fatal(ctdb, "Out of memory when re-create fetch queue "
+                                " in parent context. Shutting down\n");
+       }
+
        tevent_add_timer(ctdb->ev, child_ctx,
                         timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
                         vacuum_child_timeout, child_ctx);