4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
36 allocate a packet for use in client<->daemon communication
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
40 enum ctdb_operation operation,
41 size_t length, size_t slength,
45 struct ctdb_req_header *hdr;
47 length = MAX(length, slength);
48 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
50 hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
52 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53 operation, (unsigned)length));
56 talloc_set_name_const(hdr, type);
58 hdr->operation = operation;
59 hdr->ctdb_magic = CTDB_MAGIC;
60 hdr->ctdb_version = CTDB_VERSION;
61 hdr->srcnode = ctdb->pnn;
63 hdr->generation = ctdb->vnn_map->generation;
70 local version of ctdb_call
72 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
73 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
76 struct ctdb_call_info *c;
77 struct ctdb_registered_call *fn;
78 struct ctdb_context *ctdb = ctdb_db->ctdb;
80 c = talloc(ctdb, struct ctdb_call_info);
81 CTDB_NO_MEMORY(ctdb, c);
84 c->call_data = &call->call_data;
85 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
86 c->record_data.dsize = data->dsize;
87 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
92 for (fn=ctdb_db->calls;fn;fn=fn->next) {
93 if (fn->id == call->call_id) break;
96 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
101 if (fn->fn(c) != 0) {
102 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
107 /* we need to force the record to be written out if this was a remote access */
108 if (c->new_data == NULL) {
109 c->new_data = &c->record_data;
113 /* XXX check that we always have the lock here? */
114 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
122 call->reply_data = *c->reply_data;
124 talloc_steal(call, call->reply_data.dptr);
125 talloc_set_name_const(call->reply_data.dptr, __location__);
127 call->reply_data.dptr = NULL;
128 call->reply_data.dsize = 0;
130 call->status = c->status;
139 queue a packet for sending from client to daemon
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
143 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
148 called when a CTDB_REPLY_CALL packet comes in in the client
150 This packet comes in response to a CTDB_REQ_CALL request packet. It
151 contains any reply data from the call
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
155 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156 struct ctdb_client_call_state *state;
158 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
160 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
164 if (hdr->reqid != state->reqid) {
165 /* we found a record but it was the wrong one */
166 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
170 state->call->reply_data.dptr = c->data;
171 state->call->reply_data.dsize = c->datalen;
172 state->call->status = c->status;
174 talloc_steal(state, c);
176 state->state = CTDB_CALL_DONE;
178 if (state->async.fn) {
179 state->async.fn(state);
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
186 this is called in the client, when data comes in from the daemon
188 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
190 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
194 /* place the packet as a child of a tmp_ctx. We then use
195 talloc_free() below to free it. If any of the calls want
196 to keep it, then they will steal it somewhere else, and the
197 talloc_free() will be a no-op */
198 tmp_ctx = talloc_new(ctdb);
199 talloc_steal(tmp_ctx, hdr);
202 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
206 if (cnt < sizeof(*hdr)) {
207 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
210 if (cnt != hdr->length) {
211 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
212 (unsigned)hdr->length, (unsigned)cnt);
216 if (hdr->ctdb_magic != CTDB_MAGIC) {
217 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
221 if (hdr->ctdb_version != CTDB_VERSION) {
222 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
226 switch (hdr->operation) {
227 case CTDB_REPLY_CALL:
228 ctdb_client_reply_call(ctdb, hdr);
231 case CTDB_REQ_MESSAGE:
232 ctdb_request_message(ctdb, hdr);
235 case CTDB_REPLY_CONTROL:
236 ctdb_client_reply_control(ctdb, hdr);
240 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
244 talloc_free(tmp_ctx);
248 connect to a unix domain socket
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
252 struct sockaddr_un addr;
254 memset(&addr, 0, sizeof(addr));
255 addr.sun_family = AF_UNIX;
256 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
258 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259 if (ctdb->daemon.sd == -1) {
260 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
264 set_nonblocking(ctdb->daemon.sd);
265 set_close_on_exec(ctdb->daemon.sd);
267 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
268 close(ctdb->daemon.sd);
269 ctdb->daemon.sd = -1;
270 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
274 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
276 ctdb_client_read_cb, ctdb);
281 struct ctdb_record_handle {
282 struct ctdb_db_context *ctdb_db;
285 struct ctdb_ltdb_header header;
290 make a recv call to the local ctdb daemon - called from client context
292 This is called when the program wants to wait for a ctdb_call to complete and get the
293 results. This call will block unless the call has already completed.
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
301 while (state->state < CTDB_CALL_DONE) {
302 event_loop_once(state->ctdb_db->ctdb->ev);
304 if (state->state != CTDB_CALL_DONE) {
305 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
310 if (state->call->reply_data.dsize) {
311 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312 state->call->reply_data.dptr,
313 state->call->reply_data.dsize);
314 call->reply_data.dsize = state->call->reply_data.dsize;
316 call->reply_data.dptr = NULL;
317 call->reply_data.dsize = 0;
319 call->status = state->call->status;
329 destroy a ctdb_call in client
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
333 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
338 construct an event driven local ctdb_call
340 this is used so that locally processed ctdb_call requests are processed
341 in an event driven manner
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
344 struct ctdb_call *call,
345 struct ctdb_ltdb_header *header,
348 struct ctdb_client_call_state *state;
349 struct ctdb_context *ctdb = ctdb_db->ctdb;
352 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353 CTDB_NO_MEMORY_NULL(ctdb, state);
354 state->call = talloc_zero(state, struct ctdb_call);
355 CTDB_NO_MEMORY_NULL(ctdb, state->call);
357 talloc_steal(state, data->dptr);
359 state->state = CTDB_CALL_DONE;
360 *(state->call) = *call;
361 state->ctdb_db = ctdb_db;
363 ret = ctdb_call_local(ctdb_db, state->call, header, state, data);
369 make a ctdb call to the local daemon - async send. Called from client context.
371 This constructs a ctdb_call request and queues it for processing.
372 This call never blocks.
374 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
375 struct ctdb_call *call)
377 struct ctdb_client_call_state *state;
378 struct ctdb_context *ctdb = ctdb_db->ctdb;
379 struct ctdb_ltdb_header header;
383 struct ctdb_req_call *c;
385 /* if the domain socket is not yet open, open it */
386 if (ctdb->daemon.sd==-1) {
387 ctdb_socket_connect(ctdb);
390 ret = ctdb_ltdb_lock(ctdb_db, call->key);
392 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
398 if (ret == 0 && header.dmaster == ctdb->pnn) {
399 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
400 talloc_free(data.dptr);
401 ctdb_ltdb_unlock(ctdb_db, call->key);
405 ctdb_ltdb_unlock(ctdb_db, call->key);
406 talloc_free(data.dptr);
408 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
410 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
413 state->call = talloc_zero(state, struct ctdb_call);
414 if (state->call == NULL) {
415 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
419 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
420 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
422 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
426 state->reqid = ctdb_reqid_new(ctdb, state);
427 state->ctdb_db = ctdb_db;
428 talloc_set_destructor(state, ctdb_client_call_destructor);
430 c->hdr.reqid = state->reqid;
431 c->flags = call->flags;
432 c->db_id = ctdb_db->db_id;
433 c->callid = call->call_id;
435 c->keylen = call->key.dsize;
436 c->calldatalen = call->call_data.dsize;
437 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
438 memcpy(&c->data[call->key.dsize],
439 call->call_data.dptr, call->call_data.dsize);
440 *(state->call) = *call;
441 state->call->call_data.dptr = &c->data[call->key.dsize];
442 state->call->key.dptr = &c->data[0];
444 state->state = CTDB_CALL_WAIT;
447 ctdb_client_queue_pkt(ctdb, &c->hdr);
454 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
456 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
458 struct ctdb_client_call_state *state;
460 state = ctdb_call_send(ctdb_db, call);
461 return ctdb_call_recv(state, call);
466 tell the daemon what messaging srvid we will use, and register the message
467 handler function in the client
469 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
470 ctdb_message_fn_t handler,
477 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
478 tdb_null, NULL, NULL, &status, NULL, NULL);
479 if (res != 0 || status != 0) {
480 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
484 /* also need to register the handler with our own ctdb structure */
485 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
489 tell the daemon we no longer want a srvid
491 int ctdb_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
496 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
497 tdb_null, NULL, NULL, &status, NULL, NULL);
498 if (res != 0 || status != 0) {
499 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
503 /* also need to register the handler with our own ctdb structure */
504 ctdb_deregister_message_handler(ctdb, srvid, private_data);
510 send a message - from client context
512 int ctdb_send_message(struct ctdb_context *ctdb, uint32_t pnn,
513 uint64_t srvid, TDB_DATA data)
515 struct ctdb_req_message *r;
518 len = offsetof(struct ctdb_req_message, data) + data.dsize;
519 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
520 len, struct ctdb_req_message);
521 CTDB_NO_MEMORY(ctdb, r);
523 r->hdr.destnode = pnn;
525 r->datalen = data.dsize;
526 memcpy(&r->data[0], data.dptr, data.dsize);
528 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
535 cancel a ctdb_fetch_lock operation, releasing the lock
537 static int fetch_lock_destructor(struct ctdb_record_handle *h)
539 ctdb_ltdb_unlock(h->ctdb_db, h->key);
544 force the migration of a record to this node
546 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
548 struct ctdb_call call;
550 call.call_id = CTDB_NULL_FUNC;
552 call.flags = CTDB_IMMEDIATE_MIGRATION;
553 return ctdb_call(ctdb_db, &call);
557 get a lock on a record, and return the records data. Blocks until it gets the lock
559 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
560 TDB_DATA key, TDB_DATA *data)
563 struct ctdb_record_handle *h;
566 procedure is as follows:
568 1) get the chain lock.
569 2) check if we are dmaster
570 3) if we are the dmaster then return handle
571 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
573 5) when we get the reply, goto (1)
576 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
581 h->ctdb_db = ctdb_db;
583 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
584 if (h->key.dptr == NULL) {
590 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
591 (const char *)key.dptr));
594 /* step 1 - get the chain lock */
595 ret = ctdb_ltdb_lock(ctdb_db, key);
597 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
602 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
604 talloc_set_destructor(h, fetch_lock_destructor);
606 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
608 /* when torturing, ensure we test the remote path */
609 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
611 h->header.dmaster = (uint32_t)-1;
615 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
617 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
618 ctdb_ltdb_unlock(ctdb_db, key);
619 ret = ctdb_client_force_migration(ctdb_db, key);
621 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
628 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
633 store some data to the record that was locked with ctdb_fetch_lock()
635 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
637 if (h->ctdb_db->persistent) {
638 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
642 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
646 non-locking fetch of a record
648 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
649 TDB_DATA key, TDB_DATA *data)
651 struct ctdb_call call;
654 call.call_id = CTDB_FETCH_FUNC;
655 call.call_data.dptr = NULL;
656 call.call_data.dsize = 0;
658 ret = ctdb_call(ctdb_db, &call);
661 *data = call.reply_data;
662 talloc_steal(mem_ctx, data->dptr);
671 called when a control completes or timesout to invoke the callback
672 function the user provided
674 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
675 struct timeval t, void *private_data)
677 struct ctdb_client_control_state *state;
678 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
681 state = talloc_get_type(private_data, struct ctdb_client_control_state);
682 talloc_steal(tmp_ctx, state);
684 ret = ctdb_control_recv(state->ctdb, state, state,
689 talloc_free(tmp_ctx);
693 called when a CTDB_REPLY_CONTROL packet comes in in the client
695 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
696 contains any reply data from the control
698 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
699 struct ctdb_req_header *hdr)
701 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
702 struct ctdb_client_control_state *state;
704 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
706 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
710 if (hdr->reqid != state->reqid) {
711 /* we found a record but it was the wrong one */
712 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
716 state->outdata.dptr = c->data;
717 state->outdata.dsize = c->datalen;
718 state->status = c->status;
720 state->errormsg = talloc_strndup(state,
721 (char *)&c->data[c->datalen],
725 /* state->outdata now uses resources from c so we dont want c
726 to just dissappear from under us while state is still alive
728 talloc_steal(state, c);
730 state->state = CTDB_CONTROL_DONE;
732 /* if we had a callback registered for this control, pull the response
733 and call the callback.
735 if (state->async.fn) {
736 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
742 destroy a ctdb_control in client
744 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
746 ctdb_reqid_remove(state->ctdb, state->reqid);
751 /* time out handler for ctdb_control */
752 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
753 struct timeval t, void *private_data)
755 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
757 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
758 "dstnode:%u\n", state->reqid, state->c->opcode,
759 state->c->hdr.destnode));
761 state->state = CTDB_CONTROL_TIMEOUT;
763 /* if we had a callback registered for this control, pull the response
764 and call the callback.
766 if (state->async.fn) {
767 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
771 /* async version of send control request */
772 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
773 uint32_t destnode, uint64_t srvid,
774 uint32_t opcode, uint32_t flags, TDB_DATA data,
776 struct timeval *timeout,
779 struct ctdb_client_control_state *state;
781 struct ctdb_req_control *c;
788 /* if the domain socket is not yet open, open it */
789 if (ctdb->daemon.sd==-1) {
790 ctdb_socket_connect(ctdb);
793 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
794 CTDB_NO_MEMORY_NULL(ctdb, state);
797 state->reqid = ctdb_reqid_new(ctdb, state);
798 state->state = CTDB_CONTROL_WAIT;
799 state->errormsg = NULL;
801 talloc_set_destructor(state, ctdb_control_destructor);
803 len = offsetof(struct ctdb_req_control, data) + data.dsize;
804 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
805 len, struct ctdb_req_control);
807 CTDB_NO_MEMORY_NULL(ctdb, c);
808 c->hdr.reqid = state->reqid;
809 c->hdr.destnode = destnode;
814 c->datalen = data.dsize;
816 memcpy(&c->data[0], data.dptr, data.dsize);
820 if (timeout && !timeval_is_zero(timeout)) {
821 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
824 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
830 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
839 /* async version of receive control reply */
840 int ctdb_control_recv(struct ctdb_context *ctdb,
841 struct ctdb_client_control_state *state,
843 TDB_DATA *outdata, int32_t *status, char **errormsg)
847 if (status != NULL) {
850 if (errormsg != NULL) {
858 /* prevent double free of state */
859 tmp_ctx = talloc_new(ctdb);
860 talloc_steal(tmp_ctx, state);
862 /* loop one event at a time until we either timeout or the control
865 while (state->state == CTDB_CONTROL_WAIT) {
866 event_loop_once(ctdb->ev);
869 if (state->state != CTDB_CONTROL_DONE) {
870 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
871 if (state->async.fn) {
872 state->async.fn(state);
874 talloc_free(tmp_ctx);
878 if (state->errormsg) {
879 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
881 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
883 if (state->async.fn) {
884 state->async.fn(state);
886 talloc_free(tmp_ctx);
891 *outdata = state->outdata;
892 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
896 *status = state->status;
899 if (state->async.fn) {
900 state->async.fn(state);
903 talloc_free(tmp_ctx);
910 send a ctdb control message
911 timeout specifies how long we should wait for a reply.
912 if timeout is NULL we wait indefinitely
914 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
915 uint32_t opcode, uint32_t flags, TDB_DATA data,
916 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
917 struct timeval *timeout,
920 struct ctdb_client_control_state *state;
922 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
923 flags, data, mem_ctx,
926 /* FIXME: Error conditions in ctdb_control_send return NULL without
927 * setting errormsg. So, there is no way to distinguish between sucess
928 * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
929 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
930 if (status != NULL) {
936 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
944 a process exists call. Returns 0 if process exists, -1 otherwise
946 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
952 data.dptr = (uint8_t*)&pid;
953 data.dsize = sizeof(pid);
955 ret = ctdb_control(ctdb, destnode, 0,
956 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
957 NULL, NULL, &status, NULL, NULL);
959 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
967 get remote statistics
969 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
975 ret = ctdb_control(ctdb, destnode, 0,
976 CTDB_CONTROL_STATISTICS, 0, tdb_null,
977 ctdb, &data, &res, NULL, NULL);
978 if (ret != 0 || res != 0) {
979 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
983 if (data.dsize != sizeof(struct ctdb_statistics)) {
984 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
985 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
989 *status = *(struct ctdb_statistics *)data.dptr;
990 talloc_free(data.dptr);
996 shutdown a remote ctdb node
998 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1000 struct ctdb_client_control_state *state;
1002 state = ctdb_control_send(ctdb, destnode, 0,
1003 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1004 NULL, &timeout, NULL);
1005 if (state == NULL) {
1006 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1014 get vnn map from a remote node
1016 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1021 struct ctdb_vnn_map_wire *map;
1023 ret = ctdb_control(ctdb, destnode, 0,
1024 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1025 mem_ctx, &outdata, &res, &timeout, NULL);
1026 if (ret != 0 || res != 0) {
1027 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1031 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1032 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1033 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1034 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1038 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1039 CTDB_NO_MEMORY(ctdb, *vnnmap);
1040 (*vnnmap)->generation = map->generation;
1041 (*vnnmap)->size = map->size;
1042 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1044 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1045 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1046 talloc_free(outdata.dptr);
1053 get the recovery mode of a remote node
1055 struct ctdb_client_control_state *
1056 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1058 return ctdb_control_send(ctdb, destnode, 0,
1059 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1060 mem_ctx, &timeout, NULL);
1063 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1068 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1070 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1075 *recmode = (uint32_t)res;
1081 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1083 struct ctdb_client_control_state *state;
1085 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1086 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1093 set the recovery mode of a remote node
1095 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1101 data.dsize = sizeof(uint32_t);
1102 data.dptr = (unsigned char *)&recmode;
1104 ret = ctdb_control(ctdb, destnode, 0,
1105 CTDB_CONTROL_SET_RECMODE, 0, data,
1106 NULL, NULL, &res, &timeout, NULL);
1107 if (ret != 0 || res != 0) {
1108 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1118 get the recovery master of a remote node
1120 struct ctdb_client_control_state *
1121 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1122 struct timeval timeout, uint32_t destnode)
1124 return ctdb_control_send(ctdb, destnode, 0,
1125 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1126 mem_ctx, &timeout, NULL);
1129 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1134 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1136 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1141 *recmaster = (uint32_t)res;
1147 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1149 struct ctdb_client_control_state *state;
1151 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1152 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1157 set the recovery master of a remote node
1159 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1166 data.dsize = sizeof(uint32_t);
1167 data.dptr = (unsigned char *)&recmaster;
1169 ret = ctdb_control(ctdb, destnode, 0,
1170 CTDB_CONTROL_SET_RECMASTER, 0, data,
1171 NULL, NULL, &res, &timeout, NULL);
1172 if (ret != 0 || res != 0) {
1173 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1182 get a list of databases off a remote node
1184 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1185 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1191 ret = ctdb_control(ctdb, destnode, 0,
1192 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1193 mem_ctx, &outdata, &res, &timeout, NULL);
1194 if (ret != 0 || res != 0) {
1195 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1199 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1200 talloc_free(outdata.dptr);
1206 get a list of nodes (vnn and flags ) from a remote node
1208 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1209 struct timeval timeout, uint32_t destnode,
1210 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1216 ret = ctdb_control(ctdb, destnode, 0,
1217 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1218 mem_ctx, &outdata, &res, &timeout, NULL);
1219 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1220 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1221 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1223 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1224 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1228 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1229 talloc_free(outdata.dptr);
1235 old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1237 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1238 struct timeval timeout, uint32_t destnode,
1239 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1243 struct ctdb_node_mapv4 *nodemapv4;
1246 ret = ctdb_control(ctdb, destnode, 0,
1247 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1248 mem_ctx, &outdata, &res, &timeout, NULL);
1249 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1250 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1254 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1256 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1257 (*nodemap) = talloc_zero_size(mem_ctx, len);
1258 CTDB_NO_MEMORY(ctdb, (*nodemap));
1260 (*nodemap)->num = nodemapv4->num;
1261 for (i=0; i<nodemapv4->num; i++) {
1262 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1263 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1264 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1265 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1268 talloc_free(outdata.dptr);
1274 drop the transport, reload the nodes file and restart the transport
1276 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1277 struct timeval timeout, uint32_t destnode)
1282 ret = ctdb_control(ctdb, destnode, 0,
1283 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1284 NULL, NULL, &res, &timeout, NULL);
1285 if (ret != 0 || res != 0) {
1286 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1295 set vnn map on a node
1297 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1298 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1303 struct ctdb_vnn_map_wire *map;
1306 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1307 map = talloc_size(mem_ctx, len);
1308 CTDB_NO_MEMORY(ctdb, map);
1310 map->generation = vnnmap->generation;
1311 map->size = vnnmap->size;
1312 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1315 data.dptr = (uint8_t *)map;
1317 ret = ctdb_control(ctdb, destnode, 0,
1318 CTDB_CONTROL_SETVNNMAP, 0, data,
1319 NULL, NULL, &res, &timeout, NULL);
1320 if (ret != 0 || res != 0) {
1321 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1332 async send for pull database
1334 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1335 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1336 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1339 struct ctdb_control_pulldb *pull;
1340 struct ctdb_client_control_state *state;
1342 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1343 CTDB_NO_MEMORY_NULL(ctdb, pull);
1346 pull->lmaster = lmaster;
1348 indata.dsize = sizeof(struct ctdb_control_pulldb);
1349 indata.dptr = (unsigned char *)pull;
1351 state = ctdb_control_send(ctdb, destnode, 0,
1352 CTDB_CONTROL_PULL_DB, 0, indata,
1353 mem_ctx, &timeout, NULL);
1360 async recv for pull database
1362 int ctdb_ctrl_pulldb_recv(
1363 struct ctdb_context *ctdb,
1364 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1370 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1371 if ( (ret != 0) || (res != 0) ){
1372 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1380 pull all keys and records for a specific database on a node
1382 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1383 uint32_t dbid, uint32_t lmaster,
1384 TALLOC_CTX *mem_ctx, struct timeval timeout,
1387 struct ctdb_client_control_state *state;
1389 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1392 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1397 change dmaster for all keys in the database to the new value
1399 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1400 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1406 indata.dsize = 2*sizeof(uint32_t);
1407 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1409 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1410 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1412 ret = ctdb_control(ctdb, destnode, 0,
1413 CTDB_CONTROL_SET_DMASTER, 0, indata,
1414 NULL, NULL, &res, &timeout, NULL);
1415 if (ret != 0 || res != 0) {
1416 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1424 ping a node, return number of clients connected
1426 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1431 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1432 tdb_null, NULL, NULL, &res, NULL, NULL);
1440 find the real path to a ltdb
1442 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1449 data.dptr = (uint8_t *)&dbid;
1450 data.dsize = sizeof(dbid);
1452 ret = ctdb_control(ctdb, destnode, 0,
1453 CTDB_CONTROL_GETDBPATH, 0, data,
1454 mem_ctx, &data, &res, &timeout, NULL);
1455 if (ret != 0 || res != 0) {
1459 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1460 if ((*path) == NULL) {
1464 talloc_free(data.dptr);
1470 find the name of a db
1472 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1479 data.dptr = (uint8_t *)&dbid;
1480 data.dsize = sizeof(dbid);
1482 ret = ctdb_control(ctdb, destnode, 0,
1483 CTDB_CONTROL_GET_DBNAME, 0, data,
1484 mem_ctx, &data, &res, &timeout, NULL);
1485 if (ret != 0 || res != 0) {
1489 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1490 if ((*name) == NULL) {
1494 talloc_free(data.dptr);
1500 get the health status of a db
1502 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1503 struct timeval timeout,
1505 uint32_t dbid, TALLOC_CTX *mem_ctx,
1506 const char **reason)
1512 data.dptr = (uint8_t *)&dbid;
1513 data.dsize = sizeof(dbid);
1515 ret = ctdb_control(ctdb, destnode, 0,
1516 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1517 mem_ctx, &data, &res, &timeout, NULL);
1518 if (ret != 0 || res != 0) {
1522 if (data.dsize == 0) {
1527 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1528 if ((*reason) == NULL) {
1532 talloc_free(data.dptr);
1540 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1541 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1547 data.dptr = discard_const(name);
1548 data.dsize = strlen(name)+1;
1550 ret = ctdb_control(ctdb, destnode, 0,
1551 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1553 mem_ctx, &data, &res, &timeout, NULL);
1555 if (ret != 0 || res != 0) {
1563 get debug level on a node
1565 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1571 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1572 ctdb, &data, &res, NULL, NULL);
1573 if (ret != 0 || res != 0) {
1576 if (data.dsize != sizeof(int32_t)) {
1577 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1578 (unsigned)data.dsize));
1581 *level = *(int32_t *)data.dptr;
1582 talloc_free(data.dptr);
1587 set debug level on a node
1589 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1595 data.dptr = (uint8_t *)&level;
1596 data.dsize = sizeof(level);
1598 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1599 NULL, NULL, &res, NULL, NULL);
1600 if (ret != 0 || res != 0) {
1608 get a list of connected nodes
1610 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1611 struct timeval timeout,
1612 TALLOC_CTX *mem_ctx,
1613 uint32_t *num_nodes)
1615 struct ctdb_node_map *map=NULL;
1621 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1626 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1627 if (nodes == NULL) {
1631 for (i=0;i<map->num;i++) {
1632 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1633 nodes[*num_nodes] = map->nodes[i].pnn;
1645 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1650 ret = ctdb_control(ctdb, destnode, 0,
1651 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1652 NULL, NULL, &res, NULL, NULL);
1653 if (ret != 0 || res != 0) {
1654 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1661 this is the dummy null procedure that all databases support
1663 static int ctdb_null_func(struct ctdb_call_info *call)
1669 this is a plain fetch procedure that all databases support
1671 static int ctdb_fetch_func(struct ctdb_call_info *call)
1673 call->reply_data = &call->record_data;
1678 attach to a specific database - client call
1680 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1682 struct ctdb_db_context *ctdb_db;
1687 ctdb_db = ctdb_db_handle(ctdb, name);
1692 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1693 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1695 ctdb_db->ctdb = ctdb;
1696 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1697 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1699 data.dptr = discard_const(name);
1700 data.dsize = strlen(name)+1;
1702 /* tell ctdb daemon to attach */
1703 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1704 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1705 0, data, ctdb_db, &data, &res, NULL, NULL);
1706 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1707 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1708 talloc_free(ctdb_db);
1712 ctdb_db->db_id = *(uint32_t *)data.dptr;
1713 talloc_free(data.dptr);
1715 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1717 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1718 talloc_free(ctdb_db);
1722 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1723 if (ctdb->valgrinding) {
1724 tdb_flags |= TDB_NOMMAP;
1726 tdb_flags |= TDB_DISALLOW_NESTING;
1728 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1729 if (ctdb_db->ltdb == NULL) {
1730 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1731 talloc_free(ctdb_db);
1735 ctdb_db->persistent = persistent;
1737 DLIST_ADD(ctdb->db_list, ctdb_db);
1739 /* add well known functions */
1740 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1741 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1748 setup a call for a database
1750 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1752 struct ctdb_registered_call *call;
1757 struct ctdb_control_set_call c;
1760 /* this is no longer valid with the separate daemon architecture */
1761 c.db_id = ctdb_db->db_id;
1765 data.dptr = (uint8_t *)&c;
1766 data.dsize = sizeof(c);
1768 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1769 data, NULL, NULL, &status, NULL, NULL);
1770 if (ret != 0 || status != 0) {
1771 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1776 /* also register locally */
1777 call = talloc(ctdb_db, struct ctdb_registered_call);
1781 DLIST_ADD(ctdb_db->calls, call);
1786 struct traverse_state {
1789 ctdb_traverse_func fn;
1791 bool listemptyrecords;
1795 called on each key during a ctdb_traverse
1797 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1799 struct traverse_state *state = (struct traverse_state *)p;
1800 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1803 if (data.dsize < sizeof(uint32_t) ||
1804 d->length != data.dsize) {
1805 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1810 key.dsize = d->keylen;
1811 key.dptr = &d->data[0];
1812 data.dsize = d->datalen;
1813 data.dptr = &d->data[d->keylen];
1815 if (key.dsize == 0 && data.dsize == 0) {
1816 /* end of traverse */
1821 if (!state->listemptyrecords &&
1822 data.dsize == sizeof(struct ctdb_ltdb_header))
1824 /* empty records are deleted records in ctdb */
1828 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1836 * start a cluster wide traverse, calling the supplied fn on each record
1837 * return the number of records traversed, or -1 on error
1839 * Extendet variant with a flag to signal whether empty records should
1842 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
1843 ctdb_traverse_func fn,
1844 bool withemptyrecords,
1848 struct ctdb_traverse_start_ext t;
1851 uint64_t srvid = (getpid() | 0xFLL<<60);
1852 struct traverse_state state;
1856 state.private_data = private_data;
1858 state.listemptyrecords = withemptyrecords;
1860 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1862 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1866 t.db_id = ctdb_db->db_id;
1869 t.withemptyrecords = withemptyrecords;
1871 data.dptr = (uint8_t *)&t;
1872 data.dsize = sizeof(t);
1874 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
1875 data, NULL, NULL, &status, NULL, NULL);
1876 if (ret != 0 || status != 0) {
1877 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1878 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1882 while (!state.done) {
1883 event_loop_once(ctdb_db->ctdb->ev);
1886 ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1888 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1896 * start a cluster wide traverse, calling the supplied fn on each record
1897 * return the number of records traversed, or -1 on error
1899 * Standard version which does not list the empty records:
1900 * These are considered deleted.
1902 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1904 return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
1907 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1909 called on each key during a catdb
1911 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1914 struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
1916 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1918 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1919 for (i=0;i<key.dsize;i++) {
1920 if (ISASCII(key.dptr[i])) {
1921 fprintf(f, "%c", key.dptr[i]);
1923 fprintf(f, "\\%02X", key.dptr[i]);
1928 fprintf(f, "dmaster: %u\n", h->dmaster);
1929 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1931 if (c->printlmaster && ctdb->vnn_map != NULL) {
1932 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
1936 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
1937 fprintf(f, "jenkins hash: 0x%08x\n", (uint32_t)tdb_jenkins_hash(&key));
1940 if (c->printrecordflags) {
1941 fprintf(f, "flags: 0x%08x", h->flags);
1942 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
1943 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
1944 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
1948 if (c->printdatasize) {
1949 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
1951 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1952 for (i=sizeof(*h);i<data.dsize;i++) {
1953 if (ISASCII(data.dptr[i])) {
1954 fprintf(f, "%c", data.dptr[i]);
1956 fprintf(f, "\\%02X", data.dptr[i]);
1968 convenience function to list all keys to stdout
1970 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
1971 struct ctdb_dump_db_context *ctx)
1973 return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
1974 ctx->printemptyrecords, ctx);
1978 get the pid of a ctdb daemon
1980 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1985 ret = ctdb_control(ctdb, destnode, 0,
1986 CTDB_CONTROL_GET_PID, 0, tdb_null,
1987 NULL, NULL, &res, &timeout, NULL);
1989 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2000 async freeze send control
2002 struct ctdb_client_control_state *
2003 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2005 return ctdb_control_send(ctdb, destnode, priority,
2006 CTDB_CONTROL_FREEZE, 0, tdb_null,
2007 mem_ctx, &timeout, NULL);
2011 async freeze recv control
2013 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2018 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2019 if ( (ret != 0) || (res != 0) ){
2020 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2028 freeze databases of a certain priority
2030 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2032 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2033 struct ctdb_client_control_state *state;
2036 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2037 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2038 talloc_free(tmp_ctx);
2043 /* Freeze all databases */
2044 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2048 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2049 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2057 thaw databases of a certain priority
2059 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2064 ret = ctdb_control(ctdb, destnode, priority,
2065 CTDB_CONTROL_THAW, 0, tdb_null,
2066 NULL, NULL, &res, &timeout, NULL);
2067 if (ret != 0 || res != 0) {
2068 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2075 /* thaw all databases */
2076 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2078 return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2082 get pnn of a node, or -1
2084 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2089 ret = ctdb_control(ctdb, destnode, 0,
2090 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2091 NULL, NULL, &res, &timeout, NULL);
2093 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2101 get the monitoring mode of a remote node
2103 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2108 ret = ctdb_control(ctdb, destnode, 0,
2109 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2110 NULL, NULL, &res, &timeout, NULL);
2112 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2123 set the monitoring mode of a remote node to active
2125 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2130 ret = ctdb_control(ctdb, destnode, 0,
2131 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2132 NULL, NULL,NULL, &timeout, NULL);
2134 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2144 set the monitoring mode of a remote node to disable
2146 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2151 ret = ctdb_control(ctdb, destnode, 0,
2152 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2153 NULL, NULL, NULL, &timeout, NULL);
2155 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2167 sent to a node to make it take over an ip address
2169 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2170 uint32_t destnode, struct ctdb_public_ip *ip)
2173 struct ctdb_public_ipv4 ipv4;
2177 if (ip->addr.sa.sa_family == AF_INET) {
2179 ipv4.sin = ip->addr.ip;
2181 data.dsize = sizeof(ipv4);
2182 data.dptr = (uint8_t *)&ipv4;
2184 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2185 NULL, &res, &timeout, NULL);
2187 data.dsize = sizeof(*ip);
2188 data.dptr = (uint8_t *)ip;
2190 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2191 NULL, &res, &timeout, NULL);
2194 if (ret != 0 || res != 0) {
2195 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2204 sent to a node to make it release an ip address
2206 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2207 uint32_t destnode, struct ctdb_public_ip *ip)
2210 struct ctdb_public_ipv4 ipv4;
2214 if (ip->addr.sa.sa_family == AF_INET) {
2216 ipv4.sin = ip->addr.ip;
2218 data.dsize = sizeof(ipv4);
2219 data.dptr = (uint8_t *)&ipv4;
2221 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2222 NULL, &res, &timeout, NULL);
2224 data.dsize = sizeof(*ip);
2225 data.dptr = (uint8_t *)ip;
2227 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2228 NULL, &res, &timeout, NULL);
2231 if (ret != 0 || res != 0) {
2232 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2243 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2244 struct timeval timeout,
2246 const char *name, uint32_t *value)
2248 struct ctdb_control_get_tunable *t;
2249 TDB_DATA data, outdata;
2253 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2254 data.dptr = talloc_size(ctdb, data.dsize);
2255 CTDB_NO_MEMORY(ctdb, data.dptr);
2257 t = (struct ctdb_control_get_tunable *)data.dptr;
2258 t->length = strlen(name)+1;
2259 memcpy(t->name, name, t->length);
2261 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2262 &outdata, &res, &timeout, NULL);
2263 talloc_free(data.dptr);
2264 if (ret != 0 || res != 0) {
2265 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2269 if (outdata.dsize != sizeof(uint32_t)) {
2270 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2271 talloc_free(outdata.dptr);
2275 *value = *(uint32_t *)outdata.dptr;
2276 talloc_free(outdata.dptr);
2284 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2285 struct timeval timeout,
2287 const char *name, uint32_t value)
2289 struct ctdb_control_set_tunable *t;
2294 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2295 data.dptr = talloc_size(ctdb, data.dsize);
2296 CTDB_NO_MEMORY(ctdb, data.dptr);
2298 t = (struct ctdb_control_set_tunable *)data.dptr;
2299 t->length = strlen(name)+1;
2300 memcpy(t->name, name, t->length);
2303 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2304 NULL, &res, &timeout, NULL);
2305 talloc_free(data.dptr);
2306 if (ret != 0 || res != 0) {
2307 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2317 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2318 struct timeval timeout,
2320 TALLOC_CTX *mem_ctx,
2321 const char ***list, uint32_t *count)
2326 struct ctdb_control_list_tunable *t;
2329 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2330 mem_ctx, &outdata, &res, &timeout, NULL);
2331 if (ret != 0 || res != 0) {
2332 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2336 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2337 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2338 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2339 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2340 talloc_free(outdata.dptr);
2344 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2345 CTDB_NO_MEMORY(ctdb, p);
2347 talloc_free(outdata.dptr);
2352 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2353 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2354 CTDB_NO_MEMORY(ctdb, *list);
2355 (*list)[*count] = talloc_strdup(*list, s);
2356 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2366 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2367 struct timeval timeout, uint32_t destnode,
2368 TALLOC_CTX *mem_ctx,
2370 struct ctdb_all_public_ips **ips)
2376 ret = ctdb_control(ctdb, destnode, 0,
2377 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2378 mem_ctx, &outdata, &res, &timeout, NULL);
2379 if (ret == 0 && res == -1) {
2380 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2381 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2383 if (ret != 0 || res != 0) {
2384 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2388 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2389 talloc_free(outdata.dptr);
2394 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2395 struct timeval timeout, uint32_t destnode,
2396 TALLOC_CTX *mem_ctx,
2397 struct ctdb_all_public_ips **ips)
2399 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2404 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2405 struct timeval timeout, uint32_t destnode,
2406 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2411 struct ctdb_all_public_ipsv4 *ipsv4;
2413 ret = ctdb_control(ctdb, destnode, 0,
2414 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2415 mem_ctx, &outdata, &res, &timeout, NULL);
2416 if (ret != 0 || res != 0) {
2417 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2421 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2422 len = offsetof(struct ctdb_all_public_ips, ips) +
2423 ipsv4->num*sizeof(struct ctdb_public_ip);
2424 *ips = talloc_zero_size(mem_ctx, len);
2425 CTDB_NO_MEMORY(ctdb, *ips);
2426 (*ips)->num = ipsv4->num;
2427 for (i=0; i<ipsv4->num; i++) {
2428 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2429 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2432 talloc_free(outdata.dptr);
2437 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2438 struct timeval timeout, uint32_t destnode,
2439 TALLOC_CTX *mem_ctx,
2440 const ctdb_sock_addr *addr,
2441 struct ctdb_control_public_ip_info **_info)
2447 struct ctdb_control_public_ip_info *info;
2451 indata.dptr = discard_const_p(uint8_t, addr);
2452 indata.dsize = sizeof(*addr);
2454 ret = ctdb_control(ctdb, destnode, 0,
2455 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2456 mem_ctx, &outdata, &res, &timeout, NULL);
2457 if (ret != 0 || res != 0) {
2458 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2459 "failed ret:%d res:%d\n",
2464 len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2465 if (len > outdata.dsize) {
2466 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2467 "returned invalid data with size %u > %u\n",
2468 (unsigned int)outdata.dsize,
2469 (unsigned int)len));
2470 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2474 info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2475 len += info->num*sizeof(struct ctdb_control_iface_info);
2477 if (len > outdata.dsize) {
2478 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2479 "returned invalid data with size %u > %u\n",
2480 (unsigned int)outdata.dsize,
2481 (unsigned int)len));
2482 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2486 /* make sure we null terminate the returned strings */
2487 for (i=0; i < info->num; i++) {
2488 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2491 *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2494 talloc_free(outdata.dptr);
2495 if (*_info == NULL) {
2496 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2497 "talloc_memdup size %u failed\n",
2498 (unsigned int)outdata.dsize));
2505 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2506 struct timeval timeout, uint32_t destnode,
2507 TALLOC_CTX *mem_ctx,
2508 struct ctdb_control_get_ifaces **_ifaces)
2513 struct ctdb_control_get_ifaces *ifaces;
2517 ret = ctdb_control(ctdb, destnode, 0,
2518 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2519 mem_ctx, &outdata, &res, &timeout, NULL);
2520 if (ret != 0 || res != 0) {
2521 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2522 "failed ret:%d res:%d\n",
2527 len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2528 if (len > outdata.dsize) {
2529 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2530 "returned invalid data with size %u > %u\n",
2531 (unsigned int)outdata.dsize,
2532 (unsigned int)len));
2533 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2537 ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2538 len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2540 if (len > outdata.dsize) {
2541 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2542 "returned invalid data with size %u > %u\n",
2543 (unsigned int)outdata.dsize,
2544 (unsigned int)len));
2545 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2549 /* make sure we null terminate the returned strings */
2550 for (i=0; i < ifaces->num; i++) {
2551 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2554 *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2557 talloc_free(outdata.dptr);
2558 if (*_ifaces == NULL) {
2559 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2560 "talloc_memdup size %u failed\n",
2561 (unsigned int)outdata.dsize));
2568 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2569 struct timeval timeout, uint32_t destnode,
2570 TALLOC_CTX *mem_ctx,
2571 const struct ctdb_control_iface_info *info)
2577 indata.dptr = discard_const_p(uint8_t, info);
2578 indata.dsize = sizeof(*info);
2580 ret = ctdb_control(ctdb, destnode, 0,
2581 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2582 mem_ctx, NULL, &res, &timeout, NULL);
2583 if (ret != 0 || res != 0) {
2584 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2585 "failed ret:%d res:%d\n",
2594 set/clear the permanent disabled bit on a remote node
2596 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2597 uint32_t set, uint32_t clear)
2601 struct ctdb_node_map *nodemap=NULL;
2602 struct ctdb_node_flag_change c;
2603 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2608 /* find the recovery master */
2609 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2611 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2612 talloc_free(tmp_ctx);
2617 /* read the node flags from the recmaster */
2618 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2620 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2621 talloc_free(tmp_ctx);
2624 if (destnode >= nodemap->num) {
2625 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2626 talloc_free(tmp_ctx);
2631 c.old_flags = nodemap->nodes[destnode].flags;
2632 c.new_flags = c.old_flags;
2634 c.new_flags &= ~clear;
2636 data.dsize = sizeof(c);
2637 data.dptr = (unsigned char *)&c;
2639 /* send the flags update to all connected nodes */
2640 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2642 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2644 timeout, false, data,
2647 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2649 talloc_free(tmp_ctx);
2653 talloc_free(tmp_ctx);
2661 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2662 struct timeval timeout,
2664 struct ctdb_tunable *tunables)
2670 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2671 &outdata, &res, &timeout, NULL);
2672 if (ret != 0 || res != 0) {
2673 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2677 if (outdata.dsize != sizeof(*tunables)) {
2678 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2679 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2683 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2684 talloc_free(outdata.dptr);
2689 add a public address to a node
2691 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2692 struct timeval timeout,
2694 struct ctdb_control_ip_iface *pub)
2700 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2701 data.dptr = (unsigned char *)pub;
2703 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2704 NULL, &res, &timeout, NULL);
2705 if (ret != 0 || res != 0) {
2706 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2714 delete a public address from a node
2716 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2717 struct timeval timeout,
2719 struct ctdb_control_ip_iface *pub)
2725 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2726 data.dptr = (unsigned char *)pub;
2728 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2729 NULL, &res, &timeout, NULL);
2730 if (ret != 0 || res != 0) {
2731 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2739 kill a tcp connection
2741 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2742 struct timeval timeout,
2744 struct ctdb_control_killtcp *killtcp)
2750 data.dsize = sizeof(struct ctdb_control_killtcp);
2751 data.dptr = (unsigned char *)killtcp;
2753 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2754 NULL, &res, &timeout, NULL);
2755 if (ret != 0 || res != 0) {
2756 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2766 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2767 struct timeval timeout,
2769 ctdb_sock_addr *addr,
2775 struct ctdb_control_gratious_arp *gratious_arp;
2776 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2779 len = strlen(ifname)+1;
2780 gratious_arp = talloc_size(tmp_ctx,
2781 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2782 CTDB_NO_MEMORY(ctdb, gratious_arp);
2784 gratious_arp->addr = *addr;
2785 gratious_arp->len = len;
2786 memcpy(&gratious_arp->iface[0], ifname, len);
2789 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2790 data.dptr = (unsigned char *)gratious_arp;
2792 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2793 NULL, &res, &timeout, NULL);
2794 if (ret != 0 || res != 0) {
2795 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2796 talloc_free(tmp_ctx);
2800 talloc_free(tmp_ctx);
2805 get a list of all tcp tickles that a node knows about for a particular vnn
2807 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2808 struct timeval timeout, uint32_t destnode,
2809 TALLOC_CTX *mem_ctx,
2810 ctdb_sock_addr *addr,
2811 struct ctdb_control_tcp_tickle_list **list)
2814 TDB_DATA data, outdata;
2817 data.dptr = (uint8_t*)addr;
2818 data.dsize = sizeof(ctdb_sock_addr);
2820 ret = ctdb_control(ctdb, destnode, 0,
2821 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2822 mem_ctx, &outdata, &status, NULL, NULL);
2823 if (ret != 0 || status != 0) {
2824 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2828 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2834 register a server id
2836 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2837 struct timeval timeout,
2838 struct ctdb_server_id *id)
2844 data.dsize = sizeof(struct ctdb_server_id);
2845 data.dptr = (unsigned char *)id;
2847 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2848 CTDB_CONTROL_REGISTER_SERVER_ID,
2850 NULL, &res, &timeout, NULL);
2851 if (ret != 0 || res != 0) {
2852 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2860 unregister a server id
2862 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2863 struct timeval timeout,
2864 struct ctdb_server_id *id)
2870 data.dsize = sizeof(struct ctdb_server_id);
2871 data.dptr = (unsigned char *)id;
2873 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2874 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2876 NULL, &res, &timeout, NULL);
2877 if (ret != 0 || res != 0) {
2878 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2887 check if a server id exists
2889 if a server id does exist, return *status == 1, otherwise *status == 0
2891 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2892 struct timeval timeout,
2894 struct ctdb_server_id *id,
2901 data.dsize = sizeof(struct ctdb_server_id);
2902 data.dptr = (unsigned char *)id;
2904 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2906 NULL, &res, &timeout, NULL);
2908 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2922 get the list of server ids that are registered on a node
2924 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2925 TALLOC_CTX *mem_ctx,
2926 struct timeval timeout, uint32_t destnode,
2927 struct ctdb_server_id_list **svid_list)
2933 ret = ctdb_control(ctdb, destnode, 0,
2934 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2935 mem_ctx, &outdata, &res, &timeout, NULL);
2936 if (ret != 0 || res != 0) {
2937 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2941 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2947 initialise the ctdb daemon for client applications
2949 NOTE: In current code the daemon does not fork. This is for testing purposes only
2950 and to simplify the code.
2952 struct ctdb_context *ctdb_init(struct event_context *ev)
2955 struct ctdb_context *ctdb;
2957 ctdb = talloc_zero(ev, struct ctdb_context);
2959 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2963 ctdb->idr = idr_init(ctdb);
2964 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2966 ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2968 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2980 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2982 ctdb->flags |= flags;
2986 setup the local socket name
2988 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2990 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2991 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2997 return the pnn of this node
2999 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3006 get the uptime of a remote node
3008 struct ctdb_client_control_state *
3009 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3011 return ctdb_control_send(ctdb, destnode, 0,
3012 CTDB_CONTROL_UPTIME, 0, tdb_null,
3013 mem_ctx, &timeout, NULL);
3016 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3022 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3023 if (ret != 0 || res != 0) {
3024 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3028 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3033 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3035 struct ctdb_client_control_state *state;
3037 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3038 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3042 send a control to execute the "recovered" event script on a node
3044 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3049 ret = ctdb_control(ctdb, destnode, 0,
3050 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3051 NULL, NULL, &status, &timeout, NULL);
3052 if (ret != 0 || status != 0) {
3053 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3061 callback for the async helpers used when sending the same control
3062 to multiple nodes in parallell.
3064 static void async_callback(struct ctdb_client_control_state *state)
3066 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3067 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3071 uint32_t destnode = state->c->hdr.destnode;
3073 /* one more node has responded with recmode data */
3076 /* if we failed to push the db, then return an error and let
3077 the main loop try again.
3079 if (state->state != CTDB_CONTROL_DONE) {
3080 if ( !data->dont_log_errors) {
3081 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3084 if (data->fail_callback) {
3085 data->fail_callback(ctdb, destnode, res, outdata,
3086 data->callback_data);
3091 state->async.fn = NULL;
3093 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3094 if ((ret != 0) || (res != 0)) {
3095 if ( !data->dont_log_errors) {
3096 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3099 if (data->fail_callback) {
3100 data->fail_callback(ctdb, destnode, res, outdata,
3101 data->callback_data);
3104 if ((ret == 0) && (data->callback != NULL)) {
3105 data->callback(ctdb, destnode, res, outdata,
3106 data->callback_data);
3111 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3113 /* set up the callback functions */
3114 state->async.fn = async_callback;
3115 state->async.private_data = data;
3117 /* one more control to wait for to complete */
3122 /* wait for up to the maximum number of seconds allowed
3123 or until all nodes we expect a response from has replied
3125 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3127 while (data->count > 0) {
3128 event_loop_once(ctdb->ev);
3130 if (data->fail_count != 0) {
3131 if (!data->dont_log_errors) {
3132 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3142 perform a simple control on the listed nodes
3143 The control cannot return data
3145 int ctdb_client_async_control(struct ctdb_context *ctdb,
3146 enum ctdb_controls opcode,
3149 struct timeval timeout,
3150 bool dont_log_errors,
3152 client_async_callback client_callback,
3153 client_async_callback fail_callback,
3154 void *callback_data)
3156 struct client_async_data *async_data;
3157 struct ctdb_client_control_state *state;
3160 async_data = talloc_zero(ctdb, struct client_async_data);
3161 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3162 async_data->dont_log_errors = dont_log_errors;
3163 async_data->callback = client_callback;
3164 async_data->fail_callback = fail_callback;
3165 async_data->callback_data = callback_data;
3166 async_data->opcode = opcode;
3168 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3170 /* loop over all nodes and send an async control to each of them */
3171 for (j=0; j<num_nodes; j++) {
3172 uint32_t pnn = nodes[j];
3174 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3175 0, data, async_data, &timeout, NULL);
3176 if (state == NULL) {
3177 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3178 talloc_free(async_data);
3182 ctdb_client_async_add(async_data, state);
3185 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3186 talloc_free(async_data);
3190 talloc_free(async_data);
3194 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3195 struct ctdb_vnn_map *vnn_map,
3196 TALLOC_CTX *mem_ctx,
3199 int i, j, num_nodes;
3202 for (i=num_nodes=0;i<vnn_map->size;i++) {
3203 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3209 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3210 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3212 for (i=j=0;i<vnn_map->size;i++) {
3213 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3216 nodes[j++] = vnn_map->map[i];
3222 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3223 struct ctdb_node_map *node_map,
3224 TALLOC_CTX *mem_ctx,
3227 int i, j, num_nodes;
3230 for (i=num_nodes=0;i<node_map->num;i++) {
3231 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3234 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3240 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3241 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3243 for (i=j=0;i<node_map->num;i++) {
3244 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3247 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3250 nodes[j++] = node_map->nodes[i].pnn;
3256 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3257 struct ctdb_node_map *node_map,
3258 TALLOC_CTX *mem_ctx,
3261 int i, j, num_nodes;
3264 for (i=num_nodes=0;i<node_map->num;i++) {
3265 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3268 if (node_map->nodes[i].pnn == pnn) {
3274 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3275 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3277 for (i=j=0;i<node_map->num;i++) {
3278 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3281 if (node_map->nodes[i].pnn == pnn) {
3284 nodes[j++] = node_map->nodes[i].pnn;
3290 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3291 struct ctdb_node_map *node_map,
3292 TALLOC_CTX *mem_ctx,
3295 int i, j, num_nodes;
3298 for (i=num_nodes=0;i<node_map->num;i++) {
3299 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3302 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3308 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3309 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3311 for (i=j=0;i<node_map->num;i++) {
3312 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3315 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3318 nodes[j++] = node_map->nodes[i].pnn;
3325 this is used to test if a pnn lock exists and if it exists will return
3326 the number of connections that pnn has reported or -1 if that recovery
3327 daemon is not running.
3330 ctdb_read_pnn_lock(int fd, int32_t pnn)
3335 lock.l_type = F_WRLCK;
3336 lock.l_whence = SEEK_SET;
3341 if (fcntl(fd, F_GETLK, &lock) != 0) {
3342 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3346 if (lock.l_type == F_UNLCK) {
3350 if (pread(fd, &c, 1, pnn) == -1) {
3351 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3359 get capabilities of a remote node
3361 struct ctdb_client_control_state *
3362 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3364 return ctdb_control_send(ctdb, destnode, 0,
3365 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3366 mem_ctx, &timeout, NULL);
3369 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3375 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3376 if ( (ret != 0) || (res != 0) ) {
3377 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3382 *capabilities = *((uint32_t *)outdata.dptr);
3388 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3390 struct ctdb_client_control_state *state;
3391 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3394 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3395 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3396 talloc_free(tmp_ctx);
3401 * check whether a transaction is active on a given db on a given node
3403 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3411 indata.dptr = (uint8_t *)&db_id;
3412 indata.dsize = sizeof(db_id);
3414 ret = ctdb_control(ctdb, destnode, 0,
3415 CTDB_CONTROL_TRANS2_ACTIVE,
3416 0, indata, NULL, NULL, &status,
3420 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3428 struct ctdb_transaction_handle {
3429 struct ctdb_db_context *ctdb_db;
3432 * we store the reads and writes done under a transaction:
3433 * - one list stores both reads and writes (m_all),
3434 * - the other just writes (m_write)
3436 struct ctdb_marshall_buffer *m_all;
3437 struct ctdb_marshall_buffer *m_write;
3440 /* start a transaction on a database */
3441 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3443 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3447 /* start a transaction on a database */
3448 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3450 struct ctdb_record_handle *rh;
3453 struct ctdb_ltdb_header header;
3454 TALLOC_CTX *tmp_ctx;
3455 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3457 struct ctdb_db_context *ctdb_db = h->ctdb_db;
3461 key.dptr = discard_const(keyname);
3462 key.dsize = strlen(keyname);
3464 if (!ctdb_db->persistent) {
3465 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3470 tmp_ctx = talloc_new(h);
3472 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3474 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3475 talloc_free(tmp_ctx);
3479 status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3483 unsigned long int usec = (1000 + random()) % 100000;
3484 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3485 "on db_id[0x%08x]. waiting for %lu "
3487 ctdb_db->db_id, usec));
3488 talloc_free(tmp_ctx);
3494 * store the pid in the database:
3495 * it is not enough that the node is dmaster...
3498 data.dptr = (unsigned char *)&pid;
3499 data.dsize = sizeof(pid_t);
3501 rh->header.dmaster = ctdb_db->ctdb->pnn;
3502 ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3504 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3505 "transaction record\n"));
3506 talloc_free(tmp_ctx);
3512 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3514 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3515 talloc_free(tmp_ctx);
3519 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3521 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3522 "lock record inside transaction\n"));
3523 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3524 talloc_free(tmp_ctx);
3528 if (header.dmaster != ctdb_db->ctdb->pnn) {
3529 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3530 "transaction lock record\n"));
3531 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3532 talloc_free(tmp_ctx);
3536 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3537 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3538 "the transaction lock record\n"));
3539 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3540 talloc_free(tmp_ctx);
3544 talloc_free(tmp_ctx);
3550 /* start a transaction on a database */
3551 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3552 TALLOC_CTX *mem_ctx)
3554 struct ctdb_transaction_handle *h;
3557 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3559 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3563 h->ctdb_db = ctdb_db;
3565 ret = ctdb_transaction_fetch_start(h);
3571 talloc_set_destructor(h, ctdb_transaction_destructor);
3579 fetch a record inside a transaction
3581 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3582 TALLOC_CTX *mem_ctx,
3583 TDB_DATA key, TDB_DATA *data)
3585 struct ctdb_ltdb_header header;
3588 ZERO_STRUCT(header);
3590 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3591 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3592 /* record doesn't exist yet */
3601 if (!h->in_replay) {
3602 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3603 if (h->m_all == NULL) {
3604 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3613 stores a record inside a transaction
3615 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3616 TDB_DATA key, TDB_DATA data)
3618 TALLOC_CTX *tmp_ctx = talloc_new(h);
3619 struct ctdb_ltdb_header header;
3623 ZERO_STRUCT(header);
3625 /* we need the header so we can update the RSN */
3626 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3627 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3628 /* the record doesn't exist - create one with us as dmaster.
3629 This is only safe because we are in a transaction and this
3630 is a persistent database */
3631 ZERO_STRUCT(header);
3632 } else if (ret != 0) {
3633 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3634 talloc_free(tmp_ctx);
3638 if (data.dsize == olddata.dsize &&
3639 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3640 /* save writing the same data */
3641 talloc_free(tmp_ctx);
3645 header.dmaster = h->ctdb_db->ctdb->pnn;
3648 if (!h->in_replay) {
3649 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3650 if (h->m_all == NULL) {
3651 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3652 talloc_free(tmp_ctx);
3657 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3658 if (h->m_write == NULL) {
3659 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3660 talloc_free(tmp_ctx);
3664 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3666 talloc_free(tmp_ctx);
3672 replay a transaction
3674 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3677 struct ctdb_rec_data *rec = NULL;
3679 h->in_replay = true;
3680 talloc_free(h->m_write);
3683 ret = ctdb_transaction_fetch_start(h);
3688 for (i=0;i<h->m_all->count;i++) {
3691 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3693 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3697 if (rec->reqid == 0) {
3699 if (ctdb_transaction_store(h, key, data) != 0) {
3704 TALLOC_CTX *tmp_ctx = talloc_new(h);
3706 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3707 talloc_free(tmp_ctx);
3710 if (data2.dsize != data.dsize ||
3711 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3712 /* the record has changed on us - we have to give up */
3713 talloc_free(tmp_ctx);
3716 talloc_free(tmp_ctx);
3723 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3729 commit a transaction
3731 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3735 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3736 struct timeval timeout;
3737 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3739 talloc_set_destructor(h, NULL);
3741 /* our commit strategy is quite complex.
3743 - we first try to commit the changes to all other nodes
3745 - if that works, then we commit locally and we are done
3747 - if a commit on another node fails, then we need to cancel
3748 the transaction, then restart the transaction (thus
3749 opening a window of time for a pending recovery to
3750 complete), then replay the transaction, checking all the
3751 reads and writes (checking that reads give the same data,
3752 and writes succeed). Then we retry the transaction to the
3757 if (h->m_write == NULL) {
3758 /* no changes were made */
3759 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3764 /* tell ctdbd to commit to the other nodes */
3765 timeout = timeval_current_ofs(1, 0);
3766 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3767 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3768 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3770 if (ret != 0 || status != 0) {
3771 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3772 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3773 ", retrying after 1 second...\n",
3774 (retries==0)?"":"retry "));
3778 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3780 /* work out what error code we will give if we
3781 have to fail the operation */
3782 switch ((enum ctdb_trans2_commit_error)status) {
3783 case CTDB_TRANS2_COMMIT_SUCCESS:
3784 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3785 case CTDB_TRANS2_COMMIT_TIMEOUT:
3786 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3788 case CTDB_TRANS2_COMMIT_ALLFAIL:
3789 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3794 if (++retries == 100) {
3795 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3796 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3797 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3798 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3799 tdb_null, NULL, NULL, NULL, NULL, NULL);
3804 if (ctdb_replay_transaction(h) != 0) {
3805 DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3806 "transaction on db 0x%08x, "
3807 "failure control =%u\n",
3809 (unsigned)failure_control));
3810 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3811 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3812 tdb_null, NULL, NULL, NULL, NULL, NULL);
3818 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3821 /* do the real commit locally */
3822 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3824 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3825 "on db id 0x%08x locally, "
3826 "failure_control=%u\n",
3828 (unsigned)failure_control));
3829 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3830 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3831 tdb_null, NULL, NULL, NULL, NULL, NULL);
3836 /* tell ctdbd that we are finished with our local commit */
3837 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3838 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3839 tdb_null, NULL, NULL, NULL, NULL, NULL);
3845 recovery daemon ping to main daemon
3847 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3852 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3853 ctdb, NULL, &res, NULL, NULL);
3854 if (ret != 0 || res != 0) {
3855 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3862 /* when forking the main daemon and the child process needs to connect back
3863 * to the daemon as a client process, this function can be used to change
3864 * the ctdb context from daemon into client mode
3866 int switch_from_server_to_client(struct ctdb_context *ctdb)
3870 /* shutdown the transport */
3871 if (ctdb->methods) {
3872 ctdb->methods->shutdown(ctdb);
3875 /* get a new event context */
3876 talloc_free(ctdb->ev);
3877 ctdb->ev = event_context_init(ctdb);
3879 close(ctdb->daemon.sd);
3880 ctdb->daemon.sd = -1;
3882 /* initialise ctdb */
3883 ret = ctdb_socket_connect(ctdb);
3885 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3893 get the status of running the monitor eventscripts: NULL means never run.
3895 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
3896 struct timeval timeout, uint32_t destnode,
3897 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3898 struct ctdb_scripts_wire **script_status)
3901 TDB_DATA outdata, indata;
3903 uint32_t uinttype = type;
3905 indata.dptr = (uint8_t *)&uinttype;
3906 indata.dsize = sizeof(uinttype);
3908 ret = ctdb_control(ctdb, destnode, 0,
3909 CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3910 mem_ctx, &outdata, &res, &timeout, NULL);
3911 if (ret != 0 || res != 0) {
3912 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3916 if (outdata.dsize == 0) {
3917 *script_status = NULL;
3919 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3920 talloc_free(outdata.dptr);
3927 tell the main daemon how long it took to lock the reclock file
3929 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3935 data.dptr = (uint8_t *)&latency;
3936 data.dsize = sizeof(latency);
3938 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
3939 ctdb, NULL, &res, NULL, NULL);
3940 if (ret != 0 || res != 0) {
3941 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3949 get the name of the reclock file
3951 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3952 uint32_t destnode, TALLOC_CTX *mem_ctx,
3959 ret = ctdb_control(ctdb, destnode, 0,
3960 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
3961 mem_ctx, &data, &res, &timeout, NULL);
3962 if (ret != 0 || res != 0) {
3966 if (data.dsize == 0) {
3969 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3971 talloc_free(data.dptr);
3977 set the reclock filename for a node
3979 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3985 if (reclock == NULL) {
3989 data.dsize = strlen(reclock) + 1;
3990 data.dptr = discard_const(reclock);
3993 ret = ctdb_control(ctdb, destnode, 0,
3994 CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
3995 NULL, NULL, &res, &timeout, NULL);
3996 if (ret != 0 || res != 0) {
3997 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4007 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4012 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
4013 ctdb, NULL, &res, &timeout, NULL);
4014 if (ret != 0 || res != 0) {
4015 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4025 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4029 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4030 ctdb, NULL, NULL, &timeout, NULL);
4032 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4040 set the natgw state for a node
4042 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4048 data.dsize = sizeof(natgwstate);
4049 data.dptr = (uint8_t *)&natgwstate;
4051 ret = ctdb_control(ctdb, destnode, 0,
4052 CTDB_CONTROL_SET_NATGWSTATE, 0, data,
4053 NULL, NULL, &res, &timeout, NULL);
4054 if (ret != 0 || res != 0) {
4055 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4063 set the lmaster role for a node
4065 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4071 data.dsize = sizeof(lmasterrole);
4072 data.dptr = (uint8_t *)&lmasterrole;
4074 ret = ctdb_control(ctdb, destnode, 0,
4075 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4076 NULL, NULL, &res, &timeout, NULL);
4077 if (ret != 0 || res != 0) {
4078 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4086 set the recmaster role for a node
4088 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4094 data.dsize = sizeof(recmasterrole);
4095 data.dptr = (uint8_t *)&recmasterrole;
4097 ret = ctdb_control(ctdb, destnode, 0,
4098 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4099 NULL, NULL, &res, &timeout, NULL);
4100 if (ret != 0 || res != 0) {
4101 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4108 /* enable an eventscript
4110 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4116 data.dsize = strlen(script) + 1;
4117 data.dptr = discard_const(script);
4119 ret = ctdb_control(ctdb, destnode, 0,
4120 CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
4121 NULL, NULL, &res, &timeout, NULL);
4122 if (ret != 0 || res != 0) {
4123 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4130 /* disable an eventscript
4132 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4138 data.dsize = strlen(script) + 1;
4139 data.dptr = discard_const(script);
4141 ret = ctdb_control(ctdb, destnode, 0,
4142 CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
4143 NULL, NULL, &res, &timeout, NULL);
4144 if (ret != 0 || res != 0) {
4145 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4153 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4159 data.dsize = sizeof(*bantime);
4160 data.dptr = (uint8_t *)bantime;
4162 ret = ctdb_control(ctdb, destnode, 0,
4163 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4164 NULL, NULL, &res, &timeout, NULL);
4165 if (ret != 0 || res != 0) {
4166 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4174 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4179 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4181 ret = ctdb_control(ctdb, destnode, 0,
4182 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4183 tmp_ctx, &outdata, &res, &timeout, NULL);
4184 if (ret != 0 || res != 0) {
4185 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4186 talloc_free(tmp_ctx);
4190 *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4191 talloc_free(tmp_ctx);
4197 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4202 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4204 data.dptr = (uint8_t*)db_prio;
4205 data.dsize = sizeof(*db_prio);
4207 ret = ctdb_control(ctdb, destnode, 0,
4208 CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4209 tmp_ctx, NULL, &res, &timeout, NULL);
4210 if (ret != 0 || res != 0) {
4211 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4212 talloc_free(tmp_ctx);
4216 talloc_free(tmp_ctx);
4221 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4226 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4228 data.dptr = (uint8_t*)&db_id;
4229 data.dsize = sizeof(db_id);
4231 ret = ctdb_control(ctdb, destnode, 0,
4232 CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4233 tmp_ctx, NULL, &res, &timeout, NULL);
4234 if (ret != 0 || res < 0) {
4235 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4236 talloc_free(tmp_ctx);
4244 talloc_free(tmp_ctx);
4249 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)