RECOVER: When we pull databases during recovery, we used to reallocate the databuffer...
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, see <http://www.gnu.org/licenses/>.
19 */
20 #include "includes.h"
21 #include "lib/tdb/include/tdb.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/wait.h"
26 #include "../include/ctdb_private.h"
27 #include "lib/util/dlinklist.h"
28 #include "db_wrap.h"
29
30 /*
31   lock all databases - mark only
32  */
33 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb, uint32_t priority)
34 {
35         struct ctdb_db_context *ctdb_db;
36
37         /* these are internal tdb functions */
38         int tdb_transaction_write_lock_mark(struct tdb_context *tdb);
39         int tdb_transaction_write_lock_unmark(struct tdb_context *tdb);
40
41         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
42                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
43                 return -1;
44         }
45
46         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
47                 DEBUG(DEBUG_ERR,("Attempt to mark all databases locked when not frozen\n"));
48                 return -1;
49         }
50         /* The dual loop is a woraround for older versions of samba
51            that does not yet support the set-db-priority/lock order
52            call. So that we get basic deadlock avoiidance also for
53            these old versions of samba.
54            This code will be removed in the future.
55         */
56         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
57                 if (ctdb_db->priority != priority) {
58                         continue;
59                 }
60                 if (strstr(ctdb_db->db_name, "notify") != NULL) {
61                         continue;
62                 }
63                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
64                         return -1;
65                 }
66                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
67                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
68                         return -1;
69                 }
70         }
71         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
72                 if (ctdb_db->priority != priority) {
73                         continue;
74                 }
75                 if (strstr(ctdb_db->db_name, "notify") == NULL) {
76                         continue;
77                 }
78                 if (tdb_transaction_write_lock_mark(ctdb_db->ltdb->tdb) != 0) {
79                         return -1;
80                 }
81                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
82                         tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
83                         return -1;
84                 }
85         }
86         return 0;
87 }
88
89 /*
90   lock all databases - unmark only
91  */
92 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb, uint32_t priority)
93 {
94         struct ctdb_db_context *ctdb_db;
95
96         /* this is an internal tdb functions */
97         int tdb_transaction_write_lock_unmark(struct tdb_context *tdb);
98
99         if ((priority < 1) || (priority > NUM_DB_PRIORITIES)) {
100                 DEBUG(DEBUG_ERR,(__location__ " Illegal priority when trying to mark all databases Prio:%u\n", priority));
101                 return -1;
102         }
103
104         if (ctdb->freeze_mode[priority] != CTDB_FREEZE_FROZEN) {
105                 DEBUG(DEBUG_ERR,("Attempt to unmark all databases locked when not frozen\n"));
106                 return -1;
107         }
108         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
109                 if (ctdb_db->priority != priority) {
110                         continue;
111                 }
112                 tdb_transaction_write_lock_unmark(ctdb_db->ltdb->tdb);
113                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
114                         return -1;
115                 }
116         }
117         return 0;
118 }
119
120
121 int 
122 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
123 {
124         CHECK_CONTROL_DATA_SIZE(0);
125         struct ctdb_vnn_map_wire *map;
126         size_t len;
127
128         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
129         map = talloc_size(outdata, len);
130         CTDB_NO_MEMORY(ctdb, map);
131
132         map->generation = ctdb->vnn_map->generation;
133         map->size = ctdb->vnn_map->size;
134         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
135
136         outdata->dsize = len;
137         outdata->dptr  = (uint8_t *)map;
138
139         return 0;
140 }
141
142 int 
143 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
144 {
145         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
146         int i;
147
148         for(i=1; i<=NUM_DB_PRIORITIES; i++) {
149                 if (ctdb->freeze_mode[i] != CTDB_FREEZE_FROZEN) {
150                         DEBUG(DEBUG_ERR,("Attempt to set vnnmap when not frozen\n"));
151                         return -1;
152                 }
153         }
154
155         talloc_free(ctdb->vnn_map);
156
157         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
158         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
159
160         ctdb->vnn_map->generation = map->generation;
161         ctdb->vnn_map->size       = map->size;
162         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
163         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
164
165         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
166
167         return 0;
168 }
169
170 int 
171 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
172 {
173         uint32_t i, len;
174         struct ctdb_db_context *ctdb_db;
175         struct ctdb_dbid_map *dbid_map;
176
177         CHECK_CONTROL_DATA_SIZE(0);
178
179         len = 0;
180         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
181                 len++;
182         }
183
184
185         outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
186         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
187         if (!outdata->dptr) {
188                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate dbmap array\n"));
189                 exit(1);
190         }
191
192         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
193         dbid_map->num = len;
194         for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
195                 dbid_map->dbs[i].dbid       = ctdb_db->db_id;
196                 if (ctdb_db->persistent != 0) {
197                         dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_PERSISTENT;
198                 }
199                 if (ctdb_db->readonly != 0) {
200                         dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_READONLY;
201                 }
202                 if (ctdb_db->sticky != 0) {
203                         dbid_map->dbs[i].flags |= CTDB_DB_FLAGS_STICKY;
204                 }
205         }
206
207         return 0;
208 }
209
210 int 
211 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
212 {
213         uint32_t i, num_nodes;
214         struct ctdb_node_map *node_map;
215
216         CHECK_CONTROL_DATA_SIZE(0);
217
218         num_nodes = ctdb->num_nodes;
219
220         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
221         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
222         if (!outdata->dptr) {
223                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
224                 exit(1);
225         }
226
227         node_map = (struct ctdb_node_map *)outdata->dptr;
228         node_map->num = num_nodes;
229         for (i=0; i<num_nodes; i++) {
230                 if (parse_ip(ctdb->nodes[i]->address.address,
231                              NULL, /* TODO: pass in the correct interface here*/
232                              0,
233                              &node_map->nodes[i].addr) == 0)
234                 {
235                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
236                 }
237
238                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
239                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
240         }
241
242         return 0;
243 }
244
245 /*
246    get an old style ipv4-only nodemap
247 */
248 int 
249 ctdb_control_getnodemapv4(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
250 {
251         uint32_t i, num_nodes;
252         struct ctdb_node_mapv4 *node_map;
253
254         CHECK_CONTROL_DATA_SIZE(0);
255
256         num_nodes = ctdb->num_nodes;
257
258         outdata->dsize = offsetof(struct ctdb_node_mapv4, nodes) + num_nodes*sizeof(struct ctdb_node_and_flagsv4);
259         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
260         if (!outdata->dptr) {
261                 DEBUG(DEBUG_ALERT, (__location__ " Failed to allocate nodemap array\n"));
262                 exit(1);
263         }
264
265         node_map = (struct ctdb_node_mapv4 *)outdata->dptr;
266         node_map->num = num_nodes;
267         for (i=0; i<num_nodes; i++) {
268                 if (parse_ipv4(ctdb->nodes[i]->address.address, 0, &node_map->nodes[i].sin) == 0) {
269                         DEBUG(DEBUG_ERR, (__location__ " Failed to parse %s into a sockaddr\n", ctdb->nodes[i]->address.address));
270                         return -1;
271                 }
272
273                 node_map->nodes[i].pnn   = ctdb->nodes[i]->pnn;
274                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
275         }
276
277         return 0;
278 }
279
280 static void
281 ctdb_reload_nodes_event(struct event_context *ev, struct timed_event *te, 
282                                struct timeval t, void *private_data)
283 {
284         int i, num_nodes;
285         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
286         TALLOC_CTX *tmp_ctx;
287         struct ctdb_node **nodes;       
288
289         tmp_ctx = talloc_new(ctdb);
290
291         /* steal the old nodes file for a while */
292         talloc_steal(tmp_ctx, ctdb->nodes);
293         nodes = ctdb->nodes;
294         ctdb->nodes = NULL;
295         num_nodes = ctdb->num_nodes;
296         ctdb->num_nodes = 0;
297
298         /* load the new nodes file */
299         ctdb_load_nodes_file(ctdb);
300
301         for (i=0; i<ctdb->num_nodes; i++) {
302                 /* keep any identical pre-existing nodes and connections */
303                 if ((i < num_nodes) && ctdb_same_address(&ctdb->nodes[i]->address, &nodes[i]->address)) {
304                         talloc_free(ctdb->nodes[i]);
305                         ctdb->nodes[i] = talloc_steal(ctdb->nodes, nodes[i]);
306                         continue;
307                 }
308
309                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
310                         continue;
311                 }
312
313                 /* any new or different nodes must be added */
314                 if (ctdb->methods->add_node(ctdb->nodes[i]) != 0) {
315                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_node failed at %d\n", i));
316                         ctdb_fatal(ctdb, "failed to add node. shutting down\n");
317                 }
318                 if (ctdb->methods->connect_node(ctdb->nodes[i]) != 0) {
319                         DEBUG(DEBUG_CRIT, (__location__ " methods->add_connect failed at %d\n", i));
320                         ctdb_fatal(ctdb, "failed to connect to node. shutting down\n");
321                 }
322         }
323
324         /* tell the recovery daemon to reaload the nodes file too */
325         ctdb_daemon_send_message(ctdb, ctdb->pnn, CTDB_SRVID_RELOAD_NODES, tdb_null);
326
327         talloc_free(tmp_ctx);
328         return;
329 }
330
331 /*
332   reload the nodes file after a short delay (so that we can send the response
333   back first
334 */
335 int 
336 ctdb_control_reload_nodes_file(struct ctdb_context *ctdb, uint32_t opcode)
337 {
338         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_reload_nodes_event, ctdb);
339
340         return 0;
341 }
342
343 /* 
344    a traverse function for pulling all relevent records from pulldb
345  */
346 struct pulldb_data {
347         struct ctdb_context *ctdb;
348         struct ctdb_db_context *ctdb_db;
349         struct ctdb_marshall_buffer *pulldata;
350         uint32_t len;
351         uint32_t allocated_len;
352         bool failed;
353 };
354
355 static int traverse_pulldb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
356 {
357         struct pulldb_data *params = (struct pulldb_data *)p;
358         struct ctdb_rec_data *rec;
359         struct ctdb_context *ctdb = params->ctdb;
360         struct ctdb_db_context *ctdb_db = params->ctdb_db;
361
362         /* add the record to the blob */
363         rec = ctdb_marshall_record(params->pulldata, 0, key, NULL, data);
364         if (rec == NULL) {
365                 params->failed = true;
366                 return -1;
367         }
368         if (params->len + rec->length >= params->allocated_len) {
369                 params->allocated_len = rec->length + params->len + ctdb->tunable.pulldb_preallocation_size;
370                 params->pulldata = talloc_realloc_size(NULL, params->pulldata, params->allocated_len);
371         }
372         if (params->pulldata == NULL) {
373                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand pulldb_data to %u\n", rec->length + params->len));
374                 ctdb_fatal(params->ctdb, "failed to allocate memory for recovery. shutting down\n");
375         }
376         params->pulldata->count++;
377         memcpy(params->len+(uint8_t *)params->pulldata, rec, rec->length);
378         params->len += rec->length;
379
380         if (ctdb->tunable.db_record_size_warn != 0 && rec->length > ctdb->tunable.db_record_size_warn) {
381                 DEBUG(DEBUG_ERR,("Data record in %s is big. Record size is %d bytes\n", ctdb_db->db_name, (int)rec->length));
382         }
383
384         talloc_free(rec);
385
386         return 0;
387 }
388
389 /*
390   pul a bunch of records from a ltdb, filtering by lmaster
391  */
392 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
393 {
394         struct ctdb_control_pulldb *pull;
395         struct ctdb_db_context *ctdb_db;
396         struct pulldb_data params;
397         struct ctdb_marshall_buffer *reply;
398
399         pull = (struct ctdb_control_pulldb *)indata.dptr;
400         
401         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
402         if (!ctdb_db) {
403                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", pull->db_id));
404                 return -1;
405         }
406
407         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
408                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_pull_db when not frozen\n"));
409                 return -1;
410         }
411
412         reply = talloc_zero(outdata, struct ctdb_marshall_buffer);
413         CTDB_NO_MEMORY(ctdb, reply);
414
415         reply->db_id = pull->db_id;
416
417         params.ctdb = ctdb;
418         params.ctdb_db = ctdb_db;
419         params.pulldata = reply;
420         params.len = offsetof(struct ctdb_marshall_buffer, data);
421         params.allocated_len = params.len;
422         params.failed = false;
423
424         if (ctdb_db->unhealthy_reason) {
425                 /* this is just a warning, as the tdb should be empty anyway */
426                 DEBUG(DEBUG_WARNING,("db(%s) unhealty in ctdb_control_pull_db: %s\n",
427                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
428         }
429
430         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
431                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
432                 return -1;
433         }
434
435         if (tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_pulldb, &params) == -1) {
436                 DEBUG(DEBUG_ERR,(__location__ " Failed to get traverse db '%s'\n", ctdb_db->db_name));
437                 ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
438                 talloc_free(params.pulldata);
439                 return -1;
440         }
441
442         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
443
444         outdata->dptr = (uint8_t *)params.pulldata;
445         outdata->dsize = params.len;
446
447         if (ctdb->tunable.db_record_count_warn != 0 && params.pulldata->count > ctdb->tunable.db_record_count_warn) {
448                 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d records\n", ctdb_db->db_name, params.pulldata->count));
449         }
450         if (ctdb->tunable.db_size_warn != 0 && outdata->dsize > ctdb->tunable.db_size_warn) {
451                 DEBUG(DEBUG_ERR,("Database %s is big. Contains %d bytes\n", ctdb_db->db_name, (int)outdata->dsize));
452         }
453
454
455         return 0;
456 }
457
458 /*
459   push a bunch of records into a ltdb, filtering by rsn
460  */
461 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
462 {
463         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
464         struct ctdb_db_context *ctdb_db;
465         int i, ret;
466         struct ctdb_rec_data *rec;
467
468         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
469                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
470                 return -1;
471         }
472
473         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
474         if (!ctdb_db) {
475                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
476                 return -1;
477         }
478
479         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
480                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_push_db when not frozen\n"));
481                 return -1;
482         }
483
484         if (ctdb_lock_all_databases_mark(ctdb, ctdb_db->priority) != 0) {
485                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
486                 return -1;
487         }
488
489         rec = (struct ctdb_rec_data *)&reply->data[0];
490
491         DEBUG(DEBUG_INFO,("starting push of %u records for dbid 0x%x\n",
492                  reply->count, reply->db_id));
493
494         for (i=0;i<reply->count;i++) {
495                 TDB_DATA key, data;
496                 struct ctdb_ltdb_header *hdr;
497
498                 key.dptr = &rec->data[0];
499                 key.dsize = rec->keylen;
500                 data.dptr = &rec->data[key.dsize];
501                 data.dsize = rec->datalen;
502
503                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
504                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
505                         goto failed;
506                 }
507                 hdr = (struct ctdb_ltdb_header *)data.dptr;
508                 /* strip off any read only record flags. All readonly records
509                    are revoked implicitely by a recovery
510                 */
511                 hdr->flags &= ~(CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE);
512
513                 data.dptr += sizeof(*hdr);
514                 data.dsize -= sizeof(*hdr);
515
516                 ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
517                 if (ret != 0) {
518                         DEBUG(DEBUG_CRIT, (__location__ " Unable to store record\n"));
519                         goto failed;
520                 }
521
522                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
523         }           
524
525         DEBUG(DEBUG_DEBUG,("finished push of %u records for dbid 0x%x\n",
526                  reply->count, reply->db_id));
527
528         if (ctdb_db->readonly) {
529                 DEBUG(DEBUG_CRIT,("Clearing the tracking database for dbid 0x%x\n",
530                                   ctdb_db->db_id));
531                 if (tdb_wipe_all(ctdb_db->rottdb) != 0) {
532                         DEBUG(DEBUG_ERR,("Failed to wipe tracking database for 0x%x. Dropping read-only delegation support\n", ctdb_db->db_id));
533                         ctdb_db->readonly = false;
534                         tdb_close(ctdb_db->rottdb);
535                         ctdb_db->rottdb = NULL;
536                         ctdb_db->readonly = false;
537                 }
538                 while (ctdb_db->revokechild_active != NULL) {
539                         talloc_free(ctdb_db->revokechild_active);
540                 }
541         }
542
543         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
544         return 0;
545
546 failed:
547         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
548         return -1;
549 }
550
551
552 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
553 {
554         uint32_t *dmaster = (uint32_t *)p;
555         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
556         int ret;
557
558         /* skip if already correct */
559         if (header->dmaster == *dmaster) {
560                 return 0;
561         }
562
563         header->dmaster = *dmaster;
564
565         ret = tdb_store(tdb, key, data, TDB_REPLACE);
566         if (ret) {
567                 DEBUG(DEBUG_CRIT,(__location__ " failed to write tdb data back  ret:%d\n",ret));
568                 return ret;
569         }
570
571         /* TODO: add error checking here */
572
573         return 0;
574 }
575
576 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
577 {
578         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
579         struct ctdb_db_context *ctdb_db;
580
581         ctdb_db = find_ctdb_db(ctdb, p->db_id);
582         if (!ctdb_db) {
583                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", p->db_id));
584                 return -1;
585         }
586
587         if (ctdb->freeze_mode[ctdb_db->priority] != CTDB_FREEZE_FROZEN) {
588                 DEBUG(DEBUG_DEBUG,("rejecting ctdb_control_set_dmaster when not frozen\n"));
589                 return -1;
590         }
591
592         if (ctdb_lock_all_databases_mark(ctdb,  ctdb_db->priority) != 0) {
593                 DEBUG(DEBUG_ERR,(__location__ " Failed to get lock on entired db - failing\n"));
594                 return -1;
595         }
596
597         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
598
599         ctdb_lock_all_databases_unmark(ctdb, ctdb_db->priority);
600         
601         return 0;
602 }
603
604 struct ctdb_set_recmode_state {
605         struct ctdb_context *ctdb;
606         struct ctdb_req_control *c;
607         uint32_t recmode;
608         int fd[2];
609         struct timed_event *te;
610         struct fd_event *fde;
611         pid_t child;
612         struct timeval start_time;
613 };
614
615 /*
616   called if our set_recmode child times out. this would happen if
617   ctdb_recovery_lock() would block.
618  */
619 static void ctdb_set_recmode_timeout(struct event_context *ev, struct timed_event *te, 
620                                          struct timeval t, void *private_data)
621 {
622         struct ctdb_set_recmode_state *state = talloc_get_type(private_data, 
623                                            struct ctdb_set_recmode_state);
624
625         /* we consider this a success, not a failure, as we failed to
626            set the recovery lock which is what we wanted.  This can be
627            caused by the cluster filesystem being very slow to
628            arbitrate locks immediately after a node failure.       
629          */
630         DEBUG(DEBUG_ERR,(__location__ " set_recmode child process hung/timedout CFS slow to grant locks? (allowing recmode set anyway)\n"));
631         state->ctdb->recovery_mode = state->recmode;
632         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
633         talloc_free(state);
634 }
635
636
637 /* when we free the recmode state we must kill any child process.
638 */
639 static int set_recmode_destructor(struct ctdb_set_recmode_state *state)
640 {
641         double l = timeval_elapsed(&state->start_time);
642
643         CTDB_UPDATE_RECLOCK_LATENCY(state->ctdb, "daemon reclock", reclock.ctdbd, l);
644
645         if (state->fd[0] != -1) {
646                 state->fd[0] = -1;
647         }
648         if (state->fd[1] != -1) {
649                 state->fd[1] = -1;
650         }
651         ctdb_kill(state->ctdb, state->child, SIGKILL);
652         return 0;
653 }
654
655 /* this is called when the client process has completed ctdb_recovery_lock()
656    and has written data back to us through the pipe.
657 */
658 static void set_recmode_handler(struct event_context *ev, struct fd_event *fde, 
659                              uint16_t flags, void *private_data)
660 {
661         struct ctdb_set_recmode_state *state= talloc_get_type(private_data, 
662                                              struct ctdb_set_recmode_state);
663         char c = 0;
664         int ret;
665
666         /* we got a response from our child process so we can abort the
667            timeout.
668         */
669         talloc_free(state->te);
670         state->te = NULL;
671
672
673         /* read the childs status when trying to lock the reclock file.
674            child wrote 0 if everything is fine and 1 if it did manage
675            to lock the file, which would be a problem since that means
676            we got a request to exit from recovery but we could still lock
677            the file   which at this time SHOULD be locked by the recovery
678            daemon on the recmaster
679         */              
680         ret = read(state->fd[0], &c, 1);
681         if (ret != 1 || c != 0) {
682                 ctdb_request_control_reply(state->ctdb, state->c, NULL, -1, "managed to lock reclock file from inside daemon");
683                 talloc_free(state);
684                 return;
685         }
686
687         state->ctdb->recovery_mode = state->recmode;
688
689         /* release any deferred attach calls from clients */
690         if (state->recmode == CTDB_RECOVERY_NORMAL) {
691                 ctdb_process_deferred_attach(state->ctdb);
692         }
693
694         ctdb_request_control_reply(state->ctdb, state->c, NULL, 0, NULL);
695         talloc_free(state);
696         return;
697 }
698
699 static void
700 ctdb_drop_all_ips_event(struct event_context *ev, struct timed_event *te, 
701                                struct timeval t, void *private_data)
702 {
703         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
704
705         DEBUG(DEBUG_ERR,(__location__ " Been in recovery mode for too long. Dropping all IPS\n"));
706         talloc_free(ctdb->release_ips_ctx);
707         ctdb->release_ips_ctx = NULL;
708
709         ctdb_release_all_ips(ctdb);
710 }
711
712 /*
713  * Set up an event to drop all public ips if we remain in recovery for too
714  * long
715  */
716 int ctdb_deferred_drop_all_ips(struct ctdb_context *ctdb)
717 {
718         if (ctdb->release_ips_ctx != NULL) {
719                 talloc_free(ctdb->release_ips_ctx);
720         }
721         ctdb->release_ips_ctx = talloc_new(ctdb);
722         CTDB_NO_MEMORY(ctdb, ctdb->release_ips_ctx);
723
724         event_add_timed(ctdb->ev, ctdb->release_ips_ctx, timeval_current_ofs(ctdb->tunable.recovery_drop_all_ips, 0), ctdb_drop_all_ips_event, ctdb);
725         return 0;
726 }
727
728 /*
729   set the recovery mode
730  */
731 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
732                                  struct ctdb_req_control *c,
733                                  TDB_DATA indata, bool *async_reply,
734                                  const char **errormsg)
735 {
736         uint32_t recmode = *(uint32_t *)indata.dptr;
737         int i, ret;
738         struct ctdb_set_recmode_state *state;
739         pid_t parent = getpid();
740
741         /* if we enter recovery but stay in recovery for too long
742            we will eventually drop all our ip addresses
743         */
744         if (recmode == CTDB_RECOVERY_NORMAL) {
745                 talloc_free(ctdb->release_ips_ctx);
746                 ctdb->release_ips_ctx = NULL;
747         } else {
748                 if (ctdb_deferred_drop_all_ips(ctdb) != 0) {
749                         DEBUG(DEBUG_ERR,("Failed to set up deferred drop all ips\n"));
750                 }
751         }
752
753         if (recmode != ctdb->recovery_mode) {
754                 DEBUG(DEBUG_NOTICE,(__location__ " Recovery mode set to %s\n", 
755                          recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
756         }
757
758         if (recmode != CTDB_RECOVERY_NORMAL ||
759             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
760                 ctdb->recovery_mode = recmode;
761                 return 0;
762         }
763
764         /* some special handling when ending recovery mode */
765
766         /* force the databases to thaw */
767         for (i=1; i<=NUM_DB_PRIORITIES; i++) {
768                 if (ctdb->freeze_handles[i] != NULL) {
769                         ctdb_control_thaw(ctdb, i);
770                 }
771         }
772
773         state = talloc(ctdb, struct ctdb_set_recmode_state);
774         CTDB_NO_MEMORY(ctdb, state);
775
776         state->start_time = timeval_current();
777         state->fd[0] = -1;
778         state->fd[1] = -1;
779
780         /* release any deferred attach calls from clients */
781         if (recmode == CTDB_RECOVERY_NORMAL) {
782                 ctdb_process_deferred_attach(ctdb);
783         }
784
785         if (ctdb->tunable.verify_recovery_lock == 0) {
786                 /* dont need to verify the reclock file */
787                 ctdb->recovery_mode = recmode;
788                 return 0;
789         }
790
791         /* For the rest of what needs to be done, we need to do this in
792            a child process since 
793            1, the call to ctdb_recovery_lock() can block if the cluster
794               filesystem is in the process of recovery.
795         */
796         ret = pipe(state->fd);
797         if (ret != 0) {
798                 talloc_free(state);
799                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for set_recmode child\n"));
800                 return -1;
801         }
802
803         state->child = ctdb_fork(ctdb);
804         if (state->child == (pid_t)-1) {
805                 close(state->fd[0]);
806                 close(state->fd[1]);
807                 talloc_free(state);
808                 return -1;
809         }
810
811         if (state->child == 0) {
812                 char cc = 0;
813                 close(state->fd[0]);
814
815                 debug_extra = talloc_asprintf(NULL, "set_recmode:");
816                 /* we should not be able to get the lock on the reclock file, 
817                   as it should  be held by the recovery master 
818                 */
819                 if (ctdb_recovery_lock(ctdb, false)) {
820                         DEBUG(DEBUG_CRIT,("ERROR: recovery lock file %s not locked when recovering!\n", ctdb->recovery_lock_file));
821                         cc = 1;
822                 }
823
824                 write(state->fd[1], &cc, 1);
825                 /* make sure we die when our parent dies */
826                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
827                         sleep(5);
828                         write(state->fd[1], &cc, 1);
829                 }
830                 _exit(0);
831         }
832         close(state->fd[1]);
833         set_close_on_exec(state->fd[0]);
834
835         state->fd[1] = -1;
836
837         talloc_set_destructor(state, set_recmode_destructor);
838
839         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
840
841         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
842                                     ctdb_set_recmode_timeout, state);
843
844         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
845                                 EVENT_FD_READ,
846                                 set_recmode_handler,
847                                 (void *)state);
848
849         if (state->fde == NULL) {
850                 talloc_free(state);
851                 return -1;
852         }
853         tevent_fd_set_auto_close(state->fde);
854
855         state->ctdb    = ctdb;
856         state->recmode = recmode;
857         state->c       = talloc_steal(state, c);
858
859         *async_reply = true;
860
861         return 0;
862 }
863
864
865 /*
866   try and get the recovery lock in shared storage - should only work
867   on the recovery master recovery daemon. Anywhere else is a bug
868  */
869 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
870 {
871         struct flock lock;
872
873         if (keep) {
874                 DEBUG(DEBUG_ERR, ("Take the recovery lock\n"));
875         }
876         if (ctdb->recovery_lock_fd != -1) {
877                 close(ctdb->recovery_lock_fd);
878                 ctdb->recovery_lock_fd = -1;
879         }
880
881         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
882         if (ctdb->recovery_lock_fd == -1) {
883                 DEBUG(DEBUG_ERR,("ctdb_recovery_lock: Unable to open %s - (%s)\n", 
884                          ctdb->recovery_lock_file, strerror(errno)));
885                 return false;
886         }
887
888         set_close_on_exec(ctdb->recovery_lock_fd);
889
890         lock.l_type = F_WRLCK;
891         lock.l_whence = SEEK_SET;
892         lock.l_start = 0;
893         lock.l_len = 1;
894         lock.l_pid = 0;
895
896         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
897                 close(ctdb->recovery_lock_fd);
898                 ctdb->recovery_lock_fd = -1;
899                 if (keep) {
900                         DEBUG(DEBUG_CRIT,("ctdb_recovery_lock: Failed to get recovery lock on '%s'\n", ctdb->recovery_lock_file));
901                 }
902                 return false;
903         }
904
905         if (!keep) {
906                 close(ctdb->recovery_lock_fd);
907                 ctdb->recovery_lock_fd = -1;
908         }
909
910         if (keep) {
911                 DEBUG(DEBUG_NOTICE, ("Recovery lock taken successfully\n"));
912         }
913
914         DEBUG(DEBUG_NOTICE,("ctdb_recovery_lock: Got recovery lock on '%s'\n", ctdb->recovery_lock_file));
915
916         return true;
917 }
918
919 /*
920   delete a record as part of the vacuum process
921   only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
922   use non-blocking locks
923
924   return 0 if the record was successfully deleted (i.e. it does not exist
925   when the function returns)
926   or !0 is the record still exists in the tdb after returning.
927  */
928 static int delete_tdb_record(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db, struct ctdb_rec_data *rec)
929 {
930         TDB_DATA key, data;
931         struct ctdb_ltdb_header *hdr, *hdr2;
932         
933         /* these are really internal tdb functions - but we need them here for
934            non-blocking lock of the freelist */
935         int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
936         int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
937
938
939         key.dsize = rec->keylen;
940         key.dptr  = &rec->data[0];
941         data.dsize = rec->datalen;
942         data.dptr = &rec->data[rec->keylen];
943
944         if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
945                 DEBUG(DEBUG_INFO,(__location__ " Called delete on record where we are lmaster\n"));
946                 return -1;
947         }
948
949         if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
950                 DEBUG(DEBUG_ERR,(__location__ " Bad record size\n"));
951                 return -1;
952         }
953
954         hdr = (struct ctdb_ltdb_header *)data.dptr;
955
956         /* use a non-blocking lock */
957         if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
958                 return -1;
959         }
960
961         data = tdb_fetch(ctdb_db->ltdb->tdb, key);
962         if (data.dptr == NULL) {
963                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
964                 return 0;
965         }
966
967         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
968                 if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
969                         tdb_delete(ctdb_db->ltdb->tdb, key);
970                         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
971                         DEBUG(DEBUG_CRIT,(__location__ " Deleted corrupt record\n"));
972                 }
973                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
974                 free(data.dptr);
975                 return 0;
976         }
977         
978         hdr2 = (struct ctdb_ltdb_header *)data.dptr;
979
980         if (hdr2->rsn > hdr->rsn) {
981                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
982                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
983                          (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
984                 free(data.dptr);
985                 return -1;              
986         }
987
988         /* do not allow deleting record that have readonly flags set. */
989         if (hdr->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
990                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
991                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
992                 free(data.dptr);
993                 return -1;              
994         }
995         if (hdr2->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
996                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
997                 DEBUG(DEBUG_INFO,(__location__ " Skipping record with readonly flags set\n"));
998                 free(data.dptr);
999                 return -1;              
1000         }
1001
1002         if (hdr2->dmaster == ctdb->pnn) {
1003                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1004                 DEBUG(DEBUG_INFO,(__location__ " Attempted delete record where we are the dmaster\n"));
1005                 free(data.dptr);
1006                 return -1;                              
1007         }
1008
1009         if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
1010                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1011                 free(data.dptr);
1012                 return -1;                              
1013         }
1014
1015         if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
1016                 tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1017                 tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1018                 DEBUG(DEBUG_INFO,(__location__ " Failed to delete record\n"));
1019                 free(data.dptr);
1020                 return -1;                                              
1021         }
1022
1023         tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
1024         tdb_chainunlock(ctdb_db->ltdb->tdb, key);
1025         free(data.dptr);
1026         return 0;       
1027 }
1028
1029
1030
1031 struct recovery_callback_state {
1032         struct ctdb_req_control *c;
1033 };
1034
1035
1036 /*
1037   called when the 'recovered' event script has finished
1038  */
1039 static void ctdb_end_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1040 {
1041         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1042
1043         ctdb_enable_monitoring(ctdb);
1044         CTDB_INCREMENT_STAT(ctdb, num_recoveries);
1045
1046         if (status != 0) {
1047                 DEBUG(DEBUG_ERR,(__location__ " recovered event script failed (status %d)\n", status));
1048                 if (status == -ETIME) {
1049                         ctdb_ban_self(ctdb);
1050                 }
1051         }
1052
1053         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1054         talloc_free(state);
1055
1056         gettimeofday(&ctdb->last_recovery_finished, NULL);
1057 }
1058
1059 /*
1060   recovery has finished
1061  */
1062 int32_t ctdb_control_end_recovery(struct ctdb_context *ctdb, 
1063                                 struct ctdb_req_control *c,
1064                                 bool *async_reply)
1065 {
1066         int ret;
1067         struct recovery_callback_state *state;
1068
1069         DEBUG(DEBUG_NOTICE,("Recovery has finished\n"));
1070
1071         ctdb_persistent_finish_trans3_commits(ctdb);
1072
1073         state = talloc(ctdb, struct recovery_callback_state);
1074         CTDB_NO_MEMORY(ctdb, state);
1075
1076         state->c    = c;
1077
1078         ctdb_disable_monitoring(ctdb);
1079
1080         ret = ctdb_event_script_callback(ctdb, state,
1081                                          ctdb_end_recovery_callback, 
1082                                          state, 
1083                                          false,
1084                                          CTDB_EVENT_RECOVERED, "%s", "");
1085
1086         if (ret != 0) {
1087                 ctdb_enable_monitoring(ctdb);
1088
1089                 DEBUG(DEBUG_ERR,(__location__ " Failed to end recovery\n"));
1090                 talloc_free(state);
1091                 return -1;
1092         }
1093
1094         /* tell the control that we will be reply asynchronously */
1095         state->c    = talloc_steal(state, c);
1096         *async_reply = true;
1097         return 0;
1098 }
1099
1100 /*
1101   called when the 'startrecovery' event script has finished
1102  */
1103 static void ctdb_start_recovery_callback(struct ctdb_context *ctdb, int status, void *p)
1104 {
1105         struct recovery_callback_state *state = talloc_get_type(p, struct recovery_callback_state);
1106
1107         if (status != 0) {
1108                 DEBUG(DEBUG_ERR,(__location__ " startrecovery event script failed (status %d)\n", status));
1109         }
1110
1111         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1112         talloc_free(state);
1113 }
1114
1115 /*
1116   run the startrecovery eventscript
1117  */
1118 int32_t ctdb_control_start_recovery(struct ctdb_context *ctdb, 
1119                                 struct ctdb_req_control *c,
1120                                 bool *async_reply)
1121 {
1122         int ret;
1123         struct recovery_callback_state *state;
1124
1125         DEBUG(DEBUG_NOTICE,(__location__ " startrecovery eventscript has been invoked\n"));
1126         gettimeofday(&ctdb->last_recovery_started, NULL);
1127
1128         state = talloc(ctdb, struct recovery_callback_state);
1129         CTDB_NO_MEMORY(ctdb, state);
1130
1131         state->c    = talloc_steal(state, c);
1132
1133         ctdb_disable_monitoring(ctdb);
1134
1135         ret = ctdb_event_script_callback(ctdb, state,
1136                                          ctdb_start_recovery_callback, 
1137                                          state, false,
1138                                          CTDB_EVENT_START_RECOVERY,
1139                                          "%s", "");
1140
1141         if (ret != 0) {
1142                 DEBUG(DEBUG_ERR,(__location__ " Failed to start recovery\n"));
1143                 talloc_free(state);
1144                 return -1;
1145         }
1146
1147         /* tell the control that we will be reply asynchronously */
1148         *async_reply = true;
1149         return 0;
1150 }
1151
1152 /*
1153  try to delete all these records as part of the vacuuming process
1154  and return the records we failed to delete
1155 */
1156 int32_t ctdb_control_try_delete_records(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
1157 {
1158         struct ctdb_marshall_buffer *reply = (struct ctdb_marshall_buffer *)indata.dptr;
1159         struct ctdb_db_context *ctdb_db;
1160         int i;
1161         struct ctdb_rec_data *rec;
1162         struct ctdb_marshall_buffer *records;
1163
1164         if (indata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
1165                 DEBUG(DEBUG_ERR,(__location__ " invalid data in try_delete_records\n"));
1166                 return -1;
1167         }
1168
1169         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
1170         if (!ctdb_db) {
1171                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%08x\n", reply->db_id));
1172                 return -1;
1173         }
1174
1175
1176         DEBUG(DEBUG_DEBUG,("starting try_delete_records of %u records for dbid 0x%x\n",
1177                  reply->count, reply->db_id));
1178
1179
1180         /* create a blob to send back the records we couldnt delete */  
1181         records = (struct ctdb_marshall_buffer *)
1182                         talloc_zero_size(outdata, 
1183                                     offsetof(struct ctdb_marshall_buffer, data));
1184         if (records == NULL) {
1185                 DEBUG(DEBUG_ERR,(__location__ " Out of memory\n"));
1186                 return -1;
1187         }
1188         records->db_id = ctdb_db->db_id;
1189
1190
1191         rec = (struct ctdb_rec_data *)&reply->data[0];
1192         for (i=0;i<reply->count;i++) {
1193                 TDB_DATA key, data;
1194
1195                 key.dptr = &rec->data[0];
1196                 key.dsize = rec->keylen;
1197                 data.dptr = &rec->data[key.dsize];
1198                 data.dsize = rec->datalen;
1199
1200                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1201                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record in indata\n"));
1202                         return -1;
1203                 }
1204
1205                 /* If we cant delete the record we must add it to the reply
1206                    so the lmaster knows it may not purge this record
1207                 */
1208                 if (delete_tdb_record(ctdb, ctdb_db, rec) != 0) {
1209                         size_t old_size;
1210                         struct ctdb_ltdb_header *hdr;
1211
1212                         hdr = (struct ctdb_ltdb_header *)data.dptr;
1213                         data.dptr += sizeof(*hdr);
1214                         data.dsize -= sizeof(*hdr);
1215
1216                         DEBUG(DEBUG_INFO, (__location__ " Failed to vacuum delete record with hash 0x%08x\n", ctdb_hash(&key)));
1217
1218                         old_size = talloc_get_size(records);
1219                         records = talloc_realloc_size(outdata, records, old_size + rec->length);
1220                         if (records == NULL) {
1221                                 DEBUG(DEBUG_ERR,(__location__ " Failed to expand\n"));
1222                                 return -1;
1223                         }
1224                         records->count++;
1225                         memcpy(old_size+(uint8_t *)records, rec, rec->length);
1226                 } 
1227
1228                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
1229         }           
1230
1231
1232         outdata->dptr = (uint8_t *)records;
1233         outdata->dsize = talloc_get_size(records);
1234
1235         return 0;
1236 }
1237
1238 /*
1239   report capabilities
1240  */
1241 int32_t ctdb_control_get_capabilities(struct ctdb_context *ctdb, TDB_DATA *outdata)
1242 {
1243         uint32_t *capabilities = NULL;
1244
1245         capabilities = talloc(outdata, uint32_t);
1246         CTDB_NO_MEMORY(ctdb, capabilities);
1247         *capabilities = ctdb->capabilities;
1248
1249         outdata->dsize = sizeof(uint32_t);
1250         outdata->dptr = (uint8_t *)capabilities;
1251
1252         return 0;       
1253 }
1254
1255 static void ctdb_recd_ping_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1256 {
1257         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
1258         uint32_t *count = talloc_get_type(ctdb->recd_ping_count, uint32_t);
1259
1260         DEBUG(DEBUG_ERR, ("Recovery daemon ping timeout. Count : %u\n", *count));
1261
1262         if (*count < ctdb->tunable.recd_ping_failcount) {
1263                 (*count)++;
1264                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1265                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1266                         ctdb_recd_ping_timeout, ctdb);
1267                 return;
1268         }
1269
1270         DEBUG(DEBUG_ERR, ("Final timeout for recovery daemon ping. Restarting recovery daemon. (This can be caused if the cluster filesystem has hung)\n"));
1271
1272         ctdb_stop_recoverd(ctdb);
1273         ctdb_start_recoverd(ctdb);
1274 }
1275
1276 /* The recovery daemon will ping us at regular intervals.
1277    If we havent been pinged for a while we assume the recovery
1278    daemon is inoperable and we shut down.
1279 */
1280 int32_t ctdb_control_recd_ping(struct ctdb_context *ctdb)
1281 {
1282         talloc_free(ctdb->recd_ping_count);
1283
1284         ctdb->recd_ping_count = talloc_zero(ctdb, uint32_t);
1285         CTDB_NO_MEMORY(ctdb, ctdb->recd_ping_count);
1286
1287         if (ctdb->tunable.recd_ping_timeout != 0) {
1288                 event_add_timed(ctdb->ev, ctdb->recd_ping_count, 
1289                         timeval_current_ofs(ctdb->tunable.recd_ping_timeout, 0),
1290                         ctdb_recd_ping_timeout, ctdb);
1291         }
1292
1293         return 0;
1294 }
1295
1296
1297
1298 int32_t ctdb_control_set_recmaster(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata)
1299 {
1300         CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
1301
1302         ctdb->recovery_master = ((uint32_t *)(&indata.dptr[0]))[0];
1303         return 0;
1304 }
1305
1306
1307 struct stop_node_callback_state {
1308         struct ctdb_req_control *c;
1309 };
1310
1311 /*
1312   called when the 'stopped' event script has finished
1313  */
1314 static void ctdb_stop_node_callback(struct ctdb_context *ctdb, int status, void *p)
1315 {
1316         struct stop_node_callback_state *state = talloc_get_type(p, struct stop_node_callback_state);
1317
1318         if (status != 0) {
1319                 DEBUG(DEBUG_ERR,(__location__ " stopped event script failed (status %d)\n", status));
1320                 ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1321                 if (status == -ETIME) {
1322                         ctdb_ban_self(ctdb);
1323                 }
1324         }
1325
1326         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
1327         talloc_free(state);
1328 }
1329
1330 int32_t ctdb_control_stop_node(struct ctdb_context *ctdb, struct ctdb_req_control *c, bool *async_reply)
1331 {
1332         int ret;
1333         struct stop_node_callback_state *state;
1334
1335         DEBUG(DEBUG_INFO,(__location__ " Stopping node\n"));
1336
1337         state = talloc(ctdb, struct stop_node_callback_state);
1338         CTDB_NO_MEMORY(ctdb, state);
1339
1340         state->c    = talloc_steal(state, c);
1341
1342         ctdb_disable_monitoring(ctdb);
1343
1344         ret = ctdb_event_script_callback(ctdb, state,
1345                                          ctdb_stop_node_callback, 
1346                                          state, false,
1347                                          CTDB_EVENT_STOPPED, "%s", "");
1348
1349         if (ret != 0) {
1350                 ctdb_enable_monitoring(ctdb);
1351
1352                 DEBUG(DEBUG_ERR,(__location__ " Failed to stop node\n"));
1353                 talloc_free(state);
1354                 return -1;
1355         }
1356
1357         ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1358
1359         *async_reply = true;
1360
1361         return 0;
1362 }
1363
1364 int32_t ctdb_control_continue_node(struct ctdb_context *ctdb)
1365 {
1366         DEBUG(DEBUG_INFO,(__location__ " Continue node\n"));
1367         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_STOPPED;
1368
1369         return 0;
1370 }
1371