4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "lib/tdb_wrap/tdb_wrap.h"
29 #include "lib/util/tevent_unix.h"
30 #include "lib/util/dlinklist.h"
31 #include "lib/util/debug.h"
32 #include "ctdb_logging.h"
34 #include "protocol/protocol.h"
35 #include "protocol/protocol_api.h"
36 #include "client/client_private.h"
37 #include "client/client.h"
39 static struct ctdb_db_context *client_db_handle(
40 struct ctdb_client_context *client,
43 struct ctdb_db_context *db;
45 for (db = client->db; db != NULL; db = db->next) {
46 if (strcmp(db_name, db->db_name) == 0) {
54 struct ctdb_set_db_flags_state {
55 struct tevent_context *ev;
56 struct ctdb_client_context *client;
57 struct timeval timeout;
60 bool readonly_done, sticky_done;
65 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
66 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
69 static struct tevent_req *ctdb_set_db_flags_send(
71 struct tevent_context *ev,
72 struct ctdb_client_context *client,
73 uint32_t destnode, struct timeval timeout,
74 uint32_t db_id, uint8_t db_flags)
76 struct tevent_req *req, *subreq;
77 struct ctdb_set_db_flags_state *state;
78 struct ctdb_req_control request;
80 req = tevent_req_create(mem_ctx, &state,
81 struct ctdb_set_db_flags_state);
86 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
88 return tevent_req_post(req, ev);
92 state->client = client;
93 state->timeout = timeout;
95 state->db_flags = db_flags;
97 ctdb_req_control_get_nodemap(&request);
98 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
100 if (tevent_req_nomem(subreq, req)) {
101 return tevent_req_post(req, ev);
103 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
108 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
110 struct tevent_req *req = tevent_req_callback_data(
111 subreq, struct tevent_req);
112 struct ctdb_set_db_flags_state *state = tevent_req_data(
113 req, struct ctdb_set_db_flags_state);
114 struct ctdb_req_control request;
115 struct ctdb_reply_control *reply;
116 struct ctdb_node_map *nodemap;
120 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
123 tevent_req_error(req, ret);
127 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
130 tevent_req_error(req, ret);
134 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
135 state, &state->pnn_list);
136 talloc_free(nodemap);
137 if (state->count <= 0) {
138 tevent_req_error(req, ENOMEM);
142 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
143 ctdb_req_control_set_db_readonly(&request, state->db_id);
144 subreq = ctdb_client_control_multi_send(
145 state, state->ev, state->client,
146 state->pnn_list, state->count,
147 state->timeout, &request);
148 if (tevent_req_nomem(subreq, req)) {
151 tevent_req_set_callback(subreq,
152 ctdb_set_db_flags_readonly_done, req);
154 state->readonly_done = true;
157 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
158 ctdb_req_control_set_db_sticky(&request, state->db_id);
159 subreq = ctdb_client_control_multi_send(
160 state, state->ev, state->client,
161 state->pnn_list, state->count,
162 state->timeout, &request);
163 if (tevent_req_nomem(subreq, req)) {
166 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
169 state->sticky_done = true;
173 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
175 struct tevent_req *req = tevent_req_callback_data(
176 subreq, struct tevent_req);
177 struct ctdb_set_db_flags_state *state = tevent_req_data(
178 req, struct ctdb_set_db_flags_state);
182 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
186 tevent_req_error(req, ret);
190 state->readonly_done = true;
192 if (state->readonly_done && state->sticky_done) {
193 tevent_req_done(req);
197 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
199 struct tevent_req *req = tevent_req_callback_data(
200 subreq, struct tevent_req);
201 struct ctdb_set_db_flags_state *state = tevent_req_data(
202 req, struct ctdb_set_db_flags_state);
206 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
210 tevent_req_error(req, ret);
214 state->sticky_done = true;
216 if (state->readonly_done && state->sticky_done) {
217 tevent_req_done(req);
221 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
225 if (tevent_req_is_unix_error(req, &err)) {
234 struct ctdb_attach_state {
235 struct tevent_context *ev;
236 struct ctdb_client_context *client;
237 struct timeval timeout;
241 struct ctdb_db_context *db;
244 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
245 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
246 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
247 static void ctdb_attach_health_done(struct tevent_req *subreq);
248 static void ctdb_attach_flags_done(struct tevent_req *subreq);
250 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
251 struct tevent_context *ev,
252 struct ctdb_client_context *client,
253 struct timeval timeout,
254 const char *db_name, uint8_t db_flags)
256 struct tevent_req *req, *subreq;
257 struct ctdb_attach_state *state;
258 struct ctdb_req_control request;
260 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
265 state->db = client_db_handle(client, db_name);
266 if (state->db != NULL) {
267 tevent_req_done(req);
268 return tevent_req_post(req, ev);
272 state->client = client;
273 state->timeout = timeout;
274 state->destnode = ctdb_client_pnn(client);
275 state->db_flags = db_flags;
277 state->db = talloc_zero(client, struct ctdb_db_context);
278 if (tevent_req_nomem(state->db, req)) {
279 return tevent_req_post(req, ev);
282 state->db->db_name = talloc_strdup(state->db, db_name);
283 if (tevent_req_nomem(state->db, req)) {
284 return tevent_req_post(req, ev);
287 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
288 state->db->persistent = true;
291 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
292 subreq = ctdb_client_control_send(state, ev, client,
293 ctdb_client_pnn(client), timeout,
295 if (tevent_req_nomem(subreq, req)) {
296 return tevent_req_post(req, ev);
298 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
303 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
305 struct tevent_req *req = tevent_req_callback_data(
306 subreq, struct tevent_req);
307 struct ctdb_attach_state *state = tevent_req_data(
308 req, struct ctdb_attach_state);
309 struct ctdb_reply_control *reply;
310 struct ctdb_req_control request;
311 uint32_t mutex_enabled;
315 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
318 tevent_req_error(req, ret);
322 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
324 /* Treat error as mutex support not available */
328 state->tdb_flags = TDB_DEFAULT;
329 if (! state->db->persistent) {
330 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
333 if (mutex_enabled == 1) {
334 state->tdb_flags |= TDB_MUTEX_LOCKING;
337 if (state->db->persistent) {
338 ctdb_req_control_db_attach_persistent(&request,
342 ctdb_req_control_db_attach(&request, state->db->db_name,
346 subreq = ctdb_client_control_send(state, state->ev, state->client,
347 state->destnode, state->timeout,
349 if (tevent_req_nomem(subreq, req)) {
352 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
355 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
357 struct tevent_req *req = tevent_req_callback_data(
358 subreq, struct tevent_req);
359 struct ctdb_attach_state *state = tevent_req_data(
360 req, struct ctdb_attach_state);
361 struct ctdb_req_control request;
362 struct ctdb_reply_control *reply;
366 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
369 tevent_req_error(req, ret);
373 if (state->db->persistent) {
374 ret = ctdb_reply_control_db_attach_persistent(
375 reply, &state->db->db_id);
377 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
381 tevent_req_error(req, ret);
385 ctdb_req_control_getdbpath(&request, state->db->db_id);
386 subreq = ctdb_client_control_send(state, state->ev, state->client,
387 state->destnode, state->timeout,
389 if (tevent_req_nomem(subreq, req)) {
392 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
395 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
397 struct tevent_req *req = tevent_req_callback_data(
398 subreq, struct tevent_req);
399 struct ctdb_attach_state *state = tevent_req_data(
400 req, struct ctdb_attach_state);
401 struct ctdb_reply_control *reply;
402 struct ctdb_req_control request;
406 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
409 tevent_req_error(req, ret);
413 ret = ctdb_reply_control_getdbpath(reply, state->db,
414 &state->db->db_path);
417 tevent_req_error(req, ret);
421 ctdb_req_control_db_get_health(&request, state->db->db_id);
422 subreq = ctdb_client_control_send(state, state->ev, state->client,
423 state->destnode, state->timeout,
425 if (tevent_req_nomem(subreq, req)) {
428 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
431 static void ctdb_attach_health_done(struct tevent_req *subreq)
433 struct tevent_req *req = tevent_req_callback_data(
434 subreq, struct tevent_req);
435 struct ctdb_attach_state *state = tevent_req_data(
436 req, struct ctdb_attach_state);
437 struct ctdb_reply_control *reply;
442 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
445 tevent_req_error(req, ret);
449 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
451 tevent_req_error(req, ret);
455 if (reason != NULL) {
456 /* Database unhealthy, avoid attach */
457 /* FIXME: Log here */
458 tevent_req_error(req, EIO);
462 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
463 state->destnode, state->timeout,
464 state->db->db_id, state->db_flags);
465 if (tevent_req_nomem(subreq, req)) {
468 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
471 static void ctdb_attach_flags_done(struct tevent_req *subreq)
473 struct tevent_req *req = tevent_req_callback_data(
474 subreq, struct tevent_req);
475 struct ctdb_attach_state *state = tevent_req_data(
476 req, struct ctdb_attach_state);
480 status = ctdb_set_db_flags_recv(subreq, &ret);
483 tevent_req_error(req, ret);
487 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
488 state->tdb_flags, O_RDWR, 0);
489 if (tevent_req_nomem(state->db->ltdb, req)) {
492 DLIST_ADD(state->client->db, state->db);
494 tevent_req_done(req);
497 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
498 struct ctdb_db_context **out)
500 struct ctdb_attach_state *state = tevent_req_data(
501 req, struct ctdb_attach_state);
504 if (tevent_req_is_unix_error(req, &err)) {
517 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
518 struct ctdb_client_context *client,
519 struct timeval timeout,
520 const char *db_name, uint8_t db_flags,
521 struct ctdb_db_context **out)
523 struct tevent_req *req;
527 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
533 tevent_req_poll(req, ev);
535 status = ctdb_attach_recv(req, &ret, out);
541 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
542 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
543 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
549 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
550 struct ctdb_client_context *client,
551 struct timeval timeout, uint32_t db_id)
553 struct ctdb_db_context *db;
556 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
562 for (db = client->db; db != NULL; db = db->next) {
563 if (db->db_id == db_id) {
564 DLIST_REMOVE(client->db, db);
572 uint32_t ctdb_db_id(struct ctdb_db_context *db)
577 struct ctdb_db_traverse_state {
578 ctdb_rec_parser_func_t parser;
584 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
585 TDB_DATA data, void *private_data)
587 struct ctdb_db_traverse_state *state =
588 (struct ctdb_db_traverse_state *)private_data;
591 if (state->extract_header) {
592 struct ctdb_ltdb_header header;
595 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
601 len = ctdb_ltdb_header_len(&header);
606 ret = state->parser(0, &header, key, data, state->private_data);
608 ret = state->parser(0, NULL, key, data, state->private_data);
619 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
621 ctdb_rec_parser_func_t parser, void *private_data)
623 struct ctdb_db_traverse_state state;
626 state.parser = parser;
627 state.private_data = private_data;
628 state.extract_header = extract_header;
632 ret = tdb_traverse_read(db->ltdb->tdb,
633 ctdb_db_traverse_handler, &state);
635 ret = tdb_traverse(db->ltdb->tdb,
636 ctdb_db_traverse_handler, &state);
646 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
647 struct ctdb_ltdb_header *header,
648 TALLOC_CTX *mem_ctx, TDB_DATA *data)
653 rec = tdb_fetch(db->ltdb->tdb, key);
654 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
655 /* No record present */
656 if (rec.dptr != NULL) {
660 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
665 header->dmaster = CTDB_UNKNOWN_PNN;
674 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
681 size_t offset = ctdb_ltdb_header_len(header);
683 data->dsize = rec.dsize - offset;
684 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
686 if (data->dptr == NULL) {
696 * Fetch a record from volatile database
699 * 1. Get a lock on the hash chain
700 * 2. If the record does not exist, migrate the record
701 * 3. If readonly=true and delegations do not exist, migrate the record.
702 * 4. If readonly=false and delegations exist, migrate the record.
703 * 5. If the local node is not dmaster, migrate the record.
707 struct ctdb_fetch_lock_state {
708 struct tevent_context *ev;
709 struct ctdb_client_context *client;
710 struct ctdb_record_handle *h;
715 static int ctdb_fetch_lock_check(struct tevent_req *req);
716 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
717 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
719 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
720 struct tevent_context *ev,
721 struct ctdb_client_context *client,
722 struct ctdb_db_context *db,
723 TDB_DATA key, bool readonly)
725 struct ctdb_fetch_lock_state *state;
726 struct tevent_req *req;
729 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
735 state->client = client;
737 state->h = talloc_zero(db, struct ctdb_record_handle);
738 if (tevent_req_nomem(state->h, req)) {
739 return tevent_req_post(req, ev);
741 state->h->client = client;
743 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
744 if (tevent_req_nomem(state->h->key.dptr, req)) {
745 return tevent_req_post(req, ev);
747 state->h->key.dsize = key.dsize;
748 state->h->readonly = false;
750 state->readonly = readonly;
751 state->pnn = ctdb_client_pnn(client);
753 /* Check that database is not persistent */
754 if (db->persistent) {
755 tevent_req_error(req, EINVAL);
756 return tevent_req_post(req, ev);
759 ret = ctdb_fetch_lock_check(req);
761 tevent_req_done(req);
762 return tevent_req_post(req, ev);
765 tevent_req_error(req, ret);
766 return tevent_req_post(req, ev);
771 static int ctdb_fetch_lock_check(struct tevent_req *req)
773 struct ctdb_fetch_lock_state *state = tevent_req_data(
774 req, struct ctdb_fetch_lock_state);
775 struct ctdb_record_handle *h = state->h;
776 struct ctdb_ltdb_header header;
777 TDB_DATA data = tdb_null;
779 bool do_migrate = false;
781 ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
787 data = tdb_fetch(h->db->ltdb->tdb, h->key);
788 if (data.dptr == NULL) {
789 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
798 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
804 if (! state->readonly) {
805 /* Read/write access */
806 if (header.dmaster == state->pnn &&
807 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
811 if (header.dmaster != state->pnn) {
815 /* Readonly access */
816 if (header.dmaster != state->pnn &&
817 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
818 CTDB_REC_RO_HAVE_DELEGATIONS))) {
823 /* We are the dmaster or readonly delegation */
826 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
827 CTDB_REC_RO_HAVE_DELEGATIONS)) {
837 if (data.dptr != NULL) {
840 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
842 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
848 ctdb_fetch_lock_migrate(req);
853 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
855 struct ctdb_fetch_lock_state *state = tevent_req_data(
856 req, struct ctdb_fetch_lock_state);
857 struct ctdb_req_call request;
858 struct tevent_req *subreq;
860 ZERO_STRUCT(request);
861 request.flags = CTDB_IMMEDIATE_MIGRATION;
862 if (state->readonly) {
863 request.flags |= CTDB_WANT_READONLY;
865 request.db_id = state->h->db->db_id;
866 request.callid = CTDB_NULL_FUNC;
867 request.key = state->h->key;
869 subreq = ctdb_client_call_send(state, state->ev, state->client,
871 if (tevent_req_nomem(subreq, req)) {
875 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
878 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
880 struct tevent_req *req = tevent_req_callback_data(
881 subreq, struct tevent_req);
882 struct ctdb_fetch_lock_state *state = tevent_req_data(
883 req, struct ctdb_fetch_lock_state);
884 struct ctdb_reply_call *reply;
888 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
891 tevent_req_error(req, ret);
895 if (reply->status != 0) {
896 tevent_req_error(req, EIO);
901 ret = ctdb_fetch_lock_check(req);
903 tevent_req_error(req, ret);
907 tevent_req_done(req);
910 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
912 tdb_chainunlock(h->db->ltdb->tdb, h->key);
917 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
918 struct ctdb_ltdb_header *header,
920 TDB_DATA *data, int *perr)
922 struct ctdb_fetch_lock_state *state = tevent_req_data(
923 req, struct ctdb_fetch_lock_state);
924 struct ctdb_record_handle *h = state->h;
927 if (tevent_req_is_unix_error(req, &err)) {
934 if (header != NULL) {
940 offset = ctdb_ltdb_header_len(&h->header);
942 data->dsize = h->data.dsize - offset;
943 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
945 if (data->dptr == NULL) {
946 TALLOC_FREE(state->h);
954 talloc_set_destructor(h, ctdb_record_handle_destructor);
958 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
959 struct ctdb_client_context *client,
960 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
961 struct ctdb_record_handle **out,
962 struct ctdb_ltdb_header *header, TDB_DATA *data)
964 struct tevent_req *req;
965 struct ctdb_record_handle *h;
968 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
973 tevent_req_poll(req, ev);
975 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
984 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
990 /* Cannot modify the record if it was obtained as a readonly copy */
995 /* Check if the new data is same */
996 if (h->data.dsize == data.dsize &&
997 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
998 /* No need to do anything */
1002 offset = ctdb_ltdb_header_len(&h->header);
1003 rec.dsize = offset + data.dsize;
1004 rec.dptr = talloc_size(h, rec.dsize);
1005 if (rec.dptr == NULL) {
1009 ctdb_ltdb_header_push(&h->header, rec.dptr);
1010 memcpy(rec.dptr + offset, data.dptr, data.dsize);
1012 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1014 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1019 talloc_free(rec.dptr);
1023 int ctdb_delete_record(struct ctdb_record_handle *h)
1026 struct ctdb_key_data key;
1029 /* Cannot delete the record if it was obtained as a readonly copy */
1034 rec.dsize = ctdb_ltdb_header_len(&h->header);
1035 rec.dptr = talloc_size(h, rec.dsize);
1036 if (rec.dptr == NULL) {
1040 ctdb_ltdb_header_push(&h->header, rec.dptr);
1042 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1043 talloc_free(rec.dptr);
1045 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1050 key.db_id = h->db->db_id;
1051 key.header = h->header;
1054 ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1056 tevent_timeval_zero(), &key);
1058 DEBUG(DEBUG_WARNING,
1059 ("Failed to mark record to be deleted in DB %s\n",
1068 * Global lock functions
1071 struct ctdb_g_lock_lock_state {
1072 struct tevent_context *ev;
1073 struct ctdb_client_context *client;
1074 struct ctdb_db_context *db;
1076 struct ctdb_server_id my_sid;
1077 enum ctdb_g_lock_type lock_type;
1078 struct ctdb_record_handle *h;
1079 /* state for verification of active locks */
1080 struct ctdb_g_lock_list *lock_list;
1081 unsigned int current;
1084 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1085 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1086 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1087 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1088 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
1090 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1091 enum ctdb_g_lock_type l2)
1093 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
1099 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1100 struct tevent_context *ev,
1101 struct ctdb_client_context *client,
1102 struct ctdb_db_context *db,
1103 const char *keyname,
1104 struct ctdb_server_id *sid,
1107 struct tevent_req *req, *subreq;
1108 struct ctdb_g_lock_lock_state *state;
1110 req = tevent_req_create(mem_ctx, &state,
1111 struct ctdb_g_lock_lock_state);
1117 state->client = client;
1119 state->key.dptr = discard_const(keyname);
1120 state->key.dsize = strlen(keyname) + 1;
1121 state->my_sid = *sid;
1122 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1124 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1126 if (tevent_req_nomem(subreq, req)) {
1127 return tevent_req_post(req, ev);
1129 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1134 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1136 struct tevent_req *req = tevent_req_callback_data(
1137 subreq, struct tevent_req);
1138 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1139 req, struct ctdb_g_lock_lock_state);
1143 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1144 TALLOC_FREE(subreq);
1145 if (state->h == NULL) {
1146 tevent_req_error(req, ret);
1150 if (state->lock_list != NULL) {
1151 TALLOC_FREE(state->lock_list);
1155 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1157 talloc_free(data.dptr);
1159 tevent_req_error(req, ret);
1163 ctdb_g_lock_lock_process_locks(req);
1166 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1168 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1169 req, struct ctdb_g_lock_lock_state);
1170 struct tevent_req *subreq;
1171 struct ctdb_g_lock *lock;
1172 bool check_server = false;
1175 while (state->current < state->lock_list->num) {
1176 lock = &state->lock_list->lock[state->current];
1178 /* We should not ask for the same lock more than once */
1179 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1180 tevent_req_error(req, EDEADLK);
1184 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1185 check_server = true;
1189 state->current += 1;
1193 struct ctdb_req_control request;
1194 struct ctdb_uint64_array u64_array;
1197 u64_array.val = &lock->sid.unique_id;
1199 ctdb_req_control_check_srvids(&request, &u64_array);
1200 subreq = ctdb_client_control_send(state, state->ev,
1203 tevent_timeval_zero(),
1205 if (tevent_req_nomem(subreq, req)) {
1208 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1212 /* There is no conflict, add ourself to the lock_list */
1213 state->lock_list->lock = talloc_realloc(state->lock_list,
1214 state->lock_list->lock,
1216 state->lock_list->num + 1);
1217 if (state->lock_list->lock == NULL) {
1218 tevent_req_error(req, ENOMEM);
1222 lock = &state->lock_list->lock[state->lock_list->num];
1223 lock->type = state->lock_type;
1224 lock->sid = state->my_sid;
1225 state->lock_list->num += 1;
1227 ret = ctdb_g_lock_lock_update(req);
1229 tevent_req_error(req, ret);
1233 tevent_req_done(req);
1236 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1238 struct tevent_req *req = tevent_req_callback_data(
1239 subreq, struct tevent_req);
1240 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1241 req, struct ctdb_g_lock_lock_state);
1242 struct ctdb_reply_control *reply;
1243 struct ctdb_uint8_array *u8_array;
1248 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1249 TALLOC_FREE(subreq);
1251 tevent_req_error(req, ret);
1255 ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1257 tevent_req_error(req, ENOMEM);
1261 if (u8_array->num != 1) {
1262 talloc_free(u8_array);
1263 tevent_req_error(req, EIO);
1267 val = u8_array->val[0];
1268 talloc_free(u8_array);
1271 /* server process exists, need to retry */
1272 subreq = tevent_wakeup_send(state, state->ev,
1273 tevent_timeval_current_ofs(1,0));
1274 if (tevent_req_nomem(subreq, req)) {
1277 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1281 /* server process does not exist, remove conflicting entry */
1282 state->lock_list->lock[state->current] =
1283 state->lock_list->lock[state->lock_list->num-1];
1284 state->lock_list->num -= 1;
1286 ret = ctdb_g_lock_lock_update(req);
1288 tevent_req_error(req, ret);
1292 ctdb_g_lock_lock_process_locks(req);
1295 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1297 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1298 req, struct ctdb_g_lock_lock_state);
1302 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1303 data.dptr = talloc_size(state, data.dsize);
1304 if (data.dptr == NULL) {
1308 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1309 ret = ctdb_store_record(state->h, data);
1310 talloc_free(data.dptr);
1315 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1316 struct ctdb_g_lock_list *lock_list,
1317 struct ctdb_record_handle *h)
1319 struct ctdb_g_lock *lock;
1320 bool conflict = false;
1321 bool modified = false;
1324 for (i=0; i<lock_list->num; i++) {
1325 lock = &lock_list->lock[i];
1327 /* We should not ask for lock more than once */
1328 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1332 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1336 ret = ctdb_server_id_exists(state->client, &lock->sid,
1346 /* Server does not exist, delete conflicting entry */
1347 lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1348 lock_list->num -= 1;
1354 lock = talloc_realloc(lock_list, lock_list->lock,
1355 struct ctdb_g_lock, lock_list->num+1);
1360 lock[lock_list->num].type = state->lock_type;
1361 lock[lock_list->num].sid = state->my_sid;
1362 lock_list->lock = lock;
1363 lock_list->num += 1;
1370 data.dsize = ctdb_g_lock_list_len(lock_list);
1371 data.dptr = talloc_size(state, data.dsize);
1372 if (data.dptr == NULL) {
1376 ctdb_g_lock_list_push(lock_list, data.dptr);
1377 ret = ctdb_store_record(h, data);
1378 talloc_free(data.dptr);
1391 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1393 struct tevent_req *req = tevent_req_callback_data(
1394 subreq, struct tevent_req);
1395 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1396 req, struct ctdb_g_lock_lock_state);
1399 success = tevent_wakeup_recv(subreq);
1400 TALLOC_FREE(subreq);
1402 tevent_req_error(req, ENOMEM);
1406 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1407 state->db, state->key, false);
1408 if (tevent_req_nomem(subreq, req)) {
1411 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
1414 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1416 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1417 req, struct ctdb_g_lock_lock_state);
1420 TALLOC_FREE(state->h);
1422 if (tevent_req_is_unix_error(req, &err)) {
1432 struct ctdb_g_lock_unlock_state {
1433 struct tevent_context *ev;
1434 struct ctdb_client_context *client;
1435 struct ctdb_db_context *db;
1437 struct ctdb_server_id my_sid;
1438 struct ctdb_record_handle *h;
1439 struct ctdb_g_lock_list *lock_list;
1442 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1443 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1445 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1446 struct tevent_context *ev,
1447 struct ctdb_client_context *client,
1448 struct ctdb_db_context *db,
1449 const char *keyname,
1450 struct ctdb_server_id sid)
1452 struct tevent_req *req, *subreq;
1453 struct ctdb_g_lock_unlock_state *state;
1455 req = tevent_req_create(mem_ctx, &state,
1456 struct ctdb_g_lock_unlock_state);
1462 state->client = client;
1464 state->key.dptr = discard_const(keyname);
1465 state->key.dsize = strlen(keyname) + 1;
1466 state->my_sid = sid;
1468 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1470 if (tevent_req_nomem(subreq, req)) {
1471 return tevent_req_post(req, ev);
1473 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
1478 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1480 struct tevent_req *req = tevent_req_callback_data(
1481 subreq, struct tevent_req);
1482 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1483 req, struct ctdb_g_lock_unlock_state);
1487 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1488 TALLOC_FREE(subreq);
1489 if (state->h == NULL) {
1490 tevent_req_error(req, ret);
1494 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1497 tevent_req_error(req, ret);
1501 ret = ctdb_g_lock_unlock_update(req);
1503 tevent_req_error(req, ret);
1507 tevent_req_done(req);
1510 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1512 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1513 req, struct ctdb_g_lock_unlock_state);
1514 struct ctdb_g_lock *lock;
1517 for (i=0; i<state->lock_list->num; i++) {
1518 lock = &state->lock_list->lock[i];
1520 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1525 if (i < state->lock_list->num) {
1526 state->lock_list->lock[i] =
1527 state->lock_list->lock[state->lock_list->num-1];
1528 state->lock_list->num -= 1;
1531 if (state->lock_list->num == 0) {
1532 ctdb_delete_record(state->h);
1536 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1537 data.dptr = talloc_size(state, data.dsize);
1538 if (data.dptr == NULL) {
1542 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1543 ret = ctdb_store_record(state->h, data);
1544 talloc_free(data.dptr);
1553 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1555 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1556 req, struct ctdb_g_lock_unlock_state);
1559 TALLOC_FREE(state->h);
1561 if (tevent_req_is_unix_error(req, &err)) {
1572 * Persistent database functions
1574 struct ctdb_transaction_start_state {
1575 struct tevent_context *ev;
1576 struct ctdb_client_context *client;
1577 struct timeval timeout;
1578 struct ctdb_transaction_handle *h;
1582 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1583 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1584 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1585 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
1587 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1588 struct tevent_context *ev,
1589 struct ctdb_client_context *client,
1590 struct timeval timeout,
1591 struct ctdb_db_context *db,
1594 struct ctdb_transaction_start_state *state;
1595 struct tevent_req *req, *subreq;
1596 struct ctdb_transaction_handle *h;
1598 req = tevent_req_create(mem_ctx, &state,
1599 struct ctdb_transaction_start_state);
1604 if (! db->persistent) {
1605 tevent_req_error(req, EINVAL);
1606 return tevent_req_post(req, ev);
1610 state->client = client;
1611 state->destnode = ctdb_client_pnn(client);
1613 h = talloc_zero(db, struct ctdb_transaction_handle);
1614 if (tevent_req_nomem(h, req)) {
1615 return tevent_req_post(req, ev);
1620 h->readonly = readonly;
1623 /* SRVID is unique for databases, so client can have transactions active
1624 * for multiple databases */
1625 h->sid.pid = getpid();
1626 h->sid.task_id = db->db_id;
1627 h->sid.vnn = state->destnode;
1628 h->sid.unique_id = h->sid.task_id;
1629 h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1631 h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1632 if (tevent_req_nomem(h->recbuf, req)) {
1633 return tevent_req_post(req, ev);
1636 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1637 if (tevent_req_nomem(h->lock_name, req)) {
1638 return tevent_req_post(req, ev);
1643 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1644 if (tevent_req_nomem(subreq, req)) {
1645 return tevent_req_post(req, ev);
1647 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
1652 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1654 struct tevent_req *req = tevent_req_callback_data(
1655 subreq, struct tevent_req);
1656 struct ctdb_transaction_start_state *state = tevent_req_data(
1657 req, struct ctdb_transaction_start_state);
1658 struct ctdb_req_control request;
1662 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1663 TALLOC_FREE(subreq);
1665 tevent_req_error(req, ret);
1669 ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1670 subreq = ctdb_client_control_send(state, state->ev, state->client,
1671 state->destnode, state->timeout,
1673 if (tevent_req_nomem(subreq, req)) {
1676 tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
1679 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1681 struct tevent_req *req = tevent_req_callback_data(
1682 subreq, struct tevent_req);
1683 struct ctdb_transaction_start_state *state = tevent_req_data(
1684 req, struct ctdb_transaction_start_state);
1685 struct ctdb_reply_control *reply;
1689 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1690 TALLOC_FREE(subreq);
1692 tevent_req_error(req, ret);
1696 ret = ctdb_reply_control_register_srvid(reply);
1699 tevent_req_error(req, ret);
1703 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1704 state->h->db_g_lock, state->h->lock_name,
1705 &state->h->sid, state->h->readonly);
1706 if (tevent_req_nomem(subreq, req)) {
1709 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
1712 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1714 struct tevent_req *req = tevent_req_callback_data(
1715 subreq, struct tevent_req);
1719 status = ctdb_g_lock_lock_recv(subreq, &ret);
1720 TALLOC_FREE(subreq);
1722 tevent_req_error(req, ret);
1726 tevent_req_done(req);
1729 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1730 struct tevent_req *req,
1733 struct ctdb_transaction_start_state *state = tevent_req_data(
1734 req, struct ctdb_transaction_start_state);
1735 struct ctdb_transaction_handle *h = state->h;
1738 if (tevent_req_is_unix_error(req, &err)) {
1745 talloc_set_destructor(h, ctdb_transaction_handle_destructor);
1749 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1753 ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1754 tevent_timeval_zero(),
1757 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
1763 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1764 struct ctdb_client_context *client,
1765 struct timeval timeout,
1766 struct ctdb_db_context *db, bool readonly,
1767 struct ctdb_transaction_handle **out)
1769 struct tevent_req *req;
1770 struct ctdb_transaction_handle *h;
1773 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1779 tevent_req_poll(req, ev);
1781 h = ctdb_transaction_start_recv(req, &ret);
1790 struct ctdb_transaction_record_fetch_state {
1792 struct ctdb_ltdb_header header;
1796 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1797 struct ctdb_ltdb_header *header,
1802 struct ctdb_transaction_record_fetch_state *state =
1803 (struct ctdb_transaction_record_fetch_state *)private_data;
1805 if (state->key.dsize == key.dsize &&
1806 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1808 state->header = *header;
1809 state->found = true;
1815 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1817 struct ctdb_ltdb_header *header,
1820 struct ctdb_transaction_record_fetch_state state;
1824 state.found = false;
1826 ret = ctdb_rec_buffer_traverse(h->recbuf,
1827 ctdb_transaction_record_fetch_traverse,
1834 if (header != NULL) {
1835 *header = state.header;
1846 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1848 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1851 struct ctdb_ltdb_header header;
1854 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1856 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1858 if (data->dptr == NULL) {
1861 data->dsize = tmp_data.dsize;
1865 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1870 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
1878 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1879 TDB_DATA key, TDB_DATA data)
1881 TALLOC_CTX *tmp_ctx;
1882 struct ctdb_ltdb_header header;
1890 tmp_ctx = talloc_new(h);
1891 if (tmp_ctx == NULL) {
1895 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1897 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
1903 if (old_data.dsize == data.dsize &&
1904 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1905 talloc_free(tmp_ctx);
1909 header.dmaster = ctdb_client_pnn(h->client);
1912 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1913 talloc_free(tmp_ctx);
1922 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1925 return ctdb_transaction_store_record(h, key, tdb_null);
1928 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1931 const char *keyname = CTDB_DB_SEQNUM_KEY;
1934 key.dptr = discard_const(keyname);
1935 key.dsize = strlen(keyname) + 1;
1937 data.dptr = (uint8_t *)&seqnum;
1938 data.dsize = sizeof(seqnum);
1940 return ctdb_transaction_store_record(h, key, data);
1943 struct ctdb_transaction_commit_state {
1944 struct tevent_context *ev;
1945 struct ctdb_transaction_handle *h;
1949 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1950 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
1952 struct tevent_req *ctdb_transaction_commit_send(
1953 TALLOC_CTX *mem_ctx,
1954 struct tevent_context *ev,
1955 struct ctdb_transaction_handle *h)
1957 struct tevent_req *req, *subreq;
1958 struct ctdb_transaction_commit_state *state;
1961 req = tevent_req_create(mem_ctx, &state,
1962 struct ctdb_transaction_commit_state);
1970 ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1971 h->client->pnn, tevent_timeval_zero(),
1972 h->db->db_id, &state->seqnum);
1974 tevent_req_error(req, ret);
1975 return tevent_req_post(req, ev);
1978 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1980 tevent_req_error(req, ret);
1981 return tevent_req_post(req, ev);
1984 subreq = ctdb_recovery_wait_send(state, ev, h->client);
1985 if (tevent_req_nomem(subreq, req)) {
1986 return tevent_req_post(req, ev);
1988 tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
1993 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1995 struct tevent_req *req = tevent_req_callback_data(
1996 subreq, struct tevent_req);
1997 struct ctdb_transaction_commit_state *state = tevent_req_data(
1998 req, struct ctdb_transaction_commit_state);
1999 struct ctdb_req_control request;
2003 status = ctdb_recovery_wait_recv(subreq, &ret);
2004 TALLOC_FREE(subreq);
2006 tevent_req_error(req, ret);
2010 ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2011 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2012 state->h->client->pnn,
2013 tevent_timeval_zero(), &request);
2014 if (tevent_req_nomem(subreq, req)) {
2017 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
2020 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2022 struct tevent_req *req = tevent_req_callback_data(
2023 subreq, struct tevent_req);
2024 struct ctdb_transaction_commit_state *state = tevent_req_data(
2025 req, struct ctdb_transaction_commit_state);
2026 struct ctdb_reply_control *reply;
2031 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2032 TALLOC_FREE(subreq);
2034 tevent_req_error(req, ret);
2038 ret = ctdb_reply_control_trans3_commit(reply);
2040 /* Control failed due to recovery */
2041 subreq = ctdb_recovery_wait_send(state, state->ev,
2043 if (tevent_req_nomem(subreq, req)) {
2046 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2051 ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2052 state->h->client->pnn,
2053 tevent_timeval_zero(),
2054 state->h->db->db_id, &seqnum);
2056 tevent_req_error(req, ret);
2060 if (seqnum == state->seqnum) {
2061 subreq = ctdb_recovery_wait_send(state, state->ev,
2063 if (tevent_req_nomem(subreq, req)) {
2066 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
2071 if (seqnum != state->seqnum + 1) {
2072 tevent_req_error(req, EIO);
2076 tevent_req_done(req);
2079 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2081 struct ctdb_transaction_commit_state *state = tevent_req_data(
2082 req, struct ctdb_transaction_commit_state);
2085 if (tevent_req_is_unix_error(req, &err)) {
2089 TALLOC_FREE(state->h);
2093 TALLOC_FREE(state->h);
2097 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2099 struct tevent_req *req;
2103 if (h->readonly || ! h->updated) {
2108 req = ctdb_transaction_commit_send(h, h->ev, h);
2114 tevent_req_poll(req, h->ev);
2116 status = ctdb_transaction_commit_recv(req, &ret);
2126 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2135 * In future Samba should register SERVER_ID.
2136 * Make that structure same as struct srvid {}.