client: Remove unused function list_of_active_nodes_except_pnn()
[ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/locale.h"
28 #include <stdlib.h>
29 #include "../include/ctdb_private.h"
30 #include "lib/util/dlinklist.h"
31
32 pid_t ctdbd_pid;
33
34 /*
35   allocate a packet for use in client<->daemon communication
36  */
37 struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
38                                             TALLOC_CTX *mem_ctx, 
39                                             enum ctdb_operation operation, 
40                                             size_t length, size_t slength,
41                                             const char *type)
42 {
43         int size;
44         struct ctdb_req_header *hdr;
45
46         length = MAX(length, slength);
47         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
48
49         hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
50         if (hdr == NULL) {
51                 DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
52                          operation, (unsigned)length));
53                 return NULL;
54         }
55         talloc_set_name_const(hdr, type);
56         hdr->length       = length;
57         hdr->operation    = operation;
58         hdr->ctdb_magic   = CTDB_MAGIC;
59         hdr->ctdb_version = CTDB_VERSION;
60         hdr->srcnode      = ctdb->pnn;
61         if (ctdb->vnn_map) {
62                 hdr->generation = ctdb->vnn_map->generation;
63         }
64
65         return hdr;
66 }
67
68 /*
69   local version of ctdb_call
70 */
71 int ctdb_call_local(struct ctdb_db_context *ctdb_db, struct ctdb_call *call,
72                     struct ctdb_ltdb_header *header, TALLOC_CTX *mem_ctx,
73                     TDB_DATA *data, bool updatetdb)
74 {
75         struct ctdb_call_info *c;
76         struct ctdb_registered_call *fn;
77         struct ctdb_context *ctdb = ctdb_db->ctdb;
78         
79         c = talloc(ctdb, struct ctdb_call_info);
80         CTDB_NO_MEMORY(ctdb, c);
81
82         c->key = call->key;
83         c->call_data = &call->call_data;
84         c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
85         c->record_data.dsize = data->dsize;
86         CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
87         c->new_data = NULL;
88         c->reply_data = NULL;
89         c->status = 0;
90         c->header = header;
91
92         for (fn=ctdb_db->calls;fn;fn=fn->next) {
93                 if (fn->id == call->call_id) break;
94         }
95         if (fn == NULL) {
96                 ctdb_set_error(ctdb, "Unknown call id %u\n", call->call_id);
97                 talloc_free(c);
98                 return -1;
99         }
100
101         if (fn->fn(c) != 0) {
102                 ctdb_set_error(ctdb, "ctdb_call %u failed\n", call->call_id);
103                 talloc_free(c);
104                 return -1;
105         }
106
107         /* we need to force the record to be written out if this was a remote access */
108         if (c->new_data == NULL) {
109                 c->new_data = &c->record_data;
110         }
111
112         if (c->new_data && updatetdb) {
113                 /* XXX check that we always have the lock here? */
114                 if (ctdb_ltdb_store(ctdb_db, call->key, header, *c->new_data) != 0) {
115                         ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
116                         talloc_free(c);
117                         return -1;
118                 }
119         }
120
121         if (c->reply_data) {
122                 call->reply_data = *c->reply_data;
123
124                 talloc_steal(call, call->reply_data.dptr);
125                 talloc_set_name_const(call->reply_data.dptr, __location__);
126         } else {
127                 call->reply_data.dptr = NULL;
128                 call->reply_data.dsize = 0;
129         }
130         call->status = c->status;
131
132         talloc_free(c);
133
134         return 0;
135 }
136
137
138 /*
139   queue a packet for sending from client to daemon
140 */
141 static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
142 {
143         return ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)hdr, hdr->length);
144 }
145
146
147 /*
148   called when a CTDB_REPLY_CALL packet comes in in the client
149
150   This packet comes in response to a CTDB_REQ_CALL request packet. It
151   contains any reply data from the call
152 */
153 static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
154 {
155         struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
156         struct ctdb_client_call_state *state;
157
158         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_call_state);
159         if (state == NULL) {
160                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
161                 return;
162         }
163
164         if (hdr->reqid != state->reqid) {
165                 /* we found a record  but it was the wrong one */
166                 DEBUG(DEBUG_ERR, ("Dropped client call reply with reqid:%u\n",hdr->reqid));
167                 return;
168         }
169
170         state->call->reply_data.dptr = c->data;
171         state->call->reply_data.dsize = c->datalen;
172         state->call->status = c->status;
173
174         talloc_steal(state, c);
175
176         state->state = CTDB_CALL_DONE;
177
178         if (state->async.fn) {
179                 state->async.fn(state);
180         }
181 }
182
183 static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
184
185 /*
186   this is called in the client, when data comes in from the daemon
187  */
188 void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(args, struct ctdb_context);
191         struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
192         TALLOC_CTX *tmp_ctx;
193
194         /* place the packet as a child of a tmp_ctx. We then use
195            talloc_free() below to free it. If any of the calls want
196            to keep it, then they will steal it somewhere else, and the
197            talloc_free() will be a no-op */
198         tmp_ctx = talloc_new(ctdb);
199         talloc_steal(tmp_ctx, hdr);
200
201         if (cnt == 0) {
202                 DEBUG(DEBUG_CRIT,("Daemon has exited - shutting down client\n"));
203                 exit(1);
204         }
205
206         if (cnt < sizeof(*hdr)) {
207                 DEBUG(DEBUG_CRIT,("Bad packet length %u in client\n", (unsigned)cnt));
208                 goto done;
209         }
210         if (cnt != hdr->length) {
211                 ctdb_set_error(ctdb, "Bad header length %u expected %u in client\n", 
212                                (unsigned)hdr->length, (unsigned)cnt);
213                 goto done;
214         }
215
216         if (hdr->ctdb_magic != CTDB_MAGIC) {
217                 ctdb_set_error(ctdb, "Non CTDB packet rejected in client\n");
218                 goto done;
219         }
220
221         if (hdr->ctdb_version != CTDB_VERSION) {
222                 ctdb_set_error(ctdb, "Bad CTDB version 0x%x rejected in client\n", hdr->ctdb_version);
223                 goto done;
224         }
225
226         switch (hdr->operation) {
227         case CTDB_REPLY_CALL:
228                 ctdb_client_reply_call(ctdb, hdr);
229                 break;
230
231         case CTDB_REQ_MESSAGE:
232                 ctdb_request_message(ctdb, hdr);
233                 break;
234
235         case CTDB_REPLY_CONTROL:
236                 ctdb_client_reply_control(ctdb, hdr);
237                 break;
238
239         default:
240                 DEBUG(DEBUG_CRIT,("bogus operation code:%u\n",hdr->operation));
241         }
242
243 done:
244         talloc_free(tmp_ctx);
245 }
246
247 /*
248   connect to a unix domain socket
249 */
250 int ctdb_socket_connect(struct ctdb_context *ctdb)
251 {
252         struct sockaddr_un addr;
253
254         memset(&addr, 0, sizeof(addr));
255         addr.sun_family = AF_UNIX;
256         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
257
258         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
259         if (ctdb->daemon.sd == -1) {
260                 DEBUG(DEBUG_ERR,(__location__ " Failed to open client socket. Errno:%s(%d)\n", strerror(errno), errno));
261                 return -1;
262         }
263
264         if (connect(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
265                 close(ctdb->daemon.sd);
266                 ctdb->daemon.sd = -1;
267                 DEBUG(DEBUG_ERR,(__location__ " Failed to connect client socket to daemon. Errno:%s(%d)\n", strerror(errno), errno));
268                 return -1;
269         }
270
271         set_nonblocking(ctdb->daemon.sd);
272         set_close_on_exec(ctdb->daemon.sd);
273         
274         ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd, 
275                                               CTDB_DS_ALIGNMENT, 
276                                               ctdb_client_read_cb, ctdb, "to-ctdbd");
277         return 0;
278 }
279
280
281 struct ctdb_record_handle {
282         struct ctdb_db_context *ctdb_db;
283         TDB_DATA key;
284         TDB_DATA *data;
285         struct ctdb_ltdb_header header;
286 };
287
288
289 /*
290   make a recv call to the local ctdb daemon - called from client context
291
292   This is called when the program wants to wait for a ctdb_call to complete and get the 
293   results. This call will block unless the call has already completed.
294 */
295 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
296 {
297         if (state == NULL) {
298                 return -1;
299         }
300
301         while (state->state < CTDB_CALL_DONE) {
302                 event_loop_once(state->ctdb_db->ctdb->ev);
303         }
304         if (state->state != CTDB_CALL_DONE) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
306                 talloc_free(state);
307                 return -1;
308         }
309
310         if (state->call->reply_data.dsize) {
311                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
312                                                       state->call->reply_data.dptr,
313                                                       state->call->reply_data.dsize);
314                 call->reply_data.dsize = state->call->reply_data.dsize;
315         } else {
316                 call->reply_data.dptr = NULL;
317                 call->reply_data.dsize = 0;
318         }
319         call->status = state->call->status;
320         talloc_free(state);
321
322         return call->status;
323 }
324
325
326
327
328 /*
329   destroy a ctdb_call in client
330 */
331 static int ctdb_client_call_destructor(struct ctdb_client_call_state *state)    
332 {
333         ctdb_reqid_remove(state->ctdb_db->ctdb, state->reqid);
334         return 0;
335 }
336
337 /*
338   construct an event driven local ctdb_call
339
340   this is used so that locally processed ctdb_call requests are processed
341   in an event driven manner
342 */
343 static struct ctdb_client_call_state *ctdb_client_call_local_send(struct ctdb_db_context *ctdb_db, 
344                                                                   struct ctdb_call *call,
345                                                                   struct ctdb_ltdb_header *header,
346                                                                   TDB_DATA *data)
347 {
348         struct ctdb_client_call_state *state;
349         struct ctdb_context *ctdb = ctdb_db->ctdb;
350         int ret;
351
352         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
353         CTDB_NO_MEMORY_NULL(ctdb, state);
354         state->call = talloc_zero(state, struct ctdb_call);
355         CTDB_NO_MEMORY_NULL(ctdb, state->call);
356
357         talloc_steal(state, data->dptr);
358
359         state->state   = CTDB_CALL_DONE;
360         *(state->call) = *call;
361         state->ctdb_db = ctdb_db;
362
363         ret = ctdb_call_local(ctdb_db, state->call, header, state, data, true);
364         if (ret != 0) {
365                 DEBUG(DEBUG_DEBUG,("ctdb_call_local() failed, ignoring return code %d\n", ret));
366         }
367
368         return state;
369 }
370
371 /*
372   make a ctdb call to the local daemon - async send. Called from client context.
373
374   This constructs a ctdb_call request and queues it for processing. 
375   This call never blocks.
376 */
377 struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db, 
378                                               struct ctdb_call *call)
379 {
380         struct ctdb_client_call_state *state;
381         struct ctdb_context *ctdb = ctdb_db->ctdb;
382         struct ctdb_ltdb_header header;
383         TDB_DATA data;
384         int ret;
385         size_t len;
386         struct ctdb_req_call *c;
387
388         /* if the domain socket is not yet open, open it */
389         if (ctdb->daemon.sd==-1) {
390                 ctdb_socket_connect(ctdb);
391         }
392
393         ret = ctdb_ltdb_lock(ctdb_db, call->key);
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Failed to get chainlock\n"));
396                 return NULL;
397         }
398
399         ret = ctdb_ltdb_fetch(ctdb_db, call->key, &header, ctdb_db, &data);
400
401         if ((call->flags & CTDB_IMMEDIATE_MIGRATION) && (header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
402                 ret = -1;
403         }
404
405         if (ret == 0 && header.dmaster == ctdb->pnn) {
406                 state = ctdb_client_call_local_send(ctdb_db, call, &header, &data);
407                 talloc_free(data.dptr);
408                 ctdb_ltdb_unlock(ctdb_db, call->key);
409                 return state;
410         }
411
412         ctdb_ltdb_unlock(ctdb_db, call->key);
413         talloc_free(data.dptr);
414
415         state = talloc_zero(ctdb_db, struct ctdb_client_call_state);
416         if (state == NULL) {
417                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state\n"));
418                 return NULL;
419         }
420         state->call = talloc_zero(state, struct ctdb_call);
421         if (state->call == NULL) {
422                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate state->call\n"));
423                 return NULL;
424         }
425
426         len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
427         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CALL, len, struct ctdb_req_call);
428         if (c == NULL) {
429                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate packet\n"));
430                 return NULL;
431         }
432
433         state->reqid     = ctdb_reqid_new(ctdb, state);
434         state->ctdb_db = ctdb_db;
435         talloc_set_destructor(state, ctdb_client_call_destructor);
436
437         c->hdr.reqid     = state->reqid;
438         c->flags         = call->flags;
439         c->db_id         = ctdb_db->db_id;
440         c->callid        = call->call_id;
441         c->hopcount      = 0;
442         c->keylen        = call->key.dsize;
443         c->calldatalen   = call->call_data.dsize;
444         memcpy(&c->data[0], call->key.dptr, call->key.dsize);
445         memcpy(&c->data[call->key.dsize], 
446                call->call_data.dptr, call->call_data.dsize);
447         *(state->call)              = *call;
448         state->call->call_data.dptr = &c->data[call->key.dsize];
449         state->call->key.dptr       = &c->data[0];
450
451         state->state  = CTDB_CALL_WAIT;
452
453
454         ctdb_client_queue_pkt(ctdb, &c->hdr);
455
456         return state;
457 }
458
459
460 /*
461   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
462 */
463 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
464 {
465         struct ctdb_client_call_state *state;
466
467         state = ctdb_call_send(ctdb_db, call);
468         return ctdb_call_recv(state, call);
469 }
470
471
472 /*
473   tell the daemon what messaging srvid we will use, and register the message
474   handler function in the client
475 */
476 int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
477                              ctdb_msg_fn_t handler,
478                              void *private_data)
479                                     
480 {
481         int res;
482         int32_t status;
483         
484         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_REGISTER_SRVID, 0, 
485                            tdb_null, NULL, NULL, &status, NULL, NULL);
486         if (res != 0 || status != 0) {
487                 DEBUG(DEBUG_ERR,("Failed to register srvid %llu\n", (unsigned long long)srvid));
488                 return -1;
489         }
490
491         /* also need to register the handler with our own ctdb structure */
492         return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
493 }
494
495 /*
496   tell the daemon we no longer want a srvid
497 */
498 int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
499 {
500         int res;
501         int32_t status;
502         
503         res = ctdb_control(ctdb, CTDB_CURRENT_NODE, srvid, CTDB_CONTROL_DEREGISTER_SRVID, 0, 
504                            tdb_null, NULL, NULL, &status, NULL, NULL);
505         if (res != 0 || status != 0) {
506                 DEBUG(DEBUG_ERR,("Failed to deregister srvid %llu\n", (unsigned long long)srvid));
507                 return -1;
508         }
509
510         /* also need to register the handler with our own ctdb structure */
511         ctdb_deregister_message_handler(ctdb, srvid, private_data);
512         return 0;
513 }
514
515
516 /*
517   send a message - from client context
518  */
519 int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t pnn,
520                       uint64_t srvid, TDB_DATA data)
521 {
522         struct ctdb_req_message *r;
523         int len, res;
524
525         len = offsetof(struct ctdb_req_message, data) + data.dsize;
526         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
527                                len, struct ctdb_req_message);
528         CTDB_NO_MEMORY(ctdb, r);
529
530         r->hdr.destnode  = pnn;
531         r->srvid         = srvid;
532         r->datalen       = data.dsize;
533         memcpy(&r->data[0], data.dptr, data.dsize);
534         
535         res = ctdb_client_queue_pkt(ctdb, &r->hdr);
536         talloc_free(r);
537         return res;
538 }
539
540
541 /*
542   cancel a ctdb_fetch_lock operation, releasing the lock
543  */
544 static int fetch_lock_destructor(struct ctdb_record_handle *h)
545 {
546         ctdb_ltdb_unlock(h->ctdb_db, h->key);
547         return 0;
548 }
549
550 /*
551   force the migration of a record to this node
552  */
553 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
554 {
555         struct ctdb_call call;
556         ZERO_STRUCT(call);
557         call.call_id = CTDB_NULL_FUNC;
558         call.key = key;
559         call.flags = CTDB_IMMEDIATE_MIGRATION;
560         return ctdb_call(ctdb_db, &call);
561 }
562
563 /*
564   try to fetch a readonly copy of a record
565  */
566 static int
567 ctdb_client_fetch_readonly(struct ctdb_db_context *ctdb_db, TDB_DATA key, TALLOC_CTX *mem_ctx, struct ctdb_ltdb_header **hdr, TDB_DATA *data)
568 {
569         int ret;
570
571         struct ctdb_call call;
572         ZERO_STRUCT(call);
573
574         call.call_id = CTDB_FETCH_WITH_HEADER_FUNC;
575         call.call_data.dptr = NULL;
576         call.call_data.dsize = 0;
577         call.key = key;
578         call.flags = CTDB_WANT_READONLY;
579         ret = ctdb_call(ctdb_db, &call);
580
581         if (ret != 0) {
582                 return -1;
583         }
584         if (call.reply_data.dsize < sizeof(struct ctdb_ltdb_header)) {
585                 return -1;
586         }
587
588         *hdr = talloc_memdup(mem_ctx, &call.reply_data.dptr[0], sizeof(struct ctdb_ltdb_header));
589         if (*hdr == NULL) {
590                 talloc_free(call.reply_data.dptr);
591                 return -1;
592         }
593
594         data->dsize = call.reply_data.dsize - sizeof(struct ctdb_ltdb_header);
595         data->dptr  = talloc_memdup(mem_ctx, &call.reply_data.dptr[sizeof(struct ctdb_ltdb_header)], data->dsize);
596         if (data->dptr == NULL) {
597                 talloc_free(call.reply_data.dptr);
598                 talloc_free(hdr);
599                 return -1;
600         }
601
602         return 0;
603 }
604
605 /*
606   get a lock on a record, and return the records data. Blocks until it gets the lock
607  */
608 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
609                                            TDB_DATA key, TDB_DATA *data)
610 {
611         int ret;
612         struct ctdb_record_handle *h;
613
614         /*
615           procedure is as follows:
616
617           1) get the chain lock. 
618           2) check if we are dmaster
619           3) if we are the dmaster then return handle 
620           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
621              reply from ctdbd
622           5) when we get the reply, goto (1)
623          */
624
625         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
626         if (h == NULL) {
627                 return NULL;
628         }
629
630         h->ctdb_db = ctdb_db;
631         h->key     = key;
632         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
633         if (h->key.dptr == NULL) {
634                 talloc_free(h);
635                 return NULL;
636         }
637         h->data    = data;
638
639         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
640                  (const char *)key.dptr));
641
642 again:
643         /* step 1 - get the chain lock */
644         ret = ctdb_ltdb_lock(ctdb_db, key);
645         if (ret != 0) {
646                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
647                 talloc_free(h);
648                 return NULL;
649         }
650
651         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
652
653         talloc_set_destructor(h, fetch_lock_destructor);
654
655         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
656
657         /* when torturing, ensure we test the remote path */
658         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
659             random() % 5 == 0) {
660                 h->header.dmaster = (uint32_t)-1;
661         }
662
663
664         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
665
666         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
667                 ctdb_ltdb_unlock(ctdb_db, key);
668                 ret = ctdb_client_force_migration(ctdb_db, key);
669                 if (ret != 0) {
670                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
671                         talloc_free(h);
672                         return NULL;
673                 }
674                 goto again;
675         }
676
677         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
678         return h;
679 }
680
681 /*
682   get a readonly lock on a record, and return the records data. Blocks until it gets the lock
683  */
684 struct ctdb_record_handle *
685 ctdb_fetch_readonly_lock(
686         struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
687         TDB_DATA key, TDB_DATA *data,
688         int read_only)
689 {
690         int ret;
691         struct ctdb_record_handle *h;
692         struct ctdb_ltdb_header *roheader = NULL;
693
694         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
695         if (h == NULL) {
696                 return NULL;
697         }
698
699         h->ctdb_db = ctdb_db;
700         h->key     = key;
701         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
702         if (h->key.dptr == NULL) {
703                 talloc_free(h);
704                 return NULL;
705         }
706         h->data    = data;
707
708         data->dptr = NULL;
709         data->dsize = 0;
710
711
712 again:
713         talloc_free(roheader);
714         roheader = NULL;
715
716         talloc_free(data->dptr);
717         data->dptr = NULL;
718         data->dsize = 0;
719
720         /* Lock the record/chain */
721         ret = ctdb_ltdb_lock(ctdb_db, key);
722         if (ret != 0) {
723                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
724                 talloc_free(h);
725                 return NULL;
726         }
727
728         talloc_set_destructor(h, fetch_lock_destructor);
729
730         /* Check if record exists yet in the TDB */
731         ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
732         if (ret != 0) {
733                 ctdb_ltdb_unlock(ctdb_db, key);
734                 ret = ctdb_client_force_migration(ctdb_db, key);
735                 if (ret != 0) {
736                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
737                         talloc_free(h);
738                         return NULL;
739                 }
740                 goto again;
741         }
742
743         /* if this is a request for read/write and we have delegations
744            we have to revoke all delegations first
745         */
746         if ((read_only == 0) 
747         &&  (h->header.dmaster == ctdb_db->ctdb->pnn)
748         &&  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
749                 ctdb_ltdb_unlock(ctdb_db, key);
750                 ret = ctdb_client_force_migration(ctdb_db, key);
751                 if (ret != 0) {
752                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
753                         talloc_free(h);
754                         return NULL;
755                 }
756                 goto again;
757         }
758
759         /* if we are dmaster, just return the handle */
760         if (h->header.dmaster == ctdb_db->ctdb->pnn) {
761                 return h;
762         }
763
764         if (read_only != 0) {
765                 TDB_DATA rodata = {NULL, 0};
766
767                 if ((h->header.flags & CTDB_REC_RO_HAVE_READONLY)
768                 ||  (h->header.flags & CTDB_REC_RO_HAVE_DELEGATIONS)) {
769                         return h;
770                 }
771
772                 ctdb_ltdb_unlock(ctdb_db, key);
773                 ret = ctdb_client_fetch_readonly(ctdb_db, key, h, &roheader, &rodata);
774                 if (ret != 0) {
775                         DEBUG(DEBUG_ERR,("ctdb_fetch_readonly_lock:  failed. force migration and try again\n"));
776                         ret = ctdb_client_force_migration(ctdb_db, key);
777                         if (ret != 0) {
778                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
779                                 talloc_free(h);
780                                 return NULL;
781                         }
782
783                         goto again;
784                 }
785
786                 if (!(roheader->flags&CTDB_REC_RO_HAVE_READONLY)) {
787                         ret = ctdb_client_force_migration(ctdb_db, key);
788                         if (ret != 0) {
789                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
790                                 talloc_free(h);
791                                 return NULL;
792                         }
793
794                         goto again;
795                 }
796
797                 ret = ctdb_ltdb_lock(ctdb_db, key);
798                 if (ret != 0) {
799                         DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
800                         talloc_free(h);
801                         return NULL;
802                 }
803
804                 ret = ctdb_ltdb_fetch_with_header(ctdb_db, key, &h->header, h, data);
805                 if (ret != 0) {
806                         ctdb_ltdb_unlock(ctdb_db, key);
807
808                         ret = ctdb_client_force_migration(ctdb_db, key);
809                         if (ret != 0) {
810                                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_readonly_lock: force_migration failed\n"));
811                                 talloc_free(h);
812                                 return NULL;
813                         }
814
815                         goto again;
816                 }
817
818                 return h;
819         }
820
821         /* we are not dmaster and this was not a request for a readonly lock
822          * so unlock the record, migrate it and try again
823          */
824         ctdb_ltdb_unlock(ctdb_db, key);
825         ret = ctdb_client_force_migration(ctdb_db, key);
826         if (ret != 0) {
827                 DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
828                 talloc_free(h);
829                 return NULL;
830         }
831         goto again;
832 }
833
834 /*
835   store some data to the record that was locked with ctdb_fetch_lock()
836 */
837 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
838 {
839         if (h->ctdb_db->persistent) {
840                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
841                 return -1;
842         }
843
844         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
845 }
846
847 /*
848   non-locking fetch of a record
849  */
850 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
851                TDB_DATA key, TDB_DATA *data)
852 {
853         struct ctdb_call call;
854         int ret;
855
856         call.call_id = CTDB_FETCH_FUNC;
857         call.call_data.dptr = NULL;
858         call.call_data.dsize = 0;
859         call.key = key;
860
861         ret = ctdb_call(ctdb_db, &call);
862
863         if (ret == 0) {
864                 *data = call.reply_data;
865                 talloc_steal(mem_ctx, data->dptr);
866         }
867
868         return ret;
869 }
870
871
872
873 /*
874    called when a control completes or timesout to invoke the callback
875    function the user provided
876 */
877 static void invoke_control_callback(struct event_context *ev, struct timed_event *te, 
878         struct timeval t, void *private_data)
879 {
880         struct ctdb_client_control_state *state;
881         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
882         int ret;
883
884         state = talloc_get_type(private_data, struct ctdb_client_control_state);
885         talloc_steal(tmp_ctx, state);
886
887         ret = ctdb_control_recv(state->ctdb, state, state,
888                         NULL, 
889                         NULL, 
890                         NULL);
891         if (ret != 0) {
892                 DEBUG(DEBUG_DEBUG,("ctdb_control_recv() failed, ignoring return code %d\n", ret));
893         }
894
895         talloc_free(tmp_ctx);
896 }
897
898 /*
899   called when a CTDB_REPLY_CONTROL packet comes in in the client
900
901   This packet comes in response to a CTDB_REQ_CONTROL request packet. It
902   contains any reply data from the control
903 */
904 static void ctdb_client_reply_control(struct ctdb_context *ctdb, 
905                                       struct ctdb_req_header *hdr)
906 {
907         struct ctdb_reply_control *c = (struct ctdb_reply_control *)hdr;
908         struct ctdb_client_control_state *state;
909
910         state = ctdb_reqid_find(ctdb, hdr->reqid, struct ctdb_client_control_state);
911         if (state == NULL) {
912                 DEBUG(DEBUG_ERR,(__location__ " reqid %u not found\n", hdr->reqid));
913                 return;
914         }
915
916         if (hdr->reqid != state->reqid) {
917                 /* we found a record  but it was the wrong one */
918                 DEBUG(DEBUG_ERR, ("Dropped orphaned reply control with reqid:%u\n",hdr->reqid));
919                 return;
920         }
921
922         state->outdata.dptr = c->data;
923         state->outdata.dsize = c->datalen;
924         state->status = c->status;
925         if (c->errorlen) {
926                 state->errormsg = talloc_strndup(state, 
927                                                  (char *)&c->data[c->datalen], 
928                                                  c->errorlen);
929         }
930
931         /* state->outdata now uses resources from c so we dont want c
932            to just dissappear from under us while state is still alive
933         */
934         talloc_steal(state, c);
935
936         state->state = CTDB_CONTROL_DONE;
937
938         /* if we had a callback registered for this control, pull the response
939            and call the callback.
940         */
941         if (state->async.fn) {
942                 event_add_timed(ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
943         }
944 }
945
946
947 /*
948   destroy a ctdb_control in client
949 */
950 static int ctdb_client_control_destructor(struct ctdb_client_control_state *state)
951 {
952         ctdb_reqid_remove(state->ctdb, state->reqid);
953         return 0;
954 }
955
956
957 /* time out handler for ctdb_control */
958 static void control_timeout_func(struct event_context *ev, struct timed_event *te, 
959         struct timeval t, void *private_data)
960 {
961         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
962
963         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
964                          "dstnode:%u\n", state->reqid, state->c->opcode,
965                          state->c->hdr.destnode));
966
967         state->state = CTDB_CONTROL_TIMEOUT;
968
969         /* if we had a callback registered for this control, pull the response
970            and call the callback.
971         */
972         if (state->async.fn) {
973                 event_add_timed(state->ctdb->ev, state, timeval_zero(), invoke_control_callback, state);
974         }
975 }
976
977 /* async version of send control request */
978 struct ctdb_client_control_state *ctdb_control_send(struct ctdb_context *ctdb, 
979                 uint32_t destnode, uint64_t srvid, 
980                 uint32_t opcode, uint32_t flags, TDB_DATA data, 
981                 TALLOC_CTX *mem_ctx,
982                 struct timeval *timeout,
983                 char **errormsg)
984 {
985         struct ctdb_client_control_state *state;
986         size_t len;
987         struct ctdb_req_control *c;
988         int ret;
989
990         if (errormsg) {
991                 *errormsg = NULL;
992         }
993
994         /* if the domain socket is not yet open, open it */
995         if (ctdb->daemon.sd==-1) {
996                 ctdb_socket_connect(ctdb);
997         }
998
999         state = talloc_zero(mem_ctx, struct ctdb_client_control_state);
1000         CTDB_NO_MEMORY_NULL(ctdb, state);
1001
1002         state->ctdb       = ctdb;
1003         state->reqid      = ctdb_reqid_new(ctdb, state);
1004         state->state      = CTDB_CONTROL_WAIT;
1005         state->errormsg   = NULL;
1006
1007         talloc_set_destructor(state, ctdb_client_control_destructor);
1008
1009         len = offsetof(struct ctdb_req_control, data) + data.dsize;
1010         c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL, 
1011                                len, struct ctdb_req_control);
1012         state->c            = c;        
1013         CTDB_NO_MEMORY_NULL(ctdb, c);
1014         c->hdr.reqid        = state->reqid;
1015         c->hdr.destnode     = destnode;
1016         c->opcode           = opcode;
1017         c->client_id        = 0;
1018         c->flags            = flags;
1019         c->srvid            = srvid;
1020         c->datalen          = data.dsize;
1021         if (data.dsize) {
1022                 memcpy(&c->data[0], data.dptr, data.dsize);
1023         }
1024
1025         /* timeout */
1026         if (timeout && !timeval_is_zero(timeout)) {
1027                 event_add_timed(ctdb->ev, state, *timeout, control_timeout_func, state);
1028         }
1029
1030         ret = ctdb_client_queue_pkt(ctdb, &(c->hdr));
1031         if (ret != 0) {
1032                 talloc_free(state);
1033                 return NULL;
1034         }
1035
1036         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1037                 talloc_free(state);
1038                 return NULL;
1039         }
1040
1041         return state;
1042 }
1043
1044
1045 /* async version of receive control reply */
1046 int ctdb_control_recv(struct ctdb_context *ctdb, 
1047                 struct ctdb_client_control_state *state, 
1048                 TALLOC_CTX *mem_ctx,
1049                 TDB_DATA *outdata, int32_t *status, char **errormsg)
1050 {
1051         TALLOC_CTX *tmp_ctx;
1052
1053         if (status != NULL) {
1054                 *status = -1;
1055         }
1056         if (errormsg != NULL) {
1057                 *errormsg = NULL;
1058         }
1059
1060         if (state == NULL) {
1061                 return -1;
1062         }
1063
1064         /* prevent double free of state */
1065         tmp_ctx = talloc_new(ctdb);
1066         talloc_steal(tmp_ctx, state);
1067
1068         /* loop one event at a time until we either timeout or the control
1069            completes.
1070         */
1071         while (state->state == CTDB_CONTROL_WAIT) {
1072                 event_loop_once(ctdb->ev);
1073         }
1074
1075         if (state->state != CTDB_CONTROL_DONE) {
1076                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control_recv failed\n"));
1077                 if (state->async.fn) {
1078                         state->async.fn(state);
1079                 }
1080                 talloc_free(tmp_ctx);
1081                 return -1;
1082         }
1083
1084         if (state->errormsg) {
1085                 DEBUG(DEBUG_ERR,("ctdb_control error: '%s'\n", state->errormsg));
1086                 if (errormsg) {
1087                         (*errormsg) = talloc_move(mem_ctx, &state->errormsg);
1088                 }
1089                 if (state->async.fn) {
1090                         state->async.fn(state);
1091                 }
1092                 talloc_free(tmp_ctx);
1093                 return -1;
1094         }
1095
1096         if (outdata) {
1097                 *outdata = state->outdata;
1098                 outdata->dptr = talloc_memdup(mem_ctx, outdata->dptr, outdata->dsize);
1099         }
1100
1101         if (status) {
1102                 *status = state->status;
1103         }
1104
1105         if (state->async.fn) {
1106                 state->async.fn(state);
1107         }
1108
1109         talloc_free(tmp_ctx);
1110         return 0;
1111 }
1112
1113
1114
1115 /*
1116   send a ctdb control message
1117   timeout specifies how long we should wait for a reply.
1118   if timeout is NULL we wait indefinitely
1119  */
1120 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
1121                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
1122                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
1123                  struct timeval *timeout,
1124                  char **errormsg)
1125 {
1126         struct ctdb_client_control_state *state;
1127
1128         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
1129                         flags, data, mem_ctx,
1130                         timeout, errormsg);
1131
1132         /* FIXME: Error conditions in ctdb_control_send return NULL without
1133          * setting errormsg.  So, there is no way to distinguish between sucess
1134          * and failure when CTDB_CTRL_FLAG_NOREPLY is set */
1135         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
1136                 if (status != NULL) {
1137                         *status = 0;
1138                 }
1139                 return 0;
1140         }
1141
1142         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
1143                         errormsg);
1144 }
1145
1146
1147
1148
1149 /*
1150   a process exists call. Returns 0 if process exists, -1 otherwise
1151  */
1152 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
1153 {
1154         int ret;
1155         TDB_DATA data;
1156         int32_t status;
1157
1158         data.dptr = (uint8_t*)&pid;
1159         data.dsize = sizeof(pid);
1160
1161         ret = ctdb_control(ctdb, destnode, 0, 
1162                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
1163                            NULL, NULL, &status, NULL, NULL);
1164         if (ret != 0) {
1165                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
1166                 return -1;
1167         }
1168
1169         return status;
1170 }
1171
1172 /*
1173   get remote statistics
1174  */
1175 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
1176 {
1177         int ret;
1178         TDB_DATA data;
1179         int32_t res;
1180
1181         ret = ctdb_control(ctdb, destnode, 0, 
1182                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
1183                            ctdb, &data, &res, NULL, NULL);
1184         if (ret != 0 || res != 0) {
1185                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
1186                 return -1;
1187         }
1188
1189         if (data.dsize != sizeof(struct ctdb_statistics)) {
1190                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
1191                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
1192                       return -1;
1193         }
1194
1195         *status = *(struct ctdb_statistics *)data.dptr;
1196         talloc_free(data.dptr);
1197                         
1198         return 0;
1199 }
1200
1201 /*
1202   shutdown a remote ctdb node
1203  */
1204 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1205 {
1206         struct ctdb_client_control_state *state;
1207
1208         state = ctdb_control_send(ctdb, destnode, 0, 
1209                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
1210                            NULL, &timeout, NULL);
1211         if (state == NULL) {
1212                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
1213                 return -1;
1214         }
1215
1216         return 0;
1217 }
1218
1219 /*
1220   get vnn map from a remote node
1221  */
1222 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
1223 {
1224         int ret;
1225         TDB_DATA outdata;
1226         int32_t res;
1227         struct ctdb_vnn_map_wire *map;
1228
1229         ret = ctdb_control(ctdb, destnode, 0, 
1230                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
1231                            mem_ctx, &outdata, &res, &timeout, NULL);
1232         if (ret != 0 || res != 0) {
1233                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
1234                 return -1;
1235         }
1236         
1237         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
1238         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
1239             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
1240                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
1241                 return -1;
1242         }
1243
1244         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
1245         CTDB_NO_MEMORY(ctdb, *vnnmap);
1246         (*vnnmap)->generation = map->generation;
1247         (*vnnmap)->size       = map->size;
1248         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
1249
1250         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
1251         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
1252         talloc_free(outdata.dptr);
1253                     
1254         return 0;
1255 }
1256
1257
1258 /*
1259   get the recovery mode of a remote node
1260  */
1261 struct ctdb_client_control_state *
1262 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
1263 {
1264         return ctdb_control_send(ctdb, destnode, 0, 
1265                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
1266                            mem_ctx, &timeout, NULL);
1267 }
1268
1269 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
1270 {
1271         int ret;
1272         int32_t res;
1273
1274         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1275         if (ret != 0) {
1276                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
1277                 return -1;
1278         }
1279
1280         if (recmode) {
1281                 *recmode = (uint32_t)res;
1282         }
1283
1284         return 0;
1285 }
1286
1287 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
1288 {
1289         struct ctdb_client_control_state *state;
1290
1291         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
1292         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
1293 }
1294
1295
1296
1297
1298 /*
1299   set the recovery mode of a remote node
1300  */
1301 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
1302 {
1303         int ret;
1304         TDB_DATA data;
1305         int32_t res;
1306
1307         data.dsize = sizeof(uint32_t);
1308         data.dptr = (unsigned char *)&recmode;
1309
1310         ret = ctdb_control(ctdb, destnode, 0, 
1311                            CTDB_CONTROL_SET_RECMODE, 0, data, 
1312                            NULL, NULL, &res, &timeout, NULL);
1313         if (ret != 0 || res != 0) {
1314                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
1315                 return -1;
1316         }
1317
1318         return 0;
1319 }
1320
1321
1322
1323 /*
1324   get the recovery master of a remote node
1325  */
1326 struct ctdb_client_control_state *
1327 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
1328                         struct timeval timeout, uint32_t destnode)
1329 {
1330         return ctdb_control_send(ctdb, destnode, 0, 
1331                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
1332                            mem_ctx, &timeout, NULL);
1333 }
1334
1335 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
1336 {
1337         int ret;
1338         int32_t res;
1339
1340         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1341         if (ret != 0) {
1342                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
1343                 return -1;
1344         }
1345
1346         if (recmaster) {
1347                 *recmaster = (uint32_t)res;
1348         }
1349
1350         return 0;
1351 }
1352
1353 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
1354 {
1355         struct ctdb_client_control_state *state;
1356
1357         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
1358         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
1359 }
1360
1361
1362 /*
1363   set the recovery master of a remote node
1364  */
1365 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
1366 {
1367         int ret;
1368         TDB_DATA data;
1369         int32_t res;
1370
1371         ZERO_STRUCT(data);
1372         data.dsize = sizeof(uint32_t);
1373         data.dptr = (unsigned char *)&recmaster;
1374
1375         ret = ctdb_control(ctdb, destnode, 0, 
1376                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
1377                            NULL, NULL, &res, &timeout, NULL);
1378         if (ret != 0 || res != 0) {
1379                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
1380                 return -1;
1381         }
1382
1383         return 0;
1384 }
1385
1386
1387 /*
1388   get a list of databases off a remote node
1389  */
1390 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1391                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
1392 {
1393         int ret;
1394         TDB_DATA outdata;
1395         int32_t res;
1396
1397         ret = ctdb_control(ctdb, destnode, 0, 
1398                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
1399                            mem_ctx, &outdata, &res, &timeout, NULL);
1400         if (ret != 0 || res != 0) {
1401                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
1402                 return -1;
1403         }
1404
1405         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1406         talloc_free(outdata.dptr);
1407                     
1408         return 0;
1409 }
1410
1411 /*
1412   get a list of nodes (vnn and flags ) from a remote node
1413  */
1414 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
1415                 struct timeval timeout, uint32_t destnode, 
1416                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1417 {
1418         int ret;
1419         TDB_DATA outdata;
1420         int32_t res;
1421
1422         ret = ctdb_control(ctdb, destnode, 0, 
1423                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
1424                            mem_ctx, &outdata, &res, &timeout, NULL);
1425         if (ret == 0 && res == -1 && outdata.dsize == 0) {
1426                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
1427                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
1428         }
1429         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
1431                 return -1;
1432         }
1433
1434         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1435         talloc_free(outdata.dptr);
1436                     
1437         return 0;
1438 }
1439
1440 /*
1441   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
1442  */
1443 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
1444                 struct timeval timeout, uint32_t destnode, 
1445                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
1446 {
1447         int ret, i, len;
1448         TDB_DATA outdata;
1449         struct ctdb_node_mapv4 *nodemapv4;
1450         int32_t res;
1451
1452         ret = ctdb_control(ctdb, destnode, 0, 
1453                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
1454                            mem_ctx, &outdata, &res, &timeout, NULL);
1455         if (ret != 0 || res != 0 || outdata.dsize == 0) {
1456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
1457                 return -1;
1458         }
1459
1460         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
1461
1462         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
1463         (*nodemap) = talloc_zero_size(mem_ctx, len);
1464         CTDB_NO_MEMORY(ctdb, (*nodemap));
1465
1466         (*nodemap)->num = nodemapv4->num;
1467         for (i=0; i<nodemapv4->num; i++) {
1468                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
1469                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
1470                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
1471                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
1472         }
1473                 
1474         talloc_free(outdata.dptr);
1475                     
1476         return 0;
1477 }
1478
1479 /*
1480   drop the transport, reload the nodes file and restart the transport
1481  */
1482 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
1483                     struct timeval timeout, uint32_t destnode)
1484 {
1485         int ret;
1486         int32_t res;
1487
1488         ret = ctdb_control(ctdb, destnode, 0, 
1489                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
1490                            NULL, NULL, &res, &timeout, NULL);
1491         if (ret != 0 || res != 0) {
1492                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
1493                 return -1;
1494         }
1495
1496         return 0;
1497 }
1498
1499
1500 /*
1501   set vnn map on a node
1502  */
1503 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1504                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
1505 {
1506         int ret;
1507         TDB_DATA data;
1508         int32_t res;
1509         struct ctdb_vnn_map_wire *map;
1510         size_t len;
1511
1512         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
1513         map = talloc_size(mem_ctx, len);
1514         CTDB_NO_MEMORY(ctdb, map);
1515
1516         map->generation = vnnmap->generation;
1517         map->size = vnnmap->size;
1518         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
1519         
1520         data.dsize = len;
1521         data.dptr  = (uint8_t *)map;
1522
1523         ret = ctdb_control(ctdb, destnode, 0, 
1524                            CTDB_CONTROL_SETVNNMAP, 0, data, 
1525                            NULL, NULL, &res, &timeout, NULL);
1526         if (ret != 0 || res != 0) {
1527                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
1528                 return -1;
1529         }
1530
1531         talloc_free(map);
1532
1533         return 0;
1534 }
1535
1536
1537 /*
1538   async send for pull database
1539  */
1540 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
1541         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
1542         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
1543 {
1544         TDB_DATA indata;
1545         struct ctdb_control_pulldb *pull;
1546         struct ctdb_client_control_state *state;
1547
1548         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
1549         CTDB_NO_MEMORY_NULL(ctdb, pull);
1550
1551         pull->db_id   = dbid;
1552         pull->lmaster = lmaster;
1553
1554         indata.dsize = sizeof(struct ctdb_control_pulldb);
1555         indata.dptr  = (unsigned char *)pull;
1556
1557         state = ctdb_control_send(ctdb, destnode, 0, 
1558                                   CTDB_CONTROL_PULL_DB, 0, indata, 
1559                                   mem_ctx, &timeout, NULL);
1560         talloc_free(pull);
1561
1562         return state;
1563 }
1564
1565 /*
1566   async recv for pull database
1567  */
1568 int ctdb_ctrl_pulldb_recv(
1569         struct ctdb_context *ctdb, 
1570         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
1571         TDB_DATA *outdata)
1572 {
1573         int ret;
1574         int32_t res;
1575
1576         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
1577         if ( (ret != 0) || (res != 0) ){
1578                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
1579                 return -1;
1580         }
1581
1582         return 0;
1583 }
1584
1585 /*
1586   pull all keys and records for a specific database on a node
1587  */
1588 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
1589                 uint32_t dbid, uint32_t lmaster, 
1590                 TALLOC_CTX *mem_ctx, struct timeval timeout,
1591                 TDB_DATA *outdata)
1592 {
1593         struct ctdb_client_control_state *state;
1594
1595         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
1596                                       timeout);
1597         
1598         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
1599 }
1600
1601
1602 /*
1603   change dmaster for all keys in the database to the new value
1604  */
1605 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1606                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
1607 {
1608         int ret;
1609         TDB_DATA indata;
1610         int32_t res;
1611
1612         indata.dsize = 2*sizeof(uint32_t);
1613         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
1614
1615         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
1616         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
1617
1618         ret = ctdb_control(ctdb, destnode, 0, 
1619                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
1620                            NULL, NULL, &res, &timeout, NULL);
1621         if (ret != 0 || res != 0) {
1622                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
1623                 return -1;
1624         }
1625
1626         return 0;
1627 }
1628
1629 /*
1630   ping a node, return number of clients connected
1631  */
1632 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
1633 {
1634         int ret;
1635         int32_t res;
1636
1637         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
1638                            tdb_null, NULL, NULL, &res, NULL, NULL);
1639         if (ret != 0) {
1640                 return -1;
1641         }
1642         return res;
1643 }
1644
1645 int ctdb_ctrl_get_runstate(struct ctdb_context *ctdb, 
1646                            struct timeval timeout, 
1647                            uint32_t destnode,
1648                            uint32_t *runstate)
1649 {
1650         TDB_DATA outdata;
1651         int32_t res;
1652         int ret;
1653
1654         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_RUNSTATE, 0,
1655                            tdb_null, ctdb, &outdata, &res, &timeout, NULL);
1656         if (ret != 0 || res != 0) {
1657                 DEBUG(DEBUG_ERR,("ctdb_control for get_runstate failed\n"));
1658                 return ret != 0 ? ret : res;
1659         }
1660
1661         if (outdata.dsize != sizeof(uint32_t)) {
1662                 DEBUG(DEBUG_ERR,("Invalid return data in get_runstate\n"));
1663                 talloc_free(outdata.dptr);
1664                 return -1;
1665         }
1666
1667         if (runstate != NULL) {
1668                 *runstate = *(uint32_t *)outdata.dptr;
1669         }
1670         talloc_free(outdata.dptr);
1671
1672         return 0;
1673 }
1674
1675 /*
1676   find the real path to a ltdb 
1677  */
1678 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1679                    const char **path)
1680 {
1681         int ret;
1682         int32_t res;
1683         TDB_DATA data;
1684
1685         data.dptr = (uint8_t *)&dbid;
1686         data.dsize = sizeof(dbid);
1687
1688         ret = ctdb_control(ctdb, destnode, 0, 
1689                            CTDB_CONTROL_GETDBPATH, 0, data, 
1690                            mem_ctx, &data, &res, &timeout, NULL);
1691         if (ret != 0 || res != 0) {
1692                 return -1;
1693         }
1694
1695         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1696         if ((*path) == NULL) {
1697                 return -1;
1698         }
1699
1700         talloc_free(data.dptr);
1701
1702         return 0;
1703 }
1704
1705 /*
1706   find the name of a db 
1707  */
1708 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
1709                    const char **name)
1710 {
1711         int ret;
1712         int32_t res;
1713         TDB_DATA data;
1714
1715         data.dptr = (uint8_t *)&dbid;
1716         data.dsize = sizeof(dbid);
1717
1718         ret = ctdb_control(ctdb, destnode, 0, 
1719                            CTDB_CONTROL_GET_DBNAME, 0, data, 
1720                            mem_ctx, &data, &res, &timeout, NULL);
1721         if (ret != 0 || res != 0) {
1722                 return -1;
1723         }
1724
1725         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1726         if ((*name) == NULL) {
1727                 return -1;
1728         }
1729
1730         talloc_free(data.dptr);
1731
1732         return 0;
1733 }
1734
1735 /*
1736   get the health status of a db
1737  */
1738 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
1739                           struct timeval timeout,
1740                           uint32_t destnode,
1741                           uint32_t dbid, TALLOC_CTX *mem_ctx,
1742                           const char **reason)
1743 {
1744         int ret;
1745         int32_t res;
1746         TDB_DATA data;
1747
1748         data.dptr = (uint8_t *)&dbid;
1749         data.dsize = sizeof(dbid);
1750
1751         ret = ctdb_control(ctdb, destnode, 0,
1752                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
1753                            mem_ctx, &data, &res, &timeout, NULL);
1754         if (ret != 0 || res != 0) {
1755                 return -1;
1756         }
1757
1758         if (data.dsize == 0) {
1759                 (*reason) = NULL;
1760                 return 0;
1761         }
1762
1763         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
1764         if ((*reason) == NULL) {
1765                 return -1;
1766         }
1767
1768         talloc_free(data.dptr);
1769
1770         return 0;
1771 }
1772
1773 /*
1774   create a database
1775  */
1776 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1777                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
1778 {
1779         int ret;
1780         int32_t res;
1781         TDB_DATA data;
1782         uint64_t tdb_flags = 0;
1783
1784         data.dptr = discard_const(name);
1785         data.dsize = strlen(name)+1;
1786
1787         /* Make sure that volatile databases use jenkins hash */
1788         if (!persistent) {
1789                 tdb_flags = TDB_INCOMPATIBLE_HASH;
1790         }
1791
1792         ret = ctdb_control(ctdb, destnode, tdb_flags,
1793                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
1794                            0, data, 
1795                            mem_ctx, &data, &res, &timeout, NULL);
1796
1797         if (ret != 0 || res != 0) {
1798                 return -1;
1799         }
1800
1801         return 0;
1802 }
1803
1804 /*
1805   get debug level on a node
1806  */
1807 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
1808 {
1809         int ret;
1810         int32_t res;
1811         TDB_DATA data;
1812
1813         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
1814                            ctdb, &data, &res, NULL, NULL);
1815         if (ret != 0 || res != 0) {
1816                 return -1;
1817         }
1818         if (data.dsize != sizeof(int32_t)) {
1819                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
1820                          (unsigned)data.dsize));
1821                 return -1;
1822         }
1823         *level = *(int32_t *)data.dptr;
1824         talloc_free(data.dptr);
1825         return 0;
1826 }
1827
1828 /*
1829   set debug level on a node
1830  */
1831 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
1832 {
1833         int ret;
1834         int32_t res;
1835         TDB_DATA data;
1836
1837         data.dptr = (uint8_t *)&level;
1838         data.dsize = sizeof(level);
1839
1840         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
1841                            NULL, NULL, &res, NULL, NULL);
1842         if (ret != 0 || res != 0) {
1843                 return -1;
1844         }
1845         return 0;
1846 }
1847
1848
1849 /*
1850   get a list of connected nodes
1851  */
1852 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
1853                                 struct timeval timeout,
1854                                 TALLOC_CTX *mem_ctx,
1855                                 uint32_t *num_nodes)
1856 {
1857         struct ctdb_node_map *map=NULL;
1858         int ret, i;
1859         uint32_t *nodes;
1860
1861         *num_nodes = 0;
1862
1863         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
1864         if (ret != 0) {
1865                 return NULL;
1866         }
1867
1868         nodes = talloc_array(mem_ctx, uint32_t, map->num);
1869         if (nodes == NULL) {
1870                 return NULL;
1871         }
1872
1873         for (i=0;i<map->num;i++) {
1874                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1875                         nodes[*num_nodes] = map->nodes[i].pnn;
1876                         (*num_nodes)++;
1877                 }
1878         }
1879
1880         return nodes;
1881 }
1882
1883
1884 /*
1885   reset remote status
1886  */
1887 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
1888 {
1889         int ret;
1890         int32_t res;
1891
1892         ret = ctdb_control(ctdb, destnode, 0, 
1893                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1894                            NULL, NULL, &res, NULL, NULL);
1895         if (ret != 0 || res != 0) {
1896                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1897                 return -1;
1898         }
1899         return 0;
1900 }
1901
1902 /*
1903   attach to a specific database - client call
1904 */
1905 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb,
1906                                     struct timeval timeout,
1907                                     const char *name,
1908                                     bool persistent,
1909                                     uint32_t tdb_flags)
1910 {
1911         struct ctdb_db_context *ctdb_db;
1912         TDB_DATA data;
1913         int ret;
1914         int32_t res;
1915
1916         ctdb_db = ctdb_db_handle(ctdb, name);
1917         if (ctdb_db) {
1918                 return ctdb_db;
1919         }
1920
1921         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1922         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1923
1924         ctdb_db->ctdb = ctdb;
1925         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1926         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1927
1928         data.dptr = discard_const(name);
1929         data.dsize = strlen(name)+1;
1930
1931         /* CTDB has switched to using jenkins hash for volatile databases.
1932          * Even if tdb_flags do not explicitly mention TDB_INCOMPATIBLE_HASH,
1933          * always set it.
1934          */
1935         if (!persistent) {
1936                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
1937         }
1938
1939         /* tell ctdb daemon to attach */
1940         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1941                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1942                            0, data, ctdb_db, &data, &res, NULL, NULL);
1943         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1944                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1945                 talloc_free(ctdb_db);
1946                 return NULL;
1947         }
1948         
1949         ctdb_db->db_id = *(uint32_t *)data.dptr;
1950         talloc_free(data.dptr);
1951
1952         ret = ctdb_ctrl_getdbpath(ctdb, timeout, CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1953         if (ret != 0) {
1954                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1955                 talloc_free(ctdb_db);
1956                 return NULL;
1957         }
1958
1959         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1960         if (ctdb->valgrinding) {
1961                 tdb_flags |= TDB_NOMMAP;
1962         }
1963         tdb_flags |= TDB_DISALLOW_NESTING;
1964
1965         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1966         if (ctdb_db->ltdb == NULL) {
1967                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1968                 talloc_free(ctdb_db);
1969                 return NULL;
1970         }
1971
1972         ctdb_db->persistent = persistent;
1973
1974         DLIST_ADD(ctdb->db_list, ctdb_db);
1975
1976         /* add well known functions */
1977         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1978         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1979         ctdb_set_call(ctdb_db, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1980
1981         return ctdb_db;
1982 }
1983
1984
1985 /*
1986   setup a call for a database
1987  */
1988 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1989 {
1990         struct ctdb_registered_call *call;
1991
1992 #if 0
1993         TDB_DATA data;
1994         int32_t status;
1995         struct ctdb_control_set_call c;
1996         int ret;
1997
1998         /* this is no longer valid with the separate daemon architecture */
1999         c.db_id = ctdb_db->db_id;
2000         c.fn    = fn;
2001         c.id    = id;
2002
2003         data.dptr = (uint8_t *)&c;
2004         data.dsize = sizeof(c);
2005
2006         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
2007                            data, NULL, NULL, &status, NULL, NULL);
2008         if (ret != 0 || status != 0) {
2009                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
2010                 return -1;
2011         }
2012 #endif
2013
2014         /* also register locally */
2015         call = talloc(ctdb_db, struct ctdb_registered_call);
2016         call->fn = fn;
2017         call->id = id;
2018
2019         DLIST_ADD(ctdb_db->calls, call);        
2020         return 0;
2021 }
2022
2023
2024 struct traverse_state {
2025         bool done;
2026         uint32_t count;
2027         ctdb_traverse_func fn;
2028         void *private_data;
2029         bool listemptyrecords;
2030 };
2031
2032 /*
2033   called on each key during a ctdb_traverse
2034  */
2035 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
2036 {
2037         struct traverse_state *state = (struct traverse_state *)p;
2038         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
2039         TDB_DATA key;
2040
2041         if (data.dsize < sizeof(uint32_t) ||
2042             d->length != data.dsize) {
2043                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
2044                 state->done = true;
2045                 return;
2046         }
2047
2048         key.dsize = d->keylen;
2049         key.dptr  = &d->data[0];
2050         data.dsize = d->datalen;
2051         data.dptr = &d->data[d->keylen];
2052
2053         if (key.dsize == 0 && data.dsize == 0) {
2054                 /* end of traverse */
2055                 state->done = true;
2056                 return;
2057         }
2058
2059         if (!state->listemptyrecords &&
2060             data.dsize == sizeof(struct ctdb_ltdb_header))
2061         {
2062                 /* empty records are deleted records in ctdb */
2063                 return;
2064         }
2065
2066         if (state->fn(ctdb, key, data, state->private_data) != 0) {
2067                 state->done = true;
2068         }
2069
2070         state->count++;
2071 }
2072
2073 /**
2074  * start a cluster wide traverse, calling the supplied fn on each record
2075  * return the number of records traversed, or -1 on error
2076  *
2077  * Extendet variant with a flag to signal whether empty records should
2078  * be listed.
2079  */
2080 static int ctdb_traverse_ext(struct ctdb_db_context *ctdb_db,
2081                              ctdb_traverse_func fn,
2082                              bool withemptyrecords,
2083                              void *private_data)
2084 {
2085         TDB_DATA data;
2086         struct ctdb_traverse_start_ext t;
2087         int32_t status;
2088         int ret;
2089         uint64_t srvid = (getpid() | 0xFLL<<60);
2090         struct traverse_state state;
2091
2092         state.done = false;
2093         state.count = 0;
2094         state.private_data = private_data;
2095         state.fn = fn;
2096         state.listemptyrecords = withemptyrecords;
2097
2098         ret = ctdb_client_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
2099         if (ret != 0) {
2100                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
2101                 return -1;
2102         }
2103
2104         t.db_id = ctdb_db->db_id;
2105         t.srvid = srvid;
2106         t.reqid = 0;
2107         t.withemptyrecords = withemptyrecords;
2108
2109         data.dptr = (uint8_t *)&t;
2110         data.dsize = sizeof(t);
2111
2112         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START_EXT, 0,
2113                            data, NULL, NULL, &status, NULL, NULL);
2114         if (ret != 0 || status != 0) {
2115                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
2116                 ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2117                 return -1;
2118         }
2119
2120         while (!state.done) {
2121                 event_loop_once(ctdb_db->ctdb->ev);
2122         }
2123
2124         ret = ctdb_client_remove_message_handler(ctdb_db->ctdb, srvid, &state);
2125         if (ret != 0) {
2126                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
2127                 return -1;
2128         }
2129
2130         return state.count;
2131 }
2132
2133 /**
2134  * start a cluster wide traverse, calling the supplied fn on each record
2135  * return the number of records traversed, or -1 on error
2136  *
2137  * Standard version which does not list the empty records:
2138  * These are considered deleted.
2139  */
2140 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
2141 {
2142         return ctdb_traverse_ext(ctdb_db, fn, false, private_data);
2143 }
2144
2145 #define ISASCII(x) (isprint(x) && !strchr("\"\\", (x)))
2146 /*
2147   called on each key during a catdb
2148  */
2149 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
2150 {
2151         int i;
2152         struct ctdb_dump_db_context *c = (struct ctdb_dump_db_context *)p;
2153         FILE *f = c->f;
2154         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
2155
2156         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
2157         for (i=0;i<key.dsize;i++) {
2158                 if (ISASCII(key.dptr[i])) {
2159                         fprintf(f, "%c", key.dptr[i]);
2160                 } else {
2161                         fprintf(f, "\\%02X", key.dptr[i]);
2162                 }
2163         }
2164         fprintf(f, "\"\n");
2165
2166         fprintf(f, "dmaster: %u\n", h->dmaster);
2167         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
2168
2169         if (c->printlmaster && ctdb->vnn_map != NULL) {
2170                 fprintf(f, "lmaster: %u\n", ctdb_lmaster(ctdb, &key));
2171         }
2172
2173         if (c->printhash) {
2174                 fprintf(f, "hash: 0x%08x\n", ctdb_hash(&key));
2175         }
2176
2177         if (c->printrecordflags) {
2178                 fprintf(f, "flags: 0x%08x", h->flags);
2179                 if (h->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) printf(" MIGRATED_WITH_DATA");
2180                 if (h->flags & CTDB_REC_FLAG_VACUUM_MIGRATED) printf(" VACUUM_MIGRATED");
2181                 if (h->flags & CTDB_REC_FLAG_AUTOMATIC) printf(" AUTOMATIC");
2182                 if (h->flags & CTDB_REC_RO_HAVE_DELEGATIONS) printf(" RO_HAVE_DELEGATIONS");
2183                 if (h->flags & CTDB_REC_RO_HAVE_READONLY) printf(" RO_HAVE_READONLY");
2184                 if (h->flags & CTDB_REC_RO_REVOKING_READONLY) printf(" RO_REVOKING_READONLY");
2185                 if (h->flags & CTDB_REC_RO_REVOKE_COMPLETE) printf(" RO_REVOKE_COMPLETE");
2186                 fprintf(f, "\n");
2187         }
2188
2189         if (c->printdatasize) {
2190                 fprintf(f, "data size: %u\n", (unsigned)data.dsize);
2191         } else {
2192                 fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
2193                 for (i=sizeof(*h);i<data.dsize;i++) {
2194                         if (ISASCII(data.dptr[i])) {
2195                                 fprintf(f, "%c", data.dptr[i]);
2196                         } else {
2197                                 fprintf(f, "\\%02X", data.dptr[i]);
2198                         }
2199                 }
2200                 fprintf(f, "\"\n");
2201         }
2202
2203         fprintf(f, "\n");
2204
2205         return 0;
2206 }
2207
2208 /*
2209   convenience function to list all keys to stdout
2210  */
2211 int ctdb_dump_db(struct ctdb_db_context *ctdb_db,
2212                  struct ctdb_dump_db_context *ctx)
2213 {
2214         return ctdb_traverse_ext(ctdb_db, ctdb_dumpdb_record,
2215                                  ctx->printemptyrecords, ctx);
2216 }
2217
2218 /*
2219   get the pid of a ctdb daemon
2220  */
2221 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
2222 {
2223         int ret;
2224         int32_t res;
2225
2226         ret = ctdb_control(ctdb, destnode, 0, 
2227                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
2228                            NULL, NULL, &res, &timeout, NULL);
2229         if (ret != 0) {
2230                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
2231                 return -1;
2232         }
2233
2234         *pid = res;
2235
2236         return 0;
2237 }
2238
2239
2240 /*
2241   async freeze send control
2242  */
2243 struct ctdb_client_control_state *
2244 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
2245 {
2246         return ctdb_control_send(ctdb, destnode, priority, 
2247                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
2248                            mem_ctx, &timeout, NULL);
2249 }
2250
2251 /* 
2252    async freeze recv control
2253 */
2254 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
2255 {
2256         int ret;
2257         int32_t res;
2258
2259         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
2260         if ( (ret != 0) || (res != 0) ){
2261                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
2262                 return -1;
2263         }
2264
2265         return 0;
2266 }
2267
2268 /*
2269   freeze databases of a certain priority
2270  */
2271 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2272 {
2273         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2274         struct ctdb_client_control_state *state;
2275         int ret;
2276
2277         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
2278         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
2279         talloc_free(tmp_ctx);
2280
2281         return ret;
2282 }
2283
2284 /* Freeze all databases */
2285 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2286 {
2287         int i;
2288
2289         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
2290                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
2291                         return -1;
2292                 }
2293         }
2294         return 0;
2295 }
2296
2297 /*
2298   thaw databases of a certain priority
2299  */
2300 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
2301 {
2302         int ret;
2303         int32_t res;
2304
2305         ret = ctdb_control(ctdb, destnode, priority, 
2306                            CTDB_CONTROL_THAW, 0, tdb_null, 
2307                            NULL, NULL, &res, &timeout, NULL);
2308         if (ret != 0 || res != 0) {
2309                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
2310                 return -1;
2311         }
2312
2313         return 0;
2314 }
2315
2316 /* thaw all databases */
2317 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2318 {
2319         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
2320 }
2321
2322 /*
2323   get pnn of a node, or -1
2324  */
2325 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2326 {
2327         int ret;
2328         int32_t res;
2329
2330         ret = ctdb_control(ctdb, destnode, 0, 
2331                            CTDB_CONTROL_GET_PNN, 0, tdb_null, 
2332                            NULL, NULL, &res, &timeout, NULL);
2333         if (ret != 0) {
2334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
2335                 return -1;
2336         }
2337
2338         return res;
2339 }
2340
2341 /*
2342   get the monitoring mode of a remote node
2343  */
2344 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
2345 {
2346         int ret;
2347         int32_t res;
2348
2349         ret = ctdb_control(ctdb, destnode, 0, 
2350                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
2351                            NULL, NULL, &res, &timeout, NULL);
2352         if (ret != 0) {
2353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
2354                 return -1;
2355         }
2356
2357         *monmode = res;
2358
2359         return 0;
2360 }
2361
2362
2363 /*
2364  set the monitoring mode of a remote node to active
2365  */
2366 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2367 {
2368         int ret;
2369         
2370
2371         ret = ctdb_control(ctdb, destnode, 0, 
2372                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
2373                            NULL, NULL,NULL, &timeout, NULL);
2374         if (ret != 0) {
2375                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
2376                 return -1;
2377         }
2378
2379         
2380
2381         return 0;
2382 }
2383
2384 /*
2385   set the monitoring mode of a remote node to disable
2386  */
2387 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2388 {
2389         int ret;
2390         
2391
2392         ret = ctdb_control(ctdb, destnode, 0, 
2393                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
2394                            NULL, NULL, NULL, &timeout, NULL);
2395         if (ret != 0) {
2396                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
2397                 return -1;
2398         }
2399
2400         
2401
2402         return 0;
2403 }
2404
2405
2406
2407 /* 
2408   sent to a node to make it take over an ip address
2409 */
2410 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2411                           uint32_t destnode, struct ctdb_public_ip *ip)
2412 {
2413         TDB_DATA data;
2414         struct ctdb_public_ipv4 ipv4;
2415         int ret;
2416         int32_t res;
2417
2418         if (ip->addr.sa.sa_family == AF_INET) {
2419                 ipv4.pnn = ip->pnn;
2420                 ipv4.sin = ip->addr.ip;
2421
2422                 data.dsize = sizeof(ipv4);
2423                 data.dptr  = (uint8_t *)&ipv4;
2424
2425                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
2426                            NULL, &res, &timeout, NULL);
2427         } else {
2428                 data.dsize = sizeof(*ip);
2429                 data.dptr  = (uint8_t *)ip;
2430
2431                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
2432                            NULL, &res, &timeout, NULL);
2433         }
2434
2435         if (ret != 0 || res != 0) {
2436                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
2437                 return -1;
2438         }
2439
2440         return 0;       
2441 }
2442
2443
2444 /* 
2445   sent to a node to make it release an ip address
2446 */
2447 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
2448                          uint32_t destnode, struct ctdb_public_ip *ip)
2449 {
2450         TDB_DATA data;
2451         struct ctdb_public_ipv4 ipv4;
2452         int ret;
2453         int32_t res;
2454
2455         if (ip->addr.sa.sa_family == AF_INET) {
2456                 ipv4.pnn = ip->pnn;
2457                 ipv4.sin = ip->addr.ip;
2458
2459                 data.dsize = sizeof(ipv4);
2460                 data.dptr  = (uint8_t *)&ipv4;
2461
2462                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
2463                                    NULL, &res, &timeout, NULL);
2464         } else {
2465                 data.dsize = sizeof(*ip);
2466                 data.dptr  = (uint8_t *)ip;
2467
2468                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
2469                                    NULL, &res, &timeout, NULL);
2470         }
2471
2472         if (ret != 0 || res != 0) {
2473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
2474                 return -1;
2475         }
2476
2477         return 0;       
2478 }
2479
2480
2481 /*
2482   get a tunable
2483  */
2484 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
2485                           struct timeval timeout, 
2486                           uint32_t destnode,
2487                           const char *name, uint32_t *value)
2488 {
2489         struct ctdb_control_get_tunable *t;
2490         TDB_DATA data, outdata;
2491         int32_t res;
2492         int ret;
2493
2494         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
2495         data.dptr  = talloc_size(ctdb, data.dsize);
2496         CTDB_NO_MEMORY(ctdb, data.dptr);
2497
2498         t = (struct ctdb_control_get_tunable *)data.dptr;
2499         t->length = strlen(name)+1;
2500         memcpy(t->name, name, t->length);
2501
2502         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
2503                            &outdata, &res, &timeout, NULL);
2504         talloc_free(data.dptr);
2505         if (ret != 0 || res != 0) {
2506                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
2507                 return ret != 0 ? ret : res;
2508         }
2509
2510         if (outdata.dsize != sizeof(uint32_t)) {
2511                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
2512                 talloc_free(outdata.dptr);
2513                 return -1;
2514         }
2515         
2516         *value = *(uint32_t *)outdata.dptr;
2517         talloc_free(outdata.dptr);
2518
2519         return 0;
2520 }
2521
2522 /*
2523   set a tunable
2524  */
2525 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
2526                           struct timeval timeout, 
2527                           uint32_t destnode,
2528                           const char *name, uint32_t value)
2529 {
2530         struct ctdb_control_set_tunable *t;
2531         TDB_DATA data;
2532         int32_t res;
2533         int ret;
2534
2535         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
2536         data.dptr  = talloc_size(ctdb, data.dsize);
2537         CTDB_NO_MEMORY(ctdb, data.dptr);
2538
2539         t = (struct ctdb_control_set_tunable *)data.dptr;
2540         t->length = strlen(name)+1;
2541         memcpy(t->name, name, t->length);
2542         t->value = value;
2543
2544         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
2545                            NULL, &res, &timeout, NULL);
2546         talloc_free(data.dptr);
2547         if (ret != 0 || res != 0) {
2548                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
2549                 return -1;
2550         }
2551
2552         return 0;
2553 }
2554
2555 /*
2556   list tunables
2557  */
2558 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
2559                             struct timeval timeout, 
2560                             uint32_t destnode,
2561                             TALLOC_CTX *mem_ctx,
2562                             const char ***list, uint32_t *count)
2563 {
2564         TDB_DATA outdata;
2565         int32_t res;
2566         int ret;
2567         struct ctdb_control_list_tunable *t;
2568         char *p, *s, *ptr;
2569
2570         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
2571                            mem_ctx, &outdata, &res, &timeout, NULL);
2572         if (ret != 0 || res != 0) {
2573                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
2574                 return -1;
2575         }
2576
2577         t = (struct ctdb_control_list_tunable *)outdata.dptr;
2578         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
2579             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
2580                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
2581                 talloc_free(outdata.dptr);
2582                 return -1;              
2583         }
2584         
2585         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
2586         CTDB_NO_MEMORY(ctdb, p);
2587
2588         talloc_free(outdata.dptr);
2589         
2590         (*list) = NULL;
2591         (*count) = 0;
2592
2593         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
2594                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
2595                 CTDB_NO_MEMORY(ctdb, *list);
2596                 (*list)[*count] = talloc_strdup(*list, s);
2597                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
2598                 (*count)++;
2599         }
2600
2601         talloc_free(p);
2602
2603         return 0;
2604 }
2605
2606
2607 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
2608                                    struct timeval timeout, uint32_t destnode,
2609                                    TALLOC_CTX *mem_ctx,
2610                                    uint32_t flags,
2611                                    struct ctdb_all_public_ips **ips)
2612 {
2613         int ret;
2614         TDB_DATA outdata;
2615         int32_t res;
2616
2617         ret = ctdb_control(ctdb, destnode, 0, 
2618                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
2619                            mem_ctx, &outdata, &res, &timeout, NULL);
2620         if (ret == 0 && res == -1) {
2621                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
2622                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
2623         }
2624         if (ret != 0 || res != 0) {
2625           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
2626                 return -1;
2627         }
2628
2629         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
2630         talloc_free(outdata.dptr);
2631                     
2632         return 0;
2633 }
2634
2635 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
2636                              struct timeval timeout, uint32_t destnode,
2637                              TALLOC_CTX *mem_ctx,
2638                              struct ctdb_all_public_ips **ips)
2639 {
2640         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
2641                                               destnode, mem_ctx,
2642                                               0, ips);
2643 }
2644
2645 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
2646                         struct timeval timeout, uint32_t destnode, 
2647                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
2648 {
2649         int ret, i, len;
2650         TDB_DATA outdata;
2651         int32_t res;
2652         struct ctdb_all_public_ipsv4 *ipsv4;
2653
2654         ret = ctdb_control(ctdb, destnode, 0, 
2655                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
2656                            mem_ctx, &outdata, &res, &timeout, NULL);
2657         if (ret != 0 || res != 0) {
2658                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
2659                 return -1;
2660         }
2661
2662         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
2663         len = offsetof(struct ctdb_all_public_ips, ips) +
2664                 ipsv4->num*sizeof(struct ctdb_public_ip);
2665         *ips = talloc_zero_size(mem_ctx, len);
2666         CTDB_NO_MEMORY(ctdb, *ips);
2667         (*ips)->num = ipsv4->num;
2668         for (i=0; i<ipsv4->num; i++) {
2669                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
2670                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
2671         }
2672
2673         talloc_free(outdata.dptr);
2674                     
2675         return 0;
2676 }
2677
2678 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
2679                                  struct timeval timeout, uint32_t destnode,
2680                                  TALLOC_CTX *mem_ctx,
2681                                  const ctdb_sock_addr *addr,
2682                                  struct ctdb_control_public_ip_info **_info)
2683 {
2684         int ret;
2685         TDB_DATA indata;
2686         TDB_DATA outdata;
2687         int32_t res;
2688         struct ctdb_control_public_ip_info *info;
2689         uint32_t len;
2690         uint32_t i;
2691
2692         indata.dptr = discard_const_p(uint8_t, addr);
2693         indata.dsize = sizeof(*addr);
2694
2695         ret = ctdb_control(ctdb, destnode, 0,
2696                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
2697                            mem_ctx, &outdata, &res, &timeout, NULL);
2698         if (ret != 0 || res != 0) {
2699                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2700                                 "failed ret:%d res:%d\n",
2701                                 ret, res));
2702                 return -1;
2703         }
2704
2705         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
2706         if (len > outdata.dsize) {
2707                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2708                                 "returned invalid data with size %u > %u\n",
2709                                 (unsigned int)outdata.dsize,
2710                                 (unsigned int)len));
2711                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2712                 return -1;
2713         }
2714
2715         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
2716         len += info->num*sizeof(struct ctdb_control_iface_info);
2717
2718         if (len > outdata.dsize) {
2719                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2720                                 "returned invalid data with size %u > %u\n",
2721                                 (unsigned int)outdata.dsize,
2722                                 (unsigned int)len));
2723                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2724                 return -1;
2725         }
2726
2727         /* make sure we null terminate the returned strings */
2728         for (i=0; i < info->num; i++) {
2729                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2730         }
2731
2732         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
2733                                                                 outdata.dptr,
2734                                                                 outdata.dsize);
2735         talloc_free(outdata.dptr);
2736         if (*_info == NULL) {
2737                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
2738                                 "talloc_memdup size %u failed\n",
2739                                 (unsigned int)outdata.dsize));
2740                 return -1;
2741         }
2742
2743         return 0;
2744 }
2745
2746 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
2747                          struct timeval timeout, uint32_t destnode,
2748                          TALLOC_CTX *mem_ctx,
2749                          struct ctdb_control_get_ifaces **_ifaces)
2750 {
2751         int ret;
2752         TDB_DATA outdata;
2753         int32_t res;
2754         struct ctdb_control_get_ifaces *ifaces;
2755         uint32_t len;
2756         uint32_t i;
2757
2758         ret = ctdb_control(ctdb, destnode, 0,
2759                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
2760                            mem_ctx, &outdata, &res, &timeout, NULL);
2761         if (ret != 0 || res != 0) {
2762                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2763                                 "failed ret:%d res:%d\n",
2764                                 ret, res));
2765                 return -1;
2766         }
2767
2768         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
2769         if (len > outdata.dsize) {
2770                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2771                                 "returned invalid data with size %u > %u\n",
2772                                 (unsigned int)outdata.dsize,
2773                                 (unsigned int)len));
2774                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2775                 return -1;
2776         }
2777
2778         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
2779         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
2780
2781         if (len > outdata.dsize) {
2782                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2783                                 "returned invalid data with size %u > %u\n",
2784                                 (unsigned int)outdata.dsize,
2785                                 (unsigned int)len));
2786                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
2787                 return -1;
2788         }
2789
2790         /* make sure we null terminate the returned strings */
2791         for (i=0; i < ifaces->num; i++) {
2792                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
2793         }
2794
2795         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
2796                                                                   outdata.dptr,
2797                                                                   outdata.dsize);
2798         talloc_free(outdata.dptr);
2799         if (*_ifaces == NULL) {
2800                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
2801                                 "talloc_memdup size %u failed\n",
2802                                 (unsigned int)outdata.dsize));
2803                 return -1;
2804         }
2805
2806         return 0;
2807 }
2808
2809 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
2810                              struct timeval timeout, uint32_t destnode,
2811                              TALLOC_CTX *mem_ctx,
2812                              const struct ctdb_control_iface_info *info)
2813 {
2814         int ret;
2815         TDB_DATA indata;
2816         int32_t res;
2817
2818         indata.dptr = discard_const_p(uint8_t, info);
2819         indata.dsize = sizeof(*info);
2820
2821         ret = ctdb_control(ctdb, destnode, 0,
2822                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
2823                            mem_ctx, NULL, &res, &timeout, NULL);
2824         if (ret != 0 || res != 0) {
2825                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
2826                                 "failed ret:%d res:%d\n",
2827                                 ret, res));
2828                 return -1;
2829         }
2830
2831         return 0;
2832 }
2833
2834 /*
2835   set/clear the permanent disabled bit on a remote node
2836  */
2837 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
2838                        uint32_t set, uint32_t clear)
2839 {
2840         int ret;
2841         TDB_DATA data;
2842         struct ctdb_node_map *nodemap=NULL;
2843         struct ctdb_node_flag_change c;
2844         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2845         uint32_t recmaster;
2846         uint32_t *nodes;
2847
2848
2849         /* find the recovery master */
2850         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
2851         if (ret != 0) {
2852                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2853                 talloc_free(tmp_ctx);
2854                 return ret;
2855         }
2856
2857
2858         /* read the node flags from the recmaster */
2859         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
2860         if (ret != 0) {
2861                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
2862                 talloc_free(tmp_ctx);
2863                 return -1;
2864         }
2865         if (destnode >= nodemap->num) {
2866                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
2867                 talloc_free(tmp_ctx);
2868                 return -1;
2869         }
2870
2871         c.pnn       = destnode;
2872         c.old_flags = nodemap->nodes[destnode].flags;
2873         c.new_flags = c.old_flags;
2874         c.new_flags |= set;
2875         c.new_flags &= ~clear;
2876
2877         data.dsize = sizeof(c);
2878         data.dptr = (unsigned char *)&c;
2879
2880         /* send the flags update to all connected nodes */
2881         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2882
2883         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2884                                         nodes, 0,
2885                                         timeout, false, data,
2886                                         NULL, NULL,
2887                                         NULL) != 0) {
2888                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2889
2890                 talloc_free(tmp_ctx);
2891                 return -1;
2892         }
2893
2894         talloc_free(tmp_ctx);
2895         return 0;
2896 }
2897
2898
2899 /*
2900   get all tunables
2901  */
2902 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
2903                                struct timeval timeout, 
2904                                uint32_t destnode,
2905                                struct ctdb_tunable *tunables)
2906 {
2907         TDB_DATA outdata;
2908         int ret;
2909         int32_t res;
2910
2911         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
2912                            &outdata, &res, &timeout, NULL);
2913         if (ret != 0 || res != 0) {
2914                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
2915                 return -1;
2916         }
2917
2918         if (outdata.dsize != sizeof(*tunables)) {
2919                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2920                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2921                 return -1;              
2922         }
2923
2924         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2925         talloc_free(outdata.dptr);
2926         return 0;
2927 }
2928
2929 /*
2930   add a public address to a node
2931  */
2932 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2933                       struct timeval timeout, 
2934                       uint32_t destnode,
2935                       struct ctdb_control_ip_iface *pub)
2936 {
2937         TDB_DATA data;
2938         int32_t res;
2939         int ret;
2940
2941         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2942         data.dptr  = (unsigned char *)pub;
2943
2944         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2945                            NULL, &res, &timeout, NULL);
2946         if (ret != 0 || res != 0) {
2947                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2948                 return -1;
2949         }
2950
2951         return 0;
2952 }
2953
2954 /*
2955   delete a public address from a node
2956  */
2957 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2958                       struct timeval timeout, 
2959                       uint32_t destnode,
2960                       struct ctdb_control_ip_iface *pub)
2961 {
2962         TDB_DATA data;
2963         int32_t res;
2964         int ret;
2965
2966         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2967         data.dptr  = (unsigned char *)pub;
2968
2969         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2970                            NULL, &res, &timeout, NULL);
2971         if (ret != 0 || res != 0) {
2972                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2973                 return -1;
2974         }
2975
2976         return 0;
2977 }
2978
2979 /*
2980   kill a tcp connection
2981  */
2982 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2983                       struct timeval timeout, 
2984                       uint32_t destnode,
2985                       struct ctdb_control_killtcp *killtcp)
2986 {
2987         TDB_DATA data;
2988         int32_t res;
2989         int ret;
2990
2991         data.dsize = sizeof(struct ctdb_control_killtcp);
2992         data.dptr  = (unsigned char *)killtcp;
2993
2994         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2995                            NULL, &res, &timeout, NULL);
2996         if (ret != 0 || res != 0) {
2997                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2998                 return -1;
2999         }
3000
3001         return 0;
3002 }
3003
3004 /*
3005   send a gratious arp
3006  */
3007 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
3008                       struct timeval timeout, 
3009                       uint32_t destnode,
3010                       ctdb_sock_addr *addr,
3011                       const char *ifname)
3012 {
3013         TDB_DATA data;
3014         int32_t res;
3015         int ret, len;
3016         struct ctdb_control_gratious_arp *gratious_arp;
3017         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
3018
3019
3020         len = strlen(ifname)+1;
3021         gratious_arp = talloc_size(tmp_ctx, 
3022                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
3023         CTDB_NO_MEMORY(ctdb, gratious_arp);
3024
3025         gratious_arp->addr = *addr;
3026         gratious_arp->len = len;
3027         memcpy(&gratious_arp->iface[0], ifname, len);
3028
3029
3030         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
3031         data.dptr  = (unsigned char *)gratious_arp;
3032
3033         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
3034                            NULL, &res, &timeout, NULL);
3035         if (ret != 0 || res != 0) {
3036                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
3037                 talloc_free(tmp_ctx);
3038                 return -1;
3039         }
3040
3041         talloc_free(tmp_ctx);
3042         return 0;
3043 }
3044
3045 /*
3046   get a list of all tcp tickles that a node knows about for a particular vnn
3047  */
3048 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
3049                               struct timeval timeout, uint32_t destnode, 
3050                               TALLOC_CTX *mem_ctx, 
3051                               ctdb_sock_addr *addr,
3052                               struct ctdb_control_tcp_tickle_list **list)
3053 {
3054         int ret;
3055         TDB_DATA data, outdata;
3056         int32_t status;
3057
3058         data.dptr = (uint8_t*)addr;
3059         data.dsize = sizeof(ctdb_sock_addr);
3060
3061         ret = ctdb_control(ctdb, destnode, 0, 
3062                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
3063                            mem_ctx, &outdata, &status, NULL, NULL);
3064         if (ret != 0 || status != 0) {
3065                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
3066                 return -1;
3067         }
3068
3069         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
3070
3071         return status;
3072 }
3073
3074 /*
3075   register a server id
3076  */
3077 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
3078                       struct timeval timeout, 
3079                       struct ctdb_server_id *id)
3080 {
3081         TDB_DATA data;
3082         int32_t res;
3083         int ret;
3084
3085         data.dsize = sizeof(struct ctdb_server_id);
3086         data.dptr  = (unsigned char *)id;
3087
3088         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3089                         CTDB_CONTROL_REGISTER_SERVER_ID, 
3090                         0, data, NULL,
3091                         NULL, &res, &timeout, NULL);
3092         if (ret != 0 || res != 0) {
3093                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
3094                 return -1;
3095         }
3096
3097         return 0;
3098 }
3099
3100 /*
3101   unregister a server id
3102  */
3103 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
3104                       struct timeval timeout, 
3105                       struct ctdb_server_id *id)
3106 {
3107         TDB_DATA data;
3108         int32_t res;
3109         int ret;
3110
3111         data.dsize = sizeof(struct ctdb_server_id);
3112         data.dptr  = (unsigned char *)id;
3113
3114         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
3115                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
3116                         0, data, NULL,
3117                         NULL, &res, &timeout, NULL);
3118         if (ret != 0 || res != 0) {
3119                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
3120                 return -1;
3121         }
3122
3123         return 0;
3124 }
3125
3126
3127 /*
3128   check if a server id exists
3129
3130   if a server id does exist, return *status == 1, otherwise *status == 0
3131  */
3132 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
3133                       struct timeval timeout, 
3134                       uint32_t destnode,
3135                       struct ctdb_server_id *id,
3136                       uint32_t *status)
3137 {
3138         TDB_DATA data;
3139         int32_t res;
3140         int ret;
3141
3142         data.dsize = sizeof(struct ctdb_server_id);
3143         data.dptr  = (unsigned char *)id;
3144
3145         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
3146                         0, data, NULL,
3147                         NULL, &res, &timeout, NULL);
3148         if (ret != 0) {
3149                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
3150                 return -1;
3151         }
3152
3153         if (res) {
3154                 *status = 1;
3155         } else {
3156                 *status = 0;
3157         }
3158
3159         return 0;
3160 }
3161
3162 /*
3163    get the list of server ids that are registered on a node
3164 */
3165 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
3166                 TALLOC_CTX *mem_ctx,
3167                 struct timeval timeout, uint32_t destnode, 
3168                 struct ctdb_server_id_list **svid_list)
3169 {
3170         int ret;
3171         TDB_DATA outdata;
3172         int32_t res;
3173
3174         ret = ctdb_control(ctdb, destnode, 0, 
3175                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
3176                            mem_ctx, &outdata, &res, &timeout, NULL);
3177         if (ret != 0 || res != 0) {
3178                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
3179                 return -1;
3180         }
3181
3182         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
3183                     
3184         return 0;
3185 }
3186
3187 /*
3188   initialise the ctdb daemon for client applications
3189
3190   NOTE: In current code the daemon does not fork. This is for testing purposes only
3191   and to simplify the code.
3192 */
3193 struct ctdb_context *ctdb_init(struct event_context *ev)
3194 {
3195         int ret;
3196         struct ctdb_context *ctdb;
3197
3198         ctdb = talloc_zero(ev, struct ctdb_context);
3199         if (ctdb == NULL) {
3200                 DEBUG(DEBUG_ERR,(__location__ " talloc_zero failed.\n"));
3201                 return NULL;
3202         }
3203         ctdb->ev  = ev;
3204         ctdb->idr = idr_init(ctdb);
3205         /* Wrap early to exercise code. */
3206         ctdb->lastid = INT_MAX-200;
3207         CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
3208
3209         ret = ctdb_set_socketname(ctdb, CTDB_PATH);
3210         if (ret != 0) {
3211                 DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));
3212                 talloc_free(ctdb);
3213                 return NULL;
3214         }
3215
3216         ctdb->statistics.statistics_start_time = timeval_current();
3217
3218         return ctdb;
3219 }
3220
3221
3222 /*
3223   set some ctdb flags
3224 */
3225 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
3226 {
3227         ctdb->flags |= flags;
3228 }
3229
3230 /*
3231   setup the local socket name
3232 */
3233 int ctdb_set_socketname(struct ctdb_context *ctdb, const char *socketname)
3234 {
3235         ctdb->daemon.name = talloc_strdup(ctdb, socketname);
3236         CTDB_NO_MEMORY(ctdb, ctdb->daemon.name);
3237
3238         return 0;
3239 }
3240
3241 const char *ctdb_get_socketname(struct ctdb_context *ctdb)
3242 {
3243         return ctdb->daemon.name;
3244 }
3245
3246 /*
3247   return the pnn of this node
3248 */
3249 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
3250 {
3251         return ctdb->pnn;
3252 }
3253
3254
3255 /*
3256   get the uptime of a remote node
3257  */
3258 struct ctdb_client_control_state *
3259 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3260 {
3261         return ctdb_control_send(ctdb, destnode, 0, 
3262                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
3263                            mem_ctx, &timeout, NULL);
3264 }
3265
3266 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
3267 {
3268         int ret;
3269         int32_t res;
3270         TDB_DATA outdata;
3271
3272         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3273         if (ret != 0 || res != 0) {
3274                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
3275                 return -1;
3276         }
3277
3278         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
3279
3280         return 0;
3281 }
3282
3283 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
3284 {
3285         struct ctdb_client_control_state *state;
3286
3287         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
3288         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
3289 }
3290
3291 /*
3292   send a control to execute the "recovered" event script on a node
3293  */
3294 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3295 {
3296         int ret;
3297         int32_t status;
3298
3299         ret = ctdb_control(ctdb, destnode, 0, 
3300                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
3301                            NULL, NULL, &status, &timeout, NULL);
3302         if (ret != 0 || status != 0) {
3303                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
3304                 return -1;
3305         }
3306
3307         return 0;
3308 }
3309
3310 /* 
3311   callback for the async helpers used when sending the same control
3312   to multiple nodes in parallell.
3313 */
3314 static void async_callback(struct ctdb_client_control_state *state)
3315 {
3316         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
3317         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
3318         int ret;
3319         TDB_DATA outdata;
3320         int32_t res = -1;
3321         uint32_t destnode = state->c->hdr.destnode;
3322
3323         /* one more node has responded with recmode data */
3324         data->count--;
3325
3326         /* if we failed to push the db, then return an error and let
3327            the main loop try again.
3328         */
3329         if (state->state != CTDB_CONTROL_DONE) {
3330                 if ( !data->dont_log_errors) {
3331                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
3332                 }
3333                 data->fail_count++;
3334                 if (state->state == CTDB_CONTROL_TIMEOUT) {
3335                         res = -ETIME;
3336                 } else {
3337                         res = -1;
3338                 }
3339                 if (data->fail_callback) {
3340                         data->fail_callback(ctdb, destnode, res, outdata,
3341                                         data->callback_data);
3342                 }
3343                 return;
3344         }
3345         
3346         state->async.fn = NULL;
3347
3348         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
3349         if ((ret != 0) || (res != 0)) {
3350                 if ( !data->dont_log_errors) {
3351                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
3352                 }
3353                 data->fail_count++;
3354                 if (data->fail_callback) {
3355                         data->fail_callback(ctdb, destnode, res, outdata,
3356                                         data->callback_data);
3357                 }
3358         }
3359         if ((ret == 0) && (data->callback != NULL)) {
3360                 data->callback(ctdb, destnode, res, outdata,
3361                                         data->callback_data);
3362         }
3363 }
3364
3365
3366 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
3367 {
3368         /* set up the callback functions */
3369         state->async.fn = async_callback;
3370         state->async.private_data = data;
3371         
3372         /* one more control to wait for to complete */
3373         data->count++;
3374 }
3375
3376
3377 /* wait for up to the maximum number of seconds allowed
3378    or until all nodes we expect a response from has replied
3379 */
3380 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
3381 {
3382         while (data->count > 0) {
3383                 event_loop_once(ctdb->ev);
3384         }
3385         if (data->fail_count != 0) {
3386                 if (!data->dont_log_errors) {
3387                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
3388                                  data->fail_count));
3389                 }
3390                 return -1;
3391         }
3392         return 0;
3393 }
3394
3395
3396 /* 
3397    perform a simple control on the listed nodes
3398    The control cannot return data
3399  */
3400 int ctdb_client_async_control(struct ctdb_context *ctdb,
3401                                 enum ctdb_controls opcode,
3402                                 uint32_t *nodes,
3403                                 uint64_t srvid,
3404                                 struct timeval timeout,
3405                                 bool dont_log_errors,
3406                                 TDB_DATA data,
3407                                 client_async_callback client_callback,
3408                                 client_async_callback fail_callback,
3409                                 void *callback_data)
3410 {
3411         struct client_async_data *async_data;
3412         struct ctdb_client_control_state *state;
3413         int j, num_nodes;
3414
3415         async_data = talloc_zero(ctdb, struct client_async_data);
3416         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
3417         async_data->dont_log_errors = dont_log_errors;
3418         async_data->callback = client_callback;
3419         async_data->fail_callback = fail_callback;
3420         async_data->callback_data = callback_data;
3421         async_data->opcode        = opcode;
3422
3423         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
3424
3425         /* loop over all nodes and send an async control to each of them */
3426         for (j=0; j<num_nodes; j++) {
3427                 uint32_t pnn = nodes[j];
3428
3429                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
3430                                           0, data, async_data, &timeout, NULL);
3431                 if (state == NULL) {
3432                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
3433                         talloc_free(async_data);
3434                         return -1;
3435                 }
3436                 
3437                 ctdb_client_async_add(async_data, state);
3438         }
3439
3440         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
3441                 talloc_free(async_data);
3442                 return -1;
3443         }
3444
3445         talloc_free(async_data);
3446         return 0;
3447 }
3448
3449 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
3450                                 struct ctdb_vnn_map *vnn_map,
3451                                 TALLOC_CTX *mem_ctx,
3452                                 bool include_self)
3453 {
3454         int i, j, num_nodes;
3455         uint32_t *nodes;
3456
3457         for (i=num_nodes=0;i<vnn_map->size;i++) {
3458                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3459                         continue;
3460                 }
3461                 num_nodes++;
3462         } 
3463
3464         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3465         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3466
3467         for (i=j=0;i<vnn_map->size;i++) {
3468                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
3469                         continue;
3470                 }
3471                 nodes[j++] = vnn_map->map[i];
3472         } 
3473
3474         return nodes;
3475 }
3476
3477 /* Get list of nodes not including those with flags specified by mask.
3478  * If exclude_pnn is not -1 then exclude that pnn from the list.
3479  */
3480 uint32_t *list_of_nodes(struct ctdb_context *ctdb,
3481                         struct ctdb_node_map *node_map,
3482                         TALLOC_CTX *mem_ctx,
3483                         uint32_t mask,
3484                         int exclude_pnn)
3485 {
3486         int i, j, num_nodes;
3487         uint32_t *nodes;
3488
3489         for (i=num_nodes=0;i<node_map->num;i++) {
3490                 if (node_map->nodes[i].flags & mask) {
3491                         continue;
3492                 }
3493                 if (node_map->nodes[i].pnn == exclude_pnn) {
3494                         continue;
3495                 }
3496                 num_nodes++;
3497         } 
3498
3499         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
3500         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
3501
3502         for (i=j=0;i<node_map->num;i++) {
3503                 if (node_map->nodes[i].flags & mask) {
3504                         continue;
3505                 }
3506                 if (node_map->nodes[i].pnn == exclude_pnn) {
3507                         continue;
3508                 }
3509                 nodes[j++] = node_map->nodes[i].pnn;
3510         } 
3511
3512         return nodes;
3513 }
3514
3515 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
3516                                 struct ctdb_node_map *node_map,
3517                                 TALLOC_CTX *mem_ctx,
3518                                 bool include_self)
3519 {
3520         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_INACTIVE,
3521                              include_self ? -1 : ctdb->pnn);
3522 }
3523
3524 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
3525                                 struct ctdb_node_map *node_map,
3526                                 TALLOC_CTX *mem_ctx,
3527                                 bool include_self)
3528 {
3529         return list_of_nodes(ctdb, node_map, mem_ctx, NODE_FLAGS_DISCONNECTED,
3530                              include_self ? -1 : ctdb->pnn);
3531 }
3532
3533 /* 
3534   this is used to test if a pnn lock exists and if it exists will return
3535   the number of connections that pnn has reported or -1 if that recovery
3536   daemon is not running.
3537 */
3538 int
3539 ctdb_read_pnn_lock(int fd, int32_t pnn)
3540 {
3541         struct flock lock;
3542         char c;
3543
3544         lock.l_type = F_WRLCK;
3545         lock.l_whence = SEEK_SET;
3546         lock.l_start = pnn;
3547         lock.l_len = 1;
3548         lock.l_pid = 0;
3549
3550         if (fcntl(fd, F_GETLK, &lock) != 0) {
3551                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
3552                 return -1;
3553         }
3554
3555         if (lock.l_type == F_UNLCK) {
3556                 return -1;
3557         }
3558
3559         if (pread(fd, &c, 1, pnn) == -1) {
3560                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
3561                 return -1;
3562         }
3563
3564         return c;
3565 }
3566
3567 /*
3568   get capabilities of a remote node
3569  */
3570 struct ctdb_client_control_state *
3571 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
3572 {
3573         return ctdb_control_send(ctdb, destnode, 0, 
3574                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
3575                            mem_ctx, &timeout, NULL);
3576 }
3577
3578 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
3579 {
3580         int ret;
3581         int32_t res;
3582         TDB_DATA outdata;
3583
3584         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
3585         if ( (ret != 0) || (res != 0) ) {
3586                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
3587                 return -1;
3588         }
3589
3590         if (capabilities) {
3591                 *capabilities = *((uint32_t *)outdata.dptr);
3592         }
3593
3594         return 0;
3595 }
3596
3597 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
3598 {
3599         struct ctdb_client_control_state *state;
3600         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3601         int ret;
3602
3603         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
3604         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
3605         talloc_free(tmp_ctx);
3606         return ret;
3607 }
3608
3609 /**
3610  * check whether a transaction is active on a given db on a given node
3611  */
3612 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
3613                                      uint32_t destnode,
3614                                      uint32_t db_id)
3615 {
3616         int32_t status;
3617         int ret;
3618         TDB_DATA indata;
3619
3620         indata.dptr = (uint8_t *)&db_id;
3621         indata.dsize = sizeof(db_id);
3622
3623         ret = ctdb_control(ctdb, destnode, 0,
3624                            CTDB_CONTROL_TRANS2_ACTIVE,
3625                            0, indata, NULL, NULL, &status,
3626                            NULL, NULL);
3627
3628         if (ret != 0) {
3629                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
3630                 return -1;
3631         }
3632
3633         return status;
3634 }
3635
3636
3637 struct ctdb_transaction_handle {
3638         struct ctdb_db_context *ctdb_db;
3639         bool in_replay;
3640         /*
3641          * we store the reads and writes done under a transaction:
3642          * - one list stores both reads and writes (m_all),
3643          * - the other just writes (m_write)
3644          */
3645         struct ctdb_marshall_buffer *m_all;
3646         struct ctdb_marshall_buffer *m_write;
3647 };
3648
3649 /* start a transaction on a database */
3650 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
3651 {
3652         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3653         return 0;
3654 }
3655
3656 /* start a transaction on a database */
3657 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
3658 {
3659         struct ctdb_record_handle *rh;
3660         TDB_DATA key;
3661         TDB_DATA data;
3662         struct ctdb_ltdb_header header;
3663         TALLOC_CTX *tmp_ctx;
3664         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
3665         int ret;
3666         struct ctdb_db_context *ctdb_db = h->ctdb_db;
3667         pid_t pid;
3668         int32_t status;
3669
3670         key.dptr = discard_const(keyname);
3671         key.dsize = strlen(keyname);
3672
3673         if (!ctdb_db->persistent) {
3674                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
3675                 return -1;
3676         }
3677
3678 again:
3679         tmp_ctx = talloc_new(h);
3680
3681         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
3682         if (rh == NULL) {
3683                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
3684                 talloc_free(tmp_ctx);
3685                 return -1;
3686         }
3687
3688         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
3689                                               CTDB_CURRENT_NODE,
3690                                               ctdb_db->db_id);
3691         if (status == 1) {
3692                 unsigned long int usec = (1000 + random()) % 100000;
3693                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
3694                                     "on db_id[0x%08x]. waiting for %lu "
3695                                     "microseconds\n",
3696                                     ctdb_db->db_id, usec));
3697                 talloc_free(tmp_ctx);
3698                 usleep(usec);
3699                 goto again;
3700         }
3701
3702         /*
3703          * store the pid in the database:
3704          * it is not enough that the node is dmaster...
3705          */
3706         pid = getpid();
3707         data.dptr = (unsigned char *)&pid;
3708         data.dsize = sizeof(pid_t);
3709         rh->header.rsn++;
3710         rh->header.dmaster = ctdb_db->ctdb->pnn;
3711         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
3712         if (ret != 0) {
3713                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
3714                                   "transaction record\n"));
3715                 talloc_free(tmp_ctx);
3716                 return -1;
3717         }
3718
3719         talloc_free(rh);
3720
3721         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
3722         if (ret != 0) {
3723                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
3724                 talloc_free(tmp_ctx);
3725                 return -1;
3726         }
3727
3728         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
3729         if (ret != 0) {
3730                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
3731                                  "lock record inside transaction\n"));
3732                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3733                 talloc_free(tmp_ctx);
3734                 goto again;
3735         }
3736
3737         if (header.dmaster != ctdb_db->ctdb->pnn) {
3738                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
3739                                    "transaction lock record\n"));
3740                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3741                 talloc_free(tmp_ctx);
3742                 goto again;
3743         }
3744
3745         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
3746                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
3747                                     "the transaction lock record\n"));
3748                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
3749                 talloc_free(tmp_ctx);
3750                 goto again;
3751         }
3752
3753         talloc_free(tmp_ctx);
3754
3755         return 0;
3756 }
3757
3758
3759 /* start a transaction on a database */
3760 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
3761                                                        TALLOC_CTX *mem_ctx)
3762 {
3763         struct ctdb_transaction_handle *h;
3764         int ret;
3765
3766         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
3767         if (h == NULL) {
3768                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
3769                 return NULL;
3770         }
3771
3772         h->ctdb_db = ctdb_db;
3773
3774         ret = ctdb_transaction_fetch_start(h);
3775         if (ret != 0) {
3776                 talloc_free(h);
3777                 return NULL;
3778         }
3779
3780         talloc_set_destructor(h, ctdb_transaction_destructor);
3781
3782         return h;
3783 }
3784
3785
3786
3787 /*
3788   fetch a record inside a transaction
3789  */
3790 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
3791                            TALLOC_CTX *mem_ctx, 
3792                            TDB_DATA key, TDB_DATA *data)
3793 {
3794         struct ctdb_ltdb_header header;
3795         int ret;
3796
3797         ZERO_STRUCT(header);
3798
3799         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
3800         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3801                 /* record doesn't exist yet */
3802                 *data = tdb_null;
3803                 ret = 0;
3804         }
3805         
3806         if (ret != 0) {
3807                 return ret;
3808         }
3809
3810         if (!h->in_replay) {
3811                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
3812                 if (h->m_all == NULL) {
3813                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3814                         return -1;
3815                 }
3816         }
3817
3818         return 0;
3819 }
3820
3821 /*
3822   stores a record inside a transaction
3823  */
3824 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
3825                            TDB_DATA key, TDB_DATA data)
3826 {
3827         TALLOC_CTX *tmp_ctx = talloc_new(h);
3828         struct ctdb_ltdb_header header;
3829         TDB_DATA olddata;
3830         int ret;
3831
3832         ZERO_STRUCT(header);
3833
3834         /* we need the header so we can update the RSN */
3835         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
3836         if (ret == -1 && header.dmaster == (uint32_t)-1) {
3837                 /* the record doesn't exist - create one with us as dmaster.
3838                    This is only safe because we are in a transaction and this
3839                    is a persistent database */
3840                 ZERO_STRUCT(header);
3841         } else if (ret != 0) {
3842                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
3843                 talloc_free(tmp_ctx);
3844                 return ret;
3845         }
3846
3847         if (data.dsize == olddata.dsize &&
3848             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
3849                 /* save writing the same data */
3850                 talloc_free(tmp_ctx);
3851                 return 0;
3852         }
3853
3854         header.dmaster = h->ctdb_db->ctdb->pnn;
3855         header.rsn++;
3856
3857         if (!h->in_replay) {
3858                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
3859                 if (h->m_all == NULL) {
3860                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3861                         talloc_free(tmp_ctx);
3862                         return -1;
3863                 }
3864         }               
3865
3866         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
3867         if (h->m_write == NULL) {
3868                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
3869                 talloc_free(tmp_ctx);
3870                 return -1;
3871         }
3872         
3873         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
3874
3875         talloc_free(tmp_ctx);
3876         
3877         return ret;
3878 }
3879
3880 /*
3881   replay a transaction
3882  */
3883 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
3884 {
3885         int ret, i;
3886         struct ctdb_rec_data *rec = NULL;
3887
3888         h->in_replay = true;
3889         talloc_free(h->m_write);
3890         h->m_write = NULL;
3891
3892         ret = ctdb_transaction_fetch_start(h);
3893         if (ret != 0) {
3894                 return ret;
3895         }
3896
3897         for (i=0;i<h->m_all->count;i++) {
3898                 TDB_DATA key, data;
3899
3900                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
3901                 if (rec == NULL) {
3902                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
3903                         goto failed;
3904                 }
3905
3906                 if (rec->reqid == 0) {
3907                         /* its a store */
3908                         if (ctdb_transaction_store(h, key, data) != 0) {
3909                                 goto failed;
3910                         }
3911                 } else {
3912                         TDB_DATA data2;
3913                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3914
3915                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3916                                 talloc_free(tmp_ctx);
3917                                 goto failed;
3918                         }
3919                         if (data2.dsize != data.dsize ||
3920                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3921                                 /* the record has changed on us - we have to give up */
3922                                 talloc_free(tmp_ctx);
3923                                 goto failed;
3924                         }
3925                         talloc_free(tmp_ctx);
3926                 }
3927         }
3928         
3929         return 0;
3930
3931 failed:
3932         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3933         return -1;
3934 }
3935
3936
3937 /*
3938   commit a transaction
3939  */
3940 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3941 {
3942         int ret, retries=0;
3943         int32_t status;
3944         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3945         struct timeval timeout;
3946         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3947
3948         talloc_set_destructor(h, NULL);
3949
3950         /* our commit strategy is quite complex.
3951
3952            - we first try to commit the changes to all other nodes
3953
3954            - if that works, then we commit locally and we are done
3955
3956            - if a commit on another node fails, then we need to cancel
3957              the transaction, then restart the transaction (thus
3958              opening a window of time for a pending recovery to
3959              complete), then replay the transaction, checking all the
3960              reads and writes (checking that reads give the same data,
3961              and writes succeed). Then we retry the transaction to the
3962              other nodes
3963         */
3964
3965 again:
3966         if (h->m_write == NULL) {
3967                 /* no changes were made */
3968                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3969                 talloc_free(h);
3970                 return 0;
3971         }
3972
3973         /* tell ctdbd to commit to the other nodes */
3974         timeout = timeval_current_ofs(1, 0);
3975         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3976                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3977                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3978                            &timeout, NULL);
3979         if (ret != 0 || status != 0) {
3980                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3981                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3982                                      ", retrying after 1 second...\n",
3983                                      (retries==0)?"":"retry "));
3984                 sleep(1);
3985
3986                 if (ret != 0) {
3987                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3988                 } else {
3989                         /* work out what error code we will give if we 
3990                            have to fail the operation */
3991                         switch ((enum ctdb_trans2_commit_error)status) {
3992                         case CTDB_TRANS2_COMMIT_SUCCESS:
3993                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3994                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3995                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3996                                 break;
3997                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3998                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3999                                 break;
4000                         }
4001                 }
4002
4003                 if (++retries == 100) {
4004                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
4005                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
4006                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4007                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4008                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4009                         talloc_free(h);
4010                         return -1;
4011                 }               
4012
4013                 if (ctdb_replay_transaction(h) != 0) {
4014                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
4015                                           "transaction on db 0x%08x, "
4016                                           "failure control =%u\n",
4017                                           h->ctdb_db->db_id,
4018                                           (unsigned)failure_control));
4019                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4020                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4021                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
4022                         talloc_free(h);
4023                         return -1;
4024                 }
4025                 goto again;
4026         } else {
4027                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
4028         }
4029
4030         /* do the real commit locally */
4031         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
4032         if (ret != 0) {
4033                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
4034                                   "on db id 0x%08x locally, "
4035                                   "failure_control=%u\n",
4036                                   h->ctdb_db->db_id,
4037                                   (unsigned)failure_control));
4038                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4039                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
4040                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
4041                 talloc_free(h);
4042                 return ret;
4043         }
4044
4045         /* tell ctdbd that we are finished with our local commit */
4046         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
4047                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
4048                      tdb_null, NULL, NULL, NULL, NULL, NULL);
4049         talloc_free(h);
4050         return 0;
4051 }
4052
4053 /*
4054   recovery daemon ping to main daemon
4055  */
4056 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
4057 {
4058         int ret;
4059         int32_t res;
4060
4061         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
4062                            ctdb, NULL, &res, NULL, NULL);
4063         if (ret != 0 || res != 0) {
4064                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
4065                 return -1;
4066         }
4067
4068         return 0;
4069 }
4070
4071 /* When forking the main daemon and the child process needs to connect
4072  * back to the daemon as a client process, this function can be used
4073  * to change the ctdb context from daemon into client mode.  The child
4074  * process must be created using ctdb_fork() and not fork() -
4075  * ctdb_fork() does some necessary housekeeping.
4076  */
4077 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
4078 {
4079         int ret;
4080         va_list ap;
4081
4082         /* Add extra information so we can identify this in the logs */
4083         va_start(ap, fmt);
4084         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
4085         va_end(ap);
4086
4087         /* get a new event context */
4088         ctdb->ev = event_context_init(ctdb);
4089         tevent_loop_allow_nesting(ctdb->ev);
4090
4091         /* Connect to main CTDB daemon */
4092         ret = ctdb_socket_connect(ctdb);
4093         if (ret != 0) {
4094                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
4095                 return -1;
4096         }
4097
4098         ctdb->can_send_controls = true;
4099
4100         return 0;
4101 }
4102
4103 /*
4104   get the status of running the monitor eventscripts: NULL means never run.
4105  */
4106 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
4107                 struct timeval timeout, uint32_t destnode, 
4108                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
4109                 struct ctdb_scripts_wire **scripts)
4110 {
4111         int ret;
4112         TDB_DATA outdata, indata;
4113         int32_t res;
4114         uint32_t uinttype = type;
4115
4116         indata.dptr = (uint8_t *)&uinttype;
4117         indata.dsize = sizeof(uinttype);
4118
4119         ret = ctdb_control(ctdb, destnode, 0, 
4120                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
4121                            mem_ctx, &outdata, &res, &timeout, NULL);
4122         if (ret != 0 || res != 0) {
4123                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
4124                 return -1;
4125         }
4126
4127         if (outdata.dsize == 0) {
4128                 *scripts = NULL;
4129         } else {
4130                 *scripts = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4131                 talloc_free(outdata.dptr);
4132         }
4133                     
4134         return 0;
4135 }
4136
4137 /*
4138   tell the main daemon how long it took to lock the reclock file
4139  */
4140 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
4141 {
4142         int ret;
4143         int32_t res;
4144         TDB_DATA data;
4145
4146         data.dptr = (uint8_t *)&latency;
4147         data.dsize = sizeof(latency);
4148
4149         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
4150                            ctdb, NULL, &res, NULL, NULL);
4151         if (ret != 0 || res != 0) {
4152                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
4153                 return -1;
4154         }
4155
4156         return 0;
4157 }
4158
4159 /*
4160   get the name of the reclock file
4161  */
4162 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
4163                          uint32_t destnode, TALLOC_CTX *mem_ctx,
4164                          const char **name)
4165 {
4166         int ret;
4167         int32_t res;
4168         TDB_DATA data;
4169
4170         ret = ctdb_control(ctdb, destnode, 0, 
4171                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
4172                            mem_ctx, &data, &res, &timeout, NULL);
4173         if (ret != 0 || res != 0) {
4174                 return -1;
4175         }
4176
4177         if (data.dsize == 0) {
4178                 *name = NULL;
4179         } else {
4180                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
4181         }
4182         talloc_free(data.dptr);
4183
4184         return 0;
4185 }
4186
4187 /*
4188   set the reclock filename for a node
4189  */
4190 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
4191 {
4192         int ret;
4193         TDB_DATA data;
4194         int32_t res;
4195
4196         if (reclock == NULL) {
4197                 data.dsize = 0;
4198                 data.dptr  = NULL;
4199         } else {
4200                 data.dsize = strlen(reclock) + 1;
4201                 data.dptr  = discard_const(reclock);
4202         }
4203
4204         ret = ctdb_control(ctdb, destnode, 0, 
4205                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
4206                            NULL, NULL, &res, &timeout, NULL);
4207         if (ret != 0 || res != 0) {
4208                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
4209                 return -1;
4210         }
4211
4212         return 0;
4213 }
4214
4215 /*
4216   stop a node
4217  */
4218 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4219 {
4220         int ret;
4221         int32_t res;
4222
4223         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
4224                            ctdb, NULL, &res, &timeout, NULL);
4225         if (ret != 0 || res != 0) {
4226                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
4227                 return -1;
4228         }
4229
4230         return 0;
4231 }
4232
4233 /*
4234   continue a node
4235  */
4236 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
4237 {
4238         int ret;
4239
4240         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
4241                            ctdb, NULL, NULL, &timeout, NULL);
4242         if (ret != 0) {
4243                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
4244                 return -1;
4245         }
4246
4247         return 0;
4248 }
4249
4250 /*
4251   set the natgw state for a node
4252  */
4253 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
4254 {
4255         int ret;
4256         TDB_DATA data;
4257         int32_t res;
4258
4259         data.dsize = sizeof(natgwstate);
4260         data.dptr  = (uint8_t *)&natgwstate;
4261
4262         ret = ctdb_control(ctdb, destnode, 0, 
4263                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
4264                            NULL, NULL, &res, &timeout, NULL);
4265         if (ret != 0 || res != 0) {
4266                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
4267                 return -1;
4268         }
4269
4270         return 0;
4271 }
4272
4273 /*
4274   set the lmaster role for a node
4275  */
4276 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
4277 {
4278         int ret;
4279         TDB_DATA data;
4280         int32_t res;
4281
4282         data.dsize = sizeof(lmasterrole);
4283         data.dptr  = (uint8_t *)&lmasterrole;
4284
4285         ret = ctdb_control(ctdb, destnode, 0, 
4286                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
4287                            NULL, NULL, &res, &timeout, NULL);
4288         if (ret != 0 || res != 0) {
4289                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
4290                 return -1;
4291         }
4292
4293         return 0;
4294 }
4295
4296 /*
4297   set the recmaster role for a node
4298  */
4299 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
4300 {
4301         int ret;
4302         TDB_DATA data;
4303         int32_t res;
4304
4305         data.dsize = sizeof(recmasterrole);
4306         data.dptr  = (uint8_t *)&recmasterrole;
4307
4308         ret = ctdb_control(ctdb, destnode, 0, 
4309                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
4310                            NULL, NULL, &res, &timeout, NULL);
4311         if (ret != 0 || res != 0) {
4312                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
4313                 return -1;
4314         }
4315
4316         return 0;
4317 }
4318
4319 /* enable an eventscript
4320  */
4321 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4322 {
4323         int ret;
4324         TDB_DATA data;
4325         int32_t res;
4326
4327         data.dsize = strlen(script) + 1;
4328         data.dptr  = discard_const(script);
4329
4330         ret = ctdb_control(ctdb, destnode, 0, 
4331                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
4332                            NULL, NULL, &res, &timeout, NULL);
4333         if (ret != 0 || res != 0) {
4334                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
4335                 return -1;
4336         }
4337
4338         return 0;
4339 }
4340
4341 /* disable an eventscript
4342  */
4343 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
4344 {
4345         int ret;
4346         TDB_DATA data;
4347         int32_t res;
4348
4349         data.dsize = strlen(script) + 1;
4350         data.dptr  = discard_const(script);
4351
4352         ret = ctdb_control(ctdb, destnode, 0, 
4353                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
4354                            NULL, NULL, &res, &timeout, NULL);
4355         if (ret != 0 || res != 0) {
4356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
4357                 return -1;
4358         }
4359
4360         return 0;
4361 }
4362
4363
4364 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
4365 {
4366         int ret;
4367         TDB_DATA data;
4368         int32_t res;
4369
4370         data.dsize = sizeof(*bantime);
4371         data.dptr  = (uint8_t *)bantime;
4372
4373         ret = ctdb_control(ctdb, destnode, 0, 
4374                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
4375                            NULL, NULL, &res, &timeout, NULL);
4376         if (ret != 0 || res != 0) {
4377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4378                 return -1;
4379         }
4380
4381         return 0;
4382 }
4383
4384
4385 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
4386 {
4387         int ret;
4388         TDB_DATA outdata;
4389         int32_t res;
4390         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4391
4392         ret = ctdb_control(ctdb, destnode, 0, 
4393                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
4394                            tmp_ctx, &outdata, &res, &timeout, NULL);
4395         if (ret != 0 || res != 0) {
4396                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
4397                 talloc_free(tmp_ctx);
4398                 return -1;
4399         }
4400
4401         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
4402         talloc_free(tmp_ctx);
4403
4404         return 0;
4405 }
4406
4407
4408 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
4409 {
4410         int ret;
4411         int32_t res;
4412         TDB_DATA data;
4413         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4414
4415         data.dptr = (uint8_t*)db_prio;
4416         data.dsize = sizeof(*db_prio);
4417
4418         ret = ctdb_control(ctdb, destnode, 0, 
4419                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
4420                            tmp_ctx, NULL, &res, &timeout, NULL);
4421         if (ret != 0 || res != 0) {
4422                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
4423                 talloc_free(tmp_ctx);
4424                 return -1;
4425         }
4426
4427         talloc_free(tmp_ctx);
4428
4429         return 0;
4430 }
4431
4432 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
4433 {
4434         int ret;
4435         int32_t res;
4436         TDB_DATA data;
4437         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
4438
4439         data.dptr = (uint8_t*)&db_id;
4440         data.dsize = sizeof(db_id);
4441
4442         ret = ctdb_control(ctdb, destnode, 0, 
4443                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
4444                            tmp_ctx, NULL, &res, &timeout, NULL);
4445         if (ret != 0 || res < 0) {
4446                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_db_priority failed\n"));
4447                 talloc_free(tmp_ctx);
4448                 return -1;
4449         }
4450
4451         if (priority) {
4452                 *priority = res;
4453         }
4454
4455         talloc_free(tmp_ctx);
4456
4457         return 0;
4458 }
4459
4460 int ctdb_ctrl_getstathistory(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_statistics_wire **stats)
4461 {
4462         int ret;
4463         TDB_DATA outdata;
4464         int32_t res;
4465
4466         ret = ctdb_control(ctdb, destnode, 0, 
4467                            CTDB_CONTROL_GET_STAT_HISTORY, 0, tdb_null, 
4468                            mem_ctx, &outdata, &res, &timeout, NULL);
4469         if (ret != 0 || res != 0 || outdata.dsize == 0) {
4470                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getstathistory failed ret:%d res:%d\n", ret, res));
4471                 return -1;
4472         }
4473
4474         *stats = (struct ctdb_statistics_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
4475         talloc_free(outdata.dptr);
4476                     
4477         return 0;
4478 }
4479
4480 struct ctdb_ltdb_header *ctdb_header_from_record_handle(struct ctdb_record_handle *h)
4481 {
4482         if (h == NULL) {
4483                 return NULL;
4484         }
4485
4486         return &h->header;
4487 }
4488
4489
4490 struct ctdb_client_control_state *
4491 ctdb_ctrl_updaterecord_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4492 {
4493         struct ctdb_client_control_state *handle;
4494         struct ctdb_marshall_buffer *m;
4495         struct ctdb_rec_data *rec;
4496         TDB_DATA outdata;
4497
4498         m = talloc_zero(mem_ctx, struct ctdb_marshall_buffer);
4499         if (m == NULL) {
4500                 DEBUG(DEBUG_ERR, ("Failed to allocate marshall buffer for update record\n"));
4501                 return NULL;
4502         }
4503
4504         m->db_id = ctdb_db->db_id;
4505
4506         rec = ctdb_marshall_record(m, 0, key, header, data);
4507         if (rec == NULL) {
4508                 DEBUG(DEBUG_ERR,("Failed to marshall record for update record\n"));
4509                 talloc_free(m);
4510                 return NULL;
4511         }
4512         m = talloc_realloc_size(mem_ctx, m, rec->length + offsetof(struct ctdb_marshall_buffer, data));
4513         if (m == NULL) {
4514                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata\n"));
4515                 talloc_free(m);
4516                 return NULL;
4517         }
4518         m->count++;
4519         memcpy((uint8_t *)m + offsetof(struct ctdb_marshall_buffer, data), rec, rec->length);
4520
4521
4522         outdata.dptr = (uint8_t *)m;
4523         outdata.dsize = talloc_get_size(m);
4524
4525         handle = ctdb_control_send(ctdb, destnode, 0, 
4526                            CTDB_CONTROL_UPDATE_RECORD, 0, outdata,
4527                            mem_ctx, &timeout, NULL);
4528         talloc_free(m);
4529         return handle;
4530 }
4531
4532 int ctdb_ctrl_updaterecord_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4533 {
4534         int ret;
4535         int32_t res;
4536
4537         ret = ctdb_control_recv(ctdb, state, state, NULL, &res, NULL);
4538         if ( (ret != 0) || (res != 0) ){
4539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_update_record_recv failed\n"));
4540                 return -1;
4541         }
4542
4543         return 0;
4544 }
4545
4546 int
4547 ctdb_ctrl_updaterecord(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_db_context *ctdb_db, TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA data)
4548 {
4549         struct ctdb_client_control_state *state;
4550
4551         state = ctdb_ctrl_updaterecord_send(ctdb, mem_ctx, timeout, destnode, ctdb_db, key, header, data);
4552         return ctdb_ctrl_updaterecord_recv(ctdb, state);
4553 }
4554
4555
4556
4557
4558
4559
4560 /*
4561   set a database to be readonly
4562  */
4563 struct ctdb_client_control_state *
4564 ctdb_ctrl_set_db_readonly_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4565 {
4566         TDB_DATA data;
4567
4568         data.dptr = (uint8_t *)&dbid;
4569         data.dsize = sizeof(dbid);
4570
4571         return ctdb_control_send(ctdb, destnode, 0, 
4572                            CTDB_CONTROL_SET_DB_READONLY, 0, data, 
4573                            ctdb, NULL, NULL);
4574 }
4575
4576 int ctdb_ctrl_set_db_readonly_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4577 {
4578         int ret;
4579         int32_t res;
4580
4581         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4582         if (ret != 0 || res != 0) {
4583           DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_readonly_recv failed  ret:%d res:%d\n", ret, res));
4584                 return -1;
4585         }
4586
4587         return 0;
4588 }
4589
4590 int ctdb_ctrl_set_db_readonly(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4591 {
4592         struct ctdb_client_control_state *state;
4593
4594         state = ctdb_ctrl_set_db_readonly_send(ctdb, destnode, dbid);
4595         return ctdb_ctrl_set_db_readonly_recv(ctdb, state);
4596 }
4597
4598 /*
4599   set a database to be sticky
4600  */
4601 struct ctdb_client_control_state *
4602 ctdb_ctrl_set_db_sticky_send(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4603 {
4604         TDB_DATA data;
4605
4606         data.dptr = (uint8_t *)&dbid;
4607         data.dsize = sizeof(dbid);
4608
4609         return ctdb_control_send(ctdb, destnode, 0, 
4610                            CTDB_CONTROL_SET_DB_STICKY, 0, data, 
4611                            ctdb, NULL, NULL);
4612 }
4613
4614 int ctdb_ctrl_set_db_sticky_recv(struct ctdb_context *ctdb, struct ctdb_client_control_state *state)
4615 {
4616         int ret;
4617         int32_t res;
4618
4619         ret = ctdb_control_recv(ctdb, state, ctdb, NULL, &res, NULL);
4620         if (ret != 0 || res != 0) {
4621                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_set_db_sticky_recv failed  ret:%d res:%d\n", ret, res));
4622                 return -1;
4623         }
4624
4625         return 0;
4626 }
4627
4628 int ctdb_ctrl_set_db_sticky(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid)
4629 {
4630         struct ctdb_client_control_state *state;
4631
4632         state = ctdb_ctrl_set_db_sticky_send(ctdb, destnode, dbid);
4633         return ctdb_ctrl_set_db_sticky_recv(ctdb, state);
4634 }