Merge branch 'master' of 10.1.1.27:/shared/ctdb/ctdb-master
[samba.git] / ctdb / server / ctdb_ltdb_server.c
1 /* 
2    ctdb ltdb code - server side
3
4    Copyright (C) Andrew Tridgell  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31 #include <ctype.h>
32
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
34
35 /**
36  * write a record to a normal database
37  *
38  * This is the server-variant of the ctdb_ltdb_store function.
39  * It contains logic to determine whether a record should be
40  * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41  * controls to the local ctdb daemon if apporpriate.
42  */
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
44                                   TDB_DATA key,
45                                   struct ctdb_ltdb_header *header,
46                                   TDB_DATA data)
47 {
48         struct ctdb_context *ctdb = ctdb_db->ctdb;
49         TDB_DATA rec;
50         int ret;
51         bool seqnum_suppressed = false;
52         bool keep = false;
53         bool schedule_for_deletion = false;
54         bool remove_from_delete_queue = false;
55         uint32_t lmaster;
56
57         if (ctdb->flags & CTDB_FLAG_TORTURE) {
58                 struct ctdb_ltdb_header *h2;
59                 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60                 h2 = (struct ctdb_ltdb_header *)rec.dptr;
61                 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62                         DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63                                  (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
64                 }
65                 if (rec.dptr) free(rec.dptr);
66         }
67
68         if (ctdb->vnn_map == NULL) {
69                 /*
70                  * Called from a client: always store the record
71                  * Also don't call ctdb_lmaster since it uses the vnn_map!
72                  */
73                 keep = true;
74                 goto store;
75         }
76
77         lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
78
79         /*
80          * If we migrate an empty record off to another node
81          * and the record has not been migrated with data,
82          * delete the record instead of storing the empty record.
83          */
84         if (data.dsize != 0) {
85                 keep = true;
86         } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
87                 keep = true;
88         } else if (ctdb_db->persistent) {
89                 keep = true;
90         } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
91                 /*
92                  * The record is not created by the client but
93                  * automatically by the ctdb_ltdb_fetch logic that
94                  * creates a record with an initial header in the
95                  * ltdb before trying to migrate the record from
96                  * the current lmaster. Keep it instead of trying
97                  * to delete the non-existing record...
98                  */
99                 keep = true;
100                 schedule_for_deletion = true;
101         } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
102                 keep = true;
103         } else if (ctdb_db->ctdb->pnn == lmaster) {
104                 /*
105                  * If we are lmaster, then we usually keep the record.
106                  * But if we retrieve the dmaster role by a VACUUM_MIGRATE
107                  * and the record is empty and has never been migrated
108                  * with data, then we should delete it instead of storing it.
109                  * This is part of the vacuuming process.
110                  *
111                  * The reason that we usually need to store even empty records
112                  * on the lmaster is that a client operating directly on the
113                  * lmaster (== dmaster) expects the local copy of the record to
114                  * exist after successful ctdb migrate call. If the record does
115                  * not exist, the client goes into a migrate loop and eventually
116                  * fails. So storing the empty record makes sure that we do not
117                  * need to change the client code.
118                  */
119                 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
120                         keep = true;
121                 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
122                         keep = true;
123                 }
124         } else if (ctdb_db->ctdb->pnn == header->dmaster) {
125                 keep = true;
126         }
127
128         if (keep) {
129                 if ((data.dsize == 0) &&
130                     !ctdb_db->persistent &&
131                     (ctdb_db->ctdb->pnn == header->dmaster) &&
132                     !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
133                 {
134                         schedule_for_deletion = true;
135                 }
136                 remove_from_delete_queue = !schedule_for_deletion;
137         }
138
139 store:
140         /*
141          * The VACUUM_MIGRATED flag is only set temporarily for
142          * the above logic when the record was retrieved by a
143          * VACUUM_MIGRATE call and should not be stored in the
144          * database.
145          *
146          * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
147          * and there are two cases in which the corresponding record
148          * is stored in the local database:
149          * 1. The record has been migrated with data in the past
150          *    (the MIGRATED_WITH_DATA record flag is set).
151          * 2. The record has been filled with data again since it
152          *    had been submitted in the VACUUM_FETCH message to the
153          *    lmaster.
154          * For such records it is important to not store the
155          * VACUUM_MIGRATED flag in the database.
156          */
157         header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
158
159         /*
160          * Similarly, clear the AUTOMATIC flag which should not enter
161          * the local database copy since this would require client
162          * modifications to clear the flag when the client stores
163          * the record.
164          */
165         header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
166
167         rec.dsize = sizeof(*header) + data.dsize;
168         rec.dptr = talloc_size(ctdb, rec.dsize);
169         CTDB_NO_MEMORY(ctdb, rec.dptr);
170
171         memcpy(rec.dptr, header, sizeof(*header));
172         memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
173
174         /* Databases with seqnum updates enabled only get their seqnum
175            changes when/if we modify the data */
176         if (ctdb_db->seqnum_update != NULL) {
177                 TDB_DATA old;
178                 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
179
180                 if ( (old.dsize == rec.dsize)
181                 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
182                           rec.dptr+sizeof(struct ctdb_ltdb_header),
183                           rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
184                         tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
185                         seqnum_suppressed = true;
186                 }
187                 if (old.dptr) free(old.dptr);
188         }
189
190         DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
191                             ctdb_db->db_name,
192                             keep?"storing":"deleting",
193                             ctdb_hash(&key)));
194
195         if (keep) {
196                 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
197         } else {
198                 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
199         }
200
201         if (ret != 0) {
202                 int lvl = DEBUG_ERR;
203
204                 if (keep == false &&
205                     tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
206                 {
207                         lvl = DEBUG_DEBUG;
208                 }
209
210                 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
211                             "%d - %s\n",
212                             ctdb_db->db_name,
213                             keep?"store":"delete", ret,
214                             tdb_errorstr(ctdb_db->ltdb->tdb)));
215
216                 schedule_for_deletion = false;
217                 remove_from_delete_queue = false;
218         }
219         if (seqnum_suppressed) {
220                 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
221         }
222
223         talloc_free(rec.dptr);
224
225         if (schedule_for_deletion) {
226                 int ret2;
227                 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
228                 if (ret2 != 0) {
229                         DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
230                 }
231         }
232
233         if (remove_from_delete_queue) {
234                 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
235         }
236
237         return ret;
238 }
239
/*
  state kept for a ctdb request that is deferred until a chainlock
  becomes available
 */
struct lock_fetch_state {
        /* daemon context; its vnn_map generation is compared on retry */
        struct ctdb_context *ctdb;

        /* packet handler to re-invoke, and the context handed to it */
        void (*recv_pkt)(void *, struct ctdb_req_header *);
        void *recv_context;

        /* the deferred request */
        struct ctdb_req_header *hdr;

        /* vnn_map generation recorded when the request was deferred */
        uint32_t generation;

        /* when true, retry even if the generation has changed */
        bool ignore_generation;
};
248
249 /*
250   called when we should retry the operation
251  */
252 static void lock_fetch_callback(void *p)
253 {
254         struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
255         if (!state->ignore_generation &&
256             state->generation != state->ctdb->vnn_map->generation) {
257                 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
258                 talloc_free(state->hdr);
259                 return;
260         }
261         state->recv_pkt(state->recv_context, state->hdr);
262         DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
263 }
264
265
266 /*
267   do a non-blocking ltdb_lock, deferring this ctdb request until we
268   have the chainlock
269
270   It does the following:
271
272    1) tries to get the chainlock. If it succeeds, then it returns 0
273
274    2) if it fails to get a chainlock immediately then it sets up a
275    non-blocking chainlock via ctdb_lockwait, and when it gets the
276    chainlock it re-submits this ctdb request to the main packet
277    receive function
278
279    This effectively queues all ctdb requests that cannot be
280    immediately satisfied until it can get the lock. This means that
281    the main ctdb daemon will not block waiting for a chainlock held by
282    a client
283
284    There are 3 possible return values:
285
286        0:    means that it got the lock immediately.
287       -1:    means that it failed to get the lock, and won't retry
288       -2:    means that it failed to get the lock immediately, but will retry
289  */
290 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db, 
291                            TDB_DATA key, struct ctdb_req_header *hdr,
292                            void (*recv_pkt)(void *, struct ctdb_req_header *),
293                            void *recv_context, bool ignore_generation)
294 {
295         int ret;
296         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
297         struct lockwait_handle *h;
298         struct lock_fetch_state *state;
299         
300         ret = tdb_chainlock_nonblock(tdb, key);
301
302         if (ret != 0 &&
303             !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
304                 /* a hard failure - don't try again */
305                 return -1;
306         }
307
308         /* when torturing, ensure we test the contended path */
309         if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
310             random() % 5 == 0) {
311                 ret = -1;
312                 tdb_chainunlock(tdb, key);
313         }
314
315         /* first the non-contended path */
316         if (ret == 0) {
317                 return 0;
318         }
319
320         state = talloc(hdr, struct lock_fetch_state);
321         state->ctdb = ctdb_db->ctdb;
322         state->hdr = hdr;
323         state->recv_pkt = recv_pkt;
324         state->recv_context = recv_context;
325         state->generation = ctdb_db->ctdb->vnn_map->generation;
326         state->ignore_generation = ignore_generation;
327
328         /* now the contended path */
329         h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
330         if (h == NULL) {
331                 return -1;
332         }
333
334         /* we need to move the packet off the temporary context in ctdb_input_pkt(),
335            so it won't be freed yet */
336         talloc_steal(state, hdr);
337         talloc_steal(state, h);
338
339         /* now tell the caller than we will retry asynchronously */
340         return -2;
341 }
342
343 /*
344   a varient of ctdb_ltdb_lock_requeue that also fetches the record
345  */
346 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db, 
347                                  TDB_DATA key, struct ctdb_ltdb_header *header, 
348                                  struct ctdb_req_header *hdr, TDB_DATA *data,
349                                  void (*recv_pkt)(void *, struct ctdb_req_header *),
350                                  void *recv_context, bool ignore_generation)
351 {
352         int ret;
353
354         ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt, 
355                                      recv_context, ignore_generation);
356         if (ret == 0) {
357                 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
358                 if (ret != 0) {
359                         int uret;
360                         uret = ctdb_ltdb_unlock(ctdb_db, key);
361                         if (uret != 0) {
362                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
363                         }
364                 }
365         }
366         return ret;
367 }
368
369
370 /*
371   paraoid check to see if the db is empty
372  */
373 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
374 {
375         struct tdb_context *tdb = ctdb_db->ltdb->tdb;
376         int count = tdb_traverse_read(tdb, NULL, NULL);
377         if (count != 0) {
378                 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
379                          ctdb_db->db_path));
380                 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
381         }
382 }
383
384 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
385                                 struct ctdb_db_context *ctdb_db)
386 {
387         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
388         char *old;
389         char *reason = NULL;
390         TDB_DATA key;
391         TDB_DATA val;
392
393         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
394         key.dsize = strlen(ctdb_db->db_name);
395
396         old = ctdb_db->unhealthy_reason;
397         ctdb_db->unhealthy_reason = NULL;
398
399         val = tdb_fetch(tdb, key);
400         if (val.dsize > 0) {
401                 reason = talloc_strndup(ctdb_db,
402                                         (const char *)val.dptr,
403                                         val.dsize);
404                 if (reason == NULL) {
405                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
406                                            (int)val.dsize));
407                         ctdb_db->unhealthy_reason = old;
408                         free(val.dptr);
409                         return -1;
410                 }
411         }
412
413         if (val.dptr) {
414                 free(val.dptr);
415         }
416
417         talloc_free(old);
418         ctdb_db->unhealthy_reason = reason;
419         return 0;
420 }
421
422 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
423                                   struct ctdb_db_context *ctdb_db,
424                                   const char *given_reason,/* NULL means healthy */
425                                   int num_healthy_nodes)
426 {
427         struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
428         int ret;
429         TDB_DATA key;
430         TDB_DATA val;
431         char *new_reason = NULL;
432         char *old_reason = NULL;
433
434         ret = tdb_transaction_start(tdb);
435         if (ret != 0) {
436                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
437                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
438                 return -1;
439         }
440
441         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
442         if (ret != 0) {
443                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
444                                    ctdb_db->db_name, ret));
445                 return -1;
446         }
447         old_reason = ctdb_db->unhealthy_reason;
448
449         key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
450         key.dsize = strlen(ctdb_db->db_name);
451
452         if (given_reason) {
453                 new_reason = talloc_strdup(ctdb_db, given_reason);
454                 if (new_reason == NULL) {
455                         DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
456                                           given_reason));
457                         return -1;
458                 }
459         } else if (old_reason && num_healthy_nodes == 0) {
460                 /*
461                  * If the reason indicates ok, but there where no healthy nodes
462                  * available, that it means, we have not recovered valid content
463                  * of the db. So if there's an old reason, prefix it with
464                  * "NO-HEALTHY-NODES - "
465                  */
466                 const char *prefix;
467
468 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
469                 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
470                 if (ret != 0) {
471                         prefix = _TMP_PREFIX;
472                 } else {
473                         prefix = "";
474                 }
475                 new_reason = talloc_asprintf(ctdb_db, "%s%s",
476                                          prefix, old_reason);
477                 if (new_reason == NULL) {
478                         DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
479                                           prefix, old_reason));
480                         return -1;
481                 }
482 #undef _TMP_PREFIX
483         }
484
485         if (new_reason) {
486                 val.dptr = discard_const_p(uint8_t, new_reason);
487                 val.dsize = strlen(new_reason);
488
489                 ret = tdb_store(tdb, key, val, TDB_REPLACE);
490                 if (ret != 0) {
491                         tdb_transaction_cancel(tdb);
492                         DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
493                                            tdb_name(tdb), ctdb_db->db_name, new_reason,
494                                            ret, tdb_errorstr(tdb)));
495                         talloc_free(new_reason);
496                         return -1;
497                 }
498                 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
499                                    ctdb_db->db_name, new_reason));
500         } else if (old_reason) {
501                 ret = tdb_delete(tdb, key);
502                 if (ret != 0) {
503                         tdb_transaction_cancel(tdb);
504                         DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
505                                            tdb_name(tdb), ctdb_db->db_name,
506                                            ret, tdb_errorstr(tdb)));
507                         talloc_free(new_reason);
508                         return -1;
509                 }
510                 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
511                                    ctdb_db->db_name));
512         }
513
514         ret = tdb_transaction_commit(tdb);
515         if (ret != TDB_SUCCESS) {
516                 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
517                                    tdb_name(tdb), ret, tdb_errorstr(tdb)));
518                 talloc_free(new_reason);
519                 return -1;
520         }
521
522         talloc_free(old_reason);
523         ctdb_db->unhealthy_reason = new_reason;
524
525         return 0;
526 }
527
528 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
529                                      struct ctdb_db_context *ctdb_db)
530 {
531         time_t now = time(NULL);
532         char *new_path;
533         char *new_reason;
534         int ret;
535         struct tm *tm;
536
537         tm = gmtime(&now);
538
539         /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
540         new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
541                                    "%04u%02u%02u%02u%02u%02u.0Z",
542                                    ctdb_db->db_path,
543                                    tm->tm_year+1900, tm->tm_mon+1,
544                                    tm->tm_mday, tm->tm_hour, tm->tm_min,
545                                    tm->tm_sec);
546         if (new_path == NULL) {
547                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
548                 return -1;
549         }
550
551         new_reason = talloc_asprintf(ctdb_db,
552                                      "ERROR - Backup of corrupted TDB in '%s'",
553                                      new_path);
554         if (new_reason == NULL) {
555                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
556                 return -1;
557         }
558         ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
559         talloc_free(new_reason);
560         if (ret != 0) {
561                 DEBUG(DEBUG_CRIT,(__location__
562                                  ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
563                                  ctdb_db->db_path));
564                 return -1;
565         }
566
567         ret = rename(ctdb_db->db_path, new_path);
568         if (ret != 0) {
569                 DEBUG(DEBUG_CRIT,(__location__
570                                   ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
571                                   ctdb_db->db_path, new_path,
572                                   errno, strerror(errno)));
573                 talloc_free(new_path);
574                 return -1;
575         }
576
577         DEBUG(DEBUG_CRIT,(__location__
578                          ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
579                          ctdb_db->db_path, new_path));
580         talloc_free(new_path);
581         return 0;
582 }
583
584 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
585 {
586         struct ctdb_db_context *ctdb_db;
587         int ret;
588         int ok = 0;
589         int fail = 0;
590
591         for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
592                 if (!ctdb_db->persistent) {
593                         continue;
594                 }
595
596                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
597                 if (ret != 0) {
598                         DEBUG(DEBUG_ALERT,(__location__
599                                            " load persistent health for '%s' failed\n",
600                                            ctdb_db->db_path));
601                         return -1;
602                 }
603
604                 if (ctdb_db->unhealthy_reason == NULL) {
605                         ok++;
606                         DEBUG(DEBUG_INFO,(__location__
607                                    " persistent db '%s' healthy\n",
608                                    ctdb_db->db_path));
609                         continue;
610                 }
611
612                 fail++;
613                 DEBUG(DEBUG_ALERT,(__location__
614                                    " persistent db '%s' unhealthy: %s\n",
615                                    ctdb_db->db_path,
616                                    ctdb_db->unhealthy_reason));
617         }
618         DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
619               ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
620                ok, fail));
621
622         if (fail != 0) {
623                 return -1;
624         }
625
626         return 0;
627 }
628
629
630 /*
631   mark a database - as healthy
632  */
633 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
634 {
635         uint32_t db_id = *(uint32_t *)indata.dptr;
636         struct ctdb_db_context *ctdb_db;
637         int ret;
638         bool may_recover = false;
639
640         ctdb_db = find_ctdb_db(ctdb, db_id);
641         if (!ctdb_db) {
642                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
643                 return -1;
644         }
645
646         if (ctdb_db->unhealthy_reason) {
647                 may_recover = true;
648         }
649
650         ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
651         if (ret != 0) {
652                 DEBUG(DEBUG_ERR,(__location__
653                                  " ctdb_update_persistent_health(%s) failed\n",
654                                  ctdb_db->db_name));
655                 return -1;
656         }
657
658         if (may_recover && !ctdb->done_startup) {
659                 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy  - force recovery for startup\n",
660                                   ctdb_db->db_name));
661                 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
662         }
663
664         return 0;
665 }
666
667 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
668                                    TDB_DATA indata,
669                                    TDB_DATA *outdata)
670 {
671         uint32_t db_id = *(uint32_t *)indata.dptr;
672         struct ctdb_db_context *ctdb_db;
673         int ret;
674
675         ctdb_db = find_ctdb_db(ctdb, db_id);
676         if (!ctdb_db) {
677                 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
678                 return -1;
679         }
680
681         ret = ctdb_load_persistent_health(ctdb, ctdb_db);
682         if (ret != 0) {
683                 DEBUG(DEBUG_ERR,(__location__
684                                  " ctdb_load_persistent_health(%s) failed\n",
685                                  ctdb_db->db_name));
686                 return -1;
687         }
688
689         *outdata = tdb_null;
690         if (ctdb_db->unhealthy_reason) {
691                 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
692                 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
693         }
694
695         return 0;
696 }
697
698
699 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
700 {
701         char *ropath;
702
703         if (ctdb_db->readonly) {
704                 return 0;
705         }
706
707         if (ctdb_db->persistent) {
708                 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
709                 return -1;
710         }
711
712         ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
713         if (ropath == NULL) {
714                 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
715                 return -1;
716         }
717         ctdb_db->rottdb = tdb_open(ropath, 
718                               ctdb->tunable.database_hash_size, 
719                               TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
720                               O_CREAT|O_RDWR, 0);
721         if (ctdb_db->rottdb == NULL) {
722                 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
723                 talloc_free(ropath);
724                 return -1;
725         }
726
727         DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
728
729         ctdb_db->readonly = true;
730         talloc_free(ropath);
731         return 0;
732 }
733
734 /*
735   attach to a database, handling both persistent and non-persistent databases
736   return 0 on success, -1 on failure
737  */
738 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
739                              bool persistent, const char *unhealthy_reason,
740                              bool jenkinshash)
741 {
742         struct ctdb_db_context *ctdb_db, *tmp_db;
743         int ret;
744         struct TDB_DATA key;
745         unsigned tdb_flags;
746         int mode = 0600;
747         int remaining_tries = 0;
748
749         ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
750         CTDB_NO_MEMORY(ctdb, ctdb_db);
751
752         ctdb_db->priority = 1;
753         ctdb_db->ctdb = ctdb;
754         ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
755         CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
756
757         key.dsize = strlen(db_name)+1;
758         key.dptr  = discard_const(db_name);
759         ctdb_db->db_id = ctdb_hash(&key);
760         ctdb_db->persistent = persistent;
761
762         if (!ctdb_db->persistent) {
763                 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
764                 if (ctdb_db->delete_queue == NULL) {
765                         CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
766                 }
767
768                 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
769         }
770
771         /* check for hash collisions */
772         for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
773                 if (tmp_db->db_id == ctdb_db->db_id) {
774                         DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
775                                  tmp_db->db_id, db_name, tmp_db->db_name));
776                         talloc_free(ctdb_db);
777                         return -1;
778                 }
779         }
780
781         if (persistent) {
782                 if (unhealthy_reason) {
783                         ret = ctdb_update_persistent_health(ctdb, ctdb_db,
784                                                             unhealthy_reason, 0);
785                         if (ret != 0) {
786                                 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
787                                                    ctdb_db->db_name, unhealthy_reason, ret));
788                                 talloc_free(ctdb_db);
789                                 return -1;
790                         }
791                 }
792
793                 if (ctdb->max_persistent_check_errors > 0) {
794                         remaining_tries = 1;
795                 }
796                 if (ctdb->done_startup) {
797                         remaining_tries = 0;
798                 }
799
800                 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
801                 if (ret != 0) {
802                         DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
803                                    ctdb_db->db_name, ret));
804                         talloc_free(ctdb_db);
805                         return -1;
806                 }
807         }
808
809         if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
810                 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
811                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
812                 talloc_free(ctdb_db);
813                 return -1;
814         }
815
816         if (ctdb_db->unhealthy_reason) {
817                 /* this is just a warning, but we want that in the log file! */
818                 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
819                                    ctdb_db->db_name, ctdb_db->unhealthy_reason));
820         }
821
822         /* open the database */
823         ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u", 
824                                            persistent?ctdb->db_directory_persistent:ctdb->db_directory, 
825                                            db_name, ctdb->pnn);
826
827         tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
828         if (ctdb->valgrinding) {
829                 tdb_flags |= TDB_NOMMAP;
830         }
831         tdb_flags |= TDB_DISALLOW_NESTING;
832         if (jenkinshash) {
833                 tdb_flags |= TDB_INCOMPATIBLE_HASH;
834         }
835
836 again:
837         ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path, 
838                                       ctdb->tunable.database_hash_size, 
839                                       tdb_flags, 
840                                       O_CREAT|O_RDWR, mode);
841         if (ctdb_db->ltdb == NULL) {
842                 struct stat st;
843                 int saved_errno = errno;
844
845                 if (!persistent) {
846                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
847                                           ctdb_db->db_path,
848                                           saved_errno,
849                                           strerror(saved_errno)));
850                         talloc_free(ctdb_db);
851                         return -1;
852                 }
853
854                 if (remaining_tries == 0) {
855                         DEBUG(DEBUG_CRIT,(__location__
856                                           "Failed to open persistent tdb '%s': %d - %s\n",
857                                           ctdb_db->db_path,
858                                           saved_errno,
859                                           strerror(saved_errno)));
860                         talloc_free(ctdb_db);
861                         return -1;
862                 }
863
864                 ret = stat(ctdb_db->db_path, &st);
865                 if (ret != 0) {
866                         DEBUG(DEBUG_CRIT,(__location__
867                                           "Failed to open persistent tdb '%s': %d - %s\n",
868                                           ctdb_db->db_path,
869                                           saved_errno,
870                                           strerror(saved_errno)));
871                         talloc_free(ctdb_db);
872                         return -1;
873                 }
874
875                 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
876                 if (ret != 0) {
877                         DEBUG(DEBUG_CRIT,(__location__
878                                           "Failed to open persistent tdb '%s': %d - %s\n",
879                                           ctdb_db->db_path,
880                                           saved_errno,
881                                           strerror(saved_errno)));
882                         talloc_free(ctdb_db);
883                         return -1;
884                 }
885
886                 remaining_tries--;
887                 mode = st.st_mode;
888                 goto again;
889         }
890
891         if (!persistent) {
892                 ctdb_check_db_empty(ctdb_db);
893         } else {
894                 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
895                 if (ret != 0) {
896                         int fd;
897                         struct stat st;
898
899                         DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
900                                           ctdb_db->db_path, ret,
901                                           tdb_errorstr(ctdb_db->ltdb->tdb)));
902                         if (remaining_tries == 0) {
903                                 talloc_free(ctdb_db);
904                                 return -1;
905                         }
906
907                         fd = tdb_fd(ctdb_db->ltdb->tdb);
908                         ret = fstat(fd, &st);
909                         if (ret != 0) {
910                                 DEBUG(DEBUG_CRIT,(__location__
911                                                   "Failed to fstat() persistent tdb '%s': %d - %s\n",
912                                                   ctdb_db->db_path,
913                                                   errno,
914                                                   strerror(errno)));
915                                 talloc_free(ctdb_db);
916                                 return -1;
917                         }
918
919                         /* close the TDB */
920                         talloc_free(ctdb_db->ltdb);
921                         ctdb_db->ltdb = NULL;
922
923                         ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
924                         if (ret != 0) {
925                                 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
926                                                   ctdb_db->db_path));
927                                 talloc_free(ctdb_db);
928                                 return -1;
929                         }
930
931                         remaining_tries--;
932                         mode = st.st_mode;
933                         goto again;
934                 }
935         }
936
937         /* set up a rb tree we can use to track which records we have a 
938            fetch-lock in-flight for so we can defer any additional calls
939            for the same record.
940          */
941         ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
942         if (ctdb_db->deferred_fetch == NULL) {
943                 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
944                 talloc_free(ctdb_db);
945                 return -1;
946         }
947
948         DLIST_ADD(ctdb->db_list, ctdb_db);
949
950         /* setting this can help some high churn databases */
951         tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
952
953         /* 
954            all databases support the "null" function. we need this in
955            order to do forced migration of records
956         */
957         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
958         if (ret != 0) {
959                 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
960                 talloc_free(ctdb_db);
961                 return -1;
962         }
963
964         /* 
965            all databases support the "fetch" function. we need this
966            for efficient Samba3 ctdb fetch
967         */
968         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
969         if (ret != 0) {
970                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
971                 talloc_free(ctdb_db);
972                 return -1;
973         }
974
975         /* 
976            all databases support the "fetch_with_header" function. we need this
977            for efficient readonly record fetches
978         */
979         ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
980         if (ret != 0) {
981                 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
982                 talloc_free(ctdb_db);
983                 return -1;
984         }
985
986         ret = ctdb_vacuum_init(ctdb_db);
987         if (ret != 0) {
988                 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
989                                   "database '%s'\n", ctdb_db->db_name));
990                 talloc_free(ctdb_db);
991                 return -1;
992         }
993
994
995         DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
996         
997         /* success */
998         return 0;
999 }
1000
1001
/*
  one DB attach control that has been put on hold; entries are chained on
  ctdb->deferred_attach
 */
struct ctdb_deferred_attach_context {
	/* list linkage used by the DLIST_* macros */
	struct ctdb_deferred_attach_context *next;
	struct ctdb_deferred_attach_context *prev;
	/* daemon context whose deferred_attach list holds this entry */
	struct ctdb_context *ctdb;
	/* the held-back attach control packet */
	struct ctdb_req_control *c;
};
1007
1008
1009 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1010 {
1011         DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1012
1013         return 0;
1014 }
1015
1016 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1017 {
1018         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1019         struct ctdb_context *ctdb = da_ctx->ctdb;
1020
1021         ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1022         talloc_free(da_ctx);
1023 }
1024
1025 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1026 {
1027         struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1028         struct ctdb_context *ctdb = da_ctx->ctdb;
1029
1030         /* This talloc-steals the packet ->c */
1031         ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1032         talloc_free(da_ctx);
1033 }
1034
1035 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1036 {
1037         struct ctdb_deferred_attach_context *da_ctx;
1038
1039         /* call it from the main event loop as soon as the current event 
1040            finishes.
1041          */
1042         while ((da_ctx = ctdb->deferred_attach) != NULL) {
1043                 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1044                 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1045         }
1046
1047         return 0;
1048 }
1049
1050 /*
1051   a client has asked to attach a new database
1052  */
1053 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1054                                TDB_DATA *outdata, uint64_t tdb_flags, 
1055                                bool persistent, uint32_t client_id,
1056                                struct ctdb_req_control *c,
1057                                bool *async_reply)
1058 {
1059         const char *db_name = (const char *)indata.dptr;
1060         struct ctdb_db_context *db;
1061         struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1062         struct ctdb_client *client = NULL;
1063
1064         if (ctdb->tunable.allow_client_db_attach == 0) {
1065                 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1066                                   "AllowClientDBAccess == 0\n", db_name));
1067                 return -1;
1068         }
1069
1070         /* dont allow any local clients to attach while we are in recovery mode
1071          * except for the recovery daemon.
1072          * allow all attach from the network since these are always from remote
1073          * recovery daemons.
1074          */
1075         if (client_id != 0) {
1076                 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1077         }
1078         if (client != NULL) {
1079                 /* If the node is inactive it is not part of the cluster
1080                    and we should not allow clients to attach to any
1081                    databases
1082                 */
1083                 if (node->flags & NODE_FLAGS_INACTIVE) {
1084                         DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1085                         return -1;
1086                 }
1087
1088                 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1089                  && client->pid != ctdb->recoverd_pid
1090                  && !ctdb->done_startup) {
1091                         struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1092
1093                         if (da_ctx == NULL) {
1094                                 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1095                                 return -1;
1096                         }
1097
1098                         da_ctx->ctdb = ctdb;
1099                         da_ctx->c = talloc_steal(da_ctx, c);
1100                         talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1101                         DLIST_ADD(ctdb->deferred_attach, da_ctx);
1102
1103                         event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1104
1105                         DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1106                         *async_reply = true;
1107                         return 0;
1108                 }
1109         }
1110
1111         /* the client can optionally pass additional tdb flags, but we
1112            only allow a subset of those on the database in ctdb. Note
1113            that tdb_flags is passed in via the (otherwise unused)
1114            srvid to the attach control */
1115         tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1116
1117         /* see if we already have this name */
1118         db = ctdb_db_handle(ctdb, db_name);
1119         if (db) {
1120                 outdata->dptr  = (uint8_t *)&db->db_id;
1121                 outdata->dsize = sizeof(db->db_id);
1122                 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1123                 return 0;
1124         }
1125
1126         if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1127                 return -1;
1128         }
1129
1130         db = ctdb_db_handle(ctdb, db_name);
1131         if (!db) {
1132                 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1133                 return -1;
1134         }
1135
1136         /* remember the flags the client has specified */
1137         tdb_add_flags(db->ltdb->tdb, tdb_flags);
1138
1139         outdata->dptr  = (uint8_t *)&db->db_id;
1140         outdata->dsize = sizeof(db->db_id);
1141
1142         /* Try to ensure it's locked in mem */
1143         ctdb_lockdown_memory(ctdb);
1144
1145         /* tell all the other nodes about this database */
1146         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1147                                  persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1148                                                 CTDB_CONTROL_DB_ATTACH,
1149                                  0, CTDB_CTRL_FLAG_NOREPLY,
1150                                  indata, NULL, NULL);
1151
1152         /* success */
1153         return 0;
1154 }
1155
1156
1157 /*
1158   attach to all existing persistent databases
1159  */
1160 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1161                                   const char *unhealthy_reason)
1162 {
1163         DIR *d;
1164         struct dirent *de;
1165
1166         /* open the persistent db directory and scan it for files */
1167         d = opendir(ctdb->db_directory_persistent);
1168         if (d == NULL) {
1169                 return 0;
1170         }
1171
1172         while ((de=readdir(d))) {
1173                 char *p, *s, *q;
1174                 size_t len = strlen(de->d_name);
1175                 uint32_t node;
1176                 int invalid_name = 0;
1177                 
1178                 s = talloc_strdup(ctdb, de->d_name);
1179                 CTDB_NO_MEMORY(ctdb, s);
1180
1181                 /* only accept names ending in .tdb */
1182                 p = strstr(s, ".tdb.");
1183                 if (len < 7 || p == NULL) {
1184                         talloc_free(s);
1185                         continue;
1186                 }
1187
1188                 /* only accept names ending with .tdb. and any number of digits */
1189                 q = p+5;
1190                 while (*q != 0 && invalid_name == 0) {
1191                         if (!isdigit(*q++)) {
1192                                 invalid_name = 1;
1193                         }
1194                 }
1195                 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1196                         DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1197                         talloc_free(s);
1198                         continue;
1199                 }
1200                 p[4] = 0;
1201
1202                 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1203                         DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1204                         closedir(d);
1205                         talloc_free(s);
1206                         return -1;
1207                 }
1208
1209                 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1210
1211                 talloc_free(s);
1212         }
1213         closedir(d);
1214         return 0;
1215 }
1216
1217 int ctdb_attach_databases(struct ctdb_context *ctdb)
1218 {
1219         int ret;
1220         char *persistent_health_path = NULL;
1221         char *unhealthy_reason = NULL;
1222         bool first_try = true;
1223
1224         if (ctdb->db_directory == NULL) {
1225                 ctdb->db_directory = VARDIR "/ctdb";
1226         }
1227         if (ctdb->db_directory_persistent == NULL) {
1228                 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1229         }
1230         if (ctdb->db_directory_state == NULL) {
1231                 ctdb->db_directory_state = VARDIR "/ctdb/state";
1232         }
1233
1234         /* make sure the db directory exists */
1235         ret = mkdir(ctdb->db_directory, 0700);
1236         if (ret == -1 && errno != EEXIST) {
1237                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1238                          ctdb->db_directory));
1239                 return -1;
1240         }
1241
1242         /* make sure the persistent db directory exists */
1243         ret = mkdir(ctdb->db_directory_persistent, 0700);
1244         if (ret == -1 && errno != EEXIST) {
1245                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1246                          ctdb->db_directory_persistent));
1247                 return -1;
1248         }
1249
1250         /* make sure the internal state db directory exists */
1251         ret = mkdir(ctdb->db_directory_state, 0700);
1252         if (ret == -1 && errno != EEXIST) {
1253                 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1254                          ctdb->db_directory_state));
1255                 return -1;
1256         }
1257
1258         persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1259                                                  ctdb->db_directory_state,
1260                                                  PERSISTENT_HEALTH_TDB,
1261                                                  ctdb->pnn);
1262         if (persistent_health_path == NULL) {
1263                 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1264                 return -1;
1265         }
1266
1267 again:
1268
1269         ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1270                                                    0, TDB_DISALLOW_NESTING,
1271                                                    O_CREAT | O_RDWR, 0600);
1272         if (ctdb->db_persistent_health == NULL) {
1273                 struct tdb_wrap *tdb;
1274
1275                 if (!first_try) {
1276                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1277                                           persistent_health_path,
1278                                           errno,
1279                                           strerror(errno)));
1280                         talloc_free(persistent_health_path);
1281                         talloc_free(unhealthy_reason);
1282                         return -1;
1283                 }
1284                 first_try = false;
1285
1286                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1287                                                    persistent_health_path,
1288                                                    "was cleared after a failure",
1289                                                    "manual verification needed");
1290                 if (unhealthy_reason == NULL) {
1291                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1292                         talloc_free(persistent_health_path);
1293                         return -1;
1294                 }
1295
1296                 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1297                                   persistent_health_path));
1298                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1299                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1300                                     O_CREAT | O_RDWR, 0600);
1301                 if (tdb) {
1302                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1303                                           persistent_health_path,
1304                                           errno,
1305                                           strerror(errno)));
1306                         talloc_free(persistent_health_path);
1307                         talloc_free(unhealthy_reason);
1308                         return -1;
1309                 }
1310
1311                 talloc_free(tdb);
1312                 goto again;
1313         }
1314         ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1315         if (ret != 0) {
1316                 struct tdb_wrap *tdb;
1317
1318                 talloc_free(ctdb->db_persistent_health);
1319                 ctdb->db_persistent_health = NULL;
1320
1321                 if (!first_try) {
1322                         DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1323                                           persistent_health_path));
1324                         talloc_free(persistent_health_path);
1325                         talloc_free(unhealthy_reason);
1326                         return -1;
1327                 }
1328                 first_try = false;
1329
1330                 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1331                                                    persistent_health_path,
1332                                                    "was cleared after a failure",
1333                                                    "manual verification needed");
1334                 if (unhealthy_reason == NULL) {
1335                         DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1336                         talloc_free(persistent_health_path);
1337                         return -1;
1338                 }
1339
1340                 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1341                                   persistent_health_path));
1342                 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1343                                     0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1344                                     O_CREAT | O_RDWR, 0600);
1345                 if (tdb) {
1346                         DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1347                                           persistent_health_path,
1348                                           errno,
1349                                           strerror(errno)));
1350                         talloc_free(persistent_health_path);
1351                         talloc_free(unhealthy_reason);
1352                         return -1;
1353                 }
1354
1355                 talloc_free(tdb);
1356                 goto again;
1357         }
1358         talloc_free(persistent_health_path);
1359
1360         ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1361         talloc_free(unhealthy_reason);
1362         if (ret != 0) {
1363                 return ret;
1364         }
1365
1366         return 0;
1367 }
1368
1369 /*
1370   called when a broadcast seqnum update comes in
1371  */
1372 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1373 {
1374         struct ctdb_db_context *ctdb_db;
1375         if (srcnode == ctdb->pnn) {
1376                 /* don't update ourselves! */
1377                 return 0;
1378         }
1379
1380         ctdb_db = find_ctdb_db(ctdb, db_id);
1381         if (!ctdb_db) {
1382                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1383                 return -1;
1384         }
1385
1386         if (ctdb_db->unhealthy_reason) {
1387                 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1388                                  ctdb_db->db_name, ctdb_db->unhealthy_reason));
1389                 return -1;
1390         }
1391
1392         tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1393         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1394         return 0;
1395 }
1396
1397 /*
1398   timer to check for seqnum changes in a ltdb and propogate them
1399  */
1400 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te, 
1401                                    struct timeval t, void *p)
1402 {
1403         struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1404         struct ctdb_context *ctdb = ctdb_db->ctdb;
1405         uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1406         if (new_seqnum != ctdb_db->seqnum) {
1407                 /* something has changed - propogate it */
1408                 TDB_DATA data;
1409                 data.dptr = (uint8_t *)&ctdb_db->db_id;
1410                 data.dsize = sizeof(uint32_t);
1411                 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1412                                          CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1413                                          data, NULL, NULL);             
1414         }
1415         ctdb_db->seqnum = new_seqnum;
1416
1417         /* setup a new timer */
1418         ctdb_db->seqnum_update =
1419                 event_add_timed(ctdb->ev, ctdb_db, 
1420                                 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1421                                 ctdb_ltdb_seqnum_check, ctdb_db);
1422 }
1423
1424 /*
1425   enable seqnum handling on this db
1426  */
1427 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1428 {
1429         struct ctdb_db_context *ctdb_db;
1430         ctdb_db = find_ctdb_db(ctdb, db_id);
1431         if (!ctdb_db) {
1432                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1433                 return -1;
1434         }
1435
1436         if (ctdb_db->seqnum_update == NULL) {
1437                 ctdb_db->seqnum_update =
1438                         event_add_timed(ctdb->ev, ctdb_db, 
1439                                         timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1440                                         ctdb_ltdb_seqnum_check, ctdb_db);
1441         }
1442
1443         tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1444         ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1445         return 0;
1446 }
1447
1448 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1449 {
1450         struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1451         struct ctdb_db_context *ctdb_db;
1452
1453         ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1454         if (!ctdb_db) {
1455                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1456                 return 0;
1457         }
1458
1459         if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1460                 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1461                 return 0;
1462         }
1463
1464         ctdb_db->priority = db_prio->priority;
1465         DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1466
1467         return 0;
1468 }
1469
1470 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1471                                 uint32_t db_id,
1472                                 TDB_DATA *outdata)
1473 {
1474         struct ctdb_db_context *ctdb_db;
1475
1476         ctdb_db = find_ctdb_db(ctdb, db_id);
1477         if (!ctdb_db) {
1478                 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1479                 return -1;
1480         }
1481
1482         outdata->dptr  = (uint8_t *)&(ctdb_db->statistics);
1483         outdata->dsize = sizeof(ctdb_db->statistics);
1484
1485         return 0;
1486 }