4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
/*
 * Look up an already attached database by name.
 *
 * Walks the client's database list (client->db, linked through ->next) and
 * compares each entry's db_name against the requested name with strcmp().
 * NOTE(review): the db_name parameter and the return statements are not
 * visible in this listing; the caller (ctdb_attach_send) treats a non-NULL
 * result as "already attached".
 */
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
/*
 * Per-request state for ctdb_set_db_flags_send(): the event context, client
 * and timeout used for the controls, plus one completion flag for each of
 * the two controls (readonly, sticky).
 */
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
61 bool readonly_done, sticky_done;
/* Completion callbacks for the sub-requests of the set-db-flags request */
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
/*
 * Start an async request that applies the READONLY and/or STICKY flag to
 * database db_id.
 *
 * When neither flag is set in db_flags the request is posted straight away
 * without talking to the daemon.  Otherwise a GET_NODEMAP control is sent to
 * destnode and processing continues in ctdb_set_db_flags_nodemap_done().
 */
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
/* Nothing to do unless READONLY or STICKY was requested */
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
/*
 * Node map reply handler.
 *
 * Builds the list of connected nodes from the node map and, for each flag
 * requested in state->db_flags, sends the matching control (SET_DB_READONLY,
 * SET_DB_STICKY) to all of those nodes with ctdb_client_control_multi_send().
 * A flag that was not requested is marked as done immediately so that the
 * completion check in the per-control callbacks can succeed.
 */
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
124 tevent_req_error(req, ret);
128 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
131 tevent_req_error(req, ret);
135 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
136 state, &state->pnn_list);
137 talloc_free(nodemap);
/* A non-positive node count is reported to the caller as ENOMEM */
138 if (state->count <= 0) {
139 tevent_req_error(req, ENOMEM);
143 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
144 ctdb_req_control_set_db_readonly(&request, state->db_id);
145 subreq = ctdb_client_control_multi_send(
146 state, state->ev, state->client,
147 state->pnn_list, state->count,
148 state->timeout, &request);
149 if (tevent_req_nomem(subreq, req)) {
152 tevent_req_set_callback(subreq,
153 ctdb_set_db_flags_readonly_done, req);
155 state->readonly_done = true;
158 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
159 ctdb_req_control_set_db_sticky(&request, state->db_id);
160 subreq = ctdb_client_control_multi_send(
161 state, state->ev, state->client,
162 state->pnn_list, state->count,
163 state->timeout, &request);
164 if (tevent_req_nomem(subreq, req)) {
167 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
170 state->sticky_done = true;
/*
 * Completion handler for the SET_DB_READONLY multi-node control.
 *
 * Records that the readonly part has finished and completes the parent
 * request once the sticky part has finished as well.
 */
174 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
176 struct tevent_req *req = tevent_req_callback_data(
177 subreq, struct tevent_req);
178 struct ctdb_set_db_flags_state *state = tevent_req_data(
179 req, struct ctdb_set_db_flags_state);
183 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
187 tevent_req_error(req, ret);
191 state->readonly_done = true;
193 if (state->readonly_done && state->sticky_done) {
194 tevent_req_done(req);
/*
 * Completion handler for the SET_DB_STICKY multi-node control.
 *
 * Records that the sticky part has finished and completes the parent
 * request once the readonly part has finished as well.
 */
198 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
200 struct tevent_req *req = tevent_req_callback_data(
201 subreq, struct tevent_req);
202 struct ctdb_set_db_flags_state *state = tevent_req_data(
203 req, struct ctdb_set_db_flags_state);
207 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
211 tevent_req_error(req, ret);
215 state->sticky_done = true;
217 if (state->readonly_done && state->sticky_done) {
218 tevent_req_done(req);
/*
 * Collect the result of a set-db-flags request.
 *
 * The error, if any, is obtained with tevent_req_is_unix_error().
 * NOTE(review): the handling of perr and the return values are not visible
 * in this listing.
 */
222 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
226 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state for ctdb_attach_send(): event context, client and
 * timeout for the controls, and the database handle being built.
 */
235 struct ctdb_attach_state {
236 struct tevent_context *ev;
237 struct ctdb_client_context *client;
238 struct timeval timeout;
242 struct ctdb_db_context *db;
/*
 * Attach callbacks, declared in the order in which the attach sequence
 * chains them: mutex tunable -> db id -> db path -> health -> flags.
 */
245 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
246 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
247 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
248 static void ctdb_attach_health_done(struct tevent_req *subreq);
249 static void ctdb_attach_flags_done(struct tevent_req *subreq);
251 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
252 struct tevent_context *ev,
253 struct ctdb_client_context *client,
254 struct timeval timeout,
255 const char *db_name, uint8_t db_flags)
257 struct tevent_req *req, *subreq;
258 struct ctdb_attach_state *state;
259 struct ctdb_req_control request;
261 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
266 state->db = client_db_handle(client, db_name);
267 if (state->db != NULL) {
268 tevent_req_done(req);
269 return tevent_req_post(req, ev);
273 state->client = client;
274 state->timeout = timeout;
275 state->destnode = ctdb_client_pnn(client);
276 state->db_flags = db_flags;
278 state->db = talloc_zero(client, struct ctdb_db_context);
279 if (tevent_req_nomem(state->db, req)) {
280 return tevent_req_post(req, ev);
283 state->db->db_name = talloc_strdup(state->db, db_name);
284 if (tevent_req_nomem(state->db, req)) {
285 return tevent_req_post(req, ev);
288 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
289 state->db->persistent = true;
292 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
293 subreq = ctdb_client_control_send(state, ev, client,
294 ctdb_client_pnn(client), timeout,
296 if (tevent_req_nomem(subreq, req)) {
297 return tevent_req_post(req, ev);
299 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
/*
 * Reply handler for the "TDBMutexEnabled" tunable query.
 *
 * Chooses the flags later used to open the local TDB (TDB_DEFAULT for a
 * persistent database; TDB_MUTEX_LOCKING is added when the tunable is 1)
 * and then sends the DB_ATTACH or DB_ATTACH_PERSISTENT control.  The reply
 * is handled in ctdb_attach_dbid_done().
 */
304 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
306 struct tevent_req *req = tevent_req_callback_data(
307 subreq, struct tevent_req);
308 struct ctdb_attach_state *state = tevent_req_data(
309 req, struct ctdb_attach_state);
310 struct ctdb_reply_control *reply;
311 struct ctdb_req_control request;
312 uint32_t mutex_enabled;
316 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
319 tevent_req_error(req, ret);
323 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
325 /* Treat error as mutex support not available */
329 if (state->db->persistent) {
330 state->tdb_flags = TDB_DEFAULT;
332 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
334 if (mutex_enabled == 1) {
335 state->tdb_flags |= TDB_MUTEX_LOCKING;
/* Pick the attach control that matches the database type */
339 if (state->db->persistent) {
340 ctdb_req_control_db_attach_persistent(&request,
344 ctdb_req_control_db_attach(&request, state->db->db_name,
348 subreq = ctdb_client_control_send(state, state->ev, state->client,
349 state->destnode, state->timeout,
351 if (tevent_req_nomem(subreq, req)) {
354 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
/*
 * Reply handler for the attach control.
 *
 * Extracts the database id from the reply (persistent or non-persistent
 * variant) into state->db->db_id and then asks the daemon for the path of
 * the database file.  The reply is handled in ctdb_attach_dbpath_done().
 */
357 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
359 struct tevent_req *req = tevent_req_callback_data(
360 subreq, struct tevent_req);
361 struct ctdb_attach_state *state = tevent_req_data(
362 req, struct ctdb_attach_state);
363 struct ctdb_req_control request;
364 struct ctdb_reply_control *reply;
368 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
371 tevent_req_error(req, ret);
375 if (state->db->persistent) {
376 ret = ctdb_reply_control_db_attach_persistent(
377 reply, &state->db->db_id);
379 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
383 tevent_req_error(req, ret);
387 ctdb_req_control_getdbpath(&request, state->db->db_id);
388 subreq = ctdb_client_control_send(state, state->ev, state->client,
389 state->destnode, state->timeout,
391 if (tevent_req_nomem(subreq, req)) {
394 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
/*
 * Reply handler for the GETDBPATH control.
 *
 * Stores the database path in state->db->db_path (allocated on state->db)
 * and then queries the health of the database.  The reply is handled in
 * ctdb_attach_health_done().
 */
397 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
399 struct tevent_req *req = tevent_req_callback_data(
400 subreq, struct tevent_req);
401 struct ctdb_attach_state *state = tevent_req_data(
402 req, struct ctdb_attach_state);
403 struct ctdb_reply_control *reply;
404 struct ctdb_req_control request;
408 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
411 tevent_req_error(req, ret);
415 ret = ctdb_reply_control_getdbpath(reply, state->db,
416 &state->db->db_path);
419 tevent_req_error(req, ret);
423 ctdb_req_control_db_get_health(&request, state->db->db_id);
424 subreq = ctdb_client_control_send(state, state->ev, state->client,
425 state->destnode, state->timeout,
427 if (tevent_req_nomem(subreq, req)) {
430 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
/*
 * Reply handler for the DB_GET_HEALTH control.
 *
 * A non-NULL health reason means the database is unhealthy and the attach
 * fails with EIO.  Otherwise the requested database flags are applied via
 * ctdb_set_db_flags_send(); completion is handled in
 * ctdb_attach_flags_done().
 */
433 static void ctdb_attach_health_done(struct tevent_req *subreq)
435 struct tevent_req *req = tevent_req_callback_data(
436 subreq, struct tevent_req);
437 struct ctdb_attach_state *state = tevent_req_data(
438 req, struct ctdb_attach_state);
439 struct ctdb_reply_control *reply;
444 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
447 tevent_req_error(req, ret);
451 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
453 tevent_req_error(req, ret);
457 if (reason != NULL) {
458 /* Database unhealthy, avoid attach */
459 /* FIXME: Log here */
460 tevent_req_error(req, EIO);
464 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
465 state->destnode, state->timeout,
466 state->db->db_id, state->db_flags);
467 if (tevent_req_nomem(subreq, req)) {
470 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
/*
 * Final step of the attach sequence.
 *
 * Once the database flags have been set, opens the local TDB at
 * state->db->db_path with the flags chosen earlier, adds the handle to the
 * client's database list and completes the request.  A failed
 * tdb_wrap_open() is reported through tevent_req_nomem(), i.e. as an
 * out-of-memory error.
 */
473 static void ctdb_attach_flags_done(struct tevent_req *subreq)
475 struct tevent_req *req = tevent_req_callback_data(
476 subreq, struct tevent_req);
477 struct ctdb_attach_state *state = tevent_req_data(
478 req, struct ctdb_attach_state);
482 status = ctdb_set_db_flags_recv(subreq, &ret);
485 tevent_req_error(req, ret);
489 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
490 state->tdb_flags, O_RDWR, 0);
491 if (tevent_req_nomem(state->db->ltdb, req)) {
494 DLIST_ADD(state->client->db, state->db);
496 tevent_req_done(req);
/*
 * Collect the result of ctdb_attach_send().
 *
 * The error, if any, is obtained with tevent_req_is_unix_error().
 * NOTE(review): the assignments to perr and out are not visible in this
 * listing; presumably *out receives state->db on success - confirm.
 */
499 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
500 struct ctdb_db_context **out)
502 struct ctdb_attach_state *state = tevent_req_data(
503 req, struct ctdb_attach_state);
506 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_attach_send()/ctdb_attach_recv().
 *
 * Runs the request on a temporary talloc context (child of client) using
 * tevent_req_poll(), then registers the NULL, FETCH and FETCH_WITH_HEADER
 * call functions on the database with ctdb_set_call().  The temporary
 * context is freed on every visible exit path.
 */
519 int ctdb_attach(struct tevent_context *ev,
520 struct ctdb_client_context *client,
521 struct timeval timeout,
522 const char *db_name, uint8_t db_flags,
523 struct ctdb_db_context **out)
526 struct tevent_req *req;
530 mem_ctx = talloc_new(client);
531 if (mem_ctx == NULL) {
535 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
538 talloc_free(mem_ctx);
542 tevent_req_poll(req, ev);
544 status = ctdb_attach_recv(req, &ret, out);
546 talloc_free(mem_ctx);
551 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
552 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
553 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
556 talloc_free(mem_ctx);
/*
 * Detach from the database identified by db_id.
 *
 * Sends the detach control to the local node (client->pnn) and then removes
 * the entry with a matching db_id from the client's database list.
 * NOTE(review): what happens to the removed handle after DLIST_REMOVE is not
 * visible in this listing.
 */
560 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
561 struct ctdb_client_context *client,
562 struct timeval timeout, uint32_t db_id)
564 struct ctdb_db_context *db;
567 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
573 for (db = client->db; db != NULL; db = db->next) {
574 if (db->db_id == db_id) {
575 DLIST_REMOVE(client->db, db);
/*
 * Accessor for a database handle's id.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns db->db_id - confirm.
 */
583 uint32_t ctdb_db_id(struct ctdb_db_context *db)
/*
 * Context handed to ctdb_db_traverse_handler(): the caller's record parser.
 * The handler also reads private_data and extract_header members, which are
 * not visible in this listing.
 */
588 struct ctdb_db_traverse_state {
589 ctdb_rec_parser_func_t parser;
/*
 * TDB traverse callback that forwards each record to the caller's parser.
 *
 * With extract_header set, the ltdb header is extracted from the record
 * and passed to the parser; otherwise the parser receives a NULL header
 * and the record data as stored.
 */
595 static int ctdb_db_traverse_handler(struct tdb_context *tdb, TDB_DATA key,
596 TDB_DATA data, void *private_data)
598 struct ctdb_db_traverse_state *state =
599 (struct ctdb_db_traverse_state *)private_data;
602 if (state->extract_header) {
603 struct ctdb_ltdb_header header;
605 ret = ctdb_ltdb_header_extract(&data, &header);
611 ret = state->parser(0, &header, key, data, state->private_data);
613 ret = state->parser(0, NULL, key, data, state->private_data);
/*
 * Traverse the local TDB of a database, calling parser for every record.
 *
 * Uses either tdb_traverse_read() or tdb_traverse(); the selecting
 * condition is not visible in this listing (presumably the readonly
 * argument - confirm).
 */
624 int ctdb_db_traverse(struct ctdb_db_context *db, bool readonly,
626 ctdb_rec_parser_func_t parser, void *private_data)
628 struct ctdb_db_traverse_state state;
631 state.parser = parser;
632 state.private_data = private_data;
633 state.extract_header = extract_header;
637 ret = tdb_traverse_read(db->ltdb->tdb,
638 ctdb_db_traverse_handler, &state);
640 ret = tdb_traverse(db->ltdb->tdb,
641 ctdb_db_traverse_handler, &state);
/*
 * Fetch a record from the local TDB and split it into header and data.
 *
 * A record shorter than struct ctdb_ltdb_header is treated as "no record":
 * the header's dmaster is set to CTDB_UNKNOWN_PNN (a TDB error other than
 * TDB_ERR_NOEXIST is handled separately).  Otherwise the header is pulled
 * from the record and the payload following the header is copied onto
 * mem_ctx.
 */
651 static int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
652 struct ctdb_ltdb_header *header,
653 TALLOC_CTX *mem_ctx, TDB_DATA *data)
658 rec = tdb_fetch(db->ltdb->tdb, key);
659 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
660 /* No record present */
661 if (rec.dptr != NULL) {
665 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
670 header->dmaster = CTDB_UNKNOWN_PNN;
679 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
686 size_t offset = ctdb_ltdb_header_len(header);
688 data->dsize = rec.dsize - offset;
689 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
691 if (data->dptr == NULL) {
701 * Fetch a record from volatile database
704 * 1. Get a lock on the hash chain
705 * 2. If the record does not exist, migrate the record
706 * 3. If readonly=true and delegations do not exist, migrate the record.
707 * 4. If readonly=false and delegations exist, migrate the record.
708 * 5. If the local node is not dmaster, migrate the record.
/*
 * Per-request state for ctdb_fetch_lock_send(): event context, client and
 * the record handle that is eventually returned to the caller.
 */
712 struct ctdb_fetch_lock_state {
713 struct tevent_context *ev;
714 struct ctdb_client_context *client;
715 struct ctdb_record_handle *h;
/* Helpers implementing the check / migrate / re-check cycle of fetch-lock */
720 static int ctdb_fetch_lock_check(struct tevent_req *req);
721 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
722 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
/*
 * Start fetching and locking the record `key` of a volatile database.
 *
 * Allocates the record handle on db, copies the key into it and runs
 * ctdb_fetch_lock_check().  Persistent databases are rejected with EINVAL.
 * Depending on the result of the check the request is completed, failed, or
 * left pending (the check starts the migration itself).
 */
724 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
725 struct tevent_context *ev,
726 struct ctdb_client_context *client,
727 struct ctdb_db_context *db,
728 TDB_DATA key, bool readonly)
730 struct ctdb_fetch_lock_state *state;
731 struct tevent_req *req;
734 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
740 state->client = client;
742 state->h = talloc_zero(db, struct ctdb_record_handle);
743 if (tevent_req_nomem(state->h, req)) {
744 return tevent_req_post(req, ev);
746 state->h->client = client;
/* The handle keeps its own copy of the key */
748 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
749 if (tevent_req_nomem(state->h->key.dptr, req)) {
750 return tevent_req_post(req, ev);
752 state->h->key.dsize = key.dsize;
753 state->h->readonly = false;
755 state->readonly = readonly;
756 state->pnn = ctdb_client_pnn(client);
758 /* Check that database is not persistent */
759 if (db->persistent) {
760 tevent_req_error(req, EINVAL);
761 return tevent_req_post(req, ev);
764 ret = ctdb_fetch_lock_check(req);
766 tevent_req_done(req);
767 return tevent_req_post(req, ev);
770 tevent_req_error(req, ret);
771 return tevent_req_post(req, ev);
/*
 * Take the chain lock for the key and decide whether the local copy of the
 * record can be used or has to be migrated first.
 *
 * The decision is based on the record's header: its dmaster compared with
 * the local pnn, and the CTDB_REC_RO_HAVE_READONLY / HAVE_DELEGATIONS flags,
 * evaluated differently for read/write and readonly requests.  When a
 * migration is needed, the fetched data is released, the chain lock is
 * dropped and ctdb_fetch_lock_migrate() is started.
 * NOTE(review): the return codes are not visible in this listing.
 */
776 static int ctdb_fetch_lock_check(struct tevent_req *req)
778 struct ctdb_fetch_lock_state *state = tevent_req_data(
779 req, struct ctdb_fetch_lock_state);
780 struct ctdb_record_handle *h = state->h;
781 struct ctdb_ltdb_header header;
782 TDB_DATA data = tdb_null;
784 bool do_migrate = false;
786 ret = tdb_chainlock(state->h->db->ltdb->tdb, state->h->key);
792 data = tdb_fetch(h->db->ltdb->tdb, h->key);
793 if (data.dptr == NULL) {
794 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
803 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
809 if (! state->readonly) {
810 /* Read/write access */
811 if (header.dmaster == state->pnn &&
812 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
816 if (header.dmaster != state->pnn) {
820 /* Readonly access */
821 if (header.dmaster != state->pnn &&
822 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
823 CTDB_REC_RO_HAVE_DELEGATIONS))) {
828 /* We are the dmaster or readonly delegation */
831 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
832 CTDB_REC_RO_HAVE_DELEGATIONS)) {
842 if (data.dptr != NULL) {
845 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
847 DEBUG(DEBUG_ERR, ("tdb_chainunlock failed on %s\n",
853 ctdb_fetch_lock_migrate(req);
/*
 * Ask the daemon to migrate the record to this node.
 *
 * Sends a CTDB_NULL_FUNC call with CTDB_IMMEDIATE_MIGRATION set (plus
 * CTDB_WANT_READONLY for readonly requests) for the handle's key.  The
 * reply is handled in ctdb_fetch_lock_migrate_done().
 */
858 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
860 struct ctdb_fetch_lock_state *state = tevent_req_data(
861 req, struct ctdb_fetch_lock_state);
862 struct ctdb_req_call request;
863 struct tevent_req *subreq;
865 ZERO_STRUCT(request);
866 request.flags = CTDB_IMMEDIATE_MIGRATION;
867 if (state->readonly) {
868 request.flags |= CTDB_WANT_READONLY;
870 request.db_id = state->h->db->db_id;
871 request.callid = CTDB_NULL_FUNC;
872 request.key = state->h->key;
873 request.calldata = tdb_null;
875 subreq = ctdb_client_call_send(state, state->ev, state->client,
877 if (tevent_req_nomem(subreq, req)) {
881 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
/*
 * Reply handler for the migration call.
 *
 * A non-zero reply status fails the request with EIO.  Otherwise the record
 * is checked again with ctdb_fetch_lock_check(), which may start another
 * migration.
 */
884 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
886 struct tevent_req *req = tevent_req_callback_data(
887 subreq, struct tevent_req);
888 struct ctdb_fetch_lock_state *state = tevent_req_data(
889 req, struct ctdb_fetch_lock_state);
890 struct ctdb_reply_call *reply;
894 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
897 tevent_req_error(req, ret);
901 if (reply->status != 0) {
902 tevent_req_error(req, EIO);
907 ret = ctdb_fetch_lock_check(req);
910 tevent_req_error(req, ret);
915 tevent_req_done(req);
/*
 * talloc destructor for a record handle: releases the TDB chain lock held
 * on the handle's key.
 */
918 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
920 tdb_chainunlock(h->db->ltdb->tdb, h->key);
/*
 * Collect the result of ctdb_fetch_lock_send().
 *
 * On success the record handle is returned with a destructor installed
 * that drops the chain lock when the handle is freed.  The record payload
 * (the stored record minus the ltdb header) is copied onto mem_ctx; if that
 * copy fails the handle is freed.
 */
925 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
926 struct ctdb_ltdb_header *header,
928 TDB_DATA *data, int *perr)
930 struct ctdb_fetch_lock_state *state = tevent_req_data(
931 req, struct ctdb_fetch_lock_state);
932 struct ctdb_record_handle *h = state->h;
935 if (tevent_req_is_unix_error(req, &err)) {
942 if (header != NULL) {
/* h->data still carries the ltdb header; hand out only the payload */
948 offset = ctdb_ltdb_header_len(&h->header);
950 data->dsize = h->data.dsize - offset;
951 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
953 if (data->dptr == NULL) {
954 TALLOC_FREE(state->h);
962 talloc_set_destructor(h, ctdb_record_handle_destructor);
/*
 * Synchronous wrapper around ctdb_fetch_lock_send()/ctdb_fetch_lock_recv(),
 * driven with tevent_req_poll() on the given event context.
 */
966 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
967 struct ctdb_client_context *client,
968 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
969 struct ctdb_record_handle **out,
970 struct ctdb_ltdb_header *header, TDB_DATA *data)
972 struct tevent_req *req;
973 struct ctdb_record_handle *h;
976 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
981 tevent_req_poll(req, ev);
983 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
/*
 * Store new data for a record held through a fetch-lock handle.
 *
 * Builds a record consisting of the handle's ltdb header followed by the
 * new data and writes it to the local TDB with TDB_REPLACE.  The temporary
 * buffer is allocated on the handle.
 *
 * NOTE(review): ctdb_fetch_lock_recv() subtracts the header length from
 * h->data before handing out the payload, which suggests h->data includes
 * the header while `data` here is payload only - confirm that the "same
 * data" comparison below compares like with like.
 */
992 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
998 /* Cannot modify the record if it was obtained as a readonly copy */
1003 /* Check if the new data is same */
1004 if (h->data.dsize == data.dsize &&
1005 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1006 /* No need to do anything */
1010 offset = ctdb_ltdb_header_len(&h->header);
1011 rec.dsize = offset + data.dsize;
1012 rec.dptr = talloc_size(h, rec.dsize);
1013 if (rec.dptr == NULL) {
1017 ctdb_ltdb_header_push(&h->header, rec.dptr);
1018 memcpy(rec.dptr + offset, data.dptr, data.dsize);
1020 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1022 DEBUG(DEBUG_ERR, ("Failed to store record in DB %s\n",
1027 talloc_free(rec.dptr);
/* Per-request state for ctdb_delete_record_send(): the record handle */
1031 struct ctdb_delete_record_state {
1032 struct ctdb_record_handle *h;
/* Completion callback for the SCHEDULE_FOR_DELETION control */
1035 static void ctdb_delete_record_done(struct tevent_req *subreq);
/*
 * Start deleting the record held through handle h.
 *
 * The record is first overwritten in the local TDB with a header-only
 * record (no payload); a failed store fails the request with EIO.  Then a
 * SCHEDULE_FOR_DELETION control for the key is sent to the local node, with
 * tevent_timeval_zero() as its timeout.  Handles obtained as a readonly
 * copy are rejected with EINVAL.
 */
1037 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1038 struct tevent_context *ev,
1039 struct ctdb_record_handle *h)
1041 struct tevent_req *req, *subreq;
1042 struct ctdb_delete_record_state *state;
1043 struct ctdb_key_data key;
1044 struct ctdb_req_control request;
1048 req = tevent_req_create(mem_ctx, &state,
1049 struct ctdb_delete_record_state);
1056 /* Cannot delete the record if it was obtained as a readonly copy */
1058 tevent_req_error(req, EINVAL);
1059 return tevent_req_post(req, ev);
/* Replacement record holds the header only */
1062 rec.dsize = ctdb_ltdb_header_len(&h->header);
1063 rec.dptr = talloc_size(h, rec.dsize);
1064 if (tevent_req_nomem(rec.dptr, req)) {
1065 return tevent_req_post(req, ev);
1068 ctdb_ltdb_header_push(&h->header, rec.dptr);
1070 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1071 talloc_free(rec.dptr);
1073 DEBUG(DEBUG_ERR, ("Failed to delete record in DB %s\n",
1075 tevent_req_error(req, EIO);
1076 return tevent_req_post(req, ev);
1079 key.db_id = h->db->db_id;
1080 key.header = h->header;
1083 ctdb_req_control_schedule_for_deletion(&request, &key);
1084 subreq = ctdb_client_control_send(state, ev, h->client,
1085 ctdb_client_pnn(h->client),
1086 tevent_timeval_zero(),
1088 if (tevent_req_nomem(subreq, req)) {
1089 return tevent_req_post(req, ev);
1091 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
/*
 * Reply handler for the SCHEDULE_FOR_DELETION control: logs the failure and
 * fails the request with the received error, or completes it on success.
 */
1096 static void ctdb_delete_record_done(struct tevent_req *subreq)
1098 struct tevent_req *req = tevent_req_callback_data(
1099 subreq, struct tevent_req);
1100 struct ctdb_delete_record_state *state = tevent_req_data(
1101 req, struct ctdb_delete_record_state);
1105 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1106 TALLOC_FREE(subreq);
1109 ("delete_record: %s SCHDULE_FOR_DELETION failed, "
1110 "ret=%d\n", state->h->db->db_name, ret));
1111 tevent_req_error(req, ret);
1115 tevent_req_done(req);
/*
 * Collect the result of ctdb_delete_record_send().
 * NOTE(review): the handling of perr and the return values are not visible
 * in this listing.
 */
1118 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1122 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_delete_record_send()/_recv().
 *
 * Uses the event context stored in the handle (h->ev) and a temporary
 * top-level talloc context that is freed on the visible exit paths.
 */
1133 int ctdb_delete_record(struct ctdb_record_handle *h)
1135 struct tevent_context *ev = h->ev;
1136 TALLOC_CTX *mem_ctx;
1137 struct tevent_req *req;
1141 mem_ctx = talloc_new(NULL);
1142 if (mem_ctx == NULL) {
1146 req = ctdb_delete_record_send(mem_ctx, ev, h);
1148 talloc_free(mem_ctx);
1152 tevent_req_poll(req, ev);
1154 status = ctdb_delete_record_recv(req, &ret);
1155 talloc_free(mem_ctx);
1164 * Global lock functions
/*
 * Per-request state for ctdb_g_lock_lock_send(): the requester's server id
 * and lock type, the handle of the fetched lock record, the decoded list of
 * lock holders and the index of the holder currently being examined.
 */
1167 struct ctdb_g_lock_lock_state {
1168 struct tevent_context *ev;
1169 struct ctdb_client_context *client;
1170 struct ctdb_db_context *db;
1172 struct ctdb_server_id my_sid;
1173 enum ctdb_g_lock_type lock_type;
1174 struct ctdb_record_handle *h;
1175 /* state for verification of active locks */
1176 struct ctdb_g_lock_list *lock_list;
1177 unsigned int current;
/* Steps of the g_lock acquisition: fetch -> process -> check -> update/retry */
1180 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1181 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1182 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1183 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1184 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Decide whether two lock types conflict.
 * Two read locks are the one special-cased combination; the return values
 * are not visible in this listing (presumably read/read is compatible and
 * every other pairing conflicts - confirm).
 */
1186 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1187 enum ctdb_g_lock_type l2)
1189 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
/*
 * Start acquiring the global lock named keyname on behalf of server id sid.
 *
 * The record key is the lock name including its terminating NUL; the
 * keyname pointer itself is stored, not a copy.  The lock type is READ when
 * readonly is set, WRITE otherwise.  The lock record is fetched with
 * ctdb_fetch_lock_send(); processing continues in
 * ctdb_g_lock_lock_fetched().
 */
1195 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1196 struct tevent_context *ev,
1197 struct ctdb_client_context *client,
1198 struct ctdb_db_context *db,
1199 const char *keyname,
1200 struct ctdb_server_id *sid,
1203 struct tevent_req *req, *subreq;
1204 struct ctdb_g_lock_lock_state *state;
1206 req = tevent_req_create(mem_ctx, &state,
1207 struct ctdb_g_lock_lock_state);
1213 state->client = client;
1215 state->key.dptr = discard_const(keyname);
1216 state->key.dsize = strlen(keyname) + 1;
1217 state->my_sid = *sid;
1218 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1220 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1222 if (tevent_req_nomem(subreq, req)) {
1223 return tevent_req_post(req, ev);
1225 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * The lock record has been fetched and is locked.
 *
 * Any lock list left over from a previous attempt is discarded, the record
 * data is decoded into state->lock_list, the raw data is freed and the list
 * is examined by ctdb_g_lock_lock_process_locks().
 */
1230 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1232 struct tevent_req *req = tevent_req_callback_data(
1233 subreq, struct tevent_req);
1234 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1235 req, struct ctdb_g_lock_lock_state);
1239 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1240 TALLOC_FREE(subreq);
1241 if (state->h == NULL) {
1242 tevent_req_error(req, ret);
1246 if (state->lock_list != NULL) {
1247 TALLOC_FREE(state->lock_list);
1251 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1253 talloc_free(data.dptr);
1255 tevent_req_error(req, ret);
1259 ctdb_g_lock_lock_process_locks(req);
/*
 * Examine the current lock holders, starting at index state->current.
 *
 * - An entry carrying our own server id fails the request with EDEADLK.
 * - For a conflicting entry, a CHECK_SRVIDS control is sent to find out
 *   whether the holder still exists; the reply is handled in
 *   ctdb_g_lock_lock_checked().
 * - When no conflict remains, our own entry is appended to the list, the
 *   list is stored with ctdb_g_lock_lock_update() and the request completes.
 */
1262 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1264 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1265 req, struct ctdb_g_lock_lock_state);
1266 struct tevent_req *subreq;
1267 struct ctdb_g_lock *lock;
1268 bool check_server = false;
1271 while (state->current < state->lock_list->num) {
1272 lock = &state->lock_list->lock[state->current];
1274 /* We should not ask for the same lock more than once */
1275 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1276 tevent_req_error(req, EDEADLK);
1280 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1281 check_server = true;
1285 state->current += 1;
1289 struct ctdb_req_control request;
1290 struct ctdb_uint64_array u64_array;
1293 u64_array.val = &lock->sid.unique_id;
1295 ctdb_req_control_check_srvids(&request, &u64_array);
1296 subreq = ctdb_client_control_send(state, state->ev,
1299 tevent_timeval_zero(),
1301 if (tevent_req_nomem(subreq, req)) {
1304 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1308 /* There is no conflict, add ourself to the lock_list */
1309 state->lock_list->lock = talloc_realloc(state->lock_list,
1310 state->lock_list->lock,
1312 state->lock_list->num + 1);
1313 if (state->lock_list->lock == NULL) {
1314 tevent_req_error(req, ENOMEM);
1318 lock = &state->lock_list->lock[state->lock_list->num];
1319 lock->type = state->lock_type;
1320 lock->sid = state->my_sid;
1321 state->lock_list->num += 1;
1323 ret = ctdb_g_lock_lock_update(req);
1325 tevent_req_error(req, ret);
1329 tevent_req_done(req);
/*
 * Reply handler for the CHECK_SRVIDS control sent for a conflicting holder.
 *
 * Exactly one result value is expected (anything else fails with EIO).
 * If the holder still exists, the request retries after a short wake-up
 * timer (ctdb_g_lock_lock_retry).  If it does not, the stale entry is
 * removed by overwriting it with the last entry of the list, the list is
 * stored and scanning resumes from the same index.
 *
 * NOTE(review): a failure of ctdb_reply_control_check_srvids() is reported
 * as ENOMEM rather than with the returned error code - confirm intent.
 */
1332 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1334 struct tevent_req *req = tevent_req_callback_data(
1335 subreq, struct tevent_req);
1336 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1337 req, struct ctdb_g_lock_lock_state);
1338 struct ctdb_reply_control *reply;
1339 struct ctdb_uint8_array *u8_array;
1344 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1345 TALLOC_FREE(subreq);
1347 tevent_req_error(req, ret);
1351 ret = ctdb_reply_control_check_srvids(reply, state, &u8_array);
1353 tevent_req_error(req, ENOMEM);
1357 if (u8_array->num != 1) {
1358 talloc_free(u8_array);
1359 tevent_req_error(req, EIO);
1363 val = u8_array->val[0];
1364 talloc_free(u8_array);
1367 /* server process exists, need to retry */
1368 subreq = tevent_wakeup_send(state, state->ev,
1369 tevent_timeval_current_ofs(0,1000));
1370 if (tevent_req_nomem(subreq, req)) {
1373 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1377 /* server process does not exist, remove conflicting entry */
1378 state->lock_list->lock[state->current] =
1379 state->lock_list->lock[state->lock_list->num-1];
1380 state->lock_list->num -= 1;
1382 ret = ctdb_g_lock_lock_update(req);
1384 tevent_req_error(req, ret);
1388 ctdb_g_lock_lock_process_locks(req);
/*
 * Serialise state->lock_list into a temporary buffer and store it in the
 * lock record through the handle held in state->h.  The buffer is freed
 * after the store.
 */
1391 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1393 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1394 req, struct ctdb_g_lock_lock_state);
1398 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1399 data.dptr = talloc_size(state, data.dsize);
1400 if (data.dptr == NULL) {
1404 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1405 ret = ctdb_store_record(state->h, data);
1406 talloc_free(data.dptr);
/*
 * Second definition named ctdb_g_lock_lock_update, with a different
 * signature from the one above.
 * NOTE(review): two definitions with the same name cannot both be compiled;
 * presumably this one is disabled by a preprocessor guard that is not
 * visible in this listing - confirm.
 *
 * This variant scans the whole list in one pass: it checks for our own
 * server id, tests conflicting holders with ctdb_server_id_exists(),
 * removes entries whose server no longer exists, appends our own entry and
 * stores the list through handle h.
 */
1411 static int ctdb_g_lock_lock_update(struct ctdb_g_lock_lock_state *state,
1412 struct ctdb_g_lock_list *lock_list,
1413 struct ctdb_record_handle *h)
1415 struct ctdb_g_lock *lock;
1416 bool conflict = false;
1417 bool modified = false;
1420 for (i=0; i<lock_list->num; i++) {
1421 lock = &lock_list->lock[i];
1423 /* We should not ask for lock more than once */
1424 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1428 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1432 ret = ctdb_server_id_exists(state->client, &lock->sid,
1442 /* Server does not exist, delete conflicting entry */
1443 lock_list->lock[i] = lock_list->lock[lock_list->num-1];
1444 lock_list->num -= 1;
1450 lock = talloc_realloc(lock_list, lock_list->lock,
1451 struct ctdb_g_lock, lock_list->num+1);
1456 lock[lock_list->num].type = state->lock_type;
1457 lock[lock_list->num].sid = state->my_sid;
1458 lock_list->lock = lock;
1459 lock_list->num += 1;
1466 data.dsize = ctdb_g_lock_list_len(lock_list);
1467 data.dptr = talloc_size(state, data.dsize);
1468 if (data.dptr == NULL) {
1472 ctdb_g_lock_list_push(lock_list, data.dptr);
1473 ret = ctdb_store_record(h, data);
1474 talloc_free(data.dptr);
/*
 * Wake-up timer handler for a lock retry.
 *
 * Re-issues the fetch-lock on the lock record (with readonly=false) and
 * re-enters ctdb_g_lock_lock_fetched().  A failed wake-up is reported as
 * ENOMEM.
 */
1487 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1489 struct tevent_req *req = tevent_req_callback_data(
1490 subreq, struct tevent_req);
1491 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1492 req, struct ctdb_g_lock_lock_state);
1495 success = tevent_wakeup_recv(subreq);
1496 TALLOC_FREE(subreq);
1498 tevent_req_error(req, ENOMEM);
1502 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1503 state->db, state->key, false);
1504 if (tevent_req_nomem(subreq, req)) {
1507 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Collect the result of ctdb_g_lock_lock_send().
 *
 * The record handle is freed first, regardless of the outcome; freeing a
 * handle returned by ctdb_fetch_lock_recv() runs its destructor, which
 * drops the chain lock on the lock record.
 */
1510 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1512 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1513 req, struct ctdb_g_lock_lock_state);
1516 TALLOC_FREE(state->h);
1518 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state for ctdb_g_lock_unlock_send(): the server id whose lock
 * is released, the handle of the fetched lock record and the decoded list
 * of lock holders.
 */
1528 struct ctdb_g_lock_unlock_state {
1529 struct tevent_context *ev;
1530 struct ctdb_client_context *client;
1531 struct ctdb_db_context *db;
1533 struct ctdb_server_id my_sid;
1534 struct ctdb_record_handle *h;
1535 struct ctdb_g_lock_list *lock_list;
/* Steps of the g_lock release: fetch -> update list -> optionally delete */
1538 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1539 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1540 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
/*
 * Start releasing the global lock named keyname held by server id sid.
 *
 * The record key is the lock name including its terminating NUL; the
 * keyname pointer itself is stored, not a copy.  The lock record is fetched
 * with ctdb_fetch_lock_send(); processing continues in
 * ctdb_g_lock_unlock_fetched().
 */
1542 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1543 struct tevent_context *ev,
1544 struct ctdb_client_context *client,
1545 struct ctdb_db_context *db,
1546 const char *keyname,
1547 struct ctdb_server_id sid)
1549 struct tevent_req *req, *subreq;
1550 struct ctdb_g_lock_unlock_state *state;
1552 req = tevent_req_create(mem_ctx, &state,
1553 struct ctdb_g_lock_unlock_state);
1559 state->client = client;
1561 state->key.dptr = discard_const(keyname);
1562 state->key.dsize = strlen(keyname) + 1;
1563 state->my_sid = sid;
1565 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1567 if (tevent_req_nomem(subreq, req)) {
1568 return tevent_req_post(req, ev);
1570 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
/*
 * The lock record has been fetched and is locked.
 *
 * Decodes the holder list, removes our entry with
 * ctdb_g_lock_unlock_update() and, when the list has become empty, deletes
 * the record via ctdb_delete_record_send() (completion in
 * ctdb_g_lock_unlock_deleted()).  Otherwise the request completes here.
 */
1575 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1577 struct tevent_req *req = tevent_req_callback_data(
1578 subreq, struct tevent_req);
1579 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1580 req, struct ctdb_g_lock_unlock_state);
1584 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1585 TALLOC_FREE(subreq);
1586 if (state->h == NULL) {
1587 tevent_req_error(req, ret);
1591 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1594 tevent_req_error(req, ret);
1598 ret = ctdb_g_lock_unlock_update(req);
1600 tevent_req_error(req, ret);
1604 if (state->lock_list->num == 0) {
1605 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1606 if (tevent_req_nomem(subreq, req)) {
1609 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1614 tevent_req_done(req);
/*
 * Take the entry matching state->my_sid out of the lock list and, when
 * entries remain, write the updated list back to the record.
 *
 * The store below is only performed for a non-empty list.
 */
1617 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1619 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1620 req, struct ctdb_g_lock_unlock_state);
1621 struct ctdb_g_lock *lock;
/* Look for the entry whose server id equals my_sid */
1624 for (i=0; i<state->lock_list->num; i++) {
1625 lock = &state->lock_list->lock[i];
1627 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
/*
 * An index inside the list selects the entry to drop: overwrite it with
 * the last entry and shrink the list by one (entry order is not kept).
 */
1632 if (i < state->lock_list->num) {
1633 state->lock_list->lock[i] =
1634 state->lock_list->lock[state->lock_list->num-1];
1635 state->lock_list->num -= 1;
/*
 * Serialise the remaining locks into a temporary buffer, store it in the
 * record and free the buffer again.
 */
1638 if (state->lock_list->num != 0) {
1641 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1642 data.dptr = talloc_size(state, data.dsize);
1643 if (data.dptr == NULL) {
1647 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1648 ret = ctdb_store_record(state->h, data);
1649 talloc_free(data.dptr);
/*
 * Callback for the record delete started by ctdb_g_lock_unlock_fetched().
 *
 * A failed delete is logged together with the key name and reported with
 * tevent_req_error(); otherwise the request is marked as done.
 */
1658 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1660 struct tevent_req *req = tevent_req_callback_data(
1661 subreq, struct tevent_req);
1662 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1663 req, struct ctdb_g_lock_unlock_state);
1667 status = ctdb_delete_record_recv(subreq, &ret);
/* The key is printed as a string; it was set up from a C string name */
1670 ("g_lock_unlock %s delete record failed, ret=%d\n",
1671 (char *)state->key.dptr, ret));
1672 tevent_req_error(req, ret);
1676 tevent_req_done(req);
/*
 * Collect the result of a g_lock unlock request.
 *
 * The record handle kept in the request state is released here, and the
 * request is then checked for a unix error.
 * NOTE(review): presumably the error is handed back through perr and the
 * return value tells success from failure - confirm.
 */
1679 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1681 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1682 req, struct ctdb_g_lock_unlock_state);
/* Drop the record handle stored in the request state */
1685 TALLOC_FREE(state->h);
1687 if (tevent_req_is_unix_error(req, &err)) {
1698 * Persistent database functions
/* State of an asynchronous transaction start request */
1700 struct ctdb_transaction_start_state {
1701 struct tevent_context *ev;
1702 struct ctdb_client_context *client;
/* Timeout handed to the SRVID registration control */
1703 struct timeval timeout;
/* Transaction handle that the start request sets up */
1704 struct ctdb_transaction_handle *h;
/*
 * Forward declarations of the transaction start callbacks and of the
 * destructor installed on transaction handles.
 */
1708 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1709 static void ctdb_transaction_register_done(struct tevent_req *subreq);
1710 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
1711 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h);
/*
 * Start a transaction on a persistent database (asynchronous).
 *
 * The request fails with EINVAL when db is not persistent.  Otherwise a
 * transaction handle is allocated as a talloc child of db and given a
 * server id derived from db->db_id, a record buffer and a lock name built
 * from db->db_id.  The request then attaches to "g_lock.tdb" and continues
 * in ctdb_transaction_g_lock_attached().  Allocation failures complete the
 * request immediately via tevent_req_post().
 */
1713 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1714 struct tevent_context *ev,
1715 struct ctdb_client_context *client,
1716 struct timeval timeout,
1717 struct ctdb_db_context *db,
1720 struct ctdb_transaction_start_state *state;
1721 struct tevent_req *req, *subreq;
1722 struct ctdb_transaction_handle *h;
1724 req = tevent_req_create(mem_ctx, &state,
1725 struct ctdb_transaction_start_state);
/* Transactions are only offered for persistent databases */
1730 if (! db->persistent) {
1731 tevent_req_error(req, EINVAL);
1732 return tevent_req_post(req, ev);
1736 state->client = client;
/* Controls of this request are addressed to the client's own pnn */
1737 state->destnode = ctdb_client_pnn(client);
/* The handle is parented on db, not on the request state */
1739 h = talloc_zero(db, struct ctdb_transaction_handle);
1740 if (tevent_req_nomem(h, req)) {
1741 return tevent_req_post(req, ev);
1747 h->readonly = readonly;
1750 /* SRVID is unique for databases, so client can have transactions
1751 * active for multiple databases */
1752 h->sid = ctdb_client_get_server_id(client, db->db_id);
/* Record buffer that fetch/store calls on this handle append to */
1754 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1755 if (tevent_req_nomem(h->recbuf, req)) {
1756 return tevent_req_post(req, ev);
/* Name of the g_lock requested for this database's transactions */
1759 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1760 if (tevent_req_nomem(h->lock_name, req)) {
1761 return tevent_req_post(req, ev);
1766 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1767 if (tevent_req_nomem(subreq, req)) {
1768 return tevent_req_post(req, ev);
1770 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
/*
 * Callback for the attach to the g_lock database.
 *
 * Stores the attached database in the handle (db_g_lock) and sends a
 * control registering the handle's SRVID (sid.unique_id) to
 * state->destnode, using state->timeout.  Continues in
 * ctdb_transaction_register_done().
 */
1775 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
1777 struct tevent_req *req = tevent_req_callback_data(
1778 subreq, struct tevent_req);
1779 struct ctdb_transaction_start_state *state = tevent_req_data(
1780 req, struct ctdb_transaction_start_state);
1781 struct ctdb_req_control request;
1785 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
1786 TALLOC_FREE(subreq);
1788 tevent_req_error(req, ret);
1792 ctdb_req_control_register_srvid(&request, state->h->sid.unique_id);
1793 subreq = ctdb_client_control_send(state, state->ev, state->client,
1794 state->destnode, state->timeout,
1796 if (tevent_req_nomem(subreq, req)) {
1799 tevent_req_set_callback(subreq, ctdb_transaction_register_done, req);
/*
 * Callback for the SRVID registration control.
 *
 * After the control and its reply have been checked, the g_lock named
 * h->lock_name is requested in h->db_g_lock for the handle's server id,
 * with h->readonly passed as the final argument.  Continues in
 * ctdb_transaction_g_lock_done().
 */
1802 static void ctdb_transaction_register_done(struct tevent_req *subreq)
1804 struct tevent_req *req = tevent_req_callback_data(
1805 subreq, struct tevent_req);
1806 struct ctdb_transaction_start_state *state = tevent_req_data(
1807 req, struct ctdb_transaction_start_state);
1808 struct ctdb_reply_control *reply;
/* The reply is allocated on state */
1812 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1813 TALLOC_FREE(subreq);
1815 tevent_req_error(req, ret);
1819 ret = ctdb_reply_control_register_srvid(reply);
1822 tevent_req_error(req, ret);
1826 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
1827 state->h->db_g_lock, state->h->lock_name,
1828 &state->h->sid, state->h->readonly);
1829 if (tevent_req_nomem(subreq, req)) {
1832 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
/*
 * Callback for the g_lock request: completes the transaction start
 * request, either with the error obtained from ctdb_g_lock_lock_recv()
 * or as done.
 */
1835 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
1837 struct tevent_req *req = tevent_req_callback_data(
1838 subreq, struct tevent_req);
1842 status = ctdb_g_lock_lock_recv(subreq, &ret);
1843 TALLOC_FREE(subreq);
1845 tevent_req_error(req, ret);
1849 tevent_req_done(req);
/*
 * Collect the result of a transaction start request.
 *
 * Takes the handle from the request state and installs
 * ctdb_transaction_handle_destructor() on it.
 * NOTE(review): presumably NULL is returned, with the error reported to
 * the caller, when the request failed - confirm.
 */
1852 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
1853 struct tevent_req *req,
1856 struct ctdb_transaction_start_state *state = tevent_req_data(
1857 req, struct ctdb_transaction_start_state);
1858 struct ctdb_transaction_handle *h = state->h;
1861 if (tevent_req_is_unix_error(req, &err)) {
/* From here on freeing the handle runs the destructor */
1868 talloc_set_destructor(h, ctdb_transaction_handle_destructor);
/*
 * talloc destructor of a transaction handle.
 *
 * Calls ctdb_ctrl_deregister_srvid() against the client's own pnn with
 * tevent_timeval_zero() as timeout and logs a warning about a failed
 * SRVID deregistration.
 */
1872 static int ctdb_transaction_handle_destructor(struct ctdb_transaction_handle *h)
1876 ret = ctdb_ctrl_deregister_srvid(h, h->ev, h->client, h->client->pnn,
1877 tevent_timeval_zero(),
1880 DEBUG(DEBUG_WARNING, ("Failed to deregister SRVID\n"));
/*
 * Synchronous wrapper around ctdb_transaction_start_send() and
 * ctdb_transaction_start_recv(): sends the request, polls ev until the
 * request has completed and collects the handle.
 * NOTE(review): presumably the handle is passed back through out and the
 * return value is 0 or an errno-style code - confirm.
 */
1886 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1887 struct ctdb_client_context *client,
1888 struct timeval timeout,
1889 struct ctdb_db_context *db, bool readonly,
1890 struct ctdb_transaction_handle **out)
1892 struct tevent_req *req;
1893 struct ctdb_transaction_handle *h;
1896 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
/* Run the event loop until the request has finished */
1902 tevent_req_poll(req, ev);
1904 h = ctdb_transaction_start_recv(req, &ret);
/* Search state used while traversing a transaction's record buffer */
1913 struct ctdb_transaction_record_fetch_state {
/* Header extracted from the matching record */
1915 struct ctdb_ltdb_header header;
/*
 * Record buffer traverse callback used by ctdb_transaction_record_fetch().
 *
 * For a record whose key equals the key stored in the search state
 * (same size and same bytes), the ltdb header is extracted from the
 * record data into the search state and the record is marked as found.
 * A failed header extraction is logged.
 */
1919 static int ctdb_transaction_record_fetch_traverse(
1921 struct ctdb_ltdb_header *nullheader,
1922 TDB_DATA key, TDB_DATA data,
1925 struct ctdb_transaction_record_fetch_state *state =
1926 (struct ctdb_transaction_record_fetch_state *)private_data;
1928 if (state->key.dsize == key.dsize &&
1929 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
/* The header is taken from the record data, not from nullheader */
1932 ret = ctdb_ltdb_header_extract(&data, &state->header);
1934 DEBUG(DEBUG_ERR, ("Failed to extract header\n"));
1939 state->found = true;
/*
 * Look up a record among those buffered in the transaction handle.
 *
 * Traverses h->recbuf with ctdb_transaction_record_fetch_traverse().
 * A non-NULL header receives the header collected by the traverse.
 */
1945 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
1947 struct ctdb_ltdb_header *header,
1950 struct ctdb_transaction_record_fetch_state state;
1954 state.found = false;
1956 ret = ctdb_rec_buffer_traverse(h->recbuf,
1957 ctdb_transaction_record_fetch_traverse,
/* The header is optional for callers */
1964 if (header != NULL) {
1965 *header = state.header;
/*
 * Fetch a record within a transaction.
 *
 * The records buffered in the transaction are searched first; data found
 * there is duplicated onto mem_ctx.  The function also reads the local
 * database with ctdb_ltdb_fetch() and adds that record to the
 * transaction's record buffer.
 * NOTE(review): presumably the local database is only read when the
 * record is not in the buffer yet - confirm.
 */
1976 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
1978 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1981 struct ctdb_ltdb_header header;
/* No header is requested from the buffer lookup */
1984 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
/* Hand the caller its own copy of the buffered data */
1986 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
1988 if (data->dptr == NULL) {
1991 data->dsize = tmp_data.dsize;
1995 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
/* Add the fetched record to the transaction's record buffer */
2000 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
/*
 * Store a record within a transaction.
 *
 * The current value is looked up using the transaction's record buffer
 * and ctdb_ltdb_fetch().  New data that is byte-for-byte equal to the
 * current value is detected by the comparison below (presumably the
 * function then returns without adding anything - confirm).  Otherwise
 * the header's dmaster is set to this client's pnn and the record is
 * added to the transaction's record buffer.
 */
2008 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2009 TDB_DATA key, TDB_DATA data)
2011 TALLOC_CTX *tmp_ctx;
2012 struct ctdb_ltdb_header header;
/* Temporary context owning the data read from the local database */
2020 tmp_ctx = talloc_new(h);
2021 if (tmp_ctx == NULL) {
2025 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2027 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
/* Same size and same bytes: the value is unchanged */
2033 if (old_data.dsize == data.dsize &&
2034 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2035 talloc_free(tmp_ctx);
2039 header.dmaster = ctdb_client_pnn(h->client);
2042 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2043 talloc_free(tmp_ctx);
/*
 * Delete a record within a transaction by storing tdb_null as its data
 * through ctdb_transaction_store_record().
 */
2052 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2055 return ctdb_transaction_store_record(h, key, tdb_null);
/*
 * Store a sequence number under the CTDB_DB_SEQNUM_KEY key as part of the
 * transaction, using ctdb_transaction_store_record().
 */
2058 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2061 const char *keyname = CTDB_DB_SEQNUM_KEY;
2064 key.dptr = discard_const(keyname);
/* The key length includes the terminating NUL byte */
2065 key.dsize = strlen(keyname) + 1;
/* The value is the raw in-memory representation of seqnum */
2067 data.dptr = (uint8_t *)&seqnum;
2068 data.dsize = sizeof(seqnum);
2070 return ctdb_transaction_store_record(h, key, data);
/* State of an asynchronous transaction commit request */
2073 struct ctdb_transaction_commit_state {
2074 struct tevent_context *ev;
/* Handle of the transaction being committed */
2075 struct ctdb_transaction_handle *h;
/* Forward declarations of the transaction commit callbacks */
2079 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq);
2080 static void ctdb_transaction_commit_try(struct tevent_req *subreq);
2081 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2082 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq);
/*
 * Start committing a transaction (asynchronous).
 *
 * The first step is a control asking for the sequence number of the
 * transaction's database, sent with tevent_timeval_zero() as timeout.
 * Continues in ctdb_transaction_commit_seqnum_done().
 */
2084 struct tevent_req *ctdb_transaction_commit_send(
2085 TALLOC_CTX *mem_ctx,
2086 struct tevent_context *ev,
2087 struct ctdb_transaction_handle *h)
2089 struct tevent_req *req, *subreq;
2090 struct ctdb_transaction_commit_state *state;
2091 struct ctdb_req_control request;
2093 req = tevent_req_create(mem_ctx, &state,
2094 struct ctdb_transaction_commit_state);
2102 ctdb_req_control_get_db_seqnum(&request, h->db->db_id);
2103 subreq = ctdb_client_control_send(state, ev, h->client,
2105 tevent_timeval_zero(), &request);
2106 if (tevent_req_nomem(subreq, req)) {
2107 return tevent_req_post(req, ev);
2109 tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum_done,
/*
 * Callback for the initial sequence number query of a commit.
 *
 * Saves the reported sequence number in state->seqnum, stores seqnum + 1
 * as part of the transaction and then issues ctdb_recovery_wait_send(),
 * whose callback is ctdb_transaction_commit_try().
 */
2115 static void ctdb_transaction_commit_seqnum_done(struct tevent_req *subreq)
2117 struct tevent_req *req = tevent_req_callback_data(
2118 subreq, struct tevent_req);
2119 struct ctdb_transaction_commit_state *state = tevent_req_data(
2120 req, struct ctdb_transaction_commit_state);
2121 struct ctdb_reply_control *reply;
2125 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2126 TALLOC_FREE(subreq);
2128 tevent_req_error(req, ret);
2132 ret = ctdb_reply_control_get_db_seqnum(reply, &state->seqnum);
2134 tevent_req_error(req, ret);
/* The transaction itself carries the incremented sequence number */
2138 ret = ctdb_transaction_store_db_seqnum(state->h, state->seqnum+1);
2140 tevent_req_error(req, ret);
2144 subreq = ctdb_recovery_wait_send(state, state->ev, state->h->client);
2145 if (tevent_req_nomem(subreq, req)) {
2148 tevent_req_set_callback(subreq, ctdb_transaction_commit_try, req);
/*
 * Callback for ctdb_recovery_wait_send(): attempt the commit.
 *
 * Sends the TRANS3_COMMIT control carrying the transaction's record
 * buffer to the client's own pnn, with tevent_timeval_zero() as timeout.
 * Continues in ctdb_transaction_commit_done().
 */
2151 static void ctdb_transaction_commit_try(struct tevent_req *subreq)
2153 struct tevent_req *req = tevent_req_callback_data(
2154 subreq, struct tevent_req);
2155 struct ctdb_transaction_commit_state *state = tevent_req_data(
2156 req, struct ctdb_transaction_commit_state);
2157 struct ctdb_req_control request;
2161 status = ctdb_recovery_wait_recv(subreq, &ret);
2162 TALLOC_FREE(subreq);
2164 tevent_req_error(req, ret);
2168 ctdb_req_control_trans3_commit(&request, state->h->recbuf);
2169 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2170 state->h->client->pnn,
2171 tevent_timeval_zero(), &request);
2172 if (tevent_req_nomem(subreq, req)) {
2175 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
/*
 * Callback for the TRANS3_COMMIT control.
 *
 * A failure reported in the control reply is attributed to recovery: the
 * request goes back to ctdb_recovery_wait_send() and retries through
 * ctdb_transaction_commit_try().  Otherwise the database sequence number
 * is queried again; that reply is handled by
 * ctdb_transaction_commit_seqnum2_done().
 */
2178 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2180 struct tevent_req *req = tevent_req_callback_data(
2181 subreq, struct tevent_req);
2182 struct ctdb_transaction_commit_state *state = tevent_req_data(
2183 req, struct ctdb_transaction_commit_state);
2184 struct ctdb_reply_control *reply;
2185 struct ctdb_req_control request;
2189 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2190 TALLOC_FREE(subreq);
2192 tevent_req_error(req, ret);
2196 ret = ctdb_reply_control_trans3_commit(reply);
2198 /* Control failed due to recovery */
2199 subreq = ctdb_recovery_wait_send(state, state->ev,
2201 if (tevent_req_nomem(subreq, req)) {
2204 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Re-read the sequence number from the client's own pnn */
2209 ctdb_req_control_get_db_seqnum(&request, state->h->db->db_id);
2210 subreq = ctdb_client_control_send(state, state->ev, state->h->client,
2211 state->h->client->pnn,
2212 tevent_timeval_zero(), &request);
2213 if (tevent_req_nomem(subreq, req)) {
2216 tevent_req_set_callback(subreq, ctdb_transaction_commit_seqnum2_done,
/*
 * Callback for the sequence number query made after the commit control.
 *
 * - Sequence number still equal to state->seqnum: go through
 *   ctdb_recovery_wait_send() and ctdb_transaction_commit_try() again.
 * - Sequence number different from state->seqnum + 1: fail with EIO.
 * - Otherwise the request is marked as done.
 */
2220 static void ctdb_transaction_commit_seqnum2_done(struct tevent_req *subreq)
2222 struct tevent_req *req = tevent_req_callback_data(
2223 subreq, struct tevent_req);
2224 struct ctdb_transaction_commit_state *state = tevent_req_data(
2225 req, struct ctdb_transaction_commit_state);
2226 struct ctdb_reply_control *reply;
2231 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2232 TALLOC_FREE(subreq);
2234 tevent_req_error(req, ret);
2238 ret = ctdb_reply_control_get_db_seqnum(reply, &seqnum);
2240 tevent_req_error(req, ret);
/* Unchanged sequence number: try the commit again */
2244 if (seqnum == state->seqnum) {
2245 subreq = ctdb_recovery_wait_send(state, state->ev,
2247 if (tevent_req_nomem(subreq, req)) {
2250 tevent_req_set_callback(subreq, ctdb_transaction_commit_try,
/* Anything other than the expected increment by one is an error */
2255 if (seqnum != state->seqnum + 1) {
2256 tevent_req_error(req, EIO);
2260 tevent_req_done(req);
/*
 * Collect the result of a transaction commit request by checking the
 * request for a unix error.
 * NOTE(review): presumably the error is handed back through perr and the
 * return value tells success from failure - confirm.
 */
2263 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2267 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Commit a transaction synchronously.
 *
 * Read-only transactions and transactions without updates are singled out
 * by the check below before any request is sent (presumably nothing is
 * committed for them - confirm).  Otherwise the commit request is sent
 * with the handle as its talloc parent, h->ev is polled until the request
 * has completed and the result is collected with
 * ctdb_transaction_commit_recv().
 */
2277 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2279 struct tevent_req *req;
2283 if (h->readonly || ! h->updated) {
2288 req = ctdb_transaction_commit_send(h, h->ev, h);
/* Run the event loop until the request has finished */
2294 tevent_req_poll(req, h->ev);
2296 status = ctdb_transaction_commit_recv(req, &ret);
/*
 * Cancel a transaction.
 * NOTE(review): presumably the handle is discarded together with all
 * buffered updates and nothing is committed - confirm.
 */
2306 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2315 * In future Samba should register SERVER_ID.
2316 * Make that structure same as struct srvid {}.