return state.error;
}
+struct ctdb_db_traverse_state {
+ struct tevent_context *ev;
+ struct ctdb_client_context *client;
+ struct ctdb_db_context *db;
+ uint32_t destnode;
+ uint64_t srvid;
+ struct timeval timeout;
+ ctdb_rec_parser_func_t parser;
+ void *private_data;
+ int result;
+};
+
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
+static void ctdb_db_traverse_started(struct tevent_req *subreq);
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data);
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
+
+struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct ctdb_db_context *db,
+ uint32_t destnode,
+ struct timeval timeout,
+ ctdb_rec_parser_func_t parser,
+ void *private_data)
+{
+ struct tevent_req *req, *subreq;
+ struct ctdb_db_traverse_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct ctdb_db_traverse_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ev = ev;
+ state->client = client;
+ state->db = db;
+ state->destnode = destnode;
+ state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
+ state->timeout = timeout;
+ state->parser = parser;
+ state->private_data = private_data;
+
+ subreq = ctdb_client_set_message_handler_send(state, ev, client,
+ state->srvid,
+ ctdb_db_traverse_handler,
+ req);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
+
+ return req;
+}
+
+static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdb_db_traverse_state *state = tevent_req_data(
+ req, struct ctdb_db_traverse_state);
+ struct ctdb_traverse_start_ext traverse;
+ struct ctdb_req_control request;
+ int ret = 0;
+ bool status;
+
+ status = ctdb_client_set_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ traverse = (struct ctdb_traverse_start_ext) {
+ .db_id = ctdb_db_id(state->db),
+ .reqid = 0,
+ .srvid = state->srvid,
+ .withemptyrecords = false,
+ };
+
+ ctdb_req_control_traverse_start_ext(&request, &traverse);
+ subreq = ctdb_client_control_send(state, state->ev, state->client,
+ state->destnode, state->timeout,
+ &request);
+ if (subreq == NULL) {
+ state->result = ENOMEM;
+ ctdb_db_traverse_remove_handler(req);
+ return;
+ }
+ tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
+}
+
+static void ctdb_db_traverse_started(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdb_db_traverse_state *state = tevent_req_data(
+ req, struct ctdb_db_traverse_state);
+ struct ctdb_reply_control *reply;
+ int ret = 0;
+ bool status;
+
+ status = ctdb_client_control_recv(subreq, &ret, state, &reply);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
+ state->result = ret;
+ ctdb_db_traverse_remove_handler(req);
+ return;
+ }
+
+ ret = ctdb_reply_control_traverse_start_ext(reply);
+ talloc_free(reply);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
+ ret));
+ state->result = ret;
+ ctdb_db_traverse_remove_handler(req);
+ return;
+ }
+}
+
+static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
+ void *private_data)
+{
+ struct tevent_req *req = talloc_get_type_abort(
+ private_data, struct tevent_req);
+ struct ctdb_db_traverse_state *state = tevent_req_data(
+ req, struct ctdb_db_traverse_state);
+ struct ctdb_rec_data *rec;
+ struct ctdb_ltdb_header header;
+ int ret;
+
+ ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
+ if (ret != 0) {
+ return;
+ }
+
+ if (rec->key.dsize == 0 && rec->data.dsize == 0) {
+ talloc_free(rec);
+ ctdb_db_traverse_remove_handler(req);
+ return;
+ }
+
+ ret = ctdb_ltdb_header_extract(&rec->data, &header);
+ if (ret != 0) {
+ talloc_free(rec);
+ return;
+ }
+
+ if (rec->data.dsize == 0) {
+ talloc_free(rec);
+ return;
+ }
+
+ ret = state->parser(rec->reqid, &header, rec->key, rec->data,
+ state->private_data);
+ talloc_free(rec);
+ if (ret != 0) {
+ state->result = ret;
+ ctdb_db_traverse_remove_handler(req);
+ }
+}
+
+static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
+{
+ struct ctdb_db_traverse_state *state = tevent_req_data(
+ req, struct ctdb_db_traverse_state);
+ struct tevent_req *subreq;
+
+ subreq = ctdb_client_remove_message_handler_send(state, state->ev,
+ state->client,
+ state->srvid, req);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
+}
+
+static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct ctdb_db_traverse_state *state = tevent_req_data(
+ req, struct ctdb_db_traverse_state);
+ int ret;
+ bool status;
+
+ status = ctdb_client_remove_message_handler_recv(subreq, &ret);
+ TALLOC_FREE(subreq);
+ if (! status) {
+ tevent_req_error(req, ret);
+ return;
+ }
+
+ if (state->result != 0) {
+ tevent_req_error(req, state->result);
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
+{
+ int ret;
+
+ if (tevent_req_is_unix_error(req, &ret)) {
+ if (perr != NULL) {
+ *perr = ret;
+ }
+ return false;
+ }
+
+ return true;
+}
+
+int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct ctdb_client_context *client,
+ struct ctdb_db_context *db,
+ uint32_t destnode, struct timeval timeout,
+ ctdb_rec_parser_func_t parser, void *private_data)
+{
+ struct tevent_req *req;
+ int ret = 0;
+ bool status;
+
+ req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
+ timeout, parser, private_data);
+ if (req == NULL) {
+ return ENOMEM;
+ }
+
+ tevent_req_poll(req, ev);
+
+ status = ctdb_db_traverse_recv(req, &ret);
+ if (! status) {
+ return ret;
+ }
+
+ return 0;
+}
+
int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
struct ctdb_ltdb_header *header,
TALLOC_CTX *mem_ctx, TDB_DATA *data)