Add a tunable "AllowClientDBAttach" with default value 1.
[sahlberg/ctdb.git] / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
/*
  the dummy "null" call procedure that all databases support:
  it ignores the call entirely and reports success
*/
static int ctdb_null_func(struct ctdb_call_info *call)
{
	(void)call;	/* deliberately unused */

	return 0;
}
42
43 /*
44   this is a plain fetch procedure that all databases support
45 */
46 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 {
48         call->reply_data = &call->record_data;
49         return 0;
50 }
51
52
53 /**
54  * write a record to a normal database
55  *
56  * This is the server-variant of the ctdb_ltdb_store function.
57  * It contains logic to determine whether a record should be
58  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
59  * controls to the local ctdb daemon if apporpriate.
60  */
61 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
62                                   TDB_DATA key,
63                                   struct ctdb_ltdb_header *header,
64                                   TDB_DATA data)
65 {
66         struct ctdb_context *ctdb = ctdb_db->ctdb;
67         TDB_DATA rec;
68         int ret;
69         bool seqnum_suppressed = false;
70         bool keep = false;
71         bool schedule_for_deletion = false;
72         uint32_t lmaster;
73
74         if (ctdb->flags & CTDB_FLAG_TORTURE) {
75                 struct ctdb_ltdb_header *h2;
76                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
77                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
78                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
79                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
80                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
81                 }
82                 if (rec.dptr) free(rec.dptr);
83         }
84
85         if (ctdb->vnn_map == NULL) {
86                 /*
87                  * Called from a client: always store the record
88                  * Also don't call ctdb_lmaster since it uses the vnn_map!
89                  */
90                 keep = true;
91                 goto store;
92         }
93
94         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
95
96         /*
97          * If we migrate an empty record off to another node
98          * and the record has not been migrated with data,
99          * delete the record instead of storing the empty record.
100          */
101         if (data.dsize != 0) {
102                 keep = true;
103         } else if (ctdb_db->persistent) {
104                 keep = true;
105         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
106                 /*
107                  * The record is not created by the client but
108                  * automatically by the ctdb_ltdb_fetch logic that
109                  * creates a record with an initial header in the
110                  * ltdb before trying to migrate the record from
111                  * the current lmaster. Keep it instead of trying
112                  * to delete the non-existing record...
113                  */
114                 keep = true;
115                 schedule_for_deletion = true;
116         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
117                 keep = true;
118         } else if (ctdb_db->ctdb->pnn == lmaster) {
119                 /*
120                  * If we are lmaster, then we usually keep the record.
121                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
122                  * and the record is empty and has never been migrated
123                  * with data, then we should delete it instead of storing it.
124                  * This is part of the vacuuming process.
125                  *
126                  * The reason that we usually need to store even empty records
127                  * on the lmaster is that a client operating directly on the
128                  * lmaster (== dmaster) expects the local copy of the record to
129                  * exist after successful ctdb migrate call. If the record does
130                  * not exist, the client goes into a migrate loop and eventually
131                  * fails. So storing the empty record makes sure that we do not
132                  * need to change the client code.
133                  */
134                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
135                         keep = true;
136                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
137                         keep = true;
138                 }
139         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
140                 keep = true;
141         }
142
143         if (keep &&
144             (data.dsize == 0) &&
145             !ctdb_db->persistent &&
146             (ctdb_db->ctdb->pnn == header->dmaster))
147         {
148                 schedule_for_deletion = true;
149         }
150
151 store:
152         /*
153          * The VACUUM_MIGRATED flag is only set temporarily for
154          * the above logic when the record was retrieved by a
155          * VACUUM_MIGRATE call and should not be stored in the
156          * database.
157          *
158          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
159          * and there are two cases in which the corresponding record
160          * is stored in the local database:
161          * 1. The record has been migrated with data in the past
162          *    (the MIGRATED_WITH_DATA record flag is set).
163          * 2. The record has been filled with data again since it
164          *    had been submitted in the VACUUM_FETCH message to the
165          *    lmaster.
166          * For such records it is important to not store the
167          * VACUUM_MIGRATED flag in the database.
168          */
169         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
170
171         /*
172          * Similarly, clear the AUTOMATIC flag which should not enter
173          * the local database copy since this would require client
174          * modifications to clear the flag when the client stores
175          * the record.
176          */
177         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
178
179         rec.dsize = sizeof(*header) + data.dsize;
180         rec.dptr = talloc_size(ctdb, rec.dsize);
181         CTDB_NO_MEMORY(ctdb, rec.dptr);
182
183         memcpy(rec.dptr, header, sizeof(*header));
184         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
185
186         /* Databases with seqnum updates enabled only get their seqnum
187            changes when/if we modify the data */
188         if (ctdb_db->seqnum_update != NULL) {
189                 TDB_DATA old;
190                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
191
192                 if ( (old.dsize == rec.dsize)
193                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
194                           rec.dptr+sizeof(struct ctdb_ltdb_header),
195                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
196                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
197                         seqnum_suppressed = true;
198                 }
199                 if (old.dptr) free(old.dptr);
200         }
201
202         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
203                             ctdb_db->db_name,
204                             keep?"storing":"deleting",
205                             ctdb_hash(&key)));
206
207         if (keep) {
208                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
209         } else {
210                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
211         }
212
213         if (ret != 0) {
214                 int lvl = DEBUG_ERR;
215
216                 if (keep == false &&
217                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
218                 {
219                         lvl = DEBUG_DEBUG;
220                 }
221
222                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
223                             "%d - %s\n",
224                             ctdb_db->db_name,
225                             keep?"store":"delete", ret,
226                             tdb_errorstr(ctdb_db->ltdb->tdb)));
227
228                 schedule_for_deletion = false;
229         }
230         if (seqnum_suppressed) {
231                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
232         }
233
234         talloc_free(rec.dptr);
235
236         if (schedule_for_deletion) {
237                 int ret2;
238                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
239                 if (ret != 0) {
240                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
241                 }
242         }
243
244         return ret;
245 }
246
/*
  state kept for a request that is waiting for a chainlock and will be
  re-submitted once the lock is available
 */
struct lock_fetch_state {
	/* daemon context; used to read the current vnn_map generation */
	struct ctdb_context *ctdb;
	/* function the packet is handed back to on retry */
	void (*recv_pkt)(void *, struct ctdb_req_header *);
	/* first argument passed to recv_pkt */
	void *recv_context;
	/* the deferred request packet */
	struct ctdb_req_header *hdr;
	/* vnn_map generation recorded when the request was queued */
	uint32_t generation;
	/* if true, retry even when the generation has changed since */
	bool ignore_generation;
};
255
256 /*
257   called when we should retry the operation
258  */
259 static void lock_fetch_callback(void *p)
260 {
261         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
262         if (!state->ignore_generation &&
263             state->generation != state->ctdb->vnn_map->generation) {
264                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
265                 talloc_free(state->hdr);
266                 return;
267         }
268         state->recv_pkt(state->recv_context, state->hdr);
269         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
270 }
271
272
273 /*
274   do a non-blocking ltdb_lock, deferring this ctdb request until we
275   have the chainlock
276
277   It does the following:
278
279    1) tries to get the chainlock. If it succeeds, then it returns 0
280
281    2) if it fails to get a chainlock immediately then it sets up a
282    non-blocking chainlock via ctdb_lockwait, and when it gets the
283    chainlock it re-submits this ctdb request to the main packet
284    receive function
285
286    This effectively queues all ctdb requests that cannot be
287    immediately satisfied until it can get the lock. This means that
288    the main ctdb daemon will not block waiting for a chainlock held by
289    a client
290
291    There are 3 possible return values:
292
293        0:    means that it got the lock immediately.
294       -1:    means that it failed to get the lock, and won't retry
295       -2:    means that it failed to get the lock immediately, but will retry
296  */
297 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
298                            TDB_DATA key, struct ctdb_req_header *hdr,
299                            void (*recv_pkt)(void *, struct ctdb_req_header *),
300                            void *recv_context, bool ignore_generation)
301 {
302         int ret;
303         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
304         struct lockwait_handle *h;
305         struct lock_fetch_state *state;
306         
307         ret = tdb_chainlock_nonblock(tdb, key);
308
309         if (ret != 0 &&
310             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
311                 /* a hard failure - don't try again */
312                 return -1;
313         }
314
315         /* when torturing, ensure we test the contended path */
316         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
317             random() % 5 == 0) {
318                 ret = -1;
319                 tdb_chainunlock(tdb, key);
320         }
321
322         /* first the non-contended path */
323         if (ret == 0) {
324                 return 0;
325         }
326
327         state = talloc(hdr, struct lock_fetch_state);
328         state->ctdb = ctdb_db->ctdb;
329         state->hdr = hdr;
330         state->recv_pkt = recv_pkt;
331         state->recv_context = recv_context;
332         state->generation = ctdb_db->ctdb->vnn_map->generation;
333         state->ignore_generation = ignore_generation;
334
335         /* now the contended path */
336         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
337         if (h == NULL) {
338                 return -1;
339         }
340
341         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
342            so it won't be freed yet */
343         talloc_steal(state, hdr);
344         talloc_steal(state, h);
345
346         /* now tell the caller than we will retry asynchronously */
347         return -2;
348 }
349
350 /*
351   a varient of ctdb_ltdb_lock_requeue that also fetches the record
352  */
353 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
354                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
355                                  struct ctdb_req_header *hdr, TDB_DATA *data,
356                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
357                                  void *recv_context, bool ignore_generation)
358 {
359         int ret;
360
361         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
362                                      recv_context, ignore_generation);
363         if (ret == 0) {
364                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
365                 if (ret != 0) {
366                         int uret;
367                         uret = ctdb_ltdb_unlock(ctdb_db, key);
368                         if (uret != 0) {
369                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
370                         }
371                 }
372         }
373         return ret;
374 }
375
376
377 /*
378   paraoid check to see if the db is empty
379  */
380 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
381 {
382         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
383         int count = tdb_traverse_read(tdb, NULL, NULL);
384         if (count != 0) {
385                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
386                          ctdb_db->db_path));
387                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
388         }
389 }
390
391 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
392                                 struct ctdb_db_context *ctdb_db)
393 {
394         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
395         char *old;
396         char *reason = NULL;
397         TDB_DATA key;
398         TDB_DATA val;
399
400         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
401         key.dsize = strlen(ctdb_db->db_name);
402
403         old = ctdb_db->unhealthy_reason;
404         ctdb_db->unhealthy_reason = NULL;
405
406         val = tdb_fetch(tdb, key);
407         if (val.dsize > 0) {
408                 reason = talloc_strndup(ctdb_db,
409                                         (const char *)val.dptr,
410                                         val.dsize);
411                 if (reason == NULL) {
412                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
413                                            (int)val.dsize));
414                         ctdb_db->unhealthy_reason = old;
415                         free(val.dptr);
416                         return -1;
417                 }
418         }
419
420         if (val.dptr) {
421                 free(val.dptr);
422         }
423
424         talloc_free(old);
425         ctdb_db->unhealthy_reason = reason;
426         return 0;
427 }
428
429 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
430                                   struct ctdb_db_context *ctdb_db,
431                                   const char *given_reason,/* NULL means healthy */
432                                   int num_healthy_nodes)
433 {
434         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
435         int ret;
436         TDB_DATA key;
437         TDB_DATA val;
438         char *new_reason = NULL;
439         char *old_reason = NULL;
440
441         ret = tdb_transaction_start(tdb);
442         if (ret != 0) {
443                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
444                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
445                 return -1;
446         }
447
448         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
449         if (ret != 0) {
450                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
451                                    ctdb_db->db_name, ret));
452                 return -1;
453         }
454         old_reason = ctdb_db->unhealthy_reason;
455
456         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
457         key.dsize = strlen(ctdb_db->db_name);
458
459         if (given_reason) {
460                 new_reason = talloc_strdup(ctdb_db, given_reason);
461                 if (new_reason == NULL) {
462                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
463                                           given_reason));
464                         return -1;
465                 }
466         } else if (old_reason && num_healthy_nodes == 0) {
467                 /*
468                  * If the reason indicates ok, but there where no healthy nodes
469                  * available, that it means, we have not recovered valid content
470                  * of the db. So if there's an old reason, prefix it with
471                  * "NO-HEALTHY-NODES - "
472                  */
473                 const char *prefix;
474
475 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
476                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
477                 if (ret != 0) {
478                         prefix = _TMP_PREFIX;
479                 } else {
480                         prefix = "";
481                 }
482                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
483                                          prefix, old_reason);
484                 if (new_reason == NULL) {
485                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
486                                           prefix, old_reason));
487                         return -1;
488                 }
489 #undef _TMP_PREFIX
490         }
491
492         if (new_reason) {
493                 val.dptr = discard_const_p(uint8_t, new_reason);
494                 val.dsize = strlen(new_reason);
495
496                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
497                 if (ret != 0) {
498                         tdb_transaction_cancel(tdb);
499                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
500                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
501                                            ret, tdb_errorstr(tdb)));
502                         talloc_free(new_reason);
503                         return -1;
504                 }
505                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
506                                    ctdb_db->db_name, new_reason));
507         } else if (old_reason) {
508                 ret = tdb_delete(tdb, key);
509                 if (ret != 0) {
510                         tdb_transaction_cancel(tdb);
511                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
512                                            tdb_name(tdb), ctdb_db->db_name,
513                                            ret, tdb_errorstr(tdb)));
514                         talloc_free(new_reason);
515                         return -1;
516                 }
517                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
518                                    ctdb_db->db_name));
519         }
520
521         ret = tdb_transaction_commit(tdb);
522         if (ret != TDB_SUCCESS) {
523                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
524                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
525                 talloc_free(new_reason);
526                 return -1;
527         }
528
529         talloc_free(old_reason);
530         ctdb_db->unhealthy_reason = new_reason;
531
532         return 0;
533 }
534
535 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
536                                      struct ctdb_db_context *ctdb_db)
537 {
538         time_t now = time(NULL);
539         char *new_path;
540         char *new_reason;
541         int ret;
542         struct tm *tm;
543
544         tm = gmtime(&now);
545
546         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
547         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
548                                    "%04u%02u%02u%02u%02u%02u.0Z",
549                                    ctdb_db->db_path,
550                                    tm->tm_year+1900, tm->tm_mon+1,
551                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
552                                    tm->tm_sec);
553         if (new_path == NULL) {
554                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
555                 return -1;
556         }
557
558         new_reason = talloc_asprintf(ctdb_db,
559                                      "ERROR - Backup of corrupted TDB in '%s'",
560                                      new_path);
561         if (new_reason == NULL) {
562                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
563                 return -1;
564         }
565         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
566         talloc_free(new_reason);
567         if (ret != 0) {
568                 DEBUG(DEBUG_CRIT,(__location__
569                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
570                                  ctdb_db->db_path));
571                 return -1;
572         }
573
574         ret = rename(ctdb_db->db_path, new_path);
575         if (ret != 0) {
576                 DEBUG(DEBUG_CRIT,(__location__
577                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
578                                   ctdb_db->db_path, new_path,
579                                   errno, strerror(errno)));
580                 talloc_free(new_path);
581                 return -1;
582         }
583
584         DEBUG(DEBUG_CRIT,(__location__
585                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
586                          ctdb_db->db_path, new_path));
587         talloc_free(new_path);
588         return 0;
589 }
590
591 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
592 {
593         struct ctdb_db_context *ctdb_db;
594         int ret;
595         int ok = 0;
596         int fail = 0;
597
598         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
599                 if (!ctdb_db->persistent) {
600                         continue;
601                 }
602
603                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
604                 if (ret != 0) {
605                         DEBUG(DEBUG_ALERT,(__location__
606                                            " load persistent health for '%s' failed\n",
607                                            ctdb_db->db_path));
608                         return -1;
609                 }
610
611                 if (ctdb_db->unhealthy_reason == NULL) {
612                         ok++;
613                         DEBUG(DEBUG_INFO,(__location__
614                                    " persistent db '%s' healthy\n",
615                                    ctdb_db->db_path));
616                         continue;
617                 }
618
619                 fail++;
620                 DEBUG(DEBUG_ALERT,(__location__
621                                    " persistent db '%s' unhealthy: %s\n",
622                                    ctdb_db->db_path,
623                                    ctdb_db->unhealthy_reason));
624         }
625         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
626               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
627                ok, fail));
628
629         if (fail != 0) {
630                 return -1;
631         }
632
633         return 0;
634 }
635
636
637 /*
638   mark a database - as healthy
639  */
640 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
641 {
642         uint32_t db_id = *(uint32_t *)indata.dptr;
643         struct ctdb_db_context *ctdb_db;
644         int ret;
645         bool may_recover = false;
646
647         ctdb_db = find_ctdb_db(ctdb, db_id);
648         if (!ctdb_db) {
649                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
650                 return -1;
651         }
652
653         if (ctdb_db->unhealthy_reason) {
654                 may_recover = true;
655         }
656
657         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
658         if (ret != 0) {
659                 DEBUG(DEBUG_ERR,(__location__
660                                  " ctdb_update_persistent_health(%s) failed\n",
661                                  ctdb_db->db_name));
662                 return -1;
663         }
664
665         if (may_recover && !ctdb->done_startup) {
666                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
667                                   ctdb_db->db_name));
668                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
669         }
670
671         return 0;
672 }
673
674 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
675                                    TDB_DATA indata,
676                                    TDB_DATA *outdata)
677 {
678         uint32_t db_id = *(uint32_t *)indata.dptr;
679         struct ctdb_db_context *ctdb_db;
680         int ret;
681
682         ctdb_db = find_ctdb_db(ctdb, db_id);
683         if (!ctdb_db) {
684                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
685                 return -1;
686         }
687
688         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
689         if (ret != 0) {
690                 DEBUG(DEBUG_ERR,(__location__
691                                  " ctdb_load_persistent_health(%s) failed\n",
692                                  ctdb_db->db_name));
693                 return -1;
694         }
695
696         *outdata = tdb_null;
697         if (ctdb_db->unhealthy_reason) {
698                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
699                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
700         }
701
702         return 0;
703 }
704
705 /*
706   attach to a database, handling both persistent and non-persistent databases
707   return 0 on success, -1 on failure
708  */
709 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
710                              bool persistent, const char *unhealthy_reason,
711                              bool jenkinshash)
712 {
713         struct ctdb_db_context *ctdb_db, *tmp_db;
714         int ret;
715         struct TDB_DATA key;
716         unsigned tdb_flags;
717         int mode = 0600;
718         int remaining_tries = 0;
719
720         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
721         CTDB_NO_MEMORY(ctdb, ctdb_db);
722
723         ctdb_db->priority = 1;
724         ctdb_db->ctdb = ctdb;
725         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
726         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
727
728         key.dsize = strlen(db_name)+1;
729         key.dptr  = discard_const(db_name);
730         ctdb_db->db_id = ctdb_hash(&key);
731         ctdb_db->persistent = persistent;
732
733         if (!ctdb_db->persistent) {
734                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
735                 if (ctdb_db->delete_queue == NULL) {
736                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
737                 }
738
739                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
740         }
741
742         /* check for hash collisions */
743         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
744                 if (tmp_db->db_id == ctdb_db->db_id) {
745                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
746                                  tmp_db->db_id, db_name, tmp_db->db_name));
747                         talloc_free(ctdb_db);
748                         return -1;
749                 }
750         }
751
752         if (persistent) {
753                 if (unhealthy_reason) {
754                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
755                                                             unhealthy_reason, 0);
756                         if (ret != 0) {
757                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
758                                                    ctdb_db->db_name, unhealthy_reason, ret));
759                                 talloc_free(ctdb_db);
760                                 return -1;
761                         }
762                 }
763
764                 if (ctdb->max_persistent_check_errors > 0) {
765                         remaining_tries = 1;
766                 }
767                 if (ctdb->done_startup) {
768                         remaining_tries = 0;
769                 }
770
771                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
772                 if (ret != 0) {
773                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
774                                    ctdb_db->db_name, ret));
775                         talloc_free(ctdb_db);
776                         return -1;
777                 }
778         }
779
780         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
781                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
782                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
783                 talloc_free(ctdb_db);
784                 return -1;
785         }
786
787         if (ctdb_db->unhealthy_reason) {
788                 /* this is just a warning, but we want that in the log file! */
789                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
790                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
791         }
792
793         /* open the database */
794         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
795                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
796                                            db_name, ctdb->pnn);
797
798         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
799         if (ctdb->valgrinding) {
800                 tdb_flags |= TDB_NOMMAP;
801         }
802         tdb_flags |= TDB_DISALLOW_NESTING;
803         if (jenkinshash) {
804                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
805         }
806
807 again:
808         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
809                                       ctdb->tunable.database_hash_size, 
810                                       tdb_flags, 
811                                       O_CREAT|O_RDWR, mode);
812         if (ctdb_db->ltdb == NULL) {
813                 struct stat st;
814                 int saved_errno = errno;
815
816                 if (!persistent) {
817                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
818                                           ctdb_db->db_path,
819                                           saved_errno,
820                                           strerror(saved_errno)));
821                         talloc_free(ctdb_db);
822                         return -1;
823                 }
824
825                 if (remaining_tries == 0) {
826                         DEBUG(DEBUG_CRIT,(__location__
827                                           "Failed to open persistent tdb '%s': %d - %s\n",
828                                           ctdb_db->db_path,
829                                           saved_errno,
830                                           strerror(saved_errno)));
831                         talloc_free(ctdb_db);
832                         return -1;
833                 }
834
835                 ret = stat(ctdb_db->db_path, &st);
836                 if (ret != 0) {
837                         DEBUG(DEBUG_CRIT,(__location__
838                                           "Failed to open persistent tdb '%s': %d - %s\n",
839                                           ctdb_db->db_path,
840                                           saved_errno,
841                                           strerror(saved_errno)));
842                         talloc_free(ctdb_db);
843                         return -1;
844                 }
845
846                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
847                 if (ret != 0) {
848                         DEBUG(DEBUG_CRIT,(__location__
849                                           "Failed to open persistent tdb '%s': %d - %s\n",
850                                           ctdb_db->db_path,
851                                           saved_errno,
852                                           strerror(saved_errno)));
853                         talloc_free(ctdb_db);
854                         return -1;
855                 }
856
857                 remaining_tries--;
858                 mode = st.st_mode;
859                 goto again;
860         }
861
862         if (!persistent) {
863                 ctdb_check_db_empty(ctdb_db);
864         } else {
865                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
866                 if (ret != 0) {
867                         int fd;
868                         struct stat st;
869
870                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
871                                           ctdb_db->db_path, ret,
872                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
873                         if (remaining_tries == 0) {
874                                 talloc_free(ctdb_db);
875                                 return -1;
876                         }
877
878                         fd = tdb_fd(ctdb_db->ltdb->tdb);
879                         ret = fstat(fd, &st);
880                         if (ret != 0) {
881                                 DEBUG(DEBUG_CRIT,(__location__
882                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
883                                                   ctdb_db->db_path,
884                                                   errno,
885                                                   strerror(errno)));
886                                 talloc_free(ctdb_db);
887                                 return -1;
888                         }
889
890                         /* close the TDB */
891                         talloc_free(ctdb_db->ltdb);
892                         ctdb_db->ltdb = NULL;
893
894                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
895                         if (ret != 0) {
896                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
897                                                   ctdb_db->db_path));
898                                 talloc_free(ctdb_db);
899                                 return -1;
900                         }
901
902                         remaining_tries--;
903                         mode = st.st_mode;
904                         goto again;
905                 }
906         }
907
908         DLIST_ADD(ctdb->db_list, ctdb_db);
909
910         /* setting this can help some high churn databases */
911         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
912
913         /* 
914            all databases support the "null" function. we need this in
915            order to do forced migration of records
916         */
917         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
918         if (ret != 0) {
919                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
920                 talloc_free(ctdb_db);
921                 return -1;
922         }
923
924         /* 
925            all databases support the "fetch" function. we need this
926            for efficient Samba3 ctdb fetch
927         */
928         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
929         if (ret != 0) {
930                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
931                 talloc_free(ctdb_db);
932                 return -1;
933         }
934
935         ret = ctdb_vacuum_init(ctdb_db);
936         if (ret != 0) {
937                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
938                                   "database '%s'\n", ctdb_db->db_name));
939                 talloc_free(ctdb_db);
940                 return -1;
941         }
942
943
944         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
945         
946         /* success */
947         return 0;
948 }
949
950
/*
  state kept for a database attach request that has been postponed.
  Contexts are chained together on ctdb->deferred_attach via next/prev.
*/
struct ctdb_deferred_attach_context {
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	struct ctdb_context *ctdb;	/* daemon context the request belongs to */
	struct ctdb_req_control *c;	/* the postponed attach control packet */
};
956
957
958 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
959 {
960         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
961
962         return 0;
963 }
964
965 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
966 {
967         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
968         struct ctdb_context *ctdb = da_ctx->ctdb;
969
970         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
971         talloc_free(da_ctx);
972 }
973
974 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
975 {
976         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
977         struct ctdb_context *ctdb = da_ctx->ctdb;
978
979         /* This talloc-steals the packet ->c */
980         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
981         talloc_free(da_ctx);
982 }
983
984 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
985 {
986         struct ctdb_deferred_attach_context *da_ctx;
987
988         /* call it from the main event loop as soon as the current event 
989            finishes.
990          */
991         while ((da_ctx = ctdb->deferred_attach) != NULL) {
992                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
993                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
994         }
995
996         return 0;
997 }
998
999 /*
1000   a client has asked to attach a new database
1001  */
1002 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1003                                TDB_DATA *outdata, uint64_t tdb_flags, 
1004                                bool persistent, uint32_t client_id,
1005                                struct ctdb_req_control *c,
1006                                bool *async_reply)
1007 {
1008         const char *db_name = (const char *)indata.dptr;
1009         struct ctdb_db_context *db;
1010         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1011         struct ctdb_client *client = NULL;
1012
1013         if (ctdb->tunable.allow_client_db_attach == 0) {
1014                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1015                                   "AllowClientDBAccess == 0\n", db_name));
1016                 return -1;
1017         }
1018
1019         /* dont allow any local clients to attach while we are in recovery mode
1020          * except for the recovery daemon.
1021          * allow all attach from the network since these are always from remote
1022          * recovery daemons.
1023          */
1024         if (client_id != 0) {
1025                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1026         }
1027         if (client != NULL) {
1028                 /* If the node is inactive it is not part of the cluster
1029                    and we should not allow clients to attach to any
1030                    databases
1031                 */
1032                 if (node->flags & NODE_FLAGS_INACTIVE) {
1033                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1034                         return -1;
1035                 }
1036
1037                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1038                  && client->pid != ctdb->recoverd_pid
1039                  && !ctdb->done_startup) {
1040                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1041
1042                         if (da_ctx == NULL) {
1043                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1044                                 return -1;
1045                         }
1046
1047                         da_ctx->ctdb = ctdb;
1048                         da_ctx->c = talloc_steal(da_ctx, c);
1049                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1050                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1051
1052                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1053
1054                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1055                         *async_reply = true;
1056                         return 0;
1057                 }
1058         }
1059
1060         /* the client can optionally pass additional tdb flags, but we
1061            only allow a subset of those on the database in ctdb. Note
1062            that tdb_flags is passed in via the (otherwise unused)
1063            srvid to the attach control */
1064         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1065
1066         /* see if we already have this name */
1067         db = ctdb_db_handle(ctdb, db_name);
1068         if (db) {
1069                 outdata->dptr  = (uint8_t *)&db->db_id;
1070                 outdata->dsize = sizeof(db->db_id);
1071                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1072                 return 0;
1073         }
1074
1075         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1076                 return -1;
1077         }
1078
1079         db = ctdb_db_handle(ctdb, db_name);
1080         if (!db) {
1081                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1082                 return -1;
1083         }
1084
1085         /* remember the flags the client has specified */
1086         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1087
1088         outdata->dptr  = (uint8_t *)&db->db_id;
1089         outdata->dsize = sizeof(db->db_id);
1090
1091         /* Try to ensure it's locked in mem */
1092         ctdb_lockdown_memory(ctdb);
1093
1094         /* tell all the other nodes about this database */
1095         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1096                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1097                                                 CTDB_CONTROL_DB_ATTACH,
1098                                  0, CTDB_CTRL_FLAG_NOREPLY,
1099                                  indata, NULL, NULL);
1100
1101         /* success */
1102         return 0;
1103 }
1104
1105
1106 /*
1107   attach to all existing persistent databases
1108  */
1109 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1110                                   const char *unhealthy_reason)
1111 {
1112         DIR *d;
1113         struct dirent *de;
1114
1115         /* open the persistent db directory and scan it for files */
1116         d = opendir(ctdb->db_directory_persistent);
1117         if (d == NULL) {
1118                 return 0;
1119         }
1120
1121         while ((de=readdir(d))) {
1122                 char *p, *s, *q;
1123                 size_t len = strlen(de->d_name);
1124                 uint32_t node;
1125                 int invalid_name = 0;
1126                 
1127                 s = talloc_strdup(ctdb, de->d_name);
1128                 CTDB_NO_MEMORY(ctdb, s);
1129
1130                 /* only accept names ending in .tdb */
1131                 p = strstr(s, ".tdb.");
1132                 if (len < 7 || p == NULL) {
1133                         talloc_free(s);
1134                         continue;
1135                 }
1136
1137                 /* only accept names ending with .tdb. and any number of digits */
1138                 q = p+5;
1139                 while (*q != 0 && invalid_name == 0) {
1140                         if (!isdigit(*q++)) {
1141                                 invalid_name = 1;
1142                         }
1143                 }
1144                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1145                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1146                         talloc_free(s);
1147                         continue;
1148                 }
1149                 p[4] = 0;
1150
1151                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1152                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1153                         closedir(d);
1154                         talloc_free(s);
1155                         return -1;
1156                 }
1157
1158                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1159
1160                 talloc_free(s);
1161         }
1162         closedir(d);
1163         return 0;
1164 }
1165
1166 int ctdb_attach_databases(struct ctdb_context *ctdb)
1167 {
1168         int ret;
1169         char *persistent_health_path = NULL;
1170         char *unhealthy_reason = NULL;
1171         bool first_try = true;
1172
1173         if (ctdb->db_directory == NULL) {
1174                 ctdb->db_directory = VARDIR "/ctdb";
1175         }
1176         if (ctdb->db_directory_persistent == NULL) {
1177                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1178         }
1179         if (ctdb->db_directory_state == NULL) {
1180                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1181         }
1182
1183         /* make sure the db directory exists */
1184         ret = mkdir(ctdb->db_directory, 0700);
1185         if (ret == -1 && errno != EEXIST) {
1186                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1187                          ctdb->db_directory));
1188                 return -1;
1189         }
1190
1191         /* make sure the persistent db directory exists */
1192         ret = mkdir(ctdb->db_directory_persistent, 0700);
1193         if (ret == -1 && errno != EEXIST) {
1194                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1195                          ctdb->db_directory_persistent));
1196                 return -1;
1197         }
1198
1199         /* make sure the internal state db directory exists */
1200         ret = mkdir(ctdb->db_directory_state, 0700);
1201         if (ret == -1 && errno != EEXIST) {
1202                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1203                          ctdb->db_directory_state));
1204                 return -1;
1205         }
1206
1207         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1208                                                  ctdb->db_directory_state,
1209                                                  PERSISTENT_HEALTH_TDB,
1210                                                  ctdb->pnn);
1211         if (persistent_health_path == NULL) {
1212                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1213                 return -1;
1214         }
1215
1216 again:
1217
1218         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1219                                                    0, TDB_DISALLOW_NESTING,
1220                                                    O_CREAT | O_RDWR, 0600);
1221         if (ctdb->db_persistent_health == NULL) {
1222                 struct tdb_wrap *tdb;
1223
1224                 if (!first_try) {
1225                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1226                                           persistent_health_path,
1227                                           errno,
1228                                           strerror(errno)));
1229                         talloc_free(persistent_health_path);
1230                         talloc_free(unhealthy_reason);
1231                         return -1;
1232                 }
1233                 first_try = false;
1234
1235                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1236                                                    persistent_health_path,
1237                                                    "was cleared after a failure",
1238                                                    "manual verification needed");
1239                 if (unhealthy_reason == NULL) {
1240                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1241                         talloc_free(persistent_health_path);
1242                         return -1;
1243                 }
1244
1245                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1246                                   persistent_health_path));
1247                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1248                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1249                                     O_CREAT | O_RDWR, 0600);
1250                 if (tdb) {
1251                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1252                                           persistent_health_path,
1253                                           errno,
1254                                           strerror(errno)));
1255                         talloc_free(persistent_health_path);
1256                         talloc_free(unhealthy_reason);
1257                         return -1;
1258                 }
1259
1260                 talloc_free(tdb);
1261                 goto again;
1262         }
1263         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1264         if (ret != 0) {
1265                 struct tdb_wrap *tdb;
1266
1267                 talloc_free(ctdb->db_persistent_health);
1268                 ctdb->db_persistent_health = NULL;
1269
1270                 if (!first_try) {
1271                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1272                                           persistent_health_path));
1273                         talloc_free(persistent_health_path);
1274                         talloc_free(unhealthy_reason);
1275                         return -1;
1276                 }
1277                 first_try = false;
1278
1279                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1280                                                    persistent_health_path,
1281                                                    "was cleared after a failure",
1282                                                    "manual verification needed");
1283                 if (unhealthy_reason == NULL) {
1284                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1285                         talloc_free(persistent_health_path);
1286                         return -1;
1287                 }
1288
1289                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1290                                   persistent_health_path));
1291                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1292                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1293                                     O_CREAT | O_RDWR, 0600);
1294                 if (tdb) {
1295                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1296                                           persistent_health_path,
1297                                           errno,
1298                                           strerror(errno)));
1299                         talloc_free(persistent_health_path);
1300                         talloc_free(unhealthy_reason);
1301                         return -1;
1302                 }
1303
1304                 talloc_free(tdb);
1305                 goto again;
1306         }
1307         talloc_free(persistent_health_path);
1308
1309         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1310         talloc_free(unhealthy_reason);
1311         if (ret != 0) {
1312                 return ret;
1313         }
1314
1315         return 0;
1316 }
1317
1318 /*
1319   called when a broadcast seqnum update comes in
1320  */
1321 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1322 {
1323         struct ctdb_db_context *ctdb_db;
1324         if (srcnode == ctdb->pnn) {
1325                 /* don't update ourselves! */
1326                 return 0;
1327         }
1328
1329         ctdb_db = find_ctdb_db(ctdb, db_id);
1330         if (!ctdb_db) {
1331                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1332                 return -1;
1333         }
1334
1335         if (ctdb_db->unhealthy_reason) {
1336                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1337                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1338                 return -1;
1339         }
1340
1341         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1342         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1343         return 0;
1344 }
1345
1346 /*
1347   timer to check for seqnum changes in a ltdb and propogate them
1348  */
1349 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1350                                    struct timeval t, void *p)
1351 {
1352         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1353         struct ctdb_context *ctdb = ctdb_db->ctdb;
1354         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1355         if (new_seqnum != ctdb_db->seqnum) {
1356                 /* something has changed - propogate it */
1357                 TDB_DATA data;
1358                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1359                 data.dsize = sizeof(uint32_t);
1360                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1361                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1362                                          data, NULL, NULL);             
1363         }
1364         ctdb_db->seqnum = new_seqnum;
1365
1366         /* setup a new timer */
1367         ctdb_db->seqnum_update =
1368                 event_add_timed(ctdb->ev, ctdb_db, 
1369                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1370                                 ctdb_ltdb_seqnum_check, ctdb_db);
1371 }
1372
1373 /*
1374   enable seqnum handling on this db
1375  */
1376 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1377 {
1378         struct ctdb_db_context *ctdb_db;
1379         ctdb_db = find_ctdb_db(ctdb, db_id);
1380         if (!ctdb_db) {
1381                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1382                 return -1;
1383         }
1384
1385         if (ctdb_db->seqnum_update == NULL) {
1386                 ctdb_db->seqnum_update =
1387                         event_add_timed(ctdb->ev, ctdb_db, 
1388                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1389                                         ctdb_ltdb_seqnum_check, ctdb_db);
1390         }
1391
1392         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1393         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1394         return 0;
1395 }
1396
1397 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1398 {
1399         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1400         struct ctdb_db_context *ctdb_db;
1401
1402         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1403         if (!ctdb_db) {
1404                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1405                 return 0;
1406         }
1407
1408         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1409                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1410                 return 0;
1411         }
1412
1413         ctdb_db->priority = db_prio->priority;
1414         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1415
1416         return 0;
1417 }
1418