ctdb-client: Refactor cluster-wide database traverse api
authorAmitay Isaacs <amitay@gmail.com>
Tue, 4 Apr 2017 08:25:28 +0000 (18:25 +1000)
committerMartin Schwenke <martins@samba.org>
Wed, 7 Jun 2017 14:05:26 +0000 (16:05 +0200)
This implements the async version of the traverse code in the ctdb tool
for catdb command.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
ctdb/client/client.h
ctdb/client/client_db.c

index 2205112b4f11d246d884a29d9176d4555c6b0b45..928fcd52d29ea6347a2a3c55fde7277c979cf609 100644 (file)
@@ -699,6 +699,23 @@ int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
                           bool extract_header,
                           ctdb_rec_parser_func_t parser, void *private_data);
 
+struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct ctdb_client_context *client,
+                                        struct ctdb_db_context *db,
+                                        uint32_t destnode,
+                                        struct timeval timeout,
+                                        ctdb_rec_parser_func_t parser,
+                                        void *private_data);
+
+bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr);
+
+int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+                    struct ctdb_client_context *client,
+                    struct ctdb_db_context *db,
+                    uint32_t destnode, struct timeval timeout,
+                    ctdb_rec_parser_func_t parser, void *private_data);
+
 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
                    struct ctdb_ltdb_header *header,
                    TALLOC_CTX *mem_ctx, TDB_DATA *data);
index cb3dc44c529c2305b5d7f512207f3a094005ad53..604034335a17ddf317ab650c3fe649e716e9dec2 100644 (file)
@@ -689,6 +689,252 @@ int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
        return state.error;
 }
 
+struct ctdb_db_traverse_state {
+       struct tevent_context *ev;
+       struct ctdb_client_context *client;
+       struct ctdb_db_context *db;
+       uint32_t destnode;
+       uint64_t srvid;
+       struct timeval timeout;
+       ctdb_rec_parser_func_t parser;
+       void *private_data;
+       int result;
+};
+
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
+static void ctdb_db_traverse_started(struct tevent_req *subreq);
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+                                    void *private_data);
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
+                                        struct tevent_context *ev,
+                                        struct ctdb_client_context *client,
+                                        struct ctdb_db_context *db,
+                                        uint32_t destnode,
+                                        struct timeval timeout,
+                                        ctdb_rec_parser_func_t parser,
+                                        void *private_data)
+{
+       struct tevent_req *req, *subreq;
+       struct ctdb_db_traverse_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct ctdb_db_traverse_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ev = ev;
+       state->client = client;
+       state->db = db;
+       state->destnode = destnode;
+       state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
+       state->timeout = timeout;
+       state->parser = parser;
+       state->private_data = private_data;
+
+       subreq = ctdb_client_set_message_handler_send(state, ev, client,
+                                                     state->srvid,
+                                                     ctdb_db_traverse_handler,
+                                                     req);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
+
+       return req;
+}
+
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_db_traverse_state *state = tevent_req_data(
+               req, struct ctdb_db_traverse_state);
+       struct ctdb_traverse_start_ext traverse;
+       struct ctdb_req_control request;
+       int ret = 0;
+       bool status;
+
+       status = ctdb_client_set_message_handler_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       traverse = (struct ctdb_traverse_start_ext) {
+               .db_id = ctdb_db_id(state->db),
+               .reqid = 0,
+               .srvid = state->srvid,
+               .withemptyrecords = false,
+       };
+
+       ctdb_req_control_traverse_start_ext(&request, &traverse);
+       subreq = ctdb_client_control_send(state, state->ev, state->client,
+                                         state->destnode, state->timeout,
+                                         &request);
+       if (subreq == NULL) {
+               state->result = ENOMEM;
+               ctdb_db_traverse_remove_handler(req);
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
+}
+
+static void ctdb_db_traverse_started(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_db_traverse_state *state = tevent_req_data(
+               req, struct ctdb_db_traverse_state);
+       struct ctdb_reply_control *reply;
+       int ret = 0;
+       bool status;
+
+       status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
+               state->result = ret;
+               ctdb_db_traverse_remove_handler(req);
+               return;
+       }
+
+       ret = ctdb_reply_control_traverse_start_ext(reply);
+       talloc_free(reply);
+       if (ret != 0) {
+               DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
+                                 ret));
+               state->result = ret;
+               ctdb_db_traverse_remove_handler(req);
+               return;
+       }
+}
+
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+                                    void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct ctdb_db_traverse_state *state = tevent_req_data(
+               req, struct ctdb_db_traverse_state);
+       struct ctdb_rec_data *rec;
+       struct ctdb_ltdb_header header;
+       int ret;
+
+       ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
+       if (ret != 0) {
+               return;
+       }
+
+       if (rec->key.dsize == 0 && rec->data.dsize == 0) {
+               talloc_free(rec);
+               ctdb_db_traverse_remove_handler(req);
+               return;
+       }
+
+       ret = ctdb_ltdb_header_extract(&rec->data, &header);
+       if (ret != 0) {
+               talloc_free(rec);
+               return;
+       }
+
+       if (rec->data.dsize == 0) {
+               talloc_free(rec);
+               return;
+       }
+
+       ret = state->parser(rec->reqid, &header, rec->key, rec->data,
+                           state->private_data);
+       talloc_free(rec);
+       if (ret != 0) {
+               state->result = ret;
+               ctdb_db_traverse_remove_handler(req);
+       }
+}
+
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
+{
+       struct ctdb_db_traverse_state *state = tevent_req_data(
+               req, struct ctdb_db_traverse_state);
+       struct tevent_req *subreq;
+
+       subreq = ctdb_client_remove_message_handler_send(state, state->ev,
+                                                        state->client,
+                                                        state->srvid, req);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
+}
+
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct ctdb_db_traverse_state *state = tevent_req_data(
+               req, struct ctdb_db_traverse_state);
+       int ret;
+       bool status;
+
+       status = ctdb_client_remove_message_handler_recv(subreq, &ret);
+       TALLOC_FREE(subreq);
+       if (! status) {
+               tevent_req_error(req, ret);
+               return;
+       }
+
+       if (state->result != 0) {
+               tevent_req_error(req, state->result);
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
+{
+       int ret;
+
+       if (tevent_req_is_unix_error(req, &ret)) {
+               if (perr != NULL) {
+                       *perr = ret;
+               }
+               return false;
+       }
+
+       return true;
+}
+
+int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+                    struct ctdb_client_context *client,
+                    struct ctdb_db_context *db,
+                    uint32_t destnode, struct timeval timeout,
+                    ctdb_rec_parser_func_t parser, void *private_data)
+{
+       struct tevent_req *req;
+       int ret = 0;
+       bool status;
+
+       req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
+                                   timeout, parser, private_data);
+       if (req == NULL) {
+               return ENOMEM;
+       }
+
+       tevent_req_poll(req, ev);
+
+       status = ctdb_db_traverse_recv(req, &ret);
+       if (! status) {
+               return ret;
+       }
+
+       return 0;
+}
+
 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
                    struct ctdb_ltdb_header *header,
                    TALLOC_CTX *mem_ctx, TDB_DATA *data)