client: Set the socket non-blocking only after connect succeeds
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb, uint32_t caller)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (header->laccessor != caller) {
109                 header->lacount = 0;
110         }
111         header->laccessor = caller;
112         header->lacount++;
113
114         /* we need to force the record to be written out if this was a remote access,
115            so that the lacount is updated */
116         if (c->new_data == NULL && header->laccessor != ctdb->pnn) {
117                 c->new_data = &c->record_data;
118         }
119
120         if (c->new_data && updatetdb) {
121                 /* XXX check that we always have the lock here? */
122                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
123                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
124                         talloc_free(c);
125                         return -1;
126                 }
127         }
128
129         if (c->reply_data) {
130                 call->reply_data = *c->reply_data;
131
132                 talloc_steal(call, call->reply_data.dptr);
133                 talloc_set_name_const(call->reply_data.dptr, __location__);
134         } else {
135                 call->reply_data.dptr = NULL;
136                 call->reply_data.dsize = 0;
137         }
138         call->status = c->status;
139
140         talloc_free(c);
141
142         return 0;
143 }
144
145
146 /*
147   queue a packet for sending from client to daemon
148 */
149 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
150 {
151         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
152 }
153
154
155 /*
156   called when a CTDB_REPLY_CALL packet comes in in the client
157
158   This packet comes in response to a CTDB_REQ_CALL request packet. It
159   contains any reply data from the call
160 */
161 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
162 {
163         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
164         struct ctdb_client_call_state *state;
165
166         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
167         if (state == NULL) {
168                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
169                 return;
170         }
171
172         if (hdr->reqid != state->reqid) {
173                 /* we found a record  but it was the wrong one */
174                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
175                 return;
176         }
177
178         state->call->reply_data.dptr = c->data;
179         state->call->reply_data.dsize = c->datalen;
180         state->call->status = c->status;
181
182         talloc_steal(state, c);
183
184         state->state = CTDB_CALL_DONE;
185
186         if (state->async.fn) {
187                 state->async.fn(state);
188         }
189 }
190
191 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
192
193 /*
194   this is called in the client, when data comes in from the daemon
195  */
196 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
197 {
198         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
199         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
200         TALLOC_CTX *tmp_ctx;
201
202         /* place the packet as a child of a tmp_ctx. We then use
203            talloc_free() below to free it. If any of the calls want
204            to keep it, then they will steal it somewhere else, and the
205            talloc_free() will be a no-op */
206         tmp_ctx = talloc_new(ctdb);
207         talloc_steal(tmp_ctx, hdr);
208
209         if (cnt == 0) {
210                 DEBUG(DEBUG_INFO,("Daemon has exited - shutting down client\n"));
211                 exit(0);
212         }
213
214         if (cnt < sizeof(*hdr)) {
215                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
216                 goto done;
217         }
218         if (cnt != hdr->length) {
219                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
220                                (unsigned)hdr->length, (unsigned)cnt);
221                 goto done;
222         }
223
224         if (hdr->ctdb_magic != CTDB_MAGIC) {
225                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
226                 goto done;
227         }
228
229         if (hdr->ctdb_version != CTDB_VERSION) {
230                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
231                 goto done;
232         }
233
234         switch (hdr->operation) {
235         case CTDB_REPLY_CALL:
236                 ctdb_client_reply_call(ctdb, hdr);
237                 break;
238
239         case CTDB_REQ_MESSAGE:
240                 ctdb_request_message(ctdb, hdr);
241                 break;
242
243         case CTDB_REPLY_CONTROL:
244                 ctdb_client_reply_control(ctdb, hdr);
245                 break;
246
247         default:
248                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
249         }
250
251 done:
252         talloc_free(tmp_ctx);
253 }
254
255 /*
256   connect to a unix domain socket
257 */
258 int ctdb_socket_connect(struct ctdb_context *ctdb)
259 {
260         struct sockaddr_un addr;
261
262         memset(&addr, 0, sizeof(addr));
263         addr.sun_family = AF_UNIX;
264         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
265
266         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
267         if (ctdb->daemon.sd == -1) {
268                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
269                 return -1;
270         }
271
272         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
273                 close(ctdb->daemon.sd);
274                 ctdb->daemon.sd = -1;
275                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
276                 return -1;
277         }
278
279         set_nonblocking(ctdb->daemon.sd);
280         set_close_on_exec(ctdb->daemon.sd);
281         
282         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
283                                               CTDB_DS_ALIGNMENT, 
284                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
285         return 0;
286 }
287
288
289 struct ctdb_record_handle {
290         struct ctdb_db_context *ctdb_db;
291         TDB_DATA key;
292         TDB_DATA *data;
293         struct ctdb_ltdb_header header;
294 };
295
296
297 /*
298   make a recv call to the local ctdb daemon - called from client context
299
300   This is called when the program wants to wait for a ctdb_call to complete and get the 
301   results. This call will block unless the call has already completed.
302 */
303 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
304 {
305         if (state == NULL) {
306                 return -1;
307         }
308
309         while (state->state < CTDB_CALL_DONE) {
310                 event_loop_once(state->ctdb_db->ctdb->ev);
311         }
312         if (state->state != CTDB_CALL_DONE) {
313                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
314                 talloc_free(state);
315                 return -1;
316         }
317
318         if (state->call->reply_data.dsize) {
319                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
320                                                       state->call->reply_data.dptr,
321                                                       state->call->reply_data.dsize);
322                 call->reply_data.dsize = state->call->reply_data.dsize;
323         } else {
324                 call->reply_data.dptr = NULL;
325                 call->reply_data.dsize = 0;
326         }
327         call->status = state->call->status;
328         talloc_free(state);
329
330         return call->status;
331 }
332
333
334
335
336 /*
337   destroy a ctdb_call in client
338 */
339 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
340 {
341         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
342         return 0;
343 }
344
345 /*
346   construct an event driven local ctdb_call
347
348   this is used so that locally processed ctdb_call requests are processed
349   in an event driven manner
350 */
351 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
352                                                                   struct ctdb_call *call,
353                                                                   struct ctdb_ltdb_header *header,
354                                                                   TDB_DATA *data)
355 {
356         struct ctdb_client_call_state *state;
357         struct ctdb_context *ctdb = ctdb_db->ctdb;
358         int ret;
359
360         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
361         CTDB_NO_MEMORY_NULL(ctdb, state);
362         state->call = talloc_zero(state, struct ctdb_call);
363         CTDB_NO_MEMORY_NULL(ctdb, state->call);
364
365         talloc_steal(state, data->dptr);
366
367         state->state   = CTDB_CALL_DONE;
368         *(state->call) = *call;
369         state->ctdb_db = ctdb_db;
370
371         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true, ctdb->pnn);
372         if (ret != 0) {
373                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
374         }
375
376         return state;
377 }
378
379 /*
380   make a ctdb call to the local daemon - async send. Called from client context.
381
382   This constructs a ctdb_call request and queues it for processing. 
383   This call never blocks.
384 */
385 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
386                                               struct ctdb_call *call)
387 {
388         struct ctdb_client_call_state *state;
389         struct ctdb_context *ctdb = ctdb_db->ctdb;
390         struct ctdb_ltdb_header header;
391         TDB_DATA data;
392         int ret;
393         size_t len;
394         struct ctdb_req_call *c;
395
396         /* if the domain socket is not yet open, open it */
397         if (ctdb->daemon.sd==-1) {
398                 ctdb_socket_connect(ctdb);
399         }
400
401         ret = ctdb_ltdb_lock(ctdb_db, call->key);
402         if (ret != 0) {
403                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
404                 return NULL;
405         }
406
407         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
408
409         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
410                 ret = -1;
411         }
412
413         if (ret == 0 && header.dmaster == ctdb->pnn) {
414                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
415                 talloc_free(data.dptr);
416                 ctdb_ltdb_unlock(ctdb_db, call->key);
417                 return state;
418         }
419
420         ctdb_ltdb_unlock(ctdb_db, call->key);
421         talloc_free(data.dptr);
422
423         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
424         if (state == NULL) {
425                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
426                 return NULL;
427         }
428         state->call = talloc_zero(state, struct ctdb_call);
429         if (state->call == NULL) {
430                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
431                 return NULL;
432         }
433
434         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
435         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
436         if (c == NULL) {
437                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
438                 return NULL;
439         }
440
441         state->reqid     = ctdb_reqid_new(ctdb, state);
442         state->ctdb_db = ctdb_db;
443         talloc_set_destructor(state, ctdb_client_call_destructor);
444
445         c->hdr.reqid     = state->reqid;
446         c->flags         = call->flags;
447         c->db_id         = ctdb_db->db_id;
448         c->callid        = call->call_id;
449         c->hopcount      = 0;
450         c->keylen        = call->key.dsize;
451         c->calldatalen   = call->call_data.dsize;
452         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
453         memcpy(&c->data[call->key.dsize], 
454                call->call_data.dptr, call->call_data.dsize);
455         *(state->call)              = *call;
456         state->call->call_data.dptr = &c->data[call->key.dsize];
457         state->call->key.dptr       = &c->data[0];
458
459         state->state  = CTDB_CALL_WAIT;
460
461
462         ctdb_client_queue_pkt(ctdb, &c->hdr);
463
464         return state;
465 }
466
467
468 /*
469   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
470 */
471 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
472 {
473         struct ctdb_client_call_state *state;
474
475         state = ctdb_call_send(ctdb_db, call);
476         return ctdb_call_recv(state, call);
477 }
478
479
480 /*
481   tell the daemon what messaging srvid we will use, and register the message
482   handler function in the client
483 */
484 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
485                              ctdb_msg_fn_t handler,
486                              void *private_data)
487                                     
488 {
489         int res;
490         int32_t status;
491         
492         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
493                            tdb_null, NULL, NULL, &status, NULL, NULL);
494         if (res != 0 || status != 0) {
495                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
496                 return -1;
497         }
498
499         /* also need to register the handler with our own ctdb structure */
500         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
501 }
502
503 /*
504   tell the daemon we no longer want a srvid
505 */
506 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
507 {
508         int res;
509         int32_t status;
510         
511         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
512                            tdb_null, NULL, NULL, &status, NULL, NULL);
513         if (res != 0 || status != 0) {
514                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
515                 return -1;
516         }
517
518         /* also need to register the handler with our own ctdb structure */
519         ctdb_deregister_message_handler(ctdb, srvid, private_data);
520         return 0;
521 }
522
523
524 /*
525   send a message - from client context
526  */
527 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
528                       uint64_t srvid, TDB_DATA data)
529 {
530         struct ctdb_req_message *r;
531         int len, res;
532
533         len = offsetof(struct ctdb_req_message, data) + data.dsize;
534         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
535                                len, struct ctdb_req_message);
536         CTDB_NO_MEMORY(ctdb, r);
537
538         r->hdr.destnode  = pnn;
539         r->srvid         = srvid;
540         r->datalen       = data.dsize;
541         memcpy(&r->data[0], data.dptr, data.dsize);
542         
543         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
544         if (res != 0) {
545                 return res;
546         }
547
548         talloc_free(r);
549         return 0;
550 }
551
552
553 /*
554   cancel a ctdb_fetch_lock operation, releasing the lock
555  */
556 static int fetch_lock_destructor(struct ctdb_record_handle *h)
557 {
558         ctdb_ltdb_unlock(h->ctdb_db, h->key);
559         return 0;
560 }
561
562 /*
563   force the migration of a record to this node
564  */
565 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
566 {
567         struct ctdb_call call;
568         ZERO_STRUCT(call);
569         call.call_id = CTDB_NULL_FUNC;
570         call.key = key;
571         call.flags = CTDB_IMMEDIATE_MIGRATION;
572         return ctdb_call(ctdb_db, &call);
573 }
574
575 /*
576   try to fetch a readonly copy of a record
577  */
578 static int
579 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
580 {
581         int ret;
582
583         struct ctdb_call call;
584         ZERO_STRUCT(call);
585
586         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
587         call.call_data.dptr = NULL;
588         call.call_data.dsize = 0;
589         call.key = key;
590         call.flags = CTDB_WANT_READONLY;
591         ret = ctdb_call(ctdb_db, &call);
592
593         if (ret != 0) {
594                 return -1;
595         }
596         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
597                 return -1;
598         }
599
600         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
601         if (*hdr == NULL) {
602                 talloc_free(call.reply_data.dptr);
603                 return -1;
604         }
605
606         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
607         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
608         if (data->dptr == NULL) {
609                 talloc_free(call.reply_data.dptr);
610                 talloc_free(hdr);
611                 return -1;
612         }
613
614         return 0;
615 }
616
617 /*
618   get a lock on a record, and return the records data. Blocks until it gets the lock
619  */
620 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
621                                            TDB_DATA key, TDB_DATA *data)
622 {
623         int ret;
624         struct ctdb_record_handle *h;
625
626         /*
627           procedure is as follows:
628
629           1) get the chain lock. 
630           2) check if we are dmaster
631           3) if we are the dmaster then return handle 
632           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
633              reply from ctdbd
634           5) when we get the reply, goto (1)
635          */
636
637         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
638         if (h == NULL) {
639                 return NULL;
640         }
641
642         h->ctdb_db = ctdb_db;
643         h->key     = key;
644         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
645         if (h->key.dptr == NULL) {
646                 talloc_free(h);
647                 return NULL;
648         }
649         h->data    = data;
650
651         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
652                  (const char *)key.dptr));
653
654 again:
655         /* step 1 - get the chain lock */
656         ret = ctdb_ltdb_lock(ctdb_db, key);
657         if (ret != 0) {
658                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
659                 talloc_free(h);
660                 return NULL;
661         }
662
663         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
664
665         talloc_set_destructor(h, fetch_lock_destructor);
666
667         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
668
669         /* when torturing, ensure we test the remote path */
670         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
671             random() % 5 == 0) {
672                 h->header.dmaster = (uint32_t)-1;
673         }
674
675
676         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
677
678         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
679                 ctdb_ltdb_unlock(ctdb_db, key);
680                 ret = ctdb_client_force_migration(ctdb_db, key);
681                 if (ret != 0) {
682                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
683                         talloc_free(h);
684                         return NULL;
685                 }
686                 goto again;
687         }
688
689         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
690         return h;
691 }
692
693 /*
694   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
695  */
696 struct ctdb_record_handle *
697 ctdb_fetch_readonly_lock(
698         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
699         TDB_DATA key, TDB_DATA *data,
700         int read_only)
701 {
702         int ret;
703         struct ctdb_record_handle *h;
704         struct ctdb_ltdb_header *roheader = NULL;
705
706         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
707         if (h == NULL) {
708                 return NULL;
709         }
710
711         h->ctdb_db = ctdb_db;
712         h->key     = key;
713         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
714         if (h->key.dptr == NULL) {
715                 talloc_free(h);
716                 return NULL;
717         }
718         h->data    = data;
719
720         data->dptr = NULL;
721         data->dsize = 0;
722
723
724 again:
725         talloc_free(roheader);
726         roheader = NULL;
727
728         talloc_free(data->dptr);
729         data->dptr = NULL;
730         data->dsize = 0;
731
732         /* Lock the record/chain */
733         ret = ctdb_ltdb_lock(ctdb_db, key);
734         if (ret != 0) {
735                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
736                 talloc_free(h);
737                 return NULL;
738         }
739
740         talloc_set_destructor(h, fetch_lock_destructor);
741
742         /* Check if record exists yet in the TDB */
743         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
744         if (ret != 0) {
745                 ctdb_ltdb_unlock(ctdb_db, key);
746                 ret = ctdb_client_force_migration(ctdb_db, key);
747                 if (ret != 0) {
748                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
749                         talloc_free(h);
750                         return NULL;
751                 }
752                 goto again;
753         }
754
755         /* if this is a request for read/write and we have delegations
756            we have to revoke all delegations first
757         */
758         if ((read_only == 0) 
759         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
760         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
761                 ctdb_ltdb_unlock(ctdb_db, key);
762                 ret = ctdb_client_force_migration(ctdb_db, key);
763                 if (ret != 0) {
764                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
765                         talloc_free(h);
766                         return NULL;
767                 }
768                 goto again;
769         }
770
771         /* if we are dmaster, just return the handle */
772         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
773                 return h;
774         }
775
776         if (read_only != 0) {
777                 TDB_DATA rodata = {NULL, 0};
778
779                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
780                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
781                         return h;
782                 }
783
784                 ctdb_ltdb_unlock(ctdb_db, key);
785                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
786                 if (ret != 0) {
787                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
788                         ret = ctdb_client_force_migration(ctdb_db, key);
789                         if (ret != 0) {
790                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
791                                 talloc_free(h);
792                                 return NULL;
793                         }
794
795                         goto again;
796                 }
797
798                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
799                         ret = ctdb_client_force_migration(ctdb_db, key);
800                         if (ret != 0) {
801                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
802                                 talloc_free(h);
803                                 return NULL;
804                         }
805
806                         goto again;
807                 }
808
809                 ret = ctdb_ltdb_lock(ctdb_db, key);
810                 if (ret != 0) {
811                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
812                         talloc_free(h);
813                         return NULL;
814                 }
815
816                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
817                 if (ret != 0) {
818                         ctdb_ltdb_unlock(ctdb_db, key);
819
820                         ret = ctdb_client_force_migration(ctdb_db, key);
821                         if (ret != 0) {
822                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
823                                 talloc_free(h);
824                                 return NULL;
825                         }
826
827                         goto again;
828                 }
829
830                 return h;
831         }
832
833         /* we are not dmaster and this was not a request for a readonly lock
834          * so unlock the record, migrate it and try again
835          */
836         ctdb_ltdb_unlock(ctdb_db, key);
837         ret = ctdb_client_force_migration(ctdb_db, key);
838         if (ret != 0) {
839                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
840                 talloc_free(h);
841                 return NULL;
842         }
843         goto again;
844 }
845
846 /*
847   store some data to the record that was locked with ctdb_fetch_lock()
848 */
849 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
850 {
851         if (h->ctdb_db->persistent) {
852                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
853                 return -1;
854         }
855
856         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
857 }
858
859 /*
860   non-locking fetch of a record
861  */
862 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
863                TDB_DATA key, TDB_DATA *data)
864 {
865         struct ctdb_call call;
866         int ret;
867
868         call.call_id = CTDB_FETCH_FUNC;
869         call.call_data.dptr = NULL;
870         call.call_data.dsize = 0;
871         call.key = key;
872
873         ret = ctdb_call(ctdb_db, &call);
874
875         if (ret == 0) {
876                 *data = call.reply_data;
877                 talloc_steal(mem_ctx, data->dptr);
878         }
879
880         return ret;
881 }
882
883
884
885 /*
886    called when a control completes or timesout to invoke the callback
887    function the user provided
888 */
889 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
890         struct timeval t, void *private_data)
891 {
892         struct ctdb_client_control_state *state;
893         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
894         int ret;
895
896         state = talloc_get_type(private_data, struct ctdb_client_control_state);
897         talloc_steal(tmp_ctx, state);
898
899         ret = ctdb_control_recv(state->ctdb, state, state,
900                         NULL, 
901                         NULL, 
902                         NULL);
903         if (ret != 0) {
904                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
905         }
906
907         talloc_free(tmp_ctx);
908 }
909
910 /*
911   called when a CTDB_REPLY_CONTROL packet comes in in the client
912
913   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
914   contains any reply data from the control
915 */
916 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
917                                       struct ctdb_req_header *hdr)
918 {
919         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
920         struct ctdb_client_control_state *state;
921
922         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
923         if (state == NULL) {
924                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
925                 return;
926         }
927
928         if (hdr->reqid != state->reqid) {
929                 /* we found a record  but it was the wrong one */
930                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
931                 return;
932         }
933
934         state->outdata.dptr = c->data;
935         state->outdata.dsize = c->datalen;
936         state->status = c->status;
937         if (c->errorlen) {
938                 state->errormsg = talloc_strndup(state, 
939                                                  (char *)&c->data[c->datalen], 
940                                                  c->errorlen);
941         }
942
943         /* state->outdata now uses resources from c so we dont want c
944            to just dissappear from under us while state is still alive
945         */
946         talloc_steal(state, c);
947
948         state->state = CTDB_CONTROL_DONE;
949
950         /* if we had a callback registered for this control, pull the response
951            and call the callback.
952         */
953         if (state->async.fn) {
954                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
955         }
956 }
957
958
959 /*
960   destroy a ctdb_control in client
961 */
962 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
963 {
964         ctdb_reqid_remove(state->ctdb, state->reqid);
965         return 0;
966 }
967
968
969 /* time out handler for ctdb_control */
970 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
971         struct timeval t, void *private_data)
972 {
973         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
974
975         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
976                          "dstnode:%u\n", state->reqid, state->c->opcode,
977                          state->c->hdr.destnode));
978
979         state->state = CTDB_CONTROL_TIMEOUT;
980
981         /* if we had a callback registered for this control, pull the response
982            and call the callback.
983         */
984         if (state->async.fn) {
985                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
986         }
987 }
988
989 /* async version of send control request */
990 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
991                 uint32_t destnode, uint64_t srvid, 
992                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
993                 TALLOC_CTX *mem_ctx,
994                 struct timeval *timeout,
995                 char **errormsg)
996 {
997         struct ctdb_client_control_state *state;
998         size_t len;
999         struct ctdb_req_control *c;
1000         int ret;
1001
1002         if (errormsg) {
1003                 *errormsg = NULL;
1004         }
1005
1006         /* if the domain socket is not yet open, open it */
1007         if (ctdb->daemon.sd==-1) {
1008                 ctdb_socket_connect(ctdb);
1009         }
1010
1011         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1012         CTDB_NO_MEMORY_NULL(ctdb, state);
1013
1014         state->ctdb       = ctdb;
1015         state->reqid      = ctdb_reqid_new(ctdb, state);
1016         state->state      = CTDB_CONTROL_WAIT;
1017         state->errormsg   = NULL;
1018
1019         talloc_set_destructor(state, ctdb_client_control_destructor);
1020
1021         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1022         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1023                                len, struct ctdb_req_control);
1024         state->c            = c;        
1025         CTDB_NO_MEMORY_NULL(ctdb, c);
1026         c->hdr.reqid        = state->reqid;
1027         c->hdr.destnode     = destnode;
1028         c->opcode           = opcode;
1029         c->client_id        = 0;
1030         c->flags            = flags;
1031         c->srvid            = srvid;
1032         c->datalen          = data.dsize;
1033         if (data.dsize) {
1034                 memcpy(&c->data[0], data.dptr, data.dsize);
1035         }
1036
1037         /* timeout */
1038         if (timeout && !timeval_is_zero(timeout)) {
1039                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1040         }
1041
1042         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1043         if (ret != 0) {
1044                 talloc_free(state);
1045                 return NULL;
1046         }
1047
1048         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1049                 talloc_free(state);
1050                 return NULL;
1051         }
1052
1053         return state;
1054 }
1055
1056
1057 /* async version of receive control reply */
1058 int ctdb_control_recv(struct ctdb_context *ctdb, 
1059                 struct ctdb_client_control_state *state, 
1060                 TALLOC_CTX *mem_ctx,
1061                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1062 {
1063         TALLOC_CTX *tmp_ctx;
1064
1065         if (status != NULL) {
1066                 *status = -1;
1067         }
1068         if (errormsg != NULL) {
1069                 *errormsg = NULL;
1070         }
1071
1072         if (state == NULL) {
1073                 return -1;
1074         }
1075
1076         /* prevent double free of state */
1077         tmp_ctx = talloc_new(ctdb);
1078         talloc_steal(tmp_ctx, state);
1079
1080         /* loop one event at a time until we either timeout or the control
1081            completes.
1082         */
1083         while (state->state == CTDB_CONTROL_WAIT) {
1084                 event_loop_once(ctdb->ev);
1085         }
1086
1087         if (state->state != CTDB_CONTROL_DONE) {
1088                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1089                 if (state->async.fn) {
1090                         state->async.fn(state);
1091                 }
1092                 talloc_free(tmp_ctx);
1093                 return -1;
1094         }
1095
1096         if (state->errormsg) {
1097                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1098                 if (errormsg) {
1099                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1100                 }
1101                 if (state->async.fn) {
1102                         state->async.fn(state);
1103                 }
1104                 talloc_free(tmp_ctx);
1105                 return -1;
1106         }
1107
1108         if (outdata) {
1109                 *outdata = state->outdata;
1110                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1111         }
1112
1113         if (status) {
1114                 *status = state->status;
1115         }
1116
1117         if (state->async.fn) {
1118                 state->async.fn(state);
1119         }
1120
1121         talloc_free(tmp_ctx);
1122         return 0;
1123 }
1124
1125
1126
1127 /*
1128   send a ctdb control message
1129   timeout specifies how long we should wait for a reply.
1130   if timeout is NULL we wait indefinitely
1131  */
1132 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1133                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1134                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1135                  struct timeval *timeout,
1136                  char **errormsg)
1137 {
1138         struct ctdb_client_control_state *state;
1139
1140         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1141                         flags, data, mem_ctx,
1142                         timeout, errormsg);
1143         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1144                         errormsg);
1145 }
1146
1147
1148
1149
1150 /*
1151   a process exists call. Returns 0 if process exists, -1 otherwise
1152  */
1153 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1154 {
1155         int ret;
1156         TDB_DATA data;
1157         int32_t status;
1158
1159         data.dptr = (uint8_t*)&pid;
1160         data.dsize = sizeof(pid);
1161
1162         ret = ctdb_control(ctdb, destnode, 0, 
1163                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1164                            NULL, NULL, &status, NULL, NULL);
1165         if (ret != 0) {
1166                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1167                 return -1;
1168         }
1169
1170         return status;
1171 }
1172
1173 /*
1174   get remote statistics
1175  */
1176 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1177 {
1178         int ret;
1179         TDB_DATA data;
1180         int32_t res;
1181
1182         ret = ctdb_control(ctdb, destnode, 0, 
1183                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1184                            ctdb, &data, &res, NULL, NULL);
1185         if (ret != 0 || res != 0) {
1186                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1187                 return -1;
1188         }
1189
1190         if (data.dsize != sizeof(struct ctdb_statistics)) {
1191                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1192                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1193                       return -1;
1194         }
1195
1196         *status = *(struct ctdb_statistics *)data.dptr;
1197         talloc_free(data.dptr);
1198                         
1199         return 0;
1200 }
1201
1202 /*
1203   shutdown a remote ctdb node
1204  */
1205 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1206 {
1207         struct ctdb_client_control_state *state;
1208
1209         state = ctdb_control_send(ctdb, destnode, 0, 
1210                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1211                            NULL, &timeout, NULL);
1212         if (state == NULL) {
1213                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1214                 return -1;
1215         }
1216
1217         return 0;
1218 }
1219
1220 /*
1221   get vnn map from a remote node
1222  */
1223 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1224 {
1225         int ret;
1226         TDB_DATA outdata;
1227         int32_t res;
1228         struct ctdb_vnn_map_wire *map;
1229
1230         ret = ctdb_control(ctdb, destnode, 0, 
1231                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1232                            mem_ctx, &outdata, &res, &timeout, NULL);
1233         if (ret != 0 || res != 0) {
1234                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1235                 return -1;
1236         }
1237         
1238         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1239         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1240             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1241                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1242                 return -1;
1243         }
1244
1245         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1246         CTDB_NO_MEMORY(ctdb, *vnnmap);
1247         (*vnnmap)->generation = map->generation;
1248         (*vnnmap)->size       = map->size;
1249         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1250
1251         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1252         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1253         talloc_free(outdata.dptr);
1254                     
1255         return 0;
1256 }
1257
1258
1259 /*
1260   get the recovery mode of a remote node
1261  */
1262 struct ctdb_client_control_state *
1263 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1264 {
1265         return ctdb_control_send(ctdb, destnode, 0, 
1266                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1267                            mem_ctx, &timeout, NULL);
1268 }
1269
1270 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1271 {
1272         int ret;
1273         int32_t res;
1274
1275         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1276         if (ret != 0) {
1277                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1278                 return -1;
1279         }
1280
1281         if (recmode) {
1282                 *recmode = (uint32_t)res;
1283         }
1284
1285         return 0;
1286 }
1287
1288 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1289 {
1290         struct ctdb_client_control_state *state;
1291
1292         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1293         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1294 }
1295
1296
1297
1298
1299 /*
1300   set the recovery mode of a remote node
1301  */
1302 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1303 {
1304         int ret;
1305         TDB_DATA data;
1306         int32_t res;
1307
1308         data.dsize = sizeof(uint32_t);
1309         data.dptr = (unsigned char *)&recmode;
1310
1311         ret = ctdb_control(ctdb, destnode, 0, 
1312                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1313                            NULL, NULL, &res, &timeout, NULL);
1314         if (ret != 0 || res != 0) {
1315                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1316                 return -1;
1317         }
1318
1319         return 0;
1320 }
1321
1322
1323
1324 /*
1325   get the recovery master of a remote node
1326  */
1327 struct ctdb_client_control_state *
1328 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1329                         struct timeval timeout, uint32_t destnode)
1330 {
1331         return ctdb_control_send(ctdb, destnode, 0, 
1332                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1333                            mem_ctx, &timeout, NULL);
1334 }
1335
1336 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1337 {
1338         int ret;
1339         int32_t res;
1340
1341         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1342         if (ret != 0) {
1343                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1344                 return -1;
1345         }
1346
1347         if (recmaster) {
1348                 *recmaster = (uint32_t)res;
1349         }
1350
1351         return 0;
1352 }
1353
1354 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1355 {
1356         struct ctdb_client_control_state *state;
1357
1358         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1359         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1360 }
1361
1362
1363 /*
1364   set the recovery master of a remote node
1365  */
1366 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1367 {
1368         int ret;
1369         TDB_DATA data;
1370         int32_t res;
1371
1372         ZERO_STRUCT(data);
1373         data.dsize = sizeof(uint32_t);
1374         data.dptr = (unsigned char *)&recmaster;
1375
1376         ret = ctdb_control(ctdb, destnode, 0, 
1377                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1378                            NULL, NULL, &res, &timeout, NULL);
1379         if (ret != 0 || res != 0) {
1380                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1381                 return -1;
1382         }
1383
1384         return 0;
1385 }
1386
1387
1388 /*
1389   get a list of databases off a remote node
1390  */
1391 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1392                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1393 {
1394         int ret;
1395         TDB_DATA outdata;
1396         int32_t res;
1397
1398         ret = ctdb_control(ctdb, destnode, 0, 
1399                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1400                            mem_ctx, &outdata, &res, &timeout, NULL);
1401         if (ret != 0 || res != 0) {
1402                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1403                 return -1;
1404         }
1405
1406         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1407         talloc_free(outdata.dptr);
1408                     
1409         return 0;
1410 }
1411
1412 /*
1413   get a list of nodes (vnn and flags ) from a remote node
1414  */
1415 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1416                 struct timeval timeout, uint32_t destnode, 
1417                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1418 {
1419         int ret;
1420         TDB_DATA outdata;
1421         int32_t res;
1422
1423         ret = ctdb_control(ctdb, destnode, 0, 
1424                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1425                            mem_ctx, &outdata, &res, &timeout, NULL);
1426         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1427                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1428                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1429         }
1430         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1431                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1432                 return -1;
1433         }
1434
1435         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1436         talloc_free(outdata.dptr);
1437                     
1438         return 0;
1439 }
1440
1441 /*
1442   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1443  */
1444 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1445                 struct timeval timeout, uint32_t destnode, 
1446                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1447 {
1448         int ret, i, len;
1449         TDB_DATA outdata;
1450         struct ctdb_node_mapv4 *nodemapv4;
1451         int32_t res;
1452
1453         ret = ctdb_control(ctdb, destnode, 0, 
1454                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1455                            mem_ctx, &outdata, &res, &timeout, NULL);
1456         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1457                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1458                 return -1;
1459         }
1460
1461         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1462
1463         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1464         (*nodemap) = talloc_zero_size(mem_ctx, len);
1465         CTDB_NO_MEMORY(ctdb, (*nodemap));
1466
1467         (*nodemap)->num = nodemapv4->num;
1468         for (i=0; i<nodemapv4->num; i++) {
1469                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1470                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1471                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1472                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1473         }
1474                 
1475         talloc_free(outdata.dptr);
1476                     
1477         return 0;
1478 }
1479
1480 /*
1481   drop the transport, reload the nodes file and restart the transport
1482  */
1483 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1484                     struct timeval timeout, uint32_t destnode)
1485 {
1486         int ret;
1487         int32_t res;
1488
1489         ret = ctdb_control(ctdb, destnode, 0, 
1490                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1491                            NULL, NULL, &res, &timeout, NULL);
1492         if (ret != 0 || res != 0) {
1493                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1494                 return -1;
1495         }
1496
1497         return 0;
1498 }
1499
1500
1501 /*
1502   set vnn map on a node
1503  */
1504 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1505                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1506 {
1507         int ret;
1508         TDB_DATA data;
1509         int32_t res;
1510         struct ctdb_vnn_map_wire *map;
1511         size_t len;
1512
1513         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1514         map = talloc_size(mem_ctx, len);
1515         CTDB_NO_MEMORY(ctdb, map);
1516
1517         map->generation = vnnmap->generation;
1518         map->size = vnnmap->size;
1519         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1520         
1521         data.dsize = len;
1522         data.dptr  = (uint8_t *)map;
1523
1524         ret = ctdb_control(ctdb, destnode, 0, 
1525                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1526                            NULL, NULL, &res, &timeout, NULL);
1527         if (ret != 0 || res != 0) {
1528                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1529                 return -1;
1530         }
1531
1532         talloc_free(map);
1533
1534         return 0;
1535 }
1536
1537
1538 /*
1539   async send for pull database
1540  */
1541 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1542         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1543         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1544 {
1545         TDB_DATA indata;
1546         struct ctdb_control_pulldb *pull;
1547         struct ctdb_client_control_state *state;
1548
1549         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1550         CTDB_NO_MEMORY_NULL(ctdb, pull);
1551
1552         pull->db_id   = dbid;
1553         pull->lmaster = lmaster;
1554
1555         indata.dsize = sizeof(struct ctdb_control_pulldb);
1556         indata.dptr  = (unsigned char *)pull;
1557
1558         state = ctdb_control_send(ctdb, destnode, 0, 
1559                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1560                                   mem_ctx, &timeout, NULL);
1561         talloc_free(pull);
1562
1563         return state;
1564 }
1565
1566 /*
1567   async recv for pull database
1568  */
1569 int ctdb_ctrl_pulldb_recv(
1570         struct ctdb_context *ctdb, 
1571         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1572         TDB_DATA *outdata)
1573 {
1574         int ret;
1575         int32_t res;
1576
1577         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1578         if ( (ret != 0) || (res != 0) ){
1579                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1580                 return -1;
1581         }
1582
1583         return 0;
1584 }
1585
1586 /*
1587   pull all keys and records for a specific database on a node
1588  */
1589 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1590                 uint32_t dbid, uint32_t lmaster, 
1591                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1592                 TDB_DATA *outdata)
1593 {
1594         struct ctdb_client_control_state *state;
1595
1596         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1597                                       timeout);
1598         
1599         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1600 }
1601
1602
1603 /*
1604   change dmaster for all keys in the database to the new value
1605  */
1606 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1607                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1608 {
1609         int ret;
1610         TDB_DATA indata;
1611         int32_t res;
1612
1613         indata.dsize = 2*sizeof(uint32_t);
1614         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1615
1616         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1617         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1618
1619         ret = ctdb_control(ctdb, destnode, 0, 
1620                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1621                            NULL, NULL, &res, &timeout, NULL);
1622         if (ret != 0 || res != 0) {
1623                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1624                 return -1;
1625         }
1626
1627         return 0;
1628 }
1629
1630 /*
1631   ping a node, return number of clients connected
1632  */
1633 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1634 {
1635         int ret;
1636         int32_t res;
1637
1638         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1639                            tdb_null, NULL, NULL, &res, NULL, NULL);
1640         if (ret != 0) {
1641                 return -1;
1642         }
1643         return res;
1644 }
1645
1646 /*
1647   find the real path to a ltdb 
1648  */
1649 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1650                    const char **path)
1651 {
1652         int ret;
1653         int32_t res;
1654         TDB_DATA data;
1655
1656         data.dptr = (uint8_t *)&dbid;
1657         data.dsize = sizeof(dbid);
1658
1659         ret = ctdb_control(ctdb, destnode, 0, 
1660                            CTDB_CONTROL_GETDBPATH, 0, data, 
1661                            mem_ctx, &data, &res, &timeout, NULL);
1662         if (ret != 0 || res != 0) {
1663                 return -1;
1664         }
1665
1666         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1667         if ((*path) == NULL) {
1668                 return -1;
1669         }
1670
1671         talloc_free(data.dptr);
1672
1673         return 0;
1674 }
1675
1676 /*
1677   find the name of a db 
1678  */
1679 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1680                    const char **name)
1681 {
1682         int ret;
1683         int32_t res;
1684         TDB_DATA data;
1685
1686         data.dptr = (uint8_t *)&dbid;
1687         data.dsize = sizeof(dbid);
1688
1689         ret = ctdb_control(ctdb, destnode, 0, 
1690                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1691                            mem_ctx, &data, &res, &timeout, NULL);
1692         if (ret != 0 || res != 0) {
1693                 return -1;
1694         }
1695
1696         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1697         if ((*name) == NULL) {
1698                 return -1;
1699         }
1700
1701         talloc_free(data.dptr);
1702
1703         return 0;
1704 }
1705
1706 /*
1707   get the health status of a db
1708  */
1709 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1710                           struct timeval timeout,
1711                           uint32_t destnode,
1712                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1713                           const char **reason)
1714 {
1715         int ret;
1716         int32_t res;
1717         TDB_DATA data;
1718
1719         data.dptr = (uint8_t *)&dbid;
1720         data.dsize = sizeof(dbid);
1721
1722         ret = ctdb_control(ctdb, destnode, 0,
1723                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1724                            mem_ctx, &data, &res, &timeout, NULL);
1725         if (ret != 0 || res != 0) {
1726                 return -1;
1727         }
1728
1729         if (data.dsize == 0) {
1730                 (*reason) = NULL;
1731                 return 0;
1732         }
1733
1734         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1735         if ((*reason) == NULL) {
1736                 return -1;
1737         }
1738
1739         talloc_free(data.dptr);
1740
1741         return 0;
1742 }
1743
1744 /*
1745   create a database
1746  */
1747 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1748                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1749 {
1750         int ret;
1751         int32_t res;
1752         TDB_DATA data;
1753
1754         data.dptr = discard_const(name);
1755         data.dsize = strlen(name)+1;
1756
1757         ret = ctdb_control(ctdb, destnode, 0, 
1758                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1759                            0, data, 
1760                            mem_ctx, &data, &res, &timeout, NULL);
1761
1762         if (ret != 0 || res != 0) {
1763                 return -1;
1764         }
1765
1766         return 0;
1767 }
1768
1769 /*
1770   get debug level on a node
1771  */
1772 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1773 {
1774         int ret;
1775         int32_t res;
1776         TDB_DATA data;
1777
1778         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1779                            ctdb, &data, &res, NULL, NULL);
1780         if (ret != 0 || res != 0) {
1781                 return -1;
1782         }
1783         if (data.dsize != sizeof(int32_t)) {
1784                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1785                          (unsigned)data.dsize));
1786                 return -1;
1787         }
1788         *level = *(int32_t *)data.dptr;
1789         talloc_free(data.dptr);
1790         return 0;
1791 }
1792
1793 /*
1794   set debug level on a node
1795  */
1796 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1797 {
1798         int ret;
1799         int32_t res;
1800         TDB_DATA data;
1801
1802         data.dptr = (uint8_t *)&level;
1803         data.dsize = sizeof(level);
1804
1805         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1806                            NULL, NULL, &res, NULL, NULL);
1807         if (ret != 0 || res != 0) {
1808                 return -1;
1809         }
1810         return 0;
1811 }
1812
1813
1814 /*
1815   get a list of connected nodes
1816  */
1817 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1818                                 struct timeval timeout,
1819                                 TALLOC_CTX *mem_ctx,
1820                                 uint32_t *num_nodes)
1821 {
1822         struct ctdb_node_map *map=NULL;
1823         int ret, i;
1824         uint32_t *nodes;
1825
1826         *num_nodes = 0;
1827
1828         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1829         if (ret != 0) {
1830                 return NULL;
1831         }
1832
1833         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1834         if (nodes == NULL) {
1835                 return NULL;
1836         }
1837
1838         for (i=0;i<map->num;i++) {
1839                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1840                         nodes[*num_nodes] = map->nodes[i].pnn;
1841                         (*num_nodes)++;
1842                 }
1843         }
1844
1845         return nodes;
1846 }
1847
1848
1849 /*
1850   reset remote status
1851  */
1852 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1853 {
1854         int ret;
1855         int32_t res;
1856
1857         ret = ctdb_control(ctdb, destnode, 0, 
1858                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1859                            NULL, NULL, &res, NULL, NULL);
1860         if (ret != 0 || res != 0) {
1861                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1862                 return -1;
1863         }
1864         return 0;
1865 }
1866
1867 /*
1868   attach to a specific database - client call
1869 */
1870 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1871                                     struct timeval timeout,
1872                                     const char *name,
1873                                     bool persistent,
1874                                     uint32_t tdb_flags)
1875 {
1876         struct ctdb_db_context *ctdb_db;
1877         TDB_DATA data;
1878         int ret;
1879         int32_t res;
1880
1881         ctdb_db = ctdb_db_handle(ctdb, name);
1882         if (ctdb_db) {
1883                 return ctdb_db;
1884         }
1885
1886         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1887         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1888
1889         ctdb_db->ctdb = ctdb;
1890         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1891         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1892
1893         data.dptr = discard_const(name);
1894         data.dsize = strlen(name)+1;
1895
1896         /* tell ctdb daemon to attach */
1897         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1898                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1899                            0, data, ctdb_db, &data, &res, NULL, NULL);
1900         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1901                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1902                 talloc_free(ctdb_db);
1903                 return NULL;
1904         }
1905         
1906         ctdb_db->db_id = *(uint32_t *)data.dptr;
1907         talloc_free(data.dptr);
1908
1909         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1910         if (ret != 0) {
1911                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1912                 talloc_free(ctdb_db);
1913                 return NULL;
1914         }
1915
1916         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1917         if (ctdb->valgrinding) {
1918                 tdb_flags |= TDB_NOMMAP;
1919         }
1920         tdb_flags |= TDB_DISALLOW_NESTING;
1921
1922         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1923         if (ctdb_db->ltdb == NULL) {
1924                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1925                 talloc_free(ctdb_db);
1926                 return NULL;
1927         }
1928
1929         ctdb_db->persistent = persistent;
1930
1931         DLIST_ADD(ctdb->db_list, ctdb_db);
1932
1933         /* add well known functions */
1934         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1935         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1936         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1937
1938         return ctdb_db;
1939 }
1940
1941
1942 /*
1943   setup a call for a database
1944  */
1945 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1946 {
1947         struct ctdb_registered_call *call;
1948
1949 #if 0
1950         TDB_DATA data;
1951         int32_t status;
1952         struct ctdb_control_set_call c;
1953         int ret;
1954
1955         /* this is no longer valid with the separate daemon architecture */
1956         c.db_id = ctdb_db->db_id;
1957         c.fn    = fn;
1958         c.id    = id;
1959
1960         data.dptr = (uint8_t *)&c;
1961         data.dsize = sizeof(c);
1962
1963         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1964                            data, NULL, NULL, &status, NULL, NULL);
1965         if (ret != 0 || status != 0) {
1966                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1967                 return -1;
1968         }
1969 #endif
1970
1971         /* also register locally */
1972         call = talloc(ctdb_db, struct ctdb_registered_call);
1973         call->fn = fn;
1974         call->id = id;
1975
1976         DLIST_ADD(ctdb_db->calls, call);        
1977         return 0;
1978 }
1979
1980
1981 struct traverse_state {
1982         bool done;
1983         uint32_t count;
1984         ctdb_traverse_func fn;
1985         void *private_data;
1986         bool listemptyrecords;
1987 };
1988
1989 /*
1990   called on each key during a ctdb_traverse
1991  */
1992 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1993 {
1994         struct traverse_state *state = (struct traverse_state *)p;
1995         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1996         TDB_DATA key;
1997
1998         if (data.dsize < sizeof(uint32_t) ||
1999             d->length != data.dsize) {
2000                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2001                 state->done = true;
2002                 return;
2003         }
2004
2005         key.dsize = d->keylen;
2006         key.dptr  = &d->data[0];
2007         data.dsize = d->datalen;
2008         data.dptr = &d->data[d->keylen];
2009
2010         if (key.dsize == 0 && data.dsize == 0) {
2011                 /* end of traverse */
2012                 state->done = true;
2013                 return;
2014         }
2015
2016         if (!state->listemptyrecords &&
2017             data.dsize == sizeof(struct ctdb_ltdb_header))
2018         {
2019                 /* empty records are deleted records in ctdb */
2020                 return;
2021         }
2022
2023         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2024                 state->done = true;
2025         }
2026
2027         state->count++;
2028 }
2029
2030 /**
2031  * start a cluster wide traverse, calling the supplied fn on each record
2032  * return the number of records traversed, or -1 on error
2033  *
2034  * Extendet variant with a flag to signal whether empty records should
2035  * be listed.
2036  */
2037 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2038                              ctdb_traverse_func fn,
2039                              bool withemptyrecords,
2040                              void *private_data)
2041 {
2042         TDB_DATA data;
2043         struct ctdb_traverse_start_ext t;
2044         int32_t status;
2045         int ret;
2046         uint64_t srvid = (getpid() | 0xFLL<<60);
2047         struct traverse_state state;
2048
2049         state.done = false;
2050         state.count = 0;
2051         state.private_data = private_data;
2052         state.fn = fn;
2053         state.listemptyrecords = withemptyrecords;
2054
2055         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2056         if (ret != 0) {
2057                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2058                 return -1;
2059         }
2060
2061         t.db_id = ctdb_db->db_id;
2062         t.srvid = srvid;
2063         t.reqid = 0;
2064         t.withemptyrecords = withemptyrecords;
2065
2066         data.dptr = (uint8_t *)&t;
2067         data.dsize = sizeof(t);
2068
2069         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2070                            data, NULL, NULL, &status, NULL, NULL);
2071         if (ret != 0 || status != 0) {
2072                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2073                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2074                 return -1;
2075         }
2076
2077         while (!state.done) {
2078                 event_loop_once(ctdb_db->ctdb->ev);
2079         }
2080
2081         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2084                 return -1;
2085         }
2086
2087         return state.count;
2088 }
2089
2090 /**
2091  * start a cluster wide traverse, calling the supplied fn on each record
2092  * return the number of records traversed, or -1 on error
2093  *
2094  * Standard version which does not list the empty records:
2095  * These are considered deleted.
2096  */
2097 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2098 {
2099         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2100 }
2101
2102 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2103 /*
2104   called on each key during a catdb
2105  */
2106 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2107 {
2108         int i;
2109         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2110         FILE *f = c->f;
2111         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2112
2113         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2114         for (i=0;i<key.dsize;i++) {
2115                 if (ISASCII(key.dptr[i])) {
2116                         fprintf(f, "%c", key.dptr[i]);
2117                 } else {
2118                         fprintf(f, "\\%02X", key.dptr[i]);
2119                 }
2120         }
2121         fprintf(f, "\"\n");
2122
2123         fprintf(f, "dmaster: %u\n", h->dmaster);
2124         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2125
2126         if (c->printlmaster && ctdb->vnn_map != NULL) {
2127                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2128         }
2129
2130         if (c->printhash) {
2131                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2132         }
2133
2134         if (c->printrecordflags) {
2135                 fprintf(f, "flags: 0x%08x", h->flags);
2136                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2137                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2138                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2139                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2140                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2141                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2142                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2143                 fprintf(f, "\n");
2144         }
2145
2146         if (c->printdatasize) {
2147                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2148         } else {
2149                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2150                 for (i=sizeof(*h);i<data.dsize;i++) {
2151                         if (ISASCII(data.dptr[i])) {
2152                                 fprintf(f, "%c", data.dptr[i]);
2153                         } else {
2154                                 fprintf(f, "\\%02X", data.dptr[i]);
2155                         }
2156                 }
2157                 fprintf(f, "\"\n");
2158         }
2159
2160         fprintf(f, "\n");
2161
2162         return 0;
2163 }
2164
2165 /*
2166   convenience function to list all keys to stdout
2167  */
2168 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2169                  struct ctdb_dump_db_context *ctx)
2170 {
2171         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2172                                  ctx->printemptyrecords, ctx);
2173 }
2174
2175 /*
2176   get the pid of a ctdb daemon
2177  */
2178 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2179 {
2180         int ret;
2181         int32_t res;
2182
2183         ret = ctdb_control(ctdb, destnode, 0, 
2184                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2185                            NULL, NULL, &res, &timeout, NULL);
2186         if (ret != 0) {
2187                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2188                 return -1;
2189         }
2190
2191         *pid = res;
2192
2193         return 0;
2194 }
2195
2196
2197 /*
2198   async freeze send control
2199  */
2200 struct ctdb_client_control_state *
2201 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2202 {
2203         return ctdb_control_send(ctdb, destnode, priority, 
2204                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2205                            mem_ctx, &timeout, NULL);
2206 }
2207
2208 /* 
2209    async freeze recv control
2210 */
2211 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2212 {
2213         int ret;
2214         int32_t res;
2215
2216         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2217         if ( (ret != 0) || (res != 0) ){
2218                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2219                 return -1;
2220         }
2221
2222         return 0;
2223 }
2224
2225 /*
2226   freeze databases of a certain priority
2227  */
2228 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2229 {
2230         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2231         struct ctdb_client_control_state *state;
2232         int ret;
2233
2234         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2235         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2236         talloc_free(tmp_ctx);
2237
2238         return ret;
2239 }
2240
2241 /* Freeze all databases */
2242 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2243 {
2244         int i;
2245
2246         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2247                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2248                         return -1;
2249                 }
2250         }
2251         return 0;
2252 }
2253
2254 /*
2255   thaw databases of a certain priority
2256  */
2257 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2258 {
2259         int ret;
2260         int32_t res;
2261
2262         ret = ctdb_control(ctdb, destnode, priority, 
2263                            CTDB_CONTROL_THAW, 0, tdb_null, 
2264                            NULL, NULL, &res, &timeout, NULL);
2265         if (ret != 0 || res != 0) {
2266                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2267                 return -1;
2268         }
2269
2270         return 0;
2271 }
2272
2273 /* thaw all databases */
2274 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2275 {
2276         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2277 }
2278
2279 /*
2280   get pnn of a node, or -1
2281  */
2282 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2283 {
2284         int ret;
2285         int32_t res;
2286
2287         ret = ctdb_control(ctdb, destnode, 0, 
2288                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2289                            NULL, NULL, &res, &timeout, NULL);
2290         if (ret != 0) {
2291                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2292                 return -1;
2293         }
2294
2295         return res;
2296 }
2297
2298 /*
2299   get the monitoring mode of a remote node
2300  */
2301 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2302 {
2303         int ret;
2304         int32_t res;
2305
2306         ret = ctdb_control(ctdb, destnode, 0, 
2307                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2308                            NULL, NULL, &res, &timeout, NULL);
2309         if (ret != 0) {
2310                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2311                 return -1;
2312         }
2313
2314         *monmode = res;
2315
2316         return 0;
2317 }
2318
2319
2320 /*
2321  set the monitoring mode of a remote node to active
2322  */
2323 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2324 {
2325         int ret;
2326         
2327
2328         ret = ctdb_control(ctdb, destnode, 0, 
2329                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2330                            NULL, NULL,NULL, &timeout, NULL);
2331         if (ret != 0) {
2332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2333                 return -1;
2334         }
2335
2336         
2337
2338         return 0;
2339 }
2340
2341 /*
2342   set the monitoring mode of a remote node to disable
2343  */
2344 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2345 {
2346         int ret;
2347         
2348
2349         ret = ctdb_control(ctdb, destnode, 0, 
2350                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2351                            NULL, NULL, NULL, &timeout, NULL);
2352         if (ret != 0) {
2353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2354                 return -1;
2355         }
2356
2357         
2358
2359         return 0;
2360 }
2361
2362
2363
2364 /* 
2365   sent to a node to make it take over an ip address
2366 */
2367 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2368                           uint32_t destnode, struct ctdb_public_ip *ip)
2369 {
2370         TDB_DATA data;
2371         struct ctdb_public_ipv4 ipv4;
2372         int ret;
2373         int32_t res;
2374
2375         if (ip->addr.sa.sa_family == AF_INET) {
2376                 ipv4.pnn = ip->pnn;
2377                 ipv4.sin = ip->addr.ip;
2378
2379                 data.dsize = sizeof(ipv4);
2380                 data.dptr  = (uint8_t *)&ipv4;
2381
2382                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2383                            NULL, &res, &timeout, NULL);
2384         } else {
2385                 data.dsize = sizeof(*ip);
2386                 data.dptr  = (uint8_t *)ip;
2387
2388                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2389                            NULL, &res, &timeout, NULL);
2390         }
2391
2392         if (ret != 0 || res != 0) {
2393                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2394                 return -1;
2395         }
2396
2397         return 0;       
2398 }
2399
2400
2401 /* 
2402   sent to a node to make it release an ip address
2403 */
2404 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2405                          uint32_t destnode, struct ctdb_public_ip *ip)
2406 {
2407         TDB_DATA data;
2408         struct ctdb_public_ipv4 ipv4;
2409         int ret;
2410         int32_t res;
2411
2412         if (ip->addr.sa.sa_family == AF_INET) {
2413                 ipv4.pnn = ip->pnn;
2414                 ipv4.sin = ip->addr.ip;
2415
2416                 data.dsize = sizeof(ipv4);
2417                 data.dptr  = (uint8_t *)&ipv4;
2418
2419                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2420                                    NULL, &res, &timeout, NULL);
2421         } else {
2422                 data.dsize = sizeof(*ip);
2423                 data.dptr  = (uint8_t *)ip;
2424
2425                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2426                                    NULL, &res, &timeout, NULL);
2427         }
2428
2429         if (ret != 0 || res != 0) {
2430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2431                 return -1;
2432         }
2433
2434         return 0;       
2435 }
2436
2437
2438 /*
2439   get a tunable
2440  */
2441 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2442                           struct timeval timeout, 
2443                           uint32_t destnode,
2444                           const char *name, uint32_t *value)
2445 {
2446         struct ctdb_control_get_tunable *t;
2447         TDB_DATA data, outdata;
2448         int32_t res;
2449         int ret;
2450
2451         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2452         data.dptr  = talloc_size(ctdb, data.dsize);
2453         CTDB_NO_MEMORY(ctdb, data.dptr);
2454
2455         t = (struct ctdb_control_get_tunable *)data.dptr;
2456         t->length = strlen(name)+1;
2457         memcpy(t->name, name, t->length);
2458
2459         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2460                            &outdata, &res, &timeout, NULL);
2461         talloc_free(data.dptr);
2462         if (ret != 0 || res != 0) {
2463                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2464                 return -1;
2465         }
2466
2467         if (outdata.dsize != sizeof(uint32_t)) {
2468                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2469                 talloc_free(outdata.dptr);
2470                 return -1;
2471         }
2472         
2473         *value = *(uint32_t *)outdata.dptr;
2474         talloc_free(outdata.dptr);
2475
2476         return 0;
2477 }
2478
2479 /*
2480   set a tunable
2481  */
2482 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2483                           struct timeval timeout, 
2484                           uint32_t destnode,
2485                           const char *name, uint32_t value)
2486 {
2487         struct ctdb_control_set_tunable *t;
2488         TDB_DATA data;
2489         int32_t res;
2490         int ret;
2491
2492         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2493         data.dptr  = talloc_size(ctdb, data.dsize);
2494         CTDB_NO_MEMORY(ctdb, data.dptr);
2495
2496         t = (struct ctdb_control_set_tunable *)data.dptr;
2497         t->length = strlen(name)+1;
2498         memcpy(t->name, name, t->length);
2499         t->value = value;
2500
2501         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2502                            NULL, &res, &timeout, NULL);
2503         talloc_free(data.dptr);
2504         if (ret != 0 || res != 0) {
2505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2506                 return -1;
2507         }
2508
2509         return 0;
2510 }
2511
2512 /*
2513   list tunables
2514  */
2515 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2516                             struct timeval timeout, 
2517                             uint32_t destnode,
2518                             TALLOC_CTX *mem_ctx,
2519                             const char ***list, uint32_t *count)
2520 {
2521         TDB_DATA outdata;
2522         int32_t res;
2523         int ret;
2524         struct ctdb_control_list_tunable *t;
2525         char *p, *s, *ptr;
2526
2527         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2528                            mem_ctx, &outdata, &res, &timeout, NULL);
2529         if (ret != 0 || res != 0) {
2530                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2531                 return -1;
2532         }
2533
2534         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2535         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2536             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2537                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2538                 talloc_free(outdata.dptr);
2539                 return -1;              
2540         }
2541         
2542         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2543         CTDB_NO_MEMORY(ctdb, p);
2544
2545         talloc_free(outdata.dptr);
2546         
2547         (*list) = NULL;
2548         (*count) = 0;
2549
2550         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2551                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2552                 CTDB_NO_MEMORY(ctdb, *list);
2553                 (*list)[*count] = talloc_strdup(*list, s);
2554                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2555                 (*count)++;
2556         }
2557
2558         talloc_free(p);
2559
2560         return 0;
2561 }
2562
2563
2564 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2565                                    struct timeval timeout, uint32_t destnode,
2566                                    TALLOC_CTX *mem_ctx,
2567                                    uint32_t flags,
2568                                    struct ctdb_all_public_ips **ips)
2569 {
2570         int ret;
2571         TDB_DATA outdata;
2572         int32_t res;
2573
2574         ret = ctdb_control(ctdb, destnode, 0, 
2575                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2576                            mem_ctx, &outdata, &res, &timeout, NULL);
2577         if (ret == 0 && res == -1) {
2578                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2579                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2580         }
2581         if (ret != 0 || res != 0) {
2582           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2583                 return -1;
2584         }
2585
2586         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2587         talloc_free(outdata.dptr);
2588                     
2589         return 0;
2590 }
2591
2592 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2593                              struct timeval timeout, uint32_t destnode,
2594                              TALLOC_CTX *mem_ctx,
2595                              struct ctdb_all_public_ips **ips)
2596 {
2597         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2598                                               destnode, mem_ctx,
2599                                               0, ips);
2600 }
2601
2602 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2603                         struct timeval timeout, uint32_t destnode, 
2604                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2605 {
2606         int ret, i, len;
2607         TDB_DATA outdata;
2608         int32_t res;
2609         struct ctdb_all_public_ipsv4 *ipsv4;
2610
2611         ret = ctdb_control(ctdb, destnode, 0, 
2612                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2613                            mem_ctx, &outdata, &res, &timeout, NULL);
2614         if (ret != 0 || res != 0) {
2615                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2616                 return -1;
2617         }
2618
2619         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2620         len = offsetof(struct ctdb_all_public_ips, ips) +
2621                 ipsv4->num*sizeof(struct ctdb_public_ip);
2622         *ips = talloc_zero_size(mem_ctx, len);
2623         CTDB_NO_MEMORY(ctdb, *ips);
2624         (*ips)->num = ipsv4->num;
2625         for (i=0; i<ipsv4->num; i++) {
2626                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2627                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2628         }
2629
2630         talloc_free(outdata.dptr);
2631                     
2632         return 0;
2633 }
2634
2635 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2636                                  struct timeval timeout, uint32_t destnode,
2637                                  TALLOC_CTX *mem_ctx,
2638                                  const ctdb_sock_addr *addr,
2639                                  struct ctdb_control_public_ip_info **_info)
2640 {
2641         int ret;
2642         TDB_DATA indata;
2643         TDB_DATA outdata;
2644         int32_t res;
2645         struct ctdb_control_public_ip_info *info;
2646         uint32_t len;
2647         uint32_t i;
2648
2649         indata.dptr = discard_const_p(uint8_t, addr);
2650         indata.dsize = sizeof(*addr);
2651
2652         ret = ctdb_control(ctdb, destnode, 0,
2653                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2654                            mem_ctx, &outdata, &res, &timeout, NULL);
2655         if (ret != 0 || res != 0) {
2656                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2657                                 "failed ret:%d res:%d\n",
2658                                 ret, res));
2659                 return -1;
2660         }
2661
2662         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2663         if (len > outdata.dsize) {
2664                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2665                                 "returned invalid data with size %u > %u\n",
2666                                 (unsigned int)outdata.dsize,
2667                                 (unsigned int)len));
2668                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2669                 return -1;
2670         }
2671
2672         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2673         len += info->num*sizeof(struct ctdb_control_iface_info);
2674
2675         if (len > outdata.dsize) {
2676                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2677                                 "returned invalid data with size %u > %u\n",
2678                                 (unsigned int)outdata.dsize,
2679                                 (unsigned int)len));
2680                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2681                 return -1;
2682         }
2683
2684         /* make sure we null terminate the returned strings */
2685         for (i=0; i < info->num; i++) {
2686                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2687         }
2688
2689         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2690                                                                 outdata.dptr,
2691                                                                 outdata.dsize);
2692         talloc_free(outdata.dptr);
2693         if (*_info == NULL) {
2694                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2695                                 "talloc_memdup size %u failed\n",
2696                                 (unsigned int)outdata.dsize));
2697                 return -1;
2698         }
2699
2700         return 0;
2701 }
2702
2703 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2704                          struct timeval timeout, uint32_t destnode,
2705                          TALLOC_CTX *mem_ctx,
2706                          struct ctdb_control_get_ifaces **_ifaces)
2707 {
2708         int ret;
2709         TDB_DATA outdata;
2710         int32_t res;
2711         struct ctdb_control_get_ifaces *ifaces;
2712         uint32_t len;
2713         uint32_t i;
2714
2715         ret = ctdb_control(ctdb, destnode, 0,
2716                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2717                            mem_ctx, &outdata, &res, &timeout, NULL);
2718         if (ret != 0 || res != 0) {
2719                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2720                                 "failed ret:%d res:%d\n",
2721                                 ret, res));
2722                 return -1;
2723         }
2724
2725         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2726         if (len > outdata.dsize) {
2727                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2728                                 "returned invalid data with size %u > %u\n",
2729                                 (unsigned int)outdata.dsize,
2730                                 (unsigned int)len));
2731                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2732                 return -1;
2733         }
2734
2735         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2736         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2737
2738         if (len > outdata.dsize) {
2739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2740                                 "returned invalid data with size %u > %u\n",
2741                                 (unsigned int)outdata.dsize,
2742                                 (unsigned int)len));
2743                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2744                 return -1;
2745         }
2746
2747         /* make sure we null terminate the returned strings */
2748         for (i=0; i < ifaces->num; i++) {
2749                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2750         }
2751
2752         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2753                                                                   outdata.dptr,
2754                                                                   outdata.dsize);
2755         talloc_free(outdata.dptr);
2756         if (*_ifaces == NULL) {
2757                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2758                                 "talloc_memdup size %u failed\n",
2759                                 (unsigned int)outdata.dsize));
2760                 return -1;
2761         }
2762
2763         return 0;
2764 }
2765
2766 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2767                              struct timeval timeout, uint32_t destnode,
2768                              TALLOC_CTX *mem_ctx,
2769                              const struct ctdb_control_iface_info *info)
2770 {
2771         int ret;
2772         TDB_DATA indata;
2773         int32_t res;
2774
2775         indata.dptr = discard_const_p(uint8_t, info);
2776         indata.dsize = sizeof(*info);
2777
2778         ret = ctdb_control(ctdb, destnode, 0,
2779                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2780                            mem_ctx, NULL, &res, &timeout, NULL);
2781         if (ret != 0 || res != 0) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2783                                 "failed ret:%d res:%d\n",
2784                                 ret, res));
2785                 return -1;
2786         }
2787
2788         return 0;
2789 }
2790
2791 /*
2792   set/clear the permanent disabled bit on a remote node
2793  */
2794 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2795                        uint32_t set, uint32_t clear)
2796 {
2797         int ret;
2798         TDB_DATA data;
2799         struct ctdb_node_map *nodemap=NULL;
2800         struct ctdb_node_flag_change c;
2801         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2802         uint32_t recmaster;
2803         uint32_t *nodes;
2804
2805
2806         /* find the recovery master */
2807         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2808         if (ret != 0) {
2809                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2810                 talloc_free(tmp_ctx);
2811                 return ret;
2812         }
2813
2814
2815         /* read the node flags from the recmaster */
2816         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2817         if (ret != 0) {
2818                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2819                 talloc_free(tmp_ctx);
2820                 return -1;
2821         }
2822         if (destnode >= nodemap->num) {
2823                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2824                 talloc_free(tmp_ctx);
2825                 return -1;
2826         }
2827
2828         c.pnn       = destnode;
2829         c.old_flags = nodemap->nodes[destnode].flags;
2830         c.new_flags = c.old_flags;
2831         c.new_flags |= set;
2832         c.new_flags &= ~clear;
2833
2834         data.dsize = sizeof(c);
2835         data.dptr = (unsigned char *)&c;
2836
2837         /* send the flags update to all connected nodes */
2838         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2839
2840         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2841                                         nodes, 0,
2842                                         timeout, false, data,
2843                                         NULL, NULL,
2844                                         NULL) != 0) {
2845                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2846
2847                 talloc_free(tmp_ctx);
2848                 return -1;
2849         }
2850
2851         talloc_free(tmp_ctx);
2852         return 0;
2853 }
2854
2855
2856 /*
2857   get all tunables
2858  */
2859 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2860                                struct timeval timeout, 
2861                                uint32_t destnode,
2862                                struct ctdb_tunable *tunables)
2863 {
2864         TDB_DATA outdata;
2865         int ret;
2866         int32_t res;
2867
2868         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2869                            &outdata, &res, &timeout, NULL);
2870         if (ret != 0 || res != 0) {
2871                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2872                 return -1;
2873         }
2874
2875         if (outdata.dsize != sizeof(*tunables)) {
2876                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2877                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2878                 return -1;              
2879         }
2880
2881         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2882         talloc_free(outdata.dptr);
2883         return 0;
2884 }
2885
2886 /*
2887   add a public address to a node
2888  */
2889 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2890                       struct timeval timeout, 
2891                       uint32_t destnode,
2892                       struct ctdb_control_ip_iface *pub)
2893 {
2894         TDB_DATA data;
2895         int32_t res;
2896         int ret;
2897
2898         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2899         data.dptr  = (unsigned char *)pub;
2900
2901         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2902                            NULL, &res, &timeout, NULL);
2903         if (ret != 0 || res != 0) {
2904                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2905                 return -1;
2906         }
2907
2908         return 0;
2909 }
2910
2911 /*
2912   delete a public address from a node
2913  */
2914 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2915                       struct timeval timeout, 
2916                       uint32_t destnode,
2917                       struct ctdb_control_ip_iface *pub)
2918 {
2919         TDB_DATA data;
2920         int32_t res;
2921         int ret;
2922
2923         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2924         data.dptr  = (unsigned char *)pub;
2925
2926         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2927                            NULL, &res, &timeout, NULL);
2928         if (ret != 0 || res != 0) {
2929                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2930                 return -1;
2931         }
2932
2933         return 0;
2934 }
2935
2936 /*
2937   kill a tcp connection
2938  */
2939 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2940                       struct timeval timeout, 
2941                       uint32_t destnode,
2942                       struct ctdb_control_killtcp *killtcp)
2943 {
2944         TDB_DATA data;
2945         int32_t res;
2946         int ret;
2947
2948         data.dsize = sizeof(struct ctdb_control_killtcp);
2949         data.dptr  = (unsigned char *)killtcp;
2950
2951         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2952                            NULL, &res, &timeout, NULL);
2953         if (ret != 0 || res != 0) {
2954                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2955                 return -1;
2956         }
2957
2958         return 0;
2959 }
2960
2961 /*
2962   send a gratious arp
2963  */
2964 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2965                       struct timeval timeout, 
2966                       uint32_t destnode,
2967                       ctdb_sock_addr *addr,
2968                       const char *ifname)
2969 {
2970         TDB_DATA data;
2971         int32_t res;
2972         int ret, len;
2973         struct ctdb_control_gratious_arp *gratious_arp;
2974         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2975
2976
2977         len = strlen(ifname)+1;
2978         gratious_arp = talloc_size(tmp_ctx, 
2979                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2980         CTDB_NO_MEMORY(ctdb, gratious_arp);
2981
2982         gratious_arp->addr = *addr;
2983         gratious_arp->len = len;
2984         memcpy(&gratious_arp->iface[0], ifname, len);
2985
2986
2987         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2988         data.dptr  = (unsigned char *)gratious_arp;
2989
2990         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2991                            NULL, &res, &timeout, NULL);
2992         if (ret != 0 || res != 0) {
2993                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2994                 talloc_free(tmp_ctx);
2995                 return -1;
2996         }
2997
2998         talloc_free(tmp_ctx);
2999         return 0;
3000 }
3001
3002 /*
3003   get a list of all tcp tickles that a node knows about for a particular vnn
3004  */
3005 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3006                               struct timeval timeout, uint32_t destnode, 
3007                               TALLOC_CTX *mem_ctx, 
3008                               ctdb_sock_addr *addr,
3009                               struct ctdb_control_tcp_tickle_list **list)
3010 {
3011         int ret;
3012         TDB_DATA data, outdata;
3013         int32_t status;
3014
3015         data.dptr = (uint8_t*)addr;
3016         data.dsize = sizeof(ctdb_sock_addr);
3017
3018         ret = ctdb_control(ctdb, destnode, 0, 
3019                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3020                            mem_ctx, &outdata, &status, NULL, NULL);
3021         if (ret != 0 || status != 0) {
3022                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3023                 return -1;
3024         }
3025
3026         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3027
3028         return status;
3029 }
3030
3031 /*
3032   register a server id
3033  */
3034 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3035                       struct timeval timeout, 
3036                       struct ctdb_server_id *id)
3037 {
3038         TDB_DATA data;
3039         int32_t res;
3040         int ret;
3041
3042         data.dsize = sizeof(struct ctdb_server_id);
3043         data.dptr  = (unsigned char *)id;
3044
3045         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3046                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3047                         0, data, NULL,
3048                         NULL, &res, &timeout, NULL);
3049         if (ret != 0 || res != 0) {
3050                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3051                 return -1;
3052         }
3053
3054         return 0;
3055 }
3056
3057 /*
3058   unregister a server id
3059  */
3060 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3061                       struct timeval timeout, 
3062                       struct ctdb_server_id *id)
3063 {
3064         TDB_DATA data;
3065         int32_t res;
3066         int ret;
3067
3068         data.dsize = sizeof(struct ctdb_server_id);
3069         data.dptr  = (unsigned char *)id;
3070
3071         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3072                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3073                         0, data, NULL,
3074                         NULL, &res, &timeout, NULL);
3075         if (ret != 0 || res != 0) {
3076                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3077                 return -1;
3078         }
3079
3080         return 0;
3081 }
3082
3083
3084 /*
3085   check if a server id exists
3086
3087   if a server id does exist, return *status == 1, otherwise *status == 0
3088  */
3089 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3090                       struct timeval timeout, 
3091                       uint32_t destnode,
3092                       struct ctdb_server_id *id,
3093                       uint32_t *status)
3094 {
3095         TDB_DATA data;
3096         int32_t res;
3097         int ret;
3098
3099         data.dsize = sizeof(struct ctdb_server_id);
3100         data.dptr  = (unsigned char *)id;
3101
3102         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3103                         0, data, NULL,
3104                         NULL, &res, &timeout, NULL);
3105         if (ret != 0) {
3106                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3107                 return -1;
3108         }
3109
3110         if (res) {
3111                 *status = 1;
3112         } else {
3113                 *status = 0;
3114         }
3115
3116         return 0;
3117 }
3118
3119 /*
3120    get the list of server ids that are registered on a node
3121 */
3122 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3123                 TALLOC_CTX *mem_ctx,
3124                 struct timeval timeout, uint32_t destnode, 
3125                 struct ctdb_server_id_list **svid_list)
3126 {
3127         int ret;
3128         TDB_DATA outdata;
3129         int32_t res;
3130
3131         ret = ctdb_control(ctdb, destnode, 0, 
3132                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3133                            mem_ctx, &outdata, &res, &timeout, NULL);
3134         if (ret != 0 || res != 0) {
3135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3136                 return -1;
3137         }
3138
3139         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3140                     
3141         return 0;
3142 }
3143
3144 /*
3145   initialise the ctdb daemon for client applications
3146
3147   NOTE: In current code the daemon does not fork. This is for testing purposes only
3148   and to simplify the code.
3149 */
3150 struct ctdb_context *ctdb_init(struct event_context *ev)
3151 {
3152         int ret;
3153         struct ctdb_context *ctdb;
3154
3155         ctdb = talloc_zero(ev, struct ctdb_context);
3156         if (ctdb == NULL) {
3157                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3158                 return NULL;
3159         }
3160         ctdb->ev  = ev;
3161         ctdb->idr = idr_init(ctdb);
3162         /* Wrap early to exercise code. */
3163         ctdb->lastid = INT_MAX-200;
3164         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3165
3166         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3167         if (ret != 0) {
3168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3169                 talloc_free(ctdb);
3170                 return NULL;
3171         }
3172
3173         ctdb->statistics.statistics_start_time = timeval_current();
3174
3175         return ctdb;
3176 }
3177
3178
3179 /*
3180   set some ctdb flags
3181 */
3182 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3183 {
3184         ctdb->flags |= flags;
3185 }
3186
3187 /*
3188   setup the local socket name
3189 */
3190 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3191 {
3192         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3193         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3194
3195         return 0;
3196 }
3197
3198 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3199 {
3200         return ctdb->daemon.name;
3201 }
3202
3203 /*
3204   return the pnn of this node
3205 */
3206 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3207 {
3208         return ctdb->pnn;
3209 }
3210
3211
3212 /*
3213   get the uptime of a remote node
3214  */
3215 struct ctdb_client_control_state *
3216 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3217 {
3218         return ctdb_control_send(ctdb, destnode, 0, 
3219                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3220                            mem_ctx, &timeout, NULL);
3221 }
3222
3223 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3224 {
3225         int ret;
3226         int32_t res;
3227         TDB_DATA outdata;
3228
3229         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3230         if (ret != 0 || res != 0) {
3231                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3232                 return -1;
3233         }
3234
3235         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3236
3237         return 0;
3238 }
3239
3240 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3241 {
3242         struct ctdb_client_control_state *state;
3243
3244         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3245         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3246 }
3247
3248 /*
3249   send a control to execute the "recovered" event script on a node
3250  */
3251 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3252 {
3253         int ret;
3254         int32_t status;
3255
3256         ret = ctdb_control(ctdb, destnode, 0, 
3257                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3258                            NULL, NULL, &status, &timeout, NULL);
3259         if (ret != 0 || status != 0) {
3260                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3261                 return -1;
3262         }
3263
3264         return 0;
3265 }
3266
3267 /* 
3268   callback for the async helpers used when sending the same control
3269   to multiple nodes in parallell.
3270 */
3271 static void async_callback(struct ctdb_client_control_state *state)
3272 {
3273         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3274         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3275         int ret;
3276         TDB_DATA outdata;
3277         int32_t res;
3278         uint32_t destnode = state->c->hdr.destnode;
3279
3280         /* one more node has responded with recmode data */
3281         data->count--;
3282
3283         /* if we failed to push the db, then return an error and let
3284            the main loop try again.
3285         */
3286         if (state->state != CTDB_CONTROL_DONE) {
3287                 if ( !data->dont_log_errors) {
3288                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3289                 }
3290                 data->fail_count++;
3291                 if (data->fail_callback) {
3292                         data->fail_callback(ctdb, destnode, res, outdata,
3293                                         data->callback_data);
3294                 }
3295                 return;
3296         }
3297         
3298         state->async.fn = NULL;
3299
3300         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3301         if ((ret != 0) || (res != 0)) {
3302                 if ( !data->dont_log_errors) {
3303                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3304                 }
3305                 data->fail_count++;
3306                 if (data->fail_callback) {
3307                         data->fail_callback(ctdb, destnode, res, outdata,
3308                                         data->callback_data);
3309                 }
3310         }
3311         if ((ret == 0) && (data->callback != NULL)) {
3312                 data->callback(ctdb, destnode, res, outdata,
3313                                         data->callback_data);
3314         }
3315 }
3316
3317
3318 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3319 {
3320         /* set up the callback functions */
3321         state->async.fn = async_callback;
3322         state->async.private_data = data;
3323         
3324         /* one more control to wait for to complete */
3325         data->count++;
3326 }
3327
3328
3329 /* wait for up to the maximum number of seconds allowed
3330    or until all nodes we expect a response from has replied
3331 */
3332 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3333 {
3334         while (data->count > 0) {
3335                 event_loop_once(ctdb->ev);
3336         }
3337         if (data->fail_count != 0) {
3338                 if (!data->dont_log_errors) {
3339                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3340                                  data->fail_count));
3341                 }
3342                 return -1;
3343         }
3344         return 0;
3345 }
3346
3347
3348 /* 
3349    perform a simple control on the listed nodes
3350    The control cannot return data
3351  */
3352 int ctdb_client_async_control(struct ctdb_context *ctdb,
3353                                 enum ctdb_controls opcode,
3354                                 uint32_t *nodes,
3355                                 uint64_t srvid,
3356                                 struct timeval timeout,
3357                                 bool dont_log_errors,
3358                                 TDB_DATA data,
3359                                 client_async_callback client_callback,
3360                                 client_async_callback fail_callback,
3361                                 void *callback_data)
3362 {
3363         struct client_async_data *async_data;
3364         struct ctdb_client_control_state *state;
3365         int j, num_nodes;
3366
3367         async_data = talloc_zero(ctdb, struct client_async_data);
3368         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3369         async_data->dont_log_errors = dont_log_errors;
3370         async_data->callback = client_callback;
3371         async_data->fail_callback = fail_callback;
3372         async_data->callback_data = callback_data;
3373         async_data->opcode        = opcode;
3374
3375         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3376
3377         /* loop over all nodes and send an async control to each of them */
3378         for (j=0; j<num_nodes; j++) {
3379                 uint32_t pnn = nodes[j];
3380
3381                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3382                                           0, data, async_data, &timeout, NULL);
3383                 if (state == NULL) {
3384                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3385                         talloc_free(async_data);
3386                         return -1;
3387                 }
3388                 
3389                 ctdb_client_async_add(async_data, state);
3390         }
3391
3392         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3393                 talloc_free(async_data);
3394                 return -1;
3395         }
3396
3397         talloc_free(async_data);
3398         return 0;
3399 }
3400
3401 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3402                                 struct ctdb_vnn_map *vnn_map,
3403                                 TALLOC_CTX *mem_ctx,
3404                                 bool include_self)
3405 {
3406         int i, j, num_nodes;
3407         uint32_t *nodes;
3408
3409         for (i=num_nodes=0;i<vnn_map->size;i++) {
3410                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3411                         continue;
3412                 }
3413                 num_nodes++;
3414         } 
3415
3416         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3417         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3418
3419         for (i=j=0;i<vnn_map->size;i++) {
3420                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3421                         continue;
3422                 }
3423                 nodes[j++] = vnn_map->map[i];
3424         } 
3425
3426         return nodes;
3427 }
3428
3429 /* Get list of nodes not including those with flags specified by mask.
3430  * If exclude_pnn is not -1 then exclude that pnn from the list.
3431  */
3432 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3433                         struct ctdb_node_map *node_map,
3434                         TALLOC_CTX *mem_ctx,
3435                         uint32_t mask,
3436                         int exclude_pnn)
3437 {
3438         int i, j, num_nodes;
3439         uint32_t *nodes;
3440
3441         for (i=num_nodes=0;i<node_map->num;i++) {
3442                 if (node_map->nodes[i].flags & mask) {
3443                         continue;
3444                 }
3445                 if (node_map->nodes[i].pnn == exclude_pnn) {
3446                         continue;
3447                 }
3448                 num_nodes++;
3449         } 
3450
3451         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3452         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3453
3454         for (i=j=0;i<node_map->num;i++) {
3455                 if (node_map->nodes[i].flags & mask) {
3456                         continue;
3457                 }
3458                 if (node_map->nodes[i].pnn == exclude_pnn) {
3459                         continue;
3460                 }
3461                 nodes[j++] = node_map->nodes[i].pnn;
3462         } 
3463
3464         return nodes;
3465 }
3466
3467 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3468                                 struct ctdb_node_map *node_map,
3469                                 TALLOC_CTX *mem_ctx,
3470                                 bool include_self)
3471 {
3472         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3473                              include_self ? -1 : ctdb->pnn);
3474 }
3475
3476 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
3477                                 struct ctdb_node_map *node_map,
3478                                 TALLOC_CTX *mem_ctx,
3479                                 uint32_t pnn)
3480 {
3481         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE, pnn);
3482 }
3483
3484 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3485                                 struct ctdb_node_map *node_map,
3486                                 TALLOC_CTX *mem_ctx,
3487                                 bool include_self)
3488 {
3489         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3490                              include_self ? -1 : ctdb->pnn);
3491 }
3492
3493 /* 
3494   this is used to test if a pnn lock exists and if it exists will return
3495   the number of connections that pnn has reported or -1 if that recovery
3496   daemon is not running.
3497 */
3498 int
3499 ctdb_read_pnn_lock(int fd, int32_t pnn)
3500 {
3501         struct flock lock;
3502         char c;
3503
3504         lock.l_type = F_WRLCK;
3505         lock.l_whence = SEEK_SET;
3506         lock.l_start = pnn;
3507         lock.l_len = 1;
3508         lock.l_pid = 0;
3509
3510         if (fcntl(fd, F_GETLK, &lock) != 0) {
3511                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3512                 return -1;
3513         }
3514
3515         if (lock.l_type == F_UNLCK) {
3516                 return -1;
3517         }
3518
3519         if (pread(fd, &c, 1, pnn) == -1) {
3520                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3521                 return -1;
3522         }
3523
3524         return c;
3525 }
3526
3527 /*
3528   get capabilities of a remote node
3529  */
3530 struct ctdb_client_control_state *
3531 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3532 {
3533         return ctdb_control_send(ctdb, destnode, 0, 
3534                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3535                            mem_ctx, &timeout, NULL);
3536 }
3537
3538 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3539 {
3540         int ret;
3541         int32_t res;
3542         TDB_DATA outdata;
3543
3544         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3545         if ( (ret != 0) || (res != 0) ) {
3546                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3547                 return -1;
3548         }
3549
3550         if (capabilities) {
3551                 *capabilities = *((uint32_t *)outdata.dptr);
3552         }
3553
3554         return 0;
3555 }
3556
3557 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3558 {
3559         struct ctdb_client_control_state *state;
3560         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3561         int ret;
3562
3563         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3564         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3565         talloc_free(tmp_ctx);
3566         return ret;
3567 }
3568
3569 /**
3570  * check whether a transaction is active on a given db on a given node
3571  */
3572 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3573                                      uint32_t destnode,
3574                                      uint32_t db_id)
3575 {
3576         int32_t status;
3577         int ret;
3578         TDB_DATA indata;
3579
3580         indata.dptr = (uint8_t *)&db_id;
3581         indata.dsize = sizeof(db_id);
3582
3583         ret = ctdb_control(ctdb, destnode, 0,
3584                            CTDB_CONTROL_TRANS2_ACTIVE,
3585                            0, indata, NULL, NULL, &status,
3586                            NULL, NULL);
3587
3588         if (ret != 0) {
3589                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3590                 return -1;
3591         }
3592
3593         return status;
3594 }
3595
3596
3597 struct ctdb_transaction_handle {
3598         struct ctdb_db_context *ctdb_db;
3599         bool in_replay;
3600         /*
3601          * we store the reads and writes done under a transaction:
3602          * - one list stores both reads and writes (m_all),
3603          * - the other just writes (m_write)
3604          */
3605         struct ctdb_marshall_buffer *m_all;
3606         struct ctdb_marshall_buffer *m_write;
3607 };
3608
3609 /* start a transaction on a database */
3610 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3611 {
3612         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3613         return 0;
3614 }
3615
3616 /* start a transaction on a database */
3617 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3618 {
3619         struct ctdb_record_handle *rh;
3620         TDB_DATA key;
3621         TDB_DATA data;
3622         struct ctdb_ltdb_header header;
3623         TALLOC_CTX *tmp_ctx;
3624         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3625         int ret;
3626         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3627         pid_t pid;
3628         int32_t status;
3629
3630         key.dptr = discard_const(keyname);
3631         key.dsize = strlen(keyname);
3632
3633         if (!ctdb_db->persistent) {
3634                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3635                 return -1;
3636         }
3637
3638 again:
3639         tmp_ctx = talloc_new(h);
3640
3641         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3642         if (rh == NULL) {
3643                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3644                 talloc_free(tmp_ctx);
3645                 return -1;
3646         }
3647
3648         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3649                                               CTDB_CURRENT_NODE,
3650                                               ctdb_db->db_id);
3651         if (status == 1) {
3652                 unsigned long int usec = (1000 + random()) % 100000;
3653                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3654                                     "on db_id[0x%08x]. waiting for %lu "
3655                                     "microseconds\n",
3656                                     ctdb_db->db_id, usec));
3657                 talloc_free(tmp_ctx);
3658                 usleep(usec);
3659                 goto again;
3660         }
3661
3662         /*
3663          * store the pid in the database:
3664          * it is not enough that the node is dmaster...
3665          */
3666         pid = getpid();
3667         data.dptr = (unsigned char *)&pid;
3668         data.dsize = sizeof(pid_t);
3669         rh->header.rsn++;
3670         rh->header.dmaster = ctdb_db->ctdb->pnn;
3671         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3672         if (ret != 0) {
3673                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3674                                   "transaction record\n"));
3675                 talloc_free(tmp_ctx);
3676                 return -1;
3677         }
3678
3679         talloc_free(rh);
3680
3681         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3682         if (ret != 0) {
3683                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3684                 talloc_free(tmp_ctx);
3685                 return -1;
3686         }
3687
3688         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3689         if (ret != 0) {
3690                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3691                                  "lock record inside transaction\n"));
3692                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3693                 talloc_free(tmp_ctx);
3694                 goto again;
3695         }
3696
3697         if (header.dmaster != ctdb_db->ctdb->pnn) {
3698                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3699                                    "transaction lock record\n"));
3700                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3701                 talloc_free(tmp_ctx);
3702                 goto again;
3703         }
3704
3705         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3706                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3707                                     "the transaction lock record\n"));
3708                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3709                 talloc_free(tmp_ctx);
3710                 goto again;
3711         }
3712
3713         talloc_free(tmp_ctx);
3714
3715         return 0;
3716 }
3717
3718
3719 /* start a transaction on a database */
3720 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3721                                                        TALLOC_CTX *mem_ctx)
3722 {
3723         struct ctdb_transaction_handle *h;
3724         int ret;
3725
3726         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3727         if (h == NULL) {
3728                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3729                 return NULL;
3730         }
3731
3732         h->ctdb_db = ctdb_db;
3733
3734         ret = ctdb_transaction_fetch_start(h);
3735         if (ret != 0) {
3736                 talloc_free(h);
3737                 return NULL;
3738         }
3739
3740         talloc_set_destructor(h, ctdb_transaction_destructor);
3741
3742         return h;
3743 }
3744
3745
3746
3747 /*
3748   fetch a record inside a transaction
3749  */
3750 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3751                            TALLOC_CTX *mem_ctx, 
3752                            TDB_DATA key, TDB_DATA *data)
3753 {
3754         struct ctdb_ltdb_header header;
3755         int ret;
3756
3757         ZERO_STRUCT(header);
3758
3759         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3760         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3761                 /* record doesn't exist yet */
3762                 *data = tdb_null;
3763                 ret = 0;
3764         }
3765         
3766         if (ret != 0) {
3767                 return ret;
3768         }
3769
3770         if (!h->in_replay) {
3771                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3772                 if (h->m_all == NULL) {
3773                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3774                         return -1;
3775                 }
3776         }
3777
3778         return 0;
3779 }
3780
3781 /*
3782   stores a record inside a transaction
3783  */
3784 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3785                            TDB_DATA key, TDB_DATA data)
3786 {
3787         TALLOC_CTX *tmp_ctx = talloc_new(h);
3788         struct ctdb_ltdb_header header;
3789         TDB_DATA olddata;
3790         int ret;
3791
3792         ZERO_STRUCT(header);
3793
3794         /* we need the header so we can update the RSN */
3795         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3796         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3797                 /* the record doesn't exist - create one with us as dmaster.
3798                    This is only safe because we are in a transaction and this
3799                    is a persistent database */
3800                 ZERO_STRUCT(header);
3801         } else if (ret != 0) {
3802                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3803                 talloc_free(tmp_ctx);
3804                 return ret;
3805         }
3806
3807         if (data.dsize == olddata.dsize &&
3808             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3809                 /* save writing the same data */
3810                 talloc_free(tmp_ctx);
3811                 return 0;
3812         }
3813
3814         header.dmaster = h->ctdb_db->ctdb->pnn;
3815         header.rsn++;
3816
3817         if (!h->in_replay) {
3818                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3819                 if (h->m_all == NULL) {
3820                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3821                         talloc_free(tmp_ctx);
3822                         return -1;
3823                 }
3824         }               
3825
3826         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3827         if (h->m_write == NULL) {
3828                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3829                 talloc_free(tmp_ctx);
3830                 return -1;
3831         }
3832         
3833         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3834
3835         talloc_free(tmp_ctx);
3836         
3837         return ret;
3838 }
3839
3840 /*
3841   replay a transaction
3842  */
3843 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3844 {
3845         int ret, i;
3846         struct ctdb_rec_data *rec = NULL;
3847
3848         h->in_replay = true;
3849         talloc_free(h->m_write);
3850         h->m_write = NULL;
3851
3852         ret = ctdb_transaction_fetch_start(h);
3853         if (ret != 0) {
3854                 return ret;
3855         }
3856
3857         for (i=0;i<h->m_all->count;i++) {
3858                 TDB_DATA key, data;
3859
3860                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3861                 if (rec == NULL) {
3862                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3863                         goto failed;
3864                 }
3865
3866                 if (rec->reqid == 0) {
3867                         /* its a store */
3868                         if (ctdb_transaction_store(h, key, data) != 0) {
3869                                 goto failed;
3870                         }
3871                 } else {
3872                         TDB_DATA data2;
3873                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3874
3875                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3876                                 talloc_free(tmp_ctx);
3877                                 goto failed;
3878                         }
3879                         if (data2.dsize != data.dsize ||
3880                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3881                                 /* the record has changed on us - we have to give up */
3882                                 talloc_free(tmp_ctx);
3883                                 goto failed;
3884                         }
3885                         talloc_free(tmp_ctx);
3886                 }
3887         }
3888         
3889         return 0;
3890
3891 failed:
3892         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3893         return -1;
3894 }
3895
3896
3897 /*
3898   commit a transaction
3899  */
3900 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3901 {
3902         int ret, retries=0;
3903         int32_t status;
3904         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3905         struct timeval timeout;
3906         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3907
3908         talloc_set_destructor(h, NULL);
3909
3910         /* our commit strategy is quite complex.
3911
3912            - we first try to commit the changes to all other nodes
3913
3914            - if that works, then we commit locally and we are done
3915
3916            - if a commit on another node fails, then we need to cancel
3917              the transaction, then restart the transaction (thus
3918              opening a window of time for a pending recovery to
3919              complete), then replay the transaction, checking all the
3920              reads and writes (checking that reads give the same data,
3921              and writes succeed). Then we retry the transaction to the
3922              other nodes
3923         */
3924
3925 again:
3926         if (h->m_write == NULL) {
3927                 /* no changes were made */
3928                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3929                 talloc_free(h);
3930                 return 0;
3931         }
3932
3933         /* tell ctdbd to commit to the other nodes */
3934         timeout = timeval_current_ofs(1, 0);
3935         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3936                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3937                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3938                            &timeout, NULL);
3939         if (ret != 0 || status != 0) {
3940                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3941                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3942                                      ", retrying after 1 second...\n",
3943                                      (retries==0)?"":"retry "));
3944                 sleep(1);
3945
3946                 if (ret != 0) {
3947                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3948                 } else {
3949                         /* work out what error code we will give if we 
3950                            have to fail the operation */
3951                         switch ((enum ctdb_trans2_commit_error)status) {
3952                         case CTDB_TRANS2_COMMIT_SUCCESS:
3953                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3954                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3955                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3956                                 break;
3957                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3958                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3959                                 break;
3960                         }
3961                 }
3962
3963                 if (++retries == 100) {
3964                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3965                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3966                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3967                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3968                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3969                         talloc_free(h);
3970                         return -1;
3971                 }               
3972
3973                 if (ctdb_replay_transaction(h) != 0) {
3974                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3975                                           "transaction on db 0x%08x, "
3976                                           "failure control =%u\n",
3977                                           h->ctdb_db->db_id,
3978                                           (unsigned)failure_control));
3979                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3980                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3981                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3982                         talloc_free(h);
3983                         return -1;
3984                 }
3985                 goto again;
3986         } else {
3987                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3988         }
3989
3990         /* do the real commit locally */
3991         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3992         if (ret != 0) {
3993                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3994                                   "on db id 0x%08x locally, "
3995                                   "failure_control=%u\n",
3996                                   h->ctdb_db->db_id,
3997                                   (unsigned)failure_control));
3998                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3999                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4000                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4001                 talloc_free(h);
4002                 return ret;
4003         }
4004
4005         /* tell ctdbd that we are finished with our local commit */
4006         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4007                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4008                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4009         talloc_free(h);
4010         return 0;
4011 }
4012
4013 /*
4014   recovery daemon ping to main daemon
4015  */
4016 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4017 {
4018         int ret;
4019         int32_t res;
4020
4021         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4022                            ctdb, NULL, &res, NULL, NULL);
4023         if (ret != 0 || res != 0) {
4024                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4025                 return -1;
4026         }
4027
4028         return 0;
4029 }
4030
4031 /* When forking the main daemon and the child process needs to connect
4032  * back to the daemon as a client process, this function can be used
4033  * to change the ctdb context from daemon into client mode.  The child
4034  * process must be created using ctdb_fork() and not fork() -
4035  * ctdb_fork() does some necessary housekeeping.
4036  */
4037 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4038 {
4039         int ret;
4040         va_list ap;
4041
4042         /* Add extra information so we can identify this in the logs */
4043         va_start(ap, fmt);
4044         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4045         va_end(ap);
4046
4047         /* get a new event context */
4048         ctdb->ev = event_context_init(ctdb);
4049         tevent_loop_allow_nesting(ctdb->ev);
4050
4051         /* Connect to main CTDB daemon */
4052         ret = ctdb_socket_connect(ctdb);
4053         if (ret != 0) {
4054                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4055                 return -1;
4056         }
4057
4058         ctdb->can_send_controls = true;
4059
4060         return 0;
4061 }
4062
4063 /*
4064   get the status of running the monitor eventscripts: NULL means never run.
4065  */
4066 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4067                 struct timeval timeout, uint32_t destnode, 
4068                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4069                 struct ctdb_scripts_wire **scripts)
4070 {
4071         int ret;
4072         TDB_DATA outdata, indata;
4073         int32_t res;
4074         uint32_t uinttype = type;
4075
4076         indata.dptr = (uint8_t *)&uinttype;
4077         indata.dsize = sizeof(uinttype);
4078
4079         ret = ctdb_control(ctdb, destnode, 0, 
4080                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4081                            mem_ctx, &outdata, &res, &timeout, NULL);
4082         if (ret != 0 || res != 0) {
4083                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4084                 return -1;
4085         }
4086
4087         if (outdata.dsize == 0) {
4088                 *scripts = NULL;
4089         } else {
4090                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4091                 talloc_free(outdata.dptr);
4092         }
4093                     
4094         return 0;
4095 }
4096
4097 /*
4098   tell the main daemon how long it took to lock the reclock file
4099  */
4100 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4101 {
4102         int ret;
4103         int32_t res;
4104         TDB_DATA data;
4105
4106         data.dptr = (uint8_t *)&latency;
4107         data.dsize = sizeof(latency);
4108
4109         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4110                            ctdb, NULL, &res, NULL, NULL);
4111         if (ret != 0 || res != 0) {
4112                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4113                 return -1;
4114         }
4115
4116         return 0;
4117 }
4118
4119 /*
4120   get the name of the reclock file
4121  */
4122 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4123                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4124                          const char **name)
4125 {
4126         int ret;
4127         int32_t res;
4128         TDB_DATA data;
4129
4130         ret = ctdb_control(ctdb, destnode, 0, 
4131                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4132                            mem_ctx, &data, &res, &timeout, NULL);
4133         if (ret != 0 || res != 0) {
4134                 return -1;
4135         }
4136
4137         if (data.dsize == 0) {
4138                 *name = NULL;
4139         } else {
4140                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4141         }
4142         talloc_free(data.dptr);
4143
4144         return 0;
4145 }
4146
4147 /*
4148   set the reclock filename for a node
4149  */
4150 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4151 {
4152         int ret;
4153         TDB_DATA data;
4154         int32_t res;
4155
4156         if (reclock == NULL) {
4157                 data.dsize = 0;
4158                 data.dptr  = NULL;
4159         } else {
4160                 data.dsize = strlen(reclock) + 1;
4161                 data.dptr  = discard_const(reclock);
4162         }
4163
4164         ret = ctdb_control(ctdb, destnode, 0, 
4165                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4166                            NULL, NULL, &res, &timeout, NULL);
4167         if (ret != 0 || res != 0) {
4168                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4169                 return -1;
4170         }
4171
4172         return 0;
4173 }
4174
4175 /*
4176   stop a node
4177  */
4178 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4179 {
4180         int ret;
4181         int32_t res;
4182
4183         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4184                            ctdb, NULL, &res, &timeout, NULL);
4185         if (ret != 0 || res != 0) {
4186                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4187                 return -1;
4188         }
4189
4190         return 0;
4191 }
4192
4193 /*
4194   continue a node
4195  */
4196 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4197 {
4198         int ret;
4199
4200         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4201                            ctdb, NULL, NULL, &timeout, NULL);
4202         if (ret != 0) {
4203                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4204                 return -1;
4205         }
4206
4207         return 0;
4208 }
4209
4210 /*
4211   set the natgw state for a node
4212  */
4213 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4214 {
4215         int ret;
4216         TDB_DATA data;
4217         int32_t res;
4218
4219         data.dsize = sizeof(natgwstate);
4220         data.dptr  = (uint8_t *)&natgwstate;
4221
4222         ret = ctdb_control(ctdb, destnode, 0, 
4223                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4224                            NULL, NULL, &res, &timeout, NULL);
4225         if (ret != 0 || res != 0) {
4226                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4227                 return -1;
4228         }
4229
4230         return 0;
4231 }
4232
4233 /*
4234   set the lmaster role for a node
4235  */
4236 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4237 {
4238         int ret;
4239         TDB_DATA data;
4240         int32_t res;
4241
4242         data.dsize = sizeof(lmasterrole);
4243         data.dptr  = (uint8_t *)&lmasterrole;
4244
4245         ret = ctdb_control(ctdb, destnode, 0, 
4246                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4247                            NULL, NULL, &res, &timeout, NULL);
4248         if (ret != 0 || res != 0) {
4249                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4250                 return -1;
4251         }
4252
4253         return 0;
4254 }
4255
4256 /*
4257   set the recmaster role for a node
4258  */
4259 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4260 {
4261         int ret;
4262         TDB_DATA data;
4263         int32_t res;
4264
4265         data.dsize = sizeof(recmasterrole);
4266         data.dptr  = (uint8_t *)&recmasterrole;
4267
4268         ret = ctdb_control(ctdb, destnode, 0, 
4269                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4270                            NULL, NULL, &res, &timeout, NULL);
4271         if (ret != 0 || res != 0) {
4272                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4273                 return -1;
4274         }
4275
4276         return 0;
4277 }
4278
4279 /* enable an eventscript
4280  */
4281 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4282 {
4283         int ret;
4284         TDB_DATA data;
4285         int32_t res;
4286
4287         data.dsize = strlen(script) + 1;
4288         data.dptr  = discard_const(script);
4289
4290         ret = ctdb_control(ctdb, destnode, 0, 
4291                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4292                            NULL, NULL, &res, &timeout, NULL);
4293         if (ret != 0 || res != 0) {
4294                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4295                 return -1;
4296         }
4297
4298         return 0;
4299 }
4300
4301 /* disable an eventscript
4302  */
4303 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4304 {
4305         int ret;
4306         TDB_DATA data;
4307         int32_t res;
4308
4309         data.dsize = strlen(script) + 1;
4310         data.dptr  = discard_const(script);
4311
4312         ret = ctdb_control(ctdb, destnode, 0, 
4313                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4314                            NULL, NULL, &res, &timeout, NULL);
4315         if (ret != 0 || res != 0) {
4316                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4317                 return -1;
4318         }
4319
4320         return 0;
4321 }
4322
4323
4324 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4325 {
4326         int ret;
4327         TDB_DATA data;
4328         int32_t res;
4329
4330         data.dsize = sizeof(*bantime);
4331         data.dptr  = (uint8_t *)bantime;
4332
4333         ret = ctdb_control(ctdb, destnode, 0, 
4334                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4335                            NULL, NULL, &res, &timeout, NULL);
4336         if (ret != 0 || res != 0) {
4337                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4338                 return -1;
4339         }
4340
4341         return 0;
4342 }
4343
4344
4345 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4346 {
4347         int ret;
4348         TDB_DATA outdata;
4349         int32_t res;
4350         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4351
4352         ret = ctdb_control(ctdb, destnode, 0, 
4353                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4354                            tmp_ctx, &outdata, &res, &timeout, NULL);
4355         if (ret != 0 || res != 0) {
4356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4357                 talloc_free(tmp_ctx);
4358                 return -1;
4359         }
4360
4361         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4362         talloc_free(tmp_ctx);
4363
4364         return 0;
4365 }
4366
4367
4368 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4369 {
4370         int ret;
4371         int32_t res;
4372         TDB_DATA data;
4373         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4374
4375         data.dptr = (uint8_t*)db_prio;
4376         data.dsize = sizeof(*db_prio);
4377
4378         ret = ctdb_control(ctdb, destnode, 0, 
4379                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4380                            tmp_ctx, NULL, &res, &timeout, NULL);
4381         if (ret != 0 || res != 0) {
4382                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4383                 talloc_free(tmp_ctx);
4384                 return -1;
4385         }
4386
4387         talloc_free(tmp_ctx);
4388
4389         return 0;
4390 }
4391
4392 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4393 {
4394         int ret;
4395         int32_t res;
4396         TDB_DATA data;
4397         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4398
4399         data.dptr = (uint8_t*)&db_id;
4400         data.dsize = sizeof(db_id);
4401
4402         ret = ctdb_control(ctdb, destnode, 0, 
4403                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4404                            tmp_ctx, NULL, &res, &timeout, NULL);
4405         if (ret != 0 || res < 0) {
4406                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4407                 talloc_free(tmp_ctx);
4408                 return -1;
4409         }
4410
4411         if (priority) {
4412                 *priority = res;
4413         }
4414
4415         talloc_free(tmp_ctx);
4416
4417         return 0;
4418 }
4419
4420 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4421 {
4422         int ret;
4423         TDB_DATA outdata;
4424         int32_t res;
4425
4426         ret = ctdb_control(ctdb, destnode, 0, 
4427                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4428                            mem_ctx, &outdata, &res, &timeout, NULL);
4429         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4431                 return -1;
4432         }
4433
4434         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4435         talloc_free(outdata.dptr);
4436                     
4437         return 0;
4438 }
4439
4440 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4441 {
4442         if (h == NULL) {
4443                 return NULL;
4444         }
4445
4446         return &h->header;
4447 }
4448
4449
4450 struct ctdb_client_control_state *
4451 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4452 {
4453         struct ctdb_client_control_state *handle;
4454         struct ctdb_marshall_buffer *m;
4455         struct ctdb_rec_data *rec;
4456         TDB_DATA outdata;
4457
4458         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4459         if (m == NULL) {
4460                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4461                 return NULL;
4462         }
4463
4464         m->db_id = ctdb_db->db_id;
4465
4466         rec = ctdb_marshall_record(m, 0, key, header, data);
4467         if (rec == NULL) {
4468                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4469                 talloc_free(m);
4470                 return NULL;
4471         }
4472         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4473         if (m == NULL) {
4474                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4475                 talloc_free(m);
4476                 return NULL;
4477         }
4478         m->count++;
4479         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4480
4481
4482         outdata.dptr = (uint8_t *)m;
4483         outdata.dsize = talloc_get_size(m);
4484
4485         handle = ctdb_control_send(ctdb, destnode, 0, 
4486                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4487                            mem_ctx, &timeout, NULL);
4488         talloc_free(m);
4489         return handle;
4490 }
4491
4492 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4493 {
4494         int ret;
4495         int32_t res;
4496
4497         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4498         if ( (ret != 0) || (res != 0) ){
4499                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4500                 return -1;
4501         }
4502
4503         return 0;
4504 }
4505
4506 int
4507 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4508 {
4509         struct ctdb_client_control_state *state;
4510
4511         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4512         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4513 }
4514
4515
4516
4517
4518
4519
4520 /*
4521   set a database to be readonly
4522  */
4523 struct ctdb_client_control_state *
4524 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4525 {
4526         TDB_DATA data;
4527
4528         data.dptr = (uint8_t *)&dbid;
4529         data.dsize = sizeof(dbid);
4530
4531         return ctdb_control_send(ctdb, destnode, 0, 
4532                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4533                            ctdb, NULL, NULL);
4534 }
4535
4536 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4537 {
4538         int ret;
4539         int32_t res;
4540
4541         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4542         if (ret != 0 || res != 0) {
4543           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4544                 return -1;
4545         }
4546
4547         return 0;
4548 }
4549
4550 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4551 {
4552         struct ctdb_client_control_state *state;
4553
4554         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4555         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4556 }
4557
4558 /*
4559   set a database to be sticky
4560  */
4561 struct ctdb_client_control_state *
4562 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4563 {
4564         TDB_DATA data;
4565
4566         data.dptr = (uint8_t *)&dbid;
4567         data.dsize = sizeof(dbid);
4568
4569         return ctdb_control_send(ctdb, destnode, 0, 
4570                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4571                            ctdb, NULL, NULL);
4572 }
4573
4574 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4575 {
4576         int ret;
4577         int32_t res;
4578
4579         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4580         if (ret != 0 || res != 0) {
4581                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4582                 return -1;
4583         }
4584
4585         return 0;
4586 }
4587
4588 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4589 {
4590         struct ctdb_client_control_state *state;
4591
4592         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4593         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4594 }