4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
/*
 * Walk the client's database list (client->db, chained through ->next)
 * looking for an entry whose db_name compares equal (strcmp) to db_name.
 *
 * NOTE(review): the return statements are elided in this listing;
 * presumably the matching entry is returned, or NULL when nothing
 * matches (the caller in ctdb_attach_send tests the result against
 * NULL) -- confirm against the full file.
 */
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
/*
 * Per-request state of the asynchronous "set db flags" operation started
 * by ctdb_set_db_flags_send().  Other members used by the code below
 * (db_id, db_flags, pnn_list, count) are elided in this listing.
 */
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
/* The request is finished once both of these are true */
61 bool readonly_done, sticky_done;
/* Completion callbacks for the sub-requests of the set-db-flags operation */
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
/*
 * Start an asynchronous request that applies the READONLY and/or STICKY
 * flags in db_flags to database db_id.
 *
 * The first step sends a GET_NODEMAP control to destnode; the reply is
 * handled in ctdb_set_db_flags_nodemap_done().
 */
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
/*
 * Neither READONLY nor STICKY requested: nothing to send, the request is
 * posted straight away.
 * NOTE(review): the statement that completes the request before posting
 * is elided here -- presumably tevent_req_done(); confirm.
 */
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
/* Ask destnode for the node map so the flag controls can be fanned out */
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
/*
 * Callback for the GET_NODEMAP control.
 *
 * Builds the list of connected nodes from the returned node map and, for
 * each of the READONLY / STICKY flags that was requested, sends the
 * corresponding control to every node in that list.
 */
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
124 tevent_req_error(req, ret);
128 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
131 tevent_req_error(req, ret);
/* pnn_list is allocated on state; the node map itself is no longer needed */
135 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136 state, &state->pnn_list);
137 talloc_free(nodemap);
/* A non-positive count is reported to the caller as ENOMEM */
138 if (state->count <= 0) {
139 tevent_req_error(req, ENOMEM);
143 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144 ctdb_req_control_set_db_readonly(&request, state->db_id);
145 subreq = ctdb_client_control_multi_send(
146 state, state->ev, state->client,
147 state->pnn_list, state->count,
148 state->timeout, &request);
149 if (tevent_req_nomem(subreq, req)) {
152 tevent_req_set_callback(subreq,
153 ctdb_set_db_flags_readonly_done, req);
/*
 * NOTE(review): presumably the else-branch (flag not requested), so the
 * readonly step counts as already done -- the branch line is elided.
 */
155 state->readonly_done = true;
158 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159 ctdb_req_control_set_db_sticky(&request, state->db_id);
160 subreq = ctdb_client_control_multi_send(
161 state, state->ev, state->client,
162 state->pnn_list, state->count,
163 state->timeout, &request);
164 if (tevent_req_nomem(subreq, req)) {
167 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
/* NOTE(review): presumably the else-branch, as for readonly_done above */
170 state->sticky_done = true;
/*
 * Callback for the SET_DB_READONLY multi-node control.
 *
 * On failure the error code from the multi-control is propagated to the
 * parent request.  Otherwise the readonly step is marked done and the
 * parent request is completed if the sticky step has finished as well.
 */
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
176 struct tevent_req *req = tevent_req_callback_data(
177 subreq, struct tevent_req);
178 struct ctdb_set_db_flags_state *state = tevent_req_data(
179 req, struct ctdb_set_db_flags_state);
183 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
187 tevent_req_error(req, ret);
191 state->readonly_done = true;
/* Whichever of the two callbacks runs last finishes the request */
193 if (state->readonly_done && state->sticky_done) {
194 tevent_req_done(req);
/*
 * Callback for the SET_DB_STICKY multi-node control.
 *
 * Mirrors ctdb_set_db_flags_readonly_done(): propagates an error from the
 * multi-control, otherwise marks the sticky step done and completes the
 * parent request once the readonly step has finished too.
 */
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
200 struct tevent_req *req = tevent_req_callback_data(
201 subreq, struct tevent_req);
202 struct ctdb_set_db_flags_state *state = tevent_req_data(
203 req, struct ctdb_set_db_flags_state);
207 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
211 tevent_req_error(req, ret);
215 state->sticky_done = true;
/* Whichever of the two callbacks runs last finishes the request */
217 if (state->readonly_done && state->sticky_done) {
218 tevent_req_done(req);
/*
 * Collect the result of a set-db-flags request.
 *
 * Queries the request for a unix error via tevent_req_is_unix_error().
 * NOTE(review): the handling of perr and the return values are elided in
 * this listing -- presumably *perr receives the error and false is
 * returned on failure, true otherwise; confirm.
 */
222 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
226 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state of the asynchronous database attach started by
 * ctdb_attach_send().  Further members used below (destnode, db_flags,
 * tdb_flags) are elided in this listing.
 */
235 struct ctdb_attach_state {
236 struct tevent_context *ev;
237 struct ctdb_client_context *client;
238 struct timeval timeout;
/* Database handle being built; handed to the caller by ctdb_attach_recv() */
242 struct ctdb_db_context *db;
/* Attach steps, in the order in which the callbacks are chained below */
245 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
246 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
247 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
248 static void ctdb_attach_health_done(struct tevent_req *subreq);
249 static void ctdb_attach_flags_done(struct tevent_req *subreq);
/*
 * Start an asynchronous attach to the database named db_name.
 *
 * If the client already holds a handle for db_name the request completes
 * immediately with that handle.  Otherwise a new ctdb_db_context is
 * allocated on the client context, its name is duplicated, the persistent
 * flag is derived from db_flags, and the first step of the attach chain
 * is issued: a GET_TUNABLE control for "TDBMutexEnabled" sent to the
 * local node, continued in ctdb_attach_mutex_done().
 *
 * mem_ctx  - talloc parent of the request
 * ev       - event context driving the request
 * client   - connected ctdb client
 * timeout  - timeout applied to the controls that are sent
 * db_name  - name of the database to attach
 * db_flags - CTDB_DB_FLAGS_* bits for the database
 *
 * Collect the result with ctdb_attach_recv().
 */
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252 struct tevent_context *ev,
253 struct ctdb_client_context *client,
254 struct timeval timeout,
255 const char *db_name, uint8_t db_flags)
257 struct tevent_req *req, *subreq;
258 struct ctdb_attach_state *state;
259 struct ctdb_req_control request;
261 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
/* Already attached: reuse the existing handle */
266 state->db = client_db_handle(client, db_name);
267 if (state->db != NULL) {
268 tevent_req_done(req);
269 return tevent_req_post(req, ev);
273 state->client = client;
274 state->timeout = timeout;
275 state->destnode = ctdb_client_pnn(client);
276 state->db_flags = db_flags;
/* The handle is parented to the client, not to the request state */
278 state->db = talloc_zero(client, struct ctdb_db_context);
279 if (tevent_req_nomem(state->db, req)) {
280 return tevent_req_post(req, ev);
/*
 * Check the result of talloc_strdup() itself.  Testing state->db here
 * (known to be non-NULL at this point) would never detect a failed
 * duplication and would leave db_name NULL for later strcmp() calls.
 */
283 state->db->db_name = talloc_strdup(state->db, db_name);
284 if (tevent_req_nomem(state->db->db_name, req)) {
285 return tevent_req_post(req, ev);
288 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289 state->db->persistent = true;
/* First step: find out whether TDB mutex locking is enabled */
292 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293 subreq = ctdb_client_control_send(state, ev, client,
294 ctdb_client_pnn(client), timeout,
296 if (tevent_req_nomem(subreq, req)) {
297 return tevent_req_post(req, ev);
299 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
/*
 * Callback for the GET_TUNABLE("TDBMutexEnabled") control.
 *
 * Computes the TDB open flags for the database and sends the DB_ATTACH
 * (or DB_ATTACH_PERSISTENT) control; the reply is handled in
 * ctdb_attach_dbid_done().
 */
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
306 struct tevent_req *req = tevent_req_callback_data(
307 subreq, struct tevent_req);
308 struct ctdb_attach_state *state = tevent_req_data(
309 req, struct ctdb_attach_state);
310 struct ctdb_reply_control *reply;
311 struct ctdb_req_control request;
312 uint32_t mutex_enabled;
316 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
319 tevent_req_error(req, ret);
323 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
325 /* Treat error as mutex support not available */
/* Non-persistent databases get extra flags; mutex locking only if enabled */
329 state->tdb_flags = TDB_DEFAULT;
330 if (! state->db->persistent) {
331 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
334 if (mutex_enabled == 1) {
335 state->tdb_flags |= TDB_MUTEX_LOCKING;
/* Pick the attach control matching the database type */
338 if (state->db->persistent) {
339 ctdb_req_control_db_attach_persistent(&request,
343 ctdb_req_control_db_attach(&request, state->db->db_name,
347 subreq = ctdb_client_control_send(state, state->ev, state->client,
348 state->destnode, state->timeout,
350 if (tevent_req_nomem(subreq, req)) {
353 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
/*
 * Callback for the DB_ATTACH / DB_ATTACH_PERSISTENT control.
 *
 * Stores the database id from the reply in state->db->db_id and then
 * requests the on-disk path of the database (GETDBPATH); the reply is
 * handled in ctdb_attach_dbpath_done().
 */
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
358 struct tevent_req *req = tevent_req_callback_data(
359 subreq, struct tevent_req);
360 struct ctdb_attach_state *state = tevent_req_data(
361 req, struct ctdb_attach_state);
362 struct ctdb_req_control request;
363 struct ctdb_reply_control *reply;
367 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
370 tevent_req_error(req, ret);
/* The reply parser must match the control that was sent */
374 if (state->db->persistent) {
375 ret = ctdb_reply_control_db_attach_persistent(
376 reply, &state->db->db_id);
378 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
382 tevent_req_error(req, ret);
386 ctdb_req_control_getdbpath(&request, state->db->db_id);
387 subreq = ctdb_client_control_send(state, state->ev, state->client,
388 state->destnode, state->timeout,
390 if (tevent_req_nomem(subreq, req)) {
393 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
/*
 * Callback for the GETDBPATH control.
 *
 * Stores the database path in state->db->db_path (allocated on state->db)
 * and then queries the health of the database (DB_GET_HEALTH); the reply
 * is handled in ctdb_attach_health_done().
 */
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
398 struct tevent_req *req = tevent_req_callback_data(
399 subreq, struct tevent_req);
400 struct ctdb_attach_state *state = tevent_req_data(
401 req, struct ctdb_attach_state);
402 struct ctdb_reply_control *reply;
403 struct ctdb_req_control request;
407 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
410 tevent_req_error(req, ret);
414 ret = ctdb_reply_control_getdbpath(reply, state->db,
415 &state->db->db_path);
418 tevent_req_error(req, ret);
422 ctdb_req_control_db_get_health(&request, state->db->db_id);
423 subreq = ctdb_client_control_send(state, state->ev, state->client,
424 state->destnode, state->timeout,
426 if (tevent_req_nomem(subreq, req)) {
429 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
/*
 * Callback for the DB_GET_HEALTH control.
 *
 * A non-NULL reason string in the reply means the database is unhealthy
 * and the attach fails with EIO.  Otherwise the requested database flags
 * are applied through ctdb_set_db_flags_send(); completion is handled in
 * ctdb_attach_flags_done().
 */
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
434 struct tevent_req *req = tevent_req_callback_data(
435 subreq, struct tevent_req);
436 struct ctdb_attach_state *state = tevent_req_data(
437 req, struct ctdb_attach_state);
438 struct ctdb_reply_control *reply;
443 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
446 tevent_req_error(req, ret);
450 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
452 tevent_req_error(req, ret);
456 if (reason != NULL) {
457 /* Database unhealthy, avoid attach */
458 /* FIXME: Log here */
459 tevent_req_error(req, EIO);
463 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464 state->destnode, state->timeout,
465 state->db->db_id, state->db_flags);
466 if (tevent_req_nomem(subreq, req)) {
469 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
/*
 * Final step of the attach chain, run when the database flags are set.
 *
 * Opens the local TDB file at state->db->db_path with the flags computed
 * earlier, links the new handle into the client's database list and
 * completes the request.
 */
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
474 struct tevent_req *req = tevent_req_callback_data(
475 subreq, struct tevent_req);
476 struct ctdb_attach_state *state = tevent_req_data(
477 req, struct ctdb_attach_state);
481 status = ctdb_set_db_flags_recv(subreq, &ret);
484 tevent_req_error(req, ret);
/*
 * NOTE(review): a failed open is reported through tevent_req_nomem(),
 * i.e. as out-of-memory, whatever the actual cause of the failure.
 */
488 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489 state->tdb_flags, O_RDWR, 0);
490 if (tevent_req_nomem(state->db->ltdb, req)) {
/* From now on client_db_handle() can find this database by name */
493 DLIST_ADD(state->client->db, state->db);
495 tevent_req_done(req);
/*
 * Collect the result of ctdb_attach_send().
 *
 * Queries the request for a unix error via tevent_req_is_unix_error().
 * NOTE(review): the remaining statements are elided -- presumably *perr
 * receives the error on failure and *out receives state->db on success;
 * confirm against the full file.
 */
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499 struct ctdb_db_context **out)
501 struct ctdb_attach_state *state = tevent_req_data(
502 req, struct ctdb_attach_state);
505 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_attach_send() / ctdb_attach_recv():
 * starts the request, polls the event context until it has finished and
 * collects the result into *out.
 *
 * NOTE(review): the ctdb_set_call() lines at the end reference `db`,
 * which is not declared in any visible line of this function; they may
 * be disabled (commented out) in the full file -- confirm.
 */
518 int ctdb_attach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
519 struct ctdb_client_context *client,
520 struct timeval timeout,
521 const char *db_name, uint8_t db_flags,
522 struct ctdb_db_context **out)
524 struct tevent_req *req;
528 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
534 tevent_req_poll(req, ev);
536 status = ctdb_attach_recv(req, &ret, out);
542 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
543 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
544 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
/*
 * Detach from database db_id.
 *
 * Sends a synchronous DB_DETACH control to the local node (client->pnn)
 * and then unlinks the handle with that id from the client's database
 * list.
 */
550 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
551 struct ctdb_client_context *client,
552 struct timeval timeout, uint32_t db_id)
554 struct ctdb_db_context *db;
557 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
/* Drop the local handle for the detached database, if there is one */
563 for (db = client->db; db != NULL; db = db->next) {
564 if (db->db_id == db_id) {
565 DLIST_REMOVE(client->db, db);
/*
 * Accessor for the id of an attached database.
 * NOTE(review): the body is elided in this listing -- presumably it
 * returns db->db_id; confirm.
 */
573 uint32_t ctdb_db_id(struct ctdb_db_context *db)
/*
 * Context handed to ctdb_db_traverse_handler() for each record.  The
 * private_data and extract_header members used by the traverse code are
 * elided in this listing.
 */
578 struct ctdb_db_traverse_state {
/* Caller-supplied callback invoked once per record */
579 ctdb_rec_parser_func_t parser;
/*
 * tdb traverse callback: hands one record to the user's parser.
 *
 * With extract_header set, the ltdb header is decoded from the front of
 * the record and passed to the parser; otherwise the parser receives a
 * NULL header and the raw record.
 * NOTE(review): the code that strips the header bytes from `data` (using
 * `len`) and the handling of the parser's return value are elided.
 */
585 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
586 TDB_DATA data, void *private_data)
588 struct ctdb_db_traverse_state *state =
589 (struct ctdb_db_traverse_state *)private_data;
592 if (state->extract_header) {
593 struct ctdb_ltdb_header header;
596 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
602 len = ctdb_ltdb_header_len(&header);
607 ret = state->parser(0, &header, key, data, state->private_data);
609 ret = state->parser(0, NULL, key, data, state->private_data);
/*
 * Traverse the local TDB of an attached database, calling parser for
 * every record with private_data as its context.
 *
 * Uses tdb_traverse_read() or tdb_traverse().
 * NOTE(review): the condition choosing between the two is elided --
 * presumably the `readonly` argument; confirm.
 */
620 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
622 ctdb_rec_parser_func_t parser, void *private_data)
624 struct ctdb_db_traverse_state state;
627 state.parser = parser;
628 state.private_data = private_data;
629 state.extract_header = extract_header;
633 ret = tdb_traverse_read(db->ltdb->tdb,
634 ctdb_db_traverse_handler, &state);
636 ret = tdb_traverse(db->ltdb->tdb,
637 ctdb_db_traverse_handler, &state);
/*
 * Fetch a record from the local TDB and split it into ltdb header and
 * payload.
 *
 * A record shorter than struct ctdb_ltdb_header is treated as absent; in
 * that case the header's dmaster is set to CTDB_UNKNOWN_PNN.  Otherwise
 * the header is decoded into *header and the bytes following it are
 * copied into *data, allocated on mem_ctx.
 * NOTE(review): return values and the freeing of the fetched record are
 * elided in this listing.
 */
647 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
648 struct ctdb_ltdb_header *header,
649 TALLOC_CTX *mem_ctx, TDB_DATA *data)
654 rec = tdb_fetch(db->ltdb->tdb, key);
655 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
656 /* No record present */
657 if (rec.dptr != NULL) {
/* Distinguish "no such record" from a real tdb error */
661 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
666 header->dmaster = CTDB_UNKNOWN_PNN;
675 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
/* The payload starts right after the encoded header */
682 size_t offset = ctdb_ltdb_header_len(header);
684 data->dsize = rec.dsize - offset;
685 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
687 if (data->dptr == NULL) {
697 * Fetch a record from volatile database
700 * 1. Get a lock on the hash chain
701 * 2. If the record does not exist, migrate the record
702 * 3. If readonly=true and delegations do not exist, migrate the record.
703 * 4. If readonly=false and delegations exist, migrate the record.
704 * 5. If the local node is not dmaster, migrate the record.
/*
 * Per-request state of an asynchronous fetch-lock started by
 * ctdb_fetch_lock_send().  The readonly and pnn members used below are
 * elided in this listing.
 */
708 struct ctdb_fetch_lock_state {
709 struct tevent_context *ev;
710 struct ctdb_client_context *client;
/* Record handle being built; returned by ctdb_fetch_lock_recv() */
711 struct ctdb_record_handle *h;
/* Helpers implementing the check / migrate / re-check cycle */
716 static int ctdb_fetch_lock_check(struct tevent_req *req);
717 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
718 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
/*
 * Start an asynchronous fetch-lock of the record `key` in a volatile
 * database.
 *
 * Allocates the record handle on `db`, copies the key into it and runs
 * ctdb_fetch_lock_check().  Persistent databases are rejected with
 * EINVAL.
 * NOTE(review): the conditions on the return value of
 * ctdb_fetch_lock_check() are elided; the visible lines show one path
 * completing the request and one failing it with that return value.
 */
720 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
721 struct tevent_context *ev,
722 struct ctdb_client_context *client,
723 struct ctdb_db_context *db,
724 TDB_DATA key, bool readonly)
726 struct ctdb_fetch_lock_state *state;
727 struct tevent_req *req;
730 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
736 state->client = client;
/* The handle is parented to the database, not to the request */
738 state->h = talloc_zero(db, struct ctdb_record_handle);
739 if (tevent_req_nomem(state->h, req)) {
740 return tevent_req_post(req, ev);
742 state->h->client = client;
/* Private copy of the key, owned by the handle */
744 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
745 if (tevent_req_nomem(state->h->key.dptr, req)) {
746 return tevent_req_post(req, ev);
748 state->h->key.dsize = key.dsize;
749 state->h->readonly = false;
751 state->readonly = readonly;
752 state->pnn = ctdb_client_pnn(client);
754 /* Check that database is not persistent */
755 if (db->persistent) {
756 tevent_req_error(req, EINVAL);
757 return tevent_req_post(req, ev);
760 ret = ctdb_fetch_lock_check(req);
762 tevent_req_done(req);
763 return tevent_req_post(req, ev);
766 tevent_req_error(req, ret);
767 return tevent_req_post(req, ev);
/*
 * Lock the hash chain of the record and decide whether the local copy is
 * usable or the record has to be migrated first.
 *
 * Visible checks:
 *  - read/write access: dmaster is the local node but read-only
 *    delegations exist, or dmaster is another node;
 *  - read-only access: dmaster is another node and the record carries
 *    neither the HAVE_READONLY nor the HAVE_DELEGATIONS flag.
 * When migration is needed the chain lock is released again and
 * ctdb_fetch_lock_migrate() is started.
 * NOTE(review): the bodies of the conditionals (presumably setting
 * do_migrate), the handle updates and the return values are elided.
 */
772 static int ctdb_fetch_lock_check(struct tevent_req *req)
774 struct ctdb_fetch_lock_state *state = tevent_req_data(
775 req, struct ctdb_fetch_lock_state);
776 struct ctdb_record_handle *h = state->h;
777 struct ctdb_ltdb_header header;
778 TDB_DATA data = tdb_null;
780 bool do_migrate = false;
782 ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
788 data = tdb_fetch(h->db->ltdb->tdb, h->key);
789 if (data.dptr == NULL) {
790 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
799 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
805 if (! state->readonly) {
806 /* Read/write access */
807 if (header.dmaster == state->pnn &&
808 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
812 if (header.dmaster != state->pnn) {
816 /* Readonly access */
817 if (header.dmaster != state->pnn &&
818 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
819 CTDB_REC_RO_HAVE_DELEGATIONS))) {
824 /* We are the dmaster or readonly delegation */
827 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
828 CTDB_REC_RO_HAVE_DELEGATIONS)) {
838 if (data.dptr != NULL) {
/* The chain lock is dropped before a migration is requested */
841 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
843 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
849 ctdb_fetch_lock_migrate(req);
/*
 * Ask the cluster to migrate the record to this node.
 *
 * Sends a CTDB_NULL_FUNC call for the record's key with the
 * CTDB_IMMEDIATE_MIGRATION flag, adding CTDB_WANT_READONLY for read-only
 * requests.  The reply is handled in ctdb_fetch_lock_migrate_done().
 */
854 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
856 struct ctdb_fetch_lock_state *state = tevent_req_data(
857 req, struct ctdb_fetch_lock_state);
858 struct ctdb_req_call request;
859 struct tevent_req *subreq;
861 ZERO_STRUCT(request);
862 request.flags = CTDB_IMMEDIATE_MIGRATION;
863 if (state->readonly) {
864 request.flags |= CTDB_WANT_READONLY;
866 request.db_id = state->h->db->db_id;
867 request.callid = CTDB_NULL_FUNC;
868 request.key = state->h->key;
870 subreq = ctdb_client_call_send(state, state->ev, state->client,
872 if (tevent_req_nomem(subreq, req)) {
876 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
/*
 * Callback for the migration call.
 *
 * A failed call is propagated with its error code; a non-zero status in
 * the reply fails the request with EIO.  Otherwise the record is checked
 * again with ctdb_fetch_lock_check().
 * NOTE(review): the conditions around the last two statements are elided;
 * the visible lines fail the request with the check's return value on one
 * path and complete it on another.
 */
879 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
881 struct tevent_req *req = tevent_req_callback_data(
882 subreq, struct tevent_req);
883 struct ctdb_fetch_lock_state *state = tevent_req_data(
884 req, struct ctdb_fetch_lock_state);
885 struct ctdb_reply_call *reply;
889 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
892 tevent_req_error(req, ret);
896 if (reply->status != 0) {
897 tevent_req_error(req, EIO);
902 ret = ctdb_fetch_lock_check(req);
904 tevent_req_error(req, ret);
908 tevent_req_done(req);
/*
 * talloc destructor of a record handle (installed by
 * ctdb_fetch_lock_recv()): releases the chain lock on the record's key
 * when the handle is freed.
 */
911 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
913 tdb_chainunlock(h->db->ltdb->tdb, h->key);
/*
 * Collect the result of ctdb_fetch_lock_send().
 *
 * The payload copied into *data is the record minus its ltdb header,
 * allocated on mem_ctx; if that copy fails the handle is freed.  A
 * destructor is installed on the handle so that the chain lock is
 * released when the handle is freed.
 * NOTE(review): error reporting through perr, the copy into *header and
 * the return statements are elided in this listing.
 */
918 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
919 struct ctdb_ltdb_header *header,
921 TDB_DATA *data, int *perr)
923 struct ctdb_fetch_lock_state *state = tevent_req_data(
924 req, struct ctdb_fetch_lock_state);
925 struct ctdb_record_handle *h = state->h;
928 if (tevent_req_is_unix_error(req, &err)) {
935 if (header != NULL) {
/* h->data still contains the header; skip it for the caller's copy */
941 offset = ctdb_ltdb_header_len(&h->header);
943 data->dsize = h->data.dsize - offset;
944 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
946 if (data->dptr == NULL) {
947 TALLOC_FREE(state->h);
955 talloc_set_destructor(h, ctdb_record_handle_destructor);
/*
 * Synchronous wrapper around ctdb_fetch_lock_send() /
 * ctdb_fetch_lock_recv(): starts the request, polls the event context
 * until it has finished and collects handle, header and data.
 * NOTE(review): the assignment to *out and the return values are elided.
 */
959 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
960 struct ctdb_client_context *client,
961 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
962 struct ctdb_record_handle **out,
963 struct ctdb_ltdb_header *header, TDB_DATA *data)
965 struct tevent_req *req;
966 struct ctdb_record_handle *h;
969 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
974 tevent_req_poll(req, ev);
976 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
/*
 * Store new payload for a record held through a fetch-lock handle.
 *
 * Builds a buffer holding the handle's ltdb header followed by `data` and
 * writes it to the local TDB with TDB_REPLACE.  The temporary buffer is
 * allocated on the handle and freed afterwards.
 *
 * NOTE(review): the "same data" shortcut compares `data` against h->data,
 * while ctdb_fetch_lock_recv() treats h->data as header plus payload; if
 * so, the shortcut would not fire for an unchanged payload -- confirm.
 */
985 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
991 /* Cannot modify the record if it was obtained as a readonly copy */
996 /* Check if the new data is same */
997 if (h->data.dsize == data.dsize &&
998 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
999 /* No need to do anything */
/* On-disk record layout: encoded header, then payload */
1003 offset = ctdb_ltdb_header_len(&h->header);
1004 rec.dsize = offset + data.dsize;
1005 rec.dptr = talloc_size(h, rec.dsize);
1006 if (rec.dptr == NULL) {
1010 ctdb_ltdb_header_push(&h->header, rec.dptr);
1011 memcpy(rec.dptr + offset, data.dptr, data.dsize);
1013 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1015 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1020 talloc_free(rec.dptr);
/*
 * Delete a record held through a fetch-lock handle.
 *
 * The record is not removed from the TDB here: it is overwritten with a
 * header-only record (no payload) and then scheduled for deletion via
 * ctdb_ctrl_schedule_for_deletion().  A failure to schedule is logged at
 * warning level.
 * NOTE(review): return values are elided in this listing.
 */
1024 int ctdb_delete_record(struct ctdb_record_handle *h)
1027 struct ctdb_key_data key;
1030 /* Cannot delete the record if it was obtained as a readonly copy */
/* Replace the record with just its header */
1035 rec.dsize = ctdb_ltdb_header_len(&h->header);
1036 rec.dptr = talloc_size(h, rec.dsize);
1037 if (rec.dptr == NULL) {
1041 ctdb_ltdb_header_push(&h->header, rec.dptr);
1043 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1044 talloc_free(rec.dptr);
1046 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1051 key.db_id = h->db->db_id;
1052 key.header = h->header;
1055 ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1057 tevent_timeval_zero(), &key);
1059 DEBUG(DEBUG_WARNING,
1060 ("Failed to mark record to be deleted in DB %s\n",
1069 * Global lock functions
/*
 * Per-request state of an asynchronous global-lock acquisition started by
 * ctdb_g_lock_lock_send().  The `key` member used below is elided in this
 * listing.
 */
1072 struct ctdb_g_lock_lock_state {
1073 struct tevent_context *ev;
1074 struct ctdb_client_context *client;
1075 struct ctdb_db_context *db;
/* Identity of the requester and the kind of lock being requested */
1077 struct ctdb_server_id my_sid;
1078 enum ctdb_g_lock_type lock_type;
/* Fetch-lock handle on the lock record while it is being examined */
1079 struct ctdb_record_handle *h;
1080 /* state for verification of active locks */
1081 struct ctdb_g_lock_list *lock_list;
/* Index into lock_list of the entry currently being examined */
1082 unsigned int current;
/* Steps of the lock acquisition */
1085 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1086 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1087 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1088 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1089 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Decide whether two lock types conflict.  The only case singled out in
 * the visible code is READ against READ.
 * NOTE(review): the return statements are elided -- presumably two read
 * locks do not conflict and every other combination does; confirm.
 */
1091 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1092 enum ctdb_g_lock_type l2)
1094 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
/*
 * Start an asynchronous acquisition of the global lock named `keyname`
 * on behalf of server id *sid.
 *
 * The record key is the lock name including its terminating NUL byte.
 * The key points at the caller's string (it is not copied), so keyname
 * must remain valid for as long as the request is in flight.  The first
 * step fetch-locks the lock record; the result is handled in
 * ctdb_g_lock_lock_fetched().
 */
1100 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1101 struct tevent_context *ev,
1102 struct ctdb_client_context *client,
1103 struct ctdb_db_context *db,
1104 const char *keyname,
1105 struct ctdb_server_id *sid,
1108 struct tevent_req *req, *subreq;
1109 struct ctdb_g_lock_lock_state *state;
1111 req = tevent_req_create(mem_ctx, &state,
1112 struct ctdb_g_lock_lock_state);
1118 state->client = client;
1120 state->key.dptr = discard_const(keyname);
1121 state->key.dsize = strlen(keyname) + 1;
1122 state->my_sid = *sid;
1123 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1125 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1127 if (tevent_req_nomem(subreq, req)) {
1128 return tevent_req_post(req, ev);
1130 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Callback run when the lock record has been fetch-locked.
 *
 * Keeps the record handle in state->h, discards any lock list left from
 * an earlier attempt, decodes the record payload into a fresh lock list
 * and starts processing it.
 */
1135 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1137 struct tevent_req *req = tevent_req_callback_data(
1138 subreq, struct tevent_req);
1139 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1140 req, struct ctdb_g_lock_lock_state);
1144 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1145 TALLOC_FREE(subreq);
1146 if (state->h == NULL) {
1147 tevent_req_error(req, ret);
/* This callback also runs on retries, so drop the stale list first */
1151 if (state->lock_list != NULL) {
1152 TALLOC_FREE(state->lock_list);
1156 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1158 talloc_free(data.dptr);
1160 tevent_req_error(req, ret);
1164 ctdb_g_lock_lock_process_locks(req);
/*
 * Scan the lock list starting at state->current.
 *
 * An entry owned by the requesting server id fails the request with
 * EDEADLK.  On a conflicting entry a CHECK_SRVIDS control is sent for the
 * holder's unique_id and processing continues in
 * ctdb_g_lock_lock_checked().  If the scan finds no conflict, the
 * requester is appended to the list, the record is rewritten and the
 * request completes.
 * NOTE(review): loop exits and the condition guarding the control
 * (presumably check_server) are elided in this listing.
 */
1167 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1169 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1170 req, struct ctdb_g_lock_lock_state);
1171 struct tevent_req *subreq;
1172 struct ctdb_g_lock *lock;
1173 bool check_server = false;
1176 while (state->current < state->lock_list->num) {
1177 lock = &state->lock_list->lock[state->current];
1179 /* We should not ask for the same lock more than once */
1180 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1181 tevent_req_error(req, EDEADLK);
1185 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1186 check_server = true;
1190 state->current += 1;
1194 struct ctdb_req_control request;
1195 struct ctdb_uint64_array u64_array;
/* Query a single server id: that of the conflicting lock holder */
1198 u64_array.val = &lock->sid.unique_id;
1200 ctdb_req_control_check_srvids(&request, &u64_array);
1201 subreq = ctdb_client_control_send(state, state->ev,
1204 tevent_timeval_zero(),
1206 if (tevent_req_nomem(subreq, req)) {
1209 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1213 /* There is no conflict, add ourself to the lock_list */
1214 state->lock_list->lock = talloc_realloc(state->lock_list,
1215 state->lock_list->lock,
1217 state->lock_list->num + 1);
1218 if (state->lock_list->lock == NULL) {
1219 tevent_req_error(req, ENOMEM);
1223 lock = &state->lock_list->lock[state->lock_list->num];
1224 lock->type = state->lock_type;
1225 lock->sid = state->my_sid;
1226 state->lock_list->num += 1;
1228 ret = ctdb_g_lock_lock_update(req);
1230 tevent_req_error(req, ret);
1234 tevent_req_done(req);
/*
 * Callback for the CHECK_SRVIDS control on a conflicting lock holder.
 *
 * If the holder still exists, a one-second wakeup is scheduled and the
 * acquisition is retried in ctdb_g_lock_lock_retry().  Otherwise the
 * stale entry is removed (the last entry is moved into its slot), the
 * record is rewritten and the scan of the lock list continues.
 *
 * NOTE(review): a parse failure of the reply is reported as ENOMEM
 * rather than with the parser's own return value.  The test on `val`
 * that selects the retry path is elided in this listing.
 */
1237 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1239 struct tevent_req *req = tevent_req_callback_data(
1240 subreq, struct tevent_req);
1241 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1242 req, struct ctdb_g_lock_lock_state);
1243 struct ctdb_reply_control *reply;
1244 struct ctdb_uint8_array *u8_array;
1249 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1250 TALLOC_FREE(subreq);
1252 tevent_req_error(req, ret);
1256 ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1258 tevent_req_error(req, ENOMEM);
/* Exactly one server id was queried, so exactly one answer is expected */
1262 if (u8_array->num != 1) {
1263 talloc_free(u8_array);
1264 tevent_req_error(req, EIO);
1268 val = u8_array->val[0];
1269 talloc_free(u8_array);
1272 /* server process exists, need to retry */
1273 subreq = tevent_wakeup_send(state, state->ev,
1274 tevent_timeval_current_ofs(1,0));
1275 if (tevent_req_nomem(subreq, req)) {
1278 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1282 /* server process does not exist, remove conflicting entry */
1283 state->lock_list->lock[state->current] =
1284 state->lock_list->lock[state->lock_list->num-1];
1285 state->lock_list->num -= 1;
1287 ret = ctdb_g_lock_lock_update(req);
1289 tevent_req_error(req, ret);
1293 ctdb_g_lock_lock_process_locks(req);
/*
 * Serialise state->lock_list and write it back to the lock record through
 * the fetch-lock handle.  The temporary buffer is allocated on state and
 * freed after the store.
 * NOTE(review): return values are elided in this listing.
 */
1296 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1298 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1299 req, struct ctdb_g_lock_lock_state);
1303 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1304 data.dptr = talloc_size(state, data.dsize);
1305 if (data.dptr == NULL) {
1309 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1310 ret = ctdb_store_record(state->h, data);
1311 talloc_free(data.dptr);
/*
 * Alternative, single-pass implementation of the lock-list update: scans
 * the list for conflicts, checks conflicting holders with
 * ctdb_server_id_exists(), drops entries of servers that no longer exist,
 * appends the requester and stores the list.
 *
 * NOTE(review): this has the same name as the function defined directly
 * above but a different parameter list, so both cannot be compiled
 * together.  No preprocessor guard is visible in this listing; presumably
 * this variant is disabled in the full file -- confirm.
 */
1316 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1317 struct ctdb_g_lock_list *lock_list,
1318 struct ctdb_record_handle *h)
1320 struct ctdb_g_lock *lock;
1321 bool conflict = false;
1322 bool modified = false;
1325 for (i=0; i<lock_list->num; i++) {
1326 lock = &lock_list->lock[i];
1328 /* We should not ask for lock more than once */
1329 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1333 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1337 ret = ctdb_server_id_exists(state->client, &lock->sid,
1347 /* Server does not exist, delete conflicting entry */
1348 lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1349 lock_list->num -= 1;
/* Grow via a temporary so the old array survives a failed realloc */
1355 lock = talloc_realloc(lock_list, lock_list->lock,
1356 struct ctdb_g_lock, lock_list->num+1);
1361 lock[lock_list->num].type = state->lock_type;
1362 lock[lock_list->num].sid = state->my_sid;
1363 lock_list->lock = lock;
1364 lock_list->num += 1;
1371 data.dsize = ctdb_g_lock_list_len(lock_list);
1372 data.dptr = talloc_size(state, data.dsize);
1373 if (data.dptr == NULL) {
1377 ctdb_g_lock_list_push(lock_list, data.dptr);
1378 ret = ctdb_store_record(h, data);
1379 talloc_free(data.dptr);
/*
 * Wakeup callback for a lock retry: fetch-locks the lock record again
 * (read/write) and re-enters ctdb_g_lock_lock_fetched().
 * A failed wakeup is reported as ENOMEM.
 */
1392 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1394 struct tevent_req *req = tevent_req_callback_data(
1395 subreq, struct tevent_req);
1396 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1397 req, struct ctdb_g_lock_lock_state);
1400 success = tevent_wakeup_recv(subreq);
1401 TALLOC_FREE(subreq);
1403 tevent_req_error(req, ENOMEM);
1407 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1408 state->db, state->key, false);
1409 if (tevent_req_nomem(subreq, req)) {
1412 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Collect the result of ctdb_g_lock_lock_send().
 *
 * The record handle is freed first, whatever the outcome; the handle's
 * destructor releases the chain lock on the lock record.
 * NOTE(review): the handling of perr and the return values are elided.
 */
1415 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1417 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1418 req, struct ctdb_g_lock_lock_state);
1421 TALLOC_FREE(state->h);
1423 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state of an asynchronous global-lock release started by
 * ctdb_g_lock_unlock_send().  The `key` member used below is elided in
 * this listing.
 */
1433 struct ctdb_g_lock_unlock_state {
1434 struct tevent_context *ev;
1435 struct ctdb_client_context *client;
1436 struct ctdb_db_context *db;
/* Server id whose lock entry is to be removed */
1438 struct ctdb_server_id my_sid;
1439 struct ctdb_record_handle *h;
1440 struct ctdb_g_lock_list *lock_list;
/* Steps of the unlock operation */
1443 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1444 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
/*
 * Start an asynchronous release of the global lock named `keyname` held
 * by server id `sid`.
 *
 * The record key is the lock name including its terminating NUL byte and
 * points at the caller's string (it is not copied).  The first step
 * fetch-locks the lock record; the result is handled in
 * ctdb_g_lock_unlock_fetched().
 */
1446 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1447 struct tevent_context *ev,
1448 struct ctdb_client_context *client,
1449 struct ctdb_db_context *db,
1450 const char *keyname,
1451 struct ctdb_server_id sid)
1453 struct tevent_req *req, *subreq;
1454 struct ctdb_g_lock_unlock_state *state;
1456 req = tevent_req_create(mem_ctx, &state,
1457 struct ctdb_g_lock_unlock_state);
1463 state->client = client;
1465 state->key.dptr = discard_const(keyname);
1466 state->key.dsize = strlen(keyname) + 1;
1467 state->my_sid = sid;
1469 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1471 if (tevent_req_nomem(subreq, req)) {
1472 return tevent_req_post(req, ev);
1474 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
/*
 * Callback run when the lock record has been fetch-locked for an unlock.
 *
 * Decodes the lock list from the record payload, removes the caller's
 * entry via ctdb_g_lock_unlock_update() and completes the request.
 */
1479 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1481 struct tevent_req *req = tevent_req_callback_data(
1482 subreq, struct tevent_req);
1483 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1484 req, struct ctdb_g_lock_unlock_state);
1488 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1489 TALLOC_FREE(subreq);
1490 if (state->h == NULL) {
1491 tevent_req_error(req, ret);
1495 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1498 tevent_req_error(req, ret);
1502 ret = ctdb_g_lock_unlock_update(req);
1504 tevent_req_error(req, ret);
1508 tevent_req_done(req);
/*
 * Remove the caller's entry from the lock list and write the result back.
 *
 * The entry owned by state->my_sid is located and overwritten with the
 * last entry, shrinking the list by one.  An empty list leads to the
 * record being deleted; otherwise the list is serialised and stored.
 * NOTE(review): the return value of ctdb_delete_record() is not examined
 * in the visible code.  Loop exit and return statements are elided.
 */
1511 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1513 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1514 req, struct ctdb_g_lock_unlock_state);
1515 struct ctdb_g_lock *lock;
1518 for (i=0; i<state->lock_list->num; i++) {
1519 lock = &state->lock_list->lock[i];
1521 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
/* i < num means the loop stopped on a matching entry */
1526 if (i < state->lock_list->num) {
1527 state->lock_list->lock[i] =
1528 state->lock_list->lock[state->lock_list->num-1];
1529 state->lock_list->num -= 1;
1532 if (state->lock_list->num == 0) {
1533 ctdb_delete_record(state->h);
1537 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1538 data.dptr = talloc_size(state, data.dsize);
1539 if (data.dptr == NULL) {
1543 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1544 ret = ctdb_store_record(state->h, data);
1545 talloc_free(data.dptr);
/*
 * Collect the result of ctdb_g_lock_unlock_send().
 *
 * The record handle is freed first, whatever the outcome; the handle's
 * destructor releases the chain lock on the lock record.
 * NOTE(review): the handling of perr and the return values are elided.
 */
1554 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1556 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1557 req, struct ctdb_g_lock_unlock_state);
1560 TALLOC_FREE(state->h);
1562 if (tevent_req_is_unix_error(req, &err)) {
1573 * Persistent database functions
/*
 * Per-request state of an asynchronous transaction start
 * (ctdb_transaction_start_send()).  The destnode member used below is
 * elided in this listing.
 */
1575 struct ctdb_transaction_start_state {
1576 struct tevent_context *ev;
1577 struct ctdb_client_context *client;
1578 struct timeval timeout;
/* Transaction handle being built */
1579 struct ctdb_transaction_handle *h;
/* Steps of the transaction start, plus the handle's destructor */
1583 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1584 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1585 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1586 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
/*
 * Start an asynchronous request that begins a transaction on a
 * persistent database.
 *
 * A non-persistent db fails the request immediately with EINVAL.
 * Otherwise a transaction handle is allocated and filled in, and an
 * attach to "g_lock.tdb" is sent; ctdb_transaction_g_lock_attached()
 * continues from there.  Allocation failures post the request with an
 * out-of-memory error via tevent_req_nomem().
 */
1588 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1589 struct tevent_context *ev,
1590 struct ctdb_client_context *client,
1591 struct timeval timeout,
1592 struct ctdb_db_context *db,
1595 struct ctdb_transaction_start_state *state;
1596 struct tevent_req *req, *subreq;
1597 struct ctdb_transaction_handle *h;
1599 req = tevent_req_create(mem_ctx, &state,
1600 struct ctdb_transaction_start_state);
/* Transactions are only accepted on persistent databases. */
1605 if (! db->persistent) {
1606 tevent_req_error(req, EINVAL);
1607 return tevent_req_post(req, ev);
1611 state->client = client;
1612 state->destnode = ctdb_client_pnn(client);
/* The handle is a talloc child of db, not of the request state. */
1614 h = talloc_zero(db, struct ctdb_transaction_handle);
1615 if (tevent_req_nomem(h, req)) {
1616 return tevent_req_post(req, ev);
1621 h->readonly = readonly;
1624 /* SRVID is unique for databases, so client can have transactions active
1625 * for multiple databases */
1626 h->sid.pid = getpid();
1627 h->sid.task_id = db->db_id;
1628 h->sid.vnn = state->destnode;
/* unique_id = (db_id << 32) | pid */
1629 h->sid.unique_id = h->sid.task_id;
1630 h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
1632 h->recbuf = talloc_zero(h, struct ctdb_rec_buffer);
1633 if (tevent_req_nomem(h->recbuf, req)) {
1634 return tevent_req_post(req, ev);
/* Name of the lock later taken in the g_lock database for this db. */
1637 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1638 if (tevent_req_nomem(h->lock_name, req)) {
1639 return tevent_req_post(req, ev);
1644 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1645 if (tevent_req_nomem(subreq, req)) {
1646 return tevent_req_post(req, ev);
1648 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
/*
 * Callback for the attach subrequest.
 *
 * Stores the attached database in state->h->db_g_lock, then sends a
 * control to state->destnode that registers the handle's SRVID
 * (sid.unique_id).  ctdb_transaction_register_done() handles the reply.
 */
1653 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1655 struct tevent_req *req = tevent_req_callback_data(
1656 subreq, struct tevent_req);
1657 struct ctdb_transaction_start_state *state = tevent_req_data(
1658 req, struct ctdb_transaction_start_state);
1659 struct ctdb_req_control request;
1663 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1664 TALLOC_FREE(subreq);
1666 tevent_req_error(req, ret);
1670 ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1671 subreq = ctdb_client_control_send(state, state->ev, state->client,
1672 state->destnode, state->timeout,
1674 if (tevent_req_nomem(subreq, req)) {
1677 tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
/*
 * Callback for the register-SRVID control.
 *
 * Receives the control reply and checks it with
 * ctdb_reply_control_register_srvid(); failures fail the request.  It
 * then requests the lock named state->h->lock_name in the g_lock
 * database, passing the handle's server id and readonly flag.
 * ctdb_transaction_g_lock_done() handles the result.
 */
1680 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1682 struct tevent_req *req = tevent_req_callback_data(
1683 subreq, struct tevent_req);
1684 struct ctdb_transaction_start_state *state = tevent_req_data(
1685 req, struct ctdb_transaction_start_state);
1686 struct ctdb_reply_control *reply;
1690 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1691 TALLOC_FREE(subreq);
1693 tevent_req_error(req, ret);
1697 ret = ctdb_reply_control_register_srvid(reply);
1700 tevent_req_error(req, ret);
1704 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1705 state->h->db_g_lock, state->h->lock_name,
1706 &state->h->sid, state->h->readonly);
1707 if (tevent_req_nomem(subreq, req)) {
1710 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
/*
 * Callback for the g_lock lock subrequest; the last step of starting a
 * transaction.  Fails the request with the received error code, or marks
 * it done.
 */
1713 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1715 struct tevent_req *req = tevent_req_callback_data(
1716 subreq, struct tevent_req);
1720 status = ctdb_g_lock_lock_recv(subreq, &ret);
1721 TALLOC_FREE(subreq);
1723 tevent_req_error(req, ret);
1727 tevent_req_done(req);
/*
 * Receive the result of ctdb_transaction_start_send().
 *
 * Reads the transaction handle from the request state and checks the
 * request for a unix error.  A destructor is then installed on the
 * handle, so freeing it runs ctdb_transaction_handle_destructor().
 * NOTE(review): the error-path and return statements are not visible in
 * this extract; presumably the handle is returned on success.
 */
1730 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1731 struct tevent_req *req,
1734 struct ctdb_transaction_start_state *state = tevent_req_data(
1735 req, struct ctdb_transaction_start_state);
1736 struct ctdb_transaction_handle *h = state->h;
1739 if (tevent_req_is_unix_error(req, &err)) {
1746 talloc_set_destructor(h, ctdb_transaction_handle_destructor);
/*
 * talloc destructor for a transaction handle.
 *
 * Deregisters an SRVID on the client's own node (h->client->pnn) with a
 * zero timeout value, and logs a warning when that fails.
 */
1750 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1754 ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1755 tevent_timeval_zero(),
1758 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
/*
 * Blocking wrapper around ctdb_transaction_start_send() and
 * ctdb_transaction_start_recv(): sends the request, polls the event
 * context ev until it finishes, then collects the handle.
 * NOTE(review): presumably the handle is passed back through *out; that
 * assignment is not visible in this extract.
 */
1764 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1765 struct ctdb_client_context *client,
1766 struct timeval timeout,
1767 struct ctdb_db_context *db, bool readonly,
1768 struct ctdb_transaction_handle **out)
1770 struct tevent_req *req;
1771 struct ctdb_transaction_handle *h;
1774 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
1780 tevent_req_poll(req, ev);
1782 h = ctdb_transaction_start_recv(req, &ret);
/*
 * Search state used by ctdb_transaction_record_fetch() while traversing
 * the transaction's record buffer; header receives the header of the
 * matching record.
 */
1791 struct ctdb_transaction_record_fetch_state {
1793 struct ctdb_ltdb_header header;
/*
 * Traverse callback for ctdb_rec_buffer_traverse().
 *
 * private_data is the ctdb_transaction_record_fetch_state of the search.
 * When the visited key has the same size and bytes as state->key, the
 * record's header is copied into the state and found is set.
 */
1797 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1798 struct ctdb_ltdb_header *header,
1803 struct ctdb_transaction_record_fetch_state *state =
1804 (struct ctdb_transaction_record_fetch_state *)private_data;
1806 if (state->key.dsize == key.dsize &&
1807 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1809 state->header = *header;
1810 state->found = true;
/*
 * Look a key up in the transaction's own record buffer (h->recbuf).
 *
 * Traverses the buffer with ctdb_transaction_record_fetch_traverse().
 * The header out-parameter is optional: it is only written when it is
 * not NULL.
 */
1816 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1818 struct ctdb_ltdb_header *header,
1821 struct ctdb_transaction_record_fetch_state state;
1825 state.found = false;
1827 ret = ctdb_rec_buffer_traverse(h->recbuf,
1828 ctdb_transaction_record_fetch_traverse,
1835 if (header != NULL) {
1836 *header = state.header;
/*
 * Fetch a record inside a transaction.
 *
 * The transaction's record buffer is consulted first; data found there
 * is duplicated onto mem_ctx.  The visible code also fetches the record
 * from the local database copy (ctdb_ltdb_fetch) and adds it to the
 * record buffer.
 * NOTE(review): the conditions selecting between these paths are not
 * visible in this extract - presumably the local fetch is the fallback
 * when the record buffer has no entry for key.
 */
1847 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1849 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1852 struct ctdb_ltdb_header header;
1855 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
1857 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1859 if (data->dptr == NULL) {
1862 data->dsize = tmp_data.dsize;
1866 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
1871 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
/*
 * Queue a store of key/data in the transaction's record buffer.
 *
 * The current value and header are obtained from the record buffer or
 * from the local database copy.  If the old value has the same size and
 * bytes as data, the temporary context is freed without adding a record.
 * Otherwise the header's dmaster is set to this client's node number and
 * the record is appended to h->recbuf.
 * NOTE(review): return statements and the h->updated bookkeeping are not
 * visible in this extract.
 */
1879 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1880 TDB_DATA key, TDB_DATA data)
1882 TALLOC_CTX *tmp_ctx;
1883 struct ctdb_ltdb_header header;
/* Scratch context for the old record data; freed on every visible path. */
1891 tmp_ctx = talloc_new(h);
1892 if (tmp_ctx == NULL) {
1896 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1898 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
/* Value unchanged: nothing is added to the record buffer. */
1904 if (old_data.dsize == data.dsize &&
1905 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1906 talloc_free(tmp_ctx);
1910 header.dmaster = ctdb_client_pnn(h->client);
1913 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1914 talloc_free(tmp_ctx);
/*
 * Delete a record inside a transaction.  This is expressed as storing
 * tdb_null for the key via ctdb_transaction_store_record(), whose return
 * value is passed through.
 */
1923 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1926 return ctdb_transaction_store_record(h, key, tdb_null);
/*
 * Store the database sequence number as a record in the transaction.
 *
 * The key is the CTDB_DB_SEQNUM_KEY string including its terminating
 * NUL byte; the value is the in-memory bytes of seqnum.  Returns the
 * result of ctdb_transaction_store_record().
 */
1929 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1932 const char *keyname = CTDB_DB_SEQNUM_KEY;
1935 key.dptr = discard_const(keyname);
/* + 1 keeps the trailing NUL as part of the key. */
1936 key.dsize = strlen(keyname) + 1;
1938 data.dptr = (uint8_t *)&seqnum;
1939 data.dsize = sizeof(seqnum);
1941 return ctdb_transaction_store_record(h, key, data);
/*
 * Request state for ctdb_transaction_commit_send() and its callbacks.
 * h is the transaction being committed.
 */
1944 struct ctdb_transaction_commit_state {
1945 struct tevent_context *ev;
1946 struct ctdb_transaction_handle *h;
/*
 * Forward declarations for the commit callbacks.  commit_try sends the
 * commit control; commit_done checks the outcome and may schedule
 * commit_try again.
 */
1950 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1951 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
/*
 * Start an asynchronous commit of transaction h.
 *
 * Reads the database's current sequence number from the client's own
 * node into state->seqnum, queues a store of seqnum + 1 in the
 * transaction, and then waits for recovery to finish before
 * ctdb_transaction_commit_try() sends the commit.  A failure in the
 * first two steps posts the request with that step's error code.
 */
1953 struct tevent_req *ctdb_transaction_commit_send(
1954 TALLOC_CTX *mem_ctx,
1955 struct tevent_context *ev,
1956 struct ctdb_transaction_handle *h)
1958 struct tevent_req *req, *subreq;
1959 struct ctdb_transaction_commit_state *state;
1962 req = tevent_req_create(mem_ctx, &state,
1963 struct ctdb_transaction_commit_state);
1971 ret = ctdb_ctrl_get_db_seqnum(state, ev, h->client,
1972 h->client->pnn, tevent_timeval_zero(),
1973 h->db->db_id, &state->seqnum);
1975 tevent_req_error(req, ret);
1976 return tevent_req_post(req, ev);
/* The incremented sequence number becomes part of the transaction. */
1979 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
1981 tevent_req_error(req, ret);
1982 return tevent_req_post(req, ev);
1985 subreq = ctdb_recovery_wait_send(state, ev, h->client);
1986 if (tevent_req_nomem(subreq, req)) {
1987 return tevent_req_post(req, ev);
1989 tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
/*
 * Callback for the recovery-wait subrequest.
 *
 * Once the wait has been received, sends a TRANS3_COMMIT control carrying
 * the transaction's record buffer to the client's own node, using a zero
 * timeout value.  ctdb_transaction_commit_done() handles the reply.
 */
1994 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
1996 struct tevent_req *req = tevent_req_callback_data(
1997 subreq, struct tevent_req);
1998 struct ctdb_transaction_commit_state *state = tevent_req_data(
1999 req, struct ctdb_transaction_commit_state);
2000 struct ctdb_req_control request;
2004 status = ctdb_recovery_wait_recv(subreq, &ret);
2005 TALLOC_FREE(subreq);
2007 tevent_req_error(req, ret);
2011 ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2012 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2013 state->h->client->pnn,
2014 tevent_timeval_zero(), &request);
2015 if (tevent_req_nomem(subreq, req)) {
2018 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
/*
 * Callback for the TRANS3_COMMIT control.
 *
 * If the control reply indicates failure, the code waits for recovery
 * again and retries through ctdb_transaction_commit_try().  Otherwise
 * the database sequence number is read back from the client's own node:
 *   - equal to state->seqnum: wait for recovery and retry the commit;
 *   - not equal to state->seqnum + 1: fail the request with EIO;
 *   - equal to state->seqnum + 1: the request is done.
 */
2021 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2023 struct tevent_req *req = tevent_req_callback_data(
2024 subreq, struct tevent_req);
2025 struct ctdb_transaction_commit_state *state = tevent_req_data(
2026 req, struct ctdb_transaction_commit_state);
2027 struct ctdb_reply_control *reply;
2032 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2033 TALLOC_FREE(subreq);
2035 tevent_req_error(req, ret);
2039 ret = ctdb_reply_control_trans3_commit(reply);
2041 /* Control failed due to recovery */
2042 subreq = ctdb_recovery_wait_send(state, state->ev,
2044 if (tevent_req_nomem(subreq, req)) {
2047 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Read the sequence number back to check what the commit changed. */
2052 ret = ctdb_ctrl_get_db_seqnum(state, state->ev, state->h->client,
2053 state->h->client->pnn,
2054 tevent_timeval_zero(),
2055 state->h->db->db_id, &seqnum);
2057 tevent_req_error(req, ret);
/* Sequence number unchanged: wait for recovery and try again. */
2061 if (seqnum == state->seqnum) {
2062 subreq = ctdb_recovery_wait_send(state, state->ev,
2064 if (tevent_req_nomem(subreq, req)) {
2067 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Any value other than the one stored by this transaction is an error. */
2072 if (seqnum != state->seqnum + 1) {
2073 tevent_req_error(req, EIO);
2077 tevent_req_done(req);
/*
 * Receive the result of ctdb_transaction_commit_send().
 *
 * The transaction handle is freed here both after the unix-error check
 * and on the later path, so the handle must not be used after this call.
 * NOTE(review): the return statements and the *perr assignment are not
 * visible in this extract.
 */
2080 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2082 struct ctdb_transaction_commit_state *state = tevent_req_data(
2083 req, struct ctdb_transaction_commit_state);
2086 if (tevent_req_is_unix_error(req, &err)) {
2090 TALLOC_FREE(state->h);
2094 TALLOC_FREE(state->h);
/*
 * Blocking commit of transaction h: sends the commit request, polls
 * h->ev until it finishes and collects the result.
 *
 * Read-only transactions and transactions without updates take a
 * separate branch before any request is sent.
 * NOTE(review): the body of that branch is not visible in this extract;
 * presumably it returns without contacting the daemon.
 */
2098 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2100 struct tevent_req *req;
2104 if (h->readonly || ! h->updated) {
/* The request is allocated on the handle itself. */
2109 req = ctdb_transaction_commit_send(h, h->ev, h);
2115 tevent_req_poll(req, h->ev);
2117 status = ctdb_transaction_commit_recv(req, &ret);
/*
 * Cancel transaction h.
 * NOTE(review): the function body is not visible in this extract;
 * presumably the handle is released without committing - confirm.
 */
2127 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2136 * In the future, Samba should register SERVER_ID.
2137 * Make that structure the same as struct srvid {}.