clean out some more cruft
[sahlberg/ctdb.git] / common / ctdb_recover.c
1 /* 
2    ctdb recovery code
3
4    Copyright (C) Andrew Tridgell  2007
5    Copyright (C) Ronnie Sahlberg  2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 2 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program; if not, write to the Free Software
19    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 */
21 #include "includes.h"
22 #include "lib/events/events.h"
23 #include "lib/tdb/include/tdb.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_private.h"
28 #include "lib/util/dlinklist.h"
29 #include "db_wrap.h"
30
31 /*
32   lock all databases - mark only
33  */
34 static int ctdb_lock_all_databases_mark(struct ctdb_context *ctdb)
35 {
36         struct ctdb_db_context *ctdb_db;
37         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
38                 DEBUG(0,("Attempt to mark all databases locked when not frozen\n"));
39                 return -1;
40         }
41         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
42                 if (tdb_lockall_mark(ctdb_db->ltdb->tdb) != 0) {
43                         return -1;
44                 }
45         }
46         return 0;
47 }
48
49 /*
50   lock all databases - unmark only
51  */
52 static int ctdb_lock_all_databases_unmark(struct ctdb_context *ctdb)
53 {
54         struct ctdb_db_context *ctdb_db;
55         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
56                 DEBUG(0,("Attempt to unmark all databases locked when not frozen\n"));
57                 return -1;
58         }
59         for (ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next) {
60                 if (tdb_lockall_unmark(ctdb_db->ltdb->tdb) != 0) {
61                         return -1;
62                 }
63         }
64         return 0;
65 }
66
67
68 int 
69 ctdb_control_getvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
70 {
71         CHECK_CONTROL_DATA_SIZE(0);
72         struct ctdb_vnn_map_wire *map;
73         size_t len;
74
75         len = offsetof(struct ctdb_vnn_map_wire, map) + sizeof(uint32_t)*ctdb->vnn_map->size;
76         map = talloc_size(outdata, len);
77         CTDB_NO_MEMORY_VOID(ctdb, map);
78
79         map->generation = ctdb->vnn_map->generation;
80         map->size = ctdb->vnn_map->size;
81         memcpy(map->map, ctdb->vnn_map->map, sizeof(uint32_t)*map->size);
82
83         outdata->dsize = len;
84         outdata->dptr  = (uint8_t *)map;
85
86         return 0;
87 }
88
89 int 
90 ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
91 {
92         struct ctdb_vnn_map_wire *map = (struct ctdb_vnn_map_wire *)indata.dptr;
93
94         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
95                 DEBUG(0,("Attempt to set vnnmap when not frozen\n"));
96                 return -1;
97         }
98
99         talloc_free(ctdb->vnn_map);
100
101         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
102         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map);
103
104         ctdb->vnn_map->generation = map->generation;
105         ctdb->vnn_map->size       = map->size;
106         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, map->size);
107         CTDB_NO_MEMORY(ctdb, ctdb->vnn_map->map);
108
109         memcpy(ctdb->vnn_map->map, map->map, sizeof(uint32_t)*map->size);
110
111         return 0;
112 }
113
114 int 
115 ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
116 {
117         uint32_t i, len;
118         struct ctdb_db_context *ctdb_db;
119         struct ctdb_dbid_map *dbid_map;
120
121         CHECK_CONTROL_DATA_SIZE(0);
122
123         len = 0;
124         for(ctdb_db=ctdb->db_list;ctdb_db;ctdb_db=ctdb_db->next){
125                 len++;
126         }
127
128
129         outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len;
130         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
131         if (!outdata->dptr) {
132                 DEBUG(0, (__location__ " Failed to allocate dbmap array\n"));
133                 exit(1);
134         }
135
136         dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
137         dbid_map->num = len;
138         for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
139                 dbid_map->dbids[i] = ctdb_db->db_id;
140         }
141
142         return 0;
143 }
144
145 int 
146 ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
147 {
148         uint32_t i, num_nodes;
149         struct ctdb_node_map *node_map;
150
151         CHECK_CONTROL_DATA_SIZE(0);
152
153         num_nodes = ctdb->num_nodes;
154
155         outdata->dsize = offsetof(struct ctdb_node_map, nodes) + num_nodes*sizeof(struct ctdb_node_and_flags);
156         outdata->dptr  = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
157         if (!outdata->dptr) {
158                 DEBUG(0, (__location__ " Failed to allocate nodemap array\n"));
159                 exit(1);
160         }
161
162         node_map = (struct ctdb_node_map *)outdata->dptr;
163         node_map->num = num_nodes;
164         for (i=0; i<num_nodes; i++) {
165                 inet_aton(ctdb->nodes[i]->address.address, &node_map->nodes[i].sin.sin_addr);
166                 node_map->nodes[i].vnn   = ctdb->nodes[i]->vnn;
167                 node_map->nodes[i].flags = ctdb->nodes[i]->flags;
168         }
169
170         return 0;
171 }
172
173 struct getkeys_params {
174         struct ctdb_context *ctdb;
175         uint32_t lmaster;
176         uint32_t rec_count;
177         struct getkeys_rec {
178                 TDB_DATA key;
179                 TDB_DATA data;
180         } *recs;
181 };
182
183 static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
184 {
185         struct getkeys_params *params = (struct getkeys_params *)p;
186         uint32_t lmaster;
187
188         lmaster = ctdb_lmaster(params->ctdb, &key);
189
190         /* only include this record if the lmaster matches or if
191            the wildcard lmaster (-1) was specified.
192         */
193         if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) {
194                 return 0;
195         }
196
197         params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1);
198         key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize);
199         data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize);
200         params->recs[params->rec_count].key = key;
201         params->recs[params->rec_count].data = data;
202         params->rec_count++;
203
204         return 0;
205 }
206
207 /*
208   pul a bunch of records from a ltdb, filtering by lmaster
209  */
210 int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
211 {
212         struct ctdb_control_pulldb *pull;
213         struct ctdb_db_context *ctdb_db;
214         struct getkeys_params params;
215         struct ctdb_control_pulldb_reply *reply;
216         int i;
217         size_t len = 0;
218
219         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
220                 DEBUG(0,("rejecting ctdb_control_pull_db when not frozen\n"));
221                 return -1;
222         }
223
224         pull = (struct ctdb_control_pulldb *)indata.dptr;
225         
226         ctdb_db = find_ctdb_db(ctdb, pull->db_id);
227         if (!ctdb_db) {
228                 DEBUG(0,(__location__ " Unknown db\n"));
229                 return -1;
230         }
231
232         params.ctdb = ctdb;
233         params.lmaster = pull->lmaster;
234
235         params.rec_count = 0;
236         params.recs = talloc_array(outdata, struct getkeys_rec, 0);
237         CTDB_NO_MEMORY(ctdb, params.recs);
238
239         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
240                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
241                 return -1;
242         }
243
244         tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);
245
246         ctdb_lock_all_databases_unmark(ctdb);
247
248         reply = talloc(outdata, struct ctdb_control_pulldb_reply);
249         CTDB_NO_MEMORY(ctdb, reply);
250
251         reply->db_id = pull->db_id;
252         reply->count = params.rec_count;
253
254         len = offsetof(struct ctdb_control_pulldb_reply, data);
255
256         for (i=0;i<reply->count;i++) {
257                 struct ctdb_rec_data *rec;
258                 rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
259                 reply = talloc_realloc_size(outdata, reply, rec->length + len);
260                 memcpy(len+(uint8_t *)reply, rec, rec->length);
261                 len += rec->length;
262                 talloc_free(rec);
263         }
264
265         talloc_free(params.recs);
266
267         outdata->dptr = (uint8_t *)reply;
268         outdata->dsize = len;
269
270         return 0;
271 }
272
273 /*
274   push a bunch of records into a ltdb, filtering by rsn
275  */
276 int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
277 {
278         struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
279         struct ctdb_db_context *ctdb_db;
280         int i, ret;
281         struct ctdb_rec_data *rec;
282
283         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
284                 DEBUG(0,("rejecting ctdb_control_push_db when not frozen\n"));
285                 return -1;
286         }
287
288         if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
289                 DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
290                 return -1;
291         }
292
293         ctdb_db = find_ctdb_db(ctdb, reply->db_id);
294         if (!ctdb_db) {
295                 DEBUG(0,(__location__ " Unknown db 0x%08x\n", reply->db_id));
296                 return -1;
297         }
298
299         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
300                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
301                 return -1;
302         }
303
304         rec = (struct ctdb_rec_data *)&reply->data[0];
305
306         for (i=0;i<reply->count;i++) {
307                 TDB_DATA key, data;
308                 struct ctdb_ltdb_header *hdr, header;
309
310                 key.dptr = &rec->data[0];
311                 key.dsize = rec->keylen;
312                 data.dptr = &rec->data[key.dsize];
313                 data.dsize = rec->datalen;
314
315                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
316                         DEBUG(0,(__location__ " bad ltdb record\n"));
317                         goto failed;
318                 }
319                 hdr = (struct ctdb_ltdb_header *)data.dptr;
320                 data.dptr += sizeof(*hdr);
321                 data.dsize -= sizeof(*hdr);
322
323                 ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
324                 if (ret != 0) {
325                         DEBUG(0, (__location__ " Unable to fetch record\n"));
326                         goto failed;
327                 }
328                 /* The check for dmaster gives priority to the dmaster
329                    if the rsn values are equal */
330                 if (header.rsn < hdr->rsn ||
331                     (header.dmaster != ctdb->vnn && header.rsn == hdr->rsn)) {
332                         ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
333                         if (ret != 0) {
334                                 DEBUG(0, (__location__ " Unable to store record\n"));
335                                 goto failed;
336                         }
337                 }
338
339                 rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
340         }           
341
342         ctdb_lock_all_databases_unmark(ctdb);
343         return 0;
344
345 failed:
346         ctdb_lock_all_databases_unmark(ctdb);
347         return -1;
348 }
349
350
351 static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
352 {
353         uint32_t *dmaster = (uint32_t *)p;
354         struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
355         int ret;
356
357         header->dmaster = *dmaster;
358
359         ret = tdb_store(tdb, key, data, TDB_REPLACE);
360         if (ret) {
361                 DEBUG(0,(__location__ " failed to write tdb data back  ret:%d\n",ret));
362                 return ret;
363         }
364         return 0;
365 }
366
367 int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
368 {
369         struct ctdb_control_set_dmaster *p = (struct ctdb_control_set_dmaster *)indata.dptr;
370         struct ctdb_db_context *ctdb_db;
371
372         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
373                 DEBUG(0,("rejecting ctdb_control_set_dmaster when not frozen\n"));
374                 return -1;
375         }
376
377         ctdb_db = find_ctdb_db(ctdb, p->db_id);
378         if (!ctdb_db) {
379                 DEBUG(0,(__location__ " Unknown db 0x%08x\n", p->db_id));
380                 return -1;
381         }
382
383         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
384                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
385                 return -1;
386         }
387
388         tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &p->dmaster);
389
390         ctdb_lock_all_databases_unmark(ctdb);
391         
392         return 0;
393 }
394
395 struct ctdb_set_recmode_state {
396         struct ctdb_req_control *c;
397         uint32_t recmode;
398 };
399
400 /*
401   called when the 'recovered' event script has finished
402  */
403 static void ctdb_recovered_callback(struct ctdb_context *ctdb, int status, void *p)
404 {
405         struct ctdb_set_recmode_state *state = talloc_get_type(p, struct ctdb_set_recmode_state);
406
407         if (status == 0) {
408                 ctdb->recovery_mode = state->recmode;
409         } else {
410                 DEBUG(0,(__location__ " recovered event script failed (status %d)\n", status));
411         }
412
413         ctdb_request_control_reply(ctdb, state->c, NULL, status, NULL);
414         talloc_free(state);
415 }
416
417 /*
418   set the recovery mode
419  */
420 int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb, 
421                                  struct ctdb_req_control *c,
422                                  TDB_DATA indata, bool *async_reply,
423                                  const char **errormsg)
424 {
425         uint32_t recmode = *(uint32_t *)indata.dptr;
426         int ret;
427         struct ctdb_set_recmode_state *state;
428
429         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
430                 DEBUG(0,("Attempt to change recovery mode to %u when not frozen\n", 
431                          recmode));
432                 (*errormsg) = "Cannot change recovery mode while not frozen";
433                 return -1;
434         }
435
436         if (recmode != CTDB_RECOVERY_NORMAL ||
437             ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
438                 ctdb->recovery_mode = recmode;
439                 return 0;
440         }
441
442         /* some special handling when ending recovery mode */
443         state = talloc(ctdb, struct ctdb_set_recmode_state);
444         CTDB_NO_MEMORY(ctdb, state);
445
446         /* we should not be able to get the lock on the nodes list, as it should be
447            held by the recovery master */
448         if (ctdb_recovery_lock(ctdb, false)) {
449                 DEBUG(0,("ERROR: recovery lock file %s not locked when recovering!\n",
450                          ctdb->recovery_lock_file));
451                 return -1;
452         }       
453
454         state->c = talloc_steal(state, c);
455         state->recmode = recmode;
456         /* call the events script to tell all subsystems that we have recovered */
457         ret = ctdb_event_script_callback(ctdb, state, 
458                                          ctdb_recovered_callback, 
459                                          state, "recovered");
460         if (ret != 0) {
461                 return ret;
462         }
463         *async_reply = true;
464
465         return 0;
466 }
467
468 /*
469   callback for ctdb_control_max_rsn
470  */
471 static int traverse_max_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
472 {
473         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
474         uint64_t *max_rsn = (uint64_t *)p;
475
476         if (data.dsize >= sizeof(*h)) {
477                 (*max_rsn) = MAX(*max_rsn, h->rsn);
478         }
479         return 0;
480 }
481
482 /*
483   get max rsn across an entire db
484  */
485 int32_t ctdb_control_max_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
486 {
487         struct ctdb_db_context *ctdb_db;
488         uint32_t db_id = *(uint32_t *)indata.dptr;
489         uint64_t max_rsn = 0;
490         int ret;
491
492         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
493                 DEBUG(0,("rejecting ctdb_control_max_rsn when not frozen\n"));
494                 return -1;
495         }
496
497         ctdb_db = find_ctdb_db(ctdb, db_id);
498         if (!ctdb_db) {
499                 DEBUG(0,(__location__ " Unknown db\n"));
500                 return -1;
501         }
502
503         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
504                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
505                 return -1;
506         }
507
508         ret = tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_max_rsn, &max_rsn);
509         if (ret < 0) {
510                 DEBUG(0,(__location__ " traverse failed in ctdb_control_max_rsn\n"));
511                 return -1;
512         }
513
514         ctdb_lock_all_databases_unmark(ctdb);
515
516         outdata->dptr = (uint8_t *)talloc(outdata, uint64_t);
517         if (!outdata->dptr) {
518                 return -1;
519         }
520         (*(uint64_t *)outdata->dptr) = max_rsn;
521         outdata->dsize = sizeof(uint64_t);
522
523         return 0;
524 }
525
526
527 /*
528   callback for ctdb_control_set_rsn_nonempty
529  */
530 static int traverse_set_rsn_nonempty(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
531 {
532         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
533         uint64_t *rsn = (uint64_t *)p;
534
535         if (data.dsize > sizeof(*h)) {
536                 h->rsn = *rsn;
537                 if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
538                         return -1;
539                 }
540         }
541         return 0;
542 }
543
544 /*
545   set rsn for all non-empty records in a database to a given rsn
546  */
547 int32_t ctdb_control_set_rsn_nonempty(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
548 {
549         struct ctdb_control_set_rsn_nonempty *p = (struct ctdb_control_set_rsn_nonempty *)indata.dptr;
550         struct ctdb_db_context *ctdb_db;
551         int ret;
552
553         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
554                 DEBUG(0,("rejecting ctdb_control_set_rsn_nonempty when not frozen\n"));
555                 return -1;
556         }
557
558         ctdb_db = find_ctdb_db(ctdb, p->db_id);
559         if (!ctdb_db) {
560                 DEBUG(0,(__location__ " Unknown db\n"));
561                 return -1;
562         }
563
564         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
565                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
566                 return -1;
567         }
568
569         ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_set_rsn_nonempty, &p->rsn);
570         if (ret < 0) {
571                 DEBUG(0,(__location__ " traverse failed in ctdb_control_set_rsn_nonempty\n"));
572                 return -1;
573         }
574
575         ctdb_lock_all_databases_unmark(ctdb);
576
577         return 0;
578 }
579
580
581 /*
582   callback for ctdb_control_delete_low_rsn
583  */
584 static int traverse_delete_low_rsn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
585 {
586         struct ctdb_ltdb_header *h = (struct ctdb_ltdb_header *)data.dptr;
587         uint64_t *rsn = (uint64_t *)p;
588
589         if (data.dsize < sizeof(*h) || h->rsn < *rsn) {
590                 if (tdb_delete(tdb, key) != 0) {
591                         return -1;
592                 }
593         }
594         return 0;
595 }
596
597 /*
598   delete any records with a rsn < the given rsn
599  */
600 int32_t ctdb_control_delete_low_rsn(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
601 {
602         struct ctdb_control_delete_low_rsn *p = (struct ctdb_control_delete_low_rsn *)indata.dptr;
603         struct ctdb_db_context *ctdb_db;
604         int ret;
605
606         if (ctdb->freeze_mode != CTDB_FREEZE_FROZEN) {
607                 DEBUG(0,("rejecting ctdb_control_delete_low_rsn when not frozen\n"));
608                 return -1;
609         }
610
611         ctdb_db = find_ctdb_db(ctdb, p->db_id);
612         if (!ctdb_db) {
613                 DEBUG(0,(__location__ " Unknown db\n"));
614                 return -1;
615         }
616
617         if (ctdb_lock_all_databases_mark(ctdb) != 0) {
618                 DEBUG(0,(__location__ " Failed to get lock on entired db - failing\n"));
619                 return -1;
620         }
621
622         ret = tdb_traverse(ctdb_db->ltdb->tdb, traverse_delete_low_rsn, &p->rsn);
623         if (ret < 0) {
624                 DEBUG(0,(__location__ " traverse failed in ctdb_control_delete_low_rsn\n"));
625                 return -1;
626         }
627
628         ctdb_lock_all_databases_unmark(ctdb);
629
630         return 0;
631 }
632
633
634 /*
635   try and get the recovery lock in shared storage - should only work
636   on the recovery master recovery daemon. Anywhere else is a bug
637  */
638 bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
639 {
640         struct flock lock;
641
642         if (ctdb->recovery_lock_fd != -1) {
643                 close(ctdb->recovery_lock_fd);
644         }
645         ctdb->recovery_lock_fd = open(ctdb->recovery_lock_file, O_RDWR|O_CREAT, 0600);
646         if (ctdb->recovery_lock_fd == -1) {
647                 DEBUG(0,("Unable to open %s - (%s)\n", 
648                          ctdb->recovery_lock_file, strerror(errno)));
649                 return false;
650         }
651
652         lock.l_type = F_WRLCK;
653         lock.l_whence = SEEK_SET;
654         lock.l_start = 0;
655         lock.l_len = 1;
656         lock.l_pid = 0;
657
658         if (fcntl(ctdb->recovery_lock_fd, F_SETLK, &lock) != 0) {
659                 return false;
660         }
661
662         if (!keep) {
663                 close(ctdb->recovery_lock_fd);
664                 ctdb->recovery_lock_fd = -1;
665         }
666
667         return true;
668 }