4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/network.h"
22 #include "system/filesys.h"
28 #include "common/logging.h"
30 #include "lib/tdb_wrap/tdb_wrap.h"
31 #include "lib/util/tevent_unix.h"
32 #include "lib/util/dlinklist.h"
33 #include "lib/util/debug.h"
35 #include "protocol/protocol.h"
36 #include "protocol/protocol_api.h"
37 #include "client/client_private.h"
38 #include "client/client.h"
/*
 * Look up a database context by name in the client's list.
 *
 * Walks client->db (linked through ->next) and compares each entry's
 * db_name with the requested name using strcmp().  ctdb_attach_send()
 * treats a non-NULL result as "already attached".
 */
40 static struct ctdb_db_context *client_db_handle(
41 struct ctdb_client_context *client,
44 struct ctdb_db_context *db;
46 for (db = client->db; db != NULL; db = db->next) {
47 if (strcmp(db_name, db->db_name) == 0) {
/*
 * Per-request state for the asynchronous "set database flags" computation.
 */
55 struct ctdb_set_db_flags_state {
56 struct tevent_context *ev;
57 struct ctdb_client_context *client;
58 struct timeval timeout;
/* One marker per flag step; the request is completed once both are true. */
61 bool readonly_done, sticky_done;
/* Completion callbacks for the sub-requests issued while setting db flags. */
66 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq);
67 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq);
68 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq);
/*
 * Start an asynchronous request that applies the READONLY and/or STICKY
 * database flags for db_id.
 *
 * When db_flags contains neither CTDB_DB_FLAGS_READONLY nor
 * CTDB_DB_FLAGS_STICKY the request is posted back to the caller right away.
 * Otherwise a GET_NODEMAP control is sent to destnode and processing
 * continues in ctdb_set_db_flags_nodemap_done().
 */
70 static struct tevent_req *ctdb_set_db_flags_send(
72 struct tevent_context *ev,
73 struct ctdb_client_context *client,
74 uint32_t destnode, struct timeval timeout,
75 uint32_t db_id, uint8_t db_flags)
77 struct tevent_req *req, *subreq;
78 struct ctdb_set_db_flags_state *state;
79 struct ctdb_req_control request;
81 req = tevent_req_create(mem_ctx, &state,
82 struct ctdb_set_db_flags_state);
/* Nothing to do unless at least one of the two supported flags is set. */
87 if (! (db_flags & (CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY))) {
89 return tevent_req_post(req, ev);
93 state->client = client;
94 state->timeout = timeout;
96 state->db_flags = db_flags;
/* First step: fetch the node map from destnode. */
98 ctdb_req_control_get_nodemap(&request);
99 subreq = ctdb_client_control_send(state, ev, client, destnode, timeout,
101 if (tevent_req_nomem(subreq, req)) {
102 return tevent_req_post(req, ev);
104 tevent_req_set_callback(subreq, ctdb_set_db_flags_nodemap_done, req);
/*
 * Callback for the GET_NODEMAP control.
 *
 * Parses the node map, builds the list of connected nodes into
 * state->pnn_list / state->count, and then, for each requested flag, sends
 * the corresponding SET_DB_READONLY / SET_DB_STICKY control to all of those
 * nodes with ctdb_client_control_multi_send().
 */
109 static void ctdb_set_db_flags_nodemap_done(struct tevent_req *subreq)
111 struct tevent_req *req = tevent_req_callback_data(
112 subreq, struct tevent_req);
113 struct ctdb_set_db_flags_state *state = tevent_req_data(
114 req, struct ctdb_set_db_flags_state);
115 struct ctdb_req_control request;
116 struct ctdb_reply_control *reply;
117 struct ctdb_node_map *nodemap;
121 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
125 ("set_db_flags: 0x%08x GET_NODEMAP failed, ret=%d\n",
127 tevent_req_error(req, ret);
131 ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
135 ("set_db_flags: 0x%08x GET_NODEMAP parse failed, ret=%d\n",
137 tevent_req_error(req, ret);
/* The node map is only needed to derive the connected-node list. */
141 state->count = list_of_connected_nodes(nodemap, CTDB_UNKNOWN_PNN,
142 state, &state->pnn_list);
143 talloc_free(nodemap);
/*
 * NOTE(review): ENOMEM is reported for every count <= 0, which includes
 * count == 0 ("no connected nodes") - confirm this errno is intended.
 */
144 if (state->count <= 0) {
146 ("set_db_flags: 0x%08x no connected nodes, count=%d\n",
147 state->db_id, state->count));
148 tevent_req_error(req, ENOMEM);
/* READONLY requested: send SET_DB_READONLY to every listed node. */
152 if (state->db_flags & CTDB_DB_FLAGS_READONLY) {
153 ctdb_req_control_set_db_readonly(&request, state->db_id);
154 subreq = ctdb_client_control_multi_send(
155 state, state->ev, state->client,
156 state->pnn_list, state->count,
157 state->timeout, &request);
158 if (tevent_req_nomem(subreq, req)) {
161 tevent_req_set_callback(subreq,
162 ctdb_set_db_flags_readonly_done, req);
/* Mark the readonly step as complete. */
164 state->readonly_done = true;
/* STICKY requested: send SET_DB_STICKY to every listed node. */
167 if (state->db_flags & CTDB_DB_FLAGS_STICKY) {
168 ctdb_req_control_set_db_sticky(&request, state->db_id);
169 subreq = ctdb_client_control_multi_send(
170 state, state->ev, state->client,
171 state->pnn_list, state->count,
172 state->timeout, &request);
173 if (tevent_req_nomem(subreq, req)) {
176 tevent_req_set_callback(subreq, ctdb_set_db_flags_sticky_done,
/* Mark the sticky step as complete. */
179 state->sticky_done = true;
/*
 * Callback for the multi-node SET_DB_READONLY control.
 *
 * Fails the request with the received error code on failure; otherwise
 * records that the readonly step finished and completes the request once
 * the sticky step is finished as well.
 */
183 static void ctdb_set_db_flags_readonly_done(struct tevent_req *subreq)
185 struct tevent_req *req = tevent_req_callback_data(
186 subreq, struct tevent_req);
187 struct ctdb_set_db_flags_state *state = tevent_req_data(
188 req, struct ctdb_set_db_flags_state);
192 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
197 ("set_db_flags: 0x%08x SET_DB_READONLY failed, ret=%d\n",
199 tevent_req_error(req, ret);
203 state->readonly_done = true;
/* Only the callback that finishes last completes the request. */
205 if (state->readonly_done && state->sticky_done) {
206 tevent_req_done(req);
/*
 * Callback for the multi-node SET_DB_STICKY control.
 *
 * Fails the request with the received error code on failure; otherwise
 * records that the sticky step finished and completes the request once
 * the readonly step is finished as well.
 */
210 static void ctdb_set_db_flags_sticky_done(struct tevent_req *subreq)
212 struct tevent_req *req = tevent_req_callback_data(
213 subreq, struct tevent_req);
214 struct ctdb_set_db_flags_state *state = tevent_req_data(
215 req, struct ctdb_set_db_flags_state);
219 status = ctdb_client_control_multi_recv(subreq, &ret, NULL, NULL,
224 ("set_db_flags: 0x%08x SET_DB_STICKY failed, ret=%d\n",
226 tevent_req_error(req, ret);
230 state->sticky_done = true;
/* Only the callback that finishes last completes the request. */
232 if (state->readonly_done && state->sticky_done) {
233 tevent_req_done(req);
/*
 * Collect the outcome of a set_db_flags request.
 *
 * Checks the request for a unix error with tevent_req_is_unix_error();
 * perr is the caller's slot for the error code.
 */
237 static bool ctdb_set_db_flags_recv(struct tevent_req *req, int *perr)
241 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state for the asynchronous database attach.
 */
250 struct ctdb_attach_state {
251 struct tevent_context *ev;
252 struct ctdb_client_context *client;
253 struct timeval timeout;
/* Database handle being built (or the existing one when already attached). */
257 struct ctdb_db_context *db;
/*
 * Attach is a chain of sub-requests; each callback below handles one reply
 * and starts the next step: mutex tunable -> db id -> db path -> health ->
 * db flags.
 */
260 static void ctdb_attach_mutex_done(struct tevent_req *subreq);
261 static void ctdb_attach_dbid_done(struct tevent_req *subreq);
262 static void ctdb_attach_dbpath_done(struct tevent_req *subreq);
263 static void ctdb_attach_health_done(struct tevent_req *subreq);
264 static void ctdb_attach_flags_done(struct tevent_req *subreq);
/*
 * Start an asynchronous attach to the database named db_name.
 *
 * If the client already holds a handle for db_name the request is completed
 * immediately.  Otherwise a new ctdb_db_context is allocated on the client,
 * the name is copied into it, persistence is recorded from db_flags, and the
 * first step of the attach chain - reading the "TDBMutexEnabled" tunable
 * from the local node - is started.
 */
266 struct tevent_req *ctdb_attach_send(TALLOC_CTX *mem_ctx,
267 struct tevent_context *ev,
268 struct ctdb_client_context *client,
269 struct timeval timeout,
270 const char *db_name, uint8_t db_flags)
272 struct tevent_req *req, *subreq;
273 struct ctdb_attach_state *state;
274 struct ctdb_req_control request;
276 req = tevent_req_create(mem_ctx, &state, struct ctdb_attach_state);
/* Reuse an existing handle when this database is already attached. */
281 state->db = client_db_handle(client, db_name);
282 if (state->db != NULL) {
283 tevent_req_done(req);
284 return tevent_req_post(req, ev);
288 state->client = client;
289 state->timeout = timeout;
290 state->destnode = ctdb_client_pnn(client);
291 state->db_flags = db_flags;
/* The handle is parented to the client, not to this request. */
293 state->db = talloc_zero(client, struct ctdb_db_context);
294 if (tevent_req_nomem(state->db, req)) {
295 return tevent_req_post(req, ev);
/* Check the copied name itself; state->db is known to be non-NULL here. */
298 state->db->db_name = talloc_strdup(state->db, db_name);
299 if (tevent_req_nomem(state->db->db_name, req)) {
300 return tevent_req_post(req, ev);
303 if (db_flags & CTDB_DB_FLAGS_PERSISTENT) {
304 state->db->persistent = true;
/* First step: ask the local node whether TDB mutexes are enabled. */
307 ctdb_req_control_get_tunable(&request, "TDBMutexEnabled");
308 subreq = ctdb_client_control_send(state, ev, client,
309 ctdb_client_pnn(client), timeout,
311 if (tevent_req_nomem(subreq, req)) {
312 return tevent_req_post(req, ev);
314 tevent_req_set_callback(subreq, ctdb_attach_mutex_done, req);
/*
 * Callback for the GET_TUNABLE("TDBMutexEnabled") control.
 *
 * Chooses the TDB open flags for the local database (TDB_DEFAULT for
 * persistent databases; a NOSYNC/INCOMPATIBLE_HASH based set otherwise, with
 * TDB_MUTEX_LOCKING added when the tunable reads 1) and then sends the
 * DB_ATTACH or DB_ATTACH_PERSISTENT control to state->destnode.
 */
319 static void ctdb_attach_mutex_done(struct tevent_req *subreq)
321 struct tevent_req *req = tevent_req_callback_data(
322 subreq, struct tevent_req);
323 struct ctdb_attach_state *state = tevent_req_data(
324 req, struct ctdb_attach_state);
325 struct ctdb_reply_control *reply;
326 struct ctdb_req_control request;
327 uint32_t mutex_enabled;
331 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
334 DEBUG(DEBUG_ERR, ("attach: %s GET_TUNABLE failed, ret=%d\n",
335 state->db->db_name, ret));
336 tevent_req_error(req, ret);
340 ret = ctdb_reply_control_get_tunable(reply, &mutex_enabled);
342 /* Treat error as mutex support not available */
346 if (state->db->persistent) {
347 state->tdb_flags = TDB_DEFAULT;
349 state->tdb_flags = (TDB_NOSYNC | TDB_INCOMPATIBLE_HASH |
351 if (mutex_enabled == 1) {
352 state->tdb_flags |= TDB_MUTEX_LOCKING;
/* Next step: attach on the daemon side, using the matching control. */
356 if (state->db->persistent) {
357 ctdb_req_control_db_attach_persistent(&request,
361 ctdb_req_control_db_attach(&request, state->db->db_name,
365 subreq = ctdb_client_control_send(state, state->ev, state->client,
366 state->destnode, state->timeout,
368 if (tevent_req_nomem(subreq, req)) {
371 tevent_req_set_callback(subreq, ctdb_attach_dbid_done, req);
/*
 * Callback for the DB_ATTACH / DB_ATTACH_PERSISTENT control.
 *
 * Extracts the database id from the reply into state->db->db_id and then
 * requests the database path (GETDBPATH) from state->destnode.
 */
374 static void ctdb_attach_dbid_done(struct tevent_req *subreq)
376 struct tevent_req *req = tevent_req_callback_data(
377 subreq, struct tevent_req);
378 struct ctdb_attach_state *state = tevent_req_data(
379 req, struct ctdb_attach_state);
380 struct ctdb_req_control request;
381 struct ctdb_reply_control *reply;
385 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
388 DEBUG(DEBUG_ERR, ("attach: %s %s failed, ret=%d\n",
390 (state->db->persistent
391 ? "DB_ATTACH_PERSISTENT"
394 tevent_req_error(req, ret);
/* The reply parser must match the control that was sent. */
398 if (state->db->persistent) {
399 ret = ctdb_reply_control_db_attach_persistent(
400 reply, &state->db->db_id);
402 ret = ctdb_reply_control_db_attach(reply, &state->db->db_id);
406 DEBUG(DEBUG_ERR, ("attach: %s failed to get db_id, ret=%d\n",
407 state->db->db_name, ret));
408 tevent_req_error(req, ret);
/* Next step: ask where the database file lives. */
412 ctdb_req_control_getdbpath(&request, state->db->db_id);
413 subreq = ctdb_client_control_send(state, state->ev, state->client,
414 state->destnode, state->timeout,
416 if (tevent_req_nomem(subreq, req)) {
419 tevent_req_set_callback(subreq, ctdb_attach_dbpath_done, req);
/*
 * Callback for the GETDBPATH control.
 *
 * Stores the returned path in state->db->db_path (allocated on state->db)
 * and then requests the database health (DB_GET_HEALTH).
 */
422 static void ctdb_attach_dbpath_done(struct tevent_req *subreq)
424 struct tevent_req *req = tevent_req_callback_data(
425 subreq, struct tevent_req);
426 struct ctdb_attach_state *state = tevent_req_data(
427 req, struct ctdb_attach_state);
428 struct ctdb_reply_control *reply;
429 struct ctdb_req_control request;
433 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
436 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH failed, ret=%d\n",
437 state->db->db_name, ret));
438 tevent_req_error(req, ret);
442 ret = ctdb_reply_control_getdbpath(reply, state->db,
443 &state->db->db_path);
446 DEBUG(DEBUG_ERR, ("attach: %s GETDBPATH parse failed, ret=%d\n",
447 state->db->db_name, ret));
448 tevent_req_error(req, ret);
/* Next step: check that the database is healthy before opening it. */
452 ctdb_req_control_db_get_health(&request, state->db->db_id);
453 subreq = ctdb_client_control_send(state, state->ev, state->client,
454 state->destnode, state->timeout,
456 if (tevent_req_nomem(subreq, req)) {
459 tevent_req_set_callback(subreq, ctdb_attach_health_done, req);
/*
 * Callback for the DB_GET_HEALTH control.
 *
 * A non-NULL reason string means the database is unhealthy; the attach is
 * then failed with EIO.  Otherwise the requested database flags are applied
 * via ctdb_set_db_flags_send().
 */
462 static void ctdb_attach_health_done(struct tevent_req *subreq)
464 struct tevent_req *req = tevent_req_callback_data(
465 subreq, struct tevent_req);
466 struct ctdb_attach_state *state = tevent_req_data(
467 req, struct ctdb_attach_state);
468 struct ctdb_reply_control *reply;
473 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
476 DEBUG(DEBUG_ERR, ("attach: %s DB_GET_HEALTH failed, ret=%d\n",
477 state->db->db_name, ret));
478 tevent_req_error(req, ret);
482 ret = ctdb_reply_control_db_get_health(reply, state, &reason);
485 ("attach: %s DB_GET_HEALTH parse failed, ret=%d\n",
486 state->db->db_name, ret));
487 tevent_req_error(req, ret);
491 if (reason != NULL) {
492 /* Database unhealthy, avoid attach */
493 DEBUG(DEBUG_ERR, ("attach: %s database unhealthy (%s)\n",
494 state->db->db_name, reason));
495 tevent_req_error(req, EIO);
/* Next step: set READONLY/STICKY flags as requested by the caller. */
499 subreq = ctdb_set_db_flags_send(state, state->ev, state->client,
500 state->destnode, state->timeout,
501 state->db->db_id, state->db_flags);
502 if (tevent_req_nomem(subreq, req)) {
505 tevent_req_set_callback(subreq, ctdb_attach_flags_done, req);
/*
 * Final attach step, run after the database flags have been set.
 *
 * Opens the local TDB at state->db->db_path with the flags chosen earlier,
 * links the new handle into the client's database list and completes the
 * request.
 */
508 static void ctdb_attach_flags_done(struct tevent_req *subreq)
510 struct tevent_req *req = tevent_req_callback_data(
511 subreq, struct tevent_req);
512 struct ctdb_attach_state *state = tevent_req_data(
513 req, struct ctdb_attach_state);
517 status = ctdb_set_db_flags_recv(subreq, &ret);
520 DEBUG(DEBUG_ERR, ("attach: %s set db flags 0x%08x failed\n",
521 state->db->db_name, state->db_flags));
522 tevent_req_error(req, ret);
/*
 * NOTE(review): a NULL result from tdb_wrap_open() is reported through
 * tevent_req_nomem(), i.e. as an out-of-memory condition, whatever the
 * actual reason for the open failure was - confirm this is acceptable.
 */
526 state->db->ltdb = tdb_wrap_open(state->db, state->db->db_path, 0,
527 state->tdb_flags, O_RDWR, 0);
528 if (tevent_req_nomem(state->db->ltdb, req)) {
529 DEBUG(DEBUG_ERR, ("attach: %s tdb_wrap_open failed\n",
530 state->db->db_name));
/* Make the handle discoverable for later attach calls on this client. */
533 DLIST_ADD(state->client->db, state->db);
535 tevent_req_done(req);
/*
 * Collect the outcome of an attach request.
 *
 * Checks the request for a unix error with tevent_req_is_unix_error();
 * perr is the caller's slot for the error code and out the slot for the
 * database handle held in the request state.
 */
538 bool ctdb_attach_recv(struct tevent_req *req, int *perr,
539 struct ctdb_db_context **out)
541 struct ctdb_attach_state *state = tevent_req_data(
542 req, struct ctdb_attach_state);
545 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_attach_send()/ctdb_attach_recv().
 *
 * Uses a temporary talloc context parented to the client for the request,
 * polls the event loop until the request finishes, and registers the
 * null/fetch/fetch-with-header call functions on the database handle.
 * The temporary context is freed on every visible exit path.
 */
558 int ctdb_attach(struct tevent_context *ev,
559 struct ctdb_client_context *client,
560 struct timeval timeout,
561 const char *db_name, uint8_t db_flags,
562 struct ctdb_db_context **out)
565 struct tevent_req *req;
569 mem_ctx = talloc_new(client);
570 if (mem_ctx == NULL) {
574 req = ctdb_attach_send(mem_ctx, ev, client, timeout,
577 talloc_free(mem_ctx);
/* Drive the event loop until the attach request has completed. */
581 tevent_req_poll(req, ev);
583 status = ctdb_attach_recv(req, &ret, out);
585 talloc_free(mem_ctx);
590 ctdb_set_call(db, CTDB_NULL_FUNC, ctdb_null_func);
591 ctdb_set_call(db, CTDB_FETCH_FUNC, ctdb_fetch_func);
592 ctdb_set_call(db, CTDB_FETCH_WITH_HEADER_FUNC, ctdb_fetch_with_header_func);
595 talloc_free(mem_ctx);
/*
 * Detach from the database identified by db_id.
 *
 * Issues the DB_DETACH control against the client's own node (client->pnn)
 * and unlinks the matching handle from the client's database list.
 */
599 int ctdb_detach(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
600 struct ctdb_client_context *client,
601 struct timeval timeout, uint32_t db_id)
603 struct ctdb_db_context *db;
606 ret = ctdb_ctrl_db_detach(mem_ctx, ev, client, client->pnn, timeout,
/* Drop the local handle with the same id from the client's list. */
612 for (db = client->db; db != NULL; db = db->next) {
613 if (db->db_id == db_id) {
614 DLIST_REMOVE(client->db, db);
/*
 * Accessor yielding a uint32_t for a database handle; its result is used as
 * the .db_id of traverse requests.  Presumably returns db->db_id - confirm.
 */
622 uint32_t ctdb_db_id(struct ctdb_db_context *db)
/* Context handed to the TDB traverse callback for local traversals. */
627 struct ctdb_db_traverse_local_state {
/* User callback invoked for every record. */
628 ctdb_rec_parser_func_t parser;
/*
 * TDB traverse callback: forwards one record to the user's parser.
 *
 * With extract_header set, the ctdb ltdb header is extracted from the record
 * data and passed to the parser; otherwise the parser receives a NULL
 * header and the data as stored.  The request id passed to the parser is
 * always 0 here.
 */
634 static int ctdb_db_traverse_local_handler(struct tdb_context *tdb,
635 TDB_DATA key, TDB_DATA data,
638 struct ctdb_db_traverse_local_state *state =
639 (struct ctdb_db_traverse_local_state *)private_data;
642 if (state->extract_header) {
643 struct ctdb_ltdb_header header;
645 ret = ctdb_ltdb_header_extract(&data, &header);
651 ret = state->parser(0, &header, key, data, state->private_data);
653 ret = state->parser(0, NULL, key, data, state->private_data);
/*
 * Traverse the local TDB of an attached database, calling parser for each
 * record with private_data.
 *
 * Uses tdb_traverse_read() or tdb_traverse() on db->ltdb->tdb; which one is
 * used presumably depends on the readonly argument - confirm.
 */
664 int ctdb_db_traverse_local(struct ctdb_db_context *db, bool readonly,
666 ctdb_rec_parser_func_t parser, void *private_data)
668 struct ctdb_db_traverse_local_state state;
671 state.parser = parser;
672 state.private_data = private_data;
673 state.extract_header = extract_header;
677 ret = tdb_traverse_read(db->ltdb->tdb,
678 ctdb_db_traverse_local_handler,
681 ret = tdb_traverse(db->ltdb->tdb,
682 ctdb_db_traverse_local_handler, &state);
/* Per-request state for an asynchronous (daemon-driven) database traverse. */
692 struct ctdb_db_traverse_state {
693 struct tevent_context *ev;
694 struct ctdb_client_context *client;
695 struct ctdb_db_context *db;
698 struct timeval timeout;
/* User callback invoked for every record delivered by the traverse. */
699 ctdb_rec_parser_func_t parser;
/*
 * Steps of the traverse request: install a message handler, start the
 * traverse control, receive records through the handler, then remove the
 * handler again before completing.
 */
704 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq);
705 static void ctdb_db_traverse_started(struct tevent_req *subreq);
706 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
708 static void ctdb_db_traverse_remove_handler(struct tevent_req *req);
709 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq);
/*
 * Start an asynchronous traverse of db on destnode.
 *
 * Records are delivered as messages, so the first step is to register
 * ctdb_db_traverse_handler() as message handler; the traverse itself is
 * started from ctdb_db_traverse_handler_set().
 */
711 struct tevent_req *ctdb_db_traverse_send(TALLOC_CTX *mem_ctx,
712 struct tevent_context *ev,
713 struct ctdb_client_context *client,
714 struct ctdb_db_context *db,
716 struct timeval timeout,
717 ctdb_rec_parser_func_t parser,
720 struct tevent_req *req, *subreq;
721 struct ctdb_db_traverse_state *state;
723 req = tevent_req_create(mem_ctx, &state,
724 struct ctdb_db_traverse_state);
730 state->client = client;
732 state->destnode = destnode;
/* The srvid is derived from this process' pid within the client range. */
733 state->srvid = CTDB_SRVID_CLIENT_RANGE | getpid();
734 state->timeout = timeout;
735 state->parser = parser;
736 state->private_data = private_data;
738 subreq = ctdb_client_set_message_handler_send(state, ev, client,
740 ctdb_db_traverse_handler,
742 if (tevent_req_nomem(subreq, req)) {
743 return tevent_req_post(req, ev);
745 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_set, req);
/*
 * Callback run once the message handler is installed.
 *
 * Sends the TRAVERSE_START_EXT control for the database, asking for records
 * to be delivered to state->srvid and without empty records.  If the
 * control cannot be created, ENOMEM is remembered in state->result and the
 * handler is removed again instead of failing the request directly.
 */
750 static void ctdb_db_traverse_handler_set(struct tevent_req *subreq)
752 struct tevent_req *req = tevent_req_callback_data(
753 subreq, struct tevent_req);
754 struct ctdb_db_traverse_state *state = tevent_req_data(
755 req, struct ctdb_db_traverse_state);
756 struct ctdb_traverse_start_ext traverse;
757 struct ctdb_req_control request;
761 status = ctdb_client_set_message_handler_recv(subreq, &ret);
764 tevent_req_error(req, ret);
768 traverse = (struct ctdb_traverse_start_ext) {
769 .db_id = ctdb_db_id(state->db),
771 .srvid = state->srvid,
772 .withemptyrecords = false,
775 ctdb_req_control_traverse_start_ext(&request, &traverse);
776 subreq = ctdb_client_control_send(state, state->ev, state->client,
777 state->destnode, state->timeout,
/* The handler is already registered, so clean it up before reporting. */
779 if (subreq == NULL) {
780 state->result = ENOMEM;
781 ctdb_db_traverse_remove_handler(req);
784 tevent_req_set_callback(subreq, ctdb_db_traverse_started, req);
/*
 * Callback for the TRAVERSE_START_EXT control.
 *
 * On a failed control or a failed reply the message handler is removed via
 * ctdb_db_traverse_remove_handler().
 */
787 static void ctdb_db_traverse_started(struct tevent_req *subreq)
789 struct tevent_req *req = tevent_req_callback_data(
790 subreq, struct tevent_req);
791 struct ctdb_db_traverse_state *state = tevent_req_data(
792 req, struct ctdb_db_traverse_state);
793 struct ctdb_reply_control *reply;
797 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
800 DEBUG(DEBUG_ERR, ("traverse: control failed, ret=%d\n", ret));
802 ctdb_db_traverse_remove_handler(req);
806 ret = ctdb_reply_control_traverse_start_ext(reply);
809 DEBUG(DEBUG_ERR, ("traverse: control reply failed, ret=%d\n",
812 ctdb_db_traverse_remove_handler(req);
/*
 * Message handler receiving one traverse record per message.
 *
 * A record with both an empty key and empty data triggers removal of the
 * message handler (presumably the end-of-traverse marker - confirm).  For
 * other records the ltdb header is extracted and the user's parser is
 * called with the record's reqid, header, key and data.
 */
817 static void ctdb_db_traverse_handler(uint64_t srvid, TDB_DATA data,
820 struct tevent_req *req = talloc_get_type_abort(
821 private_data, struct tevent_req);
822 struct ctdb_db_traverse_state *state = tevent_req_data(
823 req, struct ctdb_db_traverse_state);
824 struct ctdb_rec_data *rec;
825 struct ctdb_ltdb_header header;
828 ret = ctdb_rec_data_pull(data.dptr, data.dsize, state, &rec);
833 if (rec->key.dsize == 0 && rec->data.dsize == 0) {
835 ctdb_db_traverse_remove_handler(req);
839 ret = ctdb_ltdb_header_extract(&rec->data, &header);
845 if (rec->data.dsize == 0) {
850 ret = state->parser(rec->reqid, &header, rec->key, rec->data,
851 state->private_data);
855 ctdb_db_traverse_remove_handler(req);
/*
 * Start removal of the traverse message handler; the request is finished
 * from ctdb_db_traverse_handler_removed().
 */
859 static void ctdb_db_traverse_remove_handler(struct tevent_req *req)
861 struct ctdb_db_traverse_state *state = tevent_req_data(
862 req, struct ctdb_db_traverse_state);
863 struct tevent_req *subreq;
865 subreq = ctdb_client_remove_message_handler_send(state, state->ev,
868 if (tevent_req_nomem(subreq, req)) {
871 tevent_req_set_callback(subreq, ctdb_db_traverse_handler_removed, req);
/*
 * Callback run once the message handler has been removed.
 *
 * Finishes the traverse request: a failure to remove the handler is
 * reported first, then any error remembered in state->result, otherwise
 * the request succeeds.
 */
874 static void ctdb_db_traverse_handler_removed(struct tevent_req *subreq)
876 struct tevent_req *req = tevent_req_callback_data(
877 subreq, struct tevent_req);
878 struct ctdb_db_traverse_state *state = tevent_req_data(
879 req, struct ctdb_db_traverse_state);
883 status = ctdb_client_remove_message_handler_recv(subreq, &ret);
886 tevent_req_error(req, ret);
/* Report an error recorded earlier in the traverse, if any. */
890 if (state->result != 0) {
891 tevent_req_error(req, state->result);
895 tevent_req_done(req);
/*
 * Collect the outcome of a traverse request.
 *
 * Checks the request for a unix error with tevent_req_is_unix_error();
 * perr is the caller's slot for the error code.
 */
898 bool ctdb_db_traverse_recv(struct tevent_req *req, int *perr)
902 if (tevent_req_is_unix_error(req, &ret)) {
/*
 * Synchronous wrapper around ctdb_db_traverse_send()/_recv(): starts the
 * traverse, polls the event loop until it finishes and collects the result.
 */
912 int ctdb_db_traverse(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
913 struct ctdb_client_context *client,
914 struct ctdb_db_context *db,
915 uint32_t destnode, struct timeval timeout,
916 ctdb_rec_parser_func_t parser, void *private_data)
918 struct tevent_req *req;
922 req = ctdb_db_traverse_send(mem_ctx, ev, client, db, destnode,
923 timeout, parser, private_data);
928 tevent_req_poll(req, ev);
930 status = ctdb_db_traverse_recv(req, &ret);
/*
 * Fetch a record from the local TDB and split it into ctdb header and data.
 *
 * A stored value shorter than a ctdb_ltdb_header is treated as "no record":
 * the header's dmaster is then set to CTDB_UNKNOWN_PNN.  Otherwise the
 * header is parsed from the front of the stored value and the remaining
 * bytes are duplicated onto mem_ctx for the caller.
 */
938 int ctdb_ltdb_fetch(struct ctdb_db_context *db, TDB_DATA key,
939 struct ctdb_ltdb_header *header,
940 TALLOC_CTX *mem_ctx, TDB_DATA *data)
945 rec = tdb_fetch(db->ltdb->tdb, key);
946 if (rec.dsize < sizeof(struct ctdb_ltdb_header)) {
947 /* No record present */
948 if (rec.dptr != NULL) {
/* Anything other than "key does not exist" is a genuine TDB error. */
952 if (tdb_error(db->ltdb->tdb) != TDB_ERR_NOEXIST) {
957 header->dmaster = CTDB_UNKNOWN_PNN;
966 ret = ctdb_ltdb_header_pull(rec.dptr, rec.dsize, header);
/* The payload starts right after the serialized header. */
973 size_t offset = ctdb_ltdb_header_len(header);
975 data->dsize = rec.dsize - offset;
976 data->dptr = talloc_memdup(mem_ctx, rec.dptr + offset,
978 if (data->dptr == NULL) {
988 * Fetch a record from volatile database
991 * 1. Get a lock on the hash chain
992 * 2. If the record does not exist, migrate the record
993 * 3. If readonly=true and delegations do not exist, migrate the record.
994 * 4. If readonly=false and delegations exist, migrate the record.
995 * 5. If the local node is not dmaster, migrate the record.
/* Per-request state for the asynchronous fetch-lock operation. */
999 struct ctdb_fetch_lock_state {
1000 struct tevent_context *ev;
1001 struct ctdb_client_context *client;
/* Record handle under construction; handed to the caller by the recv call. */
1002 struct ctdb_record_handle *h;
/*
 * Fetch-lock helpers: check the local copy of the record, request a
 * migration when the local copy is not usable, and re-check afterwards.
 */
1007 static int ctdb_fetch_lock_check(struct tevent_req *req);
1008 static void ctdb_fetch_lock_migrate(struct tevent_req *req);
1009 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq);
/*
 * Start an asynchronous fetch-lock of the record identified by key.
 *
 * Allocates the record handle on db, copies the key into it, rejects
 * persistent databases with EINVAL and then runs the first local check.
 * A check result other than EAGAIN finishes the request immediately.
 */
1011 struct tevent_req *ctdb_fetch_lock_send(TALLOC_CTX *mem_ctx,
1012 struct tevent_context *ev,
1013 struct ctdb_client_context *client,
1014 struct ctdb_db_context *db,
1015 TDB_DATA key, bool readonly)
1017 struct ctdb_fetch_lock_state *state;
1018 struct tevent_req *req;
1021 req = tevent_req_create(mem_ctx, &state, struct ctdb_fetch_lock_state);
1027 state->client = client;
/* The handle is parented to the database, not to the request. */
1029 state->h = talloc_zero(db, struct ctdb_record_handle);
1030 if (tevent_req_nomem(state->h, req)) {
1031 return tevent_req_post(req, ev);
1033 state->h->client = client;
/* Keep a private copy of the key; the caller's buffer is not retained. */
1035 state->h->key.dptr = talloc_memdup(state->h, key.dptr, key.dsize);
1036 if (tevent_req_nomem(state->h->key.dptr, req)) {
1037 return tevent_req_post(req, ev);
1039 state->h->key.dsize = key.dsize;
1040 state->h->readonly = false;
1042 state->readonly = readonly;
1043 state->pnn = ctdb_client_pnn(client);
1045 /* Check that database is not persistent */
1046 if (db->persistent) {
1047 DEBUG(DEBUG_ERR, ("fetch_lock: %s database not volatile\n",
1049 tevent_req_error(req, EINVAL);
1050 return tevent_req_post(req, ev);
1053 ret = ctdb_fetch_lock_check(req);
1055 tevent_req_done(req);
1056 return tevent_req_post(req, ev);
/* EAGAIN is the only result that keeps the request pending. */
1058 if (ret != EAGAIN) {
1059 tevent_req_error(req, ret);
1060 return tevent_req_post(req, ev);
/*
 * Check whether the locally stored record can be used for this fetch-lock.
 *
 * Takes the TDB chain lock for the key, fetches the record and inspects its
 * header: for read/write access the dmaster and the HAVE_DELEGATIONS flag
 * are examined, for readonly access the dmaster and the readonly flags.
 * When the local copy cannot be used, the chain lock is released again and
 * a migration is started via ctdb_fetch_lock_migrate().
 */
1065 static int ctdb_fetch_lock_check(struct tevent_req *req)
1067 struct ctdb_fetch_lock_state *state = tevent_req_data(
1068 req, struct ctdb_fetch_lock_state);
1069 struct ctdb_record_handle *h = state->h;
1070 struct ctdb_ltdb_header header;
1071 TDB_DATA data = tdb_null;
1073 bool do_migrate = false;
1075 ret = tdb_chainlock(h->db->ltdb->tdb, h->key);
1078 ("fetch_lock: %s tdb_chainlock failed, %s\n",
1079 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1084 data = tdb_fetch(h->db->ltdb->tdb, h->key);
1085 if (data.dptr == NULL) {
1086 if (tdb_error(h->db->ltdb->tdb) == TDB_ERR_NOEXIST) {
1094 /* Got the record */
1095 ret = ctdb_ltdb_header_pull(data.dptr, data.dsize, &header);
1101 if (! state->readonly) {
1102 /* Read/write access */
1103 if (header.dmaster == state->pnn &&
1104 header.flags & CTDB_REC_RO_HAVE_DELEGATIONS) {
1108 if (header.dmaster != state->pnn) {
1112 /* Readonly access */
1113 if (header.dmaster != state->pnn &&
1114 ! (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1115 CTDB_REC_RO_HAVE_DELEGATIONS))) {
1120 /* We are the dmaster or readonly delegation */
1123 if (header.flags & (CTDB_REC_RO_HAVE_READONLY |
1124 CTDB_REC_RO_HAVE_DELEGATIONS)) {
1134 if (data.dptr != NULL) {
/* The chain lock must not be held while the record is being migrated. */
1137 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1140 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1141 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1146 ctdb_fetch_lock_migrate(req);
/*
 * Ask the daemon to migrate the record to this node.
 *
 * Sends a CTDB_NULL_FUNC call on the record's key with
 * CTDB_IMMEDIATE_MIGRATION set, plus CTDB_WANT_READONLY for readonly
 * requests; the reply is handled in ctdb_fetch_lock_migrate_done().
 */
1151 static void ctdb_fetch_lock_migrate(struct tevent_req *req)
1153 struct ctdb_fetch_lock_state *state = tevent_req_data(
1154 req, struct ctdb_fetch_lock_state);
1155 struct ctdb_req_call request;
1156 struct tevent_req *subreq;
1158 ZERO_STRUCT(request);
1159 request.flags = CTDB_IMMEDIATE_MIGRATION;
1160 if (state->readonly) {
1161 request.flags |= CTDB_WANT_READONLY;
1163 request.db_id = state->h->db->db_id;
1164 request.callid = CTDB_NULL_FUNC;
1165 request.key = state->h->key;
1166 request.calldata = tdb_null;
1168 subreq = ctdb_client_call_send(state, state->ev, state->client,
1170 if (tevent_req_nomem(subreq, req)) {
1174 tevent_req_set_callback(subreq, ctdb_fetch_lock_migrate_done, req);
/*
 * Callback for the migration call.
 *
 * A failed call fails the request with the received error; a reply with a
 * non-zero status fails it with EIO.  Otherwise the local record is checked
 * again with ctdb_fetch_lock_check().
 */
1177 static void ctdb_fetch_lock_migrate_done(struct tevent_req *subreq)
1179 struct tevent_req *req = tevent_req_callback_data(
1180 subreq, struct tevent_req);
1181 struct ctdb_fetch_lock_state *state = tevent_req_data(
1182 req, struct ctdb_fetch_lock_state);
1183 struct ctdb_reply_call *reply;
1187 status = ctdb_client_call_recv(subreq, state, &reply, &ret);
1188 TALLOC_FREE(subreq);
1190 DEBUG(DEBUG_ERR, ("fetch_lock: %s CALL failed, ret=%d\n",
1191 state->h->db->db_name, ret));
1192 tevent_req_error(req, ret);
1196 if (reply->status != 0) {
1197 tevent_req_error(req, EIO);
/* Re-run the local check now that the migration has been answered. */
1202 ret = ctdb_fetch_lock_check(req);
1204 if (ret != EAGAIN) {
1205 tevent_req_error(req, ret);
1210 tevent_req_done(req);
/*
 * talloc destructor for a record handle: releases the TDB chain lock held
 * on the handle's key.  Installed by ctdb_fetch_lock_recv().
 */
1213 static int ctdb_record_handle_destructor(struct ctdb_record_handle *h)
1217 ret = tdb_chainunlock(h->db->ltdb->tdb, h->key);
1220 ("fetch_lock: %s tdb_chainunlock failed, %s\n",
1221 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
/*
 * Collect the result of a fetch-lock request.
 *
 * On a request error, or when copying the data fails, the handle held in
 * the request state is freed.  On success the record header is copied to
 * *header (if requested), the payload following the header is duplicated
 * onto mem_ctx, and a destructor releasing the chain lock is attached to
 * the handle.
 */
1227 struct ctdb_record_handle *ctdb_fetch_lock_recv(struct tevent_req *req,
1228 struct ctdb_ltdb_header *header,
1229 TALLOC_CTX *mem_ctx,
1230 TDB_DATA *data, int *perr)
1232 struct ctdb_fetch_lock_state *state = tevent_req_data(
1233 req, struct ctdb_fetch_lock_state);
1234 struct ctdb_record_handle *h = state->h;
1237 if (tevent_req_is_unix_error(req, &err)) {
1239 TALLOC_FREE(state->h);
1245 if (header != NULL) {
1246 *header = h->header;
/* h->data starts with the serialized header; skip it for the caller. */
1251 offset = ctdb_ltdb_header_len(&h->header);
1253 data->dsize = h->data.dsize - offset;
1254 data->dptr = talloc_memdup(mem_ctx, h->data.dptr + offset,
1256 if (data->dptr == NULL) {
1257 TALLOC_FREE(state->h);
/* From here on, freeing the handle unlocks the record. */
1265 talloc_set_destructor(h, ctdb_record_handle_destructor);
/*
 * Synchronous wrapper around ctdb_fetch_lock_send()/_recv(): starts the
 * request, polls the event loop until it finishes and collects the handle,
 * header and data (the data is allocated on mem_ctx).
 */
1269 int ctdb_fetch_lock(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
1270 struct ctdb_client_context *client,
1271 struct ctdb_db_context *db, TDB_DATA key, bool readonly,
1272 struct ctdb_record_handle **out,
1273 struct ctdb_ltdb_header *header, TDB_DATA *data)
1275 struct tevent_req *req;
1276 struct ctdb_record_handle *h;
1279 req = ctdb_fetch_lock_send(mem_ctx, ev, client, db, key, readonly);
1284 tevent_req_poll(req, ev);
1286 h = ctdb_fetch_lock_recv(req, header, mem_ctx, data, &ret);
/*
 * Store new data for a record held through a fetch-lock handle.
 *
 * If the new data is byte-identical to what the handle holds, nothing is
 * written.  Otherwise the serialized header followed by the new data is
 * written in one tdb_storev() call with TDB_REPLACE.
 */
1295 int ctdb_store_record(struct ctdb_record_handle *h, TDB_DATA data)
1297 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1301 /* Cannot modify the record if it was obtained as a readonly copy */
/*
 * NOTE(review): ctdb_fetch_lock_recv() treats h->data as header + payload
 * (it skips ctdb_ltdb_header_len() bytes), while this comparison uses
 * h->data as-is against the caller's payload - confirm that the "same
 * data" shortcut can actually match.
 */
1306 /* Check if the new data is same */
1307 if (h->data.dsize == data.dsize &&
1308 memcmp(h->data.dptr, data.dptr, data.dsize) == 0) {
1309 /* No need to do anything */
1313 ctdb_ltdb_header_push(&h->header, header);
/* Two-part write: header first, then the caller's data. */
1315 rec[0].dsize = ctdb_ltdb_header_len(&h->header);
1316 rec[0].dptr = header;
1318 rec[1].dsize = data.dsize;
1319 rec[1].dptr = data.dptr;
1321 ret = tdb_storev(h->db->ltdb->tdb, h->key, rec, 2, TDB_REPLACE);
1324 ("store_record: %s tdb_storev failed, %s\n",
1325 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
/* Per-request state for the asynchronous record deletion. */
1332 struct ctdb_delete_record_state {
/* Handle of the record being deleted; used for error reporting. */
1333 struct ctdb_record_handle *h;
/* Completion callback for the SCHEDULE_FOR_DELETION control. */
1336 static void ctdb_delete_record_done(struct tevent_req *subreq);
/*
 * Start an asynchronous deletion of the record held through handle h.
 *
 * Readonly handles are rejected with EINVAL.  The record is first replaced
 * locally by a header-sized value (tdb_store with TDB_REPLACE); a failure
 * there ends the request with EIO.  Then a SCHEDULE_FOR_DELETION control
 * is sent to the local node without a timeout.
 */
1338 struct tevent_req *ctdb_delete_record_send(TALLOC_CTX *mem_ctx,
1339 struct tevent_context *ev,
1340 struct ctdb_record_handle *h)
1342 struct tevent_req *req, *subreq;
1343 struct ctdb_delete_record_state *state;
1344 struct ctdb_key_data key;
1345 struct ctdb_req_control request;
1346 uint8_t header[sizeof(struct ctdb_ltdb_header)];
1350 req = tevent_req_create(mem_ctx, &state,
1351 struct ctdb_delete_record_state);
1358 /* Cannot delete the record if it was obtained as a readonly copy */
1360 DEBUG(DEBUG_ERR, ("fetch_lock delete: %s readonly record\n",
1362 tevent_req_error(req, EINVAL);
1363 return tevent_req_post(req, ev);
/* Serialize the current header; the stored value has exactly its length. */
1366 ctdb_ltdb_header_push(&h->header, header);
1368 rec.dsize = ctdb_ltdb_header_len(&h->header);
1371 ret = tdb_store(h->db->ltdb->tdb, h->key, rec, TDB_REPLACE);
1374 ("fetch_lock delete: %s tdb_store failed, %s\n",
1375 h->db->db_name, tdb_errorstr(h->db->ltdb->tdb)));
1376 tevent_req_error(req, EIO);
1377 return tevent_req_post(req, ev);
1380 key.db_id = h->db->db_id;
1381 key.header = h->header;
/* Hand the actual deletion over to the local daemon. */
1384 ctdb_req_control_schedule_for_deletion(&request, &key);
1385 subreq = ctdb_client_control_send(state, ev, h->client,
1386 ctdb_client_pnn(h->client),
1387 tevent_timeval_zero(),
1389 if (tevent_req_nomem(subreq, req)) {
1390 return tevent_req_post(req, ev);
1392 tevent_req_set_callback(subreq, ctdb_delete_record_done, req);
/*
 * Callback for the SCHEDULE_FOR_DELETION control: fails the request with
 * the received error code on failure, otherwise completes it.
 */
1397 static void ctdb_delete_record_done(struct tevent_req *subreq)
1399 struct tevent_req *req = tevent_req_callback_data(
1400 subreq, struct tevent_req);
1401 struct ctdb_delete_record_state *state = tevent_req_data(
1402 req, struct ctdb_delete_record_state);
1406 status = ctdb_client_control_recv(subreq, &ret, NULL, NULL);
1407 TALLOC_FREE(subreq);
1410 ("delete_record: %s SCHEDULE_FOR_DELETION failed, "
1411 "ret=%d\n", state->h->db->db_name, ret));
1412 tevent_req_error(req, ret);
1416 tevent_req_done(req);
/*
 * Collect the outcome of a delete-record request.
 *
 * Checks the request for a unix error with tevent_req_is_unix_error();
 * perr is the caller's slot for the error code.
 */
1419 bool ctdb_delete_record_recv(struct tevent_req *req, int *perr)
1423 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_delete_record_send()/_recv().
 *
 * Runs on the event context stored in the handle (h->ev) and uses a
 * temporary top-level talloc context for the request, which is freed on
 * the visible exit paths.
 */
1434 int ctdb_delete_record(struct ctdb_record_handle *h)
1436 struct tevent_context *ev = h->ev;
1437 TALLOC_CTX *mem_ctx;
1438 struct tevent_req *req;
1442 mem_ctx = talloc_new(NULL);
1443 if (mem_ctx == NULL) {
1447 req = ctdb_delete_record_send(mem_ctx, ev, h);
1449 talloc_free(mem_ctx);
/* Drive the event loop until the deletion request has completed. */
1453 tevent_req_poll(req, ev);
1455 status = ctdb_delete_record_recv(req, &ret);
1456 talloc_free(mem_ctx);
1465 * Global lock functions
/* Per-request state for acquiring a global lock (g_lock). */
1468 struct ctdb_g_lock_lock_state {
1469 struct tevent_context *ev;
1470 struct ctdb_client_context *client;
1471 struct ctdb_db_context *db;
/* Identity of the requester and the kind of lock it wants. */
1473 struct ctdb_server_id my_sid;
1474 enum ctdb_g_lock_type lock_type;
/* Fetch-lock handle on the record that stores the lock list. */
1475 struct ctdb_record_handle *h;
1476 /* state for verification of active locks */
1477 struct ctdb_g_lock_list *lock_list;
/* Index of the lock_list entry currently being examined. */
1478 unsigned int current;
/* Internal steps of the asynchronous g_lock acquisition. */
1481 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq);
1482 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req);
1483 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq);
1484 static int ctdb_g_lock_lock_update(struct tevent_req *req);
1485 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq);
/*
 * Decide whether two lock types conflict.  The one combination singled out
 * is READ with READ (presumably the only non-conflicting pair - confirm).
 */
1487 static bool ctdb_g_lock_conflicts(enum ctdb_g_lock_type l1,
1488 enum ctdb_g_lock_type l2)
1490 if ((l1 == CTDB_G_LOCK_READ) && (l2 == CTDB_G_LOCK_READ)) {
/*
 * Start an asynchronous acquisition of the global lock named keyname on
 * behalf of server id *sid.
 *
 * The record key is the name including its terminating NUL byte.  The key
 * points at the caller's keyname string (it is not copied here), so that
 * string has to stay valid while the request runs.  The lock type is READ
 * when readonly is set, WRITE otherwise.  The first step is a fetch-lock on
 * the lock record.
 */
1496 struct tevent_req *ctdb_g_lock_lock_send(TALLOC_CTX *mem_ctx,
1497 struct tevent_context *ev,
1498 struct ctdb_client_context *client,
1499 struct ctdb_db_context *db,
1500 const char *keyname,
1501 struct ctdb_server_id *sid,
1504 struct tevent_req *req, *subreq;
1505 struct ctdb_g_lock_lock_state *state;
1507 req = tevent_req_create(mem_ctx, &state,
1508 struct ctdb_g_lock_lock_state);
1514 state->client = client;
1516 state->key.dptr = discard_const(keyname);
1517 state->key.dsize = strlen(keyname) + 1;
1518 state->my_sid = *sid;
1519 state->lock_type = (readonly ? CTDB_G_LOCK_READ : CTDB_G_LOCK_WRITE);
1521 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1523 if (tevent_req_nomem(subreq, req)) {
1524 return tevent_req_post(req, ev);
1526 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Callback for the fetch-lock on the lock record.
 *
 * Keeps the record handle in state->h, drops any lock list left from an
 * earlier pass, parses the record data into a fresh lock list and hands
 * over to ctdb_g_lock_lock_process_locks().
 */
1531 static void ctdb_g_lock_lock_fetched(struct tevent_req *subreq)
1533 struct tevent_req *req = tevent_req_callback_data(
1534 subreq, struct tevent_req);
1535 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1536 req, struct ctdb_g_lock_lock_state);
1540 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1541 TALLOC_FREE(subreq);
1542 if (state->h == NULL) {
1543 DEBUG(DEBUG_ERR, ("g_lock_lock: %s fetch lock failed\n",
1544 (char *)state->key.dptr));
1545 tevent_req_error(req, ret);
1549 if (state->lock_list != NULL) {
1550 TALLOC_FREE(state->lock_list);
/* The raw record data is only needed until it has been parsed. */
1554 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1556 talloc_free(data.dptr);
1558 DEBUG(DEBUG_ERR, ("g_lock_lock: %s invalid lock data\n",
1559 (char *)state->key.dptr));
1560 tevent_req_error(req, ret);
1564 ctdb_g_lock_lock_process_locks(req);
/*
 * Examine the entries of state->lock_list starting at state->current.
 *
 * - Finding our own server id in the list fails the request with EDEADLK.
 * - An entry whose type conflicts with the requested type sets
 *   check_server; a PROCESS_EXISTS control is then sent for the pid of
 *   that lock holder and the answer is handled in
 *   ctdb_g_lock_lock_checked().
 * - Without a conflict, our own entry is appended to the list, the record
 *   is written back through ctdb_g_lock_lock_update(), the record handle
 *   is released and the request completes.
 *
 * NOTE(review): several branch conditions and return statements are not
 * visible in this listing; the flow described above follows the visible
 * statements and comments - confirm against the complete source.
 */
1567 static void ctdb_g_lock_lock_process_locks(struct tevent_req *req)
1569 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1570 req, struct ctdb_g_lock_lock_state);
1571 struct tevent_req *subreq;
1572 struct ctdb_g_lock *lock;
1573 bool check_server = false;
1576 while (state->current < state->lock_list->num) {
1577 lock = &state->lock_list->lock[state->current];
1579 /* We should not ask for the same lock more than once */
1580 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
1581 DEBUG(DEBUG_ERR, ("g_lock_lock: %s deadlock\n",
1582 (char *)state->key.dptr));
1583 tevent_req_error(req, EDEADLK);
1587 if (ctdb_g_lock_conflicts(lock->type, state->lock_type)) {
1588 check_server = true;
1592 state->current += 1;
1596 struct ctdb_req_control request;
/* Ask whether the process holding the conflicting lock still exists */
1598 ctdb_req_control_process_exists(&request, lock->sid.pid);
1599 subreq = ctdb_client_control_send(state, state->ev,
1602 tevent_timeval_zero(),
1604 if (tevent_req_nomem(subreq, req)) {
1607 tevent_req_set_callback(subreq, ctdb_g_lock_lock_checked, req);
1611 /* There is no conflict, add ourself to the lock_list */
1612 state->lock_list->lock = talloc_realloc(state->lock_list,
1613 state->lock_list->lock,
1615 state->lock_list->num + 1);
1616 if (state->lock_list->lock == NULL) {
1617 tevent_req_error(req, ENOMEM);
/* Fill the newly added last slot with our lock type and server id */
1621 lock = &state->lock_list->lock[state->lock_list->num];
1622 lock->type = state->lock_type;
1623 lock->sid = state->my_sid;
1624 state->lock_list->num += 1;
1626 ret = ctdb_g_lock_lock_update(req);
1628 tevent_req_error(req, ret);
1632 TALLOC_FREE(state->h);
1633 tevent_req_done(req);
/*
 * Callback for the PROCESS_EXISTS control sent from
 * ctdb_g_lock_lock_process_locks().
 *
 * If the lock holder still exists, the record handle is dropped and a
 * wakeup timer is armed; ctdb_g_lock_lock_retry() then fetches the record
 * again.  If the holder is gone, its entry is removed from the lock list,
 * the record is updated and the scan of the list continues.
 *
 * NOTE(review): the conditions selecting between these paths are not
 * visible in this listing - confirm against the complete source.
 */
1636 static void ctdb_g_lock_lock_checked(struct tevent_req *subreq)
1638 struct tevent_req *req = tevent_req_callback_data(
1639 subreq, struct tevent_req);
1640 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1641 req, struct ctdb_g_lock_lock_state);
1642 struct ctdb_reply_control *reply;
1646 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
1647 TALLOC_FREE(subreq);
1650 ("g_lock_lock: %s PROCESS_EXISTS failed, ret=%d\n",
1651 (char *)state->key.dptr, ret));
1652 tevent_req_error(req, ret);
1656 ret = ctdb_reply_control_process_exists(reply, &value);
1658 tevent_req_error(req, ret);
1664 /* server process exists, need to retry */
1665 TALLOC_FREE(state->h);
/* Wakeup offset is (0, 1000): presumably 0 seconds + 1000 microseconds */
1666 subreq = tevent_wakeup_send(state, state->ev,
1667 tevent_timeval_current_ofs(0,1000));
1668 if (tevent_req_nomem(subreq, req)) {
1671 tevent_req_set_callback(subreq, ctdb_g_lock_lock_retry, req);
1675 /* server process does not exist, remove conflicting entry */
/*
 * The last entry is copied over the entry at state->current and the
 * count is reduced by one.
 */
1676 state->lock_list->lock[state->current] =
1677 state->lock_list->lock[state->lock_list->num-1];
1678 state->lock_list->num -= 1;
1680 ret = ctdb_g_lock_lock_update(req);
1682 tevent_req_error(req, ret);
1686 ctdb_g_lock_lock_process_locks(req);
/*
 * Serialise state->lock_list into a temporary buffer allocated on state
 * and store it in the record referred to by state->h.  The buffer is
 * freed after the store call.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably an allocation failure yields ENOMEM and otherwise the result
 * of ctdb_store_record() is returned - confirm.
 */
1689 static int ctdb_g_lock_lock_update(struct tevent_req *req)
1691 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1692 req, struct ctdb_g_lock_lock_state);
1696 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1697 data.dptr = talloc_size(state, data.dsize);
1698 if (data.dptr == NULL) {
1702 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1703 ret = ctdb_store_record(state->h, data);
1704 talloc_free(data.dptr);
/*
 * Callback for the wakeup timer armed in ctdb_g_lock_lock_checked().
 *
 * Starts another fetch-lock on the same key so that the lock list is read
 * again and re-examined in ctdb_g_lock_lock_fetched().
 */
1708 static void ctdb_g_lock_lock_retry(struct tevent_req *subreq)
1710 struct tevent_req *req = tevent_req_callback_data(
1711 subreq, struct tevent_req);
1712 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1713 req, struct ctdb_g_lock_lock_state);
1716 success = tevent_wakeup_recv(subreq);
1717 TALLOC_FREE(subreq);
/* presumably reached only when the wakeup failed - the guard is not visible */
1719 tevent_req_error(req, ENOMEM);
1723 subreq = ctdb_fetch_lock_send(state, state->ev, state->client,
1724 state->db, state->key, false);
1725 if (tevent_req_nomem(subreq, req)) {
1728 tevent_req_set_callback(subreq, ctdb_g_lock_lock_fetched, req);
/*
 * Collect the result of a g_lock lock request.
 *
 * A record handle still held in the request state is released before the
 * request status is examined.
 *
 * NOTE(review): the return statements and the use of perr are not visible
 * in this listing; presumably *perr receives the unix error and false is
 * returned on failure - confirm.
 */
1731 bool ctdb_g_lock_lock_recv(struct tevent_req *req, int *perr)
1733 struct ctdb_g_lock_lock_state *state = tevent_req_data(
1734 req, struct ctdb_g_lock_lock_state);
1737 TALLOC_FREE(state->h);
1739 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Per-request state for the asynchronous g_lock unlock operation
 * (ctdb_g_lock_unlock_send() and its callbacks).
 */
1749 struct ctdb_g_lock_unlock_state {
1750 struct tevent_context *ev;
1751 struct ctdb_client_context *client;
1752 struct ctdb_db_context *db;
/* server id whose entry is to be removed from the lock list */
1754 struct ctdb_server_id my_sid;
/* record handle obtained from ctdb_fetch_lock_recv() */
1755 struct ctdb_record_handle *h;
/* lock list parsed from the record data */
1756 struct ctdb_g_lock_list *lock_list;
/* Forward declarations for the individual steps of the g_lock unlock request */
1759 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq);
1760 static int ctdb_g_lock_unlock_update(struct tevent_req *req);
1761 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq);
/*
 * Start an asynchronous request to release the global lock named keyname
 * that is held by server id sid.
 *
 * The record key is the keyname string including its terminating NUL.
 * Unlike ctdb_g_lock_lock_send(), sid is passed by value here.  The first
 * step is a fetch-lock on the record for the key; processing continues
 * in ctdb_g_lock_unlock_fetched().
 */
1763 struct tevent_req *ctdb_g_lock_unlock_send(TALLOC_CTX *mem_ctx,
1764 struct tevent_context *ev,
1765 struct ctdb_client_context *client,
1766 struct ctdb_db_context *db,
1767 const char *keyname,
1768 struct ctdb_server_id sid)
1770 struct tevent_req *req, *subreq;
1771 struct ctdb_g_lock_unlock_state *state;
1773 req = tevent_req_create(mem_ctx, &state,
1774 struct ctdb_g_lock_unlock_state);
1780 state->client = client;
/* The key refers to the caller's string; no copy is made */
1782 state->key.dptr = discard_const(keyname);
1783 state->key.dsize = strlen(keyname) + 1;
1784 state->my_sid = sid;
1786 subreq = ctdb_fetch_lock_send(state, ev, client, db, state->key,
1788 if (tevent_req_nomem(subreq, req)) {
1789 return tevent_req_post(req, ev);
1791 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_fetched, req);
/*
 * Callback for the fetch-lock started by ctdb_g_lock_unlock_send().
 *
 * Parses the lock list from the record data and removes our entry through
 * ctdb_g_lock_unlock_update().  When the list has become empty, deletion
 * of the record is started and completion is handled in
 * ctdb_g_lock_unlock_deleted(); the final statements release the record
 * handle and complete the request.
 *
 * NOTE(review): the return statements separating these paths are not
 * visible in this listing - confirm against the complete source.
 */
1796 static void ctdb_g_lock_unlock_fetched(struct tevent_req *subreq)
1798 struct tevent_req *req = tevent_req_callback_data(
1799 subreq, struct tevent_req);
1800 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1801 req, struct ctdb_g_lock_unlock_state);
1805 state->h = ctdb_fetch_lock_recv(subreq, NULL, state, &data, &ret);
1806 TALLOC_FREE(subreq);
/* No handle means the fetch-lock failed; report its error code */
1807 if (state->h == NULL) {
1808 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s fetch lock failed\n",
1809 (char *)state->key.dptr));
1810 tevent_req_error(req, ret);
1814 ret = ctdb_g_lock_list_pull(data.dptr, data.dsize, state,
1817 DEBUG(DEBUG_ERR, ("g_lock_unlock: %s invalid lock data\n",
1818 (char *)state->key.dptr));
1819 tevent_req_error(req, ret);
1823 ret = ctdb_g_lock_unlock_update(req);
1825 tevent_req_error(req, ret);
/* No lock holders are left, so the record itself is deleted */
1829 if (state->lock_list->num == 0) {
1830 subreq = ctdb_delete_record_send(state, state->ev, state->h);
1831 if (tevent_req_nomem(subreq, req)) {
1834 tevent_req_set_callback(subreq, ctdb_g_lock_unlock_deleted,
1839 TALLOC_FREE(state->h);
1840 tevent_req_done(req);
/*
 * Remove the entry matching state->my_sid from the lock list and, when
 * entries remain, write the serialised list back to the record referred
 * to by state->h.
 *
 * NOTE(review): the loop exit on a match and the return statements are
 * not visible in this listing; presumably the loop stops at the first
 * matching entry and the function returns 0 or an errno value - confirm.
 */
1843 static int ctdb_g_lock_unlock_update(struct tevent_req *req)
1845 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1846 req, struct ctdb_g_lock_unlock_state);
1847 struct ctdb_g_lock *lock;
1850 for (i=0; i<state->lock_list->num; i++) {
1851 lock = &state->lock_list->lock[i];
1853 if (ctdb_server_id_equal(&lock->sid, &state->my_sid)) {
/*
 * i below num means the scan stopped inside the list: the last entry is
 * copied into slot i and the count is reduced by one.
 */
1858 if (i < state->lock_list->num) {
1859 state->lock_list->lock[i] =
1860 state->lock_list->lock[state->lock_list->num-1];
1861 state->lock_list->num -= 1;
/* Only a non-empty list is written back to the record */
1864 if (state->lock_list->num != 0) {
1867 data.dsize = ctdb_g_lock_list_len(state->lock_list);
1868 data.dptr = talloc_size(state, data.dsize);
1869 if (data.dptr == NULL) {
1873 ctdb_g_lock_list_push(state->lock_list, data.dptr);
1874 ret = ctdb_store_record(state->h, data);
1875 talloc_free(data.dptr);
/*
 * Callback for the record deletion started in
 * ctdb_g_lock_unlock_fetched().  A failed deletion fails the request with
 * the reported error; otherwise the record handle is released and the
 * request completes.
 *
 * NOTE(review): unlike the other callbacks in this file, no
 * TALLOC_FREE(subreq) is visible after the recv call here - confirm
 * whether subreq is meant to stay allocated until the request is freed.
 */
1884 static void ctdb_g_lock_unlock_deleted(struct tevent_req *subreq)
1886 struct tevent_req *req = tevent_req_callback_data(
1887 subreq, struct tevent_req);
1888 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1889 req, struct ctdb_g_lock_unlock_state);
1893 status = ctdb_delete_record_recv(subreq, &ret);
1896 ("g_lock_unlock %s delete record failed, ret=%d\n",
1897 (char *)state->key.dptr, ret));
1898 tevent_req_error(req, ret);
1902 TALLOC_FREE(state->h);
1903 tevent_req_done(req);
/*
 * Collect the result of a g_lock unlock request.
 *
 * A record handle still held in the request state is released before the
 * request status is examined.
 *
 * NOTE(review): the return statements and the use of perr are not visible
 * in this listing; presumably *perr receives the unix error and false is
 * returned on failure - confirm.
 */
1906 bool ctdb_g_lock_unlock_recv(struct tevent_req *req, int *perr)
1908 struct ctdb_g_lock_unlock_state *state = tevent_req_data(
1909 req, struct ctdb_g_lock_unlock_state);
1912 TALLOC_FREE(state->h);
1914 if (tevent_req_is_unix_error(req, &err)) {
1925 * Persistent database functions
/*
 * Per-request state for ctdb_transaction_start_send() and its callbacks.
 */
1927 struct ctdb_transaction_start_state {
1928 struct tevent_context *ev;
1929 struct ctdb_client_context *client;
1930 struct timeval timeout;
/* transaction handle being set up; handed out by the recv function */
1931 struct ctdb_transaction_handle *h;
/* Forward declarations for the steps of the transaction start request */
1935 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq);
1936 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq);
/*
 * Start an asynchronous transaction on a persistent database.
 *
 * A request for a database that is not persistent fails with EINVAL.
 * Otherwise a transaction handle is allocated, filled in with the
 * readonly flag, the client's server id for this database, a record
 * buffer initialised for db->db_id and the lock name
 * "transaction_db_0x<db_id>".  The g_lock.tdb database is then attached;
 * the sequence continues in ctdb_transaction_g_lock_attached().
 */
1938 struct tevent_req *ctdb_transaction_start_send(TALLOC_CTX *mem_ctx,
1939 struct tevent_context *ev,
1940 struct ctdb_client_context *client,
1941 struct timeval timeout,
1942 struct ctdb_db_context *db,
1945 struct ctdb_transaction_start_state *state;
1946 struct tevent_req *req, *subreq;
1947 struct ctdb_transaction_handle *h;
1949 req = tevent_req_create(mem_ctx, &state,
1950 struct ctdb_transaction_start_state);
1955 if (! db->persistent) {
1956 tevent_req_error(req, EINVAL);
1957 return tevent_req_post(req, ev);
1961 state->client = client;
1962 state->destnode = ctdb_client_pnn(client);
/*
 * The handle is a talloc child of db, not of the request state.
 * NOTE(review): confirm that the failure paths below release it.
 */
1964 h = talloc_zero(db, struct ctdb_transaction_handle);
1965 if (tevent_req_nomem(h, req)) {
1966 return tevent_req_post(req, ev);
1972 h->readonly = readonly;
1975 /* SRVID is unique for databases, so client can have transactions
1976 * active for multiple databases */
1977 h->sid = ctdb_client_get_server_id(client, db->db_id);
1979 h->recbuf = ctdb_rec_buffer_init(h, db->db_id);
1980 if (tevent_req_nomem(h->recbuf, req)) {
1981 return tevent_req_post(req, ev);
1984 h->lock_name = talloc_asprintf(h, "transaction_db_0x%08x", db->db_id);
1985 if (tevent_req_nomem(h->lock_name, req)) {
1986 return tevent_req_post(req, ev);
1991 subreq = ctdb_attach_send(state, ev, client, timeout, "g_lock.tdb", 0);
1992 if (tevent_req_nomem(subreq, req)) {
1993 return tevent_req_post(req, ev);
1995 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_attached, req);
/*
 * Callback for the attach to g_lock.tdb.
 *
 * Stores the attached database in the transaction handle and requests the
 * global lock named h->lock_name for the handle's server id, read-only or
 * read-write according to h->readonly.  Continues in
 * ctdb_transaction_g_lock_done().
 */
2000 static void ctdb_transaction_g_lock_attached(struct tevent_req *subreq)
2002 struct tevent_req *req = tevent_req_callback_data(
2003 subreq, struct tevent_req);
2004 struct ctdb_transaction_start_state *state = tevent_req_data(
2005 req, struct ctdb_transaction_start_state);
2009 status = ctdb_attach_recv(subreq, &ret, &state->h->db_g_lock);
2010 TALLOC_FREE(subreq);
2013 ("transaction_start: %s attach g_lock.tdb failed\n",
2014 state->h->db->db_name));
2015 tevent_req_error(req, ret);
2019 subreq = ctdb_g_lock_lock_send(state, state->ev, state->client,
2020 state->h->db_g_lock,
2021 state->h->lock_name,
2022 &state->h->sid, state->h->readonly);
2023 if (tevent_req_nomem(subreq, req)) {
2026 tevent_req_set_callback(subreq, ctdb_transaction_g_lock_done, req);
/*
 * Callback for the g_lock lock request of a transaction start.
 * A failed lock fails the transaction start with the reported error;
 * otherwise the request completes.
 */
2029 static void ctdb_transaction_g_lock_done(struct tevent_req *subreq)
2031 struct tevent_req *req = tevent_req_callback_data(
2032 subreq, struct tevent_req);
2033 struct ctdb_transaction_start_state *state = tevent_req_data(
2034 req, struct ctdb_transaction_start_state);
2038 status = ctdb_g_lock_lock_recv(subreq, &ret);
2039 TALLOC_FREE(subreq);
2042 ("transaction_start: %s g_lock lock failed, ret=%d\n",
2043 state->h->db->db_name, ret));
2044 tevent_req_error(req, ret);
2048 tevent_req_done(req);
/*
 * Collect the result of ctdb_transaction_start_send().
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably the transaction handle from the request state is returned on
 * success and NULL, with the unix error reported through the second
 * argument, on failure - confirm.
 */
2051 struct ctdb_transaction_handle *ctdb_transaction_start_recv(
2052 struct tevent_req *req,
2055 struct ctdb_transaction_start_state *state = tevent_req_data(
2056 req, struct ctdb_transaction_start_state);
2059 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous wrapper around ctdb_transaction_start_send() and
 * ctdb_transaction_start_recv(): the request is created on mem_ctx and
 * driven to completion with tevent_req_poll() on ev.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably 0 is returned with the handle stored in *out on success and
 * an errno value on failure - confirm.
 */
2069 int ctdb_transaction_start(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
2070 struct ctdb_client_context *client,
2071 struct timeval timeout,
2072 struct ctdb_db_context *db, bool readonly,
2073 struct ctdb_transaction_handle **out)
2075 struct tevent_req *req;
2076 struct ctdb_transaction_handle *h;
2079 req = ctdb_transaction_start_send(mem_ctx, ev, client, timeout, db,
2085 tevent_req_poll(req, ev);
2087 h = ctdb_transaction_start_recv(req, &ret);
/*
 * Search state passed as private data to
 * ctdb_transaction_record_fetch_traverse().
 */
2096 struct ctdb_transaction_record_fetch_state {
/* ltdb header extracted from the data of the matching record */
2098 struct ctdb_ltdb_header header;
/*
 * Record buffer traverse callback used by
 * ctdb_transaction_record_fetch().
 *
 * When the key of the visited record equals state->key (same size and
 * same bytes), the ltdb header is extracted from the record data into
 * state->header and the record is marked as found.
 *
 * NOTE(review): the return values and any further handling of the data
 * are not visible in this listing - confirm against the complete source.
 */
2102 static int ctdb_transaction_record_fetch_traverse(
2104 struct ctdb_ltdb_header *nullheader,
2105 TDB_DATA key, TDB_DATA data,
2108 struct ctdb_transaction_record_fetch_state *state =
2109 (struct ctdb_transaction_record_fetch_state *)private_data;
2111 if (state->key.dsize == key.dsize &&
2112 memcmp(state->key.dptr, key.dptr, key.dsize) == 0) {
2115 ret = ctdb_ltdb_header_extract(&data, &state->header);
2118 ("record_fetch: Failed to extract header, "
2124 state->found = true;
/*
 * Look up a key among the records already collected in the transaction by
 * traversing h->recbuf with ctdb_transaction_record_fetch_traverse().
 *
 * header is optional: the header found by the traverse is copied out only
 * when header is not NULL.
 *
 * NOTE(review): the return values are not visible in this listing;
 * presumably 0 means the key was found and an errno value that it was
 * not - confirm.
 */
2130 static int ctdb_transaction_record_fetch(struct ctdb_transaction_handle *h,
2132 struct ctdb_ltdb_header *header,
2135 struct ctdb_transaction_record_fetch_state state;
2139 state.found = false;
2141 ret = ctdb_rec_buffer_traverse(h->recbuf,
2142 ctdb_transaction_record_fetch_traverse,
2149 if (header != NULL) {
2150 *header = state.header;
/*
 * Fetch a record within a transaction.
 *
 * The transaction's own record buffer is consulted first; data found
 * there is duplicated onto mem_ctx.  The record is otherwise read from
 * the database with ctdb_ltdb_fetch() and added to the transaction's
 * record buffer.
 *
 * NOTE(review): the conditions selecting between the two paths and the
 * return values are not visible in this listing - confirm.
 */
2161 int ctdb_transaction_fetch_record(struct ctdb_transaction_handle *h,
2163 TALLOC_CTX *mem_ctx, TDB_DATA *data)
2166 struct ctdb_ltdb_header header;
/* No header is requested from the record buffer lookup */
2169 ret = ctdb_transaction_record_fetch(h, key, NULL, &tmp_data);
2171 data->dptr = talloc_memdup(mem_ctx, tmp_data.dptr,
2173 if (data->dptr == NULL) {
2176 data->dsize = tmp_data.dsize;
2180 ret = ctdb_ltdb_fetch(h->db, key, &header, mem_ctx, data);
2185 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, *data);
/*
 * Queue a record store in the transaction.
 *
 * The current value of the record is looked up in the transaction's
 * record buffer and in the database.  When the new data is byte-identical
 * to the current value, nothing is added.  Otherwise the header's dmaster
 * is set to the client's pnn and the record is added to h->recbuf.
 *
 * NOTE(review): the conditions around the two lookups and the return
 * values are not visible in this listing; presumably the database is read
 * only when the record buffer lookup fails - confirm.
 */
2193 int ctdb_transaction_store_record(struct ctdb_transaction_handle *h,
2194 TDB_DATA key, TDB_DATA data)
2196 TALLOC_CTX *tmp_ctx;
2197 struct ctdb_ltdb_header header;
/* Temporary context for the old data read from the database */
2205 tmp_ctx = talloc_new(h);
2206 if (tmp_ctx == NULL) {
2210 ret = ctdb_transaction_record_fetch(h, key, &header, &old_data);
2212 ret = ctdb_ltdb_fetch(h->db, key, &header, tmp_ctx, &old_data);
/* Same size and same bytes: the store would not change the record */
2218 if (old_data.dsize == data.dsize &&
2219 memcmp(old_data.dptr, data.dptr, data.dsize) == 0) {
2220 talloc_free(tmp_ctx);
2224 header.dmaster = ctdb_client_pnn(h->client);
2227 ret = ctdb_rec_buffer_add(h, h->recbuf, 0, &header, key, data);
2228 talloc_free(tmp_ctx);
/*
 * Delete a record within a transaction.  Implemented as a store of
 * tdb_null data for the key; the result of that store is returned.
 */
2237 int ctdb_transaction_delete_record(struct ctdb_transaction_handle *h,
2240 return ctdb_transaction_store_record(h, key, tdb_null);
/*
 * Read the database sequence number stored under CTDB_DB_SEQNUM_KEY (the
 * key includes the terminating NUL of the key name).
 *
 * The record is read with ctdb_ltdb_fetch() onto h.  Empty data and data
 * whose size is not sizeof(uint64_t) are handled separately; otherwise
 * the stored 64-bit value is copied to *seqnum and the data is freed.
 *
 * NOTE(review): the return values and the handling of the empty-data case
 * are not visible in this listing; presumably an empty record yields a
 * sequence number of 0 and a wrong size yields an error - confirm.
 */
2243 static int ctdb_transaction_fetch_db_seqnum(struct ctdb_transaction_handle *h,
2246 const char *keyname = CTDB_DB_SEQNUM_KEY;
2248 struct ctdb_ltdb_header header;
2251 key.dptr = discard_const(keyname);
2252 key.dsize = strlen(keyname) + 1;
2254 ret = ctdb_ltdb_fetch(h->db, key, &header, h, &data);
2257 ("transaction_commit: %s seqnum fetch failed, ret=%d\n",
2258 h->db->db_name, ret));
2262 if (data.dsize == 0) {
2268 if (data.dsize != sizeof(uint64_t)) {
2269 talloc_free(data.dptr);
2273 *seqnum = *(uint64_t *)data.dptr;
2275 talloc_free(data.dptr);
/*
 * Store seqnum as the database sequence number within the transaction.
 *
 * The value is stored under CTDB_DB_SEQNUM_KEY (including the terminating
 * NUL of the key name) as the raw bytes of the seqnum variable, through
 * ctdb_transaction_store_record(), whose result is returned.
 */
2279 static int ctdb_transaction_store_db_seqnum(struct ctdb_transaction_handle *h,
2282 const char *keyname = CTDB_DB_SEQNUM_KEY;
2285 key.dptr = discard_const(keyname);
2286 key.dsize = strlen(keyname) + 1;
/* data points at the local seqnum variable, valid only during this call */
2288 data.dptr = (uint8_t *)&seqnum;
2289 data.dsize = sizeof(seqnum);
2291 return ctdb_transaction_store_record(h, key, data);
/*
 * Per-request state for ctdb_transaction_commit_send() and its callbacks.
 */
2294 struct ctdb_transaction_commit_state {
2295 struct tevent_context *ev;
/* timeout reused when the commit control has to be sent again */
2296 struct timeval timeout;
/* transaction being committed; freed once the g_lock has been released */
2297 struct ctdb_transaction_handle *h;
/* Forward declarations for the steps of the transaction commit request */
2301 static void ctdb_transaction_commit_done(struct tevent_req *subreq);
2302 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq);
/*
 * Start an asynchronous commit of transaction h.
 *
 * The current database sequence number is read into state->seqnum and the
 * incremented value is stored as part of the transaction.  A TRANS3_COMMIT
 * control carrying the transaction's record buffer is then sent to the
 * node given by ctdb_client_pnn(h->client).  The reply is handled in
 * ctdb_transaction_commit_done().
 */
2304 struct tevent_req *ctdb_transaction_commit_send(
2305 TALLOC_CTX *mem_ctx,
2306 struct tevent_context *ev,
2307 struct timeval timeout,
2308 struct ctdb_transaction_handle *h)
2310 struct tevent_req *req, *subreq;
2311 struct ctdb_transaction_commit_state *state;
2312 struct ctdb_req_control request;
2315 req = tevent_req_create(mem_ctx, &state,
2316 struct ctdb_transaction_commit_state);
2322 state->timeout = timeout;
/* Remember the sequence number seen before the commit */
2325 ret = ctdb_transaction_fetch_db_seqnum(h, &state->seqnum);
2327 tevent_req_error(req, ret);
2328 return tevent_req_post(req, ev);
/* The commit itself bumps the sequence number by one */
2331 ret = ctdb_transaction_store_db_seqnum(h, state->seqnum+1);
2333 tevent_req_error(req, ret);
2334 return tevent_req_post(req, ev);
2337 ctdb_req_control_trans3_commit(&request, h->recbuf);
2338 subreq = ctdb_client_control_send(state, ev, h->client,
2339 ctdb_client_pnn(h->client),
2341 if (tevent_req_nomem(subreq, req)) {
2342 return tevent_req_post(req, ev);
2344 tevent_req_set_callback(subreq, ctdb_transaction_commit_done, req);
/*
 * Callback for the TRANS3_COMMIT control.
 *
 * When the control failed due to recovery, the database sequence number
 * is read again: if it still equals the value seen before the commit, the
 * control is sent again with this function as callback; if it is not the
 * old value plus one, the request fails with EIO.  After a successful
 * commit the transaction's g_lock is released; completion is handled in
 * ctdb_transaction_commit_g_lock_done().
 *
 * NOTE(review): several branch conditions and return statements are not
 * visible in this listing; the flow described above follows the visible
 * statements and comments - confirm against the complete source.
 */
2349 static void ctdb_transaction_commit_done(struct tevent_req *subreq)
2351 struct tevent_req *req = tevent_req_callback_data(
2352 subreq, struct tevent_req);
2353 struct ctdb_transaction_commit_state *state = tevent_req_data(
2354 req, struct ctdb_transaction_commit_state);
2355 struct ctdb_transaction_handle *h = state->h;
2356 struct ctdb_reply_control *reply;
2361 status = ctdb_client_control_recv(subreq, &ret, state, &reply);
2362 TALLOC_FREE(subreq);
2365 ("transaction_commit: %s TRANS3_COMMIT failed, ret=%d\n",
2366 h->db->db_name, ret));
2367 tevent_req_error(req, ret);
2371 ret = ctdb_reply_control_trans3_commit(reply);
2375 /* Control failed due to recovery */
2377 ret = ctdb_transaction_fetch_db_seqnum(h, &seqnum);
2379 tevent_req_error(req, ret);
/* Sequence number unchanged: send the commit control again */
2383 if (seqnum == state->seqnum) {
2384 struct ctdb_req_control request;
2387 ctdb_req_control_trans3_commit(&request,
2389 subreq = ctdb_client_control_send(
2390 state, state->ev, state->h->client,
2391 ctdb_client_pnn(state->h->client),
2392 state->timeout, &request);
2393 if (tevent_req_nomem(subreq, req)) {
2396 tevent_req_set_callback(subreq,
2397 ctdb_transaction_commit_done,
/* Any value other than old + 1 is reported as a mismatch */
2402 if (seqnum != state->seqnum + 1) {
2404 ("transaction_commit: %s seqnum mismatch "
2405 "0x%"PRIx64" != 0x%"PRIx64" + 1\n",
2406 state->h->db->db_name, seqnum, state->seqnum));
2407 tevent_req_error(req, EIO);
2412 /* trans3_commit successful */
2413 subreq = ctdb_g_lock_unlock_send(state, state->ev, h->client,
2414 h->db_g_lock, h->lock_name, h->sid);
2415 if (tevent_req_nomem(subreq, req)) {
2418 tevent_req_set_callback(subreq, ctdb_transaction_commit_g_lock_done,
/*
 * Callback for the g_lock unlock that follows a successful commit.
 * A failed unlock fails the request with the reported error; otherwise
 * the transaction handle is freed and the request completes.
 */
2422 static void ctdb_transaction_commit_g_lock_done(struct tevent_req *subreq)
2424 struct tevent_req *req = tevent_req_callback_data(
2425 subreq, struct tevent_req);
2426 struct ctdb_transaction_commit_state *state = tevent_req_data(
2427 req, struct ctdb_transaction_commit_state);
2431 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2432 TALLOC_FREE(subreq);
2435 ("transaction_commit: %s g_lock unlock failed, ret=%d\n",
2436 state->h->db->db_name, ret));
2437 tevent_req_error(req, ret);
/* The transaction handle is not usable after a completed commit */
2441 talloc_free(state->h);
2442 tevent_req_done(req);
/*
 * Collect the result of ctdb_transaction_commit_send().
 *
 * NOTE(review): the return statements and the use of perr are not visible
 * in this listing; presumably *perr receives the unix error and false is
 * returned on failure - confirm.
 */
2445 bool ctdb_transaction_commit_recv(struct tevent_req *req, int *perr)
2449 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous commit of transaction h.
 *
 * A read-only transaction, or one without updates, is cancelled instead
 * of committed.  Otherwise the asynchronous commit is run on a temporary
 * talloc context with a zero timeval as timeout and driven to completion
 * with tevent_req_poll() on the handle's event context.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably 0 is returned on success and an errno value on failure -
 * confirm.
 */
2459 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
2461 struct tevent_context *ev = h->ev;
2462 TALLOC_CTX *mem_ctx;
2463 struct tevent_req *req;
/* Nothing to write: releasing the lock through cancel is sufficient */
2467 if (h->readonly || ! h->updated) {
2468 return ctdb_transaction_cancel(h);
2471 mem_ctx = talloc_new(NULL);
2472 if (mem_ctx == NULL) {
2476 req = ctdb_transaction_commit_send(mem_ctx, ev,
2477 tevent_timeval_zero(), h);
2479 talloc_free(mem_ctx);
2483 tevent_req_poll(req, ev);
2485 status = ctdb_transaction_commit_recv(req, &ret);
2487 talloc_free(mem_ctx);
2491 talloc_free(mem_ctx);
/*
 * Per-request state for ctdb_transaction_cancel_send() and its callback.
 */
2495 struct ctdb_transaction_cancel_state {
2496 struct tevent_context *ev;
/* transaction being cancelled; freed by the completion callback */
2497 struct ctdb_transaction_handle *h;
2498 struct timeval timeout;
/* Forward declaration of the completion callback of the cancel request */
2501 static void ctdb_transaction_cancel_done(struct tevent_req *subreq);
/*
 * Start an asynchronous cancel of transaction h.
 *
 * The only step started here is releasing the transaction's g_lock;
 * completion is handled in ctdb_transaction_cancel_done().
 */
2503 struct tevent_req *ctdb_transaction_cancel_send(
2504 TALLOC_CTX *mem_ctx,
2505 struct tevent_context *ev,
2506 struct timeval timeout,
2507 struct ctdb_transaction_handle *h)
2509 struct tevent_req *req, *subreq;
2510 struct ctdb_transaction_cancel_state *state;
2512 req = tevent_req_create(mem_ctx, &state,
2513 struct ctdb_transaction_cancel_state);
2520 state->timeout = timeout;
2522 subreq = ctdb_g_lock_unlock_send(state, state->ev, state->h->client,
2523 state->h->db_g_lock,
2524 state->h->lock_name, state->h->sid);
2525 if (tevent_req_nomem(subreq, req)) {
2526 return tevent_req_post(req, ev);
2528 tevent_req_set_callback(subreq, ctdb_transaction_cancel_done,
/*
 * Callback for the g_lock unlock of a transaction cancel.
 *
 * The transaction handle is freed on both paths: before the request is
 * failed with the unlock error, and before the request completes.
 */
2534 static void ctdb_transaction_cancel_done(struct tevent_req *subreq)
2536 struct tevent_req *req = tevent_req_callback_data(
2537 subreq, struct tevent_req);
2538 struct ctdb_transaction_cancel_state *state = tevent_req_data(
2539 req, struct ctdb_transaction_cancel_state);
2543 status = ctdb_g_lock_unlock_recv(subreq, &ret);
2544 TALLOC_FREE(subreq);
2547 ("transaction_cancel: %s g_lock unlock failed, ret=%d\n",
2548 state->h->db->db_name, ret));
2549 talloc_free(state->h);
2550 tevent_req_error(req, ret);
2554 talloc_free(state->h);
2555 tevent_req_done(req);
/*
 * Collect the result of ctdb_transaction_cancel_send().
 *
 * NOTE(review): the return statements and the use of perr are not visible
 * in this listing; presumably *perr receives the unix error and false is
 * returned on failure - confirm.
 */
2558 bool ctdb_transaction_cancel_recv(struct tevent_req *req, int *perr)
2562 if (tevent_req_is_unix_error(req, &err)) {
/*
 * Synchronous cancel of transaction h.
 *
 * Runs the asynchronous cancel on a temporary talloc context with a zero
 * timeval as timeout and drives it to completion with tevent_req_poll()
 * on the handle's event context.  The temporary context is freed on every
 * visible path.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * presumably 0 is returned on success and an errno value on failure -
 * confirm.
 */
2572 int ctdb_transaction_cancel(struct ctdb_transaction_handle *h)
2574 struct tevent_context *ev = h->ev;
2575 struct tevent_req *req;
2576 TALLOC_CTX *mem_ctx;
2580 mem_ctx = talloc_new(NULL);
2581 if (mem_ctx == NULL) {
2586 req = ctdb_transaction_cancel_send(mem_ctx, ev,
2587 tevent_timeval_zero(), h);
2589 talloc_free(mem_ctx);
2594 tevent_req_poll(req, ev);
2596 status = ctdb_transaction_cancel_recv(req, &ret);
2598 talloc_free(mem_ctx);
2602 talloc_free(mem_ctx);
2609 * In future Samba should register SERVER_ID.
2610 * Make that structure same as struct srvid {}.