4 Copyright (C) Andrew Tridgell 2007
5 Copyright (C) Ronnie Sahlberg 2007
7 This program is free software; you can redistribute it and/or modify
8 it under the terms of the GNU General Public License as published by
9 the Free Software Foundation; either version 3 of the License, or
10 (at your option) any later version.
12 This program is distributed in the hope that it will be useful,
13 but WITHOUT ANY WARRANTY; without even the implied warranty of
14 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 GNU General Public License for more details.
17 You should have received a copy of the GNU General Public License
18 along with this program; if not, see <http://www.gnu.org/licenses/>.
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/tevent/tevent.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
30 #include "../include/ctdb_private.h"
31 #include "lib/util/dlinklist.h"
36 allocate a packet for use in client<->daemon communication
38 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
40 enum ctdb_operation operation,
41 size_t length, size_t slength,
45 struct ctdb_req_header *hdr;
47 length = MAX(length, slength);
48 size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
50 hdr = (struct ctdb_req_header *)talloc_size(mem_ctx, size);
52 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
53 operation, (unsigned)length));
56 talloc_set_name_const(hdr, type);
57 memset(hdr, 0, slength);
59 hdr->operation = operation;
60 hdr->ctdb_magic = CTDB_MAGIC;
61 hdr->ctdb_version = CTDB_VERSION;
62 hdr->srcnode = ctdb->pnn;
64 hdr->generation = ctdb->vnn_map->generation;
71 local version of ctdb_call
73 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
74 struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
75 TDB_DATA *data, uint32_t caller)
77 struct ctdb_call_info *c;
78 struct ctdb_registered_call *fn;
79 struct ctdb_context *ctdb = ctdb_db->ctdb;
81 c = talloc(ctdb, struct ctdb_call_info);
82 CTDB_NO_MEMORY(ctdb, c);
85 c->call_data = &call->call_data;
86 c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
87 c->record_data.dsize = data->dsize;
88 CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
93 for (fn=ctdb_db->calls;fn;fn=fn->next) {
94 if (fn->id == call->call_id) break;
97 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
102 if (fn->fn(c) != 0) {
103 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
108 /* we need to force the record to be written out if this was a remote access */
109 if (header->laccessor != caller) {
112 header->laccessor = caller;
115 /* we need to force the record to be written out if this was a remote access,
116 so that the lacount is updated */
117 if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
118 c->new_data = &c->record_data;
122 /* XXX check that we always have the lock here? */
123 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
124 ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
131 call->reply_data = *c->reply_data;
133 talloc_steal(call, call->reply_data.dptr);
134 talloc_set_name_const(call->reply_data.dptr, __location__);
136 call->reply_data.dptr = NULL;
137 call->reply_data.dsize = 0;
139 call->status = c->status;
148 queue a packet for sending from client to daemon
150 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
152 return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
157 called when a CTDB_REPLY_CALL packet comes in in the client
159 This packet comes in response to a CTDB_REQ_CALL request packet. It
160 contains any reply data from the call
162 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
164 struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
165 struct ctdb_client_call_state *state;
167 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
169 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
173 if (hdr->reqid != state->reqid) {
174 /* we found a record but it was the wrong one */
175 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
179 state->call->reply_data.dptr = c->data;
180 state->call->reply_data.dsize = c->datalen;
181 state->call->status = c->status;
183 talloc_steal(state, c);
185 state->state = CTDB_CALL_DONE;
187 if (state->async.fn) {
188 state->async.fn(state);
192 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
195 this is called in the client, when data comes in from the daemon
197 static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
199 struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
200 struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
203 /* place the packet as a child of a tmp_ctx. We then use
204 talloc_free() below to free it. If any of the calls want
205 to keep it, then they will steal it somewhere else, and the
206 talloc_free() will be a no-op */
207 tmp_ctx = talloc_new(ctdb);
208 talloc_steal(tmp_ctx, hdr);
211 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
215 if (cnt < sizeof(*hdr)) {
216 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
219 if (cnt != hdr->length) {
220 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n",
221 (unsigned)hdr->length, (unsigned)cnt);
225 if (hdr->ctdb_magic != CTDB_MAGIC) {
226 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
230 if (hdr->ctdb_version != CTDB_VERSION) {
231 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
235 switch (hdr->operation) {
236 case CTDB_REPLY_CALL:
237 ctdb_client_reply_call(ctdb, hdr);
240 case CTDB_REQ_MESSAGE:
241 ctdb_request_message(ctdb, hdr);
244 case CTDB_REPLY_CONTROL:
245 ctdb_client_reply_control(ctdb, hdr);
249 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
253 talloc_free(tmp_ctx);
257 connect to a unix domain socket
259 int ctdb_socket_connect(struct ctdb_context *ctdb)
261 struct sockaddr_un addr;
263 memset(&addr, 0, sizeof(addr));
264 addr.sun_family = AF_UNIX;
265 strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
267 ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
268 if (ctdb->daemon.sd == -1) {
269 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
273 set_nonblocking(ctdb->daemon.sd);
274 set_close_on_exec(ctdb->daemon.sd);
276 if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
277 close(ctdb->daemon.sd);
278 ctdb->daemon.sd = -1;
279 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
283 ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
285 ctdb_client_read_cb, ctdb, "to-ctdbd");
290 struct ctdb_record_handle {
291 struct ctdb_db_context *ctdb_db;
294 struct ctdb_ltdb_header header;
299 make a recv call to the local ctdb daemon - called from client context
301 This is called when the program wants to wait for a ctdb_call to complete and get the
302 results. This call will block unless the call has already completed.
304 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
310 while (state->state < CTDB_CALL_DONE) {
311 event_loop_once(state->ctdb_db->ctdb->ev);
313 if (state->state != CTDB_CALL_DONE) {
314 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
319 if (state->call->reply_data.dsize) {
320 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
321 state->call->reply_data.dptr,
322 state->call->reply_data.dsize);
323 call->reply_data.dsize = state->call->reply_data.dsize;
325 call->reply_data.dptr = NULL;
326 call->reply_data.dsize = 0;
328 call->status = state->call->status;
338 destroy a ctdb_call in client
340 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)
342 ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
347 construct an event driven local ctdb_call
349 this is used so that locally processed ctdb_call requests are processed
350 in an event driven manner
352 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db,
353 struct ctdb_call *call,
354 struct ctdb_ltdb_header *header,
357 struct ctdb_client_call_state *state;
358 struct ctdb_context *ctdb = ctdb_db->ctdb;
361 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
362 CTDB_NO_MEMORY_NULL(ctdb, state);
363 state->call = talloc_zero(state, struct ctdb_call);
364 CTDB_NO_MEMORY_NULL(ctdb, state->call);
366 talloc_steal(state, data->dptr);
368 state->state = CTDB_CALL_DONE;
369 *(state->call) = *call;
370 state->ctdb_db = ctdb_db;
372 ret = ctdb_call_local(ctdb_db, state->call, header, state, data, ctdb->pnn);
378 make a ctdb call to the local daemon - async send. Called from client context.
380 This constructs a ctdb_call request and queues it for processing.
381 This call never blocks.
383 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
384 struct ctdb_call *call)
386 struct ctdb_client_call_state *state;
387 struct ctdb_context *ctdb = ctdb_db->ctdb;
388 struct ctdb_ltdb_header header;
392 struct ctdb_req_call *c;
394 /* if the domain socket is not yet open, open it */
395 if (ctdb->daemon.sd==-1) {
396 ctdb_socket_connect(ctdb);
399 ret = ctdb_ltdb_lock(ctdb_db, call->key);
401 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
405 ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
407 if (ret == 0 && header.dmaster == ctdb->pnn) {
408 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
409 talloc_free(data.dptr);
410 ctdb_ltdb_unlock(ctdb_db, call->key);
414 ctdb_ltdb_unlock(ctdb_db, call->key);
415 talloc_free(data.dptr);
417 state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
419 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
422 state->call = talloc_zero(state, struct ctdb_call);
423 if (state->call == NULL) {
424 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
428 len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
429 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
431 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
435 state->reqid = ctdb_reqid_new(ctdb, state);
436 state->ctdb_db = ctdb_db;
437 talloc_set_destructor(state, ctdb_client_call_destructor);
439 c->hdr.reqid = state->reqid;
440 c->flags = call->flags;
441 c->db_id = ctdb_db->db_id;
442 c->callid = call->call_id;
444 c->keylen = call->key.dsize;
445 c->calldatalen = call->call_data.dsize;
446 memcpy(&c->data[0], call->key.dptr, call->key.dsize);
447 memcpy(&c->data[call->key.dsize],
448 call->call_data.dptr, call->call_data.dsize);
449 *(state->call) = *call;
450 state->call->call_data.dptr = &c->data[call->key.dsize];
451 state->call->key.dptr = &c->data[0];
453 state->state = CTDB_CALL_WAIT;
456 ctdb_client_queue_pkt(ctdb, &c->hdr);
463 full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
465 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
467 struct ctdb_client_call_state *state;
469 state = ctdb_call_send(ctdb_db, call);
470 return ctdb_call_recv(state, call);
475 tell the daemon what messaging srvid we will use, and register the message
476 handler function in the client
478 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
479 ctdb_msg_fn_t handler,
486 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0,
487 tdb_null, NULL, NULL, &status, NULL, NULL);
488 if (res != 0 || status != 0) {
489 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
493 /* also need to register the handler with our own ctdb structure */
494 return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
498 tell the daemon we no longer want a srvid
500 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
505 res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0,
506 tdb_null, NULL, NULL, &status, NULL, NULL);
507 if (res != 0 || status != 0) {
508 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
512 /* also need to register the handler with our own ctdb structure */
513 ctdb_deregister_message_handler(ctdb, srvid, private_data);
519 send a message - from client context
521 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
522 uint64_t srvid, TDB_DATA data)
524 struct ctdb_req_message *r;
527 len = offsetof(struct ctdb_req_message, data) + data.dsize;
528 r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
529 len, struct ctdb_req_message);
530 CTDB_NO_MEMORY(ctdb, r);
532 r->hdr.destnode = pnn;
534 r->datalen = data.dsize;
535 memcpy(&r->data[0], data.dptr, data.dsize);
537 res = ctdb_client_queue_pkt(ctdb, &r->hdr);
548 cancel a ctdb_fetch_lock operation, releasing the lock
550 static int fetch_lock_destructor(struct ctdb_record_handle *h)
552 ctdb_ltdb_unlock(h->ctdb_db, h->key);
557 force the migration of a record to this node
559 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
561 struct ctdb_call call;
563 call.call_id = CTDB_NULL_FUNC;
565 call.flags = CTDB_IMMEDIATE_MIGRATION;
566 return ctdb_call(ctdb_db, &call);
570 get a lock on a record, and return the records data. Blocks until it gets the lock
572 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
573 TDB_DATA key, TDB_DATA *data)
576 struct ctdb_record_handle *h;
579 procedure is as follows:
581 1) get the chain lock.
582 2) check if we are dmaster
583 3) if we are the dmaster then return handle
584 4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
586 5) when we get the reply, goto (1)
589 h = talloc_zero(mem_ctx, struct ctdb_record_handle);
594 h->ctdb_db = ctdb_db;
596 h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
597 if (h->key.dptr == NULL) {
603 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize,
604 (const char *)key.dptr));
607 /* step 1 - get the chain lock */
608 ret = ctdb_ltdb_lock(ctdb_db, key);
610 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
615 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
617 talloc_set_destructor(h, fetch_lock_destructor);
619 ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
621 /* when torturing, ensure we test the remote path */
622 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
624 h->header.dmaster = (uint32_t)-1;
628 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
630 if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
631 ctdb_ltdb_unlock(ctdb_db, key);
632 ret = ctdb_client_force_migration(ctdb_db, key);
634 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
641 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
646 store some data to the record that was locked with ctdb_fetch_lock()
648 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
650 if (h->ctdb_db->persistent) {
651 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
655 return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
659 non-locking fetch of a record
661 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
662 TDB_DATA key, TDB_DATA *data)
664 struct ctdb_call call;
667 call.call_id = CTDB_FETCH_FUNC;
668 call.call_data.dptr = NULL;
669 call.call_data.dsize = 0;
671 ret = ctdb_call(ctdb_db, &call);
674 *data = call.reply_data;
675 talloc_steal(mem_ctx, data->dptr);
684 called when a control completes or timesout to invoke the callback
685 function the user provided
687 static void invoke_control_callback(struct event_context *ev, struct timed_event *te,
688 struct timeval t, void *private_data)
690 struct ctdb_client_control_state *state;
691 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
694 state = talloc_get_type(private_data, struct ctdb_client_control_state);
695 talloc_steal(tmp_ctx, state);
697 ret = ctdb_control_recv(state->ctdb, state, state,
702 talloc_free(tmp_ctx);
706 called when a CTDB_REPLY_CONTROL packet comes in in the client
708 This packet comes in response to a CTDB_REQ_CONTROL request packet. It
709 contains any reply data from the control
711 static void ctdb_client_reply_control(struct ctdb_context *ctdb,
712 struct ctdb_req_header *hdr)
714 struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
715 struct ctdb_client_control_state *state;
717 state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
719 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
723 if (hdr->reqid != state->reqid) {
724 /* we found a record but it was the wrong one */
725 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
729 state->outdata.dptr = c->data;
730 state->outdata.dsize = c->datalen;
731 state->status = c->status;
733 state->errormsg = talloc_strndup(state,
734 (char *)&c->data[c->datalen],
738 /* state->outdata now uses resources from c so we dont want c
739 to just dissappear from under us while state is still alive
741 talloc_steal(state, c);
743 state->state = CTDB_CONTROL_DONE;
745 /* if we had a callback registered for this control, pull the response
746 and call the callback.
748 if (state->async.fn) {
749 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
755 destroy a ctdb_control in client
757 static int ctdb_control_destructor(struct ctdb_client_control_state *state)
759 ctdb_reqid_remove(state->ctdb, state->reqid);
764 /* time out handler for ctdb_control */
765 static void control_timeout_func(struct event_context *ev, struct timed_event *te,
766 struct timeval t, void *private_data)
768 struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
770 DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
771 "dstnode:%u\n", state->reqid, state->c->opcode,
772 state->c->hdr.destnode));
774 state->state = CTDB_CONTROL_TIMEOUT;
776 /* if we had a callback registered for this control, pull the response
777 and call the callback.
779 if (state->async.fn) {
780 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
784 /* async version of send control request */
785 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb,
786 uint32_t destnode, uint64_t srvid,
787 uint32_t opcode, uint32_t flags, TDB_DATA data,
789 struct timeval *timeout,
792 struct ctdb_client_control_state *state;
794 struct ctdb_req_control *c;
801 /* if the domain socket is not yet open, open it */
802 if (ctdb->daemon.sd==-1) {
803 ctdb_socket_connect(ctdb);
806 state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
807 CTDB_NO_MEMORY_NULL(ctdb, state);
810 state->reqid = ctdb_reqid_new(ctdb, state);
811 state->state = CTDB_CONTROL_WAIT;
812 state->errormsg = NULL;
814 talloc_set_destructor(state, ctdb_control_destructor);
816 len = offsetof(struct ctdb_req_control, data) + data.dsize;
817 c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
818 len, struct ctdb_req_control);
820 CTDB_NO_MEMORY_NULL(ctdb, c);
821 c->hdr.reqid = state->reqid;
822 c->hdr.destnode = destnode;
827 c->datalen = data.dsize;
829 memcpy(&c->data[0], data.dptr, data.dsize);
833 if (timeout && !timeval_is_zero(timeout)) {
834 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
837 ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
843 if (flags & CTDB_CTRL_FLAG_NOREPLY) {
852 /* async version of receive control reply */
853 int ctdb_control_recv(struct ctdb_context *ctdb,
854 struct ctdb_client_control_state *state,
856 TDB_DATA *outdata, int32_t *status, char **errormsg)
860 if (status != NULL) {
863 if (errormsg != NULL) {
871 /* prevent double free of state */
872 tmp_ctx = talloc_new(ctdb);
873 talloc_steal(tmp_ctx, state);
875 /* loop one event at a time until we either timeout or the control
878 while (state->state == CTDB_CONTROL_WAIT) {
879 event_loop_once(ctdb->ev);
882 if (state->state != CTDB_CONTROL_DONE) {
883 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
884 if (state->async.fn) {
885 state->async.fn(state);
887 talloc_free(tmp_ctx);
891 if (state->errormsg) {
892 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
894 (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
896 if (state->async.fn) {
897 state->async.fn(state);
899 talloc_free(tmp_ctx);
904 *outdata = state->outdata;
905 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
909 *status = state->status;
912 if (state->async.fn) {
913 state->async.fn(state);
916 talloc_free(tmp_ctx);
923 send a ctdb control message
924 timeout specifies how long we should wait for a reply.
925 if timeout is NULL we wait indefinitely
927 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
928 uint32_t opcode, uint32_t flags, TDB_DATA data,
929 TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
930 struct timeval *timeout,
933 struct ctdb_client_control_state *state;
935 state = ctdb_control_send(ctdb, destnode, srvid, opcode,
936 flags, data, mem_ctx,
938 return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status,
946 a process exists call. Returns 0 if process exists, -1 otherwise
948 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
954 data.dptr = (uint8_t*)&pid;
955 data.dsize = sizeof(pid);
957 ret = ctdb_control(ctdb, destnode, 0,
958 CTDB_CONTROL_PROCESS_EXISTS, 0, data,
959 NULL, NULL, &status, NULL, NULL);
961 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
969 get remote statistics
971 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
977 ret = ctdb_control(ctdb, destnode, 0,
978 CTDB_CONTROL_STATISTICS, 0, tdb_null,
979 ctdb, &data, &res, NULL, NULL);
980 if (ret != 0 || res != 0) {
981 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
985 if (data.dsize != sizeof(struct ctdb_statistics)) {
986 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
987 (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
991 *status = *(struct ctdb_statistics *)data.dptr;
992 talloc_free(data.dptr);
998 shutdown a remote ctdb node
1000 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1002 struct ctdb_client_control_state *state;
1004 state = ctdb_control_send(ctdb, destnode, 0,
1005 CTDB_CONTROL_SHUTDOWN, 0, tdb_null,
1006 NULL, &timeout, NULL);
1007 if (state == NULL) {
1008 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1016 get vnn map from a remote node
1018 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1023 struct ctdb_vnn_map_wire *map;
1025 ret = ctdb_control(ctdb, destnode, 0,
1026 CTDB_CONTROL_GETVNNMAP, 0, tdb_null,
1027 mem_ctx, &outdata, &res, &timeout, NULL);
1028 if (ret != 0 || res != 0) {
1029 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1033 map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1034 if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1035 outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1036 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1040 (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1041 CTDB_NO_MEMORY(ctdb, *vnnmap);
1042 (*vnnmap)->generation = map->generation;
1043 (*vnnmap)->size = map->size;
1044 (*vnnmap)->map = talloc_array(*vnnmap, uint32_t, map->size);
1046 CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1047 memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1048 talloc_free(outdata.dptr);
1055 get the recovery mode of a remote node
1057 struct ctdb_client_control_state *
1058 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1060 return ctdb_control_send(ctdb, destnode, 0,
1061 CTDB_CONTROL_GET_RECMODE, 0, tdb_null,
1062 mem_ctx, &timeout, NULL);
1065 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1070 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1072 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1077 *recmode = (uint32_t)res;
1083 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1085 struct ctdb_client_control_state *state;
1087 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1088 return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1095 set the recovery mode of a remote node
1097 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1103 data.dsize = sizeof(uint32_t);
1104 data.dptr = (unsigned char *)&recmode;
1106 ret = ctdb_control(ctdb, destnode, 0,
1107 CTDB_CONTROL_SET_RECMODE, 0, data,
1108 NULL, NULL, &res, &timeout, NULL);
1109 if (ret != 0 || res != 0) {
1110 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1120 get the recovery master of a remote node
1122 struct ctdb_client_control_state *
1123 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
1124 struct timeval timeout, uint32_t destnode)
1126 return ctdb_control_send(ctdb, destnode, 0,
1127 CTDB_CONTROL_GET_RECMASTER, 0, tdb_null,
1128 mem_ctx, &timeout, NULL);
1131 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1136 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1138 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1143 *recmaster = (uint32_t)res;
1149 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1151 struct ctdb_client_control_state *state;
1153 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1154 return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1159 set the recovery master of a remote node
1161 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1168 data.dsize = sizeof(uint32_t);
1169 data.dptr = (unsigned char *)&recmaster;
1171 ret = ctdb_control(ctdb, destnode, 0,
1172 CTDB_CONTROL_SET_RECMASTER, 0, data,
1173 NULL, NULL, &res, &timeout, NULL);
1174 if (ret != 0 || res != 0) {
1175 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1184 get a list of databases off a remote node
1186 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1187 TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1193 ret = ctdb_control(ctdb, destnode, 0,
1194 CTDB_CONTROL_GET_DBMAP, 0, tdb_null,
1195 mem_ctx, &outdata, &res, &timeout, NULL);
1196 if (ret != 0 || res != 0) {
1197 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1201 *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1202 talloc_free(outdata.dptr);
1208 get a list of nodes (vnn and flags ) from a remote node
1210 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb,
1211 struct timeval timeout, uint32_t destnode,
1212 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1218 ret = ctdb_control(ctdb, destnode, 0,
1219 CTDB_CONTROL_GET_NODEMAP, 0, tdb_null,
1220 mem_ctx, &outdata, &res, &timeout, NULL);
1221 if (ret == 0 && res == -1 && outdata.dsize == 0) {
1222 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1223 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1225 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1226 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1230 *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1231 talloc_free(outdata.dptr);
1237 old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1239 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb,
1240 struct timeval timeout, uint32_t destnode,
1241 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1245 struct ctdb_node_mapv4 *nodemapv4;
1248 ret = ctdb_control(ctdb, destnode, 0,
1249 CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null,
1250 mem_ctx, &outdata, &res, &timeout, NULL);
1251 if (ret != 0 || res != 0 || outdata.dsize == 0) {
1252 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1256 nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1258 len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1259 (*nodemap) = talloc_zero_size(mem_ctx, len);
1260 CTDB_NO_MEMORY(ctdb, (*nodemap));
1262 (*nodemap)->num = nodemapv4->num;
1263 for (i=0; i<nodemapv4->num; i++) {
1264 (*nodemap)->nodes[i].pnn = nodemapv4->nodes[i].pnn;
1265 (*nodemap)->nodes[i].flags = nodemapv4->nodes[i].flags;
1266 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1267 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1270 talloc_free(outdata.dptr);
1276 drop the transport, reload the nodes file and restart the transport
1278 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb,
1279 struct timeval timeout, uint32_t destnode)
1284 ret = ctdb_control(ctdb, destnode, 0,
1285 CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null,
1286 NULL, NULL, &res, &timeout, NULL);
1287 if (ret != 0 || res != 0) {
1288 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1297 set vnn map on a node
1299 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1300 TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1305 struct ctdb_vnn_map_wire *map;
1308 len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1309 map = talloc_size(mem_ctx, len);
1310 CTDB_NO_MEMORY(ctdb, map);
1312 map->generation = vnnmap->generation;
1313 map->size = vnnmap->size;
1314 memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1317 data.dptr = (uint8_t *)map;
1319 ret = ctdb_control(ctdb, destnode, 0,
1320 CTDB_CONTROL_SETVNNMAP, 0, data,
1321 NULL, NULL, &res, &timeout, NULL);
1322 if (ret != 0 || res != 0) {
1323 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1334 async send for pull database
1336 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1337 struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1338 uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1341 struct ctdb_control_pulldb *pull;
1342 struct ctdb_client_control_state *state;
1344 pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1345 CTDB_NO_MEMORY_NULL(ctdb, pull);
1348 pull->lmaster = lmaster;
1350 indata.dsize = sizeof(struct ctdb_control_pulldb);
1351 indata.dptr = (unsigned char *)pull;
1353 state = ctdb_control_send(ctdb, destnode, 0,
1354 CTDB_CONTROL_PULL_DB, 0, indata,
1355 mem_ctx, &timeout, NULL);
1362 async recv for pull database
1364 int ctdb_ctrl_pulldb_recv(
1365 struct ctdb_context *ctdb,
1366 TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state,
1372 ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1373 if ( (ret != 0) || (res != 0) ){
1374 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1382 pull all keys and records for a specific database on a node
1384 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode,
1385 uint32_t dbid, uint32_t lmaster,
1386 TALLOC_CTX *mem_ctx, struct timeval timeout,
1389 struct ctdb_client_control_state *state;
1391 state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1394 return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1399 change dmaster for all keys in the database to the new value
1401 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1402 TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1408 indata.dsize = 2*sizeof(uint32_t);
1409 indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1411 ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1412 ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1414 ret = ctdb_control(ctdb, destnode, 0,
1415 CTDB_CONTROL_SET_DMASTER, 0, indata,
1416 NULL, NULL, &res, &timeout, NULL);
1417 if (ret != 0 || res != 0) {
1418 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1426 ping a node, return number of clients connected
1428 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1433 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0,
1434 tdb_null, NULL, NULL, &res, NULL, NULL);
1442 find the real path to a ltdb
1444 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1451 data.dptr = (uint8_t *)&dbid;
1452 data.dsize = sizeof(dbid);
1454 ret = ctdb_control(ctdb, destnode, 0,
1455 CTDB_CONTROL_GETDBPATH, 0, data,
1456 mem_ctx, &data, &res, &timeout, NULL);
1457 if (ret != 0 || res != 0) {
1461 (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1462 if ((*path) == NULL) {
1466 talloc_free(data.dptr);
1472 find the name of a db
1474 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx,
1481 data.dptr = (uint8_t *)&dbid;
1482 data.dsize = sizeof(dbid);
1484 ret = ctdb_control(ctdb, destnode, 0,
1485 CTDB_CONTROL_GET_DBNAME, 0, data,
1486 mem_ctx, &data, &res, &timeout, NULL);
1487 if (ret != 0 || res != 0) {
1491 (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1492 if ((*name) == NULL) {
1496 talloc_free(data.dptr);
1502 get the health status of a db
1504 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1505 struct timeval timeout,
1507 uint32_t dbid, TALLOC_CTX *mem_ctx,
1508 const char **reason)
1514 data.dptr = (uint8_t *)&dbid;
1515 data.dsize = sizeof(dbid);
1517 ret = ctdb_control(ctdb, destnode, 0,
1518 CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1519 mem_ctx, &data, &res, &timeout, NULL);
1520 if (ret != 0 || res != 0) {
1524 if (data.dsize == 0) {
1529 (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1530 if ((*reason) == NULL) {
1534 talloc_free(data.dptr);
1542 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
1543 TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1549 data.dptr = discard_const(name);
1550 data.dsize = strlen(name)+1;
1552 ret = ctdb_control(ctdb, destnode, 0,
1553 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1555 mem_ctx, &data, &res, &timeout, NULL);
1557 if (ret != 0 || res != 0) {
1565 get debug level on a node
1567 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1573 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null,
1574 ctdb, &data, &res, NULL, NULL);
1575 if (ret != 0 || res != 0) {
1578 if (data.dsize != sizeof(int32_t)) {
1579 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1580 (unsigned)data.dsize));
1583 *level = *(int32_t *)data.dptr;
1584 talloc_free(data.dptr);
1589 set debug level on a node
1591 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1597 data.dptr = (uint8_t *)&level;
1598 data.dsize = sizeof(level);
1600 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data,
1601 NULL, NULL, &res, NULL, NULL);
1602 if (ret != 0 || res != 0) {
1610 get a list of connected nodes
1612 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb,
1613 struct timeval timeout,
1614 TALLOC_CTX *mem_ctx,
1615 uint32_t *num_nodes)
1617 struct ctdb_node_map *map=NULL;
1623 ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1628 nodes = talloc_array(mem_ctx, uint32_t, map->num);
1629 if (nodes == NULL) {
1633 for (i=0;i<map->num;i++) {
1634 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1635 nodes[*num_nodes] = map->nodes[i].pnn;
1647 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1652 ret = ctdb_control(ctdb, destnode, 0,
1653 CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null,
1654 NULL, NULL, &res, NULL, NULL);
1655 if (ret != 0 || res != 0) {
1656 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1663 this is the dummy null procedure that all databases support
1665 static int ctdb_null_func(struct ctdb_call_info *call)
1671 this is a plain fetch procedure that all databases support
1673 static int ctdb_fetch_func(struct ctdb_call_info *call)
1675 call->reply_data = &call->record_data;
1680 attach to a specific database - client call
1682 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1684 struct ctdb_db_context *ctdb_db;
1689 ctdb_db = ctdb_db_handle(ctdb, name);
1694 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1695 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1697 ctdb_db->ctdb = ctdb;
1698 ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1699 CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1701 data.dptr = discard_const(name);
1702 data.dsize = strlen(name)+1;
1704 /* tell ctdb daemon to attach */
1705 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags,
1706 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1707 0, data, ctdb_db, &data, &res, NULL, NULL);
1708 if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1709 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1710 talloc_free(ctdb_db);
1714 ctdb_db->db_id = *(uint32_t *)data.dptr;
1715 talloc_free(data.dptr);
1717 ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1719 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1720 talloc_free(ctdb_db);
1724 tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1725 if (ctdb->valgrinding) {
1726 tdb_flags |= TDB_NOMMAP;
1728 tdb_flags |= TDB_DISALLOW_NESTING;
1730 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1731 if (ctdb_db->ltdb == NULL) {
1732 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1733 talloc_free(ctdb_db);
1737 ctdb_db->persistent = persistent;
1739 DLIST_ADD(ctdb->db_list, ctdb_db);
1741 /* add well known functions */
1742 ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1743 ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1750 setup a call for a database
1752 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1754 struct ctdb_registered_call *call;
1759 struct ctdb_control_set_call c;
1762 /* this is no longer valid with the separate daemon architecture */
1763 c.db_id = ctdb_db->db_id;
1767 data.dptr = (uint8_t *)&c;
1768 data.dsize = sizeof(c);
1770 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1771 data, NULL, NULL, &status, NULL, NULL);
1772 if (ret != 0 || status != 0) {
1773 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1778 /* also register locally */
1779 call = talloc(ctdb_db, struct ctdb_registered_call);
1783 DLIST_ADD(ctdb_db->calls, call);
1788 struct traverse_state {
1791 ctdb_traverse_func fn;
1796 called on each key during a ctdb_traverse
1798 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1800 struct traverse_state *state = (struct traverse_state *)p;
1801 struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1804 if (data.dsize < sizeof(uint32_t) ||
1805 d->length != data.dsize) {
1806 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1811 key.dsize = d->keylen;
1812 key.dptr = &d->data[0];
1813 data.dsize = d->datalen;
1814 data.dptr = &d->data[d->keylen];
1816 if (key.dsize == 0 && data.dsize == 0) {
1817 /* end of traverse */
1822 if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1823 /* empty records are deleted records in ctdb */
1827 if (state->fn(ctdb, key, data, state->private_data) != 0) {
1836 start a cluster wide traverse, calling the supplied fn on each record
1837 return the number of records traversed, or -1 on error
1839 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1842 struct ctdb_traverse_start t;
1845 uint64_t srvid = (getpid() | 0xFLL<<60);
1846 struct traverse_state state;
1850 state.private_data = private_data;
1853 ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1855 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1859 t.db_id = ctdb_db->db_id;
1863 data.dptr = (uint8_t *)&t;
1864 data.dsize = sizeof(t);
1866 ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1867 data, NULL, NULL, &status, NULL, NULL);
1868 if (ret != 0 || status != 0) {
1869 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1870 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1874 while (!state.done) {
1875 event_loop_once(ctdb_db->ctdb->ev);
1878 ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1880 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1887 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
1889 called on each key during a catdb
1891 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1894 FILE *f = (FILE *)p;
1895 struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1897 fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1898 for (i=0;i<key.dsize;i++) {
1899 if (ISASCII(key.dptr[i])) {
1900 fprintf(f, "%c", key.dptr[i]);
1902 fprintf(f, "\\%02X", key.dptr[i]);
1907 fprintf(f, "dmaster: %u\n", h->dmaster);
1908 fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1910 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1911 for (i=sizeof(*h);i<data.dsize;i++) {
1912 if (ISASCII(data.dptr[i])) {
1913 fprintf(f, "%c", data.dptr[i]);
1915 fprintf(f, "\\%02X", data.dptr[i]);
1926 convenience function to list all keys to stdout
1928 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1930 return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1934 get the pid of a ctdb daemon
1936 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1941 ret = ctdb_control(ctdb, destnode, 0,
1942 CTDB_CONTROL_GET_PID, 0, tdb_null,
1943 NULL, NULL, &res, &timeout, NULL);
1945 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1956 async freeze send control
1958 struct ctdb_client_control_state *
1959 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1961 return ctdb_control_send(ctdb, destnode, priority,
1962 CTDB_CONTROL_FREEZE, 0, tdb_null,
1963 mem_ctx, &timeout, NULL);
1967 async freeze recv control
1969 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1974 ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1975 if ( (ret != 0) || (res != 0) ){
1976 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1984 freeze databases of a certain priority
1986 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1988 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1989 struct ctdb_client_control_state *state;
1992 state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1993 ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1994 talloc_free(tmp_ctx);
1999 /* Freeze all databases */
2000 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2004 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2005 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2013 thaw databases of a certain priority
2015 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2020 ret = ctdb_control(ctdb, destnode, priority,
2021 CTDB_CONTROL_THAW, 0, tdb_null,
2022 NULL, NULL, &res, &timeout, NULL);
2023 if (ret != 0 || res != 0) {
2024 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2031 /* thaw all databases */
2032 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2034 return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2038 get pnn of a node, or -1
2040 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2045 ret = ctdb_control(ctdb, destnode, 0,
2046 CTDB_CONTROL_GET_PNN, 0, tdb_null,
2047 NULL, NULL, &res, &timeout, NULL);
2049 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2057 get the monitoring mode of a remote node
2059 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2064 ret = ctdb_control(ctdb, destnode, 0,
2065 CTDB_CONTROL_GET_MONMODE, 0, tdb_null,
2066 NULL, NULL, &res, &timeout, NULL);
2068 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2079 set the monitoring mode of a remote node to active
2081 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2086 ret = ctdb_control(ctdb, destnode, 0,
2087 CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null,
2088 NULL, NULL,NULL, &timeout, NULL);
2090 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2100 set the monitoring mode of a remote node to disable
2102 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2107 ret = ctdb_control(ctdb, destnode, 0,
2108 CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null,
2109 NULL, NULL, NULL, &timeout, NULL);
2111 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2123 sent to a node to make it take over an ip address
2125 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout,
2126 uint32_t destnode, struct ctdb_public_ip *ip)
2129 struct ctdb_public_ipv4 ipv4;
2133 if (ip->addr.sa.sa_family == AF_INET) {
2135 ipv4.sin = ip->addr.ip;
2137 data.dsize = sizeof(ipv4);
2138 data.dptr = (uint8_t *)&ipv4;
2140 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2141 NULL, &res, &timeout, NULL);
2143 data.dsize = sizeof(*ip);
2144 data.dptr = (uint8_t *)ip;
2146 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2147 NULL, &res, &timeout, NULL);
2150 if (ret != 0 || res != 0) {
2151 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2160 sent to a node to make it release an ip address
2162 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout,
2163 uint32_t destnode, struct ctdb_public_ip *ip)
2166 struct ctdb_public_ipv4 ipv4;
2170 if (ip->addr.sa.sa_family == AF_INET) {
2172 ipv4.sin = ip->addr.ip;
2174 data.dsize = sizeof(ipv4);
2175 data.dptr = (uint8_t *)&ipv4;
2177 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2178 NULL, &res, &timeout, NULL);
2180 data.dsize = sizeof(*ip);
2181 data.dptr = (uint8_t *)ip;
2183 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2184 NULL, &res, &timeout, NULL);
2187 if (ret != 0 || res != 0) {
2188 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2199 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb,
2200 struct timeval timeout,
2202 const char *name, uint32_t *value)
2204 struct ctdb_control_get_tunable *t;
2205 TDB_DATA data, outdata;
2209 data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2210 data.dptr = talloc_size(ctdb, data.dsize);
2211 CTDB_NO_MEMORY(ctdb, data.dptr);
2213 t = (struct ctdb_control_get_tunable *)data.dptr;
2214 t->length = strlen(name)+1;
2215 memcpy(t->name, name, t->length);
2217 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2218 &outdata, &res, &timeout, NULL);
2219 talloc_free(data.dptr);
2220 if (ret != 0 || res != 0) {
2221 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2225 if (outdata.dsize != sizeof(uint32_t)) {
2226 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2227 talloc_free(outdata.dptr);
2231 *value = *(uint32_t *)outdata.dptr;
2232 talloc_free(outdata.dptr);
2240 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb,
2241 struct timeval timeout,
2243 const char *name, uint32_t value)
2245 struct ctdb_control_set_tunable *t;
2250 data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2251 data.dptr = talloc_size(ctdb, data.dsize);
2252 CTDB_NO_MEMORY(ctdb, data.dptr);
2254 t = (struct ctdb_control_set_tunable *)data.dptr;
2255 t->length = strlen(name)+1;
2256 memcpy(t->name, name, t->length);
2259 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2260 NULL, &res, &timeout, NULL);
2261 talloc_free(data.dptr);
2262 if (ret != 0 || res != 0) {
2263 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2273 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb,
2274 struct timeval timeout,
2276 TALLOC_CTX *mem_ctx,
2277 const char ***list, uint32_t *count)
2282 struct ctdb_control_list_tunable *t;
2285 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null,
2286 mem_ctx, &outdata, &res, &timeout, NULL);
2287 if (ret != 0 || res != 0) {
2288 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2292 t = (struct ctdb_control_list_tunable *)outdata.dptr;
2293 if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2294 t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2295 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2296 talloc_free(outdata.dptr);
2300 p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2301 CTDB_NO_MEMORY(ctdb, p);
2303 talloc_free(outdata.dptr);
2308 for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2309 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2310 CTDB_NO_MEMORY(ctdb, *list);
2311 (*list)[*count] = talloc_strdup(*list, s);
2312 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2322 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2323 struct timeval timeout, uint32_t destnode,
2324 TALLOC_CTX *mem_ctx,
2326 struct ctdb_all_public_ips **ips)
2332 ret = ctdb_control(ctdb, destnode, 0,
2333 CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2334 mem_ctx, &outdata, &res, &timeout, NULL);
2335 if (ret == 0 && res == -1) {
2336 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2337 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2339 if (ret != 0 || res != 0) {
2340 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2344 *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2345 talloc_free(outdata.dptr);
2350 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2351 struct timeval timeout, uint32_t destnode,
2352 TALLOC_CTX *mem_ctx,
2353 struct ctdb_all_public_ips **ips)
2355 return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2360 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb,
2361 struct timeval timeout, uint32_t destnode,
2362 TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2367 struct ctdb_all_public_ipsv4 *ipsv4;
2369 ret = ctdb_control(ctdb, destnode, 0,
2370 CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null,
2371 mem_ctx, &outdata, &res, &timeout, NULL);
2372 if (ret != 0 || res != 0) {
2373 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2377 ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2378 len = offsetof(struct ctdb_all_public_ips, ips) +
2379 ipsv4->num*sizeof(struct ctdb_public_ip);
2380 *ips = talloc_zero_size(mem_ctx, len);
2381 CTDB_NO_MEMORY(ctdb, *ips);
2382 (*ips)->num = ipsv4->num;
2383 for (i=0; i<ipsv4->num; i++) {
2384 (*ips)->ips[i].pnn = ipsv4->ips[i].pnn;
2385 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2388 talloc_free(outdata.dptr);
2393 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2394 struct timeval timeout, uint32_t destnode,
2395 TALLOC_CTX *mem_ctx,
2396 const ctdb_sock_addr *addr,
2397 struct ctdb_control_public_ip_info **_info)
2403 struct ctdb_control_public_ip_info *info;
2407 indata.dptr = discard_const_p(uint8_t, addr);
2408 indata.dsize = sizeof(*addr);
2410 ret = ctdb_control(ctdb, destnode, 0,
2411 CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2412 mem_ctx, &outdata, &res, &timeout, NULL);
2413 if (ret != 0 || res != 0) {
2414 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2415 "failed ret:%d res:%d\n",
2420 len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2421 if (len > outdata.dsize) {
2422 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2423 "returned invalid data with size %u > %u\n",
2424 (unsigned int)outdata.dsize,
2425 (unsigned int)len));
2426 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2430 info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2431 len += info->num*sizeof(struct ctdb_control_iface_info);
2433 if (len > outdata.dsize) {
2434 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2435 "returned invalid data with size %u > %u\n",
2436 (unsigned int)outdata.dsize,
2437 (unsigned int)len));
2438 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2442 /* make sure we null terminate the returned strings */
2443 for (i=0; i < info->num; i++) {
2444 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2447 *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2450 talloc_free(outdata.dptr);
2451 if (*_info == NULL) {
2452 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2453 "talloc_memdup size %u failed\n",
2454 (unsigned int)outdata.dsize));
2461 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2462 struct timeval timeout, uint32_t destnode,
2463 TALLOC_CTX *mem_ctx,
2464 struct ctdb_control_get_ifaces **_ifaces)
2469 struct ctdb_control_get_ifaces *ifaces;
2473 ret = ctdb_control(ctdb, destnode, 0,
2474 CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2475 mem_ctx, &outdata, &res, &timeout, NULL);
2476 if (ret != 0 || res != 0) {
2477 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2478 "failed ret:%d res:%d\n",
2483 len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2484 if (len > outdata.dsize) {
2485 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2486 "returned invalid data with size %u > %u\n",
2487 (unsigned int)outdata.dsize,
2488 (unsigned int)len));
2489 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2493 ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2494 len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2496 if (len > outdata.dsize) {
2497 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2498 "returned invalid data with size %u > %u\n",
2499 (unsigned int)outdata.dsize,
2500 (unsigned int)len));
2501 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2505 /* make sure we null terminate the returned strings */
2506 for (i=0; i < ifaces->num; i++) {
2507 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2510 *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2513 talloc_free(outdata.dptr);
2514 if (*_ifaces == NULL) {
2515 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2516 "talloc_memdup size %u failed\n",
2517 (unsigned int)outdata.dsize));
2524 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2525 struct timeval timeout, uint32_t destnode,
2526 TALLOC_CTX *mem_ctx,
2527 const struct ctdb_control_iface_info *info)
2533 indata.dptr = discard_const_p(uint8_t, info);
2534 indata.dsize = sizeof(*info);
2536 ret = ctdb_control(ctdb, destnode, 0,
2537 CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2538 mem_ctx, NULL, &res, &timeout, NULL);
2539 if (ret != 0 || res != 0) {
2540 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2541 "failed ret:%d res:%d\n",
2550 set/clear the permanent disabled bit on a remote node
2552 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
2553 uint32_t set, uint32_t clear)
2557 struct ctdb_node_map *nodemap=NULL;
2558 struct ctdb_node_flag_change c;
2559 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2564 /* find the recovery master */
2565 ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2567 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2568 talloc_free(tmp_ctx);
2573 /* read the node flags from the recmaster */
2574 ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2576 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2577 talloc_free(tmp_ctx);
2580 if (destnode >= nodemap->num) {
2581 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2582 talloc_free(tmp_ctx);
2587 c.old_flags = nodemap->nodes[destnode].flags;
2588 c.new_flags = c.old_flags;
2590 c.new_flags &= ~clear;
2592 data.dsize = sizeof(c);
2593 data.dptr = (unsigned char *)&c;
2595 /* send the flags update to all connected nodes */
2596 nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2598 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2600 timeout, false, data,
2603 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2605 talloc_free(tmp_ctx);
2609 talloc_free(tmp_ctx);
2617 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb,
2618 struct timeval timeout,
2620 struct ctdb_tunable *tunables)
2626 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2627 &outdata, &res, &timeout, NULL);
2628 if (ret != 0 || res != 0) {
2629 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2633 if (outdata.dsize != sizeof(*tunables)) {
2634 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2635 (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2639 *tunables = *(struct ctdb_tunable *)outdata.dptr;
2640 talloc_free(outdata.dptr);
2645 add a public address to a node
2647 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb,
2648 struct timeval timeout,
2650 struct ctdb_control_ip_iface *pub)
2656 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2657 data.dptr = (unsigned char *)pub;
2659 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2660 NULL, &res, &timeout, NULL);
2661 if (ret != 0 || res != 0) {
2662 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2670 delete a public address from a node
2672 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb,
2673 struct timeval timeout,
2675 struct ctdb_control_ip_iface *pub)
2681 data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2682 data.dptr = (unsigned char *)pub;
2684 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2685 NULL, &res, &timeout, NULL);
2686 if (ret != 0 || res != 0) {
2687 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2695 kill a tcp connection
2697 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb,
2698 struct timeval timeout,
2700 struct ctdb_control_killtcp *killtcp)
2706 data.dsize = sizeof(struct ctdb_control_killtcp);
2707 data.dptr = (unsigned char *)killtcp;
2709 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2710 NULL, &res, &timeout, NULL);
2711 if (ret != 0 || res != 0) {
2712 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2722 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb,
2723 struct timeval timeout,
2725 ctdb_sock_addr *addr,
2731 struct ctdb_control_gratious_arp *gratious_arp;
2732 TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2735 len = strlen(ifname)+1;
2736 gratious_arp = talloc_size(tmp_ctx,
2737 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2738 CTDB_NO_MEMORY(ctdb, gratious_arp);
2740 gratious_arp->addr = *addr;
2741 gratious_arp->len = len;
2742 memcpy(&gratious_arp->iface[0], ifname, len);
2745 data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2746 data.dptr = (unsigned char *)gratious_arp;
2748 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2749 NULL, &res, &timeout, NULL);
2750 if (ret != 0 || res != 0) {
2751 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2752 talloc_free(tmp_ctx);
2756 talloc_free(tmp_ctx);
2761 get a list of all tcp tickles that a node knows about for a particular vnn
2763 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb,
2764 struct timeval timeout, uint32_t destnode,
2765 TALLOC_CTX *mem_ctx,
2766 ctdb_sock_addr *addr,
2767 struct ctdb_control_tcp_tickle_list **list)
2770 TDB_DATA data, outdata;
2773 data.dptr = (uint8_t*)addr;
2774 data.dsize = sizeof(ctdb_sock_addr);
2776 ret = ctdb_control(ctdb, destnode, 0,
2777 CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data,
2778 mem_ctx, &outdata, &status, NULL, NULL);
2779 if (ret != 0 || status != 0) {
2780 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2784 *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2790 register a server id
2792 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb,
2793 struct timeval timeout,
2794 struct ctdb_server_id *id)
2800 data.dsize = sizeof(struct ctdb_server_id);
2801 data.dptr = (unsigned char *)id;
2803 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2804 CTDB_CONTROL_REGISTER_SERVER_ID,
2806 NULL, &res, &timeout, NULL);
2807 if (ret != 0 || res != 0) {
2808 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2816 unregister a server id
2818 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb,
2819 struct timeval timeout,
2820 struct ctdb_server_id *id)
2826 data.dsize = sizeof(struct ctdb_server_id);
2827 data.dptr = (unsigned char *)id;
2829 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
2830 CTDB_CONTROL_UNREGISTER_SERVER_ID,
2832 NULL, &res, &timeout, NULL);
2833 if (ret != 0 || res != 0) {
2834 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2843 check if a server id exists
2845 if a server id does exist, return *status == 1, otherwise *status == 0
2847 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb,
2848 struct timeval timeout,
2850 struct ctdb_server_id *id,
2857 data.dsize = sizeof(struct ctdb_server_id);
2858 data.dptr = (unsigned char *)id;
2860 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID,
2862 NULL, &res, &timeout, NULL);
2864 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2878 get the list of server ids that are registered on a node
2880 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2881 TALLOC_CTX *mem_ctx,
2882 struct timeval timeout, uint32_t destnode,
2883 struct ctdb_server_id_list **svid_list)
2889 ret = ctdb_control(ctdb, destnode, 0,
2890 CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null,
2891 mem_ctx, &outdata, &res, &timeout, NULL);
2892 if (ret != 0 || res != 0) {
2893 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2897 *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2903 initialise the ctdb daemon for client applications
2905 NOTE: In current code the daemon does not fork. This is for testing purposes only
2906 and to simplify the code.
2908 struct ctdb_context *ctdb_init(struct event_context *ev)
2911 struct ctdb_context *ctdb;
2913 ctdb = talloc_zero(ev, struct ctdb_context);
2915 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
2919 ctdb->idr = idr_init(ctdb);
2920 /* Wrap early to exercise code. */
2921 ctdb->lastid = INT_MAX-200;
2922 CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
2924 ret = ctdb_set_socketname(ctdb, CTDB_PATH);
2926 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
2931 ctdb->statistics.statistics_start_time = timeval_current();
2940 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2942 ctdb->flags |= flags;
2946 setup the local socket name
2948 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
2950 ctdb->daemon.name = talloc_strdup(ctdb, socketname);
2951 CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
2956 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
2958 return ctdb->daemon.name;
2962 return the pnn of this node
2964 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2971 get the uptime of a remote node
2973 struct ctdb_client_control_state *
2974 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2976 return ctdb_control_send(ctdb, destnode, 0,
2977 CTDB_CONTROL_UPTIME, 0, tdb_null,
2978 mem_ctx, &timeout, NULL);
2981 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2987 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2988 if (ret != 0 || res != 0) {
2989 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2993 *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2998 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3000 struct ctdb_client_control_state *state;
3002 state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3003 return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3007 send a control to execute the "recovered" event script on a node
3009 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3014 ret = ctdb_control(ctdb, destnode, 0,
3015 CTDB_CONTROL_END_RECOVERY, 0, tdb_null,
3016 NULL, NULL, &status, &timeout, NULL);
3017 if (ret != 0 || status != 0) {
3018 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3026 callback for the async helpers used when sending the same control
3027 to multiple nodes in parallell.
3029 static void async_callback(struct ctdb_client_control_state *state)
3031 struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3032 struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3036 uint32_t destnode = state->c->hdr.destnode;
3038 /* one more node has responded with recmode data */
3041 /* if we failed to push the db, then return an error and let
3042 the main loop try again.
3044 if (state->state != CTDB_CONTROL_DONE) {
3045 if ( !data->dont_log_errors) {
3046 DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3049 if (data->fail_callback) {
3050 data->fail_callback(ctdb, destnode, res, outdata,
3051 data->callback_data);
3056 state->async.fn = NULL;
3058 ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3059 if ((ret != 0) || (res != 0)) {
3060 if ( !data->dont_log_errors) {
3061 DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3064 if (data->fail_callback) {
3065 data->fail_callback(ctdb, destnode, res, outdata,
3066 data->callback_data);
3069 if ((ret == 0) && (data->callback != NULL)) {
3070 data->callback(ctdb, destnode, res, outdata,
3071 data->callback_data);
3076 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3078 /* set up the callback functions */
3079 state->async.fn = async_callback;
3080 state->async.private_data = data;
3082 /* one more control to wait for to complete */
3087 /* wait for up to the maximum number of seconds allowed
3088 or until all nodes we expect a response from has replied
3090 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3092 while (data->count > 0) {
3093 event_loop_once(ctdb->ev);
3095 if (data->fail_count != 0) {
3096 if (!data->dont_log_errors) {
3097 DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n",
3107 perform a simple control on the listed nodes
3108 The control cannot return data
3110 int ctdb_client_async_control(struct ctdb_context *ctdb,
3111 enum ctdb_controls opcode,
3114 struct timeval timeout,
3115 bool dont_log_errors,
3117 client_async_callback client_callback,
3118 client_async_callback fail_callback,
3119 void *callback_data)
3121 struct client_async_data *async_data;
3122 struct ctdb_client_control_state *state;
3125 async_data = talloc_zero(ctdb, struct client_async_data);
3126 CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3127 async_data->dont_log_errors = dont_log_errors;
3128 async_data->callback = client_callback;
3129 async_data->fail_callback = fail_callback;
3130 async_data->callback_data = callback_data;
3131 async_data->opcode = opcode;
3133 num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3135 /* loop over all nodes and send an async control to each of them */
3136 for (j=0; j<num_nodes; j++) {
3137 uint32_t pnn = nodes[j];
3139 state = ctdb_control_send(ctdb, pnn, srvid, opcode,
3140 0, data, async_data, &timeout, NULL);
3141 if (state == NULL) {
3142 DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3143 talloc_free(async_data);
3147 ctdb_client_async_add(async_data, state);
3150 if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3151 talloc_free(async_data);
3155 talloc_free(async_data);
3159 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3160 struct ctdb_vnn_map *vnn_map,
3161 TALLOC_CTX *mem_ctx,
3164 int i, j, num_nodes;
3167 for (i=num_nodes=0;i<vnn_map->size;i++) {
3168 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3174 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3175 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3177 for (i=j=0;i<vnn_map->size;i++) {
3178 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3181 nodes[j++] = vnn_map->map[i];
3187 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3188 struct ctdb_node_map *node_map,
3189 TALLOC_CTX *mem_ctx,
3192 int i, j, num_nodes;
3195 for (i=num_nodes=0;i<node_map->num;i++) {
3196 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3199 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3205 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3206 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3208 for (i=j=0;i<node_map->num;i++) {
3209 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3212 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3215 nodes[j++] = node_map->nodes[i].pnn;
3221 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3222 struct ctdb_node_map *node_map,
3223 TALLOC_CTX *mem_ctx,
3226 int i, j, num_nodes;
3229 for (i=num_nodes=0;i<node_map->num;i++) {
3230 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3233 if (node_map->nodes[i].pnn == pnn) {
3239 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3240 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3242 for (i=j=0;i<node_map->num;i++) {
3243 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
3246 if (node_map->nodes[i].pnn == pnn) {
3249 nodes[j++] = node_map->nodes[i].pnn;
3255 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3256 struct ctdb_node_map *node_map,
3257 TALLOC_CTX *mem_ctx,
3260 int i, j, num_nodes;
3263 for (i=num_nodes=0;i<node_map->num;i++) {
3264 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3267 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3273 nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3274 CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3276 for (i=j=0;i<node_map->num;i++) {
3277 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3280 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
3283 nodes[j++] = node_map->nodes[i].pnn;
3290 this is used to test if a pnn lock exists and if it exists will return
3291 the number of connections that pnn has reported or -1 if that recovery
3292 daemon is not running.
3295 ctdb_read_pnn_lock(int fd, int32_t pnn)
3300 lock.l_type = F_WRLCK;
3301 lock.l_whence = SEEK_SET;
3306 if (fcntl(fd, F_GETLK, &lock) != 0) {
3307 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3311 if (lock.l_type == F_UNLCK) {
3315 if (pread(fd, &c, 1, pnn) == -1) {
3316 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3324 get capabilities of a remote node
3326 struct ctdb_client_control_state *
3327 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3329 return ctdb_control_send(ctdb, destnode, 0,
3330 CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null,
3331 mem_ctx, &timeout, NULL);
3334 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3340 ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3341 if ( (ret != 0) || (res != 0) ) {
3342 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3347 *capabilities = *((uint32_t *)outdata.dptr);
3353 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3355 struct ctdb_client_control_state *state;
3356 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3359 state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3360 ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3361 talloc_free(tmp_ctx);
3366 * check whether a transaction is active on a given db on a given node
3368 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3376 indata.dptr = (uint8_t *)&db_id;
3377 indata.dsize = sizeof(db_id);
3379 ret = ctdb_control(ctdb, destnode, 0,
3380 CTDB_CONTROL_TRANS2_ACTIVE,
3381 0, indata, NULL, NULL, &status,
3385 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3393 struct ctdb_transaction_handle {
3394 struct ctdb_db_context *ctdb_db;
3397 * we store the reads and writes done under a transaction:
3398 * - one list stores both reads and writes (m_all),
3399 * - the other just writes (m_write)
3401 struct ctdb_marshall_buffer *m_all;
3402 struct ctdb_marshall_buffer *m_write;
3405 /* start a transaction on a database */
3406 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3408 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3412 /* start a transaction on a database */
3413 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3415 struct ctdb_record_handle *rh;
3418 struct ctdb_ltdb_header header;
3419 TALLOC_CTX *tmp_ctx;
3420 const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3422 struct ctdb_db_context *ctdb_db = h->ctdb_db;
3426 key.dptr = discard_const(keyname);
3427 key.dsize = strlen(keyname);
3429 if (!ctdb_db->persistent) {
3430 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3435 tmp_ctx = talloc_new(h);
3437 rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3439 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3440 talloc_free(tmp_ctx);
3444 status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3448 unsigned long int usec = (1000 + random()) % 100000;
3449 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3450 "on db_id[0x%08x]. waiting for %lu "
3452 ctdb_db->db_id, usec));
3453 talloc_free(tmp_ctx);
3459 * store the pid in the database:
3460 * it is not enough that the node is dmaster...
3463 data.dptr = (unsigned char *)&pid;
3464 data.dsize = sizeof(pid_t);
3466 rh->header.dmaster = ctdb_db->ctdb->pnn;
3467 ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3469 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3470 "transaction record\n"));
3471 talloc_free(tmp_ctx);
3477 ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3479 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3480 talloc_free(tmp_ctx);
3484 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3486 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3487 "lock record inside transaction\n"));
3488 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3489 talloc_free(tmp_ctx);
3493 if (header.dmaster != ctdb_db->ctdb->pnn) {
3494 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3495 "transaction lock record\n"));
3496 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3497 talloc_free(tmp_ctx);
3501 if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3502 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3503 "the transaction lock record\n"));
3504 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3505 talloc_free(tmp_ctx);
3509 talloc_free(tmp_ctx);
3515 /* start a transaction on a database */
3516 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3517 TALLOC_CTX *mem_ctx)
3519 struct ctdb_transaction_handle *h;
3522 h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3524 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));
3528 h->ctdb_db = ctdb_db;
3530 ret = ctdb_transaction_fetch_start(h);
3536 talloc_set_destructor(h, ctdb_transaction_destructor);
3544 fetch a record inside a transaction
3546 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h,
3547 TALLOC_CTX *mem_ctx,
3548 TDB_DATA key, TDB_DATA *data)
3550 struct ctdb_ltdb_header header;
3553 ZERO_STRUCT(header);
3555 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3556 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3557 /* record doesn't exist yet */
3566 if (!h->in_replay) {
3567 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3568 if (h->m_all == NULL) {
3569 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3578 stores a record inside a transaction
3580 int ctdb_transaction_store(struct ctdb_transaction_handle *h,
3581 TDB_DATA key, TDB_DATA data)
3583 TALLOC_CTX *tmp_ctx = talloc_new(h);
3584 struct ctdb_ltdb_header header;
3588 ZERO_STRUCT(header);
3590 /* we need the header so we can update the RSN */
3591 ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3592 if (ret == -1 && header.dmaster == (uint32_t)-1) {
3593 /* the record doesn't exist - create one with us as dmaster.
3594 This is only safe because we are in a transaction and this
3595 is a persistent database */
3596 ZERO_STRUCT(header);
3597 } else if (ret != 0) {
3598 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3599 talloc_free(tmp_ctx);
3603 if (data.dsize == olddata.dsize &&
3604 memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3605 /* save writing the same data */
3606 talloc_free(tmp_ctx);
3610 header.dmaster = h->ctdb_db->ctdb->pnn;
3613 if (!h->in_replay) {
3614 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3615 if (h->m_all == NULL) {
3616 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3617 talloc_free(tmp_ctx);
3622 h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3623 if (h->m_write == NULL) {
3624 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3625 talloc_free(tmp_ctx);
3629 ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3631 talloc_free(tmp_ctx);
3637 replay a transaction
3639 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3642 struct ctdb_rec_data *rec = NULL;
3644 h->in_replay = true;
3645 talloc_free(h->m_write);
3648 ret = ctdb_transaction_fetch_start(h);
3653 for (i=0;i<h->m_all->count;i++) {
3656 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3658 DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3662 if (rec->reqid == 0) {
3664 if (ctdb_transaction_store(h, key, data) != 0) {
3669 TALLOC_CTX *tmp_ctx = talloc_new(h);
3671 if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3672 talloc_free(tmp_ctx);
3675 if (data2.dsize != data.dsize ||
3676 memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3677 /* the record has changed on us - we have to give up */
3678 talloc_free(tmp_ctx);
3681 talloc_free(tmp_ctx);
3688 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3694 commit a transaction
3696 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3700 struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3701 struct timeval timeout;
3702 enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3704 talloc_set_destructor(h, NULL);
3706 /* our commit strategy is quite complex.
3708 - we first try to commit the changes to all other nodes
3710 - if that works, then we commit locally and we are done
3712 - if a commit on another node fails, then we need to cancel
3713 the transaction, then restart the transaction (thus
3714 opening a window of time for a pending recovery to
3715 complete), then replay the transaction, checking all the
3716 reads and writes (checking that reads give the same data,
3717 and writes succeed). Then we retry the transaction to the
3722 if (h->m_write == NULL) {
3723 /* no changes were made */
3724 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3729 /* tell ctdbd to commit to the other nodes */
3730 timeout = timeval_current_ofs(1, 0);
3731 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3732 retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0,
3733 ctdb_marshall_finish(h->m_write), NULL, NULL, &status,
3735 if (ret != 0 || status != 0) {
3736 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3737 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3738 ", retrying after 1 second...\n",
3739 (retries==0)?"":"retry "));
3743 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3745 /* work out what error code we will give if we
3746 have to fail the operation */
3747 switch ((enum ctdb_trans2_commit_error)status) {
3748 case CTDB_TRANS2_COMMIT_SUCCESS:
3749 case CTDB_TRANS2_COMMIT_SOMEFAIL:
3750 case CTDB_TRANS2_COMMIT_TIMEOUT:
3751 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3753 case CTDB_TRANS2_COMMIT_ALLFAIL:
3754 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3759 if (++retries == 100) {
3760 DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n",
3761 h->ctdb_db->db_id, retries, (unsigned)failure_control));
3762 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3763 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3764 tdb_null, NULL, NULL, NULL, NULL, NULL);
3769 if (ctdb_replay_transaction(h) != 0) {
3770 DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3771 "transaction on db 0x%08x, "
3772 "failure control =%u\n",
3774 (unsigned)failure_control));
3775 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3776 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3777 tdb_null, NULL, NULL, NULL, NULL, NULL);
3783 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3786 /* do the real commit locally */
3787 ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3789 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3790 "on db id 0x%08x locally, "
3791 "failure_control=%u\n",
3793 (unsigned)failure_control));
3794 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3795 failure_control, CTDB_CTRL_FLAG_NOREPLY,
3796 tdb_null, NULL, NULL, NULL, NULL, NULL);
3801 /* tell ctdbd that we are finished with our local commit */
3802 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id,
3803 CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY,
3804 tdb_null, NULL, NULL, NULL, NULL, NULL);
3810 recovery daemon ping to main daemon
3812 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3817 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null,
3818 ctdb, NULL, &res, NULL, NULL);
3819 if (ret != 0 || res != 0) {
3820 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3827 /* when forking the main daemon and the child process needs to connect back
3828 * to the daemon as a client process, this function can be used to change
3829 * the ctdb context from daemon into client mode
3831 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
3836 /* Add extra information so we can identify this in the logs */
3838 debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
3841 /* shutdown the transport */
3842 if (ctdb->methods) {
3843 ctdb->methods->shutdown(ctdb);
3846 /* get a new event context */
3847 talloc_free(ctdb->ev);
3848 ctdb->ev = event_context_init(ctdb);
3849 tevent_loop_allow_nesting(ctdb->ev);
3851 close(ctdb->daemon.sd);
3852 ctdb->daemon.sd = -1;
3854 /* the client does not need to be realtime */
3855 if (ctdb->do_setsched) {
3856 ctdb_restore_scheduler(ctdb);
3859 /* initialise ctdb */
3860 ret = ctdb_socket_connect(ctdb);
3862 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3870 get the status of running the monitor eventscripts: NULL means never run.
3872 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb,
3873 struct timeval timeout, uint32_t destnode,
3874 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3875 struct ctdb_scripts_wire **script_status)
3878 TDB_DATA outdata, indata;
3880 uint32_t uinttype = type;
3882 indata.dptr = (uint8_t *)&uinttype;
3883 indata.dsize = sizeof(uinttype);
3885 ret = ctdb_control(ctdb, destnode, 0,
3886 CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3887 mem_ctx, &outdata, &res, &timeout, NULL);
3888 if (ret != 0 || res != 0) {
3889 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3893 if (outdata.dsize == 0) {
3894 *script_status = NULL;
3896 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3897 talloc_free(outdata.dptr);
3904 tell the main daemon how long it took to lock the reclock file
3906 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3912 data.dptr = (uint8_t *)&latency;
3913 data.dsize = sizeof(latency);
3915 ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data,
3916 ctdb, NULL, &res, NULL, NULL);
3917 if (ret != 0 || res != 0) {
3918 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3926 get the name of the reclock file
3928 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3929 uint32_t destnode, TALLOC_CTX *mem_ctx,
3936 ret = ctdb_control(ctdb, destnode, 0,
3937 CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null,
3938 mem_ctx, &data, &res, &timeout, NULL);
3939 if (ret != 0 || res != 0) {
3943 if (data.dsize == 0) {
3946 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3948 talloc_free(data.dptr);
3954 set the reclock filename for a node
3956 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3962 if (reclock == NULL) {
3966 data.dsize = strlen(reclock) + 1;
3967 data.dptr = discard_const(reclock);
3970 ret = ctdb_control(ctdb, destnode, 0,
3971 CTDB_CONTROL_SET_RECLOCK_FILE, 0, data,
3972 NULL, NULL, &res, &timeout, NULL);
3973 if (ret != 0 || res != 0) {
3974 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3984 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3989 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null,
3990 ctdb, NULL, &res, &timeout, NULL);
3991 if (ret != 0 || res != 0) {
3992 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4002 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4006 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null,
4007 ctdb, NULL, NULL, &timeout, NULL);
4009 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4017 set the natgw state for a node
4019 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4025 data.dsize = sizeof(natgwstate);
4026 data.dptr = (uint8_t *)&natgwstate;
4028 ret = ctdb_control(ctdb, destnode, 0,
4029 CTDB_CONTROL_SET_NATGWSTATE, 0, data,
4030 NULL, NULL, &res, &timeout, NULL);
4031 if (ret != 0 || res != 0) {
4032 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4040 set the lmaster role for a node
4042 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4048 data.dsize = sizeof(lmasterrole);
4049 data.dptr = (uint8_t *)&lmasterrole;
4051 ret = ctdb_control(ctdb, destnode, 0,
4052 CTDB_CONTROL_SET_LMASTERROLE, 0, data,
4053 NULL, NULL, &res, &timeout, NULL);
4054 if (ret != 0 || res != 0) {
4055 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4063 set the recmaster role for a node
4065 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4071 data.dsize = sizeof(recmasterrole);
4072 data.dptr = (uint8_t *)&recmasterrole;
4074 ret = ctdb_control(ctdb, destnode, 0,
4075 CTDB_CONTROL_SET_RECMASTERROLE, 0, data,
4076 NULL, NULL, &res, &timeout, NULL);
4077 if (ret != 0 || res != 0) {
4078 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4085 /* enable an eventscript
4087 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4093 data.dsize = strlen(script) + 1;
4094 data.dptr = discard_const(script);
4096 ret = ctdb_control(ctdb, destnode, 0,
4097 CTDB_CONTROL_ENABLE_SCRIPT, 0, data,
4098 NULL, NULL, &res, &timeout, NULL);
4099 if (ret != 0 || res != 0) {
4100 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4107 /* disable an eventscript
4109 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4115 data.dsize = strlen(script) + 1;
4116 data.dptr = discard_const(script);
4118 ret = ctdb_control(ctdb, destnode, 0,
4119 CTDB_CONTROL_DISABLE_SCRIPT, 0, data,
4120 NULL, NULL, &res, &timeout, NULL);
4121 if (ret != 0 || res != 0) {
4122 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4130 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4136 data.dsize = sizeof(*bantime);
4137 data.dptr = (uint8_t *)bantime;
4139 ret = ctdb_control(ctdb, destnode, 0,
4140 CTDB_CONTROL_SET_BAN_STATE, 0, data,
4141 NULL, NULL, &res, &timeout, NULL);
4142 if (ret != 0 || res != 0) {
4143 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4151 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4156 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4158 ret = ctdb_control(ctdb, destnode, 0,
4159 CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4160 tmp_ctx, &outdata, &res, &timeout, NULL);
4161 if (ret != 0 || res != 0) {
4162 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4163 talloc_free(tmp_ctx);
4167 *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4168 talloc_free(tmp_ctx);
4174 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4179 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4181 data.dptr = (uint8_t*)db_prio;
4182 data.dsize = sizeof(*db_prio);
4184 ret = ctdb_control(ctdb, destnode, 0,
4185 CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4186 tmp_ctx, NULL, &res, &timeout, NULL);
4187 if (ret != 0 || res != 0) {
4188 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4189 talloc_free(tmp_ctx);
4193 talloc_free(tmp_ctx);
4198 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4203 TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4205 data.dptr = (uint8_t*)&db_id;
4206 data.dsize = sizeof(db_id);
4208 ret = ctdb_control(ctdb, destnode, 0,
4209 CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4210 tmp_ctx, NULL, &res, &timeout, NULL);
4211 if (ret != 0 || res < 0) {
4212 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4213 talloc_free(tmp_ctx);
4221 talloc_free(tmp_ctx);
4226 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4232 ret = ctdb_control(ctdb, destnode, 0,
4233 CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null,
4234 mem_ctx, &outdata, &res, &timeout, NULL);
4235 if (ret != 0 || res != 0 || outdata.dsize == 0) {
4236 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4240 *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4241 talloc_free(outdata.dptr);
4246 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)