add a libctdb version of the control to create a database
[sahlberg/ctdb.git] / client / ctdb_client.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "db_wrap.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "lib/util/dlinklist.h"
25 #include "lib/events/events.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "system/locale.h"
29 #include <stdlib.h>
30 #include "include/ctdb_protocol.h"
31 #include "include/ctdb_private.h"
32 #include "lib/util/dlinklist.h"
33
34
35
36 struct ctdb_record_handle {
37         struct ctdb_db_context *ctdb_db;
38         TDB_DATA key;
39         TDB_DATA *data;
40         struct ctdb_ltdb_header header;
41 };
42
43
44 /*
45   make a recv call to the local ctdb daemon - called from client context
46
47   This is called when the program wants to wait for a ctdb_call to complete and get the 
48   results. This call will block unless the call has already completed.
49 */
50 int ctdb_call_recv(struct ctdb_client_call_state *state, struct ctdb_call *call)
51 {
52         if (state == NULL) {
53                 return -1;
54         }
55
56         while (state->state < CTDB_CALL_DONE) {
57                 event_loop_once(state->ctdb_db->ctdb->ev);
58         }
59         if (state->state != CTDB_CALL_DONE) {
60                 DEBUG(DEBUG_ERR,(__location__ " ctdb_call_recv failed\n"));
61                 talloc_free(state);
62                 return -1;
63         }
64
65         if (state->call->reply_data.dsize) {
66                 call->reply_data.dptr = talloc_memdup(state->ctdb_db,
67                                                       state->call->reply_data.dptr,
68                                                       state->call->reply_data.dsize);
69                 call->reply_data.dsize = state->call->reply_data.dsize;
70         } else {
71                 call->reply_data.dptr = NULL;
72                 call->reply_data.dsize = 0;
73         }
74         call->status = state->call->status;
75         talloc_free(state);
76
77         return 0;
78 }
79
80
81
82
83
84
85
86 /*
87   full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
88 */
89 int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
90 {
91         struct ctdb_client_call_state *state;
92
93         state = ctdb_call_send(ctdb_db, call);
94         return ctdb_call_recv(state, call);
95 }
96
97
98
99
100
101 /*
102   cancel a ctdb_fetch_lock operation, releasing the lock
103  */
104 static int fetch_lock_destructor(struct ctdb_record_handle *h)
105 {
106         ctdb_ltdb_unlock(h->ctdb_db, h->key);
107         return 0;
108 }
109
110 /*
111   force the migration of a record to this node
112  */
113 static int ctdb_client_force_migration(struct ctdb_db_context *ctdb_db, TDB_DATA key)
114 {
115         struct ctdb_call call;
116         ZERO_STRUCT(call);
117         call.call_id = CTDB_NULL_FUNC;
118         call.key = key;
119         call.flags = CTDB_IMMEDIATE_MIGRATION;
120         return ctdb_call(ctdb_db, &call);
121 }
122
123 /*
124   get a lock on a record, and return the records data. Blocks until it gets the lock
125  */
126 struct ctdb_record_handle *ctdb_fetch_lock(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
127                                            TDB_DATA key, TDB_DATA *data)
128 {
129         int ret;
130         struct ctdb_record_handle *h;
131
132         /*
133           procedure is as follows:
134
135           1) get the chain lock. 
136           2) check if we are dmaster
137           3) if we are the dmaster then return handle 
138           4) if not dmaster then ask ctdb daemon to make us dmaster, and wait for
139              reply from ctdbd
140           5) when we get the reply, goto (1)
141          */
142
143         h = talloc_zero(mem_ctx, struct ctdb_record_handle);
144         if (h == NULL) {
145                 return NULL;
146         }
147
148         h->ctdb_db = ctdb_db;
149         h->key     = key;
150         h->key.dptr = talloc_memdup(h, key.dptr, key.dsize);
151         if (h->key.dptr == NULL) {
152                 talloc_free(h);
153                 return NULL;
154         }
155         h->data    = data;
156
157         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: key=%*.*s\n", (int)key.dsize, (int)key.dsize, 
158                  (const char *)key.dptr));
159
160 again:
161         /* step 1 - get the chain lock */
162         ret = ctdb_ltdb_lock(ctdb_db, key);
163         if (ret != 0) {
164                 DEBUG(DEBUG_ERR, (__location__ " failed to lock ltdb record\n"));
165                 talloc_free(h);
166                 return NULL;
167         }
168
169         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: got chain lock\n"));
170
171         talloc_set_destructor(h, fetch_lock_destructor);
172
173         ret = ctdb_ltdb_fetch(ctdb_db, key, &h->header, h, data);
174
175         /* when torturing, ensure we test the remote path */
176         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
177             random() % 5 == 0) {
178                 h->header.dmaster = (uint32_t)-1;
179         }
180
181
182         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: done local fetch\n"));
183
184         if (ret != 0 || h->header.dmaster != ctdb_db->ctdb->pnn) {
185                 ctdb_ltdb_unlock(ctdb_db, key);
186                 ret = ctdb_client_force_migration(ctdb_db, key);
187                 if (ret != 0) {
188                         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: force_migration failed\n"));
189                         talloc_free(h);
190                         return NULL;
191                 }
192                 goto again;
193         }
194
195         DEBUG(DEBUG_DEBUG,("ctdb_fetch_lock: we are dmaster - done\n"));
196         return h;
197 }
198
199 /*
200   store some data to the record that was locked with ctdb_fetch_lock()
201 */
202 int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
203 {
204         if (h->ctdb_db->persistent) {
205                 DEBUG(DEBUG_ERR, (__location__ " ctdb_record_store prohibited for persistent dbs\n"));
206                 return -1;
207         }
208
209         return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
210 }
211
212 /*
213   non-locking fetch of a record
214  */
215 int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx, 
216                TDB_DATA key, TDB_DATA *data)
217 {
218         struct ctdb_call call;
219         int ret;
220
221         call.call_id = CTDB_FETCH_FUNC;
222         call.call_data.dptr = NULL;
223         call.call_data.dsize = 0;
224
225         ret = ctdb_call(ctdb_db, &call);
226
227         if (ret == 0) {
228                 *data = call.reply_data;
229                 talloc_steal(mem_ctx, data->dptr);
230         }
231
232         return ret;
233 }
234
235
236
237
238
239
240
241
242 /*
243   send a ctdb control message
244   timeout specifies how long we should wait for a reply.
245   if timeout is NULL we wait indefinitely
246  */
247 int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid, 
248                  uint32_t opcode, uint32_t flags, TDB_DATA data, 
249                  TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status,
250                  struct timeval *timeout,
251                  char **errormsg)
252 {
253         struct ctdb_client_control_state *state;
254
255         state = ctdb_control_send(ctdb, destnode, srvid, opcode, 
256                         flags, data, mem_ctx,
257                         errormsg);
258         if (state != NULL && timeout && !timeval_is_zero(timeout)) {
259                 event_add_timed(ctdb->ev, state, *timeout, ctdb_control_timeout_func, state);
260         }
261
262         return ctdb_control_recv(ctdb, state, mem_ctx, outdata, status, 
263                         errormsg);
264 }
265
266
267
268
269 /*
270   a process exists call. Returns 0 if process exists, -1 otherwise
271  */
272 int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid)
273 {
274         int ret;
275         TDB_DATA data;
276         int32_t status;
277
278         data.dptr = (uint8_t*)&pid;
279         data.dsize = sizeof(pid);
280
281         ret = ctdb_control(ctdb, destnode, 0, 
282                            CTDB_CONTROL_PROCESS_EXISTS, 0, data, 
283                            NULL, NULL, &status, NULL, NULL);
284         if (ret != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for process_exists failed\n"));
286                 return -1;
287         }
288
289         return status;
290 }
291
292 /*
293   get remote statistics
294  */
295 int ctdb_ctrl_statistics(struct ctdb_context *ctdb, uint32_t destnode, struct ctdb_statistics *status)
296 {
297         int ret;
298         TDB_DATA data;
299         int32_t res;
300
301         ret = ctdb_control(ctdb, destnode, 0, 
302                            CTDB_CONTROL_STATISTICS, 0, tdb_null, 
303                            ctdb, &data, &res, NULL, NULL);
304         if (ret != 0 || res != 0) {
305                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for statistics failed\n"));
306                 return -1;
307         }
308
309         if (data.dsize != sizeof(struct ctdb_statistics)) {
310                 DEBUG(DEBUG_ERR,(__location__ " Wrong statistics size %u - expected %u\n",
311                          (unsigned)data.dsize, (unsigned)sizeof(struct ctdb_statistics)));
312                       return -1;
313         }
314
315         *status = *(struct ctdb_statistics *)data.dptr;
316         talloc_free(data.dptr);
317                         
318         return 0;
319 }
320
321 /*
322   shutdown a remote ctdb node
323  */
324 int ctdb_ctrl_shutdown(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
325 {
326         struct ctdb_client_control_state *state;
327
328         state = ctdb_control_send(ctdb, destnode, 0, 
329                            CTDB_CONTROL_SHUTDOWN, 0, tdb_null, 
330                            NULL, NULL);
331         if (state == NULL) {
332                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for shutdown failed\n"));
333                 return -1;
334         }
335         if (!timeval_is_zero(&timeout)) {
336                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
337         }
338
339         return 0;
340 }
341
342 /*
343   get vnn map from a remote node
344  */
345 int ctdb_ctrl_getvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_vnn_map **vnnmap)
346 {
347         int ret;
348         TDB_DATA outdata;
349         int32_t res;
350         struct ctdb_vnn_map_wire *map;
351
352         ret = ctdb_control(ctdb, destnode, 0, 
353                            CTDB_CONTROL_GETVNNMAP, 0, tdb_null, 
354                            mem_ctx, &outdata, &res, &timeout, NULL);
355         if (ret != 0 || res != 0) {
356                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getvnnmap failed\n"));
357                 return -1;
358         }
359         
360         map = (struct ctdb_vnn_map_wire *)outdata.dptr;
361         if (outdata.dsize < offsetof(struct ctdb_vnn_map_wire, map) ||
362             outdata.dsize != map->size*sizeof(uint32_t) + offsetof(struct ctdb_vnn_map_wire, map)) {
363                 DEBUG(DEBUG_ERR,("Bad vnn map size received in ctdb_ctrl_getvnnmap\n"));
364                 return -1;
365         }
366
367         (*vnnmap) = talloc(mem_ctx, struct ctdb_vnn_map);
368         CTDB_NO_MEMORY(ctdb, *vnnmap);
369         (*vnnmap)->generation = map->generation;
370         (*vnnmap)->size       = map->size;
371         (*vnnmap)->map        = talloc_array(*vnnmap, uint32_t, map->size);
372
373         CTDB_NO_MEMORY(ctdb, (*vnnmap)->map);
374         memcpy((*vnnmap)->map, map->map, sizeof(uint32_t)*map->size);
375         talloc_free(outdata.dptr);
376                     
377         return 0;
378 }
379
380
381 /*
382   get the recovery mode of a remote node
383  */
384 struct ctdb_client_control_state *
385 ctdb_ctrl_getrecmode_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
386 {
387         struct ctdb_client_control_state *state;
388
389         state = ctdb_control_send(ctdb, destnode, 0, 
390                            CTDB_CONTROL_GET_RECMODE, 0, tdb_null, 
391                            mem_ctx, NULL);
392
393         if (state != NULL && !timeval_is_zero(&timeout)) {
394                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
395         }
396
397         return state;
398 }
399
400 int ctdb_ctrl_getrecmode_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmode)
401 {
402         int ret;
403         int32_t res;
404
405         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
406         if (ret != 0) {
407                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmode_recv failed\n"));
408                 return -1;
409         }
410
411         if (recmode) {
412                 *recmode = (uint32_t)res;
413         }
414
415         return 0;
416 }
417
418 int ctdb_ctrl_getrecmode(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmode)
419 {
420         struct ctdb_client_control_state *state;
421
422         state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, timeout, destnode);
423         return ctdb_ctrl_getrecmode_recv(ctdb, mem_ctx, state, recmode);
424 }
425
426
427
428
429 /*
430   set the recovery mode of a remote node
431  */
432 int ctdb_ctrl_setrecmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmode)
433 {
434         int ret;
435         TDB_DATA data;
436         int32_t res;
437
438         data.dsize = sizeof(uint32_t);
439         data.dptr = (unsigned char *)&recmode;
440
441         ret = ctdb_control(ctdb, destnode, 0, 
442                            CTDB_CONTROL_SET_RECMODE, 0, data, 
443                            NULL, NULL, &res, &timeout, NULL);
444         if (ret != 0 || res != 0) {
445                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmode failed\n"));
446                 return -1;
447         }
448
449         return 0;
450 }
451
452
453
454 /*
455   get the recovery master of a remote node
456  */
457 struct ctdb_client_control_state *
458 ctdb_ctrl_getrecmaster_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, 
459                         struct timeval timeout, uint32_t destnode)
460 {
461         struct ctdb_client_control_state *state;
462
463         state = ctdb_control_send(ctdb, destnode, 0, 
464                            CTDB_CONTROL_GET_RECMASTER, 0, tdb_null, 
465                            mem_ctx, NULL);
466         if (state != NULL && !timeval_is_zero(&timeout)) {
467                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
468         }
469
470         return state;
471 }
472
473 int ctdb_ctrl_getrecmaster_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *recmaster)
474 {
475         int ret;
476         int32_t res;
477
478         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
479         if (ret != 0) {
480                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getrecmaster_recv failed\n"));
481                 return -1;
482         }
483
484         if (recmaster) {
485                 *recmaster = (uint32_t)res;
486         }
487
488         return 0;
489 }
490
491 int ctdb_ctrl_getrecmaster(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t *recmaster)
492 {
493         struct ctdb_client_control_state *state;
494
495         state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, timeout, destnode);
496         return ctdb_ctrl_getrecmaster_recv(ctdb, mem_ctx, state, recmaster);
497 }
498
499
500 /*
501   set the recovery master of a remote node
502  */
503 int ctdb_ctrl_setrecmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmaster)
504 {
505         int ret;
506         TDB_DATA data;
507         int32_t res;
508
509         ZERO_STRUCT(data);
510         data.dsize = sizeof(uint32_t);
511         data.dptr = (unsigned char *)&recmaster;
512
513         ret = ctdb_control(ctdb, destnode, 0, 
514                            CTDB_CONTROL_SET_RECMASTER, 0, data, 
515                            NULL, NULL, &res, &timeout, NULL);
516         if (ret != 0 || res != 0) {
517                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmaster failed\n"));
518                 return -1;
519         }
520
521         return 0;
522 }
523
524
525 /*
526   get a list of databases off a remote node
527  */
528 int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
529                        TALLOC_CTX *mem_ctx, struct ctdb_dbid_map **dbmap)
530 {
531         int ret;
532         TDB_DATA outdata;
533         int32_t res;
534
535         ret = ctdb_control(ctdb, destnode, 0, 
536                            CTDB_CONTROL_GET_DBMAP, 0, tdb_null, 
537                            mem_ctx, &outdata, &res, &timeout, NULL);
538         if (ret != 0 || res != 0) {
539                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getdbmap failed ret:%d res:%d\n", ret, res));
540                 return -1;
541         }
542
543         *dbmap = (struct ctdb_dbid_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
544         talloc_free(outdata.dptr);
545                     
546         return 0;
547 }
548
549 /*
550   get a list of nodes (vnn and flags ) from a remote node
551  */
552 int ctdb_ctrl_getnodemap(struct ctdb_context *ctdb, 
553                 struct timeval timeout, uint32_t destnode, 
554                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
555 {
556         int ret;
557         TDB_DATA outdata;
558         int32_t res;
559
560         ret = ctdb_control(ctdb, destnode, 0, 
561                            CTDB_CONTROL_GET_NODEMAP, 0, tdb_null, 
562                            mem_ctx, &outdata, &res, &timeout, NULL);
563         if (ret == 0 && res == -1 && outdata.dsize == 0) {
564                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed, falling back to ipv4-only control\n"));
565                 return ctdb_ctrl_getnodemapv4(ctdb, timeout, destnode, mem_ctx, nodemap);
566         }
567         if (ret != 0 || res != 0 || outdata.dsize == 0) {
568                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodes failed ret:%d res:%d\n", ret, res));
569                 return -1;
570         }
571
572         *nodemap = (struct ctdb_node_map *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
573         talloc_free(outdata.dptr);
574                     
575         return 0;
576 }
577
578 /*
579   old style ipv4-only get a list of nodes (vnn and flags ) from a remote node
580  */
581 int ctdb_ctrl_getnodemapv4(struct ctdb_context *ctdb, 
582                 struct timeval timeout, uint32_t destnode, 
583                 TALLOC_CTX *mem_ctx, struct ctdb_node_map **nodemap)
584 {
585         int ret, i, len;
586         TDB_DATA outdata;
587         struct ctdb_node_mapv4 *nodemapv4;
588         int32_t res;
589
590         ret = ctdb_control(ctdb, destnode, 0, 
591                            CTDB_CONTROL_GET_NODEMAPv4, 0, tdb_null, 
592                            mem_ctx, &outdata, &res, &timeout, NULL);
593         if (ret != 0 || res != 0 || outdata.dsize == 0) {
594                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getnodesv4 failed ret:%d res:%d\n", ret, res));
595                 return -1;
596         }
597
598         nodemapv4 = (struct ctdb_node_mapv4 *)outdata.dptr;
599
600         len = offsetof(struct ctdb_node_map, nodes) + nodemapv4->num*sizeof(struct ctdb_node_and_flags);
601         (*nodemap) = talloc_zero_size(mem_ctx, len);
602         CTDB_NO_MEMORY(ctdb, (*nodemap));
603
604         (*nodemap)->num = nodemapv4->num;
605         for (i=0; i<nodemapv4->num; i++) {
606                 (*nodemap)->nodes[i].pnn     = nodemapv4->nodes[i].pnn;
607                 (*nodemap)->nodes[i].flags   = nodemapv4->nodes[i].flags;
608                 (*nodemap)->nodes[i].addr.ip = nodemapv4->nodes[i].sin;
609                 (*nodemap)->nodes[i].addr.sa.sa_family = AF_INET;
610         }
611                 
612         talloc_free(outdata.dptr);
613                     
614         return 0;
615 }
616
617 /*
618   drop the transport, reload the nodes file and restart the transport
619  */
620 int ctdb_ctrl_reload_nodes_file(struct ctdb_context *ctdb, 
621                     struct timeval timeout, uint32_t destnode)
622 {
623         int ret;
624         int32_t res;
625
626         ret = ctdb_control(ctdb, destnode, 0, 
627                            CTDB_CONTROL_RELOAD_NODES_FILE, 0, tdb_null, 
628                            NULL, NULL, &res, &timeout, NULL);
629         if (ret != 0 || res != 0) {
630                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reloadnodesfile failed\n"));
631                 return -1;
632         }
633
634         return 0;
635 }
636
637
638 /*
639   set vnn map on a node
640  */
641 int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
642                         TALLOC_CTX *mem_ctx, struct ctdb_vnn_map *vnnmap)
643 {
644         int ret;
645         TDB_DATA data;
646         int32_t res;
647         struct ctdb_vnn_map_wire *map;
648         size_t len;
649
650         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*vnnmap->size;
651         map = talloc_size(mem_ctx, len);
652         CTDB_NO_MEMORY(ctdb, map);
653
654         map->generation = vnnmap->generation;
655         map->size = vnnmap->size;
656         memcpy(map->map, vnnmap->map, sizeof(uint32_t)*map->size);
657         
658         data.dsize = len;
659         data.dptr  = (uint8_t *)map;
660
661         ret = ctdb_control(ctdb, destnode, 0, 
662                            CTDB_CONTROL_SETVNNMAP, 0, data, 
663                            NULL, NULL, &res, &timeout, NULL);
664         if (ret != 0 || res != 0) {
665                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setvnnmap failed\n"));
666                 return -1;
667         }
668
669         talloc_free(map);
670
671         return 0;
672 }
673
674
675 /*
676   async send for pull database
677  */
678 struct ctdb_client_control_state *ctdb_ctrl_pulldb_send(
679         struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid,
680         uint32_t lmaster, TALLOC_CTX *mem_ctx, struct timeval timeout)
681 {
682         TDB_DATA indata;
683         struct ctdb_control_pulldb *pull;
684         struct ctdb_client_control_state *state;
685
686         pull = talloc(mem_ctx, struct ctdb_control_pulldb);
687         CTDB_NO_MEMORY_NULL(ctdb, pull);
688
689         pull->db_id   = dbid;
690         pull->lmaster = lmaster;
691
692         indata.dsize = sizeof(struct ctdb_control_pulldb);
693         indata.dptr  = (unsigned char *)pull;
694
695         state = ctdb_control_send(ctdb, destnode, 0, 
696                                   CTDB_CONTROL_PULL_DB, 0, indata, 
697                                   mem_ctx, NULL);
698         if (state != NULL && !timeval_is_zero(&timeout)) {
699                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
700         }
701
702         talloc_free(pull);
703
704         return state;
705 }
706
707 /*
708   async recv for pull database
709  */
710 int ctdb_ctrl_pulldb_recv(
711         struct ctdb_context *ctdb, 
712         TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, 
713         TDB_DATA *outdata)
714 {
715         int ret;
716         int32_t res;
717
718         ret = ctdb_control_recv(ctdb, state, mem_ctx, outdata, &res, NULL);
719         if ( (ret != 0) || (res != 0) ){
720                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_pulldb_recv failed\n"));
721                 return -1;
722         }
723
724         return 0;
725 }
726
727 /*
728   pull all keys and records for a specific database on a node
729  */
730 int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, 
731                 uint32_t dbid, uint32_t lmaster, 
732                 TALLOC_CTX *mem_ctx, struct timeval timeout,
733                 TDB_DATA *outdata)
734 {
735         struct ctdb_client_control_state *state;
736
737         state = ctdb_ctrl_pulldb_send(ctdb, destnode, dbid, lmaster, mem_ctx,
738                                       timeout);
739         
740         return ctdb_ctrl_pulldb_recv(ctdb, mem_ctx, state, outdata);
741 }
742
743
744 /*
745   change dmaster for all keys in the database to the new value
746  */
747 int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
748                          TALLOC_CTX *mem_ctx, uint32_t dbid, uint32_t dmaster)
749 {
750         int ret;
751         TDB_DATA indata;
752         int32_t res;
753
754         indata.dsize = 2*sizeof(uint32_t);
755         indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
756
757         ((uint32_t *)(&indata.dptr[0]))[0] = dbid;
758         ((uint32_t *)(&indata.dptr[0]))[1] = dmaster;
759
760         ret = ctdb_control(ctdb, destnode, 0, 
761                            CTDB_CONTROL_SET_DMASTER, 0, indata, 
762                            NULL, NULL, &res, &timeout, NULL);
763         if (ret != 0 || res != 0) {
764                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setdmaster failed\n"));
765                 return -1;
766         }
767
768         return 0;
769 }
770
771 /*
772   ping a node, return number of clients connected
773  */
774 int ctdb_ctrl_ping(struct ctdb_context *ctdb, uint32_t destnode)
775 {
776         int ret;
777         int32_t res;
778
779         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_PING, 0, 
780                            tdb_null, NULL, NULL, &res, NULL, NULL);
781         if (ret != 0) {
782                 return -1;
783         }
784         return res;
785 }
786
787 /*
788   find the real path to a ltdb 
789  */
790 int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
791                    const char **path)
792 {
793         int ret;
794         int32_t res;
795         TDB_DATA data;
796
797         data.dptr = (uint8_t *)&dbid;
798         data.dsize = sizeof(dbid);
799
800         ret = ctdb_control(ctdb, destnode, 0, 
801                            CTDB_CONTROL_GETDBPATH, 0, data, 
802                            mem_ctx, &data, &res, &timeout, NULL);
803         if (ret != 0 || res != 0) {
804                 return -1;
805         }
806
807         (*path) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
808         if ((*path) == NULL) {
809                 return -1;
810         }
811
812         talloc_free(data.dptr);
813
814         return 0;
815 }
816
817 /*
818   find the name of a db 
819  */
820 int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, 
821                    const char **name)
822 {
823         int ret;
824         int32_t res;
825         TDB_DATA data;
826
827         data.dptr = (uint8_t *)&dbid;
828         data.dsize = sizeof(dbid);
829
830         ret = ctdb_control(ctdb, destnode, 0, 
831                            CTDB_CONTROL_GET_DBNAME, 0, data, 
832                            mem_ctx, &data, &res, &timeout, NULL);
833         if (ret != 0 || res != 0) {
834                 return -1;
835         }
836
837         (*name) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
838         if ((*name) == NULL) {
839                 return -1;
840         }
841
842         talloc_free(data.dptr);
843
844         return 0;
845 }
846
847 /*
848   get the health status of a db
849  */
850 int ctdb_ctrl_getdbhealth(struct ctdb_context *ctdb,
851                           struct timeval timeout,
852                           uint32_t destnode,
853                           uint32_t dbid, TALLOC_CTX *mem_ctx,
854                           const char **reason)
855 {
856         int ret;
857         int32_t res;
858         TDB_DATA data;
859
860         data.dptr = (uint8_t *)&dbid;
861         data.dsize = sizeof(dbid);
862
863         ret = ctdb_control(ctdb, destnode, 0,
864                            CTDB_CONTROL_DB_GET_HEALTH, 0, data,
865                            mem_ctx, &data, &res, &timeout, NULL);
866         if (ret != 0 || res != 0) {
867                 return -1;
868         }
869
870         if (data.dsize == 0) {
871                 (*reason) = NULL;
872                 return 0;
873         }
874
875         (*reason) = talloc_strndup(mem_ctx, (const char *)data.dptr, data.dsize);
876         if ((*reason) == NULL) {
877                 return -1;
878         }
879
880         talloc_free(data.dptr);
881
882         return 0;
883 }
884
885 /*
886   create a database
887  */
888 int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
889                        TALLOC_CTX *mem_ctx, const char *name, bool persistent)
890 {
891         int ret;
892         int32_t res;
893         TDB_DATA data;
894
895         data.dptr = discard_const(name);
896         data.dsize = strlen(name)+1;
897
898         ret = ctdb_control(ctdb, destnode, 0, 
899                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH, 
900                            0, data, 
901                            mem_ctx, &data, &res, &timeout, NULL);
902
903         if (ret != 0 || res != 0) {
904                 return -1;
905         }
906
907         return 0;
908 }
909
910 /*
911   get debug level on a node
912  */
913 int ctdb_ctrl_get_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t *level)
914 {
915         int ret;
916         int32_t res;
917         TDB_DATA data;
918
919         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_DEBUG, 0, tdb_null, 
920                            ctdb, &data, &res, NULL, NULL);
921         if (ret != 0 || res != 0) {
922                 return -1;
923         }
924         if (data.dsize != sizeof(int32_t)) {
925                 DEBUG(DEBUG_ERR,("Bad control reply size in ctdb_get_debuglevel (got %u)\n",
926                          (unsigned)data.dsize));
927                 return -1;
928         }
929         *level = *(int32_t *)data.dptr;
930         talloc_free(data.dptr);
931         return 0;
932 }
933
934 /*
935   set debug level on a node
936  */
937 int ctdb_ctrl_set_debuglevel(struct ctdb_context *ctdb, uint32_t destnode, int32_t level)
938 {
939         int ret;
940         int32_t res;
941         TDB_DATA data;
942
943         data.dptr = (uint8_t *)&level;
944         data.dsize = sizeof(level);
945
946         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_DEBUG, 0, data, 
947                            NULL, NULL, &res, NULL, NULL);
948         if (ret != 0 || res != 0) {
949                 return -1;
950         }
951         return 0;
952 }
953
954
955 /*
956   get a list of connected nodes
957  */
958 uint32_t *ctdb_get_connected_nodes(struct ctdb_context *ctdb, 
959                                 struct timeval timeout,
960                                 TALLOC_CTX *mem_ctx,
961                                 uint32_t *num_nodes)
962 {
963         struct ctdb_node_map *map=NULL;
964         int ret, i;
965         uint32_t *nodes;
966
967         *num_nodes = 0;
968
969         ret = ctdb_ctrl_getnodemap(ctdb, timeout, CTDB_CURRENT_NODE, mem_ctx, &map);
970         if (ret != 0) {
971                 return NULL;
972         }
973
974         nodes = talloc_array(mem_ctx, uint32_t, map->num);
975         if (nodes == NULL) {
976                 return NULL;
977         }
978
979         for (i=0;i<map->num;i++) {
980                 if (!(map->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
981                         nodes[*num_nodes] = map->nodes[i].pnn;
982                         (*num_nodes)++;
983                 }
984         }
985
986         return nodes;
987 }
988
989
990 /*
991   reset remote status
992  */
993 int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
994 {
995         int ret;
996         int32_t res;
997
998         ret = ctdb_control(ctdb, destnode, 0, 
999                            CTDB_CONTROL_STATISTICS_RESET, 0, tdb_null, 
1000                            NULL, NULL, &res, NULL, NULL);
1001         if (ret != 0 || res != 0) {
1002                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for reset statistics failed\n"));
1003                 return -1;
1004         }
1005         return 0;
1006 }
1007
1008 /*
1009   this is the dummy null procedure that all databases support
1010 */
1011 static int ctdb_null_func(struct ctdb_call_info *call)
1012 {
1013         return 0;
1014 }
1015
1016 /*
1017   this is a plain fetch procedure that all databases support
1018 */
1019 static int ctdb_fetch_func(struct ctdb_call_info *call)
1020 {
1021         call->reply_data = &call->record_data;
1022         return 0;
1023 }
1024
1025 /*
1026   attach to a specific database - client call
1027 */
1028 struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent, uint32_t tdb_flags)
1029 {
1030         struct ctdb_db_context *ctdb_db;
1031         TDB_DATA data;
1032         int ret;
1033         int32_t res;
1034
1035         ctdb_db = ctdb_db_handle(ctdb, name);
1036         if (ctdb_db) {
1037                 return ctdb_db;
1038         }
1039
1040         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
1041         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db);
1042
1043         ctdb_db->ctdb = ctdb;
1044         ctdb_db->db_name = talloc_strdup(ctdb_db, name);
1045         CTDB_NO_MEMORY_NULL(ctdb, ctdb_db->db_name);
1046
1047         data.dptr = discard_const(name);
1048         data.dsize = strlen(name)+1;
1049
1050         /* tell ctdb daemon to attach */
1051         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, tdb_flags, 
1052                            persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
1053                            0, data, ctdb_db, &data, &res, NULL, NULL);
1054         if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
1055                 DEBUG(DEBUG_ERR,("Failed to attach to database '%s'\n", name));
1056                 talloc_free(ctdb_db);
1057                 return NULL;
1058         }
1059         
1060         ctdb_db->db_id = *(uint32_t *)data.dptr;
1061         talloc_free(data.dptr);
1062
1063         ret = ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(2, 0), CTDB_CURRENT_NODE, ctdb_db->db_id, ctdb_db, &ctdb_db->db_path);
1064         if (ret != 0) {
1065                 DEBUG(DEBUG_ERR,("Failed to get dbpath for database '%s'\n", name));
1066                 talloc_free(ctdb_db);
1067                 return NULL;
1068         }
1069
1070         tdb_flags = persistent?TDB_DEFAULT:TDB_NOSYNC;
1071         if (ctdb->valgrinding) {
1072                 tdb_flags |= TDB_NOMMAP;
1073         }
1074         tdb_flags |= TDB_DISALLOW_NESTING;
1075
1076         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 0, tdb_flags, O_RDWR, 0);
1077         if (ctdb_db->ltdb == NULL) {
1078                 ctdb_set_error(ctdb, "Failed to open tdb '%s'\n", ctdb_db->db_path);
1079                 talloc_free(ctdb_db);
1080                 return NULL;
1081         }
1082
1083         ctdb_db->persistent = persistent;
1084
1085         DLIST_ADD(ctdb->db_list, ctdb_db);
1086
1087         /* add well known functions */
1088         ctdb_set_call(ctdb_db, ctdb_null_func, CTDB_NULL_FUNC);
1089         ctdb_set_call(ctdb_db, ctdb_fetch_func, CTDB_FETCH_FUNC);
1090
1091         return ctdb_db;
1092 }
1093
1094
1095 /*
1096   setup a call for a database
1097  */
1098 int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
1099 {
1100         struct ctdb_registered_call *call;
1101
1102 #if 0
1103         TDB_DATA data;
1104         int32_t status;
1105         struct ctdb_control_set_call c;
1106         int ret;
1107
1108         /* this is no longer valid with the separate daemon architecture */
1109         c.db_id = ctdb_db->db_id;
1110         c.fn    = fn;
1111         c.id    = id;
1112
1113         data.dptr = (uint8_t *)&c;
1114         data.dsize = sizeof(c);
1115
1116         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_SET_CALL, 0,
1117                            data, NULL, NULL, &status, NULL, NULL);
1118         if (ret != 0 || status != 0) {
1119                 DEBUG(DEBUG_ERR,("ctdb_set_call failed for call %u\n", id));
1120                 return -1;
1121         }
1122 #endif
1123
1124         /* also register locally */
1125         call = talloc(ctdb_db, struct ctdb_registered_call);
1126         call->fn = fn;
1127         call->id = id;
1128
1129         DLIST_ADD(ctdb_db->calls, call);        
1130         return 0;
1131 }
1132
1133
1134 struct traverse_state {
1135         bool done;
1136         uint32_t count;
1137         ctdb_traverse_func fn;
1138         void *private_data;
1139 };
1140
1141 /*
1142   called on each key during a ctdb_traverse
1143  */
1144 static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
1145 {
1146         struct traverse_state *state = (struct traverse_state *)p;
1147         struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
1148         TDB_DATA key;
1149
1150         if (data.dsize < sizeof(uint32_t) ||
1151             d->length != data.dsize) {
1152                 DEBUG(DEBUG_ERR,("Bad data size %u in traverse_handler\n", (unsigned)data.dsize));
1153                 state->done = True;
1154                 return;
1155         }
1156
1157         key.dsize = d->keylen;
1158         key.dptr  = &d->data[0];
1159         data.dsize = d->datalen;
1160         data.dptr = &d->data[d->keylen];
1161
1162         if (key.dsize == 0 && data.dsize == 0) {
1163                 /* end of traverse */
1164                 state->done = True;
1165                 return;
1166         }
1167
1168         if (data.dsize == sizeof(struct ctdb_ltdb_header)) {
1169                 /* empty records are deleted records in ctdb */
1170                 return;
1171         }
1172
1173         if (state->fn(ctdb, key, data, state->private_data) != 0) {
1174                 state->done = True;
1175         }
1176
1177         state->count++;
1178 }
1179
1180
1181 /*
1182   start a cluster wide traverse, calling the supplied fn on each record
1183   return the number of records traversed, or -1 on error
1184  */
1185 int ctdb_traverse(struct ctdb_db_context *ctdb_db, ctdb_traverse_func fn, void *private_data)
1186 {
1187         TDB_DATA data;
1188         struct ctdb_traverse_start t;
1189         int32_t status;
1190         int ret;
1191         uint64_t srvid = (getpid() | 0xFLL<<60);
1192         struct traverse_state state;
1193
1194         state.done = False;
1195         state.count = 0;
1196         state.private_data = private_data;
1197         state.fn = fn;
1198
1199         ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, traverse_handler, &state);
1200         if (ret != 0) {
1201                 DEBUG(DEBUG_ERR,("Failed to setup traverse handler\n"));
1202                 return -1;
1203         }
1204
1205         t.db_id = ctdb_db->db_id;
1206         t.srvid = srvid;
1207         t.reqid = 0;
1208
1209         data.dptr = (uint8_t *)&t;
1210         data.dsize = sizeof(t);
1211
1212         ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
1213                            data, NULL, NULL, &status, NULL, NULL);
1214         if (ret != 0 || status != 0) {
1215                 DEBUG(DEBUG_ERR,("ctdb_traverse_all failed\n"));
1216                 ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1217                 return -1;
1218         }
1219
1220         while (!state.done) {
1221                 event_loop_once(ctdb_db->ctdb->ev);
1222         }
1223
1224         ret = ctdb_remove_message_handler(ctdb_db->ctdb, srvid, &state);
1225         if (ret != 0) {
1226                 DEBUG(DEBUG_ERR,("Failed to remove ctdb_traverse handler\n"));
1227                 return -1;
1228         }
1229
1230         return state.count;
1231 }
1232
1233 #define ISASCII(x) ((x>31)&&(x<128))
1234 /*
1235   called on each key during a catdb
1236  */
1237 int ctdb_dumpdb_record(struct ctdb_context *ctdb, TDB_DATA key, TDB_DATA data, void *p)
1238 {
1239         int i;
1240         FILE *f = (FILE *)p;
1241         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
1242
1243         fprintf(f, "key(%u) = \"", (unsigned)key.dsize);
1244         for (i=0;i<key.dsize;i++) {
1245                 if (ISASCII(key.dptr[i])) {
1246                         fprintf(f, "%c", key.dptr[i]);
1247                 } else {
1248                         fprintf(f, "\\%02X", key.dptr[i]);
1249                 }
1250         }
1251         fprintf(f, "\"\n");
1252
1253         fprintf(f, "dmaster: %u\n", h->dmaster);
1254         fprintf(f, "rsn: %llu\n", (unsigned long long)h->rsn);
1255
1256         fprintf(f, "data(%u) = \"", (unsigned)(data.dsize - sizeof(*h)));
1257         for (i=sizeof(*h);i<data.dsize;i++) {
1258                 if (ISASCII(data.dptr[i])) {
1259                         fprintf(f, "%c", data.dptr[i]);
1260                 } else {
1261                         fprintf(f, "\\%02X", data.dptr[i]);
1262                 }
1263         }
1264         fprintf(f, "\"\n");
1265
1266         fprintf(f, "\n");
1267
1268         return 0;
1269 }
1270
1271 /*
1272   convenience function to list all keys to stdout
1273  */
1274 int ctdb_dump_db(struct ctdb_db_context *ctdb_db, FILE *f)
1275 {
1276         return ctdb_traverse(ctdb_db, ctdb_dumpdb_record, f);
1277 }
1278
1279 /*
1280   get the pid of a ctdb daemon
1281  */
1282 int ctdb_ctrl_getpid(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *pid)
1283 {
1284         int ret;
1285         int32_t res;
1286
1287         ret = ctdb_control(ctdb, destnode, 0, 
1288                            CTDB_CONTROL_GET_PID, 0, tdb_null, 
1289                            NULL, NULL, &res, &timeout, NULL);
1290         if (ret != 0) {
1291                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpid failed\n"));
1292                 return -1;
1293         }
1294
1295         *pid = res;
1296
1297         return 0;
1298 }
1299
1300
1301 /*
1302   async freeze send control
1303  */
1304 struct ctdb_client_control_state *
1305 ctdb_ctrl_freeze_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, uint32_t priority)
1306 {
1307         struct ctdb_client_control_state *state;
1308
1309         state = ctdb_control_send(ctdb, destnode, priority, 
1310                            CTDB_CONTROL_FREEZE, 0, tdb_null, 
1311                            mem_ctx, NULL);
1312         if (state != NULL && !timeval_is_zero(&timeout)) {
1313                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
1314         }
1315
1316         return state;
1317 }
1318
1319 /* 
1320    async freeze recv control
1321 */
1322 int ctdb_ctrl_freeze_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state)
1323 {
1324         int ret;
1325         int32_t res;
1326
1327         ret = ctdb_control_recv(ctdb, state, mem_ctx, NULL, &res, NULL);
1328         if ( (ret != 0) || (res != 0) ){
1329                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_freeze_recv failed\n"));
1330                 return -1;
1331         }
1332
1333         return 0;
1334 }
1335
1336 /*
1337   freeze databases of a certain priority
1338  */
1339 int ctdb_ctrl_freeze_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1340 {
1341         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1342         struct ctdb_client_control_state *state;
1343         int ret;
1344
1345         state = ctdb_ctrl_freeze_send(ctdb, tmp_ctx, timeout, destnode, priority);
1346         ret = ctdb_ctrl_freeze_recv(ctdb, tmp_ctx, state);
1347         talloc_free(tmp_ctx);
1348
1349         return ret;
1350 }
1351
1352 /* Freeze all databases */
1353 int ctdb_ctrl_freeze(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1354 {
1355         int i;
1356
1357         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
1358                 if (ctdb_ctrl_freeze_priority(ctdb, timeout, destnode, i) != 0) {
1359                         return -1;
1360                 }
1361         }
1362         return 0;
1363 }
1364
1365 /*
1366   thaw databases of a certain priority
1367  */
1368 int ctdb_ctrl_thaw_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t priority)
1369 {
1370         int ret;
1371         int32_t res;
1372
1373         ret = ctdb_control(ctdb, destnode, priority, 
1374                            CTDB_CONTROL_THAW, 0, tdb_null, 
1375                            NULL, NULL, &res, &timeout, NULL);
1376         if (ret != 0 || res != 0) {
1377                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control thaw failed\n"));
1378                 return -1;
1379         }
1380
1381         return 0;
1382 }
1383
1384 /* thaw all databases */
1385 int ctdb_ctrl_thaw(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1386 {
1387         return ctdb_ctrl_thaw_priority(ctdb, timeout, destnode, 0);
1388 }
1389
1390 /*
1391   get pnn of a node, or -1
1392  */
1393 int ctdb_ctrl_getpnn(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1394 {
1395         int ret;
1396         uint32_t pnn;
1397         ctdb_handle *handle;
1398
1399         handle = ctdb_getpnn_send(ctdb, destnode, NULL, NULL);
1400         if (handle == NULL) {
1401                 DEBUG(DEBUG_ERR, (__location__ " Failed to send getpnn control\n"));
1402                 return -1;
1403         }
1404
1405         if (!timeval_is_zero(&timeout)) {
1406                 event_add_timed(ctdb->ev, handle, timeout, ctdb_control_timeout_func, handle);
1407         }
1408
1409         ret = ctdb_getpnn_recv(ctdb, handle, &pnn);
1410         if (ret != 0) {
1411                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpnn failed\n"));
1412                 return -1;
1413         }
1414
1415         return pnn;
1416 }
1417
1418 /*
1419   get the monitoring mode of a remote node
1420  */
1421 int ctdb_ctrl_getmonmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *monmode)
1422 {
1423         int ret;
1424         int32_t res;
1425
1426         ret = ctdb_control(ctdb, destnode, 0, 
1427                            CTDB_CONTROL_GET_MONMODE, 0, tdb_null, 
1428                            NULL, NULL, &res, &timeout, NULL);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getmonmode failed\n"));
1431                 return -1;
1432         }
1433
1434         *monmode = res;
1435
1436         return 0;
1437 }
1438
1439
1440 /*
1441  set the monitoring mode of a remote node to active
1442  */
1443 int ctdb_ctrl_enable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1444 {
1445         int ret;
1446         
1447
1448         ret = ctdb_control(ctdb, destnode, 0, 
1449                            CTDB_CONTROL_ENABLE_MONITOR, 0, tdb_null, 
1450                            NULL, NULL,NULL, &timeout, NULL);
1451         if (ret != 0) {
1452                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enable_monitor failed\n"));
1453                 return -1;
1454         }
1455
1456         
1457
1458         return 0;
1459 }
1460
1461 /*
1462   set the monitoring mode of a remote node to disable
1463  */
1464 int ctdb_ctrl_disable_monmode(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
1465 {
1466         int ret;
1467         
1468
1469         ret = ctdb_control(ctdb, destnode, 0, 
1470                            CTDB_CONTROL_DISABLE_MONITOR, 0, tdb_null, 
1471                            NULL, NULL, NULL, &timeout, NULL);
1472         if (ret != 0) {
1473                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disable_monitor failed\n"));
1474                 return -1;
1475         }
1476
1477         
1478
1479         return 0;
1480 }
1481
1482
1483
1484 /* 
1485   sent to a node to make it take over an ip address
1486 */
1487 int ctdb_ctrl_takeover_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1488                           uint32_t destnode, struct ctdb_public_ip *ip)
1489 {
1490         TDB_DATA data;
1491         struct ctdb_public_ipv4 ipv4;
1492         int ret;
1493         int32_t res;
1494
1495         if (ip->addr.sa.sa_family == AF_INET) {
1496                 ipv4.pnn = ip->pnn;
1497                 ipv4.sin = ip->addr.ip;
1498
1499                 data.dsize = sizeof(ipv4);
1500                 data.dptr  = (uint8_t *)&ipv4;
1501
1502                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IPv4, 0, data, NULL,
1503                            NULL, &res, &timeout, NULL);
1504         } else {
1505                 data.dsize = sizeof(*ip);
1506                 data.dptr  = (uint8_t *)ip;
1507
1508                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_TAKEOVER_IP, 0, data, NULL,
1509                            NULL, &res, &timeout, NULL);
1510         }
1511
1512         if (ret != 0 || res != 0) {
1513                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for takeover_ip failed\n"));
1514                 return -1;
1515         }
1516
1517         return 0;       
1518 }
1519
1520
1521 /* 
1522   sent to a node to make it release an ip address
1523 */
1524 int ctdb_ctrl_release_ip(struct ctdb_context *ctdb, struct timeval timeout, 
1525                          uint32_t destnode, struct ctdb_public_ip *ip)
1526 {
1527         TDB_DATA data;
1528         struct ctdb_public_ipv4 ipv4;
1529         int ret;
1530         int32_t res;
1531
1532         if (ip->addr.sa.sa_family == AF_INET) {
1533                 ipv4.pnn = ip->pnn;
1534                 ipv4.sin = ip->addr.ip;
1535
1536                 data.dsize = sizeof(ipv4);
1537                 data.dptr  = (uint8_t *)&ipv4;
1538
1539                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IPv4, 0, data, NULL,
1540                                    NULL, &res, &timeout, NULL);
1541         } else {
1542                 data.dsize = sizeof(*ip);
1543                 data.dptr  = (uint8_t *)ip;
1544
1545                 ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_RELEASE_IP, 0, data, NULL,
1546                                    NULL, &res, &timeout, NULL);
1547         }
1548
1549         if (ret != 0 || res != 0) {
1550                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for release_ip failed\n"));
1551                 return -1;
1552         }
1553
1554         return 0;       
1555 }
1556
1557
1558 /*
1559   get a tunable
1560  */
1561 int ctdb_ctrl_get_tunable(struct ctdb_context *ctdb, 
1562                           struct timeval timeout, 
1563                           uint32_t destnode,
1564                           const char *name, uint32_t *value)
1565 {
1566         struct ctdb_control_get_tunable *t;
1567         TDB_DATA data, outdata;
1568         int32_t res;
1569         int ret;
1570
1571         data.dsize = offsetof(struct ctdb_control_get_tunable, name) + strlen(name) + 1;
1572         data.dptr  = talloc_size(ctdb, data.dsize);
1573         CTDB_NO_MEMORY(ctdb, data.dptr);
1574
1575         t = (struct ctdb_control_get_tunable *)data.dptr;
1576         t->length = strlen(name)+1;
1577         memcpy(t->name, name, t->length);
1578
1579         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_TUNABLE, 0, data, ctdb,
1580                            &outdata, &res, &timeout, NULL);
1581         talloc_free(data.dptr);
1582         if (ret != 0 || res != 0) {
1583                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_tunable failed\n"));
1584                 return -1;
1585         }
1586
1587         if (outdata.dsize != sizeof(uint32_t)) {
1588                 DEBUG(DEBUG_ERR,("Invalid return data in get_tunable\n"));
1589                 talloc_free(outdata.dptr);
1590                 return -1;
1591         }
1592         
1593         *value = *(uint32_t *)outdata.dptr;
1594         talloc_free(outdata.dptr);
1595
1596         return 0;
1597 }
1598
1599 /*
1600   set a tunable
1601  */
1602 int ctdb_ctrl_set_tunable(struct ctdb_context *ctdb, 
1603                           struct timeval timeout, 
1604                           uint32_t destnode,
1605                           const char *name, uint32_t value)
1606 {
1607         struct ctdb_control_set_tunable *t;
1608         TDB_DATA data;
1609         int32_t res;
1610         int ret;
1611
1612         data.dsize = offsetof(struct ctdb_control_set_tunable, name) + strlen(name) + 1;
1613         data.dptr  = talloc_size(ctdb, data.dsize);
1614         CTDB_NO_MEMORY(ctdb, data.dptr);
1615
1616         t = (struct ctdb_control_set_tunable *)data.dptr;
1617         t->length = strlen(name)+1;
1618         memcpy(t->name, name, t->length);
1619         t->value = value;
1620
1621         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SET_TUNABLE, 0, data, NULL,
1622                            NULL, &res, &timeout, NULL);
1623         talloc_free(data.dptr);
1624         if (ret != 0 || res != 0) {
1625                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_tunable failed\n"));
1626                 return -1;
1627         }
1628
1629         return 0;
1630 }
1631
1632 /*
1633   list tunables
1634  */
1635 int ctdb_ctrl_list_tunables(struct ctdb_context *ctdb, 
1636                             struct timeval timeout, 
1637                             uint32_t destnode,
1638                             TALLOC_CTX *mem_ctx,
1639                             const char ***list, uint32_t *count)
1640 {
1641         TDB_DATA outdata;
1642         int32_t res;
1643         int ret;
1644         struct ctdb_control_list_tunable *t;
1645         char *p, *s, *ptr;
1646
1647         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_LIST_TUNABLES, 0, tdb_null, 
1648                            mem_ctx, &outdata, &res, &timeout, NULL);
1649         if (ret != 0 || res != 0) {
1650                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for list_tunables failed\n"));
1651                 return -1;
1652         }
1653
1654         t = (struct ctdb_control_list_tunable *)outdata.dptr;
1655         if (outdata.dsize < offsetof(struct ctdb_control_list_tunable, data) ||
1656             t->length > outdata.dsize-offsetof(struct ctdb_control_list_tunable, data)) {
1657                 DEBUG(DEBUG_ERR,("Invalid data in list_tunables reply\n"));
1658                 talloc_free(outdata.dptr);
1659                 return -1;              
1660         }
1661         
1662         p = talloc_strndup(mem_ctx, (char *)t->data, t->length);
1663         CTDB_NO_MEMORY(ctdb, p);
1664
1665         talloc_free(outdata.dptr);
1666         
1667         (*list) = NULL;
1668         (*count) = 0;
1669
1670         for (s=strtok_r(p, ":", &ptr); s; s=strtok_r(NULL, ":", &ptr)) {
1671                 (*list) = talloc_realloc(mem_ctx, *list, const char *, 1+(*count));
1672                 CTDB_NO_MEMORY(ctdb, *list);
1673                 (*list)[*count] = talloc_strdup(*list, s);
1674                 CTDB_NO_MEMORY(ctdb, (*list)[*count]);
1675                 (*count)++;
1676         }
1677
1678         talloc_free(p);
1679
1680         return 0;
1681 }
1682
1683
1684 int ctdb_ctrl_get_public_ips_flags(struct ctdb_context *ctdb,
1685                                    struct timeval timeout, uint32_t destnode,
1686                                    TALLOC_CTX *mem_ctx,
1687                                    uint32_t flags,
1688                                    struct ctdb_all_public_ips **ips)
1689 {
1690         int ret;
1691         TDB_DATA outdata;
1692         int32_t res;
1693
1694         ret = ctdb_control(ctdb, destnode, 0, 
1695                            CTDB_CONTROL_GET_PUBLIC_IPS, flags, tdb_null,
1696                            mem_ctx, &outdata, &res, &timeout, NULL);
1697         if (ret == 0 && res == -1) {
1698                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control to get public ips failed, falling back to ipv4-only version\n"));
1699                 return ctdb_ctrl_get_public_ipsv4(ctdb, timeout, destnode, mem_ctx, ips);
1700         }
1701         if (ret != 0 || res != 0) {
1702           DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed ret:%d res:%d\n", ret, res));
1703                 return -1;
1704         }
1705
1706         *ips = (struct ctdb_all_public_ips *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
1707         talloc_free(outdata.dptr);
1708                     
1709         return 0;
1710 }
1711
1712 int ctdb_ctrl_get_public_ips(struct ctdb_context *ctdb,
1713                              struct timeval timeout, uint32_t destnode,
1714                              TALLOC_CTX *mem_ctx,
1715                              struct ctdb_all_public_ips **ips)
1716 {
1717         return ctdb_ctrl_get_public_ips_flags(ctdb, timeout,
1718                                               destnode, mem_ctx,
1719                                               0, ips);
1720 }
1721
1722 int ctdb_ctrl_get_public_ipsv4(struct ctdb_context *ctdb, 
1723                         struct timeval timeout, uint32_t destnode, 
1724                         TALLOC_CTX *mem_ctx, struct ctdb_all_public_ips **ips)
1725 {
1726         int ret, i, len;
1727         TDB_DATA outdata;
1728         int32_t res;
1729         struct ctdb_all_public_ipsv4 *ipsv4;
1730
1731         ret = ctdb_control(ctdb, destnode, 0, 
1732                            CTDB_CONTROL_GET_PUBLIC_IPSv4, 0, tdb_null, 
1733                            mem_ctx, &outdata, &res, &timeout, NULL);
1734         if (ret != 0 || res != 0) {
1735                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getpublicips failed\n"));
1736                 return -1;
1737         }
1738
1739         ipsv4 = (struct ctdb_all_public_ipsv4 *)outdata.dptr;
1740         len = offsetof(struct ctdb_all_public_ips, ips) +
1741                 ipsv4->num*sizeof(struct ctdb_public_ip);
1742         *ips = talloc_zero_size(mem_ctx, len);
1743         CTDB_NO_MEMORY(ctdb, *ips);
1744         (*ips)->num = ipsv4->num;
1745         for (i=0; i<ipsv4->num; i++) {
1746                 (*ips)->ips[i].pnn     = ipsv4->ips[i].pnn;
1747                 (*ips)->ips[i].addr.ip = ipsv4->ips[i].sin;
1748         }
1749
1750         talloc_free(outdata.dptr);
1751                     
1752         return 0;
1753 }
1754
1755 int ctdb_ctrl_get_public_ip_info(struct ctdb_context *ctdb,
1756                                  struct timeval timeout, uint32_t destnode,
1757                                  TALLOC_CTX *mem_ctx,
1758                                  const ctdb_sock_addr *addr,
1759                                  struct ctdb_control_public_ip_info **_info)
1760 {
1761         int ret;
1762         TDB_DATA indata;
1763         TDB_DATA outdata;
1764         int32_t res;
1765         struct ctdb_control_public_ip_info *info;
1766         uint32_t len;
1767         uint32_t i;
1768
1769         indata.dptr = discard_const_p(uint8_t, addr);
1770         indata.dsize = sizeof(*addr);
1771
1772         ret = ctdb_control(ctdb, destnode, 0,
1773                            CTDB_CONTROL_GET_PUBLIC_IP_INFO, 0, indata,
1774                            mem_ctx, &outdata, &res, &timeout, NULL);
1775         if (ret != 0 || res != 0) {
1776                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1777                                 "failed ret:%d res:%d\n",
1778                                 ret, res));
1779                 return -1;
1780         }
1781
1782         len = offsetof(struct ctdb_control_public_ip_info, ifaces);
1783         if (len > outdata.dsize) {
1784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1785                                 "returned invalid data with size %u > %u\n",
1786                                 (unsigned int)outdata.dsize,
1787                                 (unsigned int)len));
1788                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1789                 return -1;
1790         }
1791
1792         info = (struct ctdb_control_public_ip_info *)outdata.dptr;
1793         len += info->num*sizeof(struct ctdb_control_iface_info);
1794
1795         if (len > outdata.dsize) {
1796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1797                                 "returned invalid data with size %u > %u\n",
1798                                 (unsigned int)outdata.dsize,
1799                                 (unsigned int)len));
1800                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1801                 return -1;
1802         }
1803
1804         /* make sure we null terminate the returned strings */
1805         for (i=0; i < info->num; i++) {
1806                 info->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1807         }
1808
1809         *_info = (struct ctdb_control_public_ip_info *)talloc_memdup(mem_ctx,
1810                                                                 outdata.dptr,
1811                                                                 outdata.dsize);
1812         talloc_free(outdata.dptr);
1813         if (*_info == NULL) {
1814                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get public ip info "
1815                                 "talloc_memdup size %u failed\n",
1816                                 (unsigned int)outdata.dsize));
1817                 return -1;
1818         }
1819
1820         return 0;
1821 }
1822
1823 int ctdb_ctrl_get_ifaces(struct ctdb_context *ctdb,
1824                          struct timeval timeout, uint32_t destnode,
1825                          TALLOC_CTX *mem_ctx,
1826                          struct ctdb_control_get_ifaces **_ifaces)
1827 {
1828         int ret;
1829         TDB_DATA outdata;
1830         int32_t res;
1831         struct ctdb_control_get_ifaces *ifaces;
1832         uint32_t len;
1833         uint32_t i;
1834
1835         ret = ctdb_control(ctdb, destnode, 0,
1836                            CTDB_CONTROL_GET_IFACES, 0, tdb_null,
1837                            mem_ctx, &outdata, &res, &timeout, NULL);
1838         if (ret != 0 || res != 0) {
1839                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1840                                 "failed ret:%d res:%d\n",
1841                                 ret, res));
1842                 return -1;
1843         }
1844
1845         len = offsetof(struct ctdb_control_get_ifaces, ifaces);
1846         if (len > outdata.dsize) {
1847                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1848                                 "returned invalid data with size %u > %u\n",
1849                                 (unsigned int)outdata.dsize,
1850                                 (unsigned int)len));
1851                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1852                 return -1;
1853         }
1854
1855         ifaces = (struct ctdb_control_get_ifaces *)outdata.dptr;
1856         len += ifaces->num*sizeof(struct ctdb_control_iface_info);
1857
1858         if (len > outdata.dsize) {
1859                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1860                                 "returned invalid data with size %u > %u\n",
1861                                 (unsigned int)outdata.dsize,
1862                                 (unsigned int)len));
1863                 dump_data(DEBUG_DEBUG, outdata.dptr, outdata.dsize);
1864                 return -1;
1865         }
1866
1867         /* make sure we null terminate the returned strings */
1868         for (i=0; i < ifaces->num; i++) {
1869                 ifaces->ifaces[i].name[CTDB_IFACE_SIZE] = '\0';
1870         }
1871
1872         *_ifaces = (struct ctdb_control_get_ifaces *)talloc_memdup(mem_ctx,
1873                                                                   outdata.dptr,
1874                                                                   outdata.dsize);
1875         talloc_free(outdata.dptr);
1876         if (*_ifaces == NULL) {
1877                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get ifaces "
1878                                 "talloc_memdup size %u failed\n",
1879                                 (unsigned int)outdata.dsize));
1880                 return -1;
1881         }
1882
1883         return 0;
1884 }
1885
1886 int ctdb_ctrl_set_iface_link(struct ctdb_context *ctdb,
1887                              struct timeval timeout, uint32_t destnode,
1888                              TALLOC_CTX *mem_ctx,
1889                              const struct ctdb_control_iface_info *info)
1890 {
1891         int ret;
1892         TDB_DATA indata;
1893         int32_t res;
1894
1895         indata.dptr = discard_const_p(uint8_t, info);
1896         indata.dsize = sizeof(*info);
1897
1898         ret = ctdb_control(ctdb, destnode, 0,
1899                            CTDB_CONTROL_SET_IFACE_LINK_STATE, 0, indata,
1900                            mem_ctx, NULL, &res, &timeout, NULL);
1901         if (ret != 0 || res != 0) {
1902                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set iface link "
1903                                 "failed ret:%d res:%d\n",
1904                                 ret, res));
1905                 return -1;
1906         }
1907
1908         return 0;
1909 }
1910
1911 /*
1912   set/clear the permanent disabled bit on a remote node
1913  */
1914 int ctdb_ctrl_modflags(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, 
1915                        uint32_t set, uint32_t clear)
1916 {
1917         int ret;
1918         TDB_DATA data;
1919         struct ctdb_node_map *nodemap=NULL;
1920         struct ctdb_node_flag_change c;
1921         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1922         uint32_t recmaster;
1923         uint32_t *nodes;
1924
1925
1926         /* find the recovery master */
1927         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, timeout, CTDB_CURRENT_NODE, &recmaster);
1928         if (ret != 0) {
1929                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
1930                 talloc_free(tmp_ctx);
1931                 return ret;
1932         }
1933
1934
1935         /* read the node flags from the recmaster */
1936         ret = ctdb_ctrl_getnodemap(ctdb, timeout, recmaster, tmp_ctx, &nodemap);
1937         if (ret != 0) {
1938                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", destnode));
1939                 talloc_free(tmp_ctx);
1940                 return -1;
1941         }
1942         if (destnode >= nodemap->num) {
1943                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", destnode));
1944                 talloc_free(tmp_ctx);
1945                 return -1;
1946         }
1947
1948         c.pnn       = destnode;
1949         c.old_flags = nodemap->nodes[destnode].flags;
1950         c.new_flags = c.old_flags;
1951         c.new_flags |= set;
1952         c.new_flags &= ~clear;
1953
1954         data.dsize = sizeof(c);
1955         data.dptr = (unsigned char *)&c;
1956
1957         /* send the flags update to all connected nodes */
1958         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
1959
1960         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
1961                                         nodes, 0,
1962                                         timeout, false, data,
1963                                         NULL, NULL,
1964                                         NULL) != 0) {
1965                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1966
1967                 talloc_free(tmp_ctx);
1968                 return -1;
1969         }
1970
1971         talloc_free(tmp_ctx);
1972         return 0;
1973 }
1974
1975
1976 /*
1977   get all tunables
1978  */
1979 int ctdb_ctrl_get_all_tunables(struct ctdb_context *ctdb, 
1980                                struct timeval timeout, 
1981                                uint32_t destnode,
1982                                struct ctdb_tunable *tunables)
1983 {
1984         TDB_DATA outdata;
1985         int ret;
1986         int32_t res;
1987
1988         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_GET_ALL_TUNABLES, 0, tdb_null, ctdb,
1989                            &outdata, &res, &timeout, NULL);
1990         if (ret != 0 || res != 0) {
1991                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get all tunables failed\n"));
1992                 return -1;
1993         }
1994
1995         if (outdata.dsize != sizeof(*tunables)) {
1996                 DEBUG(DEBUG_ERR,(__location__ " bad data size %u in ctdb_ctrl_get_all_tunables should be %u\n",
1997                          (unsigned)outdata.dsize, (unsigned)sizeof(*tunables)));
1998                 return -1;              
1999         }
2000
2001         *tunables = *(struct ctdb_tunable *)outdata.dptr;
2002         talloc_free(outdata.dptr);
2003         return 0;
2004 }
2005
2006 /*
2007   add a public address to a node
2008  */
2009 int ctdb_ctrl_add_public_ip(struct ctdb_context *ctdb, 
2010                       struct timeval timeout, 
2011                       uint32_t destnode,
2012                       struct ctdb_control_ip_iface *pub)
2013 {
2014         TDB_DATA data;
2015         int32_t res;
2016         int ret;
2017
2018         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2019         data.dptr  = (unsigned char *)pub;
2020
2021         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_ADD_PUBLIC_IP, 0, data, NULL,
2022                            NULL, &res, &timeout, NULL);
2023         if (ret != 0 || res != 0) {
2024                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for add_public_ip failed\n"));
2025                 return -1;
2026         }
2027
2028         return 0;
2029 }
2030
2031 /*
2032   delete a public address from a node
2033  */
2034 int ctdb_ctrl_del_public_ip(struct ctdb_context *ctdb, 
2035                       struct timeval timeout, 
2036                       uint32_t destnode,
2037                       struct ctdb_control_ip_iface *pub)
2038 {
2039         TDB_DATA data;
2040         int32_t res;
2041         int ret;
2042
2043         data.dsize = offsetof(struct ctdb_control_ip_iface, iface) + pub->len;
2044         data.dptr  = (unsigned char *)pub;
2045
2046         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_DEL_PUBLIC_IP, 0, data, NULL,
2047                            NULL, &res, &timeout, NULL);
2048         if (ret != 0 || res != 0) {
2049                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for del_public_ip failed\n"));
2050                 return -1;
2051         }
2052
2053         return 0;
2054 }
2055
2056 /*
2057   kill a tcp connection
2058  */
2059 int ctdb_ctrl_killtcp(struct ctdb_context *ctdb, 
2060                       struct timeval timeout, 
2061                       uint32_t destnode,
2062                       struct ctdb_control_killtcp *killtcp)
2063 {
2064         TDB_DATA data;
2065         int32_t res;
2066         int ret;
2067
2068         data.dsize = sizeof(struct ctdb_control_killtcp);
2069         data.dptr  = (unsigned char *)killtcp;
2070
2071         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_KILL_TCP, 0, data, NULL,
2072                            NULL, &res, &timeout, NULL);
2073         if (ret != 0 || res != 0) {
2074                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for killtcp failed\n"));
2075                 return -1;
2076         }
2077
2078         return 0;
2079 }
2080
2081 /*
2082   send a gratious arp
2083  */
2084 int ctdb_ctrl_gratious_arp(struct ctdb_context *ctdb, 
2085                       struct timeval timeout, 
2086                       uint32_t destnode,
2087                       ctdb_sock_addr *addr,
2088                       const char *ifname)
2089 {
2090         TDB_DATA data;
2091         int32_t res;
2092         int ret, len;
2093         struct ctdb_control_gratious_arp *gratious_arp;
2094         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2095
2096
2097         len = strlen(ifname)+1;
2098         gratious_arp = talloc_size(tmp_ctx, 
2099                 offsetof(struct ctdb_control_gratious_arp, iface) + len);
2100         CTDB_NO_MEMORY(ctdb, gratious_arp);
2101
2102         gratious_arp->addr = *addr;
2103         gratious_arp->len = len;
2104         memcpy(&gratious_arp->iface[0], ifname, len);
2105
2106
2107         data.dsize = offsetof(struct ctdb_control_gratious_arp, iface) + len;
2108         data.dptr  = (unsigned char *)gratious_arp;
2109
2110         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_SEND_GRATIOUS_ARP, 0, data, NULL,
2111                            NULL, &res, &timeout, NULL);
2112         if (ret != 0 || res != 0) {
2113                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for gratious_arp failed\n"));
2114                 talloc_free(tmp_ctx);
2115                 return -1;
2116         }
2117
2118         talloc_free(tmp_ctx);
2119         return 0;
2120 }
2121
2122 /*
2123   get a list of all tcp tickles that a node knows about for a particular vnn
2124  */
2125 int ctdb_ctrl_get_tcp_tickles(struct ctdb_context *ctdb, 
2126                               struct timeval timeout, uint32_t destnode, 
2127                               TALLOC_CTX *mem_ctx, 
2128                               ctdb_sock_addr *addr,
2129                               struct ctdb_control_tcp_tickle_list **list)
2130 {
2131         int ret;
2132         TDB_DATA data, outdata;
2133         int32_t status;
2134
2135         data.dptr = (uint8_t*)addr;
2136         data.dsize = sizeof(ctdb_sock_addr);
2137
2138         ret = ctdb_control(ctdb, destnode, 0, 
2139                            CTDB_CONTROL_GET_TCP_TICKLE_LIST, 0, data, 
2140                            mem_ctx, &outdata, &status, NULL, NULL);
2141         if (ret != 0 || status != 0) {
2142                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get tcp tickles failed\n"));
2143                 return -1;
2144         }
2145
2146         *list = (struct ctdb_control_tcp_tickle_list *)outdata.dptr;
2147
2148         return status;
2149 }
2150
2151 /*
2152   register a server id
2153  */
2154 int ctdb_ctrl_register_server_id(struct ctdb_context *ctdb, 
2155                       struct timeval timeout, 
2156                       struct ctdb_server_id *id)
2157 {
2158         TDB_DATA data;
2159         int32_t res;
2160         int ret;
2161
2162         data.dsize = sizeof(struct ctdb_server_id);
2163         data.dptr  = (unsigned char *)id;
2164
2165         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2166                         CTDB_CONTROL_REGISTER_SERVER_ID, 
2167                         0, data, NULL,
2168                         NULL, &res, &timeout, NULL);
2169         if (ret != 0 || res != 0) {
2170                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for register server id failed\n"));
2171                 return -1;
2172         }
2173
2174         return 0;
2175 }
2176
2177 /*
2178   unregister a server id
2179  */
2180 int ctdb_ctrl_unregister_server_id(struct ctdb_context *ctdb, 
2181                       struct timeval timeout, 
2182                       struct ctdb_server_id *id)
2183 {
2184         TDB_DATA data;
2185         int32_t res;
2186         int ret;
2187
2188         data.dsize = sizeof(struct ctdb_server_id);
2189         data.dptr  = (unsigned char *)id;
2190
2191         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, 
2192                         CTDB_CONTROL_UNREGISTER_SERVER_ID, 
2193                         0, data, NULL,
2194                         NULL, &res, &timeout, NULL);
2195         if (ret != 0 || res != 0) {
2196                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for unregister server id failed\n"));
2197                 return -1;
2198         }
2199
2200         return 0;
2201 }
2202
2203
2204 /*
2205   check if a server id exists
2206
2207   if a server id does exist, return *status == 1, otherwise *status == 0
2208  */
2209 int ctdb_ctrl_check_server_id(struct ctdb_context *ctdb, 
2210                       struct timeval timeout, 
2211                       uint32_t destnode,
2212                       struct ctdb_server_id *id,
2213                       uint32_t *status)
2214 {
2215         TDB_DATA data;
2216         int32_t res;
2217         int ret;
2218
2219         data.dsize = sizeof(struct ctdb_server_id);
2220         data.dptr  = (unsigned char *)id;
2221
2222         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CHECK_SERVER_ID, 
2223                         0, data, NULL,
2224                         NULL, &res, &timeout, NULL);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for check server id failed\n"));
2227                 return -1;
2228         }
2229
2230         if (res) {
2231                 *status = 1;
2232         } else {
2233                 *status = 0;
2234         }
2235
2236         return 0;
2237 }
2238
2239 /*
2240    get the list of server ids that are registered on a node
2241 */
2242 int ctdb_ctrl_get_server_id_list(struct ctdb_context *ctdb,
2243                 TALLOC_CTX *mem_ctx,
2244                 struct timeval timeout, uint32_t destnode, 
2245                 struct ctdb_server_id_list **svid_list)
2246 {
2247         int ret;
2248         TDB_DATA outdata;
2249         int32_t res;
2250
2251         ret = ctdb_control(ctdb, destnode, 0, 
2252                            CTDB_CONTROL_GET_SERVER_ID_LIST, 0, tdb_null, 
2253                            mem_ctx, &outdata, &res, &timeout, NULL);
2254         if (ret != 0 || res != 0) {
2255                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for get_server_id_list failed\n"));
2256                 return -1;
2257         }
2258
2259         *svid_list = (struct ctdb_server_id_list *)talloc_steal(mem_ctx, outdata.dptr);
2260                     
2261         return 0;
2262 }
2263
2264 /*
2265   set some ctdb flags
2266 */
2267 void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
2268 {
2269         ctdb->flags |= flags;
2270 }
2271
2272
2273 /*
2274   return the pnn of this node
2275 */
2276 uint32_t ctdb_get_pnn(struct ctdb_context *ctdb)
2277 {
2278         return ctdb->pnn;
2279 }
2280
2281
2282 /*
2283   get the uptime of a remote node
2284  */
2285 struct ctdb_client_control_state *
2286 ctdb_ctrl_uptime_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2287 {
2288         struct ctdb_client_control_state *state;
2289
2290         state = ctdb_control_send(ctdb, destnode, 0, 
2291                            CTDB_CONTROL_UPTIME, 0, tdb_null, 
2292                            mem_ctx, NULL);
2293         if (state != NULL && !timeval_is_zero(&timeout)) {
2294                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2295         }
2296
2297         return state;
2298 }
2299
2300 int ctdb_ctrl_uptime_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, struct ctdb_uptime **uptime)
2301 {
2302         int ret;
2303         int32_t res;
2304         TDB_DATA outdata;
2305
2306         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2307         if (ret != 0 || res != 0) {
2308                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_uptime_recv failed\n"));
2309                 return -1;
2310         }
2311
2312         *uptime = (struct ctdb_uptime *)talloc_steal(mem_ctx, outdata.dptr);
2313
2314         return 0;
2315 }
2316
2317 int ctdb_ctrl_uptime(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode, struct ctdb_uptime **uptime)
2318 {
2319         struct ctdb_client_control_state *state;
2320
2321         state = ctdb_ctrl_uptime_send(ctdb, mem_ctx, timeout, destnode);
2322         return ctdb_ctrl_uptime_recv(ctdb, mem_ctx, state, uptime);
2323 }
2324
2325 /*
2326   send a control to execute the "recovered" event script on a node
2327  */
2328 int ctdb_ctrl_end_recovery(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
2329 {
2330         int ret;
2331         int32_t status;
2332
2333         ret = ctdb_control(ctdb, destnode, 0, 
2334                            CTDB_CONTROL_END_RECOVERY, 0, tdb_null, 
2335                            NULL, NULL, &status, &timeout, NULL);
2336         if (ret != 0 || status != 0) {
2337                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for end_recovery failed\n"));
2338                 return -1;
2339         }
2340
2341         return 0;
2342 }
2343
2344 /* 
2345   callback for the async helpers used when sending the same control
2346   to multiple nodes in parallell.
2347 */
2348 static void async_callback(struct ctdb_client_control_state *state)
2349 {
2350         struct client_async_data *data = talloc_get_type(state->async.private_data, struct client_async_data);
2351         struct ctdb_context *ctdb = talloc_get_type(state->ctdb, struct ctdb_context);
2352         int ret;
2353         TDB_DATA outdata;
2354         int32_t res;
2355         uint32_t destnode = state->c->hdr.destnode;
2356
2357         /* one more node has responded with recmode data */
2358         data->count--;
2359
2360         /* if we failed to push the db, then return an error and let
2361            the main loop try again.
2362         */
2363         if (state->state != CTDB_CONTROL_DONE) {
2364                 if ( !data->dont_log_errors) {
2365                         DEBUG(DEBUG_ERR,("Async operation failed with state %d, opcode:%u\n", state->state, data->opcode));
2366                 }
2367                 data->fail_count++;
2368                 if (data->fail_callback) {
2369                         data->fail_callback(ctdb, destnode, res, outdata,
2370                                         data->callback_data);
2371                 }
2372                 return;
2373         }
2374         
2375         state->async.fn = NULL;
2376
2377         ret = ctdb_control_recv(ctdb, state, data, &outdata, &res, NULL);
2378         if ((ret != 0) || (res != 0)) {
2379                 if ( !data->dont_log_errors) {
2380                         DEBUG(DEBUG_ERR,("Async operation failed with ret=%d res=%d opcode=%u\n", ret, (int)res, data->opcode));
2381                 }
2382                 data->fail_count++;
2383                 if (data->fail_callback) {
2384                         data->fail_callback(ctdb, destnode, res, outdata,
2385                                         data->callback_data);
2386                 }
2387         }
2388         if ((ret == 0) && (data->callback != NULL)) {
2389                 data->callback(ctdb, destnode, res, outdata,
2390                                         data->callback_data);
2391         }
2392 }
2393
2394
2395 void ctdb_client_async_add(struct client_async_data *data, struct ctdb_client_control_state *state)
2396 {
2397         /* set up the callback functions */
2398         state->async.fn = async_callback;
2399         state->async.private_data = data;
2400         
2401         /* one more control to wait for to complete */
2402         data->count++;
2403 }
2404
2405
2406 /* wait for up to the maximum number of seconds allowed
2407    or until all nodes we expect a response from has replied
2408 */
2409 int ctdb_client_async_wait(struct ctdb_context *ctdb, struct client_async_data *data)
2410 {
2411         while (data->count > 0) {
2412                 event_loop_once(ctdb->ev);
2413         }
2414         if (data->fail_count != 0) {
2415                 if (!data->dont_log_errors) {
2416                         DEBUG(DEBUG_ERR,("Async wait failed - fail_count=%u\n", 
2417                                  data->fail_count));
2418                 }
2419                 return -1;
2420         }
2421         return 0;
2422 }
2423
2424
2425 /* 
2426    perform a simple control on the listed nodes
2427    The control cannot return data
2428  */
2429 int ctdb_client_async_control(struct ctdb_context *ctdb,
2430                                 enum ctdb_controls opcode,
2431                                 uint32_t *nodes,
2432                                 uint64_t srvid,
2433                                 struct timeval timeout,
2434                                 bool dont_log_errors,
2435                                 TDB_DATA data,
2436                                 client_async_callback client_callback,
2437                                 client_async_callback fail_callback,
2438                                 void *callback_data)
2439 {
2440         struct client_async_data *async_data;
2441         struct ctdb_client_control_state *state;
2442         int j, num_nodes;
2443
2444         async_data = talloc_zero(ctdb, struct client_async_data);
2445         CTDB_NO_MEMORY_FATAL(ctdb, async_data);
2446         async_data->dont_log_errors = dont_log_errors;
2447         async_data->callback = client_callback;
2448         async_data->fail_callback = fail_callback;
2449         async_data->callback_data = callback_data;
2450         async_data->opcode        = opcode;
2451
2452         num_nodes = talloc_get_size(nodes) / sizeof(uint32_t);
2453
2454         /* loop over all nodes and send an async control to each of them */
2455         for (j=0; j<num_nodes; j++) {
2456                 uint32_t pnn = nodes[j];
2457
2458                 state = ctdb_control_send(ctdb, pnn, srvid, opcode, 
2459                                           0, data, async_data, NULL);
2460                 if (state == NULL) {
2461                         DEBUG(DEBUG_ERR,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
2462                         talloc_free(async_data);
2463                         return -1;
2464                 }
2465                 if (!timeval_is_zero(&timeout)) {
2466                         event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2467                 }
2468                 
2469                 ctdb_client_async_add(async_data, state);
2470         }
2471
2472         if (ctdb_client_async_wait(ctdb, async_data) != 0) {
2473                 talloc_free(async_data);
2474                 return -1;
2475         }
2476
2477         talloc_free(async_data);
2478         return 0;
2479 }
2480
2481 uint32_t *list_of_vnnmap_nodes(struct ctdb_context *ctdb,
2482                                 struct ctdb_vnn_map *vnn_map,
2483                                 TALLOC_CTX *mem_ctx,
2484                                 bool include_self)
2485 {
2486         int i, j, num_nodes;
2487         uint32_t *nodes;
2488
2489         for (i=num_nodes=0;i<vnn_map->size;i++) {
2490                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2491                         continue;
2492                 }
2493                 num_nodes++;
2494         } 
2495
2496         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2497         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2498
2499         for (i=j=0;i<vnn_map->size;i++) {
2500                 if (vnn_map->map[i] == ctdb->pnn && !include_self) {
2501                         continue;
2502                 }
2503                 nodes[j++] = vnn_map->map[i];
2504         } 
2505
2506         return nodes;
2507 }
2508
2509 uint32_t *list_of_active_nodes(struct ctdb_context *ctdb,
2510                                 struct ctdb_node_map *node_map,
2511                                 TALLOC_CTX *mem_ctx,
2512                                 bool include_self)
2513 {
2514         int i, j, num_nodes;
2515         uint32_t *nodes;
2516
2517         for (i=num_nodes=0;i<node_map->num;i++) {
2518                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2519                         continue;
2520                 }
2521                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2522                         continue;
2523                 }
2524                 num_nodes++;
2525         } 
2526
2527         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2528         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2529
2530         for (i=j=0;i<node_map->num;i++) {
2531                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2532                         continue;
2533                 }
2534                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2535                         continue;
2536                 }
2537                 nodes[j++] = node_map->nodes[i].pnn;
2538         } 
2539
2540         return nodes;
2541 }
2542
2543 uint32_t *list_of_active_nodes_except_pnn(struct ctdb_context *ctdb,
2544                                 struct ctdb_node_map *node_map,
2545                                 TALLOC_CTX *mem_ctx,
2546                                 uint32_t pnn)
2547 {
2548         int i, j, num_nodes;
2549         uint32_t *nodes;
2550
2551         for (i=num_nodes=0;i<node_map->num;i++) {
2552                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2553                         continue;
2554                 }
2555                 if (node_map->nodes[i].pnn == pnn) {
2556                         continue;
2557                 }
2558                 num_nodes++;
2559         } 
2560
2561         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2562         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2563
2564         for (i=j=0;i<node_map->num;i++) {
2565                 if (node_map->nodes[i].flags & NODE_FLAGS_INACTIVE) {
2566                         continue;
2567                 }
2568                 if (node_map->nodes[i].pnn == pnn) {
2569                         continue;
2570                 }
2571                 nodes[j++] = node_map->nodes[i].pnn;
2572         } 
2573
2574         return nodes;
2575 }
2576
2577 uint32_t *list_of_connected_nodes(struct ctdb_context *ctdb,
2578                                 struct ctdb_node_map *node_map,
2579                                 TALLOC_CTX *mem_ctx,
2580                                 bool include_self)
2581 {
2582         int i, j, num_nodes;
2583         uint32_t *nodes;
2584
2585         for (i=num_nodes=0;i<node_map->num;i++) {
2586                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2587                         continue;
2588                 }
2589                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2590                         continue;
2591                 }
2592                 num_nodes++;
2593         } 
2594
2595         nodes = talloc_array(mem_ctx, uint32_t, num_nodes);
2596         CTDB_NO_MEMORY_FATAL(ctdb, nodes);
2597
2598         for (i=j=0;i<node_map->num;i++) {
2599                 if (node_map->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2600                         continue;
2601                 }
2602                 if (node_map->nodes[i].pnn == ctdb->pnn && !include_self) {
2603                         continue;
2604                 }
2605                 nodes[j++] = node_map->nodes[i].pnn;
2606         } 
2607
2608         return nodes;
2609 }
2610
2611 /* 
2612   this is used to test if a pnn lock exists and if it exists will return
2613   the number of connections that pnn has reported or -1 if that recovery
2614   daemon is not running.
2615 */
2616 int
2617 ctdb_read_pnn_lock(int fd, int32_t pnn)
2618 {
2619         struct flock lock;
2620         char c;
2621
2622         lock.l_type = F_WRLCK;
2623         lock.l_whence = SEEK_SET;
2624         lock.l_start = pnn;
2625         lock.l_len = 1;
2626         lock.l_pid = 0;
2627
2628         if (fcntl(fd, F_GETLK, &lock) != 0) {
2629                 DEBUG(DEBUG_ERR, (__location__ " F_GETLK failed with %s\n", strerror(errno)));
2630                 return -1;
2631         }
2632
2633         if (lock.l_type == F_UNLCK) {
2634                 return -1;
2635         }
2636
2637         if (pread(fd, &c, 1, pnn) == -1) {
2638                 DEBUG(DEBUG_CRIT,(__location__ " failed read pnn count - %s\n", strerror(errno)));
2639                 return -1;
2640         }
2641
2642         return c;
2643 }
2644
2645 /*
2646   get capabilities of a remote node
2647  */
2648 struct ctdb_client_control_state *
2649 ctdb_ctrl_getcapabilities_send(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct timeval timeout, uint32_t destnode)
2650 {
2651         struct ctdb_client_control_state *state;
2652
2653         state = ctdb_control_send(ctdb, destnode, 0, 
2654                            CTDB_CONTROL_GET_CAPABILITIES, 0, tdb_null, 
2655                            mem_ctx, NULL);
2656         if (state != NULL && !timeval_is_zero(&timeout)) {
2657                 event_add_timed(ctdb->ev, state, timeout, ctdb_control_timeout_func, state);
2658         }
2659
2660         return state;
2661 }
2662
2663 int ctdb_ctrl_getcapabilities_recv(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx, struct ctdb_client_control_state *state, uint32_t *capabilities)
2664 {
2665         int ret;
2666         int32_t res;
2667         TDB_DATA outdata;
2668
2669         ret = ctdb_control_recv(ctdb, state, mem_ctx, &outdata, &res, NULL);
2670         if ( (ret != 0) || (res != 0) ) {
2671                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getcapabilities_recv failed\n"));
2672                 return -1;
2673         }
2674
2675         if (capabilities) {
2676                 *capabilities = *((uint32_t *)outdata.dptr);
2677         }
2678
2679         return 0;
2680 }
2681
2682 int ctdb_ctrl_getcapabilities(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t *capabilities)
2683 {
2684         struct ctdb_client_control_state *state;
2685         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2686         int ret;
2687
2688         state = ctdb_ctrl_getcapabilities_send(ctdb, tmp_ctx, timeout, destnode);
2689         ret = ctdb_ctrl_getcapabilities_recv(ctdb, tmp_ctx, state, capabilities);
2690         talloc_free(tmp_ctx);
2691         return ret;
2692 }
2693
2694 /**
2695  * check whether a transaction is active on a given db on a given node
2696  */
2697 int32_t ctdb_ctrl_transaction_active(struct ctdb_context *ctdb,
2698                                      uint32_t destnode,
2699                                      uint32_t db_id)
2700 {
2701         int32_t status;
2702         int ret;
2703         TDB_DATA indata;
2704
2705         indata.dptr = (uint8_t *)&db_id;
2706         indata.dsize = sizeof(db_id);
2707
2708         ret = ctdb_control(ctdb, destnode, 0,
2709                            CTDB_CONTROL_TRANS2_ACTIVE,
2710                            0, indata, NULL, NULL, &status,
2711                            NULL, NULL);
2712
2713         if (ret != 0) {
2714                 DEBUG(DEBUG_ERR, (__location__ " ctdb control for transaction_active failed\n"));
2715                 return -1;
2716         }
2717
2718         return status;
2719 }
2720
2721
2722 struct ctdb_transaction_handle {
2723         struct ctdb_db_context *ctdb_db;
2724         bool in_replay;
2725         /*
2726          * we store the reads and writes done under a transaction:
2727          * - one list stores both reads and writes (m_all),
2728          * - the other just writes (m_write)
2729          */
2730         struct ctdb_marshall_buffer *m_all;
2731         struct ctdb_marshall_buffer *m_write;
2732 };
2733
2734 /* start a transaction on a database */
2735 static int ctdb_transaction_destructor(struct ctdb_transaction_handle *h)
2736 {
2737         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
2738         return 0;
2739 }
2740
2741 /* start a transaction on a database */
2742 static int ctdb_transaction_fetch_start(struct ctdb_transaction_handle *h)
2743 {
2744         struct ctdb_record_handle *rh;
2745         TDB_DATA key;
2746         TDB_DATA data;
2747         struct ctdb_ltdb_header header;
2748         TALLOC_CTX *tmp_ctx;
2749         const char *keyname = CTDB_TRANSACTION_LOCK_KEY;
2750         int ret;
2751         struct ctdb_db_context *ctdb_db = h->ctdb_db;
2752         pid_t pid;
2753         int32_t status;
2754
2755         key.dptr = discard_const(keyname);
2756         key.dsize = strlen(keyname);
2757
2758         if (!ctdb_db->persistent) {
2759                 DEBUG(DEBUG_ERR,(__location__ " Attempted transaction on non-persistent database\n"));
2760                 return -1;
2761         }
2762
2763 again:
2764         tmp_ctx = talloc_new(h);
2765
2766         rh = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, NULL);
2767         if (rh == NULL) {
2768                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch_lock database\n"));
2769                 talloc_free(tmp_ctx);
2770                 return -1;
2771         }
2772
2773         status = ctdb_ctrl_transaction_active(ctdb_db->ctdb,
2774                                               CTDB_CURRENT_NODE,
2775                                               ctdb_db->db_id);
2776         if (status == 1) {
2777                 unsigned long int usec = (1000 + random()) % 100000;
2778                 DEBUG(DEBUG_DEBUG, (__location__ " transaction is active "
2779                                     "on db_id[0x%08x]. waiting for %lu "
2780                                     "microseconds\n",
2781                                     ctdb_db->db_id, usec));
2782                 talloc_free(tmp_ctx);
2783                 usleep(usec);
2784                 goto again;
2785         }
2786
2787         /*
2788          * store the pid in the database:
2789          * it is not enough that the node is dmaster...
2790          */
2791         pid = getpid();
2792         data.dptr = (unsigned char *)&pid;
2793         data.dsize = sizeof(pid_t);
2794         rh->header.rsn++;
2795         rh->header.dmaster = ctdb_db->ctdb->pnn;
2796         ret = ctdb_ltdb_store(ctdb_db, key, &(rh->header), data);
2797         if (ret != 0) {
2798                 DEBUG(DEBUG_ERR, (__location__ " Failed to store pid in "
2799                                   "transaction record\n"));
2800                 talloc_free(tmp_ctx);
2801                 return -1;
2802         }
2803
2804         talloc_free(rh);
2805
2806         ret = tdb_transaction_start(ctdb_db->ltdb->tdb);
2807         if (ret != 0) {
2808                 DEBUG(DEBUG_ERR,(__location__ " Failed to start tdb transaction\n"));
2809                 talloc_free(tmp_ctx);
2810                 return -1;
2811         }
2812
2813         ret = ctdb_ltdb_fetch(ctdb_db, key, &header, tmp_ctx, &data);
2814         if (ret != 0) {
2815                 DEBUG(DEBUG_ERR,(__location__ " Failed to re-fetch transaction "
2816                                  "lock record inside transaction\n"));
2817                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2818                 talloc_free(tmp_ctx);
2819                 goto again;
2820         }
2821
2822         if (header.dmaster != ctdb_db->ctdb->pnn) {
2823                 DEBUG(DEBUG_DEBUG,(__location__ " not dmaster any more on "
2824                                    "transaction lock record\n"));
2825                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2826                 talloc_free(tmp_ctx);
2827                 goto again;
2828         }
2829
2830         if ((data.dsize != sizeof(pid_t)) || (*(pid_t *)(data.dptr) != pid)) {
2831                 DEBUG(DEBUG_DEBUG, (__location__ " my pid is not stored in "
2832                                     "the transaction lock record\n"));
2833                 tdb_transaction_cancel(ctdb_db->ltdb->tdb);
2834                 talloc_free(tmp_ctx);
2835                 goto again;
2836         }
2837
2838         talloc_free(tmp_ctx);
2839
2840         return 0;
2841 }
2842
2843
2844 /* start a transaction on a database */
2845 struct ctdb_transaction_handle *ctdb_transaction_start(struct ctdb_db_context *ctdb_db,
2846                                                        TALLOC_CTX *mem_ctx)
2847 {
2848         struct ctdb_transaction_handle *h;
2849         int ret;
2850
2851         h = talloc_zero(mem_ctx, struct ctdb_transaction_handle);
2852         if (h == NULL) {
2853                 DEBUG(DEBUG_ERR,(__location__ " oom for transaction handle\n"));                
2854                 return NULL;
2855         }
2856
2857         h->ctdb_db = ctdb_db;
2858
2859         ret = ctdb_transaction_fetch_start(h);
2860         if (ret != 0) {
2861                 talloc_free(h);
2862                 return NULL;
2863         }
2864
2865         talloc_set_destructor(h, ctdb_transaction_destructor);
2866
2867         return h;
2868 }
2869
2870
2871
2872 /*
2873   fetch a record inside a transaction
2874  */
2875 int ctdb_transaction_fetch(struct ctdb_transaction_handle *h, 
2876                            TALLOC_CTX *mem_ctx, 
2877                            TDB_DATA key, TDB_DATA *data)
2878 {
2879         struct ctdb_ltdb_header header;
2880         int ret;
2881
2882         ZERO_STRUCT(header);
2883
2884         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, mem_ctx, data);
2885         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2886                 /* record doesn't exist yet */
2887                 *data = tdb_null;
2888                 ret = 0;
2889         }
2890         
2891         if (ret != 0) {
2892                 return ret;
2893         }
2894
2895         if (!h->in_replay) {
2896                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 1, key, NULL, *data);
2897                 if (h->m_all == NULL) {
2898                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2899                         return -1;
2900                 }
2901         }
2902
2903         return 0;
2904 }
2905
2906 /*
2907   stores a record inside a transaction
2908  */
2909 int ctdb_transaction_store(struct ctdb_transaction_handle *h, 
2910                            TDB_DATA key, TDB_DATA data)
2911 {
2912         TALLOC_CTX *tmp_ctx = talloc_new(h);
2913         struct ctdb_ltdb_header header;
2914         TDB_DATA olddata;
2915         int ret;
2916
2917         ZERO_STRUCT(header);
2918
2919         /* we need the header so we can update the RSN */
2920         ret = ctdb_ltdb_fetch(h->ctdb_db, key, &header, tmp_ctx, &olddata);
2921         if (ret == -1 && header.dmaster == (uint32_t)-1) {
2922                 /* the record doesn't exist - create one with us as dmaster.
2923                    This is only safe because we are in a transaction and this
2924                    is a persistent database */
2925                 ZERO_STRUCT(header);
2926         } else if (ret != 0) {
2927                 DEBUG(DEBUG_ERR,(__location__ " Failed to fetch record\n"));
2928                 talloc_free(tmp_ctx);
2929                 return ret;
2930         }
2931
2932         if (data.dsize == olddata.dsize &&
2933             memcmp(data.dptr, olddata.dptr, data.dsize) == 0) {
2934                 /* save writing the same data */
2935                 talloc_free(tmp_ctx);
2936                 return 0;
2937         }
2938
2939         header.dmaster = h->ctdb_db->ctdb->pnn;
2940         header.rsn++;
2941
2942         if (!h->in_replay) {
2943                 h->m_all = ctdb_marshall_add(h, h->m_all, h->ctdb_db->db_id, 0, key, NULL, data);
2944                 if (h->m_all == NULL) {
2945                         DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2946                         talloc_free(tmp_ctx);
2947                         return -1;
2948                 }
2949         }               
2950
2951         h->m_write = ctdb_marshall_add(h, h->m_write, h->ctdb_db->db_id, 0, key, &header, data);
2952         if (h->m_write == NULL) {
2953                 DEBUG(DEBUG_ERR,(__location__ " Failed to add to marshalling record\n"));
2954                 talloc_free(tmp_ctx);
2955                 return -1;
2956         }
2957         
2958         ret = ctdb_ltdb_store(h->ctdb_db, key, &header, data);
2959
2960         talloc_free(tmp_ctx);
2961         
2962         return ret;
2963 }
2964
2965 /*
2966   replay a transaction
2967  */
2968 static int ctdb_replay_transaction(struct ctdb_transaction_handle *h)
2969 {
2970         int ret, i;
2971         struct ctdb_rec_data *rec = NULL;
2972
2973         h->in_replay = true;
2974         talloc_free(h->m_write);
2975         h->m_write = NULL;
2976
2977         ret = ctdb_transaction_fetch_start(h);
2978         if (ret != 0) {
2979                 return ret;
2980         }
2981
2982         for (i=0;i<h->m_all->count;i++) {
2983                 TDB_DATA key, data;
2984
2985                 rec = ctdb_marshall_loop_next(h->m_all, rec, NULL, NULL, &key, &data);
2986                 if (rec == NULL) {
2987                         DEBUG(DEBUG_ERR, (__location__ " Out of records in ctdb_replay_transaction?\n"));
2988                         goto failed;
2989                 }
2990
2991                 if (rec->reqid == 0) {
2992                         /* its a store */
2993                         if (ctdb_transaction_store(h, key, data) != 0) {
2994                                 goto failed;
2995                         }
2996                 } else {
2997                         TDB_DATA data2;
2998                         TALLOC_CTX *tmp_ctx = talloc_new(h);
2999
3000                         if (ctdb_transaction_fetch(h, tmp_ctx, key, &data2) != 0) {
3001                                 talloc_free(tmp_ctx);
3002                                 goto failed;
3003                         }
3004                         if (data2.dsize != data.dsize ||
3005                             memcmp(data2.dptr, data.dptr, data.dsize) != 0) {
3006                                 /* the record has changed on us - we have to give up */
3007                                 talloc_free(tmp_ctx);
3008                                 goto failed;
3009                         }
3010                         talloc_free(tmp_ctx);
3011                 }
3012         }
3013         
3014         return 0;
3015
3016 failed:
3017         tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3018         return -1;
3019 }
3020
3021
3022 /*
3023   commit a transaction
3024  */
3025 int ctdb_transaction_commit(struct ctdb_transaction_handle *h)
3026 {
3027         int ret, retries=0;
3028         int32_t status;
3029         struct ctdb_context *ctdb = h->ctdb_db->ctdb;
3030         struct timeval timeout;
3031         enum ctdb_controls failure_control = CTDB_CONTROL_TRANS2_ERROR;
3032
3033         talloc_set_destructor(h, NULL);
3034
3035         /* our commit strategy is quite complex.
3036
3037            - we first try to commit the changes to all other nodes
3038
3039            - if that works, then we commit locally and we are done
3040
3041            - if a commit on another node fails, then we need to cancel
3042              the transaction, then restart the transaction (thus
3043              opening a window of time for a pending recovery to
3044              complete), then replay the transaction, checking all the
3045              reads and writes (checking that reads give the same data,
3046              and writes succeed). Then we retry the transaction to the
3047              other nodes
3048         */
3049
3050 again:
3051         if (h->m_write == NULL) {
3052                 /* no changes were made */
3053                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3054                 talloc_free(h);
3055                 return 0;
3056         }
3057
3058         /* tell ctdbd to commit to the other nodes */
3059         timeout = timeval_current_ofs(1, 0);
3060         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3061                            retries==0?CTDB_CONTROL_TRANS2_COMMIT:CTDB_CONTROL_TRANS2_COMMIT_RETRY, 0, 
3062                            ctdb_marshall_finish(h->m_write), NULL, NULL, &status, 
3063                            &timeout, NULL);
3064         if (ret != 0 || status != 0) {
3065                 tdb_transaction_cancel(h->ctdb_db->ltdb->tdb);
3066                 DEBUG(DEBUG_NOTICE, (__location__ " transaction commit%s failed"
3067                                      ", retrying after 1 second...\n",
3068                                      (retries==0)?"":"retry "));
3069                 sleep(1);
3070
3071                 if (ret != 0) {
3072                         failure_control = CTDB_CONTROL_TRANS2_ERROR;
3073                 } else {
3074                         /* work out what error code we will give if we 
3075                            have to fail the operation */
3076                         switch ((enum ctdb_trans2_commit_error)status) {
3077                         case CTDB_TRANS2_COMMIT_SUCCESS:
3078                         case CTDB_TRANS2_COMMIT_SOMEFAIL:
3079                         case CTDB_TRANS2_COMMIT_TIMEOUT:
3080                                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3081                                 break;
3082                         case CTDB_TRANS2_COMMIT_ALLFAIL:
3083                                 failure_control = CTDB_CONTROL_TRANS2_FINISHED;
3084                                 break;
3085                         }
3086                 }
3087
3088                 if (++retries == 100) {
3089                         DEBUG(DEBUG_ERR,(__location__ " Giving up transaction on db 0x%08x after %d retries failure_control=%u\n", 
3090                                          h->ctdb_db->db_id, retries, (unsigned)failure_control));
3091                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3092                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3093                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3094                         talloc_free(h);
3095                         return -1;
3096                 }               
3097
3098                 if (ctdb_replay_transaction(h) != 0) {
3099                         DEBUG(DEBUG_ERR, (__location__ " Failed to replay "
3100                                           "transaction on db 0x%08x, "
3101                                           "failure control =%u\n",
3102                                           h->ctdb_db->db_id,
3103                                           (unsigned)failure_control));
3104                         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3105                                      failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3106                                      tdb_null, NULL, NULL, NULL, NULL, NULL);           
3107                         talloc_free(h);
3108                         return -1;
3109                 }
3110                 goto again;
3111         } else {
3112                 failure_control = CTDB_CONTROL_TRANS2_ERROR;
3113         }
3114
3115         /* do the real commit locally */
3116         ret = tdb_transaction_commit(h->ctdb_db->ltdb->tdb);
3117         if (ret != 0) {
3118                 DEBUG(DEBUG_ERR, (__location__ " Failed to commit transaction "
3119                                   "on db id 0x%08x locally, "
3120                                   "failure_control=%u\n",
3121                                   h->ctdb_db->db_id,
3122                                   (unsigned)failure_control));
3123                 ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3124                              failure_control, CTDB_CTRL_FLAG_NOREPLY, 
3125                              tdb_null, NULL, NULL, NULL, NULL, NULL);           
3126                 talloc_free(h);
3127                 return ret;
3128         }
3129
3130         /* tell ctdbd that we are finished with our local commit */
3131         ctdb_control(ctdb, CTDB_CURRENT_NODE, h->ctdb_db->db_id, 
3132                      CTDB_CONTROL_TRANS2_FINISHED, CTDB_CTRL_FLAG_NOREPLY, 
3133                      tdb_null, NULL, NULL, NULL, NULL, NULL);
3134         talloc_free(h);
3135         return 0;
3136 }
3137
3138 /*
3139   recovery daemon ping to main daemon
3140  */
3141 int ctdb_ctrl_recd_ping(struct ctdb_context *ctdb)
3142 {
3143         int ret;
3144         int32_t res;
3145
3146         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_PING, 0, tdb_null, 
3147                            ctdb, NULL, &res, NULL, NULL);
3148         if (ret != 0 || res != 0) {
3149                 DEBUG(DEBUG_ERR,("Failed to send recd ping\n"));
3150                 return -1;
3151         }
3152
3153         return 0;
3154 }
3155
3156 /* when forking the main daemon and the child process needs to connect back
3157  * to the daemon as a client process, this function can be used to change
3158  * the ctdb context from daemon into client mode
3159  */
3160 int switch_from_server_to_client(struct ctdb_context *ctdb)
3161 {
3162         int ret;
3163
3164         /* shutdown the transport */
3165         if (ctdb->methods) {
3166                 ctdb->methods->shutdown(ctdb);
3167         }
3168
3169         /* get a new event context */
3170         talloc_free(ctdb->ev);
3171         ctdb->ev = event_context_init(ctdb);
3172
3173         close(ctdb->daemon.sd);
3174         ctdb->daemon.sd = -1;
3175
3176         /* initialise ctdb */
3177         ret = ctdb_socket_connect(ctdb);
3178         if (ret != 0) {
3179                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
3180                 return -1;
3181         }
3182
3183          return 0;
3184 }
3185
3186 /*
3187   get the status of running the monitor eventscripts: NULL means never run.
3188  */
3189 int ctdb_ctrl_getscriptstatus(struct ctdb_context *ctdb, 
3190                 struct timeval timeout, uint32_t destnode, 
3191                 TALLOC_CTX *mem_ctx, enum ctdb_eventscript_call type,
3192                 struct ctdb_scripts_wire **script_status)
3193 {
3194         int ret;
3195         TDB_DATA outdata, indata;
3196         int32_t res;
3197         uint32_t uinttype = type;
3198
3199         indata.dptr = (uint8_t *)&uinttype;
3200         indata.dsize = sizeof(uinttype);
3201
3202         ret = ctdb_control(ctdb, destnode, 0, 
3203                            CTDB_CONTROL_GET_EVENT_SCRIPT_STATUS, 0, indata,
3204                            mem_ctx, &outdata, &res, &timeout, NULL);
3205         if (ret != 0 || res != 0) {
3206                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for getscriptstatus failed ret:%d res:%d\n", ret, res));
3207                 return -1;
3208         }
3209
3210         if (outdata.dsize == 0) {
3211                 *script_status = NULL;
3212         } else {
3213                 *script_status = (struct ctdb_scripts_wire *)talloc_memdup(mem_ctx, outdata.dptr, outdata.dsize);
3214                 talloc_free(outdata.dptr);
3215         }
3216                     
3217         return 0;
3218 }
3219
3220 /*
3221   tell the main daemon how long it took to lock the reclock file
3222  */
3223 int ctdb_ctrl_report_recd_lock_latency(struct ctdb_context *ctdb, struct timeval timeout, double latency)
3224 {
3225         int ret;
3226         int32_t res;
3227         TDB_DATA data;
3228
3229         data.dptr = (uint8_t *)&latency;
3230         data.dsize = sizeof(latency);
3231
3232         ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_RECD_RECLOCK_LATENCY, 0, data, 
3233                            ctdb, NULL, &res, NULL, NULL);
3234         if (ret != 0 || res != 0) {
3235                 DEBUG(DEBUG_ERR,("Failed to send recd reclock latency\n"));
3236                 return -1;
3237         }
3238
3239         return 0;
3240 }
3241
3242 /*
3243   get the name of the reclock file
3244  */
3245 int ctdb_ctrl_getreclock(struct ctdb_context *ctdb, struct timeval timeout,
3246                          uint32_t destnode, TALLOC_CTX *mem_ctx,
3247                          const char **name)
3248 {
3249         int ret;
3250         int32_t res;
3251         TDB_DATA data;
3252
3253         ret = ctdb_control(ctdb, destnode, 0, 
3254                            CTDB_CONTROL_GET_RECLOCK_FILE, 0, tdb_null, 
3255                            mem_ctx, &data, &res, &timeout, NULL);
3256         if (ret != 0 || res != 0) {
3257                 return -1;
3258         }
3259
3260         if (data.dsize == 0) {
3261                 *name = NULL;
3262         } else {
3263                 *name = talloc_strdup(mem_ctx, discard_const(data.dptr));
3264         }
3265         talloc_free(data.dptr);
3266
3267         return 0;
3268 }
3269
3270 /*
3271   set the reclock filename for a node
3272  */
3273 int ctdb_ctrl_setreclock(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *reclock)
3274 {
3275         int ret;
3276         TDB_DATA data;
3277         int32_t res;
3278
3279         if (reclock == NULL) {
3280                 data.dsize = 0;
3281                 data.dptr  = NULL;
3282         } else {
3283                 data.dsize = strlen(reclock) + 1;
3284                 data.dptr  = discard_const(reclock);
3285         }
3286
3287         ret = ctdb_control(ctdb, destnode, 0, 
3288                            CTDB_CONTROL_SET_RECLOCK_FILE, 0, data, 
3289                            NULL, NULL, &res, &timeout, NULL);
3290         if (ret != 0 || res != 0) {
3291                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setreclock failed\n"));
3292                 return -1;
3293         }
3294
3295         return 0;
3296 }
3297
3298 /*
3299   stop a node
3300  */
3301 int ctdb_ctrl_stop_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3302 {
3303         int ret;
3304         int32_t res;
3305
3306         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_STOP_NODE, 0, tdb_null, 
3307                            ctdb, NULL, &res, &timeout, NULL);
3308         if (ret != 0 || res != 0) {
3309                 DEBUG(DEBUG_ERR,("Failed to stop node\n"));
3310                 return -1;
3311         }
3312
3313         return 0;
3314 }
3315
3316 /*
3317   continue a node
3318  */
3319 int ctdb_ctrl_continue_node(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode)
3320 {
3321         int ret;
3322
3323         ret = ctdb_control(ctdb, destnode, 0, CTDB_CONTROL_CONTINUE_NODE, 0, tdb_null, 
3324                            ctdb, NULL, NULL, &timeout, NULL);
3325         if (ret != 0) {
3326                 DEBUG(DEBUG_ERR,("Failed to continue node\n"));
3327                 return -1;
3328         }
3329
3330         return 0;
3331 }
3332
3333 /*
3334   set the natgw state for a node
3335  */
3336 int ctdb_ctrl_setnatgwstate(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t natgwstate)
3337 {
3338         int ret;
3339         TDB_DATA data;
3340         int32_t res;
3341
3342         data.dsize = sizeof(natgwstate);
3343         data.dptr  = (uint8_t *)&natgwstate;
3344
3345         ret = ctdb_control(ctdb, destnode, 0, 
3346                            CTDB_CONTROL_SET_NATGWSTATE, 0, data, 
3347                            NULL, NULL, &res, &timeout, NULL);
3348         if (ret != 0 || res != 0) {
3349                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setnatgwstate failed\n"));
3350                 return -1;
3351         }
3352
3353         return 0;
3354 }
3355
3356 /*
3357   set the lmaster role for a node
3358  */
3359 int ctdb_ctrl_setlmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t lmasterrole)
3360 {
3361         int ret;
3362         TDB_DATA data;
3363         int32_t res;
3364
3365         data.dsize = sizeof(lmasterrole);
3366         data.dptr  = (uint8_t *)&lmasterrole;
3367
3368         ret = ctdb_control(ctdb, destnode, 0, 
3369                            CTDB_CONTROL_SET_LMASTERROLE, 0, data, 
3370                            NULL, NULL, &res, &timeout, NULL);
3371         if (ret != 0 || res != 0) {
3372                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setlmasterrole failed\n"));
3373                 return -1;
3374         }
3375
3376         return 0;
3377 }
3378
3379 /*
3380   set the recmaster role for a node
3381  */
3382 int ctdb_ctrl_setrecmasterrole(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t recmasterrole)
3383 {
3384         int ret;
3385         TDB_DATA data;
3386         int32_t res;
3387
3388         data.dsize = sizeof(recmasterrole);
3389         data.dptr  = (uint8_t *)&recmasterrole;
3390
3391         ret = ctdb_control(ctdb, destnode, 0, 
3392                            CTDB_CONTROL_SET_RECMASTERROLE, 0, data, 
3393                            NULL, NULL, &res, &timeout, NULL);
3394         if (ret != 0 || res != 0) {
3395                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for setrecmasterrole failed\n"));
3396                 return -1;
3397         }
3398
3399         return 0;
3400 }
3401
3402 /* enable an eventscript
3403  */
3404 int ctdb_ctrl_enablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3405 {
3406         int ret;
3407         TDB_DATA data;
3408         int32_t res;
3409
3410         data.dsize = strlen(script) + 1;
3411         data.dptr  = discard_const(script);
3412
3413         ret = ctdb_control(ctdb, destnode, 0, 
3414                            CTDB_CONTROL_ENABLE_SCRIPT, 0, data, 
3415                            NULL, NULL, &res, &timeout, NULL);
3416         if (ret != 0 || res != 0) {
3417                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for enablescript failed\n"));
3418                 return -1;
3419         }
3420
3421         return 0;
3422 }
3423
3424 /* disable an eventscript
3425  */
3426 int ctdb_ctrl_disablescript(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, const char *script)
3427 {
3428         int ret;
3429         TDB_DATA data;
3430         int32_t res;
3431
3432         data.dsize = strlen(script) + 1;
3433         data.dptr  = discard_const(script);
3434
3435         ret = ctdb_control(ctdb, destnode, 0, 
3436                            CTDB_CONTROL_DISABLE_SCRIPT, 0, data, 
3437                            NULL, NULL, &res, &timeout, NULL);
3438         if (ret != 0 || res != 0) {
3439                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for disablescript failed\n"));
3440                 return -1;
3441         }
3442
3443         return 0;
3444 }
3445
3446
3447 int ctdb_ctrl_set_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_ban_time *bantime)
3448 {
3449         int ret;
3450         TDB_DATA data;
3451         int32_t res;
3452
3453         data.dsize = sizeof(*bantime);
3454         data.dptr  = (uint8_t *)bantime;
3455
3456         ret = ctdb_control(ctdb, destnode, 0, 
3457                            CTDB_CONTROL_SET_BAN_STATE, 0, data, 
3458                            NULL, NULL, &res, &timeout, NULL);
3459         if (ret != 0 || res != 0) {
3460                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3461                 return -1;
3462         }
3463
3464         return 0;
3465 }
3466
3467
3468 int ctdb_ctrl_get_ban(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, struct ctdb_ban_time **bantime)
3469 {
3470         int ret;
3471         TDB_DATA outdata;
3472         int32_t res;
3473         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3474
3475         ret = ctdb_control(ctdb, destnode, 0, 
3476                            CTDB_CONTROL_GET_BAN_STATE, 0, tdb_null,
3477                            tmp_ctx, &outdata, &res, &timeout, NULL);
3478         if (ret != 0 || res != 0) {
3479                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set ban state failed\n"));
3480                 talloc_free(tmp_ctx);
3481                 return -1;
3482         }
3483
3484         *bantime = (struct ctdb_ban_time *)talloc_steal(mem_ctx, outdata.dptr);
3485         talloc_free(tmp_ctx);
3486
3487         return 0;
3488 }
3489
3490
3491 int ctdb_ctrl_set_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, struct ctdb_db_priority *db_prio)
3492 {
3493         int ret;
3494         int32_t res;
3495         TDB_DATA data;
3496         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3497
3498         data.dptr = (uint8_t*)db_prio;
3499         data.dsize = sizeof(*db_prio);
3500
3501         ret = ctdb_control(ctdb, destnode, 0, 
3502                            CTDB_CONTROL_SET_DB_PRIORITY, 0, data,
3503                            tmp_ctx, NULL, &res, &timeout, NULL);
3504         if (ret != 0 || res != 0) {
3505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3506                 talloc_free(tmp_ctx);
3507                 return -1;
3508         }
3509
3510         talloc_free(tmp_ctx);
3511
3512         return 0;
3513 }
3514
3515 int ctdb_ctrl_get_db_priority(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t db_id, uint32_t *priority)
3516 {
3517         int ret;
3518         int32_t res;
3519         TDB_DATA data;
3520         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3521
3522         data.dptr = (uint8_t*)&db_id;
3523         data.dsize = sizeof(db_id);
3524
3525         ret = ctdb_control(ctdb, destnode, 0, 
3526                            CTDB_CONTROL_GET_DB_PRIORITY, 0, data,
3527                            tmp_ctx, NULL, &res, &timeout, NULL);
3528         if (ret != 0 || res < 0) {
3529                 DEBUG(DEBUG_ERR,(__location__ " ctdb_control for set_db_priority failed\n"));
3530                 talloc_free(tmp_ctx);
3531                 return -1;
3532         }
3533
3534         if (priority) {
3535                 *priority = res;
3536         }
3537
3538         talloc_free(tmp_ctx);
3539
3540         return 0;
3541 }
3542
3543 /* time out handler for ctdb_control */
3544 void ctdb_control_timeout_func(struct event_context *ev, struct timed_event *te, 
3545         struct timeval t, void *private_data)
3546 {
3547         struct ctdb_client_control_state *state = talloc_get_type(private_data, struct ctdb_client_control_state);
3548
3549         DEBUG(DEBUG_ERR,(__location__ " control timed out. reqid:%u opcode:%u "
3550                          "dstnode:%u\n", state->reqid, state->c->opcode,
3551                          state->c->hdr.destnode));
3552
3553         state->state = CTDB_CONTROL_TIMEOUT;
3554
3555         /* if we had a callback registered for this control, pull the response
3556            and call the callback.
3557         */
3558         if (state->async.fn) {
3559                 event_add_timed(state->ctdb->ev, state, timeval_zero(), ctdb_invoke_control_callback, state);
3560         }
3561 }
3562