b1f29d77ec31bceef351d47e9893ec471af91ec3
[rusty/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
/*
  No-op call handler that all databases support: the call is left
  untouched and success (0) is reported.
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
        (void)call;     /* intentionally unused */

        return 0;
}
42
43 /*
44   this is a plain fetch procedure that all databases support
45 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 {
48         call->reply_data = &call->record_data;
49         return 0;
50 }
51
52 /*
53   this is a plain fetch procedure that all databases support
54   this returns the full record including the ltdb header
55 */
56 static int ctdb_fetch_with_header_func(struct ctdb_call_info *call)
57 {
58         call->reply_data = talloc(call, TDB_DATA);
59         if (call->reply_data == NULL) {
60                 return -1;
61         }
62         call->reply_data->dsize = sizeof(struct ctdb_ltdb_header) + call->record_data.dsize;
63         call->reply_data->dptr  = talloc_size(call->reply_data, call->reply_data->dsize);
64         if (call->reply_data->dptr == NULL) {
65                 return -1;
66         }
67         memcpy(call->reply_data->dptr, call->header, sizeof(struct ctdb_ltdb_header));
68         memcpy(&call->reply_data->dptr[sizeof(struct ctdb_ltdb_header)], call->record_data.dptr, call->record_data.dsize);
69
70         return 0;
71 }
72
73
74 /**
75  * write a record to a normal database
76  *
77  * This is the server-variant of the ctdb_ltdb_store function.
78  * It contains logic to determine whether a record should be
79  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
80  * controls to the local ctdb daemon if apporpriate.
81  */
82 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
83                                   TDB_DATA key,
84                                   struct ctdb_ltdb_header *header,
85                                   TDB_DATA data)
86 {
87         struct ctdb_context *ctdb = ctdb_db->ctdb;
88         TDB_DATA rec;
89         int ret;
90         bool seqnum_suppressed = false;
91         bool keep = false;
92         bool schedule_for_deletion = false;
93         uint32_t lmaster;
94
95         if (ctdb->flags & CTDB_FLAG_TORTURE) {
96                 struct ctdb_ltdb_header *h2;
97                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
98                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
99                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
100                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
101                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
102                 }
103                 if (rec.dptr) free(rec.dptr);
104         }
105
106         if (ctdb->vnn_map == NULL) {
107                 /*
108                  * Called from a client: always store the record
109                  * Also don't call ctdb_lmaster since it uses the vnn_map!
110                  */
111                 keep = true;
112                 goto store;
113         }
114
115         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
116
117         /*
118          * If we migrate an empty record off to another node
119          * and the record has not been migrated with data,
120          * delete the record instead of storing the empty record.
121          */
122         if (data.dsize != 0) {
123                 keep = true;
124         } else if (ctdb_db->persistent) {
125                 keep = true;
126         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
127                 /*
128                  * The record is not created by the client but
129                  * automatically by the ctdb_ltdb_fetch logic that
130                  * creates a record with an initial header in the
131                  * ltdb before trying to migrate the record from
132                  * the current lmaster. Keep it instead of trying
133                  * to delete the non-existing record...
134                  */
135                 keep = true;
136                 schedule_for_deletion = true;
137         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
138                 keep = true;
139         } else if (ctdb_db->ctdb->pnn == lmaster) {
140                 /*
141                  * If we are lmaster, then we usually keep the record.
142                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
143                  * and the record is empty and has never been migrated
144                  * with data, then we should delete it instead of storing it.
145                  * This is part of the vacuuming process.
146                  *
147                  * The reason that we usually need to store even empty records
148                  * on the lmaster is that a client operating directly on the
149                  * lmaster (== dmaster) expects the local copy of the record to
150                  * exist after successful ctdb migrate call. If the record does
151                  * not exist, the client goes into a migrate loop and eventually
152                  * fails. So storing the empty record makes sure that we do not
153                  * need to change the client code.
154                  */
155                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
156                         keep = true;
157                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
158                         keep = true;
159                 }
160         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
161                 keep = true;
162         }
163
164         if (keep &&
165             (data.dsize == 0) &&
166             !ctdb_db->persistent &&
167             (ctdb_db->ctdb->pnn == header->dmaster))
168         {
169                 schedule_for_deletion = true;
170         }
171
172 store:
173         /*
174          * The VACUUM_MIGRATED flag is only set temporarily for
175          * the above logic when the record was retrieved by a
176          * VACUUM_MIGRATE call and should not be stored in the
177          * database.
178          *
179          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
180          * and there are two cases in which the corresponding record
181          * is stored in the local database:
182          * 1. The record has been migrated with data in the past
183          *    (the MIGRATED_WITH_DATA record flag is set).
184          * 2. The record has been filled with data again since it
185          *    had been submitted in the VACUUM_FETCH message to the
186          *    lmaster.
187          * For such records it is important to not store the
188          * VACUUM_MIGRATED flag in the database.
189          */
190         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
191
192         /*
193          * Similarly, clear the AUTOMATIC flag which should not enter
194          * the local database copy since this would require client
195          * modifications to clear the flag when the client stores
196          * the record.
197          */
198         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
199
200         rec.dsize = sizeof(*header) + data.dsize;
201         rec.dptr = talloc_size(ctdb, rec.dsize);
202         CTDB_NO_MEMORY(ctdb, rec.dptr);
203
204         memcpy(rec.dptr, header, sizeof(*header));
205         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
206
207         /* Databases with seqnum updates enabled only get their seqnum
208            changes when/if we modify the data */
209         if (ctdb_db->seqnum_update != NULL) {
210                 TDB_DATA old;
211                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
212
213                 if ( (old.dsize == rec.dsize)
214                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
215                           rec.dptr+sizeof(struct ctdb_ltdb_header),
216                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
217                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
218                         seqnum_suppressed = true;
219                 }
220                 if (old.dptr) free(old.dptr);
221         }
222
223         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
224                             ctdb_db->db_name,
225                             keep?"storing":"deleting",
226                             ctdb_hash(&key)));
227
228         if (keep) {
229                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
230         } else {
231                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
232         }
233
234         if (ret != 0) {
235                 int lvl = DEBUG_ERR;
236
237                 if (keep == false &&
238                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
239                 {
240                         lvl = DEBUG_DEBUG;
241                 }
242
243                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
244                             "%d - %s\n",
245                             ctdb_db->db_name,
246                             keep?"store":"delete", ret,
247                             tdb_errorstr(ctdb_db->ltdb->tdb)));
248
249                 schedule_for_deletion = false;
250         }
251         if (seqnum_suppressed) {
252                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
253         }
254
255         talloc_free(rec.dptr);
256
257         if (schedule_for_deletion) {
258                 int ret2;
259                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
260                 if (ret2 != 0) {
261                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
262                 }
263         }
264
265         return ret;
266 }
267
/*
  State kept for a request that was deferred until its chainlock
  becomes available (see ctdb_ltdb_lock_requeue / lock_fetch_callback).
*/
struct lock_fetch_state {
        /* daemon context; its vnn_map generation is compared on retry */
        struct ctdb_context *ctdb;

        /* function the deferred packet is handed back to on retry */
        void (*recv_pkt)(void *, struct ctdb_req_header *);

        /* first argument passed to recv_pkt */
        void *recv_context;

        /* the deferred request packet */
        struct ctdb_req_header *hdr;

        /* vnn_map generation recorded when the request was deferred */
        uint32_t generation;

        /* if true, retry even when the generation has changed */
        bool ignore_generation;
};
276
277 /*
278   called when we should retry the operation
279  */
280 static void lock_fetch_callback(void *p)
281 {
282         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
283         if (!state->ignore_generation &&
284             state->generation != state->ctdb->vnn_map->generation) {
285                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
286                 talloc_free(state->hdr);
287                 return;
288         }
289         state->recv_pkt(state->recv_context, state->hdr);
290         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
291 }
292
293
294 /*
295   do a non-blocking ltdb_lock, deferring this ctdb request until we
296   have the chainlock
297
298   It does the following:
299
300    1) tries to get the chainlock. If it succeeds, then it returns 0
301
302    2) if it fails to get a chainlock immediately then it sets up a
303    non-blocking chainlock via ctdb_lockwait, and when it gets the
304    chainlock it re-submits this ctdb request to the main packet
305    receive function
306
307    This effectively queues all ctdb requests that cannot be
308    immediately satisfied until it can get the lock. This means that
309    the main ctdb daemon will not block waiting for a chainlock held by
310    a client
311
312    There are 3 possible return values:
313
314        0:    means that it got the lock immediately.
315       -1:    means that it failed to get the lock, and won't retry
316       -2:    means that it failed to get the lock immediately, but will retry
317  */
318 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
319                            TDB_DATA key, struct ctdb_req_header *hdr,
320                            void (*recv_pkt)(void *, struct ctdb_req_header *),
321                            void *recv_context, bool ignore_generation)
322 {
323         int ret;
324         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
325         struct lockwait_handle *h;
326         struct lock_fetch_state *state;
327         
328         ret = tdb_chainlock_nonblock(tdb, key);
329
330         if (ret != 0 &&
331             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
332                 /* a hard failure - don't try again */
333                 return -1;
334         }
335
336         /* when torturing, ensure we test the contended path */
337         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
338             random() % 5 == 0) {
339                 ret = -1;
340                 tdb_chainunlock(tdb, key);
341         }
342
343         /* first the non-contended path */
344         if (ret == 0) {
345                 return 0;
346         }
347
348         state = talloc(hdr, struct lock_fetch_state);
349         state->ctdb = ctdb_db->ctdb;
350         state->hdr = hdr;
351         state->recv_pkt = recv_pkt;
352         state->recv_context = recv_context;
353         state->generation = ctdb_db->ctdb->vnn_map->generation;
354         state->ignore_generation = ignore_generation;
355
356         /* now the contended path */
357         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
358         if (h == NULL) {
359                 return -1;
360         }
361
362         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
363            so it won't be freed yet */
364         talloc_steal(state, hdr);
365         talloc_steal(state, h);
366
367         /* now tell the caller than we will retry asynchronously */
368         return -2;
369 }
370
371 /*
372   a varient of ctdb_ltdb_lock_requeue that also fetches the record
373  */
374 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
375                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
376                                  struct ctdb_req_header *hdr, TDB_DATA *data,
377                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
378                                  void *recv_context, bool ignore_generation)
379 {
380         int ret;
381
382         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
383                                      recv_context, ignore_generation);
384         if (ret == 0) {
385                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
386                 if (ret != 0) {
387                         int uret;
388                         uret = ctdb_ltdb_unlock(ctdb_db, key);
389                         if (uret != 0) {
390                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
391                         }
392                 }
393         }
394         return ret;
395 }
396
397
398 /*
399   paraoid check to see if the db is empty
400  */
401 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
402 {
403         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
404         int count = tdb_traverse_read(tdb, NULL, NULL);
405         if (count != 0) {
406                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
407                          ctdb_db->db_path));
408                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
409         }
410 }
411
412 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
413                                 struct ctdb_db_context *ctdb_db)
414 {
415         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
416         char *old;
417         char *reason = NULL;
418         TDB_DATA key;
419         TDB_DATA val;
420
421         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
422         key.dsize = strlen(ctdb_db->db_name);
423
424         old = ctdb_db->unhealthy_reason;
425         ctdb_db->unhealthy_reason = NULL;
426
427         val = tdb_fetch(tdb, key);
428         if (val.dsize > 0) {
429                 reason = talloc_strndup(ctdb_db,
430                                         (const char *)val.dptr,
431                                         val.dsize);
432                 if (reason == NULL) {
433                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
434                                            (int)val.dsize));
435                         ctdb_db->unhealthy_reason = old;
436                         free(val.dptr);
437                         return -1;
438                 }
439         }
440
441         if (val.dptr) {
442                 free(val.dptr);
443         }
444
445         talloc_free(old);
446         ctdb_db->unhealthy_reason = reason;
447         return 0;
448 }
449
450 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
451                                   struct ctdb_db_context *ctdb_db,
452                                   const char *given_reason,/* NULL means healthy */
453                                   int num_healthy_nodes)
454 {
455         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
456         int ret;
457         TDB_DATA key;
458         TDB_DATA val;
459         char *new_reason = NULL;
460         char *old_reason = NULL;
461
462         ret = tdb_transaction_start(tdb);
463         if (ret != 0) {
464                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
465                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
466                 return -1;
467         }
468
469         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
470         if (ret != 0) {
471                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
472                                    ctdb_db->db_name, ret));
473                 return -1;
474         }
475         old_reason = ctdb_db->unhealthy_reason;
476
477         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
478         key.dsize = strlen(ctdb_db->db_name);
479
480         if (given_reason) {
481                 new_reason = talloc_strdup(ctdb_db, given_reason);
482                 if (new_reason == NULL) {
483                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
484                                           given_reason));
485                         return -1;
486                 }
487         } else if (old_reason && num_healthy_nodes == 0) {
488                 /*
489                  * If the reason indicates ok, but there where no healthy nodes
490                  * available, that it means, we have not recovered valid content
491                  * of the db. So if there's an old reason, prefix it with
492                  * "NO-HEALTHY-NODES - "
493                  */
494                 const char *prefix;
495
496 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
497                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
498                 if (ret != 0) {
499                         prefix = _TMP_PREFIX;
500                 } else {
501                         prefix = "";
502                 }
503                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
504                                          prefix, old_reason);
505                 if (new_reason == NULL) {
506                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
507                                           prefix, old_reason));
508                         return -1;
509                 }
510 #undef _TMP_PREFIX
511         }
512
513         if (new_reason) {
514                 val.dptr = discard_const_p(uint8_t, new_reason);
515                 val.dsize = strlen(new_reason);
516
517                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
518                 if (ret != 0) {
519                         tdb_transaction_cancel(tdb);
520                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
521                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
522                                            ret, tdb_errorstr(tdb)));
523                         talloc_free(new_reason);
524                         return -1;
525                 }
526                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
527                                    ctdb_db->db_name, new_reason));
528         } else if (old_reason) {
529                 ret = tdb_delete(tdb, key);
530                 if (ret != 0) {
531                         tdb_transaction_cancel(tdb);
532                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
533                                            tdb_name(tdb), ctdb_db->db_name,
534                                            ret, tdb_errorstr(tdb)));
535                         talloc_free(new_reason);
536                         return -1;
537                 }
538                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
539                                    ctdb_db->db_name));
540         }
541
542         ret = tdb_transaction_commit(tdb);
543         if (ret != TDB_SUCCESS) {
544                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
545                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
546                 talloc_free(new_reason);
547                 return -1;
548         }
549
550         talloc_free(old_reason);
551         ctdb_db->unhealthy_reason = new_reason;
552
553         return 0;
554 }
555
556 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
557                                      struct ctdb_db_context *ctdb_db)
558 {
559         time_t now = time(NULL);
560         char *new_path;
561         char *new_reason;
562         int ret;
563         struct tm *tm;
564
565         tm = gmtime(&now);
566
567         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
568         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
569                                    "%04u%02u%02u%02u%02u%02u.0Z",
570                                    ctdb_db->db_path,
571                                    tm->tm_year+1900, tm->tm_mon+1,
572                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
573                                    tm->tm_sec);
574         if (new_path == NULL) {
575                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
576                 return -1;
577         }
578
579         new_reason = talloc_asprintf(ctdb_db,
580                                      "ERROR - Backup of corrupted TDB in '%s'",
581                                      new_path);
582         if (new_reason == NULL) {
583                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
584                 return -1;
585         }
586         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
587         talloc_free(new_reason);
588         if (ret != 0) {
589                 DEBUG(DEBUG_CRIT,(__location__
590                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
591                                  ctdb_db->db_path));
592                 return -1;
593         }
594
595         ret = rename(ctdb_db->db_path, new_path);
596         if (ret != 0) {
597                 DEBUG(DEBUG_CRIT,(__location__
598                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
599                                   ctdb_db->db_path, new_path,
600                                   errno, strerror(errno)));
601                 talloc_free(new_path);
602                 return -1;
603         }
604
605         DEBUG(DEBUG_CRIT,(__location__
606                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
607                          ctdb_db->db_path, new_path));
608         talloc_free(new_path);
609         return 0;
610 }
611
612 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
613 {
614         struct ctdb_db_context *ctdb_db;
615         int ret;
616         int ok = 0;
617         int fail = 0;
618
619         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
620                 if (!ctdb_db->persistent) {
621                         continue;
622                 }
623
624                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
625                 if (ret != 0) {
626                         DEBUG(DEBUG_ALERT,(__location__
627                                            " load persistent health for '%s' failed\n",
628                                            ctdb_db->db_path));
629                         return -1;
630                 }
631
632                 if (ctdb_db->unhealthy_reason == NULL) {
633                         ok++;
634                         DEBUG(DEBUG_INFO,(__location__
635                                    " persistent db '%s' healthy\n",
636                                    ctdb_db->db_path));
637                         continue;
638                 }
639
640                 fail++;
641                 DEBUG(DEBUG_ALERT,(__location__
642                                    " persistent db '%s' unhealthy: %s\n",
643                                    ctdb_db->db_path,
644                                    ctdb_db->unhealthy_reason));
645         }
646         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
647               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
648                ok, fail));
649
650         if (fail != 0) {
651                 return -1;
652         }
653
654         return 0;
655 }
656
657
658 /*
659   mark a database - as healthy
660  */
661 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
662 {
663         uint32_t db_id = *(uint32_t *)indata.dptr;
664         struct ctdb_db_context *ctdb_db;
665         int ret;
666         bool may_recover = false;
667
668         ctdb_db = find_ctdb_db(ctdb, db_id);
669         if (!ctdb_db) {
670                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
671                 return -1;
672         }
673
674         if (ctdb_db->unhealthy_reason) {
675                 may_recover = true;
676         }
677
678         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
679         if (ret != 0) {
680                 DEBUG(DEBUG_ERR,(__location__
681                                  " ctdb_update_persistent_health(%s) failed\n",
682                                  ctdb_db->db_name));
683                 return -1;
684         }
685
686         if (may_recover && !ctdb->done_startup) {
687                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
688                                   ctdb_db->db_name));
689                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
690         }
691
692         return 0;
693 }
694
695 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
696                                    TDB_DATA indata,
697                                    TDB_DATA *outdata)
698 {
699         uint32_t db_id = *(uint32_t *)indata.dptr;
700         struct ctdb_db_context *ctdb_db;
701         int ret;
702
703         ctdb_db = find_ctdb_db(ctdb, db_id);
704         if (!ctdb_db) {
705                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
706                 return -1;
707         }
708
709         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
710         if (ret != 0) {
711                 DEBUG(DEBUG_ERR,(__location__
712                                  " ctdb_load_persistent_health(%s) failed\n",
713                                  ctdb_db->db_name));
714                 return -1;
715         }
716
717         *outdata = tdb_null;
718         if (ctdb_db->unhealthy_reason) {
719                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
720                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
721         }
722
723         return 0;
724 }
725
726
727 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
728 {
729         char *ropath;
730
731         if (ctdb_db->readonly) {
732                 return 0;
733         }
734
735         if (ctdb_db->persistent) {
736                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
737                 return -1;
738         }
739
740         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
741         if (ropath == NULL) {
742                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
743                 return -1;
744         }
745         ctdb_db->rottdb = tdb_open(ropath, 
746                               ctdb->tunable.database_hash_size, 
747                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
748                               O_CREAT|O_RDWR, 0);
749         if (ctdb_db->rottdb == NULL) {
750                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
751                 talloc_free(ropath);
752                 return -1;
753         }
754
755         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
756
757         ctdb_db->readonly = true;
758         talloc_free(ropath);
759         return 0;
760 }
761
762 /*
763   attach to a database, handling both persistent and non-persistent databases
764   return 0 on success, -1 on failure
765  */
766 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
767                              bool persistent, const char *unhealthy_reason,
768                              bool jenkinshash)
769 {
770         struct ctdb_db_context *ctdb_db, *tmp_db;
771         int ret;
772         struct TDB_DATA key;
773         unsigned tdb_flags;
774         int mode = 0600;
775         int remaining_tries = 0;
776
777         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
778         CTDB_NO_MEMORY(ctdb, ctdb_db);
779
780         ctdb_db->priority = 1;
781         ctdb_db->ctdb = ctdb;
782         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
783         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
784
785         key.dsize = strlen(db_name)+1;
786         key.dptr  = discard_const(db_name);
787         ctdb_db->db_id = ctdb_hash(&key);
788         ctdb_db->persistent = persistent;
789
790         if (!ctdb_db->persistent) {
791                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
792                 if (ctdb_db->delete_queue == NULL) {
793                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
794                 }
795
796                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
797         }
798
799         /* check for hash collisions */
800         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
801                 if (tmp_db->db_id == ctdb_db->db_id) {
802                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
803                                  tmp_db->db_id, db_name, tmp_db->db_name));
804                         talloc_free(ctdb_db);
805                         return -1;
806                 }
807         }
808
809         if (persistent) {
810                 if (unhealthy_reason) {
811                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
812                                                             unhealthy_reason, 0);
813                         if (ret != 0) {
814                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
815                                                    ctdb_db->db_name, unhealthy_reason, ret));
816                                 talloc_free(ctdb_db);
817                                 return -1;
818                         }
819                 }
820
821                 if (ctdb->max_persistent_check_errors > 0) {
822                         remaining_tries = 1;
823                 }
824                 if (ctdb->done_startup) {
825                         remaining_tries = 0;
826                 }
827
828                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
829                 if (ret != 0) {
830                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
831                                    ctdb_db->db_name, ret));
832                         talloc_free(ctdb_db);
833                         return -1;
834                 }
835         }
836
837         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
838                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
839                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
840                 talloc_free(ctdb_db);
841                 return -1;
842         }
843
844         if (ctdb_db->unhealthy_reason) {
845                 /* this is just a warning, but we want that in the log file! */
846                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
847                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
848         }
849
850         /* open the database */
851         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
852                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
853                                            db_name, ctdb->pnn);
854
855         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
856         if (ctdb->valgrinding) {
857                 tdb_flags |= TDB_NOMMAP;
858         }
859         tdb_flags |= TDB_DISALLOW_NESTING;
860         if (jenkinshash) {
861                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
862         }
863
864 again:
865         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
866                                       ctdb->tunable.database_hash_size, 
867                                       tdb_flags, 
868                                       O_CREAT|O_RDWR, mode);
869         if (ctdb_db->ltdb == NULL) {
870                 struct stat st;
871                 int saved_errno = errno;
872
873                 if (!persistent) {
874                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
875                                           ctdb_db->db_path,
876                                           saved_errno,
877                                           strerror(saved_errno)));
878                         talloc_free(ctdb_db);
879                         return -1;
880                 }
881
882                 if (remaining_tries == 0) {
883                         DEBUG(DEBUG_CRIT,(__location__
884                                           "Failed to open persistent tdb '%s': %d - %s\n",
885                                           ctdb_db->db_path,
886                                           saved_errno,
887                                           strerror(saved_errno)));
888                         talloc_free(ctdb_db);
889                         return -1;
890                 }
891
892                 ret = stat(ctdb_db->db_path, &st);
893                 if (ret != 0) {
894                         DEBUG(DEBUG_CRIT,(__location__
895                                           "Failed to open persistent tdb '%s': %d - %s\n",
896                                           ctdb_db->db_path,
897                                           saved_errno,
898                                           strerror(saved_errno)));
899                         talloc_free(ctdb_db);
900                         return -1;
901                 }
902
903                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
904                 if (ret != 0) {
905                         DEBUG(DEBUG_CRIT,(__location__
906                                           "Failed to open persistent tdb '%s': %d - %s\n",
907                                           ctdb_db->db_path,
908                                           saved_errno,
909                                           strerror(saved_errno)));
910                         talloc_free(ctdb_db);
911                         return -1;
912                 }
913
914                 remaining_tries--;
915                 mode = st.st_mode;
916                 goto again;
917         }
918
919         if (!persistent) {
920                 ctdb_check_db_empty(ctdb_db);
921         } else {
922                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
923                 if (ret != 0) {
924                         int fd;
925                         struct stat st;
926
927                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
928                                           ctdb_db->db_path, ret,
929                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
930                         if (remaining_tries == 0) {
931                                 talloc_free(ctdb_db);
932                                 return -1;
933                         }
934
935                         fd = tdb_fd(ctdb_db->ltdb->tdb);
936                         ret = fstat(fd, &st);
937                         if (ret != 0) {
938                                 DEBUG(DEBUG_CRIT,(__location__
939                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
940                                                   ctdb_db->db_path,
941                                                   errno,
942                                                   strerror(errno)));
943                                 talloc_free(ctdb_db);
944                                 return -1;
945                         }
946
947                         /* close the TDB */
948                         talloc_free(ctdb_db->ltdb);
949                         ctdb_db->ltdb = NULL;
950
951                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
952                         if (ret != 0) {
953                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
954                                                   ctdb_db->db_path));
955                                 talloc_free(ctdb_db);
956                                 return -1;
957                         }
958
959                         remaining_tries--;
960                         mode = st.st_mode;
961                         goto again;
962                 }
963         }
964
965         /* set up a rb tree we can use to track which records we have a 
966            fetch-lock in-flight for so we can defer any additional calls
967            for the same record.
968          */
969         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
970         if (ctdb_db->deferred_fetch == NULL) {
971                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
972                 talloc_free(ctdb_db);
973                 return -1;
974         }
975
976         DLIST_ADD(ctdb->db_list, ctdb_db);
977
978         /* setting this can help some high churn databases */
979         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
980
981         /* 
982            all databases support the "null" function. we need this in
983            order to do forced migration of records
984         */
985         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
986         if (ret != 0) {
987                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
988                 talloc_free(ctdb_db);
989                 return -1;
990         }
991
992         /* 
993            all databases support the "fetch" function. we need this
994            for efficient Samba3 ctdb fetch
995         */
996         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
997         if (ret != 0) {
998                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
999                 talloc_free(ctdb_db);
1000                 return -1;
1001         }
1002
1003         /* 
1004            all databases support the "fetch_with_header" function. we need this
1005            for efficient readonly record fetches
1006         */
1007         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
1008         if (ret != 0) {
1009                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
1010                 talloc_free(ctdb_db);
1011                 return -1;
1012         }
1013
1014         ret = ctdb_vacuum_init(ctdb_db);
1015         if (ret != 0) {
1016                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
1017                                   "database '%s'\n", ctdb_db->db_name));
1018                 talloc_free(ctdb_db);
1019                 return -1;
1020         }
1021
1022
1023         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
1024         
1025         /* success */
1026         return 0;
1027 }
1028
1029
/*
  one client DB attach control that has been put on hold; entries are
  kept on the ctdb->deferred_attach list
 */
struct ctdb_deferred_attach_context {
        /* list linkage used by DLIST_ADD()/DLIST_REMOVE() */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context that owns the deferred_attach list */
        struct ctdb_context *ctdb;

        /* the held control request packet */
        struct ctdb_req_control *c;
};
1035
1036
1037 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1038 {
1039         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1040
1041         return 0;
1042 }
1043
1044 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1045 {
1046         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1047         struct ctdb_context *ctdb = da_ctx->ctdb;
1048
1049         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1050         talloc_free(da_ctx);
1051 }
1052
1053 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1054 {
1055         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1056         struct ctdb_context *ctdb = da_ctx->ctdb;
1057
1058         /* This talloc-steals the packet ->c */
1059         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1060         talloc_free(da_ctx);
1061 }
1062
1063 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1064 {
1065         struct ctdb_deferred_attach_context *da_ctx;
1066
1067         /* call it from the main event loop as soon as the current event 
1068            finishes.
1069          */
1070         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1071                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1072                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1073         }
1074
1075         return 0;
1076 }
1077
1078 /*
1079   a client has asked to attach a new database
1080  */
1081 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1082                                TDB_DATA *outdata, uint64_t tdb_flags, 
1083                                bool persistent, uint32_t client_id,
1084                                struct ctdb_req_control *c,
1085                                bool *async_reply)
1086 {
1087         const char *db_name = (const char *)indata.dptr;
1088         struct ctdb_db_context *db;
1089         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1090         struct ctdb_client *client = NULL;
1091
1092         if (ctdb->tunable.allow_client_db_attach == 0) {
1093                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1094                                   "AllowClientDBAccess == 0\n", db_name));
1095                 return -1;
1096         }
1097
1098         /* dont allow any local clients to attach while we are in recovery mode
1099          * except for the recovery daemon.
1100          * allow all attach from the network since these are always from remote
1101          * recovery daemons.
1102          */
1103         if (client_id != 0) {
1104                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1105         }
1106         if (client != NULL) {
1107                 /* If the node is inactive it is not part of the cluster
1108                    and we should not allow clients to attach to any
1109                    databases
1110                 */
1111                 if (node->flags & NODE_FLAGS_INACTIVE) {
1112                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1113                         return -1;
1114                 }
1115
1116                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1117                  && client->pid != ctdb->recoverd_pid
1118                  && !ctdb->done_startup) {
1119                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1120
1121                         if (da_ctx == NULL) {
1122                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1123                                 return -1;
1124                         }
1125
1126                         da_ctx->ctdb = ctdb;
1127                         da_ctx->c = talloc_steal(da_ctx, c);
1128                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1129                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1130
1131                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1132
1133                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1134                         *async_reply = true;
1135                         return 0;
1136                 }
1137         }
1138
1139         /* the client can optionally pass additional tdb flags, but we
1140            only allow a subset of those on the database in ctdb. Note
1141            that tdb_flags is passed in via the (otherwise unused)
1142            srvid to the attach control */
1143         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1144
1145         /* see if we already have this name */
1146         db = ctdb_db_handle(ctdb, db_name);
1147         if (db) {
1148                 outdata->dptr  = (uint8_t *)&db->db_id;
1149                 outdata->dsize = sizeof(db->db_id);
1150                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1151                 return 0;
1152         }
1153
1154         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1155                 return -1;
1156         }
1157
1158         db = ctdb_db_handle(ctdb, db_name);
1159         if (!db) {
1160                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1161                 return -1;
1162         }
1163
1164         /* remember the flags the client has specified */
1165         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1166
1167         outdata->dptr  = (uint8_t *)&db->db_id;
1168         outdata->dsize = sizeof(db->db_id);
1169
1170         /* Try to ensure it's locked in mem */
1171         ctdb_lockdown_memory(ctdb);
1172
1173         /* tell all the other nodes about this database */
1174         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1175                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1176                                                 CTDB_CONTROL_DB_ATTACH,
1177                                  0, CTDB_CTRL_FLAG_NOREPLY,
1178                                  indata, NULL, NULL);
1179
1180         /* success */
1181         return 0;
1182 }
1183
1184
1185 /*
1186   attach to all existing persistent databases
1187  */
1188 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1189                                   const char *unhealthy_reason)
1190 {
1191         DIR *d;
1192         struct dirent *de;
1193
1194         /* open the persistent db directory and scan it for files */
1195         d = opendir(ctdb->db_directory_persistent);
1196         if (d == NULL) {
1197                 return 0;
1198         }
1199
1200         while ((de=readdir(d))) {
1201                 char *p, *s, *q;
1202                 size_t len = strlen(de->d_name);
1203                 uint32_t node;
1204                 int invalid_name = 0;
1205                 
1206                 s = talloc_strdup(ctdb, de->d_name);
1207                 CTDB_NO_MEMORY(ctdb, s);
1208
1209                 /* only accept names ending in .tdb */
1210                 p = strstr(s, ".tdb.");
1211                 if (len < 7 || p == NULL) {
1212                         talloc_free(s);
1213                         continue;
1214                 }
1215
1216                 /* only accept names ending with .tdb. and any number of digits */
1217                 q = p+5;
1218                 while (*q != 0 && invalid_name == 0) {
1219                         if (!isdigit(*q++)) {
1220                                 invalid_name = 1;
1221                         }
1222                 }
1223                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1224                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1225                         talloc_free(s);
1226                         continue;
1227                 }
1228                 p[4] = 0;
1229
1230                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1231                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1232                         closedir(d);
1233                         talloc_free(s);
1234                         return -1;
1235                 }
1236
1237                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1238
1239                 talloc_free(s);
1240         }
1241         closedir(d);
1242         return 0;
1243 }
1244
1245 int ctdb_attach_databases(struct ctdb_context *ctdb)
1246 {
1247         int ret;
1248         char *persistent_health_path = NULL;
1249         char *unhealthy_reason = NULL;
1250         bool first_try = true;
1251
1252         if (ctdb->db_directory == NULL) {
1253                 ctdb->db_directory = VARDIR "/ctdb";
1254         }
1255         if (ctdb->db_directory_persistent == NULL) {
1256                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1257         }
1258         if (ctdb->db_directory_state == NULL) {
1259                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1260         }
1261
1262         /* make sure the db directory exists */
1263         ret = mkdir(ctdb->db_directory, 0700);
1264         if (ret == -1 && errno != EEXIST) {
1265                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1266                          ctdb->db_directory));
1267                 return -1;
1268         }
1269
1270         /* make sure the persistent db directory exists */
1271         ret = mkdir(ctdb->db_directory_persistent, 0700);
1272         if (ret == -1 && errno != EEXIST) {
1273                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1274                          ctdb->db_directory_persistent));
1275                 return -1;
1276         }
1277
1278         /* make sure the internal state db directory exists */
1279         ret = mkdir(ctdb->db_directory_state, 0700);
1280         if (ret == -1 && errno != EEXIST) {
1281                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1282                          ctdb->db_directory_state));
1283                 return -1;
1284         }
1285
1286         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1287                                                  ctdb->db_directory_state,
1288                                                  PERSISTENT_HEALTH_TDB,
1289                                                  ctdb->pnn);
1290         if (persistent_health_path == NULL) {
1291                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1292                 return -1;
1293         }
1294
1295 again:
1296
1297         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1298                                                    0, TDB_DISALLOW_NESTING,
1299                                                    O_CREAT | O_RDWR, 0600);
1300         if (ctdb->db_persistent_health == NULL) {
1301                 struct tdb_wrap *tdb;
1302
1303                 if (!first_try) {
1304                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1305                                           persistent_health_path,
1306                                           errno,
1307                                           strerror(errno)));
1308                         talloc_free(persistent_health_path);
1309                         talloc_free(unhealthy_reason);
1310                         return -1;
1311                 }
1312                 first_try = false;
1313
1314                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1315                                                    persistent_health_path,
1316                                                    "was cleared after a failure",
1317                                                    "manual verification needed");
1318                 if (unhealthy_reason == NULL) {
1319                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1320                         talloc_free(persistent_health_path);
1321                         return -1;
1322                 }
1323
1324                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1325                                   persistent_health_path));
1326                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1327                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1328                                     O_CREAT | O_RDWR, 0600);
1329                 if (tdb) {
1330                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1331                                           persistent_health_path,
1332                                           errno,
1333                                           strerror(errno)));
1334                         talloc_free(persistent_health_path);
1335                         talloc_free(unhealthy_reason);
1336                         return -1;
1337                 }
1338
1339                 talloc_free(tdb);
1340                 goto again;
1341         }
1342         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1343         if (ret != 0) {
1344                 struct tdb_wrap *tdb;
1345
1346                 talloc_free(ctdb->db_persistent_health);
1347                 ctdb->db_persistent_health = NULL;
1348
1349                 if (!first_try) {
1350                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1351                                           persistent_health_path));
1352                         talloc_free(persistent_health_path);
1353                         talloc_free(unhealthy_reason);
1354                         return -1;
1355                 }
1356                 first_try = false;
1357
1358                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1359                                                    persistent_health_path,
1360                                                    "was cleared after a failure",
1361                                                    "manual verification needed");
1362                 if (unhealthy_reason == NULL) {
1363                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1364                         talloc_free(persistent_health_path);
1365                         return -1;
1366                 }
1367
1368                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1369                                   persistent_health_path));
1370                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1371                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1372                                     O_CREAT | O_RDWR, 0600);
1373                 if (tdb) {
1374                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1375                                           persistent_health_path,
1376                                           errno,
1377                                           strerror(errno)));
1378                         talloc_free(persistent_health_path);
1379                         talloc_free(unhealthy_reason);
1380                         return -1;
1381                 }
1382
1383                 talloc_free(tdb);
1384                 goto again;
1385         }
1386         talloc_free(persistent_health_path);
1387
1388         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1389         talloc_free(unhealthy_reason);
1390         if (ret != 0) {
1391                 return ret;
1392         }
1393
1394         return 0;
1395 }
1396
1397 /*
1398   called when a broadcast seqnum update comes in
1399  */
1400 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1401 {
1402         struct ctdb_db_context *ctdb_db;
1403         if (srcnode == ctdb->pnn) {
1404                 /* don't update ourselves! */
1405                 return 0;
1406         }
1407
1408         ctdb_db = find_ctdb_db(ctdb, db_id);
1409         if (!ctdb_db) {
1410                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1411                 return -1;
1412         }
1413
1414         if (ctdb_db->unhealthy_reason) {
1415                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1416                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1417                 return -1;
1418         }
1419
1420         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1421         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1422         return 0;
1423 }
1424
1425 /*
1426   timer to check for seqnum changes in a ltdb and propogate them
1427  */
1428 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1429                                    struct timeval t, void *p)
1430 {
1431         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1432         struct ctdb_context *ctdb = ctdb_db->ctdb;
1433         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1434         if (new_seqnum != ctdb_db->seqnum) {
1435                 /* something has changed - propogate it */
1436                 TDB_DATA data;
1437                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1438                 data.dsize = sizeof(uint32_t);
1439                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1440                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1441                                          data, NULL, NULL);             
1442         }
1443         ctdb_db->seqnum = new_seqnum;
1444
1445         /* setup a new timer */
1446         ctdb_db->seqnum_update =
1447                 event_add_timed(ctdb->ev, ctdb_db, 
1448                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1449                                 ctdb_ltdb_seqnum_check, ctdb_db);
1450 }
1451
1452 /*
1453   enable seqnum handling on this db
1454  */
1455 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1456 {
1457         struct ctdb_db_context *ctdb_db;
1458         ctdb_db = find_ctdb_db(ctdb, db_id);
1459         if (!ctdb_db) {
1460                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1461                 return -1;
1462         }
1463
1464         if (ctdb_db->seqnum_update == NULL) {
1465                 ctdb_db->seqnum_update =
1466                         event_add_timed(ctdb->ev, ctdb_db, 
1467                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1468                                         ctdb_ltdb_seqnum_check, ctdb_db);
1469         }
1470
1471         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1472         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1473         return 0;
1474 }
1475
1476 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1477 {
1478         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1479         struct ctdb_db_context *ctdb_db;
1480
1481         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1482         if (!ctdb_db) {
1483                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1484                 return 0;
1485         }
1486
1487         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1488                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1489                 return 0;
1490         }
1491
1492         ctdb_db->priority = db_prio->priority;
1493         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1494
1495         return 0;
1496 }
1497