4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
/*
 * Look up an already attached database by name.
 *
 * Walks the client->db list and compares db_name with each entry's
 * db_name using strcmp().  ctdb_attach_send() treats a non-NULL result
 * as "this database is already attached".
 */
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
/* Request state for the asynchronous "set database flags" operation. */
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
/* Completion markers; the request is finished once both are true. */
61 bool readonly_done, sticky_done;
/*
 * Completion callbacks for the sub-requests issued while setting database
 * flags: the node map query, then the READONLY and STICKY controls.
 */
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
/*
 * Begin an asynchronous request that applies the READONLY and/or STICKY
 * flags in db_flags to database db_id.
 *
 * When neither flag is requested the request is posted back to the caller
 * straight away.  Otherwise a GET_NODEMAP control is sent to destnode and
 * processing continues in ctdb_set_db_flags_nodemap_done().
 */
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
/* Nothing to do unless READONLY or STICKY was asked for. */
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
/* The node map is needed to find out which nodes to send the flags to. */
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
/*
 * Continuation after the GET_NODEMAP control has completed.
 *
 * Extracts the node map from the reply, builds the list of connected
 * nodes and then sends the SET_DB_READONLY and/or SET_DB_STICKY control
 * to every node in that list, depending on state->db_flags.  Each control
 * has its own completion callback.
 */
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
124 tevent_req_error(req, ret);
128 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
131 tevent_req_error(req, ret);
/* Only the list of connected PNNs is kept; the node map is released. */
135 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136 state, &state->pnn_list);
137 talloc_free(nodemap);
/*
 * NOTE(review): every non-positive count is reported as ENOMEM, which
 * also covers an empty list of connected nodes -- confirm that is
 * intended.
 */
138 if (state->count <= 0) {
139 tevent_req_error(req, ENOMEM);
143 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144 ctdb_req_control_set_db_readonly(&request, state->db_id);
145 subreq = ctdb_client_control_multi_send(
146 state, state->ev, state->client,
147 state->pnn_list, state->count,
148 state->timeout, &request);
149 if (tevent_req_nomem(subreq, req)) {
152 tevent_req_set_callback(subreq,
153 ctdb_set_db_flags_readonly_done, req);
/* NOTE(review): presumably the branch taken when READONLY was not requested. */
155 state->readonly_done = true;
158 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159 ctdb_req_control_set_db_sticky(&request, state->db_id);
160 subreq = ctdb_client_control_multi_send(
161 state, state->ev, state->client,
162 state->pnn_list, state->count,
163 state->timeout, &request);
164 if (tevent_req_nomem(subreq, req)) {
167 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
/* NOTE(review): presumably the branch taken when STICKY was not requested. */
170 state->sticky_done = true;
/*
 * Completion of the SET_DB_READONLY multi-node control.
 *
 * Records that the READONLY part has finished and completes the overall
 * request if the STICKY part has finished as well.
 */
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
176 struct tevent_req *req = tevent_req_callback_data(
177 subreq, struct tevent_req);
178 struct ctdb_set_db_flags_state *state = tevent_req_data(
179 req, struct ctdb_set_db_flags_state);
183 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
187 tevent_req_error(req, ret);
191 state->readonly_done = true;
/* Whichever of the two callbacks runs last completes the request. */
193 if (state->readonly_done && state->sticky_done) {
194 tevent_req_done(req);
/*
 * Completion of the SET_DB_STICKY multi-node control.
 *
 * Records that the STICKY part has finished and completes the overall
 * request if the READONLY part has finished as well.
 */
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
200 struct tevent_req *req = tevent_req_callback_data(
201 subreq, struct tevent_req);
202 struct ctdb_set_db_flags_state *state = tevent_req_data(
203 req, struct ctdb_set_db_flags_state);
207 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
211 tevent_req_error(req, ret);
215 state->sticky_done = true;
/* Whichever of the two callbacks runs last completes the request. */
217 if (state->readonly_done && state->sticky_done) {
218 tevent_req_done(req);
/*
 * Collect the result of a ctdb_set_db_flags_send() request.
 *
 * Checks the request for a unix error with tevent_req_is_unix_error();
 * perr is the caller's slot for an error code.
 */
222 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
226 if (tevent_req_is_unix_error(req, &err)) {
/* Request state for the asynchronous database attach operation. */
235 struct ctdb_attach_state {
236 struct tevent_context *ev;
237 struct ctdb_client_context *client;
238 struct timeval timeout;
/* Database context being built up; handed to the caller by ctdb_attach_recv(). */
242 struct ctdb_db_context *db;
/*
 * Attach is a chain of sub-requests; each callback below handles one step
 * and starts the next: tunable query, attach control, database path,
 * database health, database flags.
 */
245 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
246 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
247 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
248 static void ctdb_attach_health_done(struct tevent_req *subreq);
249 static void ctdb_attach_flags_done(struct tevent_req *subreq);
/*
 * Begin an asynchronous attach to the database called db_name.
 *
 * If the client already has a handle for that name the request completes
 * immediately.  Otherwise a new ctdb_db_context is allocated, the
 * persistent flag is taken from db_flags, and the "TDBMutexEnabled"
 * tunable is queried from the local node; processing continues in
 * ctdb_attach_mutex_done().
 */
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252 struct tevent_context *ev,
253 struct ctdb_client_context *client,
254 struct timeval timeout,
255 const char *db_name, uint8_t db_flags)
257 struct tevent_req *req, *subreq;
258 struct ctdb_attach_state *state;
259 struct ctdb_req_control request;
261 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
/* Reuse an existing handle when this database is already attached. */
266 state->db = client_db_handle(client, db_name);
267 if (state->db != NULL) {
268 tevent_req_done(req);
269 return tevent_req_post(req, ev);
273 state->client = client;
274 state->timeout = timeout;
275 state->destnode = ctdb_client_pnn(client);
276 state->db_flags = db_flags;
/*
 * The context is a talloc child of client, not of the request state.
 * NOTE(review): no release of it on the later error paths is visible
 * here -- confirm it cannot accumulate on repeated failed attaches.
 */
278 state->db = talloc_zero(client, struct ctdb_db_context);
279 if (tevent_req_nomem(state->db, req)) {
280 return tevent_req_post(req, ev);
283 state->db->db_name = talloc_strdup(state->db, db_name);
/*
 * NOTE(review): this tests state->db, which is known to be non-NULL at
 * this point; a failed talloc_strdup() would go unnoticed.  The check
 * presumably should test state->db->db_name.
 */
284 if (tevent_req_nomem(state->db, req)) {
285 return tevent_req_post(req, ev);
288 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289 state->db->persistent = true;
292 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293 subreq = ctdb_client_control_send(state, ev, client,
294 ctdb_client_pnn(client), timeout,
296 if (tevent_req_nomem(subreq, req)) {
297 return tevent_req_post(req, ev);
299 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
/*
 * Continuation after the "TDBMutexEnabled" tunable has been queried.
 *
 * Works out the TDB open flags for the local database file and then sends
 * the attach control (persistent or regular variant, depending on
 * state->db->persistent).  Processing continues in
 * ctdb_attach_dbid_done().
 */
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
306 struct tevent_req *req = tevent_req_callback_data(
307 subreq, struct tevent_req);
308 struct ctdb_attach_state *state = tevent_req_data(
309 req, struct ctdb_attach_state);
310 struct ctdb_reply_control *reply;
311 struct ctdb_req_control request;
312 uint32_t mutex_enabled;
316 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
319 tevent_req_error(req, ret);
323 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
325 /* Treat error as mutex support not available */
/* Extra TDB flags are added only for databases that are not persistent. */
329 state->tdb_flags = TDB_DEFAULT;
330 if (! state->db->persistent) {
331 state->tdb_flags |= (TDB_INCOMPATIBLE_HASH |
334 if (mutex_enabled == 1) {
335 state->tdb_flags |= TDB_MUTEX_LOCKING;
338 if (state->db->persistent) {
339 ctdb_req_control_db_attach_persistent(&request,
343 ctdb_req_control_db_attach(&request, state->db->db_name,
347 subreq = ctdb_client_control_send(state, state->ev, state->client,
348 state->destnode, state->timeout,
350 if (tevent_req_nomem(subreq, req)) {
353 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
/*
 * Continuation after the attach control has completed.
 *
 * Stores the database id from the reply in state->db->db_id and asks the
 * daemon for the path of the database file.  Processing continues in
 * ctdb_attach_dbpath_done().
 */
356 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
358 struct tevent_req *req = tevent_req_callback_data(
359 subreq, struct tevent_req);
360 struct ctdb_attach_state *state = tevent_req_data(
361 req, struct ctdb_attach_state);
362 struct ctdb_req_control request;
363 struct ctdb_reply_control *reply;
367 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
370 tevent_req_error(req, ret);
/* The reply parser has to match the attach variant that was sent. */
374 if (state->db->persistent) {
375 ret = ctdb_reply_control_db_attach_persistent(
376 reply, &state->db->db_id);
378 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
382 tevent_req_error(req, ret);
386 ctdb_req_control_getdbpath(&request, state->db->db_id);
387 subreq = ctdb_client_control_send(state, state->ev, state->client,
388 state->destnode, state->timeout,
390 if (tevent_req_nomem(subreq, req)) {
393 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
/*
 * Continuation after the GETDBPATH control has completed.
 *
 * Stores the database path (allocated on state->db) in
 * state->db->db_path and queries the health of the database.  Processing
 * continues in ctdb_attach_health_done().
 */
396 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
398 struct tevent_req *req = tevent_req_callback_data(
399 subreq, struct tevent_req);
400 struct ctdb_attach_state *state = tevent_req_data(
401 req, struct ctdb_attach_state);
402 struct ctdb_reply_control *reply;
403 struct ctdb_req_control request;
407 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
410 tevent_req_error(req, ret);
414 ret = ctdb_reply_control_getdbpath(reply, state->db,
415 &state->db->db_path);
418 tevent_req_error(req, ret);
422 ctdb_req_control_db_get_health(&request, state->db->db_id);
423 subreq = ctdb_client_control_send(state, state->ev, state->client,
424 state->destnode, state->timeout,
426 if (tevent_req_nomem(subreq, req)) {
429 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
/*
 * Continuation after the DB_GET_HEALTH control has completed.
 *
 * A non-NULL health reason means the database is unhealthy and the attach
 * fails with EIO.  Otherwise the requested database flags are applied via
 * ctdb_set_db_flags_send() and processing continues in
 * ctdb_attach_flags_done().
 */
432 static void ctdb_attach_health_done(struct tevent_req *subreq)
434 struct tevent_req *req = tevent_req_callback_data(
435 subreq, struct tevent_req);
436 struct ctdb_attach_state *state = tevent_req_data(
437 req, struct ctdb_attach_state);
438 struct ctdb_reply_control *reply;
443 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
446 tevent_req_error(req, ret);
450 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
452 tevent_req_error(req, ret);
456 if (reason != NULL) {
457 /* Database unhealthy, avoid attach */
458 /* FIXME: Log here */
459 tevent_req_error(req, EIO);
463 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
464 state->destnode, state->timeout,
465 state->db->db_id, state->db_flags);
466 if (tevent_req_nomem(subreq, req)) {
469 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
/*
 * Final step of attach, run after the database flags have been set.
 *
 * Opens the local TDB file at state->db->db_path with the flags computed
 * earlier, links the database context into the client's database list
 * and completes the request.
 */
472 static void ctdb_attach_flags_done(struct tevent_req *subreq)
474 struct tevent_req *req = tevent_req_callback_data(
475 subreq, struct tevent_req);
476 struct ctdb_attach_state *state = tevent_req_data(
477 req, struct ctdb_attach_state);
481 status = ctdb_set_db_flags_recv(subreq, &ret);
484 tevent_req_error(req, ret);
488 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
489 state->tdb_flags, O_RDWR, 0);
/*
 * NOTE(review): a NULL result from tdb_wrap_open() is routed through
 * tevent_req_nomem(), so any open failure is presumably reported to the
 * caller as out-of-memory -- confirm that is acceptable.
 */
490 if (tevent_req_nomem(state->db->ltdb, req)) {
/* From here on client_db_handle() finds this database by name. */
493 DLIST_ADD(state->client->db, state->db);
495 tevent_req_done(req);
/*
 * Collect the result of a ctdb_attach_send() request.
 *
 * Checks the request for a unix error; perr and out are the caller's
 * slots for the error code and the database context.
 */
498 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
499 struct ctdb_db_context **out)
501 struct ctdb_attach_state *state = tevent_req_data(
502 req, struct ctdb_attach_state);
505 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_attach_send()/ctdb_attach_recv().
 *
 * Runs the attach request to completion on ev using a temporary talloc
 * context, then registers the NULL, FETCH and FETCH_WITH_HEADER call
 * functions on the database.
 */
518 int ctdb_attach(struct tevent_context *ev,
519 struct ctdb_client_context *client,
520 struct timeval timeout,
521 const char *db_name, uint8_t db_flags,
522 struct ctdb_db_context **out)
525 struct tevent_req *req;
/* Temporary context owning the request; freed on every visible exit path. */
529 mem_ctx = talloc_new(client);
530 if (mem_ctx == NULL) {
534 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
537 talloc_free(mem_ctx);
/* Drive the event loop until the request has finished. */
541 tevent_req_poll(req, ev);
543 status = ctdb_attach_recv(req, &ret, out);
545 talloc_free(mem_ctx);
550 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
551 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
552 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
555 talloc_free(mem_ctx);
/*
 * Detach from the database identified by db_id.
 *
 * Sends the DB_DETACH control to the local node (client->pnn) and then
 * unlinks the matching database context from the client's list.
 */
559 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
560 struct ctdb_client_context *client,
561 struct timeval timeout, uint32_t db_id)
563 struct ctdb_db_context *db;
566 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
572 for (db = client->db; db != NULL; db = db->next) {
573 if (db->db_id == db_id) {
574 DLIST_REMOVE(client->db, db);
/* Accessor for a database handle; presumably returns db->db_id -- confirm. */
582 uint32_t ctdb_db_id(struct ctdb_db_context *db)
/* Context handed to ctdb_db_traverse_handler() for a local traverse. */
587 struct ctdb_db_traverse_state {
/* Callback invoked for each record. */
588 ctdb_rec_parser_func_t parser;
/*
 * Per-record callback used by ctdb_db_traverse().
 *
 * When state->extract_header is set, the ltdb header is extracted from
 * the record and passed to the parser; otherwise the parser is called
 * with a NULL header.
 */
594 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
595 TDB_DATA data, void *private_data)
597 struct ctdb_db_traverse_state *state =
598 (struct ctdb_db_traverse_state *)private_data;
601 if (state->extract_header) {
602 struct ctdb_ltdb_header header;
604 ret = ctdb_ltdb_header_extract(&data, &header);
610 ret = state->parser(0, &header, key, data, state->private_data);
612 ret = state->parser(0, NULL, key, data, state->private_data);
/*
 * Traverse the local copy of a database, calling parser for each record.
 *
 * Uses either tdb_traverse_read() or tdb_traverse() on the local TDB;
 * presumably the readonly argument selects between them -- confirm.
 * extract_header controls whether the parser receives the ltdb header.
 */
623 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
625 ctdb_rec_parser_func_t parser, void *private_data)
627 struct ctdb_db_traverse_state state;
630 state.parser = parser;
631 state.private_data = private_data;
632 state.extract_header = extract_header;
636 ret = tdb_traverse_read(db->ltdb->tdb,
637 ctdb_db_traverse_handler, &state);
639 ret = tdb_traverse(db->ltdb->tdb,
640 ctdb_db_traverse_handler, &state);
/*
 * Read a record from the local TDB and split it into header and data.
 *
 * A record shorter than struct ctdb_ltdb_header is treated as "no record
 * present"; in that case the header's dmaster is set to CTDB_UNKNOWN_PNN.
 * Otherwise the header is parsed and the bytes following it are copied
 * onto mem_ctx.
 */
650 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
651 struct ctdb_ltdb_header *header,
652 TALLOC_CTX *mem_ctx, TDB_DATA *data)
657 rec = tdb_fetch(db->ltdb->tdb, key);
658 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
659 /* No record present */
660 if (rec.dptr != NULL) {
/* A fetch error other than "does not exist" is handled separately. */
664 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
669 header->dmaster = CTDB_UNKNOWN_PNN;
678 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
/* The payload starts right after the serialized header. */
685 size_t offset = ctdb_ltdb_header_len(header);
687 data->dsize = rec.dsize - offset;
688 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
690 if (data->dptr == NULL) {
700 * Fetch a record from volatile database
703 * 1. Get a lock on the hash chain
704 * 2. If the record does not exist, migrate the record
705 * 3. If readonly=true and delegations do not exist, migrate the record.
706 * 4. If readonly=false and delegations exist, migrate the record.
707 * 5. If the local node is not dmaster, migrate the record.
/* Request state for the asynchronous fetch-lock operation. */
711 struct ctdb_fetch_lock_state {
712 struct tevent_context *ev;
713 struct ctdb_client_context *client;
/* Record handle being prepared; returned by ctdb_fetch_lock_recv(). */
714 struct ctdb_record_handle *h;
/*
 * Helpers for fetch-lock: the local check, the migration request and the
 * completion callback of that migration request.
 */
719 static int ctdb_fetch_lock_check(struct tevent_req *req);
720 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
721 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
/*
 * Begin an asynchronous fetch-lock of the record identified by key.
 *
 * Allocates a record handle on db with a private copy of the key, rejects
 * persistent databases with EINVAL and runs ctdb_fetch_lock_check().
 * Depending on the result the request is completed or failed at once;
 * otherwise it stays pending until the migration finishes.
 */
723 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
724 struct tevent_context *ev,
725 struct ctdb_client_context *client,
726 struct ctdb_db_context *db,
727 TDB_DATA key, bool readonly)
729 struct ctdb_fetch_lock_state *state;
730 struct tevent_req *req;
733 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
739 state->client = client;
/* The handle is a talloc child of db, not of the request state. */
741 state->h = talloc_zero(db, struct ctdb_record_handle);
742 if (tevent_req_nomem(state->h, req)) {
743 return tevent_req_post(req, ev);
745 state->h->client = client;
/* The handle keeps its own copy of the key bytes. */
747 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
748 if (tevent_req_nomem(state->h->key.dptr, req)) {
749 return tevent_req_post(req, ev);
751 state->h->key.dsize = key.dsize;
752 state->h->readonly = false;
754 state->readonly = readonly;
755 state->pnn = ctdb_client_pnn(client);
757 /* Check that database is not persistent */
758 if (db->persistent) {
759 tevent_req_error(req, EINVAL);
760 return tevent_req_post(req, ev);
763 ret = ctdb_fetch_lock_check(req);
765 tevent_req_done(req);
766 return tevent_req_post(req, ev);
769 tevent_req_error(req, ret);
770 return tevent_req_post(req, ev);
/*
 * Check whether the record can be locked locally or must be migrated.
 *
 * Takes the chain lock for the key, reads the local record and inspects
 * its header (dmaster and the read-only delegation flags) against the
 * requested access mode.  When the lock cannot be granted locally the
 * chain lock is released again and ctdb_fetch_lock_migrate() is called.
 *
 * NOTE(review): the meaning of the individual return values is not
 * documented here -- confirm against the callers.
 */
775 static int ctdb_fetch_lock_check(struct tevent_req *req)
777 struct ctdb_fetch_lock_state *state = tevent_req_data(
778 req, struct ctdb_fetch_lock_state);
779 struct ctdb_record_handle *h = state->h;
780 struct ctdb_ltdb_header header;
781 TDB_DATA data = tdb_null;
783 bool do_migrate = false;
785 ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
791 data = tdb_fetch(h->db->ltdb->tdb, h->key);
792 if (data.dptr == NULL) {
793 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
802 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
808 if (! state->readonly) {
809 /* Read/write access */
810 if (header.dmaster == state->pnn &&
811 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
815 if (header.dmaster != state->pnn) {
819 /* Readonly access */
820 if (header.dmaster != state->pnn &&
821 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
822 CTDB_REC_RO_HAVE_DELEGATIONS))) {
827 /* We are the dmaster or readonly delegation */
830 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
831 CTDB_REC_RO_HAVE_DELEGATIONS)) {
841 if (data.dptr != NULL) {
/* The chain lock is not held while the migration request is in flight. */
844 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
846 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
852 ctdb_fetch_lock_migrate(req);
/*
 * Ask the daemon to migrate the record to this node.
 *
 * Sends a CTDB_NULL_FUNC call for the key with the
 * CTDB_IMMEDIATE_MIGRATION flag (plus CTDB_WANT_READONLY for read-only
 * requests).  The reply is handled in ctdb_fetch_lock_migrate_done().
 */
857 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
859 struct ctdb_fetch_lock_state *state = tevent_req_data(
860 req, struct ctdb_fetch_lock_state);
861 struct ctdb_req_call request;
862 struct tevent_req *subreq;
864 ZERO_STRUCT(request);
865 request.flags = CTDB_IMMEDIATE_MIGRATION;
866 if (state->readonly) {
867 request.flags |= CTDB_WANT_READONLY;
869 request.db_id = state->h->db->db_id;
870 request.callid = CTDB_NULL_FUNC;
871 request.key = state->h->key;
872 request.calldata = tdb_null;
874 subreq = ctdb_client_call_send(state, state->ev, state->client,
876 if (tevent_req_nomem(subreq, req)) {
880 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
/*
 * Completion of the migration call.
 *
 * A failed call or a non-zero reply status fails the request (the latter
 * with EIO).  Otherwise the local check is repeated via
 * ctdb_fetch_lock_check().
 */
883 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
885 struct tevent_req *req = tevent_req_callback_data(
886 subreq, struct tevent_req);
887 struct ctdb_fetch_lock_state *state = tevent_req_data(
888 req, struct ctdb_fetch_lock_state);
889 struct ctdb_reply_call *reply;
893 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
896 tevent_req_error(req, ret);
900 if (reply->status != 0) {
901 tevent_req_error(req, EIO);
/* Re-run the local check now that the migration call has returned. */
906 ret = ctdb_fetch_lock_check(req);
909 tevent_req_error(req, ret);
914 tevent_req_done(req);
/*
 * talloc destructor for a record handle (installed by
 * ctdb_fetch_lock_recv()): releases the chain lock on the record's key.
 */
917 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
919 tdb_chainunlock(h->db->ltdb->tdb, h->key);
/*
 * Collect the result of a ctdb_fetch_lock_send() request.
 *
 * Hands out the record handle.  If data is requested, the record payload
 * (h->data without the leading ltdb header) is copied onto mem_ctx; when
 * that copy fails the handle is freed.  A destructor is installed on the
 * handle so that freeing it releases the chain lock.
 */
924 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
925 struct ctdb_ltdb_header *header,
927 TDB_DATA *data, int *perr)
929 struct ctdb_fetch_lock_state *state = tevent_req_data(
930 req, struct ctdb_fetch_lock_state);
931 struct ctdb_record_handle *h = state->h;
934 if (tevent_req_is_unix_error(req, &err)) {
941 if (header != NULL) {
/* h->data starts with the serialized header; skip it for the caller. */
947 offset = ctdb_ltdb_header_len(&h->header);
949 data->dsize = h->data.dsize - offset;
950 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
952 if (data->dptr == NULL) {
953 TALLOC_FREE(state->h);
961 talloc_set_destructor(h, ctdb_record_handle_destructor);
/*
 * Synchronous wrapper around ctdb_fetch_lock_send()/ctdb_fetch_lock_recv().
 *
 * Runs the fetch-lock request to completion on ev; out, header and data
 * are the caller's slots for the record handle, the record header and the
 * record payload.
 */
965 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
966 struct ctdb_client_context *client,
967 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
968 struct ctdb_record_handle **out,
969 struct ctdb_ltdb_header *header, TDB_DATA *data)
971 struct tevent_req *req;
972 struct ctdb_record_handle *h;
975 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
/* Drive the event loop until the request has finished. */
980 tevent_req_poll(req, ev);
982 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
/*
 * Store new data for a locked record.
 *
 * Skips the write when the new data equals what the handle already holds.
 * Otherwise builds a buffer consisting of the handle's ltdb header
 * followed by data and writes it to the local TDB with TDB_REPLACE.
 */
991 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
997 /* Cannot modify the record if it was obtained as a readonly copy */
1002 /* Check if the new data is same */
/*
 * NOTE(review): ctdb_fetch_lock_recv() skips ctdb_ltdb_header_len()
 * bytes of h->data to reach the payload, so h->data appears to include
 * the header while data here is payload only -- confirm this comparison
 * can actually match.
 */
1003 if (h->data.dsize == data.dsize &&
1004 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1005 /* No need to do anything */
/* On-disk layout: serialized header first, payload after it. */
1009 offset = ctdb_ltdb_header_len(&h->header);
1010 rec.dsize = offset + data.dsize;
1011 rec.dptr = talloc_size(h, rec.dsize);
1012 if (rec.dptr == NULL) {
1016 ctdb_ltdb_header_push(&h->header, rec.dptr);
1017 memcpy(rec.dptr + offset, data.dptr, data.dsize);
1019 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1021 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1026 talloc_free(rec.dptr);
/*
 * Delete a locked record.
 *
 * Replaces the record in the local TDB with a header-only record (no
 * payload) and then asks the daemon to schedule the record for deletion.
 * A failure of that second step is logged at warning level.
 */
1030 int ctdb_delete_record(struct ctdb_record_handle *h)
1033 struct ctdb_key_data key;
1036 /* Cannot delete the record if it was obtained as a readonly copy */
/* The replacement record consists of the serialized header only. */
1041 rec.dsize = ctdb_ltdb_header_len(&h->header);
1042 rec.dptr = talloc_size(h, rec.dsize);
1043 if (rec.dptr == NULL) {
1047 ctdb_ltdb_header_push(&h->header, rec.dptr);
1049 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1050 talloc_free(rec.dptr);
1052 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1057 key.db_id = h->db->db_id;
1058 key.header = h->header;
1061 ret = ctdb_ctrl_schedule_for_deletion(h, h->ev, h->client,
1063 tevent_timeval_zero(), &key);
1065 DEBUG(DEBUG_WARNING,
1066 ("Failed to mark record to be deleted in DB %s\n",
1075 * Global lock functions
/* Request state for acquiring a global lock (g_lock). */
1078 struct ctdb_g_lock_lock_state {
1079 struct tevent_context *ev;
1080 struct ctdb_client_context *client;
1081 struct ctdb_db_context *db;
/* Identity of the requester and the kind of lock it is asking for. */
1083 struct ctdb_server_id my_sid;
1084 enum ctdb_g_lock_type lock_type;
/* Handle of the fetch-locked record that stores the lock list. */
1085 struct ctdb_record_handle *h;
1086 /* state for verification of active locks */
1087 struct ctdb_g_lock_list *lock_list;
/* Index of the lock_list entry currently being examined. */
1088 unsigned int current;
/*
 * Steps of the g_lock acquisition: record fetched, lock list processing,
 * lock-holder liveness check completed, lock list written back, and the
 * delayed retry.
 */
1091 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1092 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1093 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1094 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1095 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Decide whether two lock types conflict with each other.
 *
 * The combination of two read locks is singled out as a special case;
 * presumably it is the only combination that does not conflict --
 * confirm.
 */
1097 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1098 enum ctdb_g_lock_type l2)
1100 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
/*
 * Begin an asynchronous request to take the global lock named keyname on
 * behalf of server id sid.
 *
 * The lock is a read lock when readonly is true and a write lock
 * otherwise.  The record holding the lock list is fetch-locked first;
 * processing continues in ctdb_g_lock_lock_fetched().
 */
1106 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1107 struct tevent_context *ev,
1108 struct ctdb_client_context *client,
1109 struct ctdb_db_context *db,
1110 const char *keyname,
1111 struct ctdb_server_id *sid,
1114 struct tevent_req *req, *subreq;
1115 struct ctdb_g_lock_lock_state *state;
1117 req = tevent_req_create(mem_ctx, &state,
1118 struct ctdb_g_lock_lock_state);
1124 state->client = client;
/*
 * The key is keyname including its terminating NUL.  The string is
 * referenced, not copied, and state->key is used again on retry, so
 * keyname presumably has to stay valid for the lifetime of the request.
 */
1126 state->key.dptr = discard_const(keyname);
1127 state->key.dsize = strlen(keyname) + 1;
1128 state->my_sid = *sid;
1129 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1131 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1133 if (tevent_req_nomem(subreq, req)) {
1134 return tevent_req_post(req, ev);
1136 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Continuation after the lock record has been fetch-locked.
 *
 * Parses the record data into state->lock_list, dropping any list left
 * over from an earlier attempt, and starts examining the entries in
 * ctdb_g_lock_lock_process_locks().
 */
1141 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1143 struct tevent_req *req = tevent_req_callback_data(
1144 subreq, struct tevent_req);
1145 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1146 req, struct ctdb_g_lock_lock_state);
1150 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1151 TALLOC_FREE(subreq);
1152 if (state->h == NULL) {
1153 tevent_req_error(req, ret);
/* This callback also runs after a retry; discard the previous list. */
1157 if (state->lock_list != NULL) {
1158 TALLOC_FREE(state->lock_list);
1162 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
/* The raw record data is released right after it has been parsed. */
1164 talloc_free(data.dptr);
1166 tevent_req_error(req, ret);
1170 ctdb_g_lock_lock_process_locks(req);
/*
 * Walk the lock list starting at state->current.
 *
 * Fails with EDEADLK if the requester already holds an entry.  When a
 * conflicting entry is found, a CHECK_SRVIDS control is sent to find out
 * whether its holder still exists; the answer is handled in
 * ctdb_g_lock_lock_checked().  When no conflict remains, the requester's
 * own entry is appended, the list is written back and the request
 * completes.
 */
1173 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1175 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1176 req, struct ctdb_g_lock_lock_state);
1177 struct tevent_req *subreq;
1178 struct ctdb_g_lock *lock;
1179 bool check_server = false;
1182 while (state->current < state->lock_list->num) {
1183 lock = &state->lock_list->lock[state->current];
1185 /* We should not ask for the same lock more than once */
1186 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1187 tevent_req_error(req, EDEADLK);
1191 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1192 check_server = true;
1196 state->current += 1;
1200 struct ctdb_req_control request;
1201 struct ctdb_uint64_array u64_array;
/* Ask about the unique id of the conflicting lock holder. */
1204 u64_array.val = &lock->sid.unique_id;
1206 ctdb_req_control_check_srvids(&request, &u64_array);
1207 subreq = ctdb_client_control_send(state, state->ev,
1210 tevent_timeval_zero(),
1212 if (tevent_req_nomem(subreq, req)) {
1215 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1219 /* There is no conflict, add ourself to the lock_list */
/*
 * NOTE(review): the realloc result is assigned straight back to
 * state->lock_list->lock, so the old array pointer is overwritten with
 * NULL on failure; it stays a talloc child of lock_list.
 */
1220 state->lock_list->lock = talloc_realloc(state->lock_list,
1221 state->lock_list->lock,
1223 state->lock_list->num + 1);
1224 if (state->lock_list->lock == NULL) {
1225 tevent_req_error(req, ENOMEM);
1229 lock = &state->lock_list->lock[state->lock_list->num];
1230 lock->type = state->lock_type;
1231 lock->sid = state->my_sid;
1232 state->lock_list->num += 1;
1234 ret = ctdb_g_lock_lock_update(req);
1236 tevent_req_error(req, ret);
1240 tevent_req_done(req);
/*
 * Completion of the CHECK_SRVIDS control for a conflicting lock holder.
 *
 * If the holder still exists, a one second wakeup timer is started and
 * the attempt is retried from ctdb_g_lock_lock_retry().  If it no longer
 * exists, its entry is removed from the list, the list is written back
 * and processing of the remaining entries resumes.
 */
1243 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1245 struct tevent_req *req = tevent_req_callback_data(
1246 subreq, struct tevent_req);
1247 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1248 req, struct ctdb_g_lock_lock_state);
1249 struct ctdb_reply_control *reply;
1250 struct ctdb_uint8_array *u8_array;
1255 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1256 TALLOC_FREE(subreq);
1258 tevent_req_error(req, ret);
1262 ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1264 tevent_req_error(req, ENOMEM);
/* Exactly one id was asked about, so exactly one answer is expected. */
1268 if (u8_array->num != 1) {
1269 talloc_free(u8_array);
1270 tevent_req_error(req, EIO);
1274 val = u8_array->val[0];
1275 talloc_free(u8_array);
1278 /* server process exists, need to retry */
1279 subreq = tevent_wakeup_send(state, state->ev,
1280 tevent_timeval_current_ofs(1,0));
1281 if (tevent_req_nomem(subreq, req)) {
1284 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1288 /* server process does not exist, remove conflicting entry */
/*
 * The last entry is moved into the freed slot; state->current is left
 * unchanged in the lines shown, so the moved entry is examined next.
 */
1289 state->lock_list->lock[state->current] =
1290 state->lock_list->lock[state->lock_list->num-1];
1291 state->lock_list->num -= 1;
1293 ret = ctdb_g_lock_lock_update(req);
1295 tevent_req_error(req, ret);
1299 ctdb_g_lock_lock_process_locks(req);
/*
 * Write the current lock list back to the lock record.
 *
 * Serializes state->lock_list into a temporary buffer and stores it
 * through the record handle with ctdb_store_record().
 */
1302 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1304 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1305 req, struct ctdb_g_lock_lock_state);
1309 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1310 data.dptr = talloc_size(state, data.dsize);
1311 if (data.dptr == NULL) {
1315 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1316 ret = ctdb_store_record(state->h, data);
/* The buffer is freed right after the store call returns. */
1317 talloc_free(data.dptr);
/*
 * Alternative implementation of the lock list update that does the whole
 * job in one pass: reject a repeated request from the same server id,
 * check conflicting holders with ctdb_server_id_exists(), drop entries
 * whose holder is gone, append the requester's entry and store the list.
 *
 * NOTE(review): this has the same name as the function defined just above
 * but a different signature, so it is presumably compiled out by a
 * preprocessor guard that is not visible here -- confirm, and consider
 * removing it if it is dead code.
 */
1322 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1323 struct ctdb_g_lock_list *lock_list,
1324 struct ctdb_record_handle *h)
1326 struct ctdb_g_lock *lock;
1327 bool conflict = false;
1328 bool modified = false;
1331 for (i=0; i<lock_list->num; i++) {
1332 lock = &lock_list->lock[i];
1334 /* We should not ask for lock more than once */
1335 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1339 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1343 ret = ctdb_server_id_exists(state->client, &lock->sid,
1353 /* Server does not exist, delete conflicting entry */
1354 lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1355 lock_list->num -= 1;
/* The list pointer is replaced only after the new entry has been filled in. */
1361 lock = talloc_realloc(lock_list, lock_list->lock,
1362 struct ctdb_g_lock, lock_list->num+1);
1367 lock[lock_list->num].type = state->lock_type;
1368 lock[lock_list->num].sid = state->my_sid;
1369 lock_list->lock = lock;
1370 lock_list->num += 1;
1377 data.dsize = ctdb_g_lock_list_len(lock_list);
1378 data.dptr = talloc_size(state, data.dsize);
1379 if (data.dptr == NULL) {
1383 ctdb_g_lock_list_push(lock_list, data.dptr);
1384 ret = ctdb_store_record(h, data);
1385 talloc_free(data.dptr);
/*
 * Timer callback for a g_lock retry.
 *
 * Once the wakeup timer has fired, the lock record is fetch-locked again
 * and the attempt restarts in ctdb_g_lock_lock_fetched().
 */
1398 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1400 struct tevent_req *req = tevent_req_callback_data(
1401 subreq, struct tevent_req);
1402 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1403 req, struct ctdb_g_lock_lock_state);
1406 success = tevent_wakeup_recv(subreq);
1407 TALLOC_FREE(subreq);
/* A failed wakeup is reported as ENOMEM. */
1409 tevent_req_error(req, ENOMEM);
1413 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1414 state->db, state->key, false);
1415 if (tevent_req_nomem(subreq, req)) {
1418 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Collect the result of a ctdb_g_lock_lock_send() request.
 *
 * The record handle is freed first, before the request is checked for a
 * unix error; perr is the caller's slot for an error code.
 */
1421 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1423 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1424 req, struct ctdb_g_lock_lock_state);
/* Freeing the handle runs its destructor, if one has been installed. */
1427 TALLOC_FREE(state->h);
1429 if (tevent_req_is_unix_error(req, &err)) {
/* Request state for releasing a global lock (g_lock). */
1439 struct ctdb_g_lock_unlock_state {
1440 struct tevent_context *ev;
1441 struct ctdb_client_context *client;
1442 struct ctdb_db_context *db;
/* Identity whose entry is to be removed from the lock list. */
1444 struct ctdb_server_id my_sid;
1445 struct ctdb_record_handle *h;
1446 struct ctdb_g_lock_list *lock_list;
/* Steps of the g_lock release: record fetched, lock list written back. */
1449 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1450 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
/*
 * Begin an asynchronous request to release the global lock named keyname
 * held by server id sid.
 *
 * The record holding the lock list is fetch-locked first; processing
 * continues in ctdb_g_lock_unlock_fetched().
 */
1452 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1453 struct tevent_context *ev,
1454 struct ctdb_client_context *client,
1455 struct ctdb_db_context *db,
1456 const char *keyname,
1457 struct ctdb_server_id sid)
1459 struct tevent_req *req, *subreq;
1460 struct ctdb_g_lock_unlock_state *state;
1462 req = tevent_req_create(mem_ctx, &state,
1463 struct ctdb_g_lock_unlock_state);
1469 state->client = client;
/* The key is keyname including its terminating NUL; it is not copied here. */
1471 state->key.dptr = discard_const(keyname);
1472 state->key.dsize = strlen(keyname) + 1;
1473 state->my_sid = sid;
1475 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1477 if (tevent_req_nomem(subreq, req)) {
1478 return tevent_req_post(req, ev);
1480 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
/*
 * Continuation after the lock record has been fetch-locked for unlock.
 *
 * Parses the record data into the lock list, removes the requester's
 * entry via ctdb_g_lock_unlock_update() and completes the request.
 */
1485 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1487 struct tevent_req *req = tevent_req_callback_data(
1488 subreq, struct tevent_req);
1489 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1490 req, struct ctdb_g_lock_unlock_state);
1494 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1495 TALLOC_FREE(subreq);
1496 if (state->h == NULL) {
1497 tevent_req_error(req, ret);
/*
 * NOTE(review): unlike ctdb_g_lock_lock_fetched(), no explicit
 * talloc_free(data.dptr) is visible after parsing; the buffer was
 * allocated on state, so it presumably lives until the request is
 * freed -- confirm.
 */
1501 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1504 tevent_req_error(req, ret);
1508 ret = ctdb_g_lock_unlock_update(req);
1510 tevent_req_error(req, ret);
1514 tevent_req_done(req);
/*
 * Remove the requester's entry from the lock list and persist the result.
 *
 * Looks for the entry whose server id equals state->my_sid and removes it
 * by moving the last entry into its slot.  If the list is empty
 * afterwards the record is deleted; otherwise the shortened list is
 * serialized and stored.
 */
1517 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1519 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1520 req, struct ctdb_g_lock_unlock_state);
1521 struct ctdb_g_lock *lock;
1524 for (i=0; i<state->lock_list->num; i++) {
1525 lock = &state->lock_list->lock[i];
1527 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
/* i is a valid index only when a matching entry was found. */
1532 if (i < state->lock_list->num) {
1533 state->lock_list->lock[i] =
1534 state->lock_list->lock[state->lock_list->num-1];
1535 state->lock_list->num -= 1;
1538 if (state->lock_list->num == 0) {
/* NOTE(review): the result of ctdb_delete_record() is not checked here. */
1539 ctdb_delete_record(state->h);
1543 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1544 data.dptr = talloc_size(state, data.dsize);
1545 if (data.dptr == NULL) {
1549 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1550 ret = ctdb_store_record(state->h, data);
1551 talloc_free(data.dptr);
/*
 * Collect the result of a ctdb_g_lock_unlock_send() request.
 *
 * The record handle is freed first, before the request is checked for a
 * unix error; perr is the caller's slot for an error code.
 */
1560 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1562 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1563 req, struct ctdb_g_lock_unlock_state);
/* Freeing the handle runs its destructor, if one has been installed. */
1566 TALLOC_FREE(state->h);
1568 if (tevent_req_is_unix_error(req, &err)) {
1579 * Persistent database functions
/* Request state for starting a transaction on a persistent database. */
1581 struct ctdb_transaction_start_state {
1582 struct tevent_context *ev;
1583 struct ctdb_client_context *client;
1584 struct timeval timeout;
/* Transaction handle being prepared by ctdb_transaction_start_send(). */
1585 struct ctdb_transaction_handle *h;
/*
 * Forward declarations: the subrequest callbacks chained by
 * ctdb_transaction_start_send() (attach to g_lock.tdb -> register SRVID ->
 * take the g_lock) and the talloc destructor for the transaction handle.
 */
1589 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1590 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1591 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1592 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
/*
 * Start a transaction on a persistent database (asynchronous send half).
 *
 * Creates the request, allocates and fills in a transaction handle and
 * starts the first step of the sequence: attaching to "g_lock.tdb".
 * The sequence continues in ctdb_transaction_g_lock_attached().
 *
 * A database that is not persistent fails the request with EINVAL.
 * Allocation failures are reported through tevent_req_nomem().  In both
 * cases the request is completed via tevent_req_post().
 */
1594 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1595 struct tevent_context *ev,
1596 struct ctdb_client_context *client,
1597 struct timeval timeout,
1598 struct ctdb_db_context *db,
1601 struct ctdb_transaction_start_state *state;
1602 struct tevent_req *req, *subreq;
1603 struct ctdb_transaction_handle *h;
1605 req = tevent_req_create(mem_ctx, &state,
1606 struct ctdb_transaction_start_state);
/* Transactions are only accepted on persistent databases. */
1611 if (! db->persistent) {
1612 tevent_req_error(req, EINVAL);
1613 return tevent_req_post(req, ev);
1617 state->client = client;
/* Controls in this sequence are addressed to the client's own PNN. */
1618 state->destnode = ctdb_client_pnn(client);
/*
 * The handle is a talloc child of db, not of the request state, so it is
 * not freed together with the request.
 */
1620 h = talloc_zero(db, struct ctdb_transaction_handle);
1621 if (tevent_req_nomem(h, req)) {
1622 return tevent_req_post(req, ev);
1628 h->readonly = readonly;
1631 /* SRVID is unique for databases, so client can have transactions active
1632 * for multiple databases */
1633 h->sid.pid = getpid();
1634 h->sid.task_id = db->db_id;
1635 h->sid.vnn = state->destnode;
/* unique_id = (task_id << 32) | pid, i.e. database id above the process id. */
1636 h->sid.unique_id = h->sid.task_id;
1637 h->sid.unique_id = (h->sid.unique_id << 32) | h->sid.pid;
/* Record buffer that collects the records touched by this transaction. */
1639 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1640 if (tevent_req_nomem(h->recbuf, req)) {
1641 return tevent_req_post(req, ev);
/* Per-database g_lock name, built from the database id. */
1644 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1645 if (tevent_req_nomem(h->lock_name, req)) {
1646 return tevent_req_post(req, ev);
/* First step: attach to the g_lock database (flags 0). */
1651 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1652 if (tevent_req_nomem(subreq, req)) {
1653 return tevent_req_post(req, ev);
1655 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
/*
 * Callback for the attach to "g_lock.tdb".
 *
 * Stores the attached database in the handle (h->db_g_lock) and sends a
 * control to state->destnode that registers the handle's SRVID
 * (sid.unique_id).  The sequence continues in
 * ctdb_transaction_register_done().
 */
1660 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1662 struct tevent_req *req = tevent_req_callback_data(
1663 subreq, struct tevent_req);
1664 struct ctdb_transaction_start_state *state = tevent_req_data(
1665 req, struct ctdb_transaction_start_state);
1666 struct ctdb_req_control request;
1670 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1671 TALLOC_FREE(subreq);
1673 tevent_req_error(req, ret);
/* Register the per-transaction SRVID, using the request's timeout. */
1677 ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1678 subreq = ctdb_client_control_send(state, state->ev, state->client,
1679 state->destnode, state->timeout,
1681 if (tevent_req_nomem(subreq, req)) {
1684 tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
/*
 * Callback for the SRVID registration control.
 *
 * Receives the control reply, checks it with
 * ctdb_reply_control_register_srvid() and then requests the g_lock named
 * h->lock_name in h->db_g_lock for the handle's server id.  The sequence
 * continues in ctdb_transaction_g_lock_done().
 */
1687 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1689 struct tevent_req *req = tevent_req_callback_data(
1690 subreq, struct tevent_req);
1691 struct ctdb_transaction_start_state *state = tevent_req_data(
1692 req, struct ctdb_transaction_start_state);
1693 struct ctdb_reply_control *reply;
/* The reply is allocated on state. */
1697 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1698 TALLOC_FREE(subreq);
1700 tevent_req_error(req, ret);
1704 ret = ctdb_reply_control_register_srvid(reply);
1707 tevent_req_error(req, ret);
/* h->readonly is forwarded as the last argument of the lock request. */
1711 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1712 state->h->db_g_lock, state->h->lock_name,
1713 &state->h->sid, state->h->readonly);
1714 if (tevent_req_nomem(subreq, req)) {
1717 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
/*
 * Callback for the g_lock request, the last step of transaction start.
 *
 * Receives the lock result and completes the request: the error path
 * reports ret through tevent_req_error(), otherwise the request is marked
 * done.
 */
1720 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1722 struct tevent_req *req = tevent_req_callback_data(
1723 subreq, struct tevent_req);
1727 status = ctdb_g_lock_lock_recv(subreq, &ret);
1728 TALLOC_FREE(subreq);
1730 tevent_req_error(req, ret);
1734 tevent_req_done(req);
/*
 * Receive half of ctdb_transaction_start_send().
 *
 * Picks up the transaction handle from the request state, checks the
 * request for a unix error and installs
 * ctdb_transaction_handle_destructor() on the handle.
 * NOTE(review): presumably returns the handle on success and NULL on
 * error - confirm.
 */
1737 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1738 struct tevent_req *req,
1741 struct ctdb_transaction_start_state *state = tevent_req_data(
1742 req, struct ctdb_transaction_start_state);
1743 struct ctdb_transaction_handle *h = state->h;
1746 if (tevent_req_is_unix_error(req, &err)) {
/* The destructor is attached only at this point, after the error check. */
1753 talloc_set_destructor(h, ctdb_transaction_handle_destructor);
/*
 * talloc destructor for a transaction handle.
 *
 * Issues a synchronous SRVID deregistration to the client's own node
 * (h->client->pnn) with a zero timeout value.  The only visible reaction
 * to a failure is the warning logged below.
 */
1757 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1761 ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1762 tevent_timeval_zero(),
1765 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
/*
 * Synchronous wrapper around ctdb_transaction_start_send() and
 * ctdb_transaction_start_recv(): sends the request, runs the event loop
 * with tevent_req_poll() until it completes and collects the handle.
 * NOTE(review): the handle is presumably returned through *out - confirm.
 */
1771 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1772 struct ctdb_client_context *client,
1773 struct timeval timeout,
1774 struct ctdb_db_context *db, bool readonly,
1775 struct ctdb_transaction_handle **out)
1777 struct tevent_req *req;
1778 struct ctdb_transaction_handle *h;
1781 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
/* Block in the event loop until the request has finished. */
1787 tevent_req_poll(req, ev);
1789 h = ctdb_transaction_start_recv(req, &ret);
/*
 * Search state used by ctdb_transaction_record_fetch() and read by its
 * traverse callback through private_data.
 */
1798 struct ctdb_transaction_record_fetch_state {
/* Header copied from the matching record by the traverse callback. */
1800 struct ctdb_ltdb_header header;
/*
 * Record-buffer traverse callback for ctdb_transaction_record_fetch().
 *
 * Compares the visited record's key with the key being searched for
 * (state->key).  On a match the record's header is copied into the state
 * and state->found is set.
 */
1804 static int ctdb_transaction_record_fetch_traverse(uint32_t reqid,
1805 struct ctdb_ltdb_header *header,
1810 struct ctdb_transaction_record_fetch_state *state =
1811 (struct ctdb_transaction_record_fetch_state *)private_data;
/* Keys match when both the length and the bytes are equal. */
1813 if (state->key.dsize == key.dsize &&
1814 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
1816 state->header = *header;
1817 state->found = true;
/*
 * Look a key up in the transaction's record buffer (h->recbuf).
 *
 * Traverses the buffer with ctdb_transaction_record_fetch_traverse().
 * The record header found by the traversal is copied to *header when the
 * caller passes a non-NULL header pointer.
 */
1823 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1825 struct ctdb_ltdb_header *header,
1828 struct ctdb_transaction_record_fetch_state state;
1832 state.found = false;
1834 ret = ctdb_rec_buffer_traverse(h->recbuf,
1835 ctdb_transaction_record_fetch_traverse,
/* header is optional; callers may pass NULL. */
1842 if (header != NULL) {
1843 *header = state.header;
/*
 * Fetch a record within a transaction.
 *
 * The transaction's own record buffer is consulted first through
 * ctdb_transaction_record_fetch(); data found there is duplicated onto
 * mem_ctx.  The remaining lines fetch the record from the local database
 * (h->db) with ctdb_ltdb_fetch() and add it to the record buffer.
 * NOTE(review): the local-database path is presumably taken only when the
 * buffer lookup does not find the key - confirm.
 */
1854 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1856 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1859 struct ctdb_ltdb_header header;
/* The header is not needed for the buffer lookup, hence NULL. */
1862 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
/* Give the caller a private copy owned by mem_ctx. */
1864 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1866 if (data->dptr == NULL) {
1869 data->dsize = tmp_data.dsize;
1873 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
/* Add the fetched record to the transaction's buffer (reqid 0). */
1878 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
/*
 * Store a record within a transaction.
 *
 * The current value of the key is looked up in the transaction's record
 * buffer and in the local database (h->db).  If the old and new data are
 * byte-for-byte equal, the temporary context is freed without adding
 * anything to the buffer.  Otherwise the header's dmaster is set to the
 * client's PNN and the record is added to h->recbuf.
 */
1886 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
1887 TDB_DATA key, TDB_DATA data)
1889 TALLOC_CTX *tmp_ctx;
1890 struct ctdb_ltdb_header header;
/* Scratch context for data fetched from the local database. */
1898 tmp_ctx = talloc_new(h);
1899 if (tmp_ctx == NULL) {
1903 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
1905 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
/* Unchanged data: nothing new to record for this key. */
1911 if (old_data.dsize == data.dsize &&
1912 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
1913 talloc_free(tmp_ctx);
1917 header.dmaster = ctdb_client_pnn(h->client);
1920 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
1921 talloc_free(tmp_ctx);
/*
 * Delete a record within a transaction.  Implemented as a store of
 * tdb_null for the key via ctdb_transaction_store_record().
 */
1930 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
1933 return ctdb_transaction_store_record(h, key, tdb_null);
/*
 * Store a database sequence number as a record in the transaction.
 *
 * The key is the CTDB_DB_SEQNUM_KEY string including its terminating NUL;
 * the value is the in-memory representation of seqnum.
 */
1936 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
1939 const char *keyname = CTDB_DB_SEQNUM_KEY;
1942 key.dptr = discard_const(keyname);
/* +1 so that the key includes the string terminator. */
1943 key.dsize = strlen(keyname) + 1;
/* The value points at the local seqnum; no copy is made here. */
1945 data.dptr = (uint8_t *)&seqnum;
1946 data.dsize = sizeof(seqnum);
1948 return ctdb_transaction_store_record(h, key, data);
/*
 * Request state for ctdb_transaction_commit_send() and its subrequest
 * callbacks.
 */
1951 struct ctdb_transaction_commit_state {
1952 struct tevent_context *ev;
/* Handle of the transaction being committed. */
1953 struct ctdb_transaction_handle *h;
/*
 * Forward declarations: subrequest callbacks of the commit sequence
 * (read seqnum -> wait for recovery and send commit -> commit reply ->
 * re-read seqnum to verify).
 */
1957 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq);
1958 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
1959 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
1960 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq);
/*
 * Commit a transaction (asynchronous send half).
 *
 * Creates the request and starts the commit sequence by asking for the
 * current sequence number of the transaction's database.  The sequence
 * continues in ctdb_transaction_commit_seqnum_done().
 */
1962 struct tevent_req *ctdb_transaction_commit_send(
1963 TALLOC_CTX *mem_ctx,
1964 struct tevent_context *ev,
1965 struct ctdb_transaction_handle *h)
1967 struct tevent_req *req, *subreq;
1968 struct ctdb_transaction_commit_state *state;
1969 struct ctdb_req_control request;
1971 req = tevent_req_create(mem_ctx, &state,
1972 struct ctdb_transaction_commit_state);
/* Step 1: read the database sequence number (zero timeout value). */
1980 ctdb_req_control_get_db_seqnum(&request, h->db->db_id);
1981 subreq = ctdb_client_control_send(state, ev, h->client,
1983 tevent_timeval_zero(), &request);
1984 if (tevent_req_nomem(subreq, req)) {
1985 return tevent_req_post(req, ev);
1987 tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum_done,
/*
 * Callback for the initial sequence number query.
 *
 * Saves the reported sequence number in state->seqnum, adds a record with
 * seqnum + 1 to the transaction and then waits for recovery to finish
 * before the commit is attempted in ctdb_transaction_commit_try().
 */
1993 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq)
1995 struct tevent_req *req = tevent_req_callback_data(
1996 subreq, struct tevent_req);
1997 struct ctdb_transaction_commit_state *state = tevent_req_data(
1998 req, struct ctdb_transaction_commit_state);
1999 struct ctdb_reply_control *reply;
2003 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2004 TALLOC_FREE(subreq);
2006 tevent_req_error(req, ret);
2010 ret = ctdb_reply_control_get_db_seqnum(reply, &state->seqnum);
2012 tevent_req_error(req, ret);
/* The incremented sequence number becomes part of the transaction. */
2016 ret = ctdb_transaction_store_db_seqnum(state->h, state->seqnum+1);
2018 tevent_req_error(req, ret);
2022 subreq = ctdb_recovery_wait_send(state, state->ev, state->h->client);
2023 if (tevent_req_nomem(subreq, req)) {
2026 tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
/*
 * Callback for the recovery wait: attempt the commit.
 *
 * Sends a TRANS3_COMMIT control carrying the transaction's record buffer
 * to the client's own node.  The reply is handled in
 * ctdb_transaction_commit_done().  This callback is also the re-entry
 * point used by the later steps when the commit has to be retried.
 */
2029 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
2031 struct tevent_req *req = tevent_req_callback_data(
2032 subreq, struct tevent_req);
2033 struct ctdb_transaction_commit_state *state = tevent_req_data(
2034 req, struct ctdb_transaction_commit_state);
2035 struct ctdb_req_control request;
2039 status = ctdb_recovery_wait_recv(subreq, &ret);
2040 TALLOC_FREE(subreq);
2042 tevent_req_error(req, ret);
/* The whole record buffer is sent in a single control. */
2046 ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2047 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2048 state->h->client->pnn,
2049 tevent_timeval_zero(), &request);
2050 if (tevent_req_nomem(subreq, req)) {
2053 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
/*
 * Callback for the TRANS3_COMMIT control.
 *
 * If the commit control failed, the code waits for recovery and retries
 * through ctdb_transaction_commit_try().  Otherwise the database sequence
 * number is queried again so that
 * ctdb_transaction_commit_seqnum2_done() can verify the commit.
 */
2056 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2058 struct tevent_req *req = tevent_req_callback_data(
2059 subreq, struct tevent_req);
2060 struct ctdb_transaction_commit_state *state = tevent_req_data(
2061 req, struct ctdb_transaction_commit_state);
2062 struct ctdb_reply_control *reply;
2063 struct ctdb_req_control request;
2067 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2068 TALLOC_FREE(subreq);
2070 tevent_req_error(req, ret);
2074 ret = ctdb_reply_control_trans3_commit(reply);
2076 /* Control failed due to recovery */
2077 subreq = ctdb_recovery_wait_send(state, state->ev,
2079 if (tevent_req_nomem(subreq, req)) {
/* Retry the commit once the recovery wait completes. */
2082 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Commit control succeeded: re-read the sequence number to verify it. */
2087 ctdb_req_control_get_db_seqnum(&request, state->h->db->db_id);
2088 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2089 state->h->client->pnn,
2090 tevent_timeval_zero(), &request);
2091 if (tevent_req_nomem(subreq, req)) {
2094 tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum2_done,
/*
 * Callback for the post-commit sequence number query.
 *
 * Compares the sequence number now reported by the database with the one
 * saved before the commit (state->seqnum):
 *   - equal:        the commit is retried after waiting for recovery;
 *   - not seqnum+1: the request fails with EIO;
 *   - seqnum+1:     the request is marked done.
 */
2098 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq)
2100 struct tevent_req *req = tevent_req_callback_data(
2101 subreq, struct tevent_req);
2102 struct ctdb_transaction_commit_state *state = tevent_req_data(
2103 req, struct ctdb_transaction_commit_state);
2104 struct ctdb_reply_control *reply;
2109 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2110 TALLOC_FREE(subreq);
2112 tevent_req_error(req, ret);
2116 ret = ctdb_reply_control_get_db_seqnum(reply, &seqnum);
2118 tevent_req_error(req, ret);
/* Sequence number unchanged: go back to ctdb_transaction_commit_try(). */
2122 if (seqnum == state->seqnum) {
2123 subreq = ctdb_recovery_wait_send(state, state->ev,
2125 if (tevent_req_nomem(subreq, req)) {
2128 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Any value other than the expected seqnum + 1 is reported as EIO. */
2133 if (seqnum != state->seqnum + 1) {
2134 tevent_req_error(req, EIO);
2138 tevent_req_done(req);
/*
 * Receive half of ctdb_transaction_commit_send(): checks the request for a
 * unix error.
 * NOTE(review): perr presumably receives the error code on failure - confirm.
 */
2141 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2145 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous commit of a transaction.
 *
 * A read-only handle, or one with h->updated unset, is handled by a
 * separate branch before any request is sent.  Otherwise the commit
 * request is created with the handle itself as talloc context and driven
 * to completion on h->ev.
 * NOTE(review): that first branch presumably skips the commit because
 * there is nothing to write - confirm.
 */
2155 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2157 struct tevent_req *req;
2161 if (h->readonly || ! h->updated) {
2166 req = ctdb_transaction_commit_send(h, h->ev, h);
/* Block in the handle's event loop until the commit has finished. */
2172 tevent_req_poll(req, h->ev);
2174 status = ctdb_transaction_commit_recv(req, &ret);
2184 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2193 * In the future, Samba should register SERVER_ID.
2194 * Make that structure the same as struct srvid {}.