2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
/*
  Call handler that ctdb_local_attach() installs for every database under
  CTDB_NULL_FUNC.
  NOTE(review): the function body is not visible in this listing.
*/
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
/*
  Call handler that ctdb_local_attach() installs for every database under
  CTDB_FETCH_FUNC.  The reply is made to point at the call's own record
  data; only the pointer is assigned, nothing is copied.
  NOTE(review): the return statement is not visible in this listing.
*/
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 call->reply_data = &call->record_data;
/*
  State for a request that has been deferred until a chainlock can be
  obtained.  It is filled in by ctdb_ltdb_lock_requeue() and read back by
  lock_fetch_callback().
  NOTE(review): those functions also use recv_context and generation
  members whose declarations are not visible in this listing.
*/
53 struct lock_fetch_state {
/* daemon context, used to read the current vnn_map generation */
54 struct ctdb_context *ctdb;
/* handler the deferred packet is handed back to on retry */
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
57 struct ctdb_req_header *hdr;
/* when true the generation comparison in the callback is skipped */
59 bool ignore_generation;
63 called when we should retry the operation
/*
  Retry callback that ctdb_ltdb_lock_requeue() passes to ctdb_lockwait().
  p is the struct lock_fetch_state prepared there.

  Unless ignore_generation is set, a packet that was queued under a
  different vnn_map generation is logged and freed.  Otherwise the packet
  is handed back to recv_pkt together with recv_context.
  NOTE(review): the statement that ends the discard branch is not visible
  in this listing; presumably it returns before the requeue - confirm.
*/
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
/* resubmit the packet to the handler that originally received it */
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
/*
  Try to take the chainlock for key without blocking.  If the lock cannot
  be taken right away, the request is deferred: a lock_fetch_state is set
  up and ctdb_lockwait() is asked to call lock_fetch_callback() later.

  ctdb_db            database whose local tdb is locked
  key                key identifying the chain to lock
  hdr                request packet; talloc parent of the retry state
  recv_pkt           handler the packet is handed to on retry
  recv_context       first argument passed to recv_pkt
  ignore_generation  if true, retry even if the vnn_map generation changed

  NOTE(review): the return statements and some conditions are not visible
  in this listing.  According to the description that precedes this
  function the results are 0 (locked), -1 (failed, no retry) and -2
  (will retry asynchronously).
*/
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are treated as contention, not as hard errors */
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
/* NOTE(review): no NULL check on the talloc() result is visible here */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
/* remember the generation so the callback can detect a stale packet */
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
144 tdb_chainunlock(tdb, key);
148 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149 so it won't be freed yet */
150 talloc_steal(state, hdr);
/* the lockwait handle is reparented onto state as well */
151 talloc_steal(state, h);
153 /* now tell the caller that we will retry asynchronously */
158 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
  Lock a record through ctdb_ltdb_lock_requeue() and read it with
  ctdb_ltdb_fetch(), filling in header and data.  The lock/requeue
  arguments are passed through unchanged.  ctdb_ltdb_unlock() is also
  called here, and a failing unlock is logged with its error code.
  NOTE(review): the conditions guarding the fetch and the unlock, and the
  return statement, are not visible in this listing.  Presumably the fetch
  runs only once the lock is held and the unlock only when the fetch
  failed - confirm.
*/
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
161 TDB_DATA key, struct ctdb_ltdb_header *header,
162 struct ctdb_req_header *hdr, TDB_DATA *data,
163 void (*recv_pkt)(void *, struct ctdb_req_header *),
164 void *recv_context, bool ignore_generation)
168 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
169 recv_context, ignore_generation);
171 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
174 uret = ctdb_ltdb_unlock(ctdb_db, key);
176 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
185 paranoid check to see if the db is empty
/*
  Sanity check used while attaching: tdb_traverse_read() is run over the
  local tdb with no callback and its result kept as a count.  When the
  check on that count trips, an alert is logged and ctdb_fatal() is called
  with "database not empty on attach".
  NOTE(review): the comparison on count is not visible in this listing.
*/
187 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
189 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
190 int count = tdb_traverse_read(tdb, NULL, NULL);
192 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
194 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
  Reload ctdb_db->unhealthy_reason from the persistent health tdb.

  The record is looked up under the database name; the key length is
  strlen(db_name), so the terminating NUL is not part of the key.  The
  fetched value is duplicated onto ctdb_db with talloc_strndup() and
  becomes the new unhealthy_reason.  If that duplication fails, the
  previous reason is put back.
  NOTE(review): the return values, the handling of a missing record and
  the release of val.dptr and of the old reason are not visible in this
  listing.
*/
198 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
199 struct ctdb_db_context *ctdb_db)
201 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
207 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
208 key.dsize = strlen(ctdb_db->db_name);
/* keep the old reason so it can be restored on failure */
210 old = ctdb_db->unhealthy_reason;
211 ctdb_db->unhealthy_reason = NULL;
213 val = tdb_fetch(tdb, key);
215 reason = talloc_strndup(ctdb_db,
216 (const char *)val.dptr,
218 if (reason == NULL) {
219 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
221 ctdb_db->unhealthy_reason = old;
232 ctdb_db->unhealthy_reason = reason;
/*
  Record the health of a database in the persistent health tdb and in
  ctdb_db->unhealthy_reason.  The tdb is changed inside a transaction.

  given_reason       text describing the problem; NULL means healthy
  num_healthy_nodes  when this is 0 and a reason is already stored, the
                     stored reason is reused with a "NO-HEALTHY-NODES - "
                     prefix

  Sequence visible here: start the transaction, reload the stored reason,
  build the new reason (a copy of given_reason, or the prefixed old
  reason), then either store the new reason under the database name or
  delete the record when only an old reason exists.  A failing store or
  delete cancels the transaction.  After a successful commit the old
  reason is freed and unhealthy_reason is set to the new one.
  NOTE(review): the return statements and several branch conditions
  (including the test on the strncmp() result that selects the prefix) are
  not visible in this listing.
*/
236 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
237 struct ctdb_db_context *ctdb_db,
238 const char *given_reason,/* NULL means healthy */
239 int num_healthy_nodes)
241 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
245 char *new_reason = NULL;
246 char *old_reason = NULL;
248 ret = tdb_transaction_start(tdb);
250 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
251 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* refresh unhealthy_reason from disk so old_reason reflects the stored state */
255 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
257 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
258 ctdb_db->db_name, ret));
261 old_reason = ctdb_db->unhealthy_reason;
/* same key layout as ctdb_load_persistent_health(): name without the NUL */
263 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
264 key.dsize = strlen(ctdb_db->db_name);
267 new_reason = talloc_strdup(ctdb_db, given_reason);
268 if (new_reason == NULL) {
269 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
273 } else if (old_reason && num_healthy_nodes == 0) {
275 * If the reason indicates ok, but there where no healthy nodes
276 * available, that it means, we have not recovered valid content
277 * of the db. So if there's an old reason, prefix it with
278 * "NO-HEALTHY-NODES - "
282 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
/* compares the start of old_reason against the prefix */
283 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
285 prefix = _TMP_PREFIX;
289 new_reason = talloc_asprintf(ctdb_db, "%s%s",
291 if (new_reason == NULL) {
292 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
293 prefix, old_reason));
/* the stored value is the reason text without its terminating NUL */
300 val.dptr = discard_const_p(uint8_t, new_reason);
301 val.dsize = strlen(new_reason);
303 ret = tdb_store(tdb, key, val, TDB_REPLACE);
305 tdb_transaction_cancel(tdb);
306 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
307 tdb_name(tdb), ctdb_db->db_name, new_reason,
308 ret, tdb_errorstr(tdb)));
309 talloc_free(new_reason);
312 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
313 ctdb_db->db_name, new_reason));
314 } else if (old_reason) {
315 ret = tdb_delete(tdb, key);
317 tdb_transaction_cancel(tdb);
318 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
319 tdb_name(tdb), ctdb_db->db_name,
320 ret, tdb_errorstr(tdb)));
321 talloc_free(new_reason);
324 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
328 ret = tdb_transaction_commit(tdb);
329 if (ret != TDB_SUCCESS) {
330 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
331 tdb_name(tdb), ret, tdb_errorstr(tdb)));
332 talloc_free(new_reason);
/* committed: the in-memory reason now follows what was written */
336 talloc_free(old_reason);
337 ctdb_db->unhealthy_reason = new_reason;
/*
  Move a corrupted tdb file out of the way and note that in the
  persistent health state.

  A new path ending in ".corrupted.<YYYYMMDDhhmmss>.0Z" is built from the
  current time.  A reason "ERROR - Backup of corrupted TDB in '...'" is
  recorded through ctdb_update_persistent_health() with 0 healthy nodes,
  and then ctdb_db->db_path is renamed to the new path.  Both the failing
  and the successful rename are logged at DEBUG_CRIT, and new_path is
  freed on both of those paths.
  NOTE(review): the time conversion that fills tm, some format arguments,
  the branch conditions and the return statements are not visible in this
  listing.
*/
342 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
343 struct ctdb_db_context *ctdb_db)
345 time_t now = time(NULL);
353 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
354 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
355 "%04u%02u%02u%02u%02u%02u.0Z",
357 tm->tm_year+1900, tm->tm_mon+1,
358 tm->tm_mday, tm->tm_hour, tm->tm_min,
360 if (new_path == NULL) {
361 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
365 new_reason = talloc_asprintf(ctdb_db,
366 "ERROR - Backup of corrupted TDB in '%s'",
368 if (new_reason == NULL) {
369 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* the health record is updated before the file itself is renamed */
372 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
373 talloc_free(new_reason);
375 DEBUG(DEBUG_CRIT,(__location__
376 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
381 ret = rename(ctdb_db->db_path, new_path);
383 DEBUG(DEBUG_CRIT,(__location__
384 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
385 ctdb_db->db_path, new_path,
386 errno, strerror(errno)));
387 talloc_free(new_path);
391 DEBUG(DEBUG_CRIT,(__location__
392 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
393 ctdb_db->db_path, new_path));
394 talloc_free(new_path);
/*
  Walk ctdb->db_list and reload the stored health state of each database
  with ctdb_load_persistent_health().  A database with no unhealthy_reason
  is logged as healthy at DEBUG_INFO, one with a reason is logged as
  unhealthy at DEBUG_ALERT.  A final summary line prints OK and FAIL
  counters, at DEBUG_ALERT when fail is non-zero and DEBUG_NOTICE
  otherwise.
  NOTE(review): what happens for non-persistent databases (presumably they
  are skipped), the counter updates and the return value are not visible
  in this listing.
*/
398 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
400 struct ctdb_db_context *ctdb_db;
405 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
406 if (!ctdb_db->persistent) {
410 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
412 DEBUG(DEBUG_ALERT,(__location__
413 " load persistent health for '%s' failed\n",
418 if (ctdb_db->unhealthy_reason == NULL) {
420 DEBUG(DEBUG_INFO,(__location__
421 " persistent db '%s' healthy\n",
427 DEBUG(DEBUG_ALERT,(__location__
428 " persistent db '%s' unhealthy: %s\n",
430 ctdb_db->unhealthy_reason));
432 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
433 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
445 mark a database - as healthy
/*
  Control handler that marks a database as healthy.

  indata carries the db_id, read directly from indata.dptr as a uint32_t.
  The database is looked up with find_ctdb_db() and its health is updated
  through ctdb_update_persistent_health() with a NULL reason and one
  healthy node.  If may_recover is set and startup has not finished yet,
  recovery_mode is switched to CTDB_RECOVERY_ACTIVE.
  NOTE(review): no check of indata.dsize is visible before the read, and
  the statement that sets may_recover (presumably when an unhealthy_reason
  was present) as well as the return statements are not visible in this
  listing.
*/
447 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
449 uint32_t db_id = *(uint32_t *)indata.dptr;
450 struct ctdb_db_context *ctdb_db;
452 bool may_recover = false;
454 ctdb_db = find_ctdb_db(ctdb, db_id);
456 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
460 if (ctdb_db->unhealthy_reason) {
464 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
466 DEBUG(DEBUG_ERR,(__location__
467 " ctdb_update_persistent_health(%s) failed\n",
472 if (may_recover && !ctdb->done_startup) {
473 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
475 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
  Control handler that reports the stored health reason of a database.

  indata carries the db_id, read directly from indata.dptr as a uint32_t.
  The stored state is reloaded with ctdb_load_persistent_health().  When a
  reason exists, outdata is pointed at that string (the string is not
  copied) and its size includes the terminating NUL.
  NOTE(review): part of the parameter list, the outdata contents for a
  healthy database and the return statements are not visible in this
  listing; no check of indata.dsize is visible either.
*/
481 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
485 uint32_t db_id = *(uint32_t *)indata.dptr;
486 struct ctdb_db_context *ctdb_db;
489 ctdb_db = find_ctdb_db(ctdb, db_id);
491 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
495 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
497 DEBUG(DEBUG_ERR,(__location__
498 " ctdb_load_persistent_health(%s) failed\n",
504 if (ctdb_db->unhealthy_reason) {
505 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
506 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
513 attach to a database, handling both persistent and non-persistent databases
514 return 0 on success, -1 on failure
/*
  Create a ctdb_db_context for db_name, open its local tdb and link the
  context into ctdb->db_list.

  db_name           database name; db_id is ctdb_hash() over the name
                    including its terminating NUL
  persistent        selects the persistent directory and TDB_DEFAULT
                    instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
  unhealthy_reason  if set, recorded through
                    ctdb_update_persistent_health() with 0 healthy nodes

  Steps visible here: reject a db_id that collides with an attached
  database; record and load the health state; build the path
  "<dir>/<name>.<n>"; open the tdb with tdb_wrap_open(); when the open or
  tdb_check() fails, either give up or move the file aside with
  ctdb_backup_corrupted_tdb(); register the null and fetch call handlers;
  set up vacuuming.  The visible failure paths free ctdb_db.
  NOTE(review): the return statements, the retry logic around the open,
  and the way remaining_tries is derived from max_persistent_check_errors
  and done_startup are not visible in this listing.
*/
516 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
517 bool persistent, const char *unhealthy_reason)
519 struct ctdb_db_context *ctdb_db, *tmp_db;
524 int remaining_tries = 0;
526 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527 CTDB_NO_MEMORY(ctdb, ctdb_db);
529 ctdb_db->priority = 1;
530 ctdb_db->ctdb = ctdb;
531 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the hash key includes the terminating NUL of the name */
534 key.dsize = strlen(db_name)+1;
535 key.dptr = discard_const(db_name);
536 ctdb_db->db_id = ctdb_hash(&key);
537 ctdb_db->persistent = persistent;
539 /* check for hash collisions */
540 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541 if (tmp_db->db_id == ctdb_db->db_id) {
542 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543 tmp_db->db_id, db_name, tmp_db->db_name));
544 talloc_free(ctdb_db);
550 if (unhealthy_reason) {
551 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552 unhealthy_reason, 0);
554 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555 ctdb_db->db_name, unhealthy_reason, ret));
556 talloc_free(ctdb_db);
561 if (ctdb->max_persistent_check_errors > 0) {
564 if (ctdb->done_startup) {
568 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
570 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571 ctdb_db->db_name, ret));
572 talloc_free(ctdb_db);
/* an unhealthy db with no tries left is refused; with tries left it is only a warning */
577 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579 ctdb_db->db_name, ctdb_db->unhealthy_reason));
580 talloc_free(ctdb_db);
584 if (ctdb_db->unhealthy_reason) {
585 /* this is just a warning, but we want that in the log file! */
586 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587 ctdb_db->db_name, ctdb_db->unhealthy_reason));
590 /* open the database */
591 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
592 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
595 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596 if (ctdb->valgrinding) {
597 tdb_flags |= TDB_NOMMAP;
599 tdb_flags |= TDB_DISALLOW_NESTING;
602 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
603 ctdb->tunable.database_hash_size,
605 O_CREAT|O_RDWR, mode);
606 if (ctdb_db->ltdb == NULL) {
/* errno is saved because the calls below may overwrite it */
608 int saved_errno = errno;
611 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
614 strerror(saved_errno)));
615 talloc_free(ctdb_db);
619 if (remaining_tries == 0) {
620 DEBUG(DEBUG_CRIT,(__location__
621 "Failed to open persistent tdb '%s': %d - %s\n",
624 strerror(saved_errno)));
625 talloc_free(ctdb_db);
629 ret = stat(ctdb_db->db_path, &st);
631 DEBUG(DEBUG_CRIT,(__location__
632 "Failed to open persistent tdb '%s': %d - %s\n",
635 strerror(saved_errno)));
636 talloc_free(ctdb_db);
640 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
642 DEBUG(DEBUG_CRIT,(__location__
643 "Failed to open persistent tdb '%s': %d - %s\n",
646 strerror(saved_errno)));
647 talloc_free(ctdb_db);
657 ctdb_check_db_empty(ctdb_db);
659 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
664 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
665 ctdb_db->db_path, ret,
666 tdb_errorstr(ctdb_db->ltdb->tdb)));
667 if (remaining_tries == 0) {
668 talloc_free(ctdb_db);
672 fd = tdb_fd(ctdb_db->ltdb->tdb);
673 ret = fstat(fd, &st);
675 DEBUG(DEBUG_CRIT,(__location__
676 "Failed to fstat() persistent tdb '%s': %d - %s\n",
680 talloc_free(ctdb_db);
/* the tdb wrapper is released before the file is moved aside */
685 talloc_free(ctdb_db->ltdb);
686 ctdb_db->ltdb = NULL;
688 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
690 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
692 talloc_free(ctdb_db);
/* NOTE(review): the failure paths below free ctdb_db after this DLIST_ADD;
   no removal from db_list is visible here - confirm no stale entry is left */
702 DLIST_ADD(ctdb->db_list, ctdb_db);
704 /* setting this can help some high churn databases */
705 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
708 all databases support the "null" function. we need this in
709 order to do forced migration of records
711 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
713 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
714 talloc_free(ctdb_db);
719 all databases support the "fetch" function. we need this
720 for efficient Samba3 ctdb fetch
722 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
724 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
725 talloc_free(ctdb_db);
729 ret = ctdb_vacuum_init(ctdb_db);
731 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
732 "database '%s'\n", ctdb_db->db_name));
733 talloc_free(ctdb_db);
738 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
746 a client has asked to attach a new database
/*
  Control handler for a client request to attach to a database.

  indata.dptr is used as the database name.  tdb_flags is masked down to
  TDB_NOSYNC before use.  An attach is refused with a log message while
  the local node has NODE_FLAGS_INACTIVE set.  The database is looked up
  with ctdb_db_handle() and, where needed, created with
  ctdb_local_attach().  outdata is pointed at the db_id stored in the
  database context (not a copy) and the client flags are added to the tdb.
  After a new attach ctdb_lockdown_memory() is called and the matching
  attach control (persistent or not) is broadcast to all nodes with
  CTDB_CTRL_FLAG_NOREPLY.
  NOTE(review): part of the parameter list (including the declaration of
  persistent), the branch conditions and the return statements are not
  visible in this listing.
*/
748 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
749 TDB_DATA *outdata, uint64_t tdb_flags,
752 const char *db_name = (const char *)indata.dptr;
753 struct ctdb_db_context *db;
754 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
756 /* the client can optionally pass additional tdb flags, but we
757 only allow a subset of those on the database in ctdb. Note
758 that tdb_flags is passed in via the (otherwise unused)
759 srvid to the attach control */
760 tdb_flags &= TDB_NOSYNC;
762 /* If the node is inactive it is not part of the cluster
763 and we should not allow clients to attach to any
766 if (node->flags & NODE_FLAGS_INACTIVE) {
767 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
772 /* see if we already have this name */
773 db = ctdb_db_handle(ctdb, db_name);
775 outdata->dptr = (uint8_t *)&db->db_id;
776 outdata->dsize = sizeof(db->db_id);
777 tdb_add_flags(db->ltdb->tdb, tdb_flags);
781 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
785 db = ctdb_db_handle(ctdb, db_name);
787 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
791 /* remember the flags the client has specified */
792 tdb_add_flags(db->ltdb->tdb, tdb_flags);
794 outdata->dptr = (uint8_t *)&db->db_id;
795 outdata->dsize = sizeof(db->db_id);
797 /* Try to ensure it's locked in mem */
798 ctdb_lockdown_memory(ctdb);
800 /* tell all the other nodes about this database */
801 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
802 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
803 CTDB_CONTROL_DB_ATTACH,
804 0, CTDB_CTRL_FLAG_NOREPLY,
813 attach to all existing persistent databases
/*
  Scan ctdb->db_directory_persistent and attach each persistent database
  file that belongs to this node.

  Each directory entry name is duplicated and searched for ".tdb.".  Names
  shorter than 7 characters or without that marker do not qualify.  The
  characters walked through q must all be digits, and the number parsed
  after ".tdb." must equal ctdb->pnn; otherwise the entry is logged as
  ignored.  A qualifying name is attached with ctdb_local_attach() as a
  persistent database, passing unhealthy_reason through.
  NOTE(review): the initialisation of q, any truncation of s before the
  attach, the release of s, closedir() and the return statements are not
  visible in this listing.
*/
815 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
816 const char *unhealthy_reason)
821 /* open the persistent db directory and scan it for files */
822 d = opendir(ctdb->db_directory_persistent);
827 while ((de=readdir(d))) {
829 size_t len = strlen(de->d_name);
831 int invalid_name = 0;
833 s = talloc_strdup(ctdb, de->d_name);
834 CTDB_NO_MEMORY(ctdb, s);
836 /* only accept names ending in .tdb */
837 p = strstr(s, ".tdb.");
838 if (len < 7 || p == NULL) {
843 /* only accept names ending with .tdb. and any number of digits */
845 while (*q != 0 && invalid_name == 0) {
846 if (!isdigit(*q++)) {
/* p+5 is the first character after the ".tdb." marker */
850 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
851 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
857 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
858 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
864 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
  Prepare the database directories and the persistent health tdb, then
  attach all persistent databases.

  Unset directory settings default to VARDIR "/ctdb", VARDIR
  "/ctdb/persistent" and VARDIR "/ctdb/state".  Each directory is created
  with mode 0700; an already existing directory (EEXIST) is not an error.
  The health tdb "<state dir>/persistent_health.tdb.<n>" is opened with
  TDB_DISALLOW_NESTING and verified with tdb_check().  When the open or
  the check fails, a "WARNING - ... was cleared after a failure - manual
  verification needed" reason is prepared and the file is reopened with
  TDB_CLEAR_IF_FIRST.  That reason is passed on to
  ctdb_attach_persistent().
  NOTE(review): the use of first_try, the retry jump, the branch
  conditions and the return statements are not visible in this listing.
*/
872 int ctdb_attach_databases(struct ctdb_context *ctdb)
875 char *persistent_health_path = NULL;
876 char *unhealthy_reason = NULL;
877 bool first_try = true;
879 if (ctdb->db_directory == NULL) {
880 ctdb->db_directory = VARDIR "/ctdb";
882 if (ctdb->db_directory_persistent == NULL) {
883 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
885 if (ctdb->db_directory_state == NULL) {
886 ctdb->db_directory_state = VARDIR "/ctdb/state";
889 /* make sure the db directory exists */
890 ret = mkdir(ctdb->db_directory, 0700);
891 if (ret == -1 && errno != EEXIST) {
892 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
893 ctdb->db_directory));
897 /* make sure the persistent db directory exists */
898 ret = mkdir(ctdb->db_directory_persistent, 0700);
899 if (ret == -1 && errno != EEXIST) {
900 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
901 ctdb->db_directory_persistent));
905 /* make sure the internal state db directory exists */
906 ret = mkdir(ctdb->db_directory_state, 0700);
907 if (ret == -1 && errno != EEXIST) {
908 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
909 ctdb->db_directory_state));
913 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
914 ctdb->db_directory_state,
915 PERSISTENT_HEALTH_TDB,
917 if (persistent_health_path == NULL) {
918 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* first attempt: open the health tdb without clearing it */
924 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
925 0, TDB_DISALLOW_NESTING,
926 O_CREAT | O_RDWR, 0600);
927 if (ctdb->db_persistent_health == NULL) {
928 struct tdb_wrap *tdb;
931 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
932 persistent_health_path,
935 talloc_free(persistent_health_path);
936 talloc_free(unhealthy_reason);
/* this reason is later handed to ctdb_attach_persistent() */
941 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
942 persistent_health_path,
943 "was cleared after a failure",
944 "manual verification needed");
945 if (unhealthy_reason == NULL) {
946 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
947 talloc_free(persistent_health_path);
951 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
952 persistent_health_path));
953 tdb = tdb_wrap_open(ctdb, persistent_health_path,
954 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
955 O_CREAT | O_RDWR, 0600);
957 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
958 persistent_health_path,
961 talloc_free(persistent_health_path);
962 talloc_free(unhealthy_reason);
/* the health tdb opened; verify its contents before trusting it */
969 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
971 struct tdb_wrap *tdb;
973 talloc_free(ctdb->db_persistent_health);
974 ctdb->db_persistent_health = NULL;
977 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
978 persistent_health_path));
979 talloc_free(persistent_health_path);
980 talloc_free(unhealthy_reason);
985 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
986 persistent_health_path,
987 "was cleared after a failure",
988 "manual verification needed");
989 if (unhealthy_reason == NULL) {
990 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
991 talloc_free(persistent_health_path);
995 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
996 persistent_health_path));
997 tdb = tdb_wrap_open(ctdb, persistent_health_path,
998 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
999 O_CREAT | O_RDWR, 0600);
1001 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1002 persistent_health_path,
1005 talloc_free(persistent_health_path);
1006 talloc_free(unhealthy_reason);
1013 talloc_free(persistent_health_path);
1015 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1016 talloc_free(unhealthy_reason);
1025 called when a broadcast seqnum update comes in
/*
  Handle a sequence number update broadcast by another node.

  Updates that originate from this node (srcnode == ctdb->pnn) are not
  applied.  The database is looked up by db_id; an unknown id and a
  database with an unhealthy_reason are logged.  Otherwise the tdb
  sequence number is incremented with tdb_increment_seqnum_nonblock() and
  the resulting value is cached in ctdb_db->seqnum.
  NOTE(review): the return statements are not visible in this listing, so
  the result reported for each case cannot be stated here.
*/
1027 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1029 struct ctdb_db_context *ctdb_db;
1030 if (srcnode == ctdb->pnn) {
1031 /* don't update ourselves! */
1035 ctdb_db = find_ctdb_db(ctdb, db_id);
1037 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1041 if (ctdb_db->unhealthy_reason) {
1042 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1043 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1047 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
/* cache the new value so the periodic check does not rebroadcast it */
1048 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1053 timer to check for seqnum changes in a ltdb and propagate them
/*
  Timed event handler that watches the tdb sequence number of one
  database.  p is the struct ctdb_db_context.

  When the current tdb sequence number differs from the cached
  ctdb_db->seqnum, a CTDB_CONTROL_UPDATE_SEQNUM control carrying the db_id
  is sent to CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY.  The
  cached value is then updated and the timer is scheduled again.
  NOTE(review): tunable.seqnum_interval appears to be in milliseconds,
  assuming timeval_current_ofs() takes (seconds, microseconds) - confirm.
*/
1055 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1056 struct timeval t, void *p)
1058 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1059 struct ctdb_context *ctdb = ctdb_db->ctdb;
1060 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1061 if (new_seqnum != ctdb_db->seqnum) {
1062 /* something has changed - propagate it */
1064 data.dptr = (uint8_t *)&ctdb_db->db_id;
1065 data.dsize = sizeof(uint32_t);
1066 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1067 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1070 ctdb_db->seqnum = new_seqnum;
1072 /* setup a new timer */
1073 ctdb_db->seqnum_update =
1074 event_add_timed(ctdb->ev, ctdb_db,
1075 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1076 ctdb_ltdb_seqnum_check, ctdb_db);
1080 enable seqnum handling on this db
/*
  Turn on sequence number tracking for the database identified by db_id.

  If no seqnum_update timer exists yet, one is created that runs
  ctdb_ltdb_seqnum_check() after tunable.seqnum_interval.  Sequence
  numbers are then enabled on the tdb and the current value is cached in
  ctdb_db->seqnum.
  NOTE(review): the handling of an unknown db_id beyond the log message
  and the return statements are not visible in this listing.
*/
1082 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1084 struct ctdb_db_context *ctdb_db;
1085 ctdb_db = find_ctdb_db(ctdb, db_id);
1087 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only one timer per database: skip if it is already scheduled */
1091 if (ctdb_db->seqnum_update == NULL) {
1092 ctdb_db->seqnum_update =
1093 event_add_timed(ctdb->ev, ctdb_db,
1094 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1095 ctdb_ltdb_seqnum_check, ctdb_db);
1098 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1099 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
  Control handler that sets the priority of a database.

  indata.dptr is interpreted as a struct ctdb_db_priority carrying db_id
  and priority.  The database is looked up by db_id, and a priority
  outside 1..NUM_DB_PRIORITIES is logged as invalid.  Otherwise the
  priority is stored in the database context and logged.
  NOTE(review): no check of indata.dsize is visible before the cast, and
  the return statements are not visible in this listing.
*/
1103 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1105 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1106 struct ctdb_db_context *ctdb_db;
1108 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1110 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1114 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1115 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1119 ctdb_db->priority = db_prio->priority;
1120 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));