2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
/*
 * Null call procedure. ctdb_local_attach() registers it on every database
 * under CTDB_NULL_FUNC.
 * NOTE(review): the function body is not present in this extract, so its
 * return value is not documented here.
 */
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
/*
 * Plain fetch call procedure. ctdb_local_attach() registers it on every
 * database under CTDB_FETCH_FUNC.
 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* reply with the record as stored: reply_data just points at record_data */
47 call->reply_data = &call->record_data;
/*
 * State handed from ctdb_ltdb_lock_requeue() to lock_fetch_callback() while
 * a request waits for a contended chainlock.
 * NOTE(review): both functions also use ->recv_context and ->generation;
 * those member declarations are not present in this extract.
 */
53 struct lock_fetch_state {
/* daemon context; the callback reads the current vnn_map generation from it */
54 struct ctdb_context *ctdb;
/* receive function the deferred packet is handed back to */
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
57 struct ctdb_req_header *hdr;
/* when true the callback skips the stale-generation check */
59 bool ignore_generation;
63 called when we should retry the operation
/*
 * Callback passed to ctdb_lockwait() by ctdb_ltdb_lock_requeue(). It hands
 * the deferred packet back to the receive function that originally got it.
 *
 * p: the struct lock_fetch_state built by ctdb_ltdb_lock_requeue().
 */
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/*
 * A packet queued under a different vnn_map generation is stale unless the
 * caller asked to ignore generations: it is logged and its header freed.
 * NOTE(review): the early return that presumably follows the free is not
 * present in this extract.
 */
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
/* re-inject the packet into its original receive path */
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking; if it is contended,
 * queue the request behind a lockwait and have it re-delivered later.
 *
 * ctdb_db:           database whose local tdb is locked
 * key:               record key whose chain is locked
 * hdr:               request packet to re-deliver on the contended path
 * recv_pkt:          receive function the packet is handed back to
 * recv_context:      first argument passed to recv_pkt
 * ignore_generation: stored in the lockwait state; when true the callback
 *                    does not discard the packet on a generation change
 *
 * Return values are 0 / -1 / -2 as described in the text preceding this
 * function.
 * NOTE(review): several condition and return lines of this function are not
 * present in this extract.
 */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
/* EACCES, EAGAIN and EDEADLK are treated as contention, not as hard errors */
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
/* the lockwait state starts out as a talloc child of the packet */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
/* remember the generation at queue time so the callback can spot stale packets */
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
147 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148 so it won't be freed yet */
/*
 * NOTE(review): state was allocated above with hdr as its talloc parent and
 * hdr is now stolen onto state - confirm talloc copes with this parent/child
 * inversion as intended.
 */
149 talloc_steal(state, hdr);
150 talloc_steal(state, h);
152 /* now tell the caller that we will retry asynchronously */
157 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Take the chainlock for key through ctdb_ltdb_lock_requeue() and read the
 * record with ctdb_ltdb_fetch().
 *
 * header: receives the record's ltdb header
 * data:   receives the record data; hdr is passed to ctdb_ltdb_fetch() as
 *         its context argument
 * The remaining parameters are forwarded unchanged to
 * ctdb_ltdb_lock_requeue().
 *
 * NOTE(review): the conditions guarding the fetch and the unlock are not
 * present in this extract - presumably the fetch runs only when the lock was
 * obtained and the unlock only when the fetch failed; confirm against the
 * full source.
 */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
160 TDB_DATA key, struct ctdb_ltdb_header *header,
161 struct ctdb_req_header *hdr, TDB_DATA *data,
162 void (*recv_pkt)(void *, struct ctdb_req_header *),
163 void *recv_context, bool ignore_generation)
167 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
168 recv_context, ignore_generation);
170 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
173 uret = ctdb_ltdb_unlock(ctdb_db, key);
/* an unlock failure is only logged */
175 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
184 paranoid check to see if the db is empty
/*
 * Sanity check used at attach time: log an alert and call ctdb_fatal() when
 * the local tdb turns out to contain records.
 * NOTE(review): the test on count is not present in this extract.
 */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
188 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* with no callback, tdb_traverse_read() only walks the db and returns the record count */
189 int count = tdb_traverse_read(tdb, NULL, NULL);
191 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
193 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health tdb
 * (ctdb->db_persistent_health).
 *
 * The health record is keyed by the database name without its trailing NUL.
 * The stored value is copied into a string owned by ctdb_db.
 * If the copy fails, the previous reason is put back.
 *
 * NOTE(review): the return statements, the handling of a missing record and
 * the release of the old reason and of the fetched buffer are not present in
 * this extract.
 */
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198 struct ctdb_db_context *ctdb_db)
200 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
/* strlen() without +1: the key excludes the terminating NUL */
206 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207 key.dsize = strlen(ctdb_db->db_name);
/* keep the old reason so it can be restored if the reload fails */
209 old = ctdb_db->unhealthy_reason;
210 ctdb_db->unhealthy_reason = NULL;
212 val = tdb_fetch(tdb, key);
214 reason = talloc_strndup(ctdb_db,
215 (const char *)val.dptr,
217 if (reason == NULL) {
218 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
220 ctdb_db->unhealthy_reason = old;
231 ctdb_db->unhealthy_reason = reason;
/*
 * Record a new health state for ctdb_db in the persistent health tdb and
 * mirror it in ctdb_db->unhealthy_reason.
 *
 * given_reason:      text describing why the db is unhealthy, or NULL for
 *                    healthy
 * num_healthy_nodes: when given_reason is NULL but this is 0 and an old
 *                    reason exists, the old reason is kept and prefixed
 *                    with "NO-HEALTHY-NODES - " instead of being cleared
 *
 * Visible sequence, all inside one tdb transaction on the health tdb:
 *   1. reload the current reason with ctdb_load_persistent_health()
 *   2. build new_reason (a copy of given_reason, or the prefixed old reason)
 *   3. store new_reason under the db name with TDB_REPLACE, or delete the
 *      record when there was an old reason and no new one
 *   4. commit, then free the old reason and install the new one on ctdb_db
 * A failing tdb_store() or tdb_delete() cancels the transaction.
 *
 * NOTE(review): return statements and several condition lines are not
 * present in this extract.
 */
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236 struct ctdb_db_context *ctdb_db,
237 const char *given_reason,/* NULL means healthy */
238 int num_healthy_nodes)
240 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
244 char *new_reason = NULL;
245 char *old_reason = NULL;
247 ret = tdb_transaction_start(tdb);
249 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* read the currently stored reason so the change is made relative to it */
254 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
256 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257 ctdb_db->db_name, ret));
260 old_reason = ctdb_db->unhealthy_reason;
/* same key form as ctdb_load_persistent_health(): db name without the NUL */
262 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263 key.dsize = strlen(ctdb_db->db_name);
266 new_reason = talloc_strdup(ctdb_db, given_reason);
267 if (new_reason == NULL) {
268 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
272 } else if (old_reason && num_healthy_nodes == 0) {
274 * If the reason indicates ok, but there where no healthy nodes
275 * available, that it means, we have not recovered valid content
276 * of the db. So if there's an old reason, prefix it with
277 * "NO-HEALTHY-NODES - "
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
284 prefix = _TMP_PREFIX;
288 new_reason = talloc_asprintf(ctdb_db, "%s%s",
290 if (new_reason == NULL) {
291 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292 prefix, old_reason));
299 val.dptr = discard_const_p(uint8_t, new_reason);
300 val.dsize = strlen(new_reason);
302 ret = tdb_store(tdb, key, val, TDB_REPLACE);
304 tdb_transaction_cancel(tdb);
305 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306 tdb_name(tdb), ctdb_db->db_name, new_reason,
307 ret, tdb_errorstr(tdb)));
308 talloc_free(new_reason);
311 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312 ctdb_db->db_name, new_reason));
313 } else if (old_reason) {
314 ret = tdb_delete(tdb, key);
316 tdb_transaction_cancel(tdb);
317 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318 tdb_name(tdb), ctdb_db->db_name,
319 ret, tdb_errorstr(tdb)));
320 talloc_free(new_reason);
323 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
327 ret = tdb_transaction_commit(tdb);
328 if (ret != TDB_SUCCESS) {
329 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330 tdb_name(tdb), ret, tdb_errorstr(tdb)));
331 talloc_free(new_reason);
335 talloc_free(old_reason);
336 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted tdb file out of the way and record that fact in the
 * persistent health db.
 *
 * The backup name is built from a "%s.corrupted.<timestamp>.0Z" format
 * (see the example in the comment below). The unhealthy reason is stored
 * first, with num_healthy_nodes = 0, and only then is ctdb_db->db_path
 * renamed to the backup name.
 *
 * NOTE(review): return statements, some call arguments and the line that
 * obtains tm from now are not present in this extract.
 */
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342 struct ctdb_db_context *ctdb_db)
344 time_t now = time(NULL);
352 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354 "%04u%02u%02u%02u%02u%02u.0Z",
356 tm->tm_year+1900, tm->tm_mon+1,
357 tm->tm_mday, tm->tm_hour, tm->tm_min,
359 if (new_path == NULL) {
360 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
364 new_reason = talloc_asprintf(ctdb_db,
365 "ERROR - Backup of corrupted TDB in '%s'",
367 if (new_reason == NULL) {
368 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* persist the reason before touching the file */
371 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
/* the temporary reason string is released right after the call */
372 talloc_free(new_reason);
/*
 * NOTE(review): this message sits directly after the health update -
 * presumably it is logged when that update fails, in which case the
 * "not implemented yet" wording is misleading; confirm.
 */
374 DEBUG(DEBUG_CRIT,(__location__
375 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
380 ret = rename(ctdb_db->db_path, new_path);
382 DEBUG(DEBUG_CRIT,(__location__
383 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384 ctdb_db->db_path, new_path,
385 errno, strerror(errno)));
386 talloc_free(new_path);
390 DEBUG(DEBUG_CRIT,(__location__
391 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392 ctdb_db->db_path, new_path));
393 talloc_free(new_path);
/*
 * Reload the stored health state of every persistent database on
 * ctdb->db_list and log the outcome per database, followed by a summary
 * line with OK and FAIL counts. The summary is logged at DEBUG_ALERT when
 * fail is non-zero and at DEBUG_NOTICE otherwise.
 *
 * NOTE(review): the counter updates and the return statements are not
 * present in this extract.
 */
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
399 struct ctdb_db_context *ctdb_db;
404 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
/* this test singles out non-persistent databases (the branch body is not visible) */
405 if (!ctdb_db->persistent) {
409 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
411 DEBUG(DEBUG_ALERT,(__location__
412 " load persistent health for '%s' failed\n",
/* a NULL reason after the reload means the db is healthy */
417 if (ctdb_db->unhealthy_reason == NULL) {
419 DEBUG(DEBUG_INFO,(__location__
420 " persistent db '%s' healthy\n",
426 DEBUG(DEBUG_ALERT,(__location__
427 " persistent db '%s' unhealthy: %s\n",
429 ctdb_db->unhealthy_reason));
431 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
444 mark a database - as healthy
/*
 * Control handler: mark the database whose id is carried in indata as
 * healthy by clearing its stored unhealthy reason.
 *
 * indata: the first four bytes are read as the uint32_t db id.
 * NOTE(review): no check of indata.dsize is visible here - assumes the
 * caller has validated the payload size; confirm.
 *
 * If the db becomes healthy before startup has completed
 * (ctdb->done_startup is false), recovery mode is switched to
 * CTDB_RECOVERY_ACTIVE.
 * NOTE(review): return statements and the line that sets may_recover are
 * not present in this extract.
 */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
448 uint32_t db_id = *(uint32_t *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
451 bool may_recover = false;
453 ctdb_db = find_ctdb_db(ctdb, db_id);
455 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
459 if (ctdb_db->unhealthy_reason) {
/* a NULL reason with one healthy node clears the stored state */
463 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
465 DEBUG(DEBUG_ERR,(__location__
466 " ctdb_update_persistent_health(%s) failed\n",
471 if (may_recover && !ctdb->done_startup) {
472 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
474 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the unhealthy reason of the database whose id is
 * carried in indata.
 *
 * The stored reason is reloaded first. When one exists, outdata is pointed
 * at ctdb_db->unhealthy_reason itself (no copy), with a size that includes
 * the terminating NUL.
 *
 * NOTE(review): part of the parameter list, the return statements and the
 * healthy-case handling of outdata are not present in this extract. No
 * check of indata.dsize is visible either - assumes the caller validated
 * it; confirm.
 */
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
484 uint32_t db_id = *(uint32_t *)indata.dptr;
485 struct ctdb_db_context *ctdb_db;
488 ctdb_db = find_ctdb_db(ctdb, db_id);
490 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
494 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
496 DEBUG(DEBUG_ERR,(__location__
497 " ctdb_load_persistent_health(%s) failed\n",
503 if (ctdb_db->unhealthy_reason) {
/* outdata aliases memory owned by ctdb_db; +1 sends the NUL as well */
504 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
512 attach to a database, handling both persistent and non-persistent databases
513 return 0 on success, -1 on failure
/*
 * Create the ctdb_db_context for db_name, open (or create) its local tdb
 * and link it into ctdb->db_list.
 *
 * db_name:          database name; its hash (terminating NUL included) is
 *                   the db id
 * persistent:       selects the persistent directory and TDB_DEFAULT flags;
 *                   otherwise TDB_CLEAR_IF_FIRST | TDB_NOSYNC is used
 * unhealthy_reason: when non-NULL it is stored as the db's health state
 *                   before the health state is loaded
 *
 * Visible steps: hash-collision check against already attached dbs, health
 * update and load, open of the tdb, tdb_check() with backup of a corrupted
 * file while remaining_tries allows it, registration of the null and fetch
 * call functions, and vacuum setup.
 *
 * NOTE(review): the rest of the parameter list, most return statements,
 * the assignments to remaining_tries and several conditions are not present
 * in this extract.
 */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516 bool persistent, const char *unhealthy_reason,
519 struct ctdb_db_context *ctdb_db, *tmp_db;
524 int remaining_tries = 0;
526 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527 CTDB_NO_MEMORY(ctdb, ctdb_db);
/* new databases start at priority 1 */
529 ctdb_db->priority = 1;
530 ctdb_db->ctdb = ctdb;
531 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the id is hashed over the name including its NUL (strlen + 1) */
534 key.dsize = strlen(db_name)+1;
535 key.dptr = discard_const(db_name);
536 ctdb_db->db_id = ctdb_hash(&key);
537 ctdb_db->persistent = persistent;
539 /* check for hash collisions */
540 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541 if (tmp_db->db_id == ctdb_db->db_id) {
542 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543 tmp_db->db_id, db_name, tmp_db->db_name));
544 talloc_free(ctdb_db);
/* a reason supplied by the caller is stored with zero healthy nodes */
550 if (unhealthy_reason) {
551 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552 unhealthy_reason, 0);
554 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555 ctdb_db->db_name, unhealthy_reason, ret));
556 talloc_free(ctdb_db);
/*
 * NOTE(review): the statements guarded by the next two tests are not present
 * in this extract - presumably they set remaining_tries; confirm.
 */
561 if (ctdb->max_persistent_check_errors > 0) {
564 if (ctdb->done_startup) {
568 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
570 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571 ctdb_db->db_name, ret));
572 talloc_free(ctdb_db);
/* unhealthy with no tries left: the context is freed */
577 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579 ctdb_db->db_name, ctdb_db->unhealthy_reason));
580 talloc_free(ctdb_db);
584 if (ctdb_db->unhealthy_reason) {
585 /* this is just a warning, but we want that in the log file! */
586 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587 ctdb_db->db_name, ctdb_db->unhealthy_reason));
590 /* open the database */
591 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
592 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
595 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596 if (ctdb->valgrinding) {
597 tdb_flags |= TDB_NOMMAP;
599 tdb_flags |= TDB_DISALLOW_NESTING;
/* NOTE(review): the condition guarding the next flag is not present in this extract */
601 tdb_flags |= TDB_INCOMPATIBLE_HASH;
605 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
606 ctdb->tunable.database_hash_size,
608 O_CREAT|O_RDWR, mode);
609 if (ctdb_db->ltdb == NULL) {
/* errno is saved at once so the later calls cannot overwrite it before logging */
611 int saved_errno = errno;
614 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
617 strerror(saved_errno)));
618 talloc_free(ctdb_db);
622 if (remaining_tries == 0) {
623 DEBUG(DEBUG_CRIT,(__location__
624 "Failed to open persistent tdb '%s': %d - %s\n",
627 strerror(saved_errno)));
628 talloc_free(ctdb_db);
632 ret = stat(ctdb_db->db_path, &st);
634 DEBUG(DEBUG_CRIT,(__location__
635 "Failed to open persistent tdb '%s': %d - %s\n",
638 strerror(saved_errno)));
639 talloc_free(ctdb_db);
/* an unopenable persistent tdb is moved aside */
643 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
645 DEBUG(DEBUG_CRIT,(__location__
646 "Failed to open persistent tdb '%s': %d - %s\n",
649 strerror(saved_errno)));
650 talloc_free(ctdb_db);
660 ctdb_check_db_empty(ctdb_db);
/* integrity check of the freshly opened tdb */
662 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
667 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668 ctdb_db->db_path, ret,
669 tdb_errorstr(ctdb_db->ltdb->tdb)));
670 if (remaining_tries == 0) {
671 talloc_free(ctdb_db);
675 fd = tdb_fd(ctdb_db->ltdb->tdb);
676 ret = fstat(fd, &st);
678 DEBUG(DEBUG_CRIT,(__location__
679 "Failed to fstat() persistent tdb '%s': %d - %s\n",
683 talloc_free(ctdb_db);
/* drop the handle on the corrupted tdb before its file is renamed away */
688 talloc_free(ctdb_db->ltdb);
689 ctdb_db->ltdb = NULL;
691 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
693 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
695 talloc_free(ctdb_db);
/*
 * NOTE(review): from here on ctdb_db is linked into ctdb->db_list, yet the
 * failure paths below call talloc_free(ctdb_db) with no DLIST_REMOVE visible.
 * Unless a talloc destructor (not visible here) unlinks it, the list is left
 * with a dangling entry - confirm.
 */
705 DLIST_ADD(ctdb->db_list, ctdb_db);
707 /* setting this can help some high churn databases */
708 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
711 all databases support the "null" function. we need this in
712 order to do forced migration of records
714 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
716 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717 talloc_free(ctdb_db);
722 all databases support the "fetch" function. we need this
723 for efficient Samba3 ctdb fetch
725 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
727 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728 talloc_free(ctdb_db);
732 ret = ctdb_vacuum_init(ctdb_db);
734 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735 "database '%s'\n", ctdb_db->db_name));
736 talloc_free(ctdb_db);
741 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
749 a client has asked to attach a new database
/*
 * Control handler for a client's attach request.
 *
 * indata:    the database name, read as a C string
 * outdata:   on success points at the db_id field of the attached database
 *            (no copy is made)
 * tdb_flags: extra tdb flags requested by the client; only TDB_NOSYNC and
 *            TDB_INCOMPATIBLE_HASH are honoured
 *
 * Attach is refused while this node has NODE_FLAGS_INACTIVE set. A database
 * that is already attached only gets the flags added; a new one is attached
 * locally and then announced to all nodes with a no-reply broadcast.
 *
 * NOTE(review): the rest of the parameter list and the return statements
 * are not present in this extract.
 */
751 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
752 TDB_DATA *outdata, uint64_t tdb_flags,
755 const char *db_name = (const char *)indata.dptr;
756 struct ctdb_db_context *db;
757 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
759 /* the client can optionally pass additional tdb flags, but we
760 only allow a subset of those on the database in ctdb. Note
761 that tdb_flags is passed in via the (otherwise unused)
762 srvid to the attach control */
763 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
765 /* If the node is inactive it is not part of the cluster
766 and we should not allow clients to attach to any
769 if (node->flags & NODE_FLAGS_INACTIVE) {
770 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
775 /* see if we already have this name */
776 db = ctdb_db_handle(ctdb, db_name);
778 outdata->dptr = (uint8_t *)&db->db_id;
779 outdata->dsize = sizeof(db->db_id);
780 tdb_add_flags(db->ltdb->tdb, tdb_flags);
/* not attached yet: create it locally, then look the handle up again */
784 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
788 db = ctdb_db_handle(ctdb, db_name);
790 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
794 /* remember the flags the client has specified */
795 tdb_add_flags(db->ltdb->tdb, tdb_flags);
797 outdata->dptr = (uint8_t *)&db->db_id;
798 outdata->dsize = sizeof(db->db_id);
800 /* Try to ensure it's locked in mem */
801 ctdb_lockdown_memory(ctdb);
803 /* tell all the other nodes about this database */
804 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
805 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
806 CTDB_CONTROL_DB_ATTACH,
807 0, CTDB_CTRL_FLAG_NOREPLY,
816 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every database file that
 * belongs to this node.
 *
 * A file is accepted when its name contains ".tdb." followed only by
 * digits, and those digits parse to this node's pnn. Everything else is
 * skipped, and a name that fails the digit or pnn test is logged as
 * ignored.
 *
 * unhealthy_reason: passed through to ctdb_local_attach() for every
 *                   database.
 *
 * NOTE(review): return statements, closedir() calls, the release of s and
 * the truncation of the name before the attach are not present in this
 * extract.
 */
818 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
819 const char *unhealthy_reason)
824 /* open the persistent db directory and scan it for files */
825 d = opendir(ctdb->db_directory_persistent);
830 while ((de=readdir(d))) {
832 size_t len = strlen(de->d_name);
834 int invalid_name = 0;
/*
 * NOTE(review): if CTDB_NO_MEMORY returns from the function (its definition
 * is not visible here), this exit leaves the directory handle d open.
 */
836 s = talloc_strdup(ctdb, de->d_name);
837 CTDB_NO_MEMORY(ctdb, s);
839 /* only accept names ending in .tdb */
840 p = strstr(s, ".tdb.");
841 if (len < 7 || p == NULL) {
846 /* only accept names ending with .tdb. and any number of digits */
/*
 * NOTE(review): *q is a plain char; isdigit() requires a value representable
 * as unsigned char (or EOF), so a byte >= 0x80 in a file name is a problem
 * where char is signed.
 */
848 while (*q != 0 && invalid_name == 0) {
849 if (!isdigit(*q++)) {
/* p+5 is the first character after ".tdb." */
853 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
854 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
860 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
861 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
867 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point: prepare the database directories, open the
 * persistent health tdb and attach all existing persistent databases.
 *
 * Visible steps:
 *   - default the three directories to VARDIR "/ctdb", "/ctdb/persistent"
 *     and "/ctdb/state" when unset, and create them with mode 0700
 *     (an existing directory is accepted)
 *   - open the health tdb in the state directory and run tdb_check() on it
 *   - if the open or the check fails, build a "WARNING - ... was cleared
 *     after a failure - manual verification needed" reason and reopen the
 *     file with TDB_CLEAR_IF_FIRST
 *   - call ctdb_attach_persistent() with that reason (NULL when the health
 *     tdb was fine)
 *
 * NOTE(review): return statements, several conditions (including any use of
 * first_try) and the assignment of the reopened handle back to
 * ctdb->db_persistent_health are not present in this extract.
 */
875 int ctdb_attach_databases(struct ctdb_context *ctdb)
878 char *persistent_health_path = NULL;
879 char *unhealthy_reason = NULL;
880 bool first_try = true;
/* fall back to the built-in directory locations */
882 if (ctdb->db_directory == NULL) {
883 ctdb->db_directory = VARDIR "/ctdb";
885 if (ctdb->db_directory_persistent == NULL) {
886 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
888 if (ctdb->db_directory_state == NULL) {
889 ctdb->db_directory_state = VARDIR "/ctdb/state";
892 /* make sure the db directory exists */
893 ret = mkdir(ctdb->db_directory, 0700);
894 if (ret == -1 && errno != EEXIST) {
895 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
896 ctdb->db_directory));
900 /* make sure the persistent db directory exists */
901 ret = mkdir(ctdb->db_directory_persistent, 0700);
902 if (ret == -1 && errno != EEXIST) {
903 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
904 ctdb->db_directory_persistent));
908 /* make sure the internal state db directory exists */
909 ret = mkdir(ctdb->db_directory_state, 0700);
910 if (ret == -1 && errno != EEXIST) {
911 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
912 ctdb->db_directory_state));
916 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
917 ctdb->db_directory_state,
918 PERSISTENT_HEALTH_TDB,
920 if (persistent_health_path == NULL) {
921 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* first attempt: open the existing health tdb without clearing it */
927 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
928 0, TDB_DISALLOW_NESTING,
929 O_CREAT | O_RDWR, 0600);
930 if (ctdb->db_persistent_health == NULL) {
931 struct tdb_wrap *tdb;
934 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
935 persistent_health_path,
938 talloc_free(persistent_health_path);
939 talloc_free(unhealthy_reason);
/* the health information is about to be cleared: prepare the warning reason */
944 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
945 persistent_health_path,
946 "was cleared after a failure",
947 "manual verification needed");
948 if (unhealthy_reason == NULL) {
949 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
950 talloc_free(persistent_health_path);
954 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
955 persistent_health_path));
956 tdb = tdb_wrap_open(ctdb, persistent_health_path,
957 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
958 O_CREAT | O_RDWR, 0600);
960 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
961 persistent_health_path,
964 talloc_free(persistent_health_path);
965 talloc_free(unhealthy_reason);
/* the health tdb opened: check its integrity */
972 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
974 struct tdb_wrap *tdb;
/* drop the failing handle before reopening the file */
976 talloc_free(ctdb->db_persistent_health);
977 ctdb->db_persistent_health = NULL;
980 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
981 persistent_health_path));
982 talloc_free(persistent_health_path);
983 talloc_free(unhealthy_reason);
988 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
989 persistent_health_path,
990 "was cleared after a failure",
991 "manual verification needed");
992 if (unhealthy_reason == NULL) {
993 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
994 talloc_free(persistent_health_path);
998 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
999 persistent_health_path));
1000 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1001 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1002 O_CREAT | O_RDWR, 0600);
1004 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1005 persistent_health_path,
1008 talloc_free(persistent_health_path);
1009 talloc_free(unhealthy_reason);
1016 talloc_free(persistent_health_path);
/* unhealthy_reason is still NULL here unless the health tdb had to be cleared */
1018 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1019 talloc_free(unhealthy_reason);
1028 called when a broadcast seqnum update comes in
/*
 * Handle a sequence number update for db_id sent by srcnode.
 *
 * Updates that originate from this node itself are singled out by the first
 * test. For a known database, the local tdb sequence number is bumped
 * without blocking and the new value is cached in ctdb_db->seqnum. An
 * unknown db id and an unhealthy database are each logged.
 *
 * NOTE(review): the return statements are not present in this extract, so
 * the value returned on each path is not documented here.
 */
1030 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1032 struct ctdb_db_context *ctdb_db;
1033 if (srcnode == ctdb->pnn) {
1034 /* don't update ourselves! */
1038 ctdb_db = find_ctdb_db(ctdb, db_id);
1040 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1044 if (ctdb_db->unhealthy_reason) {
1045 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1046 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the local sequence number, then cache the resulting value */
1050 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1051 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1056 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed event handler: compare the tdb's current sequence number with the
 * cached ctdb_db->seqnum and, when they differ, broadcast
 * CTDB_CONTROL_UPDATE_SEQNUM carrying the db id. It then schedules itself
 * again.
 *
 * p: the struct ctdb_db_context being watched.
 */
1058 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1059 struct timeval t, void *p)
1061 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1062 struct ctdb_context *ctdb = ctdb_db->ctdb;
1063 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1064 if (new_seqnum != ctdb_db->seqnum) {
1065 /* something has changed - propagate it */
/* the payload is just the 32-bit db id; no reply is requested */
1067 data.dptr = (uint8_t *)&ctdb_db->db_id;
1068 data.dsize = sizeof(uint32_t);
1069 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1070 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1073 ctdb_db->seqnum = new_seqnum;
1075 /* setup a new timer */
/* seqnum_interval is split into whole seconds (/1000) and microseconds ((%1000)*1000) */
1076 ctdb_db->seqnum_update =
1077 event_add_timed(ctdb->ev, ctdb_db,
1078 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1079 ctdb_ltdb_seqnum_check, ctdb_db);
1083 enable seqnum handling on this db
/*
 * Enable sequence number tracking on the database identified by db_id.
 *
 * It starts the periodic ctdb_ltdb_seqnum_check() timer if one is not
 * already set, enables seqnum maintenance on the tdb, and caches the
 * current value in ctdb_db->seqnum.
 *
 * NOTE(review): the return statements are not present in this extract.
 */
1085 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1087 struct ctdb_db_context *ctdb_db;
1088 ctdb_db = find_ctdb_db(ctdb, db_id);
1090 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only arm the timer once per database */
1094 if (ctdb_db->seqnum_update == NULL) {
1095 ctdb_db->seqnum_update =
1096 event_add_timed(ctdb->ev, ctdb_db,
1097 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1098 ctdb_ltdb_seqnum_check, ctdb_db);
1101 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1102 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of a database.
 *
 * indata: interpreted as a struct ctdb_db_priority (db_id, priority).
 * NOTE(review): no check of indata.dsize is visible here - assumes the
 * caller has validated the payload size; confirm.
 *
 * An unknown db id and a priority outside 1..NUM_DB_PRIORITIES are each
 * logged as errors.
 * NOTE(review): the return statements are not present in this extract.
 */
1106 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1108 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1109 struct ctdb_db_context *ctdb_db;
1111 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1113 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* valid priorities are 1 through NUM_DB_PRIORITIES inclusive */
1117 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1118 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1122 ctdb_db->priority = db_prio->priority;
1123 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));