Merge remote branch 'martins/ctdb_control_oom'
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
35 /**
36  * write a record to a normal database
37  *
38  * This is the server-variant of the ctdb_ltdb_store function.
39  * It contains logic to determine whether a record should be
40  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41  * controls to the local ctdb daemon if apporpriate.
42  */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44                                   TDB_DATA key,
45                                   struct ctdb_ltdb_header *header,
46                                   TDB_DATA data)
47 {
48         struct ctdb_context *ctdb = ctdb_db->ctdb;
49         TDB_DATA rec;
50         int ret;
51         bool seqnum_suppressed = false;
52         bool keep = false;
53         bool schedule_for_deletion = false;
54         uint32_t lmaster;
55
56         if (ctdb->flags & CTDB_FLAG_TORTURE) {
57                 struct ctdb_ltdb_header *h2;
58                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
59                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
60                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
61                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
62                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
63                 }
64                 if (rec.dptr) free(rec.dptr);
65         }
66
67         if (ctdb->vnn_map == NULL) {
68                 /*
69                  * Called from a client: always store the record
70                  * Also don't call ctdb_lmaster since it uses the vnn_map!
71                  */
72                 keep = true;
73                 goto store;
74         }
75
76         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
77
78         /*
79          * If we migrate an empty record off to another node
80          * and the record has not been migrated with data,
81          * delete the record instead of storing the empty record.
82          */
83         if (data.dsize != 0) {
84                 keep = true;
85         } else if (ctdb_db->persistent) {
86                 keep = true;
87         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
88                 /*
89                  * The record is not created by the client but
90                  * automatically by the ctdb_ltdb_fetch logic that
91                  * creates a record with an initial header in the
92                  * ltdb before trying to migrate the record from
93                  * the current lmaster. Keep it instead of trying
94                  * to delete the non-existing record...
95                  */
96                 keep = true;
97                 schedule_for_deletion = true;
98         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
99                 keep = true;
100         } else if (ctdb_db->ctdb->pnn == lmaster) {
101                 /*
102                  * If we are lmaster, then we usually keep the record.
103                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
104                  * and the record is empty and has never been migrated
105                  * with data, then we should delete it instead of storing it.
106                  * This is part of the vacuuming process.
107                  *
108                  * The reason that we usually need to store even empty records
109                  * on the lmaster is that a client operating directly on the
110                  * lmaster (== dmaster) expects the local copy of the record to
111                  * exist after successful ctdb migrate call. If the record does
112                  * not exist, the client goes into a migrate loop and eventually
113                  * fails. So storing the empty record makes sure that we do not
114                  * need to change the client code.
115                  */
116                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
117                         keep = true;
118                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
119                         keep = true;
120                 }
121         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
122                 keep = true;
123         }
124
125         if (keep &&
126             (data.dsize == 0) &&
127             !ctdb_db->persistent &&
128             (ctdb_db->ctdb->pnn == header->dmaster))
129         {
130                 schedule_for_deletion = true;
131         }
132
133 store:
134         /*
135          * The VACUUM_MIGRATED flag is only set temporarily for
136          * the above logic when the record was retrieved by a
137          * VACUUM_MIGRATE call and should not be stored in the
138          * database.
139          *
140          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
141          * and there are two cases in which the corresponding record
142          * is stored in the local database:
143          * 1. The record has been migrated with data in the past
144          *    (the MIGRATED_WITH_DATA record flag is set).
145          * 2. The record has been filled with data again since it
146          *    had been submitted in the VACUUM_FETCH message to the
147          *    lmaster.
148          * For such records it is important to not store the
149          * VACUUM_MIGRATED flag in the database.
150          */
151         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
152
153         /*
154          * Similarly, clear the AUTOMATIC flag which should not enter
155          * the local database copy since this would require client
156          * modifications to clear the flag when the client stores
157          * the record.
158          */
159         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
160
161         rec.dsize = sizeof(*header) + data.dsize;
162         rec.dptr = talloc_size(ctdb, rec.dsize);
163         CTDB_NO_MEMORY(ctdb, rec.dptr);
164
165         memcpy(rec.dptr, header, sizeof(*header));
166         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
167
168         /* Databases with seqnum updates enabled only get their seqnum
169            changes when/if we modify the data */
170         if (ctdb_db->seqnum_update != NULL) {
171                 TDB_DATA old;
172                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
173
174                 if ( (old.dsize == rec.dsize)
175                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
176                           rec.dptr+sizeof(struct ctdb_ltdb_header),
177                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
178                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
179                         seqnum_suppressed = true;
180                 }
181                 if (old.dptr) free(old.dptr);
182         }
183
184         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
185                             ctdb_db->db_name,
186                             keep?"storing":"deleting",
187                             ctdb_hash(&key)));
188
189         if (keep) {
190                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
191         } else {
192                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
193         }
194
195         if (ret != 0) {
196                 int lvl = DEBUG_ERR;
197
198                 if (keep == false &&
199                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
200                 {
201                         lvl = DEBUG_DEBUG;
202                 }
203
204                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
205                             "%d - %s\n",
206                             ctdb_db->db_name,
207                             keep?"store":"delete", ret,
208                             tdb_errorstr(ctdb_db->ltdb->tdb)));
209
210                 schedule_for_deletion = false;
211         }
212         if (seqnum_suppressed) {
213                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
214         }
215
216         talloc_free(rec.dptr);
217
218         if (schedule_for_deletion) {
219                 int ret2;
220                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
221                 if (ret2 != 0) {
222                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
223                 }
224         }
225
226         return ret;
227 }
228
/*
  state carried from ctdb_ltdb_lock_requeue() to lock_fetch_callback()
  while a request waits for a chainlock
 */
struct lock_fetch_state {
	/* daemon context, used to read the current vnn_map generation */
	struct ctdb_context *ctdb;

	/* function the deferred packet is handed back to */
	void (*recv_pkt)(void *, struct ctdb_req_header *);

	/* first argument passed to recv_pkt */
	void *recv_context;

	/* the deferred request packet */
	struct ctdb_req_header *hdr;

	/* vnn_map generation at the time the request was deferred */
	uint32_t generation;

	/* if true, requeue the packet even if the generation has changed */
	bool ignore_generation;
};
237
238 /*
239   called when we should retry the operation
240  */
241 static void lock_fetch_callback(void *p)
242 {
243         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
244         if (!state->ignore_generation &&
245             state->generation != state->ctdb->vnn_map->generation) {
246                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
247                 talloc_free(state->hdr);
248                 return;
249         }
250         state->recv_pkt(state->recv_context, state->hdr);
251         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
252 }
253
254
255 /*
256   do a non-blocking ltdb_lock, deferring this ctdb request until we
257   have the chainlock
258
259   It does the following:
260
261    1) tries to get the chainlock. If it succeeds, then it returns 0
262
263    2) if it fails to get a chainlock immediately then it sets up a
264    non-blocking chainlock via ctdb_lockwait, and when it gets the
265    chainlock it re-submits this ctdb request to the main packet
266    receive function
267
268    This effectively queues all ctdb requests that cannot be
269    immediately satisfied until it can get the lock. This means that
270    the main ctdb daemon will not block waiting for a chainlock held by
271    a client
272
273    There are 3 possible return values:
274
275        0:    means that it got the lock immediately.
276       -1:    means that it failed to get the lock, and won't retry
277       -2:    means that it failed to get the lock immediately, but will retry
278  */
279 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
280                            TDB_DATA key, struct ctdb_req_header *hdr,
281                            void (*recv_pkt)(void *, struct ctdb_req_header *),
282                            void *recv_context, bool ignore_generation)
283 {
284         int ret;
285         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
286         struct lockwait_handle *h;
287         struct lock_fetch_state *state;
288         
289         ret = tdb_chainlock_nonblock(tdb, key);
290
291         if (ret != 0 &&
292             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
293                 /* a hard failure - don't try again */
294                 return -1;
295         }
296
297         /* when torturing, ensure we test the contended path */
298         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
299             random() % 5 == 0) {
300                 ret = -1;
301                 tdb_chainunlock(tdb, key);
302         }
303
304         /* first the non-contended path */
305         if (ret == 0) {
306                 return 0;
307         }
308
309         state = talloc(hdr, struct lock_fetch_state);
310         state->ctdb = ctdb_db->ctdb;
311         state->hdr = hdr;
312         state->recv_pkt = recv_pkt;
313         state->recv_context = recv_context;
314         state->generation = ctdb_db->ctdb->vnn_map->generation;
315         state->ignore_generation = ignore_generation;
316
317         /* now the contended path */
318         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
319         if (h == NULL) {
320                 return -1;
321         }
322
323         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
324            so it won't be freed yet */
325         talloc_steal(state, hdr);
326         talloc_steal(state, h);
327
328         /* now tell the caller than we will retry asynchronously */
329         return -2;
330 }
331
332 /*
333   a varient of ctdb_ltdb_lock_requeue that also fetches the record
334  */
335 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
336                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
337                                  struct ctdb_req_header *hdr, TDB_DATA *data,
338                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
339                                  void *recv_context, bool ignore_generation)
340 {
341         int ret;
342
343         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
344                                      recv_context, ignore_generation);
345         if (ret == 0) {
346                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
347                 if (ret != 0) {
348                         int uret;
349                         uret = ctdb_ltdb_unlock(ctdb_db, key);
350                         if (uret != 0) {
351                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
352                         }
353                 }
354         }
355         return ret;
356 }
357
358
359 /*
360   paraoid check to see if the db is empty
361  */
362 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
363 {
364         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
365         int count = tdb_traverse_read(tdb, NULL, NULL);
366         if (count != 0) {
367                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
368                          ctdb_db->db_path));
369                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
370         }
371 }
372
373 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
374                                 struct ctdb_db_context *ctdb_db)
375 {
376         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
377         char *old;
378         char *reason = NULL;
379         TDB_DATA key;
380         TDB_DATA val;
381
382         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
383         key.dsize = strlen(ctdb_db->db_name);
384
385         old = ctdb_db->unhealthy_reason;
386         ctdb_db->unhealthy_reason = NULL;
387
388         val = tdb_fetch(tdb, key);
389         if (val.dsize > 0) {
390                 reason = talloc_strndup(ctdb_db,
391                                         (const char *)val.dptr,
392                                         val.dsize);
393                 if (reason == NULL) {
394                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
395                                            (int)val.dsize));
396                         ctdb_db->unhealthy_reason = old;
397                         free(val.dptr);
398                         return -1;
399                 }
400         }
401
402         if (val.dptr) {
403                 free(val.dptr);
404         }
405
406         talloc_free(old);
407         ctdb_db->unhealthy_reason = reason;
408         return 0;
409 }
410
411 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
412                                   struct ctdb_db_context *ctdb_db,
413                                   const char *given_reason,/* NULL means healthy */
414                                   int num_healthy_nodes)
415 {
416         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
417         int ret;
418         TDB_DATA key;
419         TDB_DATA val;
420         char *new_reason = NULL;
421         char *old_reason = NULL;
422
423         ret = tdb_transaction_start(tdb);
424         if (ret != 0) {
425                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
426                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
427                 return -1;
428         }
429
430         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
431         if (ret != 0) {
432                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
433                                    ctdb_db->db_name, ret));
434                 return -1;
435         }
436         old_reason = ctdb_db->unhealthy_reason;
437
438         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
439         key.dsize = strlen(ctdb_db->db_name);
440
441         if (given_reason) {
442                 new_reason = talloc_strdup(ctdb_db, given_reason);
443                 if (new_reason == NULL) {
444                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
445                                           given_reason));
446                         return -1;
447                 }
448         } else if (old_reason && num_healthy_nodes == 0) {
449                 /*
450                  * If the reason indicates ok, but there where no healthy nodes
451                  * available, that it means, we have not recovered valid content
452                  * of the db. So if there's an old reason, prefix it with
453                  * "NO-HEALTHY-NODES - "
454                  */
455                 const char *prefix;
456
457 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
458                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
459                 if (ret != 0) {
460                         prefix = _TMP_PREFIX;
461                 } else {
462                         prefix = "";
463                 }
464                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
465                                          prefix, old_reason);
466                 if (new_reason == NULL) {
467                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
468                                           prefix, old_reason));
469                         return -1;
470                 }
471 #undef _TMP_PREFIX
472         }
473
474         if (new_reason) {
475                 val.dptr = discard_const_p(uint8_t, new_reason);
476                 val.dsize = strlen(new_reason);
477
478                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
479                 if (ret != 0) {
480                         tdb_transaction_cancel(tdb);
481                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
482                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
483                                            ret, tdb_errorstr(tdb)));
484                         talloc_free(new_reason);
485                         return -1;
486                 }
487                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
488                                    ctdb_db->db_name, new_reason));
489         } else if (old_reason) {
490                 ret = tdb_delete(tdb, key);
491                 if (ret != 0) {
492                         tdb_transaction_cancel(tdb);
493                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
494                                            tdb_name(tdb), ctdb_db->db_name,
495                                            ret, tdb_errorstr(tdb)));
496                         talloc_free(new_reason);
497                         return -1;
498                 }
499                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
500                                    ctdb_db->db_name));
501         }
502
503         ret = tdb_transaction_commit(tdb);
504         if (ret != TDB_SUCCESS) {
505                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
506                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
507                 talloc_free(new_reason);
508                 return -1;
509         }
510
511         talloc_free(old_reason);
512         ctdb_db->unhealthy_reason = new_reason;
513
514         return 0;
515 }
516
517 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
518                                      struct ctdb_db_context *ctdb_db)
519 {
520         time_t now = time(NULL);
521         char *new_path;
522         char *new_reason;
523         int ret;
524         struct tm *tm;
525
526         tm = gmtime(&now);
527
528         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
529         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
530                                    "%04u%02u%02u%02u%02u%02u.0Z",
531                                    ctdb_db->db_path,
532                                    tm->tm_year+1900, tm->tm_mon+1,
533                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
534                                    tm->tm_sec);
535         if (new_path == NULL) {
536                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
537                 return -1;
538         }
539
540         new_reason = talloc_asprintf(ctdb_db,
541                                      "ERROR - Backup of corrupted TDB in '%s'",
542                                      new_path);
543         if (new_reason == NULL) {
544                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
545                 return -1;
546         }
547         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
548         talloc_free(new_reason);
549         if (ret != 0) {
550                 DEBUG(DEBUG_CRIT,(__location__
551                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
552                                  ctdb_db->db_path));
553                 return -1;
554         }
555
556         ret = rename(ctdb_db->db_path, new_path);
557         if (ret != 0) {
558                 DEBUG(DEBUG_CRIT,(__location__
559                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
560                                   ctdb_db->db_path, new_path,
561                                   errno, strerror(errno)));
562                 talloc_free(new_path);
563                 return -1;
564         }
565
566         DEBUG(DEBUG_CRIT,(__location__
567                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
568                          ctdb_db->db_path, new_path));
569         talloc_free(new_path);
570         return 0;
571 }
572
573 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
574 {
575         struct ctdb_db_context *ctdb_db;
576         int ret;
577         int ok = 0;
578         int fail = 0;
579
580         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
581                 if (!ctdb_db->persistent) {
582                         continue;
583                 }
584
585                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
586                 if (ret != 0) {
587                         DEBUG(DEBUG_ALERT,(__location__
588                                            " load persistent health for '%s' failed\n",
589                                            ctdb_db->db_path));
590                         return -1;
591                 }
592
593                 if (ctdb_db->unhealthy_reason == NULL) {
594                         ok++;
595                         DEBUG(DEBUG_INFO,(__location__
596                                    " persistent db '%s' healthy\n",
597                                    ctdb_db->db_path));
598                         continue;
599                 }
600
601                 fail++;
602                 DEBUG(DEBUG_ALERT,(__location__
603                                    " persistent db '%s' unhealthy: %s\n",
604                                    ctdb_db->db_path,
605                                    ctdb_db->unhealthy_reason));
606         }
607         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
608               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
609                ok, fail));
610
611         if (fail != 0) {
612                 return -1;
613         }
614
615         return 0;
616 }
617
618
619 /*
620   mark a database - as healthy
621  */
622 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
623 {
624         uint32_t db_id = *(uint32_t *)indata.dptr;
625         struct ctdb_db_context *ctdb_db;
626         int ret;
627         bool may_recover = false;
628
629         ctdb_db = find_ctdb_db(ctdb, db_id);
630         if (!ctdb_db) {
631                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
632                 return -1;
633         }
634
635         if (ctdb_db->unhealthy_reason) {
636                 may_recover = true;
637         }
638
639         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
640         if (ret != 0) {
641                 DEBUG(DEBUG_ERR,(__location__
642                                  " ctdb_update_persistent_health(%s) failed\n",
643                                  ctdb_db->db_name));
644                 return -1;
645         }
646
647         if (may_recover && !ctdb->done_startup) {
648                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
649                                   ctdb_db->db_name));
650                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
651         }
652
653         return 0;
654 }
655
656 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
657                                    TDB_DATA indata,
658                                    TDB_DATA *outdata)
659 {
660         uint32_t db_id = *(uint32_t *)indata.dptr;
661         struct ctdb_db_context *ctdb_db;
662         int ret;
663
664         ctdb_db = find_ctdb_db(ctdb, db_id);
665         if (!ctdb_db) {
666                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
667                 return -1;
668         }
669
670         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
671         if (ret != 0) {
672                 DEBUG(DEBUG_ERR,(__location__
673                                  " ctdb_load_persistent_health(%s) failed\n",
674                                  ctdb_db->db_name));
675                 return -1;
676         }
677
678         *outdata = tdb_null;
679         if (ctdb_db->unhealthy_reason) {
680                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
681                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
682         }
683
684         return 0;
685 }
686
687
688 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
689 {
690         char *ropath;
691
692         if (ctdb_db->readonly) {
693                 return 0;
694         }
695
696         if (ctdb_db->persistent) {
697                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
698                 return -1;
699         }
700
701         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
702         if (ropath == NULL) {
703                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
704                 return -1;
705         }
706         ctdb_db->rottdb = tdb_open(ropath, 
707                               ctdb->tunable.database_hash_size, 
708                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
709                               O_CREAT|O_RDWR, 0);
710         if (ctdb_db->rottdb == NULL) {
711                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
712                 talloc_free(ropath);
713                 return -1;
714         }
715
716         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
717
718         ctdb_db->readonly = true;
719         talloc_free(ropath);
720         return 0;
721 }
722
723 /*
724   attach to a database, handling both persistent and non-persistent databases
725   return 0 on success, -1 on failure
726  */
727 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
728                              bool persistent, const char *unhealthy_reason,
729                              bool jenkinshash)
730 {
731         struct ctdb_db_context *ctdb_db, *tmp_db;
732         int ret;
733         struct TDB_DATA key;
734         unsigned tdb_flags;
735         int mode = 0600;
736         int remaining_tries = 0;
737
738         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
739         CTDB_NO_MEMORY(ctdb, ctdb_db);
740
741         ctdb_db->priority = 1;
742         ctdb_db->ctdb = ctdb;
743         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
744         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
745
746         key.dsize = strlen(db_name)+1;
747         key.dptr  = discard_const(db_name);
748         ctdb_db->db_id = ctdb_hash(&key);
749         ctdb_db->persistent = persistent;
750
751         if (!ctdb_db->persistent) {
752                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
753                 if (ctdb_db->delete_queue == NULL) {
754                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
755                 }
756
757                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
758         }
759
760         /* check for hash collisions */
761         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
762                 if (tmp_db->db_id == ctdb_db->db_id) {
763                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
764                                  tmp_db->db_id, db_name, tmp_db->db_name));
765                         talloc_free(ctdb_db);
766                         return -1;
767                 }
768         }
769
770         if (persistent) {
771                 if (unhealthy_reason) {
772                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
773                                                             unhealthy_reason, 0);
774                         if (ret != 0) {
775                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
776                                                    ctdb_db->db_name, unhealthy_reason, ret));
777                                 talloc_free(ctdb_db);
778                                 return -1;
779                         }
780                 }
781
782                 if (ctdb->max_persistent_check_errors > 0) {
783                         remaining_tries = 1;
784                 }
785                 if (ctdb->done_startup) {
786                         remaining_tries = 0;
787                 }
788
789                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
790                 if (ret != 0) {
791                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
792                                    ctdb_db->db_name, ret));
793                         talloc_free(ctdb_db);
794                         return -1;
795                 }
796         }
797
798         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
799                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
800                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
801                 talloc_free(ctdb_db);
802                 return -1;
803         }
804
805         if (ctdb_db->unhealthy_reason) {
806                 /* this is just a warning, but we want that in the log file! */
807                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
808                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
809         }
810
811         /* open the database */
812         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
813                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
814                                            db_name, ctdb->pnn);
815
816         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
817         if (ctdb->valgrinding) {
818                 tdb_flags |= TDB_NOMMAP;
819         }
820         tdb_flags |= TDB_DISALLOW_NESTING;
821         if (jenkinshash) {
822                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
823         }
824
825 again:
826         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
827                                       ctdb->tunable.database_hash_size, 
828                                       tdb_flags, 
829                                       O_CREAT|O_RDWR, mode);
830         if (ctdb_db->ltdb == NULL) {
831                 struct stat st;
832                 int saved_errno = errno;
833
834                 if (!persistent) {
835                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
836                                           ctdb_db->db_path,
837                                           saved_errno,
838                                           strerror(saved_errno)));
839                         talloc_free(ctdb_db);
840                         return -1;
841                 }
842
843                 if (remaining_tries == 0) {
844                         DEBUG(DEBUG_CRIT,(__location__
845                                           "Failed to open persistent tdb '%s': %d - %s\n",
846                                           ctdb_db->db_path,
847                                           saved_errno,
848                                           strerror(saved_errno)));
849                         talloc_free(ctdb_db);
850                         return -1;
851                 }
852
853                 ret = stat(ctdb_db->db_path, &st);
854                 if (ret != 0) {
855                         DEBUG(DEBUG_CRIT,(__location__
856                                           "Failed to open persistent tdb '%s': %d - %s\n",
857                                           ctdb_db->db_path,
858                                           saved_errno,
859                                           strerror(saved_errno)));
860                         talloc_free(ctdb_db);
861                         return -1;
862                 }
863
864                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
865                 if (ret != 0) {
866                         DEBUG(DEBUG_CRIT,(__location__
867                                           "Failed to open persistent tdb '%s': %d - %s\n",
868                                           ctdb_db->db_path,
869                                           saved_errno,
870                                           strerror(saved_errno)));
871                         talloc_free(ctdb_db);
872                         return -1;
873                 }
874
875                 remaining_tries--;
876                 mode = st.st_mode;
877                 goto again;
878         }
879
880         if (!persistent) {
881                 ctdb_check_db_empty(ctdb_db);
882         } else {
883                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
884                 if (ret != 0) {
885                         int fd;
886                         struct stat st;
887
888                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
889                                           ctdb_db->db_path, ret,
890                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
891                         if (remaining_tries == 0) {
892                                 talloc_free(ctdb_db);
893                                 return -1;
894                         }
895
896                         fd = tdb_fd(ctdb_db->ltdb->tdb);
897                         ret = fstat(fd, &st);
898                         if (ret != 0) {
899                                 DEBUG(DEBUG_CRIT,(__location__
900                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
901                                                   ctdb_db->db_path,
902                                                   errno,
903                                                   strerror(errno)));
904                                 talloc_free(ctdb_db);
905                                 return -1;
906                         }
907
908                         /* close the TDB */
909                         talloc_free(ctdb_db->ltdb);
910                         ctdb_db->ltdb = NULL;
911
912                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
913                         if (ret != 0) {
914                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
915                                                   ctdb_db->db_path));
916                                 talloc_free(ctdb_db);
917                                 return -1;
918                         }
919
920                         remaining_tries--;
921                         mode = st.st_mode;
922                         goto again;
923                 }
924         }
925
926         /* set up a rb tree we can use to track which records we have a 
927            fetch-lock in-flight for so we can defer any additional calls
928            for the same record.
929          */
930         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
931         if (ctdb_db->deferred_fetch == NULL) {
932                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
933                 talloc_free(ctdb_db);
934                 return -1;
935         }
936
937         DLIST_ADD(ctdb->db_list, ctdb_db);
938
939         /* setting this can help some high churn databases */
940         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
941
942         /* 
943            all databases support the "null" function. we need this in
944            order to do forced migration of records
945         */
946         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
947         if (ret != 0) {
948                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
949                 talloc_free(ctdb_db);
950                 return -1;
951         }
952
953         /* 
954            all databases support the "fetch" function. we need this
955            for efficient Samba3 ctdb fetch
956         */
957         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
958         if (ret != 0) {
959                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
960                 talloc_free(ctdb_db);
961                 return -1;
962         }
963
964         /* 
965            all databases support the "fetch_with_header" function. we need this
966            for efficient readonly record fetches
967         */
968         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
969         if (ret != 0) {
970                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
971                 talloc_free(ctdb_db);
972                 return -1;
973         }
974
975         ret = ctdb_vacuum_init(ctdb_db);
976         if (ret != 0) {
977                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
978                                   "database '%s'\n", ctdb_db->db_name));
979                 talloc_free(ctdb_db);
980                 return -1;
981         }
982
983
984         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
985         
986         /* success */
987         return 0;
988 }
989
990
/*
  bookkeeping for one client attach request that has been put on hold.
  Instances are linked into ctdb->deferred_attach.
 */
struct ctdb_deferred_attach_context {
        /* doubly-linked list pointers used by the DLIST_* macros */
        struct ctdb_deferred_attach_context *next;
        struct ctdb_deferred_attach_context *prev;

        /* daemon context whose deferred_attach list holds this entry */
        struct ctdb_context *ctdb;

        /* the attach control packet that is waiting to be processed */
        struct ctdb_req_control *c;
};
996
997
998 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
999 {
1000         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1001
1002         return 0;
1003 }
1004
1005 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1006 {
1007         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1008         struct ctdb_context *ctdb = da_ctx->ctdb;
1009
1010         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1011         talloc_free(da_ctx);
1012 }
1013
1014 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1015 {
1016         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1017         struct ctdb_context *ctdb = da_ctx->ctdb;
1018
1019         /* This talloc-steals the packet ->c */
1020         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1021         talloc_free(da_ctx);
1022 }
1023
1024 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1025 {
1026         struct ctdb_deferred_attach_context *da_ctx;
1027
1028         /* call it from the main event loop as soon as the current event 
1029            finishes.
1030          */
1031         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1032                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1033                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1034         }
1035
1036         return 0;
1037 }
1038
1039 /*
1040   a client has asked to attach a new database
1041  */
1042 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1043                                TDB_DATA *outdata, uint64_t tdb_flags, 
1044                                bool persistent, uint32_t client_id,
1045                                struct ctdb_req_control *c,
1046                                bool *async_reply)
1047 {
1048         const char *db_name = (const char *)indata.dptr;
1049         struct ctdb_db_context *db;
1050         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1051         struct ctdb_client *client = NULL;
1052
1053         if (ctdb->tunable.allow_client_db_attach == 0) {
1054                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1055                                   "AllowClientDBAccess == 0\n", db_name));
1056                 return -1;
1057         }
1058
1059         /* dont allow any local clients to attach while we are in recovery mode
1060          * except for the recovery daemon.
1061          * allow all attach from the network since these are always from remote
1062          * recovery daemons.
1063          */
1064         if (client_id != 0) {
1065                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1066         }
1067         if (client != NULL) {
1068                 /* If the node is inactive it is not part of the cluster
1069                    and we should not allow clients to attach to any
1070                    databases
1071                 */
1072                 if (node->flags & NODE_FLAGS_INACTIVE) {
1073                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1074                         return -1;
1075                 }
1076
1077                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1078                  && client->pid != ctdb->recoverd_pid
1079                  && !ctdb->done_startup) {
1080                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1081
1082                         if (da_ctx == NULL) {
1083                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1084                                 return -1;
1085                         }
1086
1087                         da_ctx->ctdb = ctdb;
1088                         da_ctx->c = talloc_steal(da_ctx, c);
1089                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1090                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1091
1092                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1093
1094                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1095                         *async_reply = true;
1096                         return 0;
1097                 }
1098         }
1099
1100         /* the client can optionally pass additional tdb flags, but we
1101            only allow a subset of those on the database in ctdb. Note
1102            that tdb_flags is passed in via the (otherwise unused)
1103            srvid to the attach control */
1104         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1105
1106         /* see if we already have this name */
1107         db = ctdb_db_handle(ctdb, db_name);
1108         if (db) {
1109                 outdata->dptr  = (uint8_t *)&db->db_id;
1110                 outdata->dsize = sizeof(db->db_id);
1111                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1112                 return 0;
1113         }
1114
1115         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1116                 return -1;
1117         }
1118
1119         db = ctdb_db_handle(ctdb, db_name);
1120         if (!db) {
1121                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1122                 return -1;
1123         }
1124
1125         /* remember the flags the client has specified */
1126         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1127
1128         outdata->dptr  = (uint8_t *)&db->db_id;
1129         outdata->dsize = sizeof(db->db_id);
1130
1131         /* Try to ensure it's locked in mem */
1132         ctdb_lockdown_memory(ctdb);
1133
1134         /* tell all the other nodes about this database */
1135         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1136                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1137                                                 CTDB_CONTROL_DB_ATTACH,
1138                                  0, CTDB_CTRL_FLAG_NOREPLY,
1139                                  indata, NULL, NULL);
1140
1141         /* success */
1142         return 0;
1143 }
1144
1145
1146 /*
1147   attach to all existing persistent databases
1148  */
1149 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1150                                   const char *unhealthy_reason)
1151 {
1152         DIR *d;
1153         struct dirent *de;
1154
1155         /* open the persistent db directory and scan it for files */
1156         d = opendir(ctdb->db_directory_persistent);
1157         if (d == NULL) {
1158                 return 0;
1159         }
1160
1161         while ((de=readdir(d))) {
1162                 char *p, *s, *q;
1163                 size_t len = strlen(de->d_name);
1164                 uint32_t node;
1165                 int invalid_name = 0;
1166                 
1167                 s = talloc_strdup(ctdb, de->d_name);
1168                 CTDB_NO_MEMORY(ctdb, s);
1169
1170                 /* only accept names ending in .tdb */
1171                 p = strstr(s, ".tdb.");
1172                 if (len < 7 || p == NULL) {
1173                         talloc_free(s);
1174                         continue;
1175                 }
1176
1177                 /* only accept names ending with .tdb. and any number of digits */
1178                 q = p+5;
1179                 while (*q != 0 && invalid_name == 0) {
1180                         if (!isdigit(*q++)) {
1181                                 invalid_name = 1;
1182                         }
1183                 }
1184                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1185                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1186                         talloc_free(s);
1187                         continue;
1188                 }
1189                 p[4] = 0;
1190
1191                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1192                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1193                         closedir(d);
1194                         talloc_free(s);
1195                         return -1;
1196                 }
1197
1198                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1199
1200                 talloc_free(s);
1201         }
1202         closedir(d);
1203         return 0;
1204 }
1205
1206 int ctdb_attach_databases(struct ctdb_context *ctdb)
1207 {
1208         int ret;
1209         char *persistent_health_path = NULL;
1210         char *unhealthy_reason = NULL;
1211         bool first_try = true;
1212
1213         if (ctdb->db_directory == NULL) {
1214                 ctdb->db_directory = VARDIR "/ctdb";
1215         }
1216         if (ctdb->db_directory_persistent == NULL) {
1217                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1218         }
1219         if (ctdb->db_directory_state == NULL) {
1220                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1221         }
1222
1223         /* make sure the db directory exists */
1224         ret = mkdir(ctdb->db_directory, 0700);
1225         if (ret == -1 && errno != EEXIST) {
1226                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1227                          ctdb->db_directory));
1228                 return -1;
1229         }
1230
1231         /* make sure the persistent db directory exists */
1232         ret = mkdir(ctdb->db_directory_persistent, 0700);
1233         if (ret == -1 && errno != EEXIST) {
1234                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1235                          ctdb->db_directory_persistent));
1236                 return -1;
1237         }
1238
1239         /* make sure the internal state db directory exists */
1240         ret = mkdir(ctdb->db_directory_state, 0700);
1241         if (ret == -1 && errno != EEXIST) {
1242                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1243                          ctdb->db_directory_state));
1244                 return -1;
1245         }
1246
1247         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1248                                                  ctdb->db_directory_state,
1249                                                  PERSISTENT_HEALTH_TDB,
1250                                                  ctdb->pnn);
1251         if (persistent_health_path == NULL) {
1252                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1253                 return -1;
1254         }
1255
1256 again:
1257
1258         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1259                                                    0, TDB_DISALLOW_NESTING,
1260                                                    O_CREAT | O_RDWR, 0600);
1261         if (ctdb->db_persistent_health == NULL) {
1262                 struct tdb_wrap *tdb;
1263
1264                 if (!first_try) {
1265                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1266                                           persistent_health_path,
1267                                           errno,
1268                                           strerror(errno)));
1269                         talloc_free(persistent_health_path);
1270                         talloc_free(unhealthy_reason);
1271                         return -1;
1272                 }
1273                 first_try = false;
1274
1275                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1276                                                    persistent_health_path,
1277                                                    "was cleared after a failure",
1278                                                    "manual verification needed");
1279                 if (unhealthy_reason == NULL) {
1280                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1281                         talloc_free(persistent_health_path);
1282                         return -1;
1283                 }
1284
1285                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1286                                   persistent_health_path));
1287                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1288                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1289                                     O_CREAT | O_RDWR, 0600);
1290                 if (tdb) {
1291                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1292                                           persistent_health_path,
1293                                           errno,
1294                                           strerror(errno)));
1295                         talloc_free(persistent_health_path);
1296                         talloc_free(unhealthy_reason);
1297                         return -1;
1298                 }
1299
1300                 talloc_free(tdb);
1301                 goto again;
1302         }
1303         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1304         if (ret != 0) {
1305                 struct tdb_wrap *tdb;
1306
1307                 talloc_free(ctdb->db_persistent_health);
1308                 ctdb->db_persistent_health = NULL;
1309
1310                 if (!first_try) {
1311                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1312                                           persistent_health_path));
1313                         talloc_free(persistent_health_path);
1314                         talloc_free(unhealthy_reason);
1315                         return -1;
1316                 }
1317                 first_try = false;
1318
1319                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1320                                                    persistent_health_path,
1321                                                    "was cleared after a failure",
1322                                                    "manual verification needed");
1323                 if (unhealthy_reason == NULL) {
1324                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1325                         talloc_free(persistent_health_path);
1326                         return -1;
1327                 }
1328
1329                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1330                                   persistent_health_path));
1331                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1332                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1333                                     O_CREAT | O_RDWR, 0600);
1334                 if (tdb) {
1335                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1336                                           persistent_health_path,
1337                                           errno,
1338                                           strerror(errno)));
1339                         talloc_free(persistent_health_path);
1340                         talloc_free(unhealthy_reason);
1341                         return -1;
1342                 }
1343
1344                 talloc_free(tdb);
1345                 goto again;
1346         }
1347         talloc_free(persistent_health_path);
1348
1349         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1350         talloc_free(unhealthy_reason);
1351         if (ret != 0) {
1352                 return ret;
1353         }
1354
1355         return 0;
1356 }
1357
1358 /*
1359   called when a broadcast seqnum update comes in
1360  */
1361 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1362 {
1363         struct ctdb_db_context *ctdb_db;
1364         if (srcnode == ctdb->pnn) {
1365                 /* don't update ourselves! */
1366                 return 0;
1367         }
1368
1369         ctdb_db = find_ctdb_db(ctdb, db_id);
1370         if (!ctdb_db) {
1371                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1372                 return -1;
1373         }
1374
1375         if (ctdb_db->unhealthy_reason) {
1376                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1377                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1378                 return -1;
1379         }
1380
1381         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1382         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1383         return 0;
1384 }
1385
1386 /*
1387   timer to check for seqnum changes in a ltdb and propogate them
1388  */
1389 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1390                                    struct timeval t, void *p)
1391 {
1392         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1393         struct ctdb_context *ctdb = ctdb_db->ctdb;
1394         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1395         if (new_seqnum != ctdb_db->seqnum) {
1396                 /* something has changed - propogate it */
1397                 TDB_DATA data;
1398                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1399                 data.dsize = sizeof(uint32_t);
1400                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1401                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1402                                          data, NULL, NULL);             
1403         }
1404         ctdb_db->seqnum = new_seqnum;
1405
1406         /* setup a new timer */
1407         ctdb_db->seqnum_update =
1408                 event_add_timed(ctdb->ev, ctdb_db, 
1409                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1410                                 ctdb_ltdb_seqnum_check, ctdb_db);
1411 }
1412
1413 /*
1414   enable seqnum handling on this db
1415  */
1416 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1417 {
1418         struct ctdb_db_context *ctdb_db;
1419         ctdb_db = find_ctdb_db(ctdb, db_id);
1420         if (!ctdb_db) {
1421                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1422                 return -1;
1423         }
1424
1425         if (ctdb_db->seqnum_update == NULL) {
1426                 ctdb_db->seqnum_update =
1427                         event_add_timed(ctdb->ev, ctdb_db, 
1428                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1429                                         ctdb_ltdb_seqnum_check, ctdb_db);
1430         }
1431
1432         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1433         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1434         return 0;
1435 }
1436
1437 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1438 {
1439         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1440         struct ctdb_db_context *ctdb_db;
1441
1442         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1443         if (!ctdb_db) {
1444                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1445                 return 0;
1446         }
1447
1448         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1449                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1450                 return 0;
1451         }
1452
1453         ctdb_db->priority = db_prio->priority;
1454         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1455
1456         return 0;
1457 }
1458