4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
/*
 * Search the client's list of databases (client->db, chained through
 * ->next) for an entry whose db_name equals the requested name.
 * NOTE(review): the return statements are not visible in this view;
 * callers treat a NULL result as "not attached yet".
 */
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
/*
 * State carried across the asynchronous "set database flags" request:
 * the event context, client and timeout used for every control sent.
 */
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
/* Each flag becomes true once its control finished or was not requested */
61 bool readonly_done, sticky_done;
/* Callbacks for the stages of the set_db_flags request */
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
/*
 * Begin setting the READONLY and/or STICKY flags of database db_id.
 * The first step sends GET_NODEMAP to destnode; the reply is handled in
 * ctdb_set_db_flags_nodemap_done().
 */
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
/* Neither READONLY nor STICKY requested: no control needs to be sent */
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
/*
 * GET_NODEMAP reply handler.  Builds the list of connected nodes and
 * sends SET_DB_READONLY and/or SET_DB_STICKY to all of them, depending
 * on state->db_flags.  A flag that was not requested is marked as done
 * straight away so the two completion callbacks can agree on when the
 * whole request is finished.
 */
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
125 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
127 tevent_req_error(req, ret);
131 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
135 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
137 tevent_req_error(req, ret);
/* The node map is only needed to derive the PNN list; free it afterwards */
141 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142 state, &state->pnn_list);
143 talloc_free(nodemap);
/* NOTE(review): "no connected nodes" is reported to the caller as ENOMEM */
144 if (state->count <= 0) {
146 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147 state->db_id, state->count));
148 tevent_req_error(req, ENOMEM);
152 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153 ctdb_req_control_set_db_readonly(&request, state->db_id);
154 subreq = ctdb_client_control_multi_send(
155 state, state->ev, state->client,
156 state->pnn_list, state->count,
157 state->timeout, &request);
158 if (tevent_req_nomem(subreq, req)) {
161 tevent_req_set_callback(subreq,
162 ctdb_set_db_flags_readonly_done, req);
164 state->readonly_done = true;
167 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168 ctdb_req_control_set_db_sticky(&request, state->db_id);
169 subreq = ctdb_client_control_multi_send(
170 state, state->ev, state->client,
171 state->pnn_list, state->count,
172 state->timeout, &request);
173 if (tevent_req_nomem(subreq, req)) {
176 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
179 state->sticky_done = true;
/*
 * Completion of the SET_DB_READONLY multi-node control.  On failure the
 * request fails with the returned error; otherwise the readonly step is
 * marked done and the request completes if the sticky step is done too.
 */
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
185 struct tevent_req *req = tevent_req_callback_data(
186 subreq, struct tevent_req);
187 struct ctdb_set_db_flags_state *state = tevent_req_data(
188 req, struct ctdb_set_db_flags_state);
192 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
197 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
199 tevent_req_error(req, ret);
203 state->readonly_done = true;
205 if (state->readonly_done && state->sticky_done) {
206 tevent_req_done(req);
/*
 * Completion of the SET_DB_STICKY multi-node control.  Mirrors
 * ctdb_set_db_flags_readonly_done(): record the sticky step as done and
 * finish the request once the readonly step has finished as well.
 */
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
212 struct tevent_req *req = tevent_req_callback_data(
213 subreq, struct tevent_req);
214 struct ctdb_set_db_flags_state *state = tevent_req_data(
215 req, struct ctdb_set_db_flags_state);
219 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
224 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
226 tevent_req_error(req, ret);
230 state->sticky_done = true;
232 if (state->readonly_done && state->sticky_done) {
233 tevent_req_done(req);
/*
 * Collect the result of a set_db_flags request.  Checks the request for
 * a unix error via tevent_req_is_unix_error(); perr is presumably filled
 * with that error on failure (body not visible here - confirm).
 */
237 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
241 if (tevent_req_is_unix_error(req, &err)) {
/*
 * State for the asynchronous database attach request.  Besides the
 * fields visible here the callbacks also use destnode, db_flags and
 * tdb_flags members.
 */
250 struct ctdb_attach_state {
251 struct tevent_context *ev;
252 struct ctdb_client_context *client;
253 struct timeval timeout;
/* Handle being built; becomes the result of the request */
257 struct ctdb_db_context *db;
/* Attach stages, in the order in which they are chained */
260 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
261 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
262 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
263 static void ctdb_attach_health_done(struct tevent_req *subreq);
264 static void ctdb_attach_flags_done(struct tevent_req *subreq);
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267 struct tevent_context *ev,
268 struct ctdb_client_context *client,
269 struct timeval timeout,
270 const char *db_name, uint8_t db_flags)
272 struct tevent_req *req, *subreq;
273 struct ctdb_attach_state *state;
274 struct ctdb_req_control request;
276 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
281 state->db = client_db_handle(client, db_name);
282 if (state->db != NULL) {
283 tevent_req_done(req);
284 return tevent_req_post(req, ev);
288 state->client = client;
289 state->timeout = timeout;
290 state->destnode = ctdb_client_pnn(client);
291 state->db_flags = db_flags;
293 state->db = talloc_zero(client, struct ctdb_db_context);
294 if (tevent_req_nomem(state->db, req)) {
295 return tevent_req_post(req, ev);
298 state->db->db_name = talloc_strdup(state->db, db_name);
299 if (tevent_req_nomem(state->db, req)) {
300 return tevent_req_post(req, ev);
303 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304 state->db->persistent = true;
307 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308 subreq = ctdb_client_control_send(state, ev, client,
309 ctdb_client_pnn(client), timeout,
311 if (tevent_req_nomem(subreq, req)) {
312 return tevent_req_post(req, ev);
314 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
/*
 * GET_TUNABLE(TDBMutexEnabled) reply handler.  Chooses the TDB open
 * flags for the local database and then sends the DB_ATTACH or
 * DB_ATTACH_PERSISTENT control to state->destnode.
 */
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
321 struct tevent_req *req = tevent_req_callback_data(
322 subreq, struct tevent_req);
323 struct ctdb_attach_state *state = tevent_req_data(
324 req, struct ctdb_attach_state);
325 struct ctdb_reply_control *reply;
326 struct ctdb_req_control request;
327 uint32_t mutex_enabled;
331 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
334 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335 state->db->db_name, ret));
336 tevent_req_error(req, ret);
340 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
342 /* Treat error as mutex support not available */
/* Persistent databases use default TDB flags; mutex locking is added only for the other case */
346 if (state->db->persistent) {
347 state->tdb_flags = TDB_DEFAULT;
349 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
351 if (mutex_enabled == 1) {
352 state->tdb_flags |= TDB_MUTEX_LOCKING;
356 if (state->db->persistent) {
357 ctdb_req_control_db_attach_persistent(&request,
361 ctdb_req_control_db_attach(&request, state->db->db_name,
365 subreq = ctdb_client_control_send(state, state->ev, state->client,
366 state->destnode, state->timeout,
368 if (tevent_req_nomem(subreq, req)) {
371 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
/*
 * DB_ATTACH / DB_ATTACH_PERSISTENT reply handler.  Stores the database
 * id returned by the daemon in state->db->db_id and then asks for the
 * database path with GETDBPATH.
 */
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
376 struct tevent_req *req = tevent_req_callback_data(
377 subreq, struct tevent_req);
378 struct ctdb_attach_state *state = tevent_req_data(
379 req, struct ctdb_attach_state);
380 struct ctdb_req_control request;
381 struct ctdb_reply_control *reply;
385 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
388 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
390 (state->db->persistent
391 ? "DB_ATTACH_PERSISTENT"
394 tevent_req_error(req, ret);
/* The reply parser must match the control that was sent */
398 if (state->db->persistent) {
399 ret = ctdb_reply_control_db_attach_persistent(
400 reply, &state->db->db_id);
402 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
406 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407 state->db->db_name, ret));
408 tevent_req_error(req, ret);
412 ctdb_req_control_getdbpath(&request, state->db->db_id);
413 subreq = ctdb_client_control_send(state, state->ev, state->client,
414 state->destnode, state->timeout,
416 if (tevent_req_nomem(subreq, req)) {
419 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
/*
 * GETDBPATH reply handler.  Saves the path in state->db->db_path
 * (allocated on the db handle) and continues with DB_GET_HEALTH.
 */
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
424 struct tevent_req *req = tevent_req_callback_data(
425 subreq, struct tevent_req);
426 struct ctdb_attach_state *state = tevent_req_data(
427 req, struct ctdb_attach_state);
428 struct ctdb_reply_control *reply;
429 struct ctdb_req_control request;
433 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
436 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437 state->db->db_name, ret));
438 tevent_req_error(req, ret);
442 ret = ctdb_reply_control_getdbpath(reply, state->db,
443 &state->db->db_path);
446 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447 state->db->db_name, ret));
448 tevent_req_error(req, ret);
452 ctdb_req_control_db_get_health(&request, state->db->db_id);
453 subreq = ctdb_client_control_send(state, state->ev, state->client,
454 state->destnode, state->timeout,
456 if (tevent_req_nomem(subreq, req)) {
459 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
/*
 * DB_GET_HEALTH reply handler.  A non-NULL reason string means the
 * database is unhealthy and the attach fails with EIO; otherwise the
 * requested database flags are applied via ctdb_set_db_flags_send().
 */
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
464 struct tevent_req *req = tevent_req_callback_data(
465 subreq, struct tevent_req);
466 struct ctdb_attach_state *state = tevent_req_data(
467 req, struct ctdb_attach_state);
468 struct ctdb_reply_control *reply;
473 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
476 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477 state->db->db_name, ret));
478 tevent_req_error(req, ret);
482 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
485 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486 state->db->db_name, ret));
487 tevent_req_error(req, ret);
491 if (reason != NULL) {
492 /* Database unhealthy, avoid attach */
493 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494 state->db->db_name, reason));
495 tevent_req_error(req, EIO);
499 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500 state->destnode, state->timeout,
501 state->db->db_id, state->db_flags);
502 if (tevent_req_nomem(subreq, req)) {
505 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
/*
 * Final attach stage: the database flags have been set.  Opens the local
 * TDB at state->db->db_path read/write with the flags chosen earlier,
 * adds the handle to the client's database list and completes the
 * request.
 */
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
510 struct tevent_req *req = tevent_req_callback_data(
511 subreq, struct tevent_req);
512 struct ctdb_attach_state *state = tevent_req_data(
513 req, struct ctdb_attach_state);
517 status = ctdb_set_db_flags_recv(subreq, &ret);
520 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521 state->db->db_name, state->db_flags));
522 tevent_req_error(req, ret);
/*
 * NOTE(review): a failed open is detected with tevent_req_nomem(), so
 * it is presumably reported as out-of-memory regardless of the real
 * cause - confirm whether that is intended.
 */
526 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527 state->tdb_flags, O_RDWR, 0);
528 if (tevent_req_nomem(state->db->ltdb, req)) {
529 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530 state->db->db_name));
533 DLIST_ADD(state->client->db, state->db);
535 tevent_req_done(req);
/*
 * Collect the result of ctdb_attach_send().  The request is checked for
 * a unix error; on success the database handle from the request state is
 * presumably returned through out (remaining body not visible - confirm).
 */
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539 struct ctdb_db_context **out)
541 struct ctdb_attach_state *state = tevent_req_data(
542 req, struct ctdb_attach_state);
545 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_attach_send()/ctdb_attach_recv().
 * Runs the request on a temporary talloc context (child of client),
 * polls ev until it finishes and frees the temporary context on every
 * visible exit path.
 */
558 int ctdb_attach(struct tevent_context *ev,
559 struct ctdb_client_context *client,
560 struct timeval timeout,
561 const char *db_name, uint8_t db_flags,
562 struct ctdb_db_context **out)
565 struct tevent_req *req;
569 mem_ctx = talloc_new(client);
570 if (mem_ctx == NULL) {
574 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
577 talloc_free(mem_ctx);
581 tevent_req_poll(req, ev);
583 status = ctdb_attach_recv(req, &ret, out);
585 talloc_free(mem_ctx);
/* Register the NULL, FETCH and FETCH_WITH_HEADER call functions on the database */
590 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
595 talloc_free(mem_ctx);
/*
 * State for the asynchronous detach request.  The callbacks also use
 * db_id and db_name members that are not visible in this view.
 */
599 struct ctdb_detach_state {
600 struct ctdb_client_context *client;
601 struct tevent_context *ev;
602 struct timeval timeout;
/* Detach stages: resolve the database name, then detach */
607 static void ctdb_detach_dbname_done(struct tevent_req *subreq);
608 static void ctdb_detach_done(struct tevent_req *subreq);
/*
 * Start an asynchronous detach of database db_id.  The first step asks
 * the local node (ctdb_client_pnn(client)) for the database name with
 * GET_DBNAME; ctdb_detach_dbname_done() continues from there.
 */
610 struct tevent_req *ctdb_detach_send(TALLOC_CTX *mem_ctx,
611 struct tevent_context *ev,
612 struct ctdb_client_context *client,
613 struct timeval timeout, uint32_t db_id)
615 struct tevent_req *req, *subreq;
616 struct ctdb_detach_state *state;
617 struct ctdb_req_control request;
619 req = tevent_req_create(mem_ctx, &state, struct ctdb_detach_state);
624 state->client = client;
626 state->timeout = timeout;
627 state->db_id = db_id;
629 ctdb_req_control_get_dbname(&request, db_id);
630 subreq = ctdb_client_control_send(state, ev, client,
631 ctdb_client_pnn(client), timeout,
633 if (tevent_req_nomem(subreq, req)) {
634 return tevent_req_post(req, ev);
636 tevent_req_set_callback(subreq, ctdb_detach_dbname_done, req);
/*
 * GET_DBNAME reply handler.  Remembers the database name in the request
 * state (needed later to find the local handle) and sends DB_DETACH to
 * the local node.
 */
641 static void ctdb_detach_dbname_done(struct tevent_req *subreq)
643 struct tevent_req *req = tevent_req_callback_data(
644 subreq, struct tevent_req);
645 struct ctdb_detach_state *state = tevent_req_data(
646 req, struct ctdb_detach_state);
647 struct ctdb_reply_control *reply;
648 struct ctdb_req_control request;
652 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
655 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
657 tevent_req_error(req, ret);
661 ret = ctdb_reply_control_get_dbname(reply, state, &state->db_name);
663 DEBUG(DEBUG_ERR, ("detach: 0x%x GET_DBNAME failed, ret=%d\n",
665 tevent_req_error(req, ret);
669 ctdb_req_control_db_detach(&request, state->db_id);
670 subreq = ctdb_client_control_send(state, state->ev, state->client,
671 ctdb_client_pnn(state->client),
672 state->timeout, &request);
673 if (tevent_req_nomem(subreq, req)) {
676 tevent_req_set_callback(subreq, ctdb_detach_done, req);
/*
 * DB_DETACH reply handler.  After the daemon confirmed the detach, the
 * matching local handle is looked up by name and unlinked from the
 * client's database list, then the request completes.
 */
680 static void ctdb_detach_done(struct tevent_req *subreq)
682 struct tevent_req *req = tevent_req_callback_data(
683 subreq, struct tevent_req);
684 struct ctdb_detach_state *state = tevent_req_data(
685 req, struct ctdb_detach_state);
686 struct ctdb_reply_control *reply;
687 struct ctdb_db_context *db;
691 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
694 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
695 state->db_name, ret));
696 tevent_req_error(req, ret);
700 ret = ctdb_reply_control_db_detach(reply);
702 DEBUG(DEBUG_ERR, ("detach: %s DB_DETACH failed, ret=%d\n",
703 state->db_name, ret));
704 tevent_req_error(req, ret);
708 db = client_db_handle(state->client, state->db_name);
710 DLIST_REMOVE(state->client->db, db);
714 tevent_req_done(req);
/*
 * Collect the result of ctdb_detach_send(); the request is checked for
 * a unix error with tevent_req_is_unix_error().
 */
717 bool ctdb_detach_recv(struct tevent_req *req, int *perr)
721 if (tevent_req_is_unix_error(req, &ret)) {
/*
 * Synchronous wrapper around ctdb_detach_send()/ctdb_detach_recv():
 * runs the request on a temporary talloc context, polls ev until it is
 * finished and frees the temporary context on every visible exit path.
 */
731 int ctdb_detach(struct tevent_context *ev,
732 struct ctdb_client_context *client,
733 struct timeval timeout, uint32_t db_id)
736 struct tevent_req *req;
740 mem_ctx = talloc_new(client);
741 if (mem_ctx == NULL) {
745 req = ctdb_detach_send(mem_ctx, ev, client, timeout, db_id);
747 talloc_free(mem_ctx);
751 tevent_req_poll(req, ev);
753 status = ctdb_detach_recv(req, &ret);
755 talloc_free(mem_ctx);
759 talloc_free(mem_ctx);
/* Accessor for the id of a database handle (body not visible here; presumably returns db->db_id) */
763 uint32_t ctdb_db_id(struct ctdb_db_context *db)
/*
 * Context handed to the TDB traverse callback: the user's record parser
 * (plus its private_data and the extract_header switch, which the
 * handler reads but which are not visible in this view).
 */
768 struct ctdb_db_traverse_local_state {
769 ctdb_rec_parser_func_t parser;
/*
 * TDB traverse callback for ctdb_db_traverse_local().  When header
 * extraction is enabled the ltdb header is extracted from the record
 * data and passed to the parser; otherwise the parser gets a NULL
 * header and the raw data.  The parser is always called with reqid 0.
 */
775 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
776 TDB_DATA key, TDB_DATA data,
779 struct ctdb_db_traverse_local_state *state =
780 (struct ctdb_db_traverse_local_state *)private_data;
783 if (state->extract_header) {
784 struct ctdb_ltdb_header header;
786 ret = ctdb_ltdb_header_extract(&data, &header);
792 ret = state->parser(0, &header, key, data, state->private_data);
794 ret = state->parser(0, NULL, key, data, state->private_data);
/*
 * Traverse the local TDB of db, calling parser for each record.  Either
 * tdb_traverse_read() or tdb_traverse() is used; the choice presumably
 * follows the readonly argument (the condition is not visible here).
 */
805 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
807 ctdb_rec_parser_func_t parser, void *private_data)
809 struct ctdb_db_traverse_local_state state;
812 state.parser = parser;
813 state.private_data = private_data;
814 state.extract_header = extract_header;
818 ret = tdb_traverse_read(db->ltdb->tdb,
819 ctdb_db_traverse_local_handler,
822 ret = tdb_traverse(db->ltdb->tdb,
823 ctdb_db_traverse_local_handler, &state);
/*
 * State for an asynchronous cluster traverse.  The callbacks also use
 * destnode, srvid, private_data and result members that are not visible
 * in this view.
 */
833 struct ctdb_db_traverse_state {
834 struct tevent_context *ev;
835 struct ctdb_client_context *client;
836 struct ctdb_db_context *db;
839 struct timeval timeout;
840 ctdb_rec_parser_func_t parser;
/* Traverse stages and the message handler receiving the records */
845 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
846 static void ctdb_db_traverse_started(struct tevent_req *subreq);
847 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
849 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
850 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
/*
 * Start an asynchronous traverse of db via destnode.  A message handler
 * is registered first so that records sent back by the daemon can be
 * received; the traverse itself is started in
 * ctdb_db_traverse_handler_set().
 */
852 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
853 struct tevent_context *ev,
854 struct ctdb_client_context *client,
855 struct ctdb_db_context *db,
857 struct timeval timeout,
858 ctdb_rec_parser_func_t parser,
861 struct tevent_req *req, *subreq;
862 struct ctdb_db_traverse_state *state;
864 req = tevent_req_create(mem_ctx, &state,
865 struct ctdb_db_traverse_state);
871 state->client = client;
873 state->destnode = destnode;
/*
 * The srvid is derived from the process id only.
 * NOTE(review): two traverses running in the same process would share
 * this srvid - confirm that this cannot happen.
 */
874 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
875 state->timeout = timeout;
876 state->parser = parser;
877 state->private_data = private_data;
879 subreq = ctdb_client_set_message_handler_send(state, ev, client,
881 ctdb_db_traverse_handler,
883 if (tevent_req_nomem(subreq, req)) {
884 return tevent_req_post(req, ev);
886 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
/*
 * The message handler is registered: send TRAVERSE_START_EXT for the
 * database to state->destnode.  If the control cannot be created the
 * failure is recorded in state->result and the handler is removed again
 * before the request finishes.
 */
891 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
893 struct tevent_req *req = tevent_req_callback_data(
894 subreq, struct tevent_req);
895 struct ctdb_db_traverse_state *state = tevent_req_data(
896 req, struct ctdb_db_traverse_state);
897 struct ctdb_traverse_start_ext traverse;
898 struct ctdb_req_control request;
902 status = ctdb_client_set_message_handler_recv(subreq, &ret);
905 tevent_req_error(req, ret);
909 traverse = (struct ctdb_traverse_start_ext) {
910 .db_id = ctdb_db_id(state->db),
912 .srvid = state->srvid,
913 .withemptyrecords = false,
916 ctdb_req_control_traverse_start_ext(&request, &traverse);
917 subreq = ctdb_client_control_send(state, state->ev, state->client,
918 state->destnode, state->timeout,
920 if (subreq == NULL) {
921 state->result = ENOMEM;
922 ctdb_db_traverse_remove_handler(req);
925 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
/*
 * TRAVERSE_START_EXT reply handler.  If the control or its reply failed,
 * the message handler is removed, which eventually ends the request.
 * No action for the success case is visible in this view.
 */
928 static void ctdb_db_traverse_started(struct tevent_req *subreq)
930 struct tevent_req *req = tevent_req_callback_data(
931 subreq, struct tevent_req);
932 struct ctdb_db_traverse_state *state = tevent_req_data(
933 req, struct ctdb_db_traverse_state);
934 struct ctdb_reply_control *reply;
938 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
941 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
943 ctdb_db_traverse_remove_handler(req);
947 ret = ctdb_reply_control_traverse_start_ext(reply);
950 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
953 ctdb_db_traverse_remove_handler(req);
/*
 * Message handler receiving one traversed record per message.  The
 * record is unpacked from the message, its ltdb header is extracted and
 * the user's parser is called with the record's reqid, header, key and
 * data.
 */
958 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
961 struct tevent_req *req = talloc_get_type_abort(
962 private_data, struct tevent_req);
963 struct ctdb_db_traverse_state *state = tevent_req_data(
964 req, struct ctdb_db_traverse_state);
965 struct ctdb_rec_data *rec;
966 struct ctdb_ltdb_header header;
969 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
/* A record with empty key and empty data ends the traverse: unregister the handler */
974 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
976 ctdb_db_traverse_remove_handler(req);
980 ret = ctdb_ltdb_header_extract(&rec->data, &header);
986 if (rec->data.dsize == 0) {
991 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
992 state->private_data);
996 ctdb_db_traverse_remove_handler(req);
/*
 * Unregister the traverse message handler; the request is finished in
 * ctdb_db_traverse_handler_removed() once the removal completed.
 */
1000 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
1002 struct ctdb_db_traverse_state *state = tevent_req_data(
1003 req, struct ctdb_db_traverse_state);
1004 struct tevent_req *subreq;
1006 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
1009 if (tevent_req_nomem(subreq, req)) {
1012 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
/*
 * The message handler has been removed: finish the traverse request.
 * An error stored in state->result by an earlier stage is reported at
 * this point; otherwise the request succeeds.
 */
1015 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
1017 struct tevent_req *req = tevent_req_callback_data(
1018 subreq, struct tevent_req);
1019 struct ctdb_db_traverse_state *state = tevent_req_data(
1020 req, struct ctdb_db_traverse_state);
1024 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
1025 TALLOC_FREE(subreq);
1027 tevent_req_error(req, ret);
1031 if (state->result != 0) {
1032 tevent_req_error(req, state->result);
1036 tevent_req_done(req);
/*
 * Collect the result of ctdb_db_traverse_send(); the request is checked
 * for a unix error with tevent_req_is_unix_error().
 */
1039 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
1043 if (tevent_req_is_unix_error(req, &ret)) {
/*
 * Synchronous wrapper around ctdb_db_traverse_send()/_recv(): creates
 * the request on mem_ctx, polls ev until it is finished and collects the
 * result.
 */
1053 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1054 struct ctdb_client_context *client,
1055 struct ctdb_db_context *db,
1056 uint32_t destnode, struct timeval timeout,
1057 ctdb_rec_parser_func_t parser, void *private_data)
1059 struct tevent_req *req;
1063 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
1064 timeout, parser, private_data);
1069 tevent_req_poll(req, ev);
1071 status = ctdb_db_traverse_recv(req, &ret);
/*
 * Fetch a record from the local TDB of db and split it into the ltdb
 * header and the payload.  The payload is copied onto mem_ctx.
 *
 * A stored record shorter than an ltdb header is treated as "no record
 * present"; in that case the header's dmaster is set to CTDB_UNKNOWN_PNN,
 * unless the TDB reports an error other than TDB_ERR_NOEXIST.
 */
1079 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
1080 struct ctdb_ltdb_header *header,
1081 TALLOC_CTX *mem_ctx, TDB_DATA *data)
1086 rec = tdb_fetch(db->ltdb->tdb, key);
1087 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
1088 /* No record present */
1089 if (rec.dptr != NULL) {
1093 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
1098 header->dmaster = CTDB_UNKNOWN_PNN;
1107 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
/* The payload starts right after the packed header */
1114 size_t offset = ctdb_ltdb_header_len(header);
1116 data->dsize = rec.dsize - offset;
1117 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
1119 if (data->dptr == NULL) {
1129 * Fetch a record from volatile database
1132 * 1. Get a lock on the hash chain
1133 * 2. If the record does not exist, migrate the record
1134 * 3. If readonly=true and delegations do not exist, migrate the record.
1135 * 4. If readonly=false and delegations exist, migrate the record.
1136 * 5. If the local node is not dmaster, migrate the record.
/*
 * State for the asynchronous fetch-lock request.  The callbacks also use
 * readonly and pnn members that are not visible in this view.
 */
1140 struct ctdb_fetch_lock_state {
1141 struct tevent_context *ev;
1142 struct ctdb_client_context *client;
/* Record handle being prepared; returned to the caller on success */
1143 struct ctdb_record_handle *h;
/* Local check, migration request and its completion */
1148 static int ctdb_fetch_lock_check(struct tevent_req *req);
1149 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1150 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
/*
 * Start an asynchronous fetch-lock of the record identified by key in a
 * volatile database.  A record handle is allocated on db with a private
 * copy of the key, then ctdb_fetch_lock_check() decides whether the
 * record is usable locally.  A result of EAGAIN from the check leaves
 * the request pending; any other error fails it.
 */
1152 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1153 struct tevent_context *ev,
1154 struct ctdb_client_context *client,
1155 struct ctdb_db_context *db,
1156 TDB_DATA key, bool readonly)
1158 struct ctdb_fetch_lock_state *state;
1159 struct tevent_req *req;
1162 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1168 state->client = client;
1170 state->h = talloc_zero(db, struct ctdb_record_handle);
1171 if (tevent_req_nomem(state->h, req)) {
1172 return tevent_req_post(req, ev);
1174 state->h->client = client;
1176 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1177 if (tevent_req_nomem(state->h->key.dptr, req)) {
1178 return tevent_req_post(req, ev);
1180 state->h->key.dsize = key.dsize;
1181 state->h->readonly = false;
1183 state->readonly = readonly;
1184 state->pnn = ctdb_client_pnn(client);
1186 /* Check that database is not persistent */
1187 if (db->persistent) {
1188 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1190 tevent_req_error(req, EINVAL);
1191 return tevent_req_post(req, ev);
1194 ret = ctdb_fetch_lock_check(req);
1196 tevent_req_done(req);
1197 return tevent_req_post(req, ev);
1199 if (ret != EAGAIN) {
1200 tevent_req_error(req, ret);
1201 return tevent_req_post(req, ev);
/*
 * Take the chain lock for the record and decide whether it can be used
 * locally or has to be migrated first.
 *
 * The decision depends on the record's dmaster compared with the local
 * PNN (state->pnn), on the readonly request flag and on the record's
 * CTDB_REC_RO_* flags.  When the record is not usable the chain lock is
 * released and a migration is started with ctdb_fetch_lock_migrate();
 * callers compare the return value against EAGAIN.
 */
1206 static int ctdb_fetch_lock_check(struct tevent_req *req)
1208 struct ctdb_fetch_lock_state *state = tevent_req_data(
1209 req, struct ctdb_fetch_lock_state);
1210 struct ctdb_record_handle *h = state->h;
1211 struct ctdb_ltdb_header header;
1212 TDB_DATA data = tdb_null;
1214 bool do_migrate = false;
1216 ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1219 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1220 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1225 data = tdb_fetch(h->db->ltdb->tdb, h->key);
1226 if (data.dptr == NULL) {
1227 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1235 /* Got the record */
1236 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1242 if (! state->readonly) {
1243 /* Read/write access */
1244 if (header.dmaster == state->pnn &&
1245 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1249 if (header.dmaster != state->pnn) {
1253 /* Readonly access */
1254 if (header.dmaster != state->pnn &&
1255 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1256 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1261 /* We are the dmaster or readonly delegation */
1264 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1265 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1275 if (data.dptr != NULL) {
1278 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1281 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1282 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1287 ctdb_fetch_lock_migrate(req);
/*
 * Ask the daemon to migrate the record to this node by issuing a
 * CTDB_NULL_FUNC call with CTDB_IMMEDIATE_MIGRATION set.  For readonly
 * requests CTDB_WANT_READONLY is added to the call flags.
 */
1292 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1294 struct ctdb_fetch_lock_state *state = tevent_req_data(
1295 req, struct ctdb_fetch_lock_state);
1296 struct ctdb_req_call request;
1297 struct tevent_req *subreq;
1299 ZERO_STRUCT(request);
1300 request.flags = CTDB_IMMEDIATE_MIGRATION;
1301 if (state->readonly) {
1302 request.flags |= CTDB_WANT_READONLY;
1304 request.db_id = state->h->db->db_id;
1305 request.callid = CTDB_NULL_FUNC;
1306 request.key = state->h->key;
1307 request.calldata = tdb_null;
1309 subreq = ctdb_client_call_send(state, state->ev, state->client,
1311 if (tevent_req_nomem(subreq, req)) {
1315 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
/*
 * Completion of the migration call.  A failed call or a non-zero reply
 * status fails the request (the latter with EIO).  Otherwise the local
 * check is repeated; an EAGAIN result from it is not treated as an
 * error at this point.
 */
1318 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1320 struct tevent_req *req = tevent_req_callback_data(
1321 subreq, struct tevent_req);
1322 struct ctdb_fetch_lock_state *state = tevent_req_data(
1323 req, struct ctdb_fetch_lock_state);
1324 struct ctdb_reply_call *reply;
1328 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1329 TALLOC_FREE(subreq);
1331 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1332 state->h->db->db_name, ret));
1333 tevent_req_error(req, ret);
1337 if (reply->status != 0) {
1338 tevent_req_error(req, EIO);
1343 ret = ctdb_fetch_lock_check(req);
1345 if (ret != EAGAIN) {
1346 tevent_req_error(req, ret);
1351 tevent_req_done(req);
/*
 * talloc destructor for a record handle (installed in
 * ctdb_fetch_lock_recv()): releases the chain lock held on the record's
 * key and logs a failure to do so.
 */
1354 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1358 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1361 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1362 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
/*
 * Collect the result of ctdb_fetch_lock_send().
 *
 * On error the record handle is freed.  On success the ltdb header is
 * copied to *header (if header is not NULL), the payload following the
 * header is duplicated onto mem_ctx into *data, and a destructor is set
 * on the handle so that freeing it releases the chain lock.
 */
1368 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1369 struct ctdb_ltdb_header *header,
1370 TALLOC_CTX *mem_ctx,
1371 TDB_DATA *data, int *perr)
1373 struct ctdb_fetch_lock_state *state = tevent_req_data(
1374 req, struct ctdb_fetch_lock_state);
1375 struct ctdb_record_handle *h = state->h;
1378 if (tevent_req_is_unix_error(req, &err)) {
1380 TALLOC_FREE(state->h);
1386 if (header != NULL) {
1387 *header = h->header;
1392 offset = ctdb_ltdb_header_len(&h->header);
1394 data->dsize = h->data.dsize - offset;
1395 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1397 if (data->dptr == NULL) {
1398 TALLOC_FREE(state->h);
1406 talloc_set_destructor(h, ctdb_record_handle_destructor);
/*
 * Synchronous wrapper around ctdb_fetch_lock_send()/_recv(): creates the
 * request on mem_ctx, polls ev until it is finished and collects the
 * record handle, header and data.
 */
1410 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1411 struct ctdb_client_context *client,
1412 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1413 struct ctdb_record_handle **out,
1414 struct ctdb_ltdb_header *header, TDB_DATA *data)
1416 struct tevent_req *req;
1417 struct ctdb_record_handle *h;
1420 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1425 tevent_req_poll(req, ev);
1427 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
/*
 * Store new data for a record obtained through fetch-lock.  The packed
 * ltdb header and the new payload are written with one tdb_storev() call
 * using TDB_REPLACE.  Nothing is written when the new data has the same
 * size and content as h->data.
 */
1436 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1438 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1442 /* Cannot modify the record if it was obtained as a readonly copy */
1447 /* Check if the new data is same */
1448 if (h->data.dsize == data.dsize &&
1449 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1450 /* No need to do anything */
1454 ctdb_ltdb_header_push(&h->header, header);
/* rec[0] is the packed header, rec[1] the caller's payload */
1456 rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1457 rec[0].dptr = header;
1459 rec[1].dsize = data.dsize;
1460 rec[1].dptr = data.dptr;
1462 ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1465 ("store_record: %s tdb_storev failed, %s\n",
1466 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
/* State for the asynchronous delete-record request: the handle of the record being deleted */
1473 struct ctdb_delete_record_state {
1474 struct ctdb_record_handle *h;
/* Completion callback for the SCHEDULE_FOR_DELETION control */
1477 static void ctdb_delete_record_done(struct tevent_req *subreq);
/*
 * Start deleting the record behind handle h.  The record is first
 * rewritten locally as a header-only record (empty payload), then the
 * daemon on the local node is asked to schedule it for deletion.
 * Records obtained as a readonly copy are rejected with EINVAL.
 */
1479 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1480 struct tevent_context *ev,
1481 struct ctdb_record_handle *h)
1483 struct tevent_req *req, *subreq;
1484 struct ctdb_delete_record_state *state;
1485 struct ctdb_key_data key;
1486 struct ctdb_req_control request;
1487 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1491 req = tevent_req_create(mem_ctx, &state,
1492 struct ctdb_delete_record_state);
1499 /* Cannot delete the record if it was obtained as a readonly copy */
1501 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1503 tevent_req_error(req, EINVAL);
1504 return tevent_req_post(req, ev);
1507 ctdb_ltdb_header_push(&h->header, header);
1509 rec.dsize = ctdb_ltdb_header_len(&h->header);
/* NOTE(review): the log message below misspells tdb_store as "tdb_sore" */
1512 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1515 ("fetch_lock delete: %s tdb_sore failed, %s\n",
1516 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1517 tevent_req_error(req, EIO);
1518 return tevent_req_post(req, ev);
1521 key.db_id = h->db->db_id;
1522 key.header = h->header;
/* The control is sent with tevent_timeval_zero() as its timeout argument */
1525 ctdb_req_control_schedule_for_deletion(&request, &key);
1526 subreq = ctdb_client_control_send(state, ev, h->client,
1527 ctdb_client_pnn(h->client),
1528 tevent_timeval_zero(),
1530 if (tevent_req_nomem(subreq, req)) {
1531 return tevent_req_post(req, ev);
1533 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
/*
 * Callback for the SCHEDULE_FOR_DELETION control sent by
 * ctdb_delete_record_send().
 *
 * Collects the control result and releases the subrequest.  On the
 * failure path the database name and error code are logged and the
 * parent request fails with that code; otherwise the parent request is
 * marked done.
 */
1538 static void ctdb_delete_record_done(struct tevent_req *subreq)
1540 struct tevent_req *req = tevent_req_callback_data(
1541 subreq, struct tevent_req);
1542 struct ctdb_delete_record_state *state = tevent_req_data(
1543 req, struct ctdb_delete_record_state);
1547 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1548 TALLOC_FREE(subreq);
1551 ("delete_record: %s SCHEDULE_FOR_DELETION failed, "
1552 "ret=%d\n", state->h->db->db_name, ret));
1553 tevent_req_error(req, ret);
1557 tevent_req_done(req);
/*
 * Collect the result of a request started with ctdb_delete_record_send().
 * The request's unix error state is checked with
 * tevent_req_is_unix_error(); presumably the error is handed back through
 * perr and the return value reports success -- confirm against the full
 * body.
 */
1560 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1564 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_delete_record_send()/_recv().
 *
 * The request is driven with tevent_req_poll() on the handle's own event
 * context (h->ev).  A temporary talloc context parents the request and is
 * freed after the result has been received, and also on the path that
 * follows a failed send.
 */
1575 int ctdb_delete_record(struct ctdb_record_handle *h)
1577 struct tevent_context *ev = h->ev;
1578 TALLOC_CTX *mem_ctx;
1579 struct tevent_req *req;
1583 mem_ctx = talloc_new(NULL);
1584 if (mem_ctx == NULL) {
1588 req = ctdb_delete_record_send(mem_ctx, ev, h);
1590 talloc_free(mem_ctx);
/* Block until the asynchronous request has completed */
1594 tevent_req_poll(req, ev);
1596 status = ctdb_delete_record_recv(req, &ret);
1597 talloc_free(mem_ctx);
1606 * Global lock functions
/*
 * Per-request state for ctdb_g_lock_lock_send() and its callbacks.
 */
1609 struct ctdb_g_lock_lock_state {
1610 struct tevent_context *ev;
1611 struct ctdb_client_context *client;
1612 struct ctdb_db_context *db;
/* Server id of the requester, copied from the caller's sid */
1614 struct ctdb_server_id my_sid;
/* CTDB_G_LOCK_READ or CTDB_G_LOCK_WRITE, chosen from the readonly flag */
1615 enum ctdb_g_lock_type lock_type;
/* Handle of the fetch-locked record holding the lock list */
1616 struct ctdb_record_handle *h;
1617 /* state for verification of active locks */
1618 struct ctdb_g_lock_list *lock_list;
/* Index of the lock_list entry currently being examined */
1619 unsigned int current;
/* Callbacks and helpers of the g_lock lock request, in call order */
1622 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1623 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1624 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1625 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1626 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Decide whether two lock types conflict.  The only case tested here is
 * "both are read locks"; presumably that case is reported as
 * non-conflicting and every other combination as a conflict -- confirm
 * against the return statements.
 */
1628 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1629 enum ctdb_g_lock_type l2)
1631 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
/*
 * Start an asynchronous request to take the global lock named keyname in
 * database db on behalf of server id *sid.
 *
 * The lock type is CTDB_G_LOCK_READ when readonly is set and
 * CTDB_G_LOCK_WRITE otherwise.  The request begins by fetch-locking the
 * record for the key; ctdb_g_lock_lock_fetched() continues from there.
 */
1637 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1638 struct tevent_context *ev,
1639 struct ctdb_client_context *client,
1640 struct ctdb_db_context *db,
1641 const char *keyname,
1642 struct ctdb_server_id *sid,
1645 struct tevent_req *req, *subreq;
1646 struct ctdb_g_lock_lock_state *state;
1648 req = tevent_req_create(mem_ctx, &state,
1649 struct ctdb_g_lock_lock_state);
1655 state->client = client;
/*
 * The key points at the caller's string (it is not copied) and its
 * length includes the terminating NUL.
 */
1657 state->key.dptr = discard_const(keyname);
1658 state->key.dsize = strlen(keyname) + 1;
1659 state->my_sid = *sid;
1660 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1662 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1664 if (tevent_req_nomem(subreq, req)) {
1665 return tevent_req_post(req, ev);
1667 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Callback for the fetch-lock of the g_lock record.
 *
 * Stores the record handle in the state; a NULL handle is logged and
 * fails the request with the returned error.  Any lock list left over
 * from an earlier pass is released, the record data is parsed into a new
 * lock list, and the raw data buffer is freed.  Unparsable data is logged
 * as "invalid lock data".  Processing then continues in
 * ctdb_g_lock_lock_process_locks().
 */
1672 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1674 struct tevent_req *req = tevent_req_callback_data(
1675 subreq, struct tevent_req);
1676 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1677 req, struct ctdb_g_lock_lock_state);
1681 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1682 TALLOC_FREE(subreq);
1683 if (state->h == NULL) {
1684 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1685 (char *)state->key.dptr));
1686 tevent_req_error(req, ret);
/* This callback is also installed by the retry path; drop the stale list */
1690 if (state->lock_list != NULL) {
1691 TALLOC_FREE(state->lock_list);
1695 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1697 talloc_free(data.dptr);
1699 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1700 (char *)state->key.dptr));
1701 tevent_req_error(req, ret);
1705 ctdb_g_lock_lock_process_locks(req);
/*
 * Walk the existing lock entries, starting at state->current, and either
 * take the lock or start checking a conflicting holder.
 *
 *  - An entry with our own server id fails the request with EDEADLK.
 *  - An entry whose type conflicts with the requested type sets
 *    check_server; a PROCESS_EXISTS control for that holder's pid is
 *    then sent and handled by ctdb_g_lock_lock_checked().
 *  - With no conflict, our own entry is appended to the list, the record
 *    is rewritten through ctdb_g_lock_lock_update(), the record handle is
 *    released and the request completes.
 */
1708 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1710 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1711 req, struct ctdb_g_lock_lock_state);
1712 struct tevent_req *subreq;
1713 struct ctdb_g_lock *lock;
1714 bool check_server = false;
1717 while (state->current < state->lock_list->num) {
1718 lock = &state->lock_list->lock[state->current];
1720 /* We should not ask for the same lock more than once */
1721 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1722 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1723 (char *)state->key.dptr));
1724 tevent_req_error(req, EDEADLK);
1728 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1729 check_server = true;
1733 state->current += 1;
1737 struct ctdb_req_control request;
/* Ask the daemon whether the conflicting holder's process still exists */
1739 ctdb_req_control_process_exists(&request, lock->sid.pid);
1740 subreq = ctdb_client_control_send(state, state->ev,
1743 tevent_timeval_zero(),
1745 if (tevent_req_nomem(subreq, req)) {
1748 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1752 /* There is no conflict, add ourself to the lock_list */
1753 state->lock_list->lock = talloc_realloc(state->lock_list,
1754 state->lock_list->lock,
1756 state->lock_list->num + 1);
1757 if (state->lock_list->lock == NULL) {
1758 tevent_req_error(req, ENOMEM);
/* The new entry occupies the slot just past the previous end of the list */
1762 lock = &state->lock_list->lock[state->lock_list->num];
1763 lock->type = state->lock_type;
1764 lock->sid = state->my_sid;
1765 state->lock_list->num += 1;
1767 ret = ctdb_g_lock_lock_update(req);
1769 tevent_req_error(req, ret);
1773 TALLOC_FREE(state->h);
1774 tevent_req_done(req);
/*
 * Callback for the PROCESS_EXISTS control sent for a conflicting lock
 * holder.
 *
 * A failed control is logged and fails the request.  If the holder's
 * process exists, the record handle is released and a wakeup timer
 * (offset 0 s, 1000 us) is scheduled; ctdb_g_lock_lock_retry() then
 * fetches the record again.  If the process does not exist, the stale
 * entry is removed by overwriting it with the last entry and shrinking
 * the list, the record is rewritten, and the scan resumes in
 * ctdb_g_lock_lock_process_locks().
 */
1777 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1779 struct tevent_req *req = tevent_req_callback_data(
1780 subreq, struct tevent_req);
1781 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1782 req, struct ctdb_g_lock_lock_state);
1783 struct ctdb_reply_control *reply;
1787 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1788 TALLOC_FREE(subreq);
1791 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1792 (char *)state->key.dptr, ret));
1793 tevent_req_error(req, ret);
1797 ret = ctdb_reply_control_process_exists(reply, &value);
1799 tevent_req_error(req, ret);
1805 /* server process exists, need to retry */
1806 TALLOC_FREE(state->h);
1807 subreq = tevent_wakeup_send(state, state->ev,
1808 tevent_timeval_current_ofs(0,1000));
1809 if (tevent_req_nomem(subreq, req)) {
1812 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1816 /* server process does not exist, remove conflicting entry */
1817 state->lock_list->lock[state->current] =
1818 state->lock_list->lock[state->lock_list->num-1];
1819 state->lock_list->num -= 1;
1821 ret = ctdb_g_lock_lock_update(req);
1823 tevent_req_error(req, ret);
/* state->current now refers to the entry moved into the freed slot */
1827 ctdb_g_lock_lock_process_locks(req);
/*
 * Write the current lock list back to the g_lock record.
 *
 * The list is serialised into a temporary buffer sized by
 * ctdb_g_lock_list_len(), stored through the record handle with
 * ctdb_store_record(), and the buffer is freed afterwards.
 */
1830 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1832 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1833 req, struct ctdb_g_lock_lock_state);
1837 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1838 data.dptr = talloc_size(state, data.dsize);
1839 if (data.dptr == NULL) {
1843 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1844 ret = ctdb_store_record(state->h, data);
1845 talloc_free(data.dptr);
/*
 * Callback for the retry wakeup timer scheduled by
 * ctdb_g_lock_lock_checked().
 *
 * After collecting the wakeup result (the visible failure path reports
 * ENOMEM), the g_lock record is fetch-locked again, non-readonly, and
 * ctdb_g_lock_lock_fetched() is reinstalled as the callback.
 */
1849 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1851 struct tevent_req *req = tevent_req_callback_data(
1852 subreq, struct tevent_req);
1853 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1854 req, struct ctdb_g_lock_lock_state);
1857 success = tevent_wakeup_recv(subreq);
1858 TALLOC_FREE(subreq);
1860 tevent_req_error(req, ENOMEM);
1864 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1865 state->db, state->key, false);
1866 if (tevent_req_nomem(subreq, req)) {
1869 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Collect the result of a request started with ctdb_g_lock_lock_send().
 *
 * Any record handle still held in the state is released before the
 * request's unix error state is examined.  Presumably the error is
 * returned through perr -- confirm against the full body.
 */
1872 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1874 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1875 req, struct ctdb_g_lock_lock_state);
1878 TALLOC_FREE(state->h);
1880 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state for ctdb_g_lock_unlock_send() and its callbacks.
 */
1890 struct ctdb_g_lock_unlock_state {
1891 struct tevent_context *ev;
1892 struct ctdb_client_context *client;
1893 struct ctdb_db_context *db;
/* Server id whose lock entry is to be removed */
1895 struct ctdb_server_id my_sid;
/* Handle of the fetch-locked g_lock record */
1896 struct ctdb_record_handle *h;
/* Lock list parsed from the record data */
1897 struct ctdb_g_lock_list *lock_list;
/* Callbacks and helper of the g_lock unlock request */
1900 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1901 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1902 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
/*
 * Start an asynchronous request to release the global lock named keyname
 * in database db that is held by server id sid.
 *
 * The request begins by fetch-locking the record for the key;
 * ctdb_g_lock_unlock_fetched() continues from there.
 */
1904 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1905 struct tevent_context *ev,
1906 struct ctdb_client_context *client,
1907 struct ctdb_db_context *db,
1908 const char *keyname,
1909 struct ctdb_server_id sid)
1911 struct tevent_req *req, *subreq;
1912 struct ctdb_g_lock_unlock_state *state;
1914 req = tevent_req_create(mem_ctx, &state,
1915 struct ctdb_g_lock_unlock_state);
1921 state->client = client;
/*
 * The key points at the caller's string (it is not copied) and its
 * length includes the terminating NUL.
 */
1923 state->key.dptr = discard_const(keyname);
1924 state->key.dsize = strlen(keyname) + 1;
1925 state->my_sid = sid;
1927 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1929 if (tevent_req_nomem(subreq, req)) {
1930 return tevent_req_post(req, ev);
1932 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
/*
 * Callback for the fetch-lock of the g_lock record during unlock.
 *
 * Stores the record handle (a NULL handle is logged and fails the
 * request), parses the record data into a lock list, and removes our
 * entry via ctdb_g_lock_unlock_update().  If the list is then empty the
 * record itself is deleted asynchronously and
 * ctdb_g_lock_unlock_deleted() finishes the request.  The final lines
 * release the record handle and complete the request; presumably that is
 * the path taken when the list is not empty.
 */
1937 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1939 struct tevent_req *req = tevent_req_callback_data(
1940 subreq, struct tevent_req);
1941 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1942 req, struct ctdb_g_lock_unlock_state);
1946 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1947 TALLOC_FREE(subreq);
1948 if (state->h == NULL) {
1949 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1950 (char *)state->key.dptr));
1951 tevent_req_error(req, ret);
1955 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1958 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1959 (char *)state->key.dptr));
1960 tevent_req_error(req, ret);
1964 ret = ctdb_g_lock_unlock_update(req);
1966 tevent_req_error(req, ret);
/* No holders left: remove the record instead of keeping an empty list */
1970 if (state->lock_list->num == 0) {
1971 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1972 if (tevent_req_nomem(subreq, req)) {
1975 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1980 TALLOC_FREE(state->h);
1981 tevent_req_done(req);
/*
 * Remove our own entry from the lock list and, if entries remain, write
 * the list back to the record.
 *
 * The list is searched for an entry whose server id equals my_sid.  When
 * the index is within the list, that slot is overwritten with the last
 * entry and the count is decremented.  A non-empty list is then
 * serialised into a temporary buffer and stored via ctdb_store_record().
 */
1984 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1986 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1987 req, struct ctdb_g_lock_unlock_state);
1988 struct ctdb_g_lock *lock;
1991 for (i=0; i<state->lock_list->num; i++) {
1992 lock = &state->lock_list->lock[i];
1994 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
/* i < num means the search above stopped on a matching entry */
1999 if (i < state->lock_list->num) {
2000 state->lock_list->lock[i] =
2001 state->lock_list->lock[state->lock_list->num-1];
2002 state->lock_list->num -= 1;
2005 if (state->lock_list->num != 0) {
2008 data.dsize = ctdb_g_lock_list_len(state->lock_list);
2009 data.dptr = talloc_size(state, data.dsize);
2010 if (data.dptr == NULL) {
2014 ctdb_g_lock_list_push(state->lock_list, data.dptr);
2015 ret = ctdb_store_record(state->h, data);
2016 talloc_free(data.dptr);
/*
 * Callback for the record deletion started when the lock list became
 * empty.  A failed delete is logged with the key name and error code and
 * fails the request; otherwise the record handle is released and the
 * request completes.
 */
2025 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
2027 struct tevent_req *req = tevent_req_callback_data(
2028 subreq, struct tevent_req);
2029 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2030 req, struct ctdb_g_lock_unlock_state);
2034 status = ctdb_delete_record_recv(subreq, &ret);
2037 ("g_lock_unlock %s delete record failed, ret=%d\n",
2038 (char *)state->key.dptr, ret));
2039 tevent_req_error(req, ret);
2043 TALLOC_FREE(state->h);
2044 tevent_req_done(req);
/*
 * Collect the result of a request started with ctdb_g_lock_unlock_send().
 *
 * Any record handle still held in the state is released before the
 * request's unix error state is examined.  Presumably the error is
 * returned through perr -- confirm against the full body.
 */
2047 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
2049 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
2050 req, struct ctdb_g_lock_unlock_state);
2053 TALLOC_FREE(state->h);
2055 if (tevent_req_is_unix_error(req, &err)) {
2066 * Persistent database functions
/*
 * Per-request state for ctdb_transaction_start_send() and its callbacks.
 */
2068 struct ctdb_transaction_start_state {
2069 struct tevent_context *ev;
2070 struct ctdb_client_context *client;
2071 struct timeval timeout;
/* Transaction handle being set up by the request */
2072 struct ctdb_transaction_handle *h;
/* Callbacks: g_lock.tdb attached, then transaction g_lock taken */
2076 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
2077 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
/*
 * Start an asynchronous request to begin a transaction on database db.
 *
 * A non-persistent database is rejected with EINVAL.  A transaction
 * handle is allocated as a talloc child of db and given the readonly
 * flag, a server id derived from the client and db_id, an empty record
 * buffer for the database, and a lock name of the form
 * "transaction_db_0x<db_id>".  The request then attaches to "g_lock.tdb";
 * ctdb_transaction_g_lock_attached() continues from there.
 */
2079 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
2080 struct tevent_context *ev,
2081 struct ctdb_client_context *client,
2082 struct timeval timeout,
2083 struct ctdb_db_context *db,
2086 struct ctdb_transaction_start_state *state;
2087 struct tevent_req *req, *subreq;
2088 struct ctdb_transaction_handle *h;
2090 req = tevent_req_create(mem_ctx, &state,
2091 struct ctdb_transaction_start_state);
2096 if (! db->persistent) {
2097 tevent_req_error(req, EINVAL);
2098 return tevent_req_post(req, ev);
2102 state->client = client;
2103 state->destnode = ctdb_client_pnn(client);
/* The handle is parented to db, not to the request state */
2105 h = talloc_zero(db, struct ctdb_transaction_handle);
2106 if (tevent_req_nomem(h, req)) {
2107 return tevent_req_post(req, ev);
2113 h->readonly = readonly;
2116 /* SRVID is unique for databases, so client can have transactions
2117 * active for multiple databases */
2118 h->sid = ctdb_client_get_server_id(client, db->db_id);
2120 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
2121 if (tevent_req_nomem(h->recbuf, req)) {
2122 return tevent_req_post(req, ev);
2125 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
2126 if (tevent_req_nomem(h->lock_name, req)) {
2127 return tevent_req_post(req, ev);
2132 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
2133 if (tevent_req_nomem(subreq, req)) {
2134 return tevent_req_post(req, ev);
2136 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
/*
 * Callback for the attach to g_lock.tdb.
 *
 * The attached database context is stored in the handle's db_g_lock.  A
 * failed attach is logged and fails the request.  Otherwise the
 * transaction's global lock (lock_name, sid, readonly flag from the
 * handle) is requested and ctdb_transaction_g_lock_done() is installed
 * as the callback.
 */
2141 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2143 struct tevent_req *req = tevent_req_callback_data(
2144 subreq, struct tevent_req);
2145 struct ctdb_transaction_start_state *state = tevent_req_data(
2146 req, struct ctdb_transaction_start_state);
2150 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2151 TALLOC_FREE(subreq);
2154 ("transaction_start: %s attach g_lock.tdb failed\n",
2155 state->h->db->db_name));
2156 tevent_req_error(req, ret);
2160 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2161 state->h->db_g_lock,
2162 state->h->lock_name,
2163 &state->h->sid, state->h->readonly);
2164 if (tevent_req_nomem(subreq, req)) {
2167 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
/*
 * Callback for the transaction's g_lock lock request.  A failure is
 * logged with the database name and error code and fails the request;
 * otherwise the transaction-start request completes.
 */
2170 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2172 struct tevent_req *req = tevent_req_callback_data(
2173 subreq, struct tevent_req);
2174 struct ctdb_transaction_start_state *state = tevent_req_data(
2175 req, struct ctdb_transaction_start_state);
2179 status = ctdb_g_lock_lock_recv(subreq, &ret);
2180 TALLOC_FREE(subreq);
2183 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2184 state->h->db->db_name, ret));
2185 tevent_req_error(req, ret);
2189 tevent_req_done(req);
/*
 * Collect the result of a request started with
 * ctdb_transaction_start_send().  The request's unix error state is
 * checked; presumably the transaction handle kept in the request state is
 * returned on success and NULL on error -- confirm against the full body.
 */
2192 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2193 struct tevent_req *req,
2196 struct ctdb_transaction_start_state *state = tevent_req_data(
2197 req, struct ctdb_transaction_start_state);
2200 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_transaction_start_send()/_recv(): the
 * request is created on mem_ctx, driven with tevent_req_poll() on ev, and
 * its result collected.  Presumably the new handle is returned through
 * out -- confirm against the full body.
 */
2210 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2211 struct ctdb_client_context *client,
2212 struct timeval timeout,
2213 struct ctdb_db_context *db, bool readonly,
2214 struct ctdb_transaction_handle **out)
2216 struct tevent_req *req;
2217 struct ctdb_transaction_handle *h;
2220 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2226 tevent_req_poll(req, ev);
2228 h = ctdb_transaction_start_recv(req, &ret);
/*
 * Search state shared between ctdb_transaction_record_fetch() and its
 * traverse callback; the callback fills in the header of a matching
 * record.
 */
2237 struct ctdb_transaction_record_fetch_state {
2239 struct ctdb_ltdb_header header;
/*
 * Record-buffer traverse callback used by ctdb_transaction_record_fetch().
 *
 * When the traversed key matches the wanted key byte for byte, the ltdb
 * header is extracted from the record data into the search state and the
 * record is marked as found.  A header extraction failure is logged.
 */
2243 static int ctdb_transaction_record_fetch_traverse(
2245 struct ctdb_ltdb_header *nullheader,
2246 TDB_DATA key, TDB_DATA data,
2249 struct ctdb_transaction_record_fetch_state *state =
2250 (struct ctdb_transaction_record_fetch_state *)private_data;
2252 if (state->key.dsize == key.dsize &&
2253 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2256 ret = ctdb_ltdb_header_extract(&data, &state->header);
2259 ("record_fetch: Failed to extract header, "
2265 state->found = true;
/*
 * Look up a key in the transaction's record buffer (h->recbuf) by
 * traversing it with ctdb_transaction_record_fetch_traverse().  The
 * header of the matching record is copied out when header is not NULL.
 */
2271 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2273 struct ctdb_ltdb_header *header,
2276 struct ctdb_transaction_record_fetch_state state;
2280 state.found = false;
2282 ret = ctdb_rec_buffer_traverse(h->recbuf,
2283 ctdb_transaction_record_fetch_traverse,
2290 if (header != NULL) {
2291 *header = state.header;
/*
 * Fetch a record within a transaction.
 *
 * The transaction's record buffer is consulted first; data found there
 * is duplicated onto mem_ctx.  The later lines read the record from the
 * local database with ctdb_ltdb_fetch() and add it to the transaction's
 * record buffer -- presumably only when the buffer lookup did not find
 * the key.
 */
2302 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2304 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2307 struct ctdb_ltdb_header header;
2310 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2312 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2314 if (data->dptr == NULL) {
2317 data->dsize = tmp_data.dsize;
2321 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2326 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
/*
 * Store a record within a transaction.
 *
 * The previous version of the record is looked up in the transaction's
 * record buffer and, as a second source, in the local database.  If the
 * old and new data are identical the temporary context is freed without
 * adding anything.  Otherwise the header's dmaster is set to the local
 * node and the record is appended to the transaction's record buffer.
 */
2334 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2335 TDB_DATA key, TDB_DATA data)
2337 TALLOC_CTX *tmp_ctx;
2338 struct ctdb_ltdb_header header;
/* Scratch context for the old data; freed on the visible exit paths */
2346 tmp_ctx = talloc_new(h);
2347 if (tmp_ctx == NULL) {
2351 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2353 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
2359 if (old_data.dsize == data.dsize &&
2360 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2361 talloc_free(tmp_ctx);
2365 header.dmaster = ctdb_client_pnn(h->client);
2368 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2369 talloc_free(tmp_ctx);
/*
 * Delete a record within a transaction; implemented as storing tdb_null
 * (empty data) for the key.
 */
2378 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2381 return ctdb_transaction_store_record(h, key, tdb_null);
/*
 * Read the database sequence number stored under CTDB_DB_SEQNUM_KEY in
 * the local database.
 *
 * The key length includes the terminating NUL.  A fetch failure is
 * logged.  Empty data is handled as a special case (presumably meaning
 * sequence number 0 -- confirm), data of any size other than
 * sizeof(uint64_t) is freed and rejected, and otherwise the value is
 * read from the buffer, which is then freed.
 */
2384 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2387 const char *keyname = CTDB_DB_SEQNUM_KEY;
2389 struct ctdb_ltdb_header header;
2392 key.dptr = discard_const(keyname);
2393 key.dsize = strlen(keyname) + 1;
2395 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2398 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2399 h->db->db_name, ret));
2403 if (data.dsize == 0) {
2409 if (data.dsize != sizeof(uint64_t)) {
2410 talloc_free(data.dptr);
/* NOTE(review): direct uint64_t load assumes data.dptr is suitably aligned */
2414 *seqnum = *(uint64_t *)data.dptr;
2416 talloc_free(data.dptr);
/*
 * Store seqnum under CTDB_DB_SEQNUM_KEY as part of the transaction, using
 * ctdb_transaction_store_record().  The value is written as the raw bytes
 * of the local seqnum variable; the key length includes the terminating
 * NUL.
 */
2420 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2423 const char *keyname = CTDB_DB_SEQNUM_KEY;
2426 key.dptr = discard_const(keyname);
2427 key.dsize = strlen(keyname) + 1;
2429 data.dptr = (uint8_t *)&seqnum;
2430 data.dsize = sizeof(seqnum);
2432 return ctdb_transaction_store_record(h, key, data);
/*
 * Per-request state for ctdb_transaction_commit_send() and its callbacks.
 */
2435 struct ctdb_transaction_commit_state {
2436 struct tevent_context *ev;
2437 struct timeval timeout;
/* Transaction being committed */
2438 struct ctdb_transaction_handle *h;
/* Callbacks: TRANS3_COMMIT control finished, then g_lock released */
2442 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2443 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
/*
 * Start an asynchronous commit of transaction h.
 *
 * The current database sequence number is read into the request state
 * and sequence number + 1 is stored as part of the transaction.  A
 * TRANS3_COMMIT control carrying the transaction's record buffer is then
 * sent to the local node; ctdb_transaction_commit_done() handles the
 * reply.  Failures in the sequence number steps are delivered through
 * tevent_req_post().
 */
2445 struct tevent_req *ctdb_transaction_commit_send(
2446 TALLOC_CTX *mem_ctx,
2447 struct tevent_context *ev,
2448 struct timeval timeout,
2449 struct ctdb_transaction_handle *h)
2451 struct tevent_req *req, *subreq;
2452 struct ctdb_transaction_commit_state *state;
2453 struct ctdb_req_control request;
2456 req = tevent_req_create(mem_ctx, &state,
2457 struct ctdb_transaction_commit_state);
2463 state->timeout = timeout;
/* Remember the pre-commit sequence number; commit_done compares against it */
2466 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2468 tevent_req_error(req, ret);
2469 return tevent_req_post(req, ev);
2472 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2474 tevent_req_error(req, ret);
2475 return tevent_req_post(req, ev);
2478 ctdb_req_control_trans3_commit(&request, h->recbuf);
2479 subreq = ctdb_client_control_send(state, ev, h->client,
2480 ctdb_client_pnn(h->client),
2482 if (tevent_req_nomem(subreq, req)) {
2483 return tevent_req_post(req, ev);
2485 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
/*
 * Callback for the TRANS3_COMMIT control.
 *
 * A failed control is logged and fails the request.  When the reply
 * indicates the commit failed due to recovery, the database sequence
 * number is read again:
 *  - if it still equals the pre-commit value, the commit control is sent
 *    again with this function as the callback;
 *  - if it is anything other than the pre-commit value + 1, the mismatch
 *    is logged and the request fails with EIO.
 * After a successful commit the transaction's g_lock is released and
 * ctdb_transaction_commit_g_lock_done() finishes the request.
 */
2490 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2492 struct tevent_req *req = tevent_req_callback_data(
2493 subreq, struct tevent_req);
2494 struct ctdb_transaction_commit_state *state = tevent_req_data(
2495 req, struct ctdb_transaction_commit_state);
2496 struct ctdb_transaction_handle *h = state->h;
2497 struct ctdb_reply_control *reply;
2502 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2503 TALLOC_FREE(subreq);
2506 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2507 h->db->db_name, ret));
2508 tevent_req_error(req, ret);
2512 ret = ctdb_reply_control_trans3_commit(reply);
2516 /* Control failed due to recovery */
2518 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2520 tevent_req_error(req, ret);
/* Sequence number unchanged: the commit was not applied, so resend it */
2524 if (seqnum == state->seqnum) {
2525 struct ctdb_req_control request;
2528 ctdb_req_control_trans3_commit(&request,
2530 subreq = ctdb_client_control_send(
2531 state, state->ev, state->h->client,
2532 ctdb_client_pnn(state->h->client),
2533 state->timeout, &request);
2534 if (tevent_req_nomem(subreq, req)) {
2537 tevent_req_set_callback(subreq,
2538 ctdb_transaction_commit_done,
2543 if (seqnum != state->seqnum + 1) {
2545 ("transaction_commit: %s seqnum mismatch "
2546 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2547 state->h->db->db_name, seqnum, state->seqnum));
2548 tevent_req_error(req, EIO);
2553 /* trans3_commit successful */
2554 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2555 h->db_g_lock, h->lock_name, h->sid);
2556 if (tevent_req_nomem(subreq, req)) {
2559 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
/*
 * Callback for the g_lock release that follows a successful commit.  A
 * failed unlock is logged and fails the request.  Otherwise the
 * transaction handle is freed and the commit request completes.
 */
2563 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2565 struct tevent_req *req = tevent_req_callback_data(
2566 subreq, struct tevent_req);
2567 struct ctdb_transaction_commit_state *state = tevent_req_data(
2568 req, struct ctdb_transaction_commit_state);
2572 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2573 TALLOC_FREE(subreq);
2576 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2577 state->h->db->db_name, ret));
2578 tevent_req_error(req, ret);
/* The transaction is finished; the handle is no longer valid after this */
2582 talloc_free(state->h);
2583 tevent_req_done(req);
/*
 * Collect the result of a request started with
 * ctdb_transaction_commit_send().  The request's unix error state is
 * checked; presumably the error is returned through perr -- confirm
 * against the full body.
 */
2586 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2590 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous commit of transaction h.
 *
 * A readonly transaction, or one with nothing updated, is simply
 * cancelled.  Otherwise the commit request is run with a zero timeout on
 * the handle's event context (h->ev) using a temporary talloc context,
 * which is freed on the visible exit paths.
 */
2600 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2602 struct tevent_context *ev = h->ev;
2603 TALLOC_CTX *mem_ctx;
2604 struct tevent_req *req;
/* Nothing to write: releasing the lock via cancel is sufficient */
2608 if (h->readonly || ! h->updated) {
2609 return ctdb_transaction_cancel(h);
2612 mem_ctx = talloc_new(NULL);
2613 if (mem_ctx == NULL) {
2617 req = ctdb_transaction_commit_send(mem_ctx, ev,
2618 tevent_timeval_zero(), h);
2620 talloc_free(mem_ctx);
2624 tevent_req_poll(req, ev);
2626 status = ctdb_transaction_commit_recv(req, &ret);
2628 talloc_free(mem_ctx);
2632 talloc_free(mem_ctx);
/*
 * Per-request state for ctdb_transaction_cancel_send() and its callback.
 */
2636 struct ctdb_transaction_cancel_state {
2637 struct tevent_context *ev;
/* Transaction being cancelled */
2638 struct ctdb_transaction_handle *h;
2639 struct timeval timeout;
/* Callback: g_lock released */
2642 static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
/*
 * Start an asynchronous cancel of transaction h.  Cancelling releases the
 * transaction's g_lock (lock_name, sid) in the handle's g_lock database;
 * ctdb_transaction_cancel_done() then frees the handle and finishes the
 * request.
 */
2644 struct tevent_req *ctdb_transaction_cancel_send(
2645 TALLOC_CTX *mem_ctx,
2646 struct tevent_context *ev,
2647 struct timeval timeout,
2648 struct ctdb_transaction_handle *h)
2650 struct tevent_req *req, *subreq;
2651 struct ctdb_transaction_cancel_state *state;
2653 req = tevent_req_create(mem_ctx, &state,
2654 struct ctdb_transaction_cancel_state);
2661 state->timeout = timeout;
2663 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2664 state->h->db_g_lock,
2665 state->h->lock_name, state->h->sid);
2666 if (tevent_req_nomem(subreq, req)) {
2667 return tevent_req_post(req, ev);
2669 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
/*
 * Callback for the g_lock release issued by ctdb_transaction_cancel_send().
 *
 * The transaction handle is freed on both paths: after a failed unlock
 * (which is logged and fails the request) and after a successful one
 * (which completes the request).
 */
2675 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2677 struct tevent_req *req = tevent_req_callback_data(
2678 subreq, struct tevent_req);
2679 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2680 req, struct ctdb_transaction_cancel_state);
2684 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2685 TALLOC_FREE(subreq);
2688 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2689 state->h->db->db_name, ret));
2690 talloc_free(state->h);
2691 tevent_req_error(req, ret);
2695 talloc_free(state->h);
2696 tevent_req_done(req);
/*
 * Collect the result of a request started with
 * ctdb_transaction_cancel_send().  The request's unix error state is
 * checked; presumably the error is returned through perr -- confirm
 * against the full body.
 */
2699 bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
2703 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_transaction_cancel_send()/_recv().
 *
 * The cancel request is run with a zero timeout on the handle's event
 * context (h->ev) using a temporary talloc context, which is freed on the
 * visible exit paths.
 */
2713 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2715 struct tevent_context *ev = h->ev;
2716 struct tevent_req *req;
2717 TALLOC_CTX *mem_ctx;
2721 mem_ctx = talloc_new(NULL);
2722 if (mem_ctx == NULL) {
2727 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2728 tevent_timeval_zero(), h);
2730 talloc_free(mem_ctx);
2735 tevent_req_poll(req, ev);
2737 status = ctdb_transaction_cancel_recv(req, &ret);
2739 talloc_free(mem_ctx);
2743 talloc_free(mem_ctx);
2750 * In the future, Samba should register SERVER_ID.
2751 * Make that structure the same as struct srvid {}.