3c41bb1ab70c5432f6683e8c5e64e94286530ff7
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
/*
  The dummy "null" call handler that every database supports.

  It ignores the call entirely and always reports success (0).
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* intentionally unused */

        return 0;
}
42
43 /*
44   this is a plain fetch procedure that all databases support
45 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 {
48         call->reply_data = &call->record_data;
49         return 0;
50 }
51
52 /*
53   this is a plain fetch procedure that all databases support
54   this returns the full record including the ltdb header
55 */
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = talloc(call, TDB_DATA);
59         if (call->reply_data == NULL) {
60                 return -1;
61         }
62         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
64         if (call->reply_data->dptr == NULL) {
65                 return -1;
66         }
67         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
69
70         return 0;
71 }
72
73
74 /**
75  * write a record to a normal database
76  *
77  * This is the server-variant of the ctdb_ltdb_store function.
78  * It contains logic to determine whether a record should be
79  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80  * controls to the local ctdb daemon if apporpriate.
81  */
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
83                                   TDB_DATA key,
84                                   struct ctdb_ltdb_header *header,
85                                   TDB_DATA data)
86 {
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88         TDB_DATA rec;
89         int ret;
90         bool seqnum_suppressed = false;
91         bool keep = false;
92         bool schedule_for_deletion = false;
93         uint32_t lmaster;
94
95         if (ctdb->flags & CTDB_FLAG_TORTURE) {
96                 struct ctdb_ltdb_header *h2;
97                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
99                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
102                 }
103                 if (rec.dptr) free(rec.dptr);
104         }
105
106         if (ctdb->vnn_map == NULL) {
107                 /*
108                  * Called from a client: always store the record
109                  * Also don't call ctdb_lmaster since it uses the vnn_map!
110                  */
111                 keep = true;
112                 goto store;
113         }
114
115         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
116
117         /*
118          * If we migrate an empty record off to another node
119          * and the record has not been migrated with data,
120          * delete the record instead of storing the empty record.
121          */
122         if (data.dsize != 0) {
123                 keep = true;
124         } else if (ctdb_db->persistent) {
125                 keep = true;
126         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
127                 /*
128                  * The record is not created by the client but
129                  * automatically by the ctdb_ltdb_fetch logic that
130                  * creates a record with an initial header in the
131                  * ltdb before trying to migrate the record from
132                  * the current lmaster. Keep it instead of trying
133                  * to delete the non-existing record...
134                  */
135                 keep = true;
136                 schedule_for_deletion = true;
137         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
138                 keep = true;
139         } else if (ctdb_db->ctdb->pnn == lmaster) {
140                 /*
141                  * If we are lmaster, then we usually keep the record.
142                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143                  * and the record is empty and has never been migrated
144                  * with data, then we should delete it instead of storing it.
145                  * This is part of the vacuuming process.
146                  *
147                  * The reason that we usually need to store even empty records
148                  * on the lmaster is that a client operating directly on the
149                  * lmaster (== dmaster) expects the local copy of the record to
150                  * exist after successful ctdb migrate call. If the record does
151                  * not exist, the client goes into a migrate loop and eventually
152                  * fails. So storing the empty record makes sure that we do not
153                  * need to change the client code.
154                  */
155                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
156                         keep = true;
157                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
158                         keep = true;
159                 }
160         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
161                 keep = true;
162         }
163
164         if (keep &&
165             (data.dsize == 0) &&
166             !ctdb_db->persistent &&
167             (ctdb_db->ctdb->pnn == header->dmaster))
168         {
169                 schedule_for_deletion = true;
170         }
171
172 store:
173         /*
174          * The VACUUM_MIGRATED flag is only set temporarily for
175          * the above logic when the record was retrieved by a
176          * VACUUM_MIGRATE call and should not be stored in the
177          * database.
178          *
179          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180          * and there are two cases in which the corresponding record
181          * is stored in the local database:
182          * 1. The record has been migrated with data in the past
183          *    (the MIGRATED_WITH_DATA record flag is set).
184          * 2. The record has been filled with data again since it
185          *    had been submitted in the VACUUM_FETCH message to the
186          *    lmaster.
187          * For such records it is important to not store the
188          * VACUUM_MIGRATED flag in the database.
189          */
190         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
191
192         /*
193          * Similarly, clear the AUTOMATIC flag which should not enter
194          * the local database copy since this would require client
195          * modifications to clear the flag when the client stores
196          * the record.
197          */
198         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
199
200         rec.dsize = sizeof(*header) + data.dsize;
201         rec.dptr = talloc_size(ctdb, rec.dsize);
202         CTDB_NO_MEMORY(ctdb, rec.dptr);
203
204         memcpy(rec.dptr, header, sizeof(*header));
205         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
206
207         /* Databases with seqnum updates enabled only get their seqnum
208            changes when/if we modify the data */
209         if (ctdb_db->seqnum_update != NULL) {
210                 TDB_DATA old;
211                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
212
213                 if ( (old.dsize == rec.dsize)
214                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215                           rec.dptr+sizeof(struct ctdb_ltdb_header),
216                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218                         seqnum_suppressed = true;
219                 }
220                 if (old.dptr) free(old.dptr);
221         }
222
223         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
224                             ctdb_db->db_name,
225                             keep?"storing":"deleting",
226                             ctdb_hash(&key)));
227
228         if (keep) {
229                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
230         } else {
231                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
232         }
233
234         if (ret != 0) {
235                 int lvl = DEBUG_ERR;
236
237                 if (keep == false &&
238                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
239                 {
240                         lvl = DEBUG_DEBUG;
241                 }
242
243                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
244                             "%d - %s\n",
245                             ctdb_db->db_name,
246                             keep?"store":"delete", ret,
247                             tdb_errorstr(ctdb_db->ltdb->tdb)));
248
249                 schedule_for_deletion = false;
250         }
251         if (seqnum_suppressed) {
252                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
253         }
254
255         talloc_free(rec.dptr);
256
257         if (schedule_for_deletion) {
258                 int ret2;
259                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
260                 if (ret != 0) {
261                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
262                 }
263         }
264
265         return ret;
266 }
267
/*
  State kept for a ctdb request that was deferred because its chainlock
  could not be taken immediately; handed to lock_fetch_callback() when
  the operation should be retried.
*/
struct lock_fetch_state {
        struct ctdb_context *ctdb;      /* daemon context, used to read the current vnn_map generation */
        void (*recv_pkt)(void *, struct ctdb_req_header *); /* packet handler the request is re-submitted to */
        void *recv_context;             /* first argument passed to recv_pkt */
        struct ctdb_req_header *hdr;    /* the deferred request packet */
        uint32_t generation;            /* vnn_map generation at the time the request was queued */
        bool ignore_generation;         /* if true, re-submit even if the generation has changed */
};
276
277 /*
278   called when we should retry the operation
279  */
280 static void lock_fetch_callback(void *p)
281 {
282         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283         if (!state->ignore_generation &&
284             state->generation != state->ctdb->vnn_map->generation) {
285                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286                 talloc_free(state->hdr);
287                 return;
288         }
289         state->recv_pkt(state->recv_context, state->hdr);
290         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
291 }
292
293
294 /*
295   do a non-blocking ltdb_lock, deferring this ctdb request until we
296   have the chainlock
297
298   It does the following:
299
300    1) tries to get the chainlock. If it succeeds, then it returns 0
301
302    2) if it fails to get a chainlock immediately then it sets up a
303    non-blocking chainlock via ctdb_lockwait, and when it gets the
304    chainlock it re-submits this ctdb request to the main packet
305    receive function
306
307    This effectively queues all ctdb requests that cannot be
308    immediately satisfied until it can get the lock. This means that
309    the main ctdb daemon will not block waiting for a chainlock held by
310    a client
311
312    There are 3 possible return values:
313
314        0:    means that it got the lock immediately.
315       -1:    means that it failed to get the lock, and won't retry
316       -2:    means that it failed to get the lock immediately, but will retry
317  */
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
319                            TDB_DATA key, struct ctdb_req_header *hdr,
320                            void (*recv_pkt)(void *, struct ctdb_req_header *),
321                            void *recv_context, bool ignore_generation)
322 {
323         int ret;
324         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325         struct lockwait_handle *h;
326         struct lock_fetch_state *state;
327         
328         ret = tdb_chainlock_nonblock(tdb, key);
329
330         if (ret != 0 &&
331             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332                 /* a hard failure - don't try again */
333                 return -1;
334         }
335
336         /* when torturing, ensure we test the contended path */
337         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
338             random() % 5 == 0) {
339                 ret = -1;
340                 tdb_chainunlock(tdb, key);
341         }
342
343         /* first the non-contended path */
344         if (ret == 0) {
345                 return 0;
346         }
347
348         state = talloc(hdr, struct lock_fetch_state);
349         state->ctdb = ctdb_db->ctdb;
350         state->hdr = hdr;
351         state->recv_pkt = recv_pkt;
352         state->recv_context = recv_context;
353         state->generation = ctdb_db->ctdb->vnn_map->generation;
354         state->ignore_generation = ignore_generation;
355
356         /* now the contended path */
357         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
358         if (h == NULL) {
359                 return -1;
360         }
361
362         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363            so it won't be freed yet */
364         talloc_steal(state, hdr);
365         talloc_steal(state, h);
366
367         /* now tell the caller than we will retry asynchronously */
368         return -2;
369 }
370
371 /*
372   a varient of ctdb_ltdb_lock_requeue that also fetches the record
373  */
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
375                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
376                                  struct ctdb_req_header *hdr, TDB_DATA *data,
377                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
378                                  void *recv_context, bool ignore_generation)
379 {
380         int ret;
381
382         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
383                                      recv_context, ignore_generation);
384         if (ret == 0) {
385                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
386                 if (ret != 0) {
387                         int uret;
388                         uret = ctdb_ltdb_unlock(ctdb_db, key);
389                         if (uret != 0) {
390                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
391                         }
392                 }
393         }
394         return ret;
395 }
396
397
398 /*
399   paraoid check to see if the db is empty
400  */
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
402 {
403         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404         int count = tdb_traverse_read(tdb, NULL, NULL);
405         if (count != 0) {
406                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
407                          ctdb_db->db_path));
408                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
409         }
410 }
411
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413                                 struct ctdb_db_context *ctdb_db)
414 {
415         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
416         char *old;
417         char *reason = NULL;
418         TDB_DATA key;
419         TDB_DATA val;
420
421         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422         key.dsize = strlen(ctdb_db->db_name);
423
424         old = ctdb_db->unhealthy_reason;
425         ctdb_db->unhealthy_reason = NULL;
426
427         val = tdb_fetch(tdb, key);
428         if (val.dsize > 0) {
429                 reason = talloc_strndup(ctdb_db,
430                                         (const char *)val.dptr,
431                                         val.dsize);
432                 if (reason == NULL) {
433                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
434                                            (int)val.dsize));
435                         ctdb_db->unhealthy_reason = old;
436                         free(val.dptr);
437                         return -1;
438                 }
439         }
440
441         if (val.dptr) {
442                 free(val.dptr);
443         }
444
445         talloc_free(old);
446         ctdb_db->unhealthy_reason = reason;
447         return 0;
448 }
449
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451                                   struct ctdb_db_context *ctdb_db,
452                                   const char *given_reason,/* NULL means healthy */
453                                   int num_healthy_nodes)
454 {
455         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
456         int ret;
457         TDB_DATA key;
458         TDB_DATA val;
459         char *new_reason = NULL;
460         char *old_reason = NULL;
461
462         ret = tdb_transaction_start(tdb);
463         if (ret != 0) {
464                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
466                 return -1;
467         }
468
469         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
470         if (ret != 0) {
471                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472                                    ctdb_db->db_name, ret));
473                 return -1;
474         }
475         old_reason = ctdb_db->unhealthy_reason;
476
477         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478         key.dsize = strlen(ctdb_db->db_name);
479
480         if (given_reason) {
481                 new_reason = talloc_strdup(ctdb_db, given_reason);
482                 if (new_reason == NULL) {
483                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
484                                           given_reason));
485                         return -1;
486                 }
487         } else if (old_reason && num_healthy_nodes == 0) {
488                 /*
489                  * If the reason indicates ok, but there where no healthy nodes
490                  * available, that it means, we have not recovered valid content
491                  * of the db. So if there's an old reason, prefix it with
492                  * "NO-HEALTHY-NODES - "
493                  */
494                 const char *prefix;
495
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
498                 if (ret != 0) {
499                         prefix = _TMP_PREFIX;
500                 } else {
501                         prefix = "";
502                 }
503                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
504                                          prefix, old_reason);
505                 if (new_reason == NULL) {
506                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507                                           prefix, old_reason));
508                         return -1;
509                 }
510 #undef _TMP_PREFIX
511         }
512
513         if (new_reason) {
514                 val.dptr = discard_const_p(uint8_t, new_reason);
515                 val.dsize = strlen(new_reason);
516
517                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
518                 if (ret != 0) {
519                         tdb_transaction_cancel(tdb);
520                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
522                                            ret, tdb_errorstr(tdb)));
523                         talloc_free(new_reason);
524                         return -1;
525                 }
526                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527                                    ctdb_db->db_name, new_reason));
528         } else if (old_reason) {
529                 ret = tdb_delete(tdb, key);
530                 if (ret != 0) {
531                         tdb_transaction_cancel(tdb);
532                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533                                            tdb_name(tdb), ctdb_db->db_name,
534                                            ret, tdb_errorstr(tdb)));
535                         talloc_free(new_reason);
536                         return -1;
537                 }
538                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
539                                    ctdb_db->db_name));
540         }
541
542         ret = tdb_transaction_commit(tdb);
543         if (ret != TDB_SUCCESS) {
544                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
546                 talloc_free(new_reason);
547                 return -1;
548         }
549
550         talloc_free(old_reason);
551         ctdb_db->unhealthy_reason = new_reason;
552
553         return 0;
554 }
555
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557                                      struct ctdb_db_context *ctdb_db)
558 {
559         time_t now = time(NULL);
560         char *new_path;
561         char *new_reason;
562         int ret;
563         struct tm *tm;
564
565         tm = gmtime(&now);
566
567         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569                                    "%04u%02u%02u%02u%02u%02u.0Z",
570                                    ctdb_db->db_path,
571                                    tm->tm_year+1900, tm->tm_mon+1,
572                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
573                                    tm->tm_sec);
574         if (new_path == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578
579         new_reason = talloc_asprintf(ctdb_db,
580                                      "ERROR - Backup of corrupted TDB in '%s'",
581                                      new_path);
582         if (new_reason == NULL) {
583                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
584                 return -1;
585         }
586         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587         talloc_free(new_reason);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
591                                  ctdb_db->db_path));
592                 return -1;
593         }
594
595         ret = rename(ctdb_db->db_path, new_path);
596         if (ret != 0) {
597                 DEBUG(DEBUG_CRIT,(__location__
598                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599                                   ctdb_db->db_path, new_path,
600                                   errno, strerror(errno)));
601                 talloc_free(new_path);
602                 return -1;
603         }
604
605         DEBUG(DEBUG_CRIT,(__location__
606                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607                          ctdb_db->db_path, new_path));
608         talloc_free(new_path);
609         return 0;
610 }
611
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
613 {
614         struct ctdb_db_context *ctdb_db;
615         int ret;
616         int ok = 0;
617         int fail = 0;
618
619         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620                 if (!ctdb_db->persistent) {
621                         continue;
622                 }
623
624                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
625                 if (ret != 0) {
626                         DEBUG(DEBUG_ALERT,(__location__
627                                            " load persistent health for '%s' failed\n",
628                                            ctdb_db->db_path));
629                         return -1;
630                 }
631
632                 if (ctdb_db->unhealthy_reason == NULL) {
633                         ok++;
634                         DEBUG(DEBUG_INFO,(__location__
635                                    " persistent db '%s' healthy\n",
636                                    ctdb_db->db_path));
637                         continue;
638                 }
639
640                 fail++;
641                 DEBUG(DEBUG_ALERT,(__location__
642                                    " persistent db '%s' unhealthy: %s\n",
643                                    ctdb_db->db_path,
644                                    ctdb_db->unhealthy_reason));
645         }
646         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
648                ok, fail));
649
650         if (fail != 0) {
651                 return -1;
652         }
653
654         return 0;
655 }
656
657
658 /*
659   mark a database - as healthy
660  */
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
662 {
663         uint32_t db_id = *(uint32_t *)indata.dptr;
664         struct ctdb_db_context *ctdb_db;
665         int ret;
666         bool may_recover = false;
667
668         ctdb_db = find_ctdb_db(ctdb, db_id);
669         if (!ctdb_db) {
670                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
671                 return -1;
672         }
673
674         if (ctdb_db->unhealthy_reason) {
675                 may_recover = true;
676         }
677
678         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR,(__location__
681                                  " ctdb_update_persistent_health(%s) failed\n",
682                                  ctdb_db->db_name));
683                 return -1;
684         }
685
686         if (may_recover && !ctdb->done_startup) {
687                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
688                                   ctdb_db->db_name));
689                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
690         }
691
692         return 0;
693 }
694
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
696                                    TDB_DATA indata,
697                                    TDB_DATA *outdata)
698 {
699         uint32_t db_id = *(uint32_t *)indata.dptr;
700         struct ctdb_db_context *ctdb_db;
701         int ret;
702
703         ctdb_db = find_ctdb_db(ctdb, db_id);
704         if (!ctdb_db) {
705                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
706                 return -1;
707         }
708
709         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
710         if (ret != 0) {
711                 DEBUG(DEBUG_ERR,(__location__
712                                  " ctdb_load_persistent_health(%s) failed\n",
713                                  ctdb_db->db_name));
714                 return -1;
715         }
716
717         *outdata = tdb_null;
718         if (ctdb_db->unhealthy_reason) {
719                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
721         }
722
723         return 0;
724 }
725
726 /*
727   attach to a database, handling both persistent and non-persistent databases
728   return 0 on success, -1 on failure
729  */
730 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
731                              bool persistent, const char *unhealthy_reason,
732                              bool jenkinshash)
733 {
734         struct ctdb_db_context *ctdb_db, *tmp_db;
735         int ret;
736         struct TDB_DATA key;
737         unsigned tdb_flags;
738         int mode = 0600;
739         int remaining_tries = 0;
740
741         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
742         CTDB_NO_MEMORY(ctdb, ctdb_db);
743
744         ctdb_db->priority = 1;
745         ctdb_db->ctdb = ctdb;
746         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
747         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
748
749         key.dsize = strlen(db_name)+1;
750         key.dptr  = discard_const(db_name);
751         ctdb_db->db_id = ctdb_hash(&key);
752         ctdb_db->persistent = persistent;
753
754         if (!ctdb_db->persistent) {
755                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
756                 if (ctdb_db->delete_queue == NULL) {
757                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
758                 }
759
760                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
761         }
762
763         /* check for hash collisions */
764         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
765                 if (tmp_db->db_id == ctdb_db->db_id) {
766                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
767                                  tmp_db->db_id, db_name, tmp_db->db_name));
768                         talloc_free(ctdb_db);
769                         return -1;
770                 }
771         }
772
773         if (persistent) {
774                 if (unhealthy_reason) {
775                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
776                                                             unhealthy_reason, 0);
777                         if (ret != 0) {
778                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
779                                                    ctdb_db->db_name, unhealthy_reason, ret));
780                                 talloc_free(ctdb_db);
781                                 return -1;
782                         }
783                 }
784
785                 if (ctdb->max_persistent_check_errors > 0) {
786                         remaining_tries = 1;
787                 }
788                 if (ctdb->done_startup) {
789                         remaining_tries = 0;
790                 }
791
792                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
793                 if (ret != 0) {
794                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
795                                    ctdb_db->db_name, ret));
796                         talloc_free(ctdb_db);
797                         return -1;
798                 }
799         }
800
801         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
802                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
803                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
804                 talloc_free(ctdb_db);
805                 return -1;
806         }
807
808         if (ctdb_db->unhealthy_reason) {
809                 /* this is just a warning, but we want that in the log file! */
810                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
811                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
812         }
813
814         /* open the database */
815         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
816                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
817                                            db_name, ctdb->pnn);
818
819         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
820         if (ctdb->valgrinding) {
821                 tdb_flags |= TDB_NOMMAP;
822         }
823         tdb_flags |= TDB_DISALLOW_NESTING;
824         if (jenkinshash) {
825                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
826         }
827
828 again:
829         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
830                                       ctdb->tunable.database_hash_size, 
831                                       tdb_flags, 
832                                       O_CREAT|O_RDWR, mode);
833         if (ctdb_db->ltdb == NULL) {
834                 struct stat st;
835                 int saved_errno = errno;
836
837                 if (!persistent) {
838                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
839                                           ctdb_db->db_path,
840                                           saved_errno,
841                                           strerror(saved_errno)));
842                         talloc_free(ctdb_db);
843                         return -1;
844                 }
845
846                 if (remaining_tries == 0) {
847                         DEBUG(DEBUG_CRIT,(__location__
848                                           "Failed to open persistent tdb '%s': %d - %s\n",
849                                           ctdb_db->db_path,
850                                           saved_errno,
851                                           strerror(saved_errno)));
852                         talloc_free(ctdb_db);
853                         return -1;
854                 }
855
856                 ret = stat(ctdb_db->db_path, &st);
857                 if (ret != 0) {
858                         DEBUG(DEBUG_CRIT,(__location__
859                                           "Failed to open persistent tdb '%s': %d - %s\n",
860                                           ctdb_db->db_path,
861                                           saved_errno,
862                                           strerror(saved_errno)));
863                         talloc_free(ctdb_db);
864                         return -1;
865                 }
866
867                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
868                 if (ret != 0) {
869                         DEBUG(DEBUG_CRIT,(__location__
870                                           "Failed to open persistent tdb '%s': %d - %s\n",
871                                           ctdb_db->db_path,
872                                           saved_errno,
873                                           strerror(saved_errno)));
874                         talloc_free(ctdb_db);
875                         return -1;
876                 }
877
878                 remaining_tries--;
879                 mode = st.st_mode;
880                 goto again;
881         }
882
883         if (!persistent) {
884                 ctdb_check_db_empty(ctdb_db);
885         } else {
886                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
887                 if (ret != 0) {
888                         int fd;
889                         struct stat st;
890
891                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
892                                           ctdb_db->db_path, ret,
893                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
894                         if (remaining_tries == 0) {
895                                 talloc_free(ctdb_db);
896                                 return -1;
897                         }
898
899                         fd = tdb_fd(ctdb_db->ltdb->tdb);
900                         ret = fstat(fd, &st);
901                         if (ret != 0) {
902                                 DEBUG(DEBUG_CRIT,(__location__
903                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
904                                                   ctdb_db->db_path,
905                                                   errno,
906                                                   strerror(errno)));
907                                 talloc_free(ctdb_db);
908                                 return -1;
909                         }
910
911                         /* close the TDB */
912                         talloc_free(ctdb_db->ltdb);
913                         ctdb_db->ltdb = NULL;
914
915                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
916                         if (ret != 0) {
917                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
918                                                   ctdb_db->db_path));
919                                 talloc_free(ctdb_db);
920                                 return -1;
921                         }
922
923                         remaining_tries--;
924                         mode = st.st_mode;
925                         goto again;
926                 }
927         }
928
929         /* Assume all non-persistent databases support read only delegations */
930         if (!ctdb_db->persistent) {
931                 ctdb_db->readonly = true;
932         }
933
934         if (ctdb_db->readonly) {
935                 char *ropath;
936
937                 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
938                 if (ropath == NULL) {
939                         DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
940                         ctdb_db->readonly = false;
941                         talloc_free(ctdb_db);
942                         return -1;
943                 }
944                 ctdb_db->rottdb = tdb_open(ropath, 
945                                       ctdb->tunable.database_hash_size, 
946                                       TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
947                                       O_CREAT|O_RDWR, 0);
948                 if (ctdb_db->rottdb == NULL) {
949                         DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
950                         ctdb_db->readonly = false;
951                         talloc_free(ctdb_db);
952                         return -1;
953                 }
954                 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
955         }
956
957
958         DLIST_ADD(ctdb->db_list, ctdb_db);
959
960         /* setting this can help some high churn databases */
961         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
962
963         /* 
964            all databases support the "null" function. we need this in
965            order to do forced migration of records
966         */
967         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
968         if (ret != 0) {
969                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
970                 talloc_free(ctdb_db);
971                 return -1;
972         }
973
974         /* 
975            all databases support the "fetch" function. we need this
976            for efficient Samba3 ctdb fetch
977         */
978         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
979         if (ret != 0) {
980                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
981                 talloc_free(ctdb_db);
982                 return -1;
983         }
984
985         /* 
986            all databases support the "fetch_with_header" function. we need this
987            for efficient readonly record fetches
988         */
989         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
990         if (ret != 0) {
991                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
992                 talloc_free(ctdb_db);
993                 return -1;
994         }
995
996         ret = ctdb_vacuum_init(ctdb_db);
997         if (ret != 0) {
998                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
999                                   "database '%s'\n", ctdb_db->db_name));
1000                 talloc_free(ctdb_db);
1001                 return -1;
1002         }
1003
1004
1005         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
1006         
1007         /* success */
1008         return 0;
1009 }
1010
1011
/*
  state kept for one database attach control that has been deferred;
  entries are linked on ctdb->deferred_attach
*/
struct ctdb_deferred_attach_context {
        /* list linkage used by the DLIST_* macros */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context whose deferred_attach list holds this entry */
        struct ctdb_context *ctdb;

        /* the deferred control request (talloc-stolen onto this entry) */
        struct ctdb_req_control *c;
};
1017
1018
1019 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1020 {
1021         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1022
1023         return 0;
1024 }
1025
1026 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1027 {
1028         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1029         struct ctdb_context *ctdb = da_ctx->ctdb;
1030
1031         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1032         talloc_free(da_ctx);
1033 }
1034
1035 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1036 {
1037         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1038         struct ctdb_context *ctdb = da_ctx->ctdb;
1039
1040         /* This talloc-steals the packet ->c */
1041         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1042         talloc_free(da_ctx);
1043 }
1044
1045 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1046 {
1047         struct ctdb_deferred_attach_context *da_ctx;
1048
1049         /* call it from the main event loop as soon as the current event 
1050            finishes.
1051          */
1052         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1053                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1054                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1055         }
1056
1057         return 0;
1058 }
1059
1060 /*
1061   a client has asked to attach a new database
1062  */
1063 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1064                                TDB_DATA *outdata, uint64_t tdb_flags, 
1065                                bool persistent, uint32_t client_id,
1066                                struct ctdb_req_control *c,
1067                                bool *async_reply)
1068 {
1069         const char *db_name = (const char *)indata.dptr;
1070         struct ctdb_db_context *db;
1071         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1072         struct ctdb_client *client = NULL;
1073
1074         /* dont allow any local clients to attach while we are in recovery mode
1075          * except for the recovery daemon.
1076          * allow all attach from the network since these are always from remote
1077          * recovery daemons.
1078          */
1079         if (client_id != 0) {
1080                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1081         }
1082         if (client != NULL) {
1083                 /* If the node is inactive it is not part of the cluster
1084                    and we should not allow clients to attach to any
1085                    databases
1086                 */
1087                 if (node->flags & NODE_FLAGS_INACTIVE) {
1088                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1089                         return -1;
1090                 }
1091
1092                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1093                  && client->pid != ctdb->recoverd_pid
1094                  && !ctdb->done_startup) {
1095                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1096
1097                         if (da_ctx == NULL) {
1098                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1099                                 return -1;
1100                         }
1101
1102                         da_ctx->ctdb = ctdb;
1103                         da_ctx->c = talloc_steal(da_ctx, c);
1104                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1105                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1106
1107                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1108
1109                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1110                         *async_reply = true;
1111                         return 0;
1112                 }
1113         }
1114
1115         /* the client can optionally pass additional tdb flags, but we
1116            only allow a subset of those on the database in ctdb. Note
1117            that tdb_flags is passed in via the (otherwise unused)
1118            srvid to the attach control */
1119         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1120
1121         /* see if we already have this name */
1122         db = ctdb_db_handle(ctdb, db_name);
1123         if (db) {
1124                 outdata->dptr  = (uint8_t *)&db->db_id;
1125                 outdata->dsize = sizeof(db->db_id);
1126                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1127                 return 0;
1128         }
1129
1130         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1131                 return -1;
1132         }
1133
1134         db = ctdb_db_handle(ctdb, db_name);
1135         if (!db) {
1136                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1137                 return -1;
1138         }
1139
1140         /* remember the flags the client has specified */
1141         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1142
1143         outdata->dptr  = (uint8_t *)&db->db_id;
1144         outdata->dsize = sizeof(db->db_id);
1145
1146         /* Try to ensure it's locked in mem */
1147         ctdb_lockdown_memory(ctdb);
1148
1149         /* tell all the other nodes about this database */
1150         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1151                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1152                                                 CTDB_CONTROL_DB_ATTACH,
1153                                  0, CTDB_CTRL_FLAG_NOREPLY,
1154                                  indata, NULL, NULL);
1155
1156         /* success */
1157         return 0;
1158 }
1159
1160
1161 /*
1162   attach to all existing persistent databases
1163  */
1164 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1165                                   const char *unhealthy_reason)
1166 {
1167         DIR *d;
1168         struct dirent *de;
1169
1170         /* open the persistent db directory and scan it for files */
1171         d = opendir(ctdb->db_directory_persistent);
1172         if (d == NULL) {
1173                 return 0;
1174         }
1175
1176         while ((de=readdir(d))) {
1177                 char *p, *s, *q;
1178                 size_t len = strlen(de->d_name);
1179                 uint32_t node;
1180                 int invalid_name = 0;
1181                 
1182                 s = talloc_strdup(ctdb, de->d_name);
1183                 CTDB_NO_MEMORY(ctdb, s);
1184
1185                 /* only accept names ending in .tdb */
1186                 p = strstr(s, ".tdb.");
1187                 if (len < 7 || p == NULL) {
1188                         talloc_free(s);
1189                         continue;
1190                 }
1191
1192                 /* only accept names ending with .tdb. and any number of digits */
1193                 q = p+5;
1194                 while (*q != 0 && invalid_name == 0) {
1195                         if (!isdigit(*q++)) {
1196                                 invalid_name = 1;
1197                         }
1198                 }
1199                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1200                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1201                         talloc_free(s);
1202                         continue;
1203                 }
1204                 p[4] = 0;
1205
1206                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1207                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1208                         closedir(d);
1209                         talloc_free(s);
1210                         return -1;
1211                 }
1212
1213                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1214
1215                 talloc_free(s);
1216         }
1217         closedir(d);
1218         return 0;
1219 }
1220
1221 int ctdb_attach_databases(struct ctdb_context *ctdb)
1222 {
1223         int ret;
1224         char *persistent_health_path = NULL;
1225         char *unhealthy_reason = NULL;
1226         bool first_try = true;
1227
1228         if (ctdb->db_directory == NULL) {
1229                 ctdb->db_directory = VARDIR "/ctdb";
1230         }
1231         if (ctdb->db_directory_persistent == NULL) {
1232                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1233         }
1234         if (ctdb->db_directory_state == NULL) {
1235                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1236         }
1237
1238         /* make sure the db directory exists */
1239         ret = mkdir(ctdb->db_directory, 0700);
1240         if (ret == -1 && errno != EEXIST) {
1241                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1242                          ctdb->db_directory));
1243                 return -1;
1244         }
1245
1246         /* make sure the persistent db directory exists */
1247         ret = mkdir(ctdb->db_directory_persistent, 0700);
1248         if (ret == -1 && errno != EEXIST) {
1249                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1250                          ctdb->db_directory_persistent));
1251                 return -1;
1252         }
1253
1254         /* make sure the internal state db directory exists */
1255         ret = mkdir(ctdb->db_directory_state, 0700);
1256         if (ret == -1 && errno != EEXIST) {
1257                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1258                          ctdb->db_directory_state));
1259                 return -1;
1260         }
1261
1262         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1263                                                  ctdb->db_directory_state,
1264                                                  PERSISTENT_HEALTH_TDB,
1265                                                  ctdb->pnn);
1266         if (persistent_health_path == NULL) {
1267                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1268                 return -1;
1269         }
1270
1271 again:
1272
1273         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1274                                                    0, TDB_DISALLOW_NESTING,
1275                                                    O_CREAT | O_RDWR, 0600);
1276         if (ctdb->db_persistent_health == NULL) {
1277                 struct tdb_wrap *tdb;
1278
1279                 if (!first_try) {
1280                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1281                                           persistent_health_path,
1282                                           errno,
1283                                           strerror(errno)));
1284                         talloc_free(persistent_health_path);
1285                         talloc_free(unhealthy_reason);
1286                         return -1;
1287                 }
1288                 first_try = false;
1289
1290                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1291                                                    persistent_health_path,
1292                                                    "was cleared after a failure",
1293                                                    "manual verification needed");
1294                 if (unhealthy_reason == NULL) {
1295                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1296                         talloc_free(persistent_health_path);
1297                         return -1;
1298                 }
1299
1300                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1301                                   persistent_health_path));
1302                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1303                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1304                                     O_CREAT | O_RDWR, 0600);
1305                 if (tdb) {
1306                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1307                                           persistent_health_path,
1308                                           errno,
1309                                           strerror(errno)));
1310                         talloc_free(persistent_health_path);
1311                         talloc_free(unhealthy_reason);
1312                         return -1;
1313                 }
1314
1315                 talloc_free(tdb);
1316                 goto again;
1317         }
1318         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1319         if (ret != 0) {
1320                 struct tdb_wrap *tdb;
1321
1322                 talloc_free(ctdb->db_persistent_health);
1323                 ctdb->db_persistent_health = NULL;
1324
1325                 if (!first_try) {
1326                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1327                                           persistent_health_path));
1328                         talloc_free(persistent_health_path);
1329                         talloc_free(unhealthy_reason);
1330                         return -1;
1331                 }
1332                 first_try = false;
1333
1334                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1335                                                    persistent_health_path,
1336                                                    "was cleared after a failure",
1337                                                    "manual verification needed");
1338                 if (unhealthy_reason == NULL) {
1339                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1340                         talloc_free(persistent_health_path);
1341                         return -1;
1342                 }
1343
1344                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1345                                   persistent_health_path));
1346                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1347                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1348                                     O_CREAT | O_RDWR, 0600);
1349                 if (tdb) {
1350                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1351                                           persistent_health_path,
1352                                           errno,
1353                                           strerror(errno)));
1354                         talloc_free(persistent_health_path);
1355                         talloc_free(unhealthy_reason);
1356                         return -1;
1357                 }
1358
1359                 talloc_free(tdb);
1360                 goto again;
1361         }
1362         talloc_free(persistent_health_path);
1363
1364         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1365         talloc_free(unhealthy_reason);
1366         if (ret != 0) {
1367                 return ret;
1368         }
1369
1370         return 0;
1371 }
1372
1373 /*
1374   called when a broadcast seqnum update comes in
1375  */
1376 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1377 {
1378         struct ctdb_db_context *ctdb_db;
1379         if (srcnode == ctdb->pnn) {
1380                 /* don't update ourselves! */
1381                 return 0;
1382         }
1383
1384         ctdb_db = find_ctdb_db(ctdb, db_id);
1385         if (!ctdb_db) {
1386                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1387                 return -1;
1388         }
1389
1390         if (ctdb_db->unhealthy_reason) {
1391                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1392                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1393                 return -1;
1394         }
1395
1396         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1397         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1398         return 0;
1399 }
1400
1401 /*
1402   timer to check for seqnum changes in a ltdb and propogate them
1403  */
1404 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1405                                    struct timeval t, void *p)
1406 {
1407         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1408         struct ctdb_context *ctdb = ctdb_db->ctdb;
1409         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1410         if (new_seqnum != ctdb_db->seqnum) {
1411                 /* something has changed - propogate it */
1412                 TDB_DATA data;
1413                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1414                 data.dsize = sizeof(uint32_t);
1415                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1416                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1417                                          data, NULL, NULL);             
1418         }
1419         ctdb_db->seqnum = new_seqnum;
1420
1421         /* setup a new timer */
1422         ctdb_db->seqnum_update =
1423                 event_add_timed(ctdb->ev, ctdb_db, 
1424                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1425                                 ctdb_ltdb_seqnum_check, ctdb_db);
1426 }
1427
1428 /*
1429   enable seqnum handling on this db
1430  */
1431 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1432 {
1433         struct ctdb_db_context *ctdb_db;
1434         ctdb_db = find_ctdb_db(ctdb, db_id);
1435         if (!ctdb_db) {
1436                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1437                 return -1;
1438         }
1439
1440         if (ctdb_db->seqnum_update == NULL) {
1441                 ctdb_db->seqnum_update =
1442                         event_add_timed(ctdb->ev, ctdb_db, 
1443                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1444                                         ctdb_ltdb_seqnum_check, ctdb_db);
1445         }
1446
1447         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1448         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1449         return 0;
1450 }
1451
1452 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1453 {
1454         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1455         struct ctdb_db_context *ctdb_db;
1456
1457         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1458         if (!ctdb_db) {
1459                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1460                 return 0;
1461         }
1462
1463         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1464                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1465                 return 0;
1466         }
1467
1468         ctdb_db->priority = db_prio->priority;
1469         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1470
1471         return 0;
1472 }
1473