we sometimes need to send special flags when asking ctdb to create a new database...
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36 struct ctdb_record_handle {
37         struct ctdb_db_context *ctdb_db;
38         TDB_DATA key;
39         TDB_DATA *data;
40         struct ctdb_ltdb_header header;
41 };
42
43
44 /*
45   make a recv call to the local ctdb daemon - called from client context
46
47   This is called when the program wants to wait for a ctdb_call to complete and get the 
48   results. This call will block unless the call has already completed.
49 */
50 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
51 {
52         if (state == NULL) {
53                 return -1;
54         }
55
56         while (state->state < CTDB_CALL_DONE) {
57                 event_loop_once(state->ctdb_db->ctdb->ev);
58         }
59         if (state->state != CTDB_CALL_DONE) {
60                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
61                 talloc_free(state);
62                 return -1;
63         }
64
65         if (state->call->reply_data.dsize) {
66                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
67                                                       state->call->reply_data.dptr,
68                                                       state->call->reply_data.dsize);
69                 call->reply_data.dsize = state->call->reply_data.dsize;
70         } else {
71                 call->reply_data.dptr = NULL;
72                 call->reply_data.dsize = 0;
73         }
74         call->status = state->call->status;
75         talloc_free(state);
76
77         return 0;
78 }
79
80
81
82
83
84
85
86 /*
87   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
88 */
89 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
90 {
91         struct ctdb_client_call_state *state;
92
93         state = ctdb_call_send(ctdb_db, call);
94         return ctdb_call_recv(state, call);
95 }
96
97
98
99
100
101 /*
102   cancel a ctdb_fetch_lock operation, releasing the lock
103  */
104 static int fetch_lock_destructor(struct ctdb_record_handle *h)
105 {
106         ctdb_ltdb_unlock(h->ctdb_db, h->key);
107         return 0;
108 }
109
110 /*
111   force the migration of a record to this node
112  */
113 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
114 {
115         struct ctdb_call call;
116         ZERO_STRUCT(call);
117         call.call_id = CTDB_NULL_FUNC;
118         call.key = key;
119         call.flags = CTDB_IMMEDIATE_MIGRATION;
120         return ctdb_call(ctdb_db, &call);
121 }
122
123 /*
124   get a lock on a record, and return the records data. Blocks until it gets the lock
125  */
126 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
127                                            TDB_DATA key, TDB_DATA *data)
128 {
129         int ret;
130         struct ctdb_record_handle *h;
131
132         /*
133           procedure is as follows:
134
135           1) get the chain lock. 
136           2) check if we are dmaster
137           3) if we are the dmaster then return handle 
138           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
139              reply from ctdbd
140           5) when we get the reply, goto (1)
141          */
142
143         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
144         if (h == NULL) {
145                 return NULL;
146         }
147
148         h->ctdb_db = ctdb_db;
149         h->key     = key;
150         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
151         if (h->key.dptr == NULL) {
152                 talloc_free(h);
153                 return NULL;
154         }
155         h->data    = data;
156
157         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
158                  (const char *)key.dptr));
159
160 again:
161         /* step 1 - get the chain lock */
162         ret = ctdb_ltdb_lock(ctdb_db, key);
163         if (ret != 0) {
164                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
170
171         talloc_set_destructor(h, fetch_lock_destructor);
172
173         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
174
175         /* when torturing, ensure we test the remote path */
176         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
177             random() % 5 == 0) {
178                 h->header.dmaster = (uint32_t)-1;
179         }
180
181
182         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
183
184         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
185                 ctdb_ltdb_unlock(ctdb_db, key);
186                 ret = ctdb_client_force_migration(ctdb_db, key);
187                 if (ret != 0) {
188                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
189                         talloc_free(h);
190                         return NULL;
191                 }
192                 goto again;
193         }
194
195         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
196         return h;
197 }
198
199 /*
200   store some data to the record that was locked with ctdb_fetch_lock()
201 */
202 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
203 {
204         if (h->ctdb_db->persistent) {
205                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
206                 return -1;
207         }
208
209         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
210 }
211
212 /*
213   non-locking fetch of a record
214  */
215 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
216                TDB_DATA key, TDB_DATA *data)
217 {
218         struct ctdb_call call;
219         int ret;
220
221         call.call_id = CTDB_FETCH_FUNC;
222         call.call_data.dptr = NULL;
223         call.call_data.dsize = 0;
224
225         ret = ctdb_call(ctdb_db, &call);
226
227         if (ret == 0) {
228                 *data = call.reply_data;
229                 talloc_steal(mem_ctx, data->dptr);
230         }
231
232         return ret;
233 }
234
235
236
237
238
239
240
241
242 /*
243   send a ctdb control message
244   timeout specifies how long we should wait for a reply.
245   if timeout is NULL we wait indefinitely
246  */
247 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
248                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
249                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
250                  struct timeval *timeout,
251                  char **errormsg)
252 {
253         struct ctdb_client_control_state *state;
254
255         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
256                         flags, data, mem_ctx,
257                         errormsg);
258         if (state != NULL && timeout && !timeval_is_zero(timeout)) {
259                 event_add_timed(ctdb->ev, state, *timeout, ctdb_control_timeout_func, state);
260         }
261
262         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
263                         errormsg);
264 }
265
266
267
268
269 /*
270   a process exists call. Returns 0 if process exists, -1 otherwise
271  */
272 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
273 {
274         int ret;
275         TDB_DATA data;
276         int32_t status;
277
278         data.dptr = (uint8_t*)&pid;
279         data.dsize = sizeof(pid);
280
281         ret = ctdb_control(ctdb, destnode, 0, 
282                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
283                            NULL, NULL, &status, NULL, NULL);
284         if (ret != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
286                 return -1;
287         }
288
289         return status;
290 }
291
292 /*
293   get remote statistics
294  */
295 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
296 {
297         int ret;
298         TDB_DATA data;
299         int32_t res;
300
301         ret = ctdb_control(ctdb, destnode, 0, 
302                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
303                            ctdb, &data, &res, NULL, NULL);
304         if (ret != 0 || res != 0) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
306                 return -1;
307         }
308
309         if (data.dsize != sizeof(struct ctdb_statistics)) {
310                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
311                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
312                       return -1;
313         }
314
315         *status = *(struct ctdb_statistics *)data.dptr;
316         talloc_free(data.dptr);
317                         
318         return 0;
319 }
320
321 /*
322   shutdown a remote ctdb node
323  */
324 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
325 {
326         struct ctdb_client_control_state *state;
327
328         state = ctdb_control_send(ctdb, destnode, 0, 
329                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
330                            NULL, NULL);
331         if (state == NULL) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
333                 return -1;
334         }
335         if (!timeval_is_zero(&timeout)) {
336                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
337         }
338
339         return 0;
340 }
341
342 /*
343   get vnn map from a remote node
344  */
345 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
346 {
347         int ret;
348         TDB_DATA outdata;
349         int32_t res;
350         struct ctdb_vnn_map_wire *map;
351
352         ret = ctdb_control(ctdb, destnode, 0, 
353                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
354                            mem_ctx, &outdata, &res, &timeout, NULL);
355         if (ret != 0 || res != 0) {
356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
357                 return -1;
358         }
359         
360         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
361         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
362             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
363                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
364                 return -1;
365         }
366
367         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
368         CTDB_NO_MEMORY(ctdb, *vnnmap);
369         (*vnnmap)->generation = map->generation;
370         (*vnnmap)->size       = map->size;
371         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
372
373         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
374         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
375         talloc_free(outdata.dptr);
376                     
377         return 0;
378 }
379
380
381 /*
382   get the recovery mode of a remote node
383  */
384 struct ctdb_client_control_state *
385 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
386 {
387         struct ctdb_client_control_state *state;
388
389         state = ctdb_control_send(ctdb, destnode, 0, 
390                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
391                            mem_ctx, NULL);
392
393         if (state != NULL && !timeval_is_zero(&timeout)) {
394                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
395         }
396
397         return state;
398 }
399
400 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
401 {
402         int ret;
403         int32_t res;
404
405         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
406         if (ret != 0) {
407                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
408                 return -1;
409         }
410
411         if (recmode) {
412                 *recmode = (uint32_t)res;
413         }
414
415         return 0;
416 }
417
418 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
419 {
420         struct ctdb_client_control_state *state;
421
422         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
423         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
424 }
425
426
427
428
429 /*
430   set the recovery mode of a remote node
431  */
432 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
433 {
434         int ret;
435         TDB_DATA data;
436         int32_t res;
437
438         data.dsize = sizeof(uint32_t);
439         data.dptr = (unsigned char *)&recmode;
440
441         ret = ctdb_control(ctdb, destnode, 0, 
442                            CTDB_CONTROL_SET_RECMODE, 0, data, 
443                            NULL, NULL, &res, &timeout, NULL);
444         if (ret != 0 || res != 0) {
445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
446                 return -1;
447         }
448
449         return 0;
450 }
451
452
453
454 /*
455   get the recovery master of a remote node
456  */
457 struct ctdb_client_control_state *
458 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
459                         struct timeval timeout, uint32_t destnode)
460 {
461         struct ctdb_client_control_state *state;
462
463         state = ctdb_control_send(ctdb, destnode, 0, 
464                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
465                            mem_ctx, NULL);
466         if (state != NULL && !timeval_is_zero(&timeout)) {
467                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
468         }
469
470         return state;
471 }
472
473 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
474 {
475         int ret;
476         int32_t res;
477
478         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
479         if (ret != 0) {
480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
481                 return -1;
482         }
483
484         if (recmaster) {
485                 *recmaster = (uint32_t)res;
486         }
487
488         return 0;
489 }
490
491 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
492 {
493         struct ctdb_client_control_state *state;
494
495         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
496         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
497 }
498
499
500 /*
501   set the recovery master of a remote node
502  */
503 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
504 {
505         int ret;
506         TDB_DATA data;
507         int32_t res;
508
509         ZERO_STRUCT(data);
510         data.dsize = sizeof(uint32_t);
511         data.dptr = (unsigned char *)&recmaster;
512
513         ret = ctdb_control(ctdb, destnode, 0, 
514                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
515                            NULL, NULL, &res, &timeout, NULL);
516         if (ret != 0 || res != 0) {
517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
518                 return -1;
519         }
520
521         return 0;
522 }
523
524
525 /*
526   get a list of databases off a remote node
527  */
528 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
529                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
530 {
531         int ret;
532         TDB_DATA outdata;
533         int32_t res;
534
535         ret = ctdb_control(ctdb, destnode, 0, 
536                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
537                            mem_ctx, &outdata, &res, &timeout, NULL);
538         if (ret != 0 || res != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
540                 return -1;
541         }
542
543         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
544         talloc_free(outdata.dptr);
545                     
546         return 0;
547 }
548
549 /*
550   get a list of nodes (vnn and flags ) from a remote node
551  */
552 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
553                 struct timeval timeout, uint32_t destnode, 
554                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
555 {
556         int ret;
557         TDB_DATA outdata;
558         int32_t res;
559
560         ret = ctdb_control(ctdb, destnode, 0, 
561                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
562                            mem_ctx, &outdata, &res, &timeout, NULL);
563         if (ret == 0 && res == -1 && outdata.dsize == 0) {
564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
565                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
566         }
567         if (ret != 0 || res != 0 || outdata.dsize == 0) {
568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
569                 return -1;
570         }
571
572         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
573         talloc_free(outdata.dptr);
574                     
575         return 0;
576 }
577
578 /*
579   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
580  */
581 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
582                 struct timeval timeout, uint32_t destnode, 
583                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
584 {
585         int ret, i, len;
586         TDB_DATA outdata;
587         struct ctdb_node_mapv4 *nodemapv4;
588         int32_t res;
589
590         ret = ctdb_control(ctdb, destnode, 0, 
591                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
592                            mem_ctx, &outdata, &res, &timeout, NULL);
593         if (ret != 0 || res != 0 || outdata.dsize == 0) {
594                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
595                 return -1;
596         }
597
598         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
599
600         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
601         (*nodemap) = talloc_zero_size(mem_ctx, len);
602         CTDB_NO_MEMORY(ctdb, (*nodemap));
603
604         (*nodemap)->num = nodemapv4->num;
605         for (i=0; i<nodemapv4->num; i++) {
606                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
607                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
608                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
609                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
610         }
611                 
612         talloc_free(outdata.dptr);
613                     
614         return 0;
615 }
616
617 /*
618   drop the transport, reload the nodes file and restart the transport
619  */
620 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
621                     struct timeval timeout, uint32_t destnode)
622 {
623         int ret;
624         int32_t res;
625
626         ret = ctdb_control(ctdb, destnode, 0, 
627                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
628                            NULL, NULL, &res, &timeout, NULL);
629         if (ret != 0 || res != 0) {
630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
631                 return -1;
632         }
633
634         return 0;
635 }
636
637
638 /*
639   set vnn map on a node
640  */
641 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
642                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
643 {
644         int ret;
645         TDB_DATA data;
646         int32_t res;
647         struct ctdb_vnn_map_wire *map;
648         size_t len;
649
650         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
651         map = talloc_size(mem_ctx, len);
652         CTDB_NO_MEMORY(ctdb, map);
653
654         map->generation = vnnmap->generation;
655         map->size = vnnmap->size;
656         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
657         
658         data.dsize = len;
659         data.dptr  = (uint8_t *)map;
660
661         ret = ctdb_control(ctdb, destnode, 0, 
662                            CTDB_CONTROL_SETVNNMAP, 0, data, 
663                            NULL, NULL, &res, &timeout, NULL);
664         if (ret != 0 || res != 0) {
665                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
666                 return -1;
667         }
668
669         talloc_free(map);
670
671         return 0;
672 }
673
674
675 /*
676   async send for pull database
677  */
678 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
679         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
680         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
681 {
682         TDB_DATA indata;
683         struct ctdb_control_pulldb *pull;
684         struct ctdb_client_control_state *state;
685
686         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
687         CTDB_NO_MEMORY_NULL(ctdb, pull);
688
689         pull->db_id   = dbid;
690         pull->lmaster = lmaster;
691
692         indata.dsize = sizeof(struct ctdb_control_pulldb);
693         indata.dptr  = (unsigned char *)pull;
694
695         state = ctdb_control_send(ctdb, destnode, 0, 
696                                   CTDB_CONTROL_PULL_DB, 0, indata, 
697                                   mem_ctx, NULL);
698         if (state != NULL && !timeval_is_zero(&timeout)) {
699                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
700         }
701
702         talloc_free(pull);
703
704         return state;
705 }
706
707 /*
708   async recv for pull database
709  */
710 int ctdb_ctrl_pulldb_recv(
711         struct ctdb_context *ctdb, 
712         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
713         TDB_DATA *outdata)
714 {
715         int ret;
716         int32_t res;
717
718         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
719         if ( (ret != 0) || (res != 0) ){
720                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
721                 return -1;
722         }
723
724         return 0;
725 }
726
727 /*
728   pull all keys and records for a specific database on a node
729  */
730 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
731                 uint32_t dbid, uint32_t lmaster, 
732                 TALLOC_CTX *mem_ctx, struct timeval timeout,
733                 TDB_DATA *outdata)
734 {
735         struct ctdb_client_control_state *state;
736
737         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
738                                       timeout);
739         
740         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
741 }
742
743
744 /*
745   change dmaster for all keys in the database to the new value
746  */
747 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
748                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
749 {
750         int ret;
751         TDB_DATA indata;
752         int32_t res;
753
754         indata.dsize = 2*sizeof(uint32_t);
755         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
756
757         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
758         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
759
760         ret = ctdb_control(ctdb, destnode, 0, 
761                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
762                            NULL, NULL, &res, &timeout, NULL);
763         if (ret != 0 || res != 0) {
764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
765                 return -1;
766         }
767
768         return 0;
769 }
770
771 /*
772   ping a node, return number of clients connected
773  */
774 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
775 {
776         int ret;
777         int32_t res;
778
779         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
780                            tdb_null, NULL, NULL, &res, NULL, NULL);
781         if (ret != 0) {
782                 return -1;
783         }
784         return res;
785 }
786
787 /*
788   find the real path to a ltdb 
789  */
790 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
791                    const char **path)
792 {
793         int ret;
794         int32_t res;
795         TDB_DATA data;
796
797         data.dptr = (uint8_t *)&dbid;
798         data.dsize = sizeof(dbid);
799
800         ret = ctdb_control(ctdb, destnode, 0, 
801                            CTDB_CONTROL_GETDBPATH, 0, data, 
802                            mem_ctx, &data, &res, &timeout, NULL);
803         if (ret != 0 || res != 0) {
804                 return -1;
805         }
806
807         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
808         if ((*path) == NULL) {
809                 return -1;
810         }
811
812         talloc_free(data.dptr);
813
814         return 0;
815 }
816
817 /*
818   find the name of a db 
819  */
820 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
821                    const char **name)
822 {
823         int ret;
824         int32_t res;
825         TDB_DATA data;
826
827         data.dptr = (uint8_t *)&dbid;
828         data.dsize = sizeof(dbid);
829
830         ret = ctdb_control(ctdb, destnode, 0, 
831                            CTDB_CONTROL_GET_DBNAME, 0, data, 
832                            mem_ctx, &data, &res, &timeout, NULL);
833         if (ret != 0 || res != 0) {
834                 return -1;
835         }
836
837         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
838         if ((*name) == NULL) {
839                 return -1;
840         }
841
842         talloc_free(data.dptr);
843
844         return 0;
845 }
846
847 /*
848   get the health status of a db
849  */
850 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
851                           struct timeval timeout,
852                           uint32_t destnode,
853                           uint32_t dbid, TALLOC_CTX *mem_ctx,
854                           const char **reason)
855 {
856         int ret;
857         int32_t res;
858         TDB_DATA data;
859
860         data.dptr = (uint8_t *)&dbid;
861         data.dsize = sizeof(dbid);
862
863         ret = ctdb_control(ctdb, destnode, 0,
864                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
865                            mem_ctx, &data, &res, &timeout, NULL);
866         if (ret != 0 || res != 0) {
867                 return -1;
868         }
869
870         if (data.dsize == 0) {
871                 (*reason) = NULL;
872                 return 0;
873         }
874
875         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
876         if ((*reason) == NULL) {
877                 return -1;
878         }
879
880         talloc_free(data.dptr);
881
882         return 0;
883 }
884
885 /*
886   create a database
887  */
888 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout,
889                         uint32_t destnode, 
890                         const char *name, bool persistent)
891 {
892         int ret;
893         ctdb_handle *handle;
894
895         handle = ctdb_createdb_send(ctdb, destnode, name, persistent, 0, NULL, NULL);
896         if (handle == NULL) {
897                 DEBUG(DEBUG_ERR, (__location__  " Failed to send createdb control\n"));
898                 return -1;
899         }
900
901         if (!timeval_is_zero(&timeout)) {
902                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
903         }
904
905         ret = ctdb_createdb_recv(ctdb, handle, NULL);
906         if (ret != 0) {
907                 DEBUG(DEBUG_ERR,(__location__ " ctdb control for createdb failed\n"));
908                 return -1;
909         }
910
911         return 0;
912 }
913
914 /*
915   get debug level on a node
916  */
917 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
918 {
919         int ret;
920         int32_t res;
921         TDB_DATA data;
922
923         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
924                            ctdb, &data, &res, NULL, NULL);
925         if (ret != 0 || res != 0) {
926                 return -1;
927         }
928         if (data.dsize != sizeof(int32_t)) {
929                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
930                          (unsigned)data.dsize));
931                 return -1;
932         }
933         *level = *(int32_t *)data.dptr;
934         talloc_free(data.dptr);
935         return 0;
936 }
937
938 /*
939   set debug level on a node
940  */
941 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
942 {
943         int ret;
944         int32_t res;
945         TDB_DATA data;
946
947         data.dptr = (uint8_t *)&level;
948         data.dsize = sizeof(level);
949
950         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
951                            NULL, NULL, &res, NULL, NULL);
952         if (ret != 0 || res != 0) {
953                 return -1;
954         }
955         return 0;
956 }
957
958
959 /*
960   get a list of connected nodes
961  */
962 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
963                                 struct timeval timeout,
964                                 TALLOC_CTX *mem_ctx,
965                                 uint32_t *num_nodes)
966 {
967         struct ctdb_node_map *map=NULL;
968         int ret, i;
969         uint32_t *nodes;
970
971         *num_nodes = 0;
972
973         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
974         if (ret != 0) {
975                 return NULL;
976         }
977
978         nodes = talloc_array(mem_ctx, uint32_t, map->num);
979         if (nodes == NULL) {
980                 return NULL;
981         }
982
983         for (i=0;i<map->num;i++) {
984                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
985                         nodes[*num_nodes] = map->nodes[i].pnn;
986                         (*num_nodes)++;
987                 }
988         }
989
990         return nodes;
991 }
992
993
994 /*
995   reset remote status
996  */
997 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
998 {
999         int ret;
1000         int32_t res;
1001
1002         ret = ctdb_control(ctdb, destnode, 0, 
1003                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1004                            NULL, NULL, &res, NULL, NULL);
1005         if (ret != 0 || res != 0) {
1006                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1007                 return -1;
1008         }
1009         return 0;
1010 }
1011
1012 /*
1013   this is the dummy null procedure that all databases support
1014 */
1015 static int ctdb_null_func(struct ctdb_call_info *call)
1016 {
1017         return 0;
1018 }
1019
1020 /*
1021   this is a plain fetch procedure that all databases support
1022 */
1023 static int ctdb_fetch_func(struct ctdb_call_info *call)
1024 {
1025         call->reply_data = &call->record_data;
1026         return 0;
1027 }
1028
1029 /*
1030   attach to a specific database - client call
1031 */
1032 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1033 {
1034         struct ctdb_db_context *ctdb_db;
1035         TDB_DATA data;
1036         int ret;
1037         int32_t res;
1038
1039         ctdb_db = ctdb_db_handle(ctdb, name);
1040         if (ctdb_db) {
1041                 return ctdb_db;
1042         }
1043
1044         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1045         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1046
1047         ctdb_db->ctdb = ctdb;
1048         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1049         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1050
1051         data.dptr = discard_const(name);
1052         data.dsize = strlen(name)+1;
1053
1054         /* tell ctdb daemon to attach */
1055         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1056                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1057                            0, data, ctdb_db, &data, &res, NULL, NULL);
1058         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1059                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1060                 talloc_free(ctdb_db);
1061                 return NULL;
1062         }
1063         
1064         ctdb_db->db_id = *(uint32_t *)data.dptr;
1065         talloc_free(data.dptr);
1066
1067         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1068         if (ret != 0) {
1069                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1070                 talloc_free(ctdb_db);
1071                 return NULL;
1072         }
1073
1074         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1075         if (ctdb->valgrinding) {
1076                 tdb_flags |= TDB_NOMMAP;
1077         }
1078         tdb_flags |= TDB_DISALLOW_NESTING;
1079
1080         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1081         if (ctdb_db->ltdb == NULL) {
1082                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1083                 talloc_free(ctdb_db);
1084                 return NULL;
1085         }
1086
1087         ctdb_db->persistent = persistent;
1088
1089         DLIST_ADD(ctdb->db_list, ctdb_db);
1090
1091         /* add well known functions */
1092         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1093         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1094
1095         return ctdb_db;
1096 }
1097
1098
1099 /*
1100   setup a call for a database
1101  */
1102 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1103 {
1104         struct ctdb_registered_call *call;
1105
1106 #if 0
1107         TDB_DATA data;
1108         int32_t status;
1109         struct ctdb_control_set_call c;
1110         int ret;
1111
1112         /* this is no longer valid with the separate daemon architecture */
1113         c.db_id = ctdb_db->db_id;
1114         c.fn    = fn;
1115         c.id    = id;
1116
1117         data.dptr = (uint8_t *)&c;
1118         data.dsize = sizeof(c);
1119
1120         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1121                            data, NULL, NULL, &status, NULL, NULL);
1122         if (ret != 0 || status != 0) {
1123                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1124                 return -1;
1125         }
1126 #endif
1127
1128         /* also register locally */
1129         call = talloc(ctdb_db, struct ctdb_registered_call);
1130         call->fn = fn;
1131         call->id = id;
1132
1133         DLIST_ADD(ctdb_db->calls, call);        
1134         return 0;
1135 }
1136
1137
1138 struct traverse_state {
1139         bool done;
1140         uint32_t count;
1141         ctdb_traverse_func fn;
1142         void *private_data;
1143 };
1144
1145 /*
1146   called on each key during a ctdb_traverse
1147  */
1148 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1149 {
1150         struct traverse_state *state = (struct traverse_state *)p;
1151         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1152         TDB_DATA key;
1153
1154         if (data.dsize < sizeof(uint32_t) ||
1155             d->length != data.dsize) {
1156                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1157                 state->done = True;
1158                 return;
1159         }
1160
1161         key.dsize = d->keylen;
1162         key.dptr  = &d->data[0];
1163         data.dsize = d->datalen;
1164         data.dptr = &d->data[d->keylen];
1165
1166         if (key.dsize == 0 && data.dsize == 0) {
1167                 /* end of traverse */
1168                 state->done = True;
1169                 return;
1170         }
1171
1172         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1173                 /* empty records are deleted records in ctdb */
1174                 return;
1175         }
1176
1177         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1178                 state->done = True;
1179         }
1180
1181         state->count++;
1182 }
1183
1184
1185 /*
1186   start a cluster wide traverse, calling the supplied fn on each record
1187   return the number of records traversed, or -1 on error
1188  */
1189 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1190 {
1191         TDB_DATA data;
1192         struct ctdb_traverse_start t;
1193         int32_t status;
1194         int ret;
1195         uint64_t srvid = (getpid() | 0xFLL<<60);
1196         struct traverse_state state;
1197
1198         state.done = False;
1199         state.count = 0;
1200         state.private_data = private_data;
1201         state.fn = fn;
1202
1203         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1204         if (ret != 0) {
1205                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1206                 return -1;
1207         }
1208
1209         t.db_id = ctdb_db->db_id;
1210         t.srvid = srvid;
1211         t.reqid = 0;
1212
1213         data.dptr = (uint8_t *)&t;
1214         data.dsize = sizeof(t);
1215
1216         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1217                            data, NULL, NULL, &status, NULL, NULL);
1218         if (ret != 0 || status != 0) {
1219                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1220                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid);
1221                 return -1;
1222         }
1223
1224         while (!state.done) {
1225                 event_loop_once(ctdb_db->ctdb->ev);
1226         }
1227
1228         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid);
1229         if (ret != 0) {
1230                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1231                 return -1;
1232         }
1233
1234         return state.count;
1235 }
1236
1237 #define ISASCII(x) ((x>31)&&(x<128))
1238 /*
1239   called on each key during a catdb
1240  */
1241 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1242 {
1243         int i;
1244         FILE *f = (FILE *)p;
1245         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1246
1247         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1248         for (i=0;i<key.dsize;i++) {
1249                 if (ISASCII(key.dptr[i])) {
1250                         fprintf(f, "%c", key.dptr[i]);
1251                 } else {
1252                         fprintf(f, "\\%02X", key.dptr[i]);
1253                 }
1254         }
1255         fprintf(f, "\"\n");
1256
1257         fprintf(f, "dmaster: %u\n", h->dmaster);
1258         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1259
1260         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1261         for (i=sizeof(*h);i<data.dsize;i++) {
1262                 if (ISASCII(data.dptr[i])) {
1263                         fprintf(f, "%c", data.dptr[i]);
1264                 } else {
1265                         fprintf(f, "\\%02X", data.dptr[i]);
1266                 }
1267         }
1268         fprintf(f, "\"\n");
1269
1270         fprintf(f, "\n");
1271
1272         return 0;
1273 }
1274
1275 /*
1276   convenience function to list all keys to stdout
1277  */
1278 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1279 {
1280         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1281 }
1282
1283 /*
1284   get the pid of a ctdb daemon
1285  */
1286 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1287 {
1288         int ret;
1289         int32_t res;
1290
1291         ret = ctdb_control(ctdb, destnode, 0, 
1292                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1293                            NULL, NULL, &res, &timeout, NULL);
1294         if (ret != 0) {
1295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1296                 return -1;
1297         }
1298
1299         *pid = res;
1300
1301         return 0;
1302 }
1303
1304
1305 /*
1306   async freeze send control
1307  */
1308 struct ctdb_client_control_state *
1309 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1310 {
1311         struct ctdb_client_control_state *state;
1312
1313         state = ctdb_control_send(ctdb, destnode, priority, 
1314                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1315                            mem_ctx, NULL);
1316         if (state != NULL && !timeval_is_zero(&timeout)) {
1317                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
1318         }
1319
1320         return state;
1321 }
1322
1323 /* 
1324    async freeze recv control
1325 */
1326 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1327 {
1328         int ret;
1329         int32_t res;
1330
1331         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1332         if ( (ret != 0) || (res != 0) ){
1333                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1334                 return -1;
1335         }
1336
1337         return 0;
1338 }
1339
1340 /*
1341   freeze databases of a certain priority
1342  */
1343 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1344 {
1345         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1346         struct ctdb_client_control_state *state;
1347         int ret;
1348
1349         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1350         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1351         talloc_free(tmp_ctx);
1352
1353         return ret;
1354 }
1355
1356 /* Freeze all databases */
1357 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1358 {
1359         int i;
1360
1361         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1362                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1363                         return -1;
1364                 }
1365         }
1366         return 0;
1367 }
1368
1369 /*
1370   thaw databases of a certain priority
1371  */
1372 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1373 {
1374         int ret;
1375         int32_t res;
1376
1377         ret = ctdb_control(ctdb, destnode, priority, 
1378                            CTDB_CONTROL_THAW, 0, tdb_null, 
1379                            NULL, NULL, &res, &timeout, NULL);
1380         if (ret != 0 || res != 0) {
1381                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1382                 return -1;
1383         }
1384
1385         return 0;
1386 }
1387
1388 /* thaw all databases */
1389 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1390 {
1391         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1392 }
1393
1394 /*
1395   get pnn of a node, or -1
1396  */
1397 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1398 {
1399         int ret;
1400         uint32_t pnn;
1401         ctdb_handle *handle;
1402
1403         handle = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
1404         if (handle == NULL) {
1405                 DEBUG(DEBUG_ERR, (__location__ " Failed to send getpnn control\n"));
1406                 return -1;
1407         }
1408
1409         if (!timeval_is_zero(&timeout)) {
1410                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
1411         }
1412
1413         ret = ctdb_getpnn_recv(ctdb, handle, &pnn);
1414         if (ret != 0) {
1415                 DEBUG(DEBUG_ERR,(__location__ " ctdb control for getpnn failed\n"));
1416                 return -1;
1417         }
1418
1419         return pnn;
1420 }
1421
1422 /*
1423   get the monitoring mode of a remote node
1424  */
1425 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1426 {
1427         int ret;
1428         int32_t res;
1429
1430         ret = ctdb_control(ctdb, destnode, 0, 
1431                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1432                            NULL, NULL, &res, &timeout, NULL);
1433         if (ret != 0) {
1434                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1435                 return -1;
1436         }
1437
1438         *monmode = res;
1439
1440         return 0;
1441 }
1442
1443
1444 /*
1445  set the monitoring mode of a remote node to active
1446  */
1447 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1448 {
1449         int ret;
1450         
1451
1452         ret = ctdb_control(ctdb, destnode, 0, 
1453                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1454                            NULL, NULL,NULL, &timeout, NULL);
1455         if (ret != 0) {
1456                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1457                 return -1;
1458         }
1459
1460         
1461
1462         return 0;
1463 }
1464
1465 /*
1466   set the monitoring mode of a remote node to disable
1467  */
1468 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1469 {
1470         int ret;
1471         
1472
1473         ret = ctdb_control(ctdb, destnode, 0, 
1474                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1475                            NULL, NULL, NULL, &timeout, NULL);
1476         if (ret != 0) {
1477                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1478                 return -1;
1479         }
1480
1481         
1482
1483         return 0;
1484 }
1485
1486
1487
1488 /* 
1489   sent to a node to make it take over an ip address
1490 */
1491 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1492                           uint32_t destnode, struct ctdb_public_ip *ip)
1493 {
1494         TDB_DATA data;
1495         struct ctdb_public_ipv4 ipv4;
1496         int ret;
1497         int32_t res;
1498
1499         if (ip->addr.sa.sa_family == AF_INET) {
1500                 ipv4.pnn = ip->pnn;
1501                 ipv4.sin = ip->addr.ip;
1502
1503                 data.dsize = sizeof(ipv4);
1504                 data.dptr  = (uint8_t *)&ipv4;
1505
1506                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1507                            NULL, &res, &timeout, NULL);
1508         } else {
1509                 data.dsize = sizeof(*ip);
1510                 data.dptr  = (uint8_t *)ip;
1511
1512                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1513                            NULL, &res, &timeout, NULL);
1514         }
1515
1516         if (ret != 0 || res != 0) {
1517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1518                 return -1;
1519         }
1520
1521         return 0;       
1522 }
1523
1524
1525 /* 
1526   sent to a node to make it release an ip address
1527 */
1528 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1529                          uint32_t destnode, struct ctdb_public_ip *ip)
1530 {
1531         TDB_DATA data;
1532         struct ctdb_public_ipv4 ipv4;
1533         int ret;
1534         int32_t res;
1535
1536         if (ip->addr.sa.sa_family == AF_INET) {
1537                 ipv4.pnn = ip->pnn;
1538                 ipv4.sin = ip->addr.ip;
1539
1540                 data.dsize = sizeof(ipv4);
1541                 data.dptr  = (uint8_t *)&ipv4;
1542
1543                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1544                                    NULL, &res, &timeout, NULL);
1545         } else {
1546                 data.dsize = sizeof(*ip);
1547                 data.dptr  = (uint8_t *)ip;
1548
1549                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1550                                    NULL, &res, &timeout, NULL);
1551         }
1552
1553         if (ret != 0 || res != 0) {
1554                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1555                 return -1;
1556         }
1557
1558         return 0;       
1559 }
1560
1561
1562 /*
1563   get a tunable
1564  */
1565 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1566                           struct timeval timeout, 
1567                           uint32_t destnode,
1568                           const char *name, uint32_t *value)
1569 {
1570         struct ctdb_control_get_tunable *t;
1571         TDB_DATA data, outdata;
1572         int32_t res;
1573         int ret;
1574
1575         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1576         data.dptr  = talloc_size(ctdb, data.dsize);
1577         CTDB_NO_MEMORY(ctdb, data.dptr);
1578
1579         t = (struct ctdb_control_get_tunable *)data.dptr;
1580         t->length = strlen(name)+1;
1581         memcpy(t->name, name, t->length);
1582
1583         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1584                            &outdata, &res, &timeout, NULL);
1585         talloc_free(data.dptr);
1586         if (ret != 0 || res != 0) {
1587                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1588                 return -1;
1589         }
1590
1591         if (outdata.dsize != sizeof(uint32_t)) {
1592                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1593                 talloc_free(outdata.dptr);
1594                 return -1;
1595         }
1596         
1597         *value = *(uint32_t *)outdata.dptr;
1598         talloc_free(outdata.dptr);
1599
1600         return 0;
1601 }
1602
1603 /*
1604   set a tunable
1605  */
1606 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1607                           struct timeval timeout, 
1608                           uint32_t destnode,
1609                           const char *name, uint32_t value)
1610 {
1611         struct ctdb_control_set_tunable *t;
1612         TDB_DATA data;
1613         int32_t res;
1614         int ret;
1615
1616         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1617         data.dptr  = talloc_size(ctdb, data.dsize);
1618         CTDB_NO_MEMORY(ctdb, data.dptr);
1619
1620         t = (struct ctdb_control_set_tunable *)data.dptr;
1621         t->length = strlen(name)+1;
1622         memcpy(t->name, name, t->length);
1623         t->value = value;
1624
1625         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1626                            NULL, &res, &timeout, NULL);
1627         talloc_free(data.dptr);
1628         if (ret != 0 || res != 0) {
1629                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1630                 return -1;
1631         }
1632
1633         return 0;
1634 }
1635
1636 /*
1637   list tunables
1638  */
1639 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1640                             struct timeval timeout, 
1641                             uint32_t destnode,
1642                             TALLOC_CTX *mem_ctx,
1643                             const char ***list, uint32_t *count)
1644 {
1645         TDB_DATA outdata;
1646         int32_t res;
1647         int ret;
1648         struct ctdb_control_list_tunable *t;
1649         char *p, *s, *ptr;
1650
1651         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1652                            mem_ctx, &outdata, &res, &timeout, NULL);
1653         if (ret != 0 || res != 0) {
1654                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1655                 return -1;
1656         }
1657
1658         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1659         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1660             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1661                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1662                 talloc_free(outdata.dptr);
1663                 return -1;              
1664         }
1665         
1666         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1667         CTDB_NO_MEMORY(ctdb, p);
1668
1669         talloc_free(outdata.dptr);
1670         
1671         (*list) = NULL;
1672         (*count) = 0;
1673
1674         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1675                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1676                 CTDB_NO_MEMORY(ctdb, *list);
1677                 (*list)[*count] = talloc_strdup(*list, s);
1678                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1679                 (*count)++;
1680         }
1681
1682         talloc_free(p);
1683
1684         return 0;
1685 }
1686
1687
1688 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1689                                    struct timeval timeout, uint32_t destnode,
1690                                    TALLOC_CTX *mem_ctx,
1691                                    uint32_t flags,
1692                                    struct ctdb_all_public_ips **ips)
1693 {
1694         int ret;
1695         TDB_DATA outdata;
1696         int32_t res;
1697
1698         ret = ctdb_control(ctdb, destnode, 0, 
1699                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1700                            mem_ctx, &outdata, &res, &timeout, NULL);
1701         if (ret == 0 && res == -1) {
1702                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1703                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1704         }
1705         if (ret != 0 || res != 0) {
1706           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1707                 return -1;
1708         }
1709
1710         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1711         talloc_free(outdata.dptr);
1712                     
1713         return 0;
1714 }
1715
1716 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1717                              struct timeval timeout, uint32_t destnode,
1718                              TALLOC_CTX *mem_ctx,
1719                              struct ctdb_all_public_ips **ips)
1720 {
1721         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1722                                               destnode, mem_ctx,
1723                                               0, ips);
1724 }
1725
1726 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1727                         struct timeval timeout, uint32_t destnode, 
1728                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1729 {
1730         int ret, i, len;
1731         TDB_DATA outdata;
1732         int32_t res;
1733         struct ctdb_all_public_ipsv4 *ipsv4;
1734
1735         ret = ctdb_control(ctdb, destnode, 0, 
1736                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1737                            mem_ctx, &outdata, &res, &timeout, NULL);
1738         if (ret != 0 || res != 0) {
1739                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1740                 return -1;
1741         }
1742
1743         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1744         len = offsetof(struct ctdb_all_public_ips, ips) +
1745                 ipsv4->num*sizeof(struct ctdb_public_ip);
1746         *ips = talloc_zero_size(mem_ctx, len);
1747         CTDB_NO_MEMORY(ctdb, *ips);
1748         (*ips)->num = ipsv4->num;
1749         for (i=0; i<ipsv4->num; i++) {
1750                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1751                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1752         }
1753
1754         talloc_free(outdata.dptr);
1755                     
1756         return 0;
1757 }
1758
1759 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1760                                  struct timeval timeout, uint32_t destnode,
1761                                  TALLOC_CTX *mem_ctx,
1762                                  const ctdb_sock_addr *addr,
1763                                  struct ctdb_control_public_ip_info **_info)
1764 {
1765         int ret;
1766         TDB_DATA indata;
1767         TDB_DATA outdata;
1768         int32_t res;
1769         struct ctdb_control_public_ip_info *info;
1770         uint32_t len;
1771         uint32_t i;
1772
1773         indata.dptr = discard_const_p(uint8_t, addr);
1774         indata.dsize = sizeof(*addr);
1775
1776         ret = ctdb_control(ctdb, destnode, 0,
1777                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1778                            mem_ctx, &outdata, &res, &timeout, NULL);
1779         if (ret != 0 || res != 0) {
1780                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1781                                 "failed ret:%d res:%d\n",
1782                                 ret, res));
1783                 return -1;
1784         }
1785
1786         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1787         if (len > outdata.dsize) {
1788                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1789                                 "returned invalid data with size %u > %u\n",
1790                                 (unsigned int)outdata.dsize,
1791                                 (unsigned int)len));
1792                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1793                 return -1;
1794         }
1795
1796         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1797         len += info->num*sizeof(struct ctdb_control_iface_info);
1798
1799         if (len > outdata.dsize) {
1800                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1801                                 "returned invalid data with size %u > %u\n",
1802                                 (unsigned int)outdata.dsize,
1803                                 (unsigned int)len));
1804                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1805                 return -1;
1806         }
1807
1808         /* make sure we null terminate the returned strings */
1809         for (i=0; i < info->num; i++) {
1810                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1811         }
1812
1813         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1814                                                                 outdata.dptr,
1815                                                                 outdata.dsize);
1816         talloc_free(outdata.dptr);
1817         if (*_info == NULL) {
1818                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1819                                 "talloc_memdup size %u failed\n",
1820                                 (unsigned int)outdata.dsize));
1821                 return -1;
1822         }
1823
1824         return 0;
1825 }
1826
1827 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1828                          struct timeval timeout, uint32_t destnode,
1829                          TALLOC_CTX *mem_ctx,
1830                          struct ctdb_control_get_ifaces **_ifaces)
1831 {
1832         int ret;
1833         TDB_DATA outdata;
1834         int32_t res;
1835         struct ctdb_control_get_ifaces *ifaces;
1836         uint32_t len;
1837         uint32_t i;
1838
1839         ret = ctdb_control(ctdb, destnode, 0,
1840                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1841                            mem_ctx, &outdata, &res, &timeout, NULL);
1842         if (ret != 0 || res != 0) {
1843                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1844                                 "failed ret:%d res:%d\n",
1845                                 ret, res));
1846                 return -1;
1847         }
1848
1849         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1850         if (len > outdata.dsize) {
1851                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1852                                 "returned invalid data with size %u > %u\n",
1853                                 (unsigned int)outdata.dsize,
1854                                 (unsigned int)len));
1855                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1856                 return -1;
1857         }
1858
1859         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1860         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1861
1862         if (len > outdata.dsize) {
1863                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1864                                 "returned invalid data with size %u > %u\n",
1865                                 (unsigned int)outdata.dsize,
1866                                 (unsigned int)len));
1867                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1868                 return -1;
1869         }
1870
1871         /* make sure we null terminate the returned strings */
1872         for (i=0; i < ifaces->num; i++) {
1873                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1874         }
1875
1876         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1877                                                                   outdata.dptr,
1878                                                                   outdata.dsize);
1879         talloc_free(outdata.dptr);
1880         if (*_ifaces == NULL) {
1881                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1882                                 "talloc_memdup size %u failed\n",
1883                                 (unsigned int)outdata.dsize));
1884                 return -1;
1885         }
1886
1887         return 0;
1888 }
1889
1890 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1891                              struct timeval timeout, uint32_t destnode,
1892                              TALLOC_CTX *mem_ctx,
1893                              const struct ctdb_control_iface_info *info)
1894 {
1895         int ret;
1896         TDB_DATA indata;
1897         int32_t res;
1898
1899         indata.dptr = discard_const_p(uint8_t, info);
1900         indata.dsize = sizeof(*info);
1901
1902         ret = ctdb_control(ctdb, destnode, 0,
1903                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1904                            mem_ctx, NULL, &res, &timeout, NULL);
1905         if (ret != 0 || res != 0) {
1906                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1907                                 "failed ret:%d res:%d\n",
1908                                 ret, res));
1909                 return -1;
1910         }
1911
1912         return 0;
1913 }
1914
1915 /*
1916   set/clear the permanent disabled bit on a remote node
1917  */
1918 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1919                        uint32_t set, uint32_t clear)
1920 {
1921         int ret;
1922         TDB_DATA data;
1923         struct ctdb_node_map *nodemap=NULL;
1924         struct ctdb_node_flag_change c;
1925         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1926         uint32_t recmaster;
1927         uint32_t *nodes;
1928
1929
1930         /* find the recovery master */
1931         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1932         if (ret != 0) {
1933                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1934                 talloc_free(tmp_ctx);
1935                 return ret;
1936         }
1937
1938
1939         /* read the node flags from the recmaster */
1940         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1941         if (ret != 0) {
1942                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1943                 talloc_free(tmp_ctx);
1944                 return -1;
1945         }
1946         if (destnode >= nodemap->num) {
1947                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1948                 talloc_free(tmp_ctx);
1949                 return -1;
1950         }
1951
1952         c.pnn       = destnode;
1953         c.old_flags = nodemap->nodes[destnode].flags;
1954         c.new_flags = c.old_flags;
1955         c.new_flags |= set;
1956         c.new_flags &= ~clear;
1957
1958         data.dsize = sizeof(c);
1959         data.dptr = (unsigned char *)&c;
1960
1961         /* send the flags update to all connected nodes */
1962         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1963
1964         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1965                                         nodes, 0,
1966                                         timeout, false, data,
1967                                         NULL, NULL,
1968                                         NULL) != 0) {
1969                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1970
1971                 talloc_free(tmp_ctx);
1972                 return -1;
1973         }
1974
1975         talloc_free(tmp_ctx);
1976         return 0;
1977 }
1978
1979
1980 /*
1981   get all tunables
1982  */
1983 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1984                                struct timeval timeout, 
1985                                uint32_t destnode,
1986                                struct ctdb_tunable *tunables)
1987 {
1988         TDB_DATA outdata;
1989         int ret;
1990         int32_t res;
1991
1992         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1993                            &outdata, &res, &timeout, NULL);
1994         if (ret != 0 || res != 0) {
1995                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1996                 return -1;
1997         }
1998
1999         if (outdata.dsize != sizeof(*tunables)) {
2000                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
2001                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
2002                 return -1;              
2003         }
2004
2005         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2006         talloc_free(outdata.dptr);
2007         return 0;
2008 }
2009
2010 /*
2011   add a public address to a node
2012  */
2013 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2014                       struct timeval timeout, 
2015                       uint32_t destnode,
2016                       struct ctdb_control_ip_iface *pub)
2017 {
2018         TDB_DATA data;
2019         int32_t res;
2020         int ret;
2021
2022         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2023         data.dptr  = (unsigned char *)pub;
2024
2025         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2026                            NULL, &res, &timeout, NULL);
2027         if (ret != 0 || res != 0) {
2028                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2029                 return -1;
2030         }
2031
2032         return 0;
2033 }
2034
2035 /*
2036   delete a public address from a node
2037  */
2038 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2039                       struct timeval timeout, 
2040                       uint32_t destnode,
2041                       struct ctdb_control_ip_iface *pub)
2042 {
2043         TDB_DATA data;
2044         int32_t res;
2045         int ret;
2046
2047         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2048         data.dptr  = (unsigned char *)pub;
2049
2050         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2051                            NULL, &res, &timeout, NULL);
2052         if (ret != 0 || res != 0) {
2053                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2054                 return -1;
2055         }
2056
2057         return 0;
2058 }
2059
2060 /*
2061   kill a tcp connection
2062  */
2063 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2064                       struct timeval timeout, 
2065                       uint32_t destnode,
2066                       struct ctdb_control_killtcp *killtcp)
2067 {
2068         TDB_DATA data;
2069         int32_t res;
2070         int ret;
2071
2072         data.dsize = sizeof(struct ctdb_control_killtcp);
2073         data.dptr  = (unsigned char *)killtcp;
2074
2075         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2076                            NULL, &res, &timeout, NULL);
2077         if (ret != 0 || res != 0) {
2078                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2079                 return -1;
2080         }
2081
2082         return 0;
2083 }
2084
2085 /*
2086   send a gratious arp
2087  */
2088 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2089                       struct timeval timeout, 
2090                       uint32_t destnode,
2091                       ctdb_sock_addr *addr,
2092                       const char *ifname)
2093 {
2094         TDB_DATA data;
2095         int32_t res;
2096         int ret, len;
2097         struct ctdb_control_gratious_arp *gratious_arp;
2098         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2099
2100
2101         len = strlen(ifname)+1;
2102         gratious_arp = talloc_size(tmp_ctx, 
2103                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2104         CTDB_NO_MEMORY(ctdb, gratious_arp);
2105
2106         gratious_arp->addr = *addr;
2107         gratious_arp->len = len;
2108         memcpy(&gratious_arp->iface[0], ifname, len);
2109
2110
2111         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2112         data.dptr  = (unsigned char *)gratious_arp;
2113
2114         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2115                            NULL, &res, &timeout, NULL);
2116         if (ret != 0 || res != 0) {
2117                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2118                 talloc_free(tmp_ctx);
2119                 return -1;
2120         }
2121
2122         talloc_free(tmp_ctx);
2123         return 0;
2124 }
2125
2126 /*
2127   get a list of all tcp tickles that a node knows about for a particular vnn
2128  */
2129 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2130                               struct timeval timeout, uint32_t destnode, 
2131                               TALLOC_CTX *mem_ctx, 
2132                               ctdb_sock_addr *addr,
2133                               struct ctdb_control_tcp_tickle_list **list)
2134 {
2135         int ret;
2136         TDB_DATA data, outdata;
2137         int32_t status;
2138
2139         data.dptr = (uint8_t*)addr;
2140         data.dsize = sizeof(ctdb_sock_addr);
2141
2142         ret = ctdb_control(ctdb, destnode, 0, 
2143                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2144                            mem_ctx, &outdata, &status, NULL, NULL);
2145         if (ret != 0 || status != 0) {
2146                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2147                 return -1;
2148         }
2149
2150         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2151
2152         return status;
2153 }
2154
2155 /*
2156   register a server id
2157  */
2158 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2159                       struct timeval timeout, 
2160                       struct ctdb_server_id *id)
2161 {
2162         TDB_DATA data;
2163         int32_t res;
2164         int ret;
2165
2166         data.dsize = sizeof(struct ctdb_server_id);
2167         data.dptr  = (unsigned char *)id;
2168
2169         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2170                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2171                         0, data, NULL,
2172                         NULL, &res, &timeout, NULL);
2173         if (ret != 0 || res != 0) {
2174                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2175                 return -1;
2176         }
2177
2178         return 0;
2179 }
2180
2181 /*
2182   unregister a server id
2183  */
2184 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2185                       struct timeval timeout, 
2186                       struct ctdb_server_id *id)
2187 {
2188         TDB_DATA data;
2189         int32_t res;
2190         int ret;
2191
2192         data.dsize = sizeof(struct ctdb_server_id);
2193         data.dptr  = (unsigned char *)id;
2194
2195         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2196                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2197                         0, data, NULL,
2198                         NULL, &res, &timeout, NULL);
2199         if (ret != 0 || res != 0) {
2200                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2201                 return -1;
2202         }
2203
2204         return 0;
2205 }
2206
2207
2208 /*
2209   check if a server id exists
2210
2211   if a server id does exist, return *status == 1, otherwise *status == 0
2212  */
2213 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2214                       struct timeval timeout, 
2215                       uint32_t destnode,
2216                       struct ctdb_server_id *id,
2217                       uint32_t *status)
2218 {
2219         TDB_DATA data;
2220         int32_t res;
2221         int ret;
2222
2223         data.dsize = sizeof(struct ctdb_server_id);
2224         data.dptr  = (unsigned char *)id;
2225
2226         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2227                         0, data, NULL,
2228                         NULL, &res, &timeout, NULL);
2229         if (ret != 0) {
2230                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2231                 return -1;
2232         }
2233
2234         if (res) {
2235                 *status = 1;
2236         } else {
2237                 *status = 0;
2238         }
2239
2240         return 0;
2241 }
2242
2243 /*
2244    get the list of server ids that are registered on a node
2245 */
2246 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2247                 TALLOC_CTX *mem_ctx,
2248                 struct timeval timeout, uint32_t destnode, 
2249                 struct ctdb_server_id_list **svid_list)
2250 {
2251         int ret;
2252         TDB_DATA outdata;
2253         int32_t res;
2254
2255         ret = ctdb_control(ctdb, destnode, 0, 
2256                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2257                            mem_ctx, &outdata, &res, &timeout, NULL);
2258         if (ret != 0 || res != 0) {
2259                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2260                 return -1;
2261         }
2262
2263         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2264                     
2265         return 0;
2266 }
2267
2268 /*
2269   set some ctdb flags
2270 */
2271 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2272 {
2273         ctdb->flags |= flags;
2274 }
2275
2276
2277 /*
2278   return the pnn of this node
2279 */
2280 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2281 {
2282         return ctdb->pnn;
2283 }
2284
2285
2286 /*
2287   get the uptime of a remote node
2288  */
2289 struct ctdb_client_control_state *
2290 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2291 {
2292         struct ctdb_client_control_state *state;
2293
2294         state = ctdb_control_send(ctdb, destnode, 0, 
2295                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2296                            mem_ctx, NULL);
2297         if (state != NULL && !timeval_is_zero(&timeout)) {
2298                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2299         }
2300
2301         return state;
2302 }
2303
2304 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2305 {
2306         int ret;
2307         int32_t res;
2308         TDB_DATA outdata;
2309
2310         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2311         if (ret != 0 || res != 0) {
2312                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2313                 return -1;
2314         }
2315
2316         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2317
2318         return 0;
2319 }
2320
2321 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2322 {
2323         struct ctdb_client_control_state *state;
2324
2325         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2326         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2327 }
2328
2329 /*
2330   send a control to execute the "recovered" event script on a node
2331  */
2332 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2333 {
2334         int ret;
2335         int32_t status;
2336
2337         ret = ctdb_control(ctdb, destnode, 0, 
2338                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2339                            NULL, NULL, &status, &timeout, NULL);
2340         if (ret != 0 || status != 0) {
2341                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2342                 return -1;
2343         }
2344
2345         return 0;
2346 }
2347
2348 /* 
2349   callback for the async helpers used when sending the same control
2350   to multiple nodes in parallell.
2351 */
2352 static void async_callback(struct ctdb_client_control_state *state)
2353 {
2354         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2355         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2356         int ret;
2357         TDB_DATA outdata;
2358         int32_t res;
2359         uint32_t destnode = state->c->hdr.destnode;
2360
2361         /* one more node has responded with recmode data */
2362         data->count--;
2363
2364         /* if we failed to push the db, then return an error and let
2365            the main loop try again.
2366         */
2367         if (state->state != CTDB_CONTROL_DONE) {
2368                 if ( !data->dont_log_errors) {
2369                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2370                 }
2371                 data->fail_count++;
2372                 if (data->fail_callback) {
2373                         data->fail_callback(ctdb, destnode, res, outdata,
2374                                         data->callback_data);
2375                 }
2376                 return;
2377         }
2378         
2379         state->async.fn = NULL;
2380
2381         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2382         if ((ret != 0) || (res != 0)) {
2383                 if ( !data->dont_log_errors) {
2384                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2385                 }
2386                 data->fail_count++;
2387                 if (data->fail_callback) {
2388                         data->fail_callback(ctdb, destnode, res, outdata,
2389                                         data->callback_data);
2390                 }
2391         }
2392         if ((ret == 0) && (data->callback != NULL)) {
2393                 data->callback(ctdb, destnode, res, outdata,
2394                                         data->callback_data);
2395         }
2396 }
2397
2398
2399 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2400 {
2401         /* set up the callback functions */
2402         state->async.fn = async_callback;
2403         state->async.private_data = data;
2404         
2405         /* one more control to wait for to complete */
2406         data->count++;
2407 }
2408
2409
2410 /* wait for up to the maximum number of seconds allowed
2411    or until all nodes we expect a response from has replied
2412 */
2413 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2414 {
2415         while (data->count > 0) {
2416                 event_loop_once(ctdb->ev);
2417         }
2418         if (data->fail_count != 0) {
2419                 if (!data->dont_log_errors) {
2420                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2421                                  data->fail_count));
2422                 }
2423                 return -1;
2424         }
2425         return 0;
2426 }
2427
2428
2429 /* 
2430    perform a simple control on the listed nodes
2431    The control cannot return data
2432  */
2433 int ctdb_client_async_control(struct ctdb_context *ctdb,
2434                                 enum ctdb_controls opcode,
2435                                 uint32_t *nodes,
2436                                 uint64_t srvid,
2437                                 struct timeval timeout,
2438                                 bool dont_log_errors,
2439                                 TDB_DATA data,
2440                                 client_async_callback client_callback,
2441                                 client_async_callback fail_callback,
2442                                 void *callback_data)
2443 {
2444         struct client_async_data *async_data;
2445         struct ctdb_client_control_state *state;
2446         int j, num_nodes;
2447
2448         async_data = talloc_zero(ctdb, struct client_async_data);
2449         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2450         async_data->dont_log_errors = dont_log_errors;
2451         async_data->callback = client_callback;
2452         async_data->fail_callback = fail_callback;
2453         async_data->callback_data = callback_data;
2454         async_data->opcode        = opcode;
2455
2456         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2457
2458         /* loop over all nodes and send an async control to each of them */
2459         for (j=0; j<num_nodes; j++) {
2460                 uint32_t pnn = nodes[j];
2461
2462                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2463                                           0, data, async_data, NULL);
2464                 if (state == NULL) {
2465                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2466                         talloc_free(async_data);
2467                         return -1;
2468                 }
2469                 if (!timeval_is_zero(&timeout)) {
2470                         event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2471                 }
2472                 
2473                 ctdb_client_async_add(async_data, state);
2474         }
2475
2476         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2477                 talloc_free(async_data);
2478                 return -1;
2479         }
2480
2481         talloc_free(async_data);
2482         return 0;
2483 }
2484
2485 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2486                                 struct ctdb_vnn_map *vnn_map,
2487                                 TALLOC_CTX *mem_ctx,
2488                                 bool include_self)
2489 {
2490         int i, j, num_nodes;
2491         uint32_t *nodes;
2492
2493         for (i=num_nodes=0;i<vnn_map->size;i++) {
2494                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2495                         continue;
2496                 }
2497                 num_nodes++;
2498         } 
2499
2500         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2501         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2502
2503         for (i=j=0;i<vnn_map->size;i++) {
2504                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2505                         continue;
2506                 }
2507                 nodes[j++] = vnn_map->map[i];
2508         } 
2509
2510         return nodes;
2511 }
2512
2513 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2514                                 struct ctdb_node_map *node_map,
2515                                 TALLOC_CTX *mem_ctx,
2516                                 bool include_self)
2517 {
2518         int i, j, num_nodes;
2519         uint32_t *nodes;
2520
2521         for (i=num_nodes=0;i<node_map->num;i++) {
2522                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2523                         continue;
2524                 }
2525                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2526                         continue;
2527                 }
2528                 num_nodes++;
2529         } 
2530
2531         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2532         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2533
2534         for (i=j=0;i<node_map->num;i++) {
2535                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2536                         continue;
2537                 }
2538                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2539                         continue;
2540                 }
2541                 nodes[j++] = node_map->nodes[i].pnn;
2542         } 
2543
2544         return nodes;
2545 }
2546
2547 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2548                                 struct ctdb_node_map *node_map,
2549                                 TALLOC_CTX *mem_ctx,
2550                                 uint32_t pnn)
2551 {
2552         int i, j, num_nodes;
2553         uint32_t *nodes;
2554
2555         for (i=num_nodes=0;i<node_map->num;i++) {
2556                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2557                         continue;
2558                 }
2559                 if (node_map->nodes[i].pnn == pnn) {
2560                         continue;
2561                 }
2562                 num_nodes++;
2563         } 
2564
2565         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2566         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2567
2568         for (i=j=0;i<node_map->num;i++) {
2569                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2570                         continue;
2571                 }
2572                 if (node_map->nodes[i].pnn == pnn) {
2573                         continue;
2574                 }
2575                 nodes[j++] = node_map->nodes[i].pnn;
2576         } 
2577
2578         return nodes;
2579 }
2580
2581 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2582                                 struct ctdb_node_map *node_map,
2583                                 TALLOC_CTX *mem_ctx,
2584                                 bool include_self)
2585 {
2586         int i, j, num_nodes;
2587         uint32_t *nodes;
2588
2589         for (i=num_nodes=0;i<node_map->num;i++) {
2590                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2591                         continue;
2592                 }
2593                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2594                         continue;
2595                 }
2596                 num_nodes++;
2597         } 
2598
2599         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2600         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2601
2602         for (i=j=0;i<node_map->num;i++) {
2603                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2604                         continue;
2605                 }
2606                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2607                         continue;
2608                 }
2609                 nodes[j++] = node_map->nodes[i].pnn;
2610         } 
2611
2612         return nodes;
2613 }
2614
2615 /* 
2616   this is used to test if a pnn lock exists and if it exists will return
2617   the number of connections that pnn has reported or -1 if that recovery
2618   daemon is not running.
2619 */
2620 int
2621 ctdb_read_pnn_lock(int fd, int32_t pnn)
2622 {
2623         struct flock lock;
2624         char c;
2625
2626         lock.l_type = F_WRLCK;
2627         lock.l_whence = SEEK_SET;
2628         lock.l_start = pnn;
2629         lock.l_len = 1;
2630         lock.l_pid = 0;
2631
2632         if (fcntl(fd, F_GETLK, &lock) != 0) {
2633                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2634                 return -1;
2635         }
2636
2637         if (lock.l_type == F_UNLCK) {
2638                 return -1;
2639         }
2640
2641         if (pread(fd, &c, 1, pnn) == -1) {
2642                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2643                 return -1;
2644         }
2645
2646         return c;
2647 }
2648
2649 /*
2650   get capabilities of a remote node
2651  */
2652 struct ctdb_client_control_state *
2653 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2654 {
2655         struct ctdb_client_control_state *state;
2656
2657         state = ctdb_control_send(ctdb, destnode, 0, 
2658                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2659                            mem_ctx, NULL);
2660         if (state != NULL && !timeval_is_zero(&timeout)) {
2661                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2662         }
2663
2664         return state;
2665 }
2666
2667 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2668 {
2669         int ret;
2670         int32_t res;
2671         TDB_DATA outdata;
2672
2673         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2674         if ( (ret != 0) || (res != 0) ) {
2675                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2676                 return -1;
2677         }
2678
2679         if (capabilities) {
2680                 *capabilities = *((uint32_t *)outdata.dptr);
2681         }
2682
2683         return 0;
2684 }
2685
2686 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2687 {
2688         struct ctdb_client_control_state *state;
2689         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2690         int ret;
2691
2692         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2693         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2694         talloc_free(tmp_ctx);
2695         return ret;
2696 }
2697
2698 /**
2699  * check whether a transaction is active on a given db on a given node
2700  */
2701 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2702                                      uint32_t destnode,
2703                                      uint32_t db_id)
2704 {
2705         int32_t status;
2706         int ret;
2707         TDB_DATA indata;
2708
2709         indata.dptr = (uint8_t *)&db_id;
2710         indata.dsize = sizeof(db_id);
2711
2712         ret = ctdb_control(ctdb, destnode, 0,
2713                            CTDB_CONTROL_TRANS2_ACTIVE,
2714                            0, indata, NULL, NULL, &status,
2715                            NULL, NULL);
2716
2717         if (ret != 0) {
2718                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2719                 return -1;
2720         }
2721
2722         return status;
2723 }
2724
2725
2726 struct ctdb_transaction_handle {
2727         struct ctdb_db_context *ctdb_db;
2728         bool in_replay;
2729         /*
2730          * we store the reads and writes done under a transaction:
2731          * - one list stores both reads and writes (m_all),
2732          * - the other just writes (m_write)
2733          */
2734         struct ctdb_marshall_buffer *m_all;
2735         struct ctdb_marshall_buffer *m_write;
2736 };
2737
2738 /* start a transaction on a database */
2739 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2740 {
2741         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2742         return 0;
2743 }
2744
2745 /* start a transaction on a database */
2746 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2747 {
2748         struct ctdb_record_handle *rh;
2749         TDB_DATA key;
2750         TDB_DATA data;
2751         struct ctdb_ltdb_header header;
2752         TALLOC_CTX *tmp_ctx;
2753         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2754         int ret;
2755         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2756         pid_t pid;
2757         int32_t status;
2758
2759         key.dptr = discard_const(keyname);
2760         key.dsize = strlen(keyname);
2761
2762         if (!ctdb_db->persistent) {
2763                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2764                 return -1;
2765         }
2766
2767 again:
2768         tmp_ctx = talloc_new(h);
2769
2770         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2771         if (rh == NULL) {
2772                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2773                 talloc_free(tmp_ctx);
2774                 return -1;
2775         }
2776
2777         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2778                                               CTDB_CURRENT_NODE,
2779                                               ctdb_db->db_id);
2780         if (status == 1) {
2781                 unsigned long int usec = (1000 + random()) % 100000;
2782                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2783                                     "on db_id[0x%08x]. waiting for %lu "
2784                                     "microseconds\n",
2785                                     ctdb_db->db_id, usec));
2786                 talloc_free(tmp_ctx);
2787                 usleep(usec);
2788                 goto again;
2789         }
2790
2791         /*
2792          * store the pid in the database:
2793          * it is not enough that the node is dmaster...
2794          */
2795         pid = getpid();
2796         data.dptr = (unsigned char *)&pid;
2797         data.dsize = sizeof(pid_t);
2798         rh->header.rsn++;
2799         rh->header.dmaster = ctdb_db->ctdb->pnn;
2800         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2801         if (ret != 0) {
2802                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2803                                   "transaction record\n"));
2804                 talloc_free(tmp_ctx);
2805                 return -1;
2806         }
2807
2808         talloc_free(rh);
2809
2810         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2811         if (ret != 0) {
2812                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2813                 talloc_free(tmp_ctx);
2814                 return -1;
2815         }
2816
2817         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2818         if (ret != 0) {
2819                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2820                                  "lock record inside transaction\n"));
2821                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2822                 talloc_free(tmp_ctx);
2823                 goto again;
2824         }
2825
2826         if (header.dmaster != ctdb_db->ctdb->pnn) {
2827                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2828                                    "transaction lock record\n"));
2829                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2830                 talloc_free(tmp_ctx);
2831                 goto again;
2832         }
2833
2834         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2835                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2836                                     "the transaction lock record\n"));
2837                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2838                 talloc_free(tmp_ctx);
2839                 goto again;
2840         }
2841
2842         talloc_free(tmp_ctx);
2843
2844         return 0;
2845 }
2846
2847
2848 /* start a transaction on a database */
2849 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2850                                                        TALLOC_CTX *mem_ctx)
2851 {
2852         struct ctdb_transaction_handle *h;
2853         int ret;
2854
2855         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2856         if (h == NULL) {
2857                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2858                 return NULL;
2859         }
2860
2861         h->ctdb_db = ctdb_db;
2862
2863         ret = ctdb_transaction_fetch_start(h);
2864         if (ret != 0) {
2865                 talloc_free(h);
2866                 return NULL;
2867         }
2868
2869         talloc_set_destructor(h, ctdb_transaction_destructor);
2870
2871         return h;
2872 }
2873
2874
2875
2876 /*
2877   fetch a record inside a transaction
2878  */
2879 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2880                            TALLOC_CTX *mem_ctx, 
2881                            TDB_DATA key, TDB_DATA *data)
2882 {
2883         struct ctdb_ltdb_header header;
2884         int ret;
2885
2886         ZERO_STRUCT(header);
2887
2888         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2889         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2890                 /* record doesn't exist yet */
2891                 *data = tdb_null;
2892                 ret = 0;
2893         }
2894         
2895         if (ret != 0) {
2896                 return ret;
2897         }
2898
2899         if (!h->in_replay) {
2900                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2901                 if (h->m_all == NULL) {
2902                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2903                         return -1;
2904                 }
2905         }
2906
2907         return 0;
2908 }
2909
2910 /*
2911   stores a record inside a transaction
2912  */
2913 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2914                            TDB_DATA key, TDB_DATA data)
2915 {
2916         TALLOC_CTX *tmp_ctx = talloc_new(h);
2917         struct ctdb_ltdb_header header;
2918         TDB_DATA olddata;
2919         int ret;
2920
2921         ZERO_STRUCT(header);
2922
2923         /* we need the header so we can update the RSN */
2924         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2925         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2926                 /* the record doesn't exist - create one with us as dmaster.
2927                    This is only safe because we are in a transaction and this
2928                    is a persistent database */
2929                 ZERO_STRUCT(header);
2930         } else if (ret != 0) {
2931                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2932                 talloc_free(tmp_ctx);
2933                 return ret;
2934         }
2935
2936         if (data.dsize == olddata.dsize &&
2937             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2938                 /* save writing the same data */
2939                 talloc_free(tmp_ctx);
2940                 return 0;
2941         }
2942
2943         header.dmaster = h->ctdb_db->ctdb->pnn;
2944         header.rsn++;
2945
2946         if (!h->in_replay) {
2947                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2948                 if (h->m_all == NULL) {
2949                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2950                         talloc_free(tmp_ctx);
2951                         return -1;
2952                 }
2953         }               
2954
2955         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2956         if (h->m_write == NULL) {
2957                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2958                 talloc_free(tmp_ctx);
2959                 return -1;
2960         }
2961         
2962         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2963
2964         talloc_free(tmp_ctx);
2965         
2966         return ret;
2967 }
2968
2969 /*
2970   replay a transaction
2971  */
2972 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2973 {
2974         int ret, i;
2975         struct ctdb_rec_data *rec = NULL;
2976
2977         h->in_replay = true;
2978         talloc_free(h->m_write);
2979         h->m_write = NULL;
2980
2981         ret = ctdb_transaction_fetch_start(h);
2982         if (ret != 0) {
2983                 return ret;
2984         }
2985
2986         for (i=0;i<h->m_all->count;i++) {
2987                 TDB_DATA key, data;
2988
2989                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2990                 if (rec == NULL) {
2991                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2992                         goto failed;
2993                 }
2994
2995                 if (rec->reqid == 0) {
2996                         /* its a store */
2997                         if (ctdb_transaction_store(h, key, data) != 0) {
2998                                 goto failed;
2999                         }
3000                 } else {
3001                         TDB_DATA data2;
3002                         TALLOC_CTX *tmp_ctx = talloc_new(h);
3003
3004                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3005                                 talloc_free(tmp_ctx);
3006                                 goto failed;
3007                         }
3008                         if (data2.dsize != data.dsize ||
3009                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3010                                 /* the record has changed on us - we have to give up */
3011                                 talloc_free(tmp_ctx);
3012                                 goto failed;
3013                         }
3014                         talloc_free(tmp_ctx);
3015                 }
3016         }
3017         
3018         return 0;
3019
3020 failed:
3021         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3022         return -1;
3023 }
3024
3025
3026 /*
3027   commit a transaction
3028  */
3029 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3030 {
3031         int ret, retries=0;
3032         int32_t status;
3033         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3034         struct timeval timeout;
3035         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3036
3037         talloc_set_destructor(h, NULL);
3038
3039         /* our commit strategy is quite complex.
3040
3041            - we first try to commit the changes to all other nodes
3042
3043            - if that works, then we commit locally and we are done
3044
3045            - if a commit on another node fails, then we need to cancel
3046              the transaction, then restart the transaction (thus
3047              opening a window of time for a pending recovery to
3048              complete), then replay the transaction, checking all the
3049              reads and writes (checking that reads give the same data,
3050              and writes succeed). Then we retry the transaction to the
3051              other nodes
3052         */
3053
3054 again:
3055         if (h->m_write == NULL) {
3056                 /* no changes were made */
3057                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3058                 talloc_free(h);
3059                 return 0;
3060         }
3061
3062         /* tell ctdbd to commit to the other nodes */
3063         timeout = timeval_current_ofs(1, 0);
3064         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3065                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3066                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3067                            &timeout, NULL);
3068         if (ret != 0 || status != 0) {
3069                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3070                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3071                                      ", retrying after 1 second...\n",
3072                                      (retries==0)?"":"retry "));
3073                 sleep(1);
3074
3075                 if (ret != 0) {
3076                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3077                 } else {
3078                         /* work out what error code we will give if we 
3079                            have to fail the operation */
3080                         switch ((enum ctdb_trans2_commit_error)status) {
3081                         case CTDB_TRANS2_COMMIT_SUCCESS:
3082                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3083                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3084                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3085                                 break;
3086                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3087                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3088                                 break;
3089                         }
3090                 }
3091
3092                 if (++retries == 100) {
3093                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3094                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3095                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3096                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3097                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3098                         talloc_free(h);
3099                         return -1;
3100                 }               
3101
3102                 if (ctdb_replay_transaction(h) != 0) {
3103                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3104                                           "transaction on db 0x%08x, "
3105                                           "failure control =%u\n",
3106                                           h->ctdb_db->db_id,
3107                                           (unsigned)failure_control));
3108                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3109                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3110                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3111                         talloc_free(h);
3112                         return -1;
3113                 }
3114                 goto again;
3115         } else {
3116                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3117         }
3118
3119         /* do the real commit locally */
3120         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3121         if (ret != 0) {
3122                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3123                                   "on db id 0x%08x locally, "
3124                                   "failure_control=%u\n",
3125                                   h->ctdb_db->db_id,
3126                                   (unsigned)failure_control));
3127                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3128                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3129                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3130                 talloc_free(h);
3131                 return ret;
3132         }
3133
3134         /* tell ctdbd that we are finished with our local commit */
3135         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3136                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3137                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3138         talloc_free(h);
3139         return 0;
3140 }
3141
3142 /*
3143   recovery daemon ping to main daemon
3144  */
3145 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3146 {
3147         int ret;
3148         int32_t res;
3149
3150         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3151                            ctdb, NULL, &res, NULL, NULL);
3152         if (ret != 0 || res != 0) {
3153                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3154                 return -1;
3155         }
3156
3157         return 0;
3158 }
3159
3160 /* when forking the main daemon and the child process needs to connect back
3161  * to the daemon as a client process, this function can be used to change
3162  * the ctdb context from daemon into client mode
3163  */
3164 int switch_from_server_to_client(struct ctdb_context *ctdb)
3165 {
3166         int ret;
3167
3168         /* shutdown the transport */
3169         if (ctdb->methods) {
3170                 ctdb->methods->shutdown(ctdb);
3171         }
3172
3173         /* get a new event context */
3174         talloc_free(ctdb->ev);
3175         ctdb->ev = event_context_init(ctdb);
3176
3177         close(ctdb->daemon.sd);
3178         ctdb->daemon.sd = -1;
3179
3180         /* initialise ctdb */
3181         ret = ctdb_socket_connect(ctdb);
3182         if (ret != 0) {
3183                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3184                 return -1;
3185         }
3186
3187          return 0;
3188 }
3189
3190 /*
3191   get the status of running the monitor eventscripts: NULL means never run.
3192  */
3193 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3194                 struct timeval timeout, uint32_t destnode, 
3195                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3196                 struct ctdb_scripts_wire **script_status)
3197 {
3198         int ret;
3199         TDB_DATA outdata, indata;
3200         int32_t res;
3201         uint32_t uinttype = type;
3202
3203         indata.dptr = (uint8_t *)&uinttype;
3204         indata.dsize = sizeof(uinttype);
3205
3206         ret = ctdb_control(ctdb, destnode, 0, 
3207                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3208                            mem_ctx, &outdata, &res, &timeout, NULL);
3209         if (ret != 0 || res != 0) {
3210                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3211                 return -1;
3212         }
3213
3214         if (outdata.dsize == 0) {
3215                 *script_status = NULL;
3216         } else {
3217                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3218                 talloc_free(outdata.dptr);
3219         }
3220                     
3221         return 0;
3222 }
3223
3224 /*
3225   tell the main daemon how long it took to lock the reclock file
3226  */
3227 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3228 {
3229         int ret;
3230         int32_t res;
3231         TDB_DATA data;
3232
3233         data.dptr = (uint8_t *)&latency;
3234         data.dsize = sizeof(latency);
3235
3236         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3237                            ctdb, NULL, &res, NULL, NULL);
3238         if (ret != 0 || res != 0) {
3239                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3240                 return -1;
3241         }
3242
3243         return 0;
3244 }
3245
3246 /*
3247   get the name of the reclock file
3248  */
3249 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3250                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3251                          const char **name)
3252 {
3253         int ret;
3254         int32_t res;
3255         TDB_DATA data;
3256
3257         ret = ctdb_control(ctdb, destnode, 0, 
3258                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3259                            mem_ctx, &data, &res, &timeout, NULL);
3260         if (ret != 0 || res != 0) {
3261                 return -1;
3262         }
3263
3264         if (data.dsize == 0) {
3265                 *name = NULL;
3266         } else {
3267                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3268         }
3269         talloc_free(data.dptr);
3270
3271         return 0;
3272 }
3273
3274 /*
3275   set the reclock filename for a node
3276  */
3277 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3278 {
3279         int ret;
3280         TDB_DATA data;
3281         int32_t res;
3282
3283         if (reclock == NULL) {
3284                 data.dsize = 0;
3285                 data.dptr  = NULL;
3286         } else {
3287                 data.dsize = strlen(reclock) + 1;
3288                 data.dptr  = discard_const(reclock);
3289         }
3290
3291         ret = ctdb_control(ctdb, destnode, 0, 
3292                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3293                            NULL, NULL, &res, &timeout, NULL);
3294         if (ret != 0 || res != 0) {
3295                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3296                 return -1;
3297         }
3298
3299         return 0;
3300 }
3301
3302 /*
3303   stop a node
3304  */
3305 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3306 {
3307         int ret;
3308         int32_t res;
3309
3310         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3311                            ctdb, NULL, &res, &timeout, NULL);
3312         if (ret != 0 || res != 0) {
3313                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3314                 return -1;
3315         }
3316
3317         return 0;
3318 }
3319
3320 /*
3321   continue a node
3322  */
3323 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3324 {
3325         int ret;
3326
3327         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3328                            ctdb, NULL, NULL, &timeout, NULL);
3329         if (ret != 0) {
3330                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3331                 return -1;
3332         }
3333
3334         return 0;
3335 }
3336
3337 /*
3338   set the natgw state for a node
3339  */
3340 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3341 {
3342         int ret;
3343         TDB_DATA data;
3344         int32_t res;
3345
3346         data.dsize = sizeof(natgwstate);
3347         data.dptr  = (uint8_t *)&natgwstate;
3348
3349         ret = ctdb_control(ctdb, destnode, 0, 
3350                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3351                            NULL, NULL, &res, &timeout, NULL);
3352         if (ret != 0 || res != 0) {
3353                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3354                 return -1;
3355         }
3356
3357         return 0;
3358 }
3359
3360 /*
3361   set the lmaster role for a node
3362  */
3363 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3364 {
3365         int ret;
3366         TDB_DATA data;
3367         int32_t res;
3368
3369         data.dsize = sizeof(lmasterrole);
3370         data.dptr  = (uint8_t *)&lmasterrole;
3371
3372         ret = ctdb_control(ctdb, destnode, 0, 
3373                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3374                            NULL, NULL, &res, &timeout, NULL);
3375         if (ret != 0 || res != 0) {
3376                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3377                 return -1;
3378         }
3379
3380         return 0;
3381 }
3382
3383 /*
3384   set the recmaster role for a node
3385  */
3386 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3387 {
3388         int ret;
3389         TDB_DATA data;
3390         int32_t res;
3391
3392         data.dsize = sizeof(recmasterrole);
3393         data.dptr  = (uint8_t *)&recmasterrole;
3394
3395         ret = ctdb_control(ctdb, destnode, 0, 
3396                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3397                            NULL, NULL, &res, &timeout, NULL);
3398         if (ret != 0 || res != 0) {
3399                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3400                 return -1;
3401         }
3402
3403         return 0;
3404 }
3405
3406 /* enable an eventscript
3407  */
3408 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3409 {
3410         int ret;
3411         TDB_DATA data;
3412         int32_t res;
3413
3414         data.dsize = strlen(script) + 1;
3415         data.dptr  = discard_const(script);
3416
3417         ret = ctdb_control(ctdb, destnode, 0, 
3418                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3419                            NULL, NULL, &res, &timeout, NULL);
3420         if (ret != 0 || res != 0) {
3421                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3422                 return -1;
3423         }
3424
3425         return 0;
3426 }
3427
3428 /* disable an eventscript
3429  */
3430 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3431 {
3432         int ret;
3433         TDB_DATA data;
3434         int32_t res;
3435
3436         data.dsize = strlen(script) + 1;
3437         data.dptr  = discard_const(script);
3438
3439         ret = ctdb_control(ctdb, destnode, 0, 
3440                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3441                            NULL, NULL, &res, &timeout, NULL);
3442         if (ret != 0 || res != 0) {
3443                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3444                 return -1;
3445         }
3446
3447         return 0;
3448 }
3449
3450
3451 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3452 {
3453         int ret;
3454         TDB_DATA data;
3455         int32_t res;
3456
3457         data.dsize = sizeof(*bantime);
3458         data.dptr  = (uint8_t *)bantime;
3459
3460         ret = ctdb_control(ctdb, destnode, 0, 
3461                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3462                            NULL, NULL, &res, &timeout, NULL);
3463         if (ret != 0 || res != 0) {
3464                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3465                 return -1;
3466         }
3467
3468         return 0;
3469 }
3470
3471
3472 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3473 {
3474         int ret;
3475         TDB_DATA outdata;
3476         int32_t res;
3477         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3478
3479         ret = ctdb_control(ctdb, destnode, 0, 
3480                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3481                            tmp_ctx, &outdata, &res, &timeout, NULL);
3482         if (ret != 0 || res != 0) {
3483                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3484                 talloc_free(tmp_ctx);
3485                 return -1;
3486         }
3487
3488         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3489         talloc_free(tmp_ctx);
3490
3491         return 0;
3492 }
3493
3494
3495 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3496 {
3497         int ret;
3498         int32_t res;
3499         TDB_DATA data;
3500         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3501
3502         data.dptr = (uint8_t*)db_prio;
3503         data.dsize = sizeof(*db_prio);
3504
3505         ret = ctdb_control(ctdb, destnode, 0, 
3506                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3507                            tmp_ctx, NULL, &res, &timeout, NULL);
3508         if (ret != 0 || res != 0) {
3509                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3510                 talloc_free(tmp_ctx);
3511                 return -1;
3512         }
3513
3514         talloc_free(tmp_ctx);
3515
3516         return 0;
3517 }
3518
3519 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3520 {
3521         int ret;
3522         int32_t res;
3523         TDB_DATA data;
3524         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3525
3526         data.dptr = (uint8_t*)&db_id;
3527         data.dsize = sizeof(db_id);
3528
3529         ret = ctdb_control(ctdb, destnode, 0, 
3530                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3531                            tmp_ctx, NULL, &res, &timeout, NULL);
3532         if (ret != 0 || res < 0) {
3533                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3534                 talloc_free(tmp_ctx);
3535                 return -1;
3536         }
3537
3538         if (priority) {
3539                 *priority = res;
3540         }
3541
3542         talloc_free(tmp_ctx);
3543
3544         return 0;
3545 }
3546
3547 /* time out handler for ctdb_control */
3548 void ctdb_control_timeout_func(struct event_context *ev, struct timed_event *te, 
3549         struct timeval t, void *private_data)
3550 {
3551         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
3552
3553         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
3554                          "dstnode:%u\n", state->reqid, state->c->opcode,
3555                          state->c->hdr.destnode));
3556
3557         state->state = CTDB_CONTROL_TIMEOUT;
3558
3559         /* if we had a callback registered for this control, pull the response
3560            and call the callback.
3561         */
3562         if (state->async.fn) {
3563                 event_add_timed(state->ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
3564         }
3565 }
3566