2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/events/events.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 call->reply_data = &call->record_data;
53 struct lock_fetch_state {
54 struct ctdb_context *ctdb;
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
57 struct ctdb_req_header *hdr;
59 bool ignore_generation;
63 called when we should retry the operation
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
144 tdb_chainunlock(tdb, key);
148 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
149 so it won't be freed yet */
150 talloc_steal(state, hdr);
151 talloc_steal(state, h);
153 /* now tell the caller than we will retry asynchronously */
158 a varient of ctdb_ltdb_lock_requeue that also fetches the record
160 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
161 TDB_DATA key, struct ctdb_ltdb_header *header,
162 struct ctdb_req_header *hdr, TDB_DATA *data,
163 void (*recv_pkt)(void *, struct ctdb_req_header *),
164 void *recv_context, bool ignore_generation)
168 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
169 recv_context, ignore_generation);
171 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
173 ctdb_ltdb_unlock(ctdb_db, key);
181 paraoid check to see if the db is empty
183 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
185 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
186 int count = tdb_traverse_read(tdb, NULL, NULL);
188 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
190 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
194 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
195 struct ctdb_db_context *ctdb_db)
197 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
203 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
204 key.dsize = strlen(ctdb_db->db_name);
206 old = ctdb_db->unhealthy_reason;
207 ctdb_db->unhealthy_reason = NULL;
209 val = tdb_fetch(tdb, key);
211 reason = talloc_strndup(ctdb_db,
212 (const char *)val.dptr,
214 if (reason == NULL) {
215 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
217 ctdb_db->unhealthy_reason = old;
228 ctdb_db->unhealthy_reason = reason;
232 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
233 struct ctdb_db_context *ctdb_db,
234 const char *given_reason,/* NULL means healthy */
235 int num_healthy_nodes)
237 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
241 char *new_reason = NULL;
242 char *old_reason = NULL;
244 ret = tdb_transaction_start(tdb);
246 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
247 tdb_name(tdb), ret, tdb_errorstr(tdb)));
251 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
253 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
254 ctdb_db->db_name, ret));
257 old_reason = ctdb_db->unhealthy_reason;
259 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
260 key.dsize = strlen(ctdb_db->db_name);
263 new_reason = talloc_strdup(ctdb_db, given_reason);
264 if (new_reason == NULL) {
265 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
269 } else if (old_reason && num_healthy_nodes == 0) {
271 * If the reason indicates ok, but there where no healthy nodes
272 * available, that it means, we have not recovered valid content
273 * of the db. So if there's an old reason, prefix it with
274 * "NO-HEALTHY-NODES - "
278 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
279 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
281 prefix = _TMP_PREFIX;
285 new_reason = talloc_asprintf(ctdb_db, "%s%s",
287 if (new_reason == NULL) {
288 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
289 prefix, old_reason));
296 val.dptr = discard_const_p(uint8_t, new_reason);
297 val.dsize = strlen(new_reason);
299 ret = tdb_store(tdb, key, val, TDB_REPLACE);
301 tdb_transaction_cancel(tdb);
302 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
303 tdb_name(tdb), ctdb_db->db_name, new_reason,
304 ret, tdb_errorstr(tdb)));
305 talloc_free(new_reason);
308 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
309 ctdb_db->db_name, new_reason));
310 } else if (old_reason) {
311 ret = tdb_delete(tdb, key);
313 tdb_transaction_cancel(tdb);
314 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
315 tdb_name(tdb), ctdb_db->db_name,
316 ret, tdb_errorstr(tdb)));
317 talloc_free(new_reason);
320 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
324 ret = tdb_transaction_commit(tdb);
325 if (ret != TDB_SUCCESS) {
326 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
327 tdb_name(tdb), ret, tdb_errorstr(tdb)));
328 talloc_free(new_reason);
332 talloc_free(old_reason);
333 ctdb_db->unhealthy_reason = new_reason;
338 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
339 struct ctdb_db_context *ctdb_db)
341 time_t now = time(NULL);
349 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
350 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
351 "%04u%02u%02u%02u%02u%02u.0Z",
353 tm->tm_year+1900, tm->tm_mon+1,
354 tm->tm_mday, tm->tm_hour, tm->tm_min,
356 if (new_path == NULL) {
357 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
361 new_reason = talloc_asprintf(ctdb_db,
362 "ERROR - Backup of corrupted TDB in '%s'",
364 if (new_reason == NULL) {
365 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
368 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
369 talloc_free(new_reason);
371 DEBUG(DEBUG_CRIT,(__location__
372 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
377 ret = rename(ctdb_db->db_path, new_path);
379 DEBUG(DEBUG_CRIT,(__location__
380 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
381 ctdb_db->db_path, new_path,
382 errno, strerror(errno)));
383 talloc_free(new_path);
387 DEBUG(DEBUG_CRIT,(__location__
388 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
389 ctdb_db->db_path, new_path));
390 talloc_free(new_path);
394 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
396 struct ctdb_db_context *ctdb_db;
401 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
402 if (!ctdb_db->persistent) {
406 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
408 DEBUG(DEBUG_ALERT,(__location__
409 " load persistent health for '%s' failed\n",
414 if (ctdb_db->unhealthy_reason == NULL) {
416 DEBUG(DEBUG_INFO,(__location__
417 " persistent db '%s' healthy\n",
423 DEBUG(DEBUG_ALERT,(__location__
424 " persistent db '%s' unhealthy: %s\n",
426 ctdb_db->unhealthy_reason));
428 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
429 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
441 mark a database - as healthy
443 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
445 uint32_t db_id = *(uint32_t *)indata.dptr;
446 struct ctdb_db_context *ctdb_db;
448 bool may_recover = false;
450 ctdb_db = find_ctdb_db(ctdb, db_id);
452 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
456 if (ctdb_db->unhealthy_reason) {
460 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
462 DEBUG(DEBUG_ERR,(__location__
463 " ctdb_update_persistent_health(%s) failed\n",
468 if (may_recover && !ctdb->done_startup) {
469 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
471 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
477 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
481 uint32_t db_id = *(uint32_t *)indata.dptr;
482 struct ctdb_db_context *ctdb_db;
485 ctdb_db = find_ctdb_db(ctdb, db_id);
487 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
491 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
493 DEBUG(DEBUG_ERR,(__location__
494 " ctdb_load_persistent_health(%s) failed\n",
500 if (ctdb_db->unhealthy_reason) {
501 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
502 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
509 attach to a database, handling both persistent and non-persistent databases
510 return 0 on success, -1 on failure
512 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
513 bool persistent, const char *unhealthy_reason)
515 struct ctdb_db_context *ctdb_db, *tmp_db;
520 int remaining_tries = 0;
522 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
523 CTDB_NO_MEMORY(ctdb, ctdb_db);
525 ctdb_db->priority = 1;
526 ctdb_db->ctdb = ctdb;
527 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
528 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
530 key.dsize = strlen(db_name)+1;
531 key.dptr = discard_const(db_name);
532 ctdb_db->db_id = ctdb_hash(&key);
533 ctdb_db->persistent = persistent;
535 /* check for hash collisions */
536 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
537 if (tmp_db->db_id == ctdb_db->db_id) {
538 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
539 tmp_db->db_id, db_name, tmp_db->db_name));
540 talloc_free(ctdb_db);
546 if (unhealthy_reason) {
547 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
548 unhealthy_reason, 0);
550 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
551 ctdb_db->db_name, unhealthy_reason, ret));
552 talloc_free(ctdb_db);
557 if (ctdb->max_persistent_check_errors > 0) {
560 if (ctdb->done_startup) {
564 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
566 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
567 ctdb_db->db_name, ret));
568 talloc_free(ctdb_db);
573 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
574 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
575 ctdb_db->db_name, ctdb_db->unhealthy_reason));
576 talloc_free(ctdb_db);
580 if (ctdb_db->unhealthy_reason) {
581 /* this is just a warning, but we want that in the log file! */
582 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
583 ctdb_db->db_name, ctdb_db->unhealthy_reason));
586 /* open the database */
587 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
588 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
591 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
592 if (ctdb->valgrinding) {
593 tdb_flags |= TDB_NOMMAP;
595 tdb_flags |= TDB_DISALLOW_NESTING;
598 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
599 ctdb->tunable.database_hash_size,
601 O_CREAT|O_RDWR, mode);
602 if (ctdb_db->ltdb == NULL) {
604 int saved_errno = errno;
607 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
610 strerror(saved_errno)));
611 talloc_free(ctdb_db);
615 if (remaining_tries == 0) {
616 DEBUG(DEBUG_CRIT,(__location__
617 "Failed to open persistent tdb '%s': %d - %s\n",
620 strerror(saved_errno)));
621 talloc_free(ctdb_db);
625 ret = stat(ctdb_db->db_path, &st);
627 DEBUG(DEBUG_CRIT,(__location__
628 "Failed to open persistent tdb '%s': %d - %s\n",
631 strerror(saved_errno)));
632 talloc_free(ctdb_db);
636 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
638 DEBUG(DEBUG_CRIT,(__location__
639 "Failed to open persistent tdb '%s': %d - %s\n",
642 strerror(saved_errno)));
643 talloc_free(ctdb_db);
653 ctdb_check_db_empty(ctdb_db);
655 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
660 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
661 ctdb_db->db_path, ret,
662 tdb_errorstr(ctdb_db->ltdb->tdb)));
663 if (remaining_tries == 0) {
664 talloc_free(ctdb_db);
668 fd = tdb_fd(ctdb_db->ltdb->tdb);
669 ret = fstat(fd, &st);
671 DEBUG(DEBUG_CRIT,(__location__
672 "Failed to fstat() persistent tdb '%s': %d - %s\n",
676 talloc_free(ctdb_db);
681 talloc_free(ctdb_db->ltdb);
682 ctdb_db->ltdb = NULL;
684 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
686 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
688 talloc_free(ctdb_db);
698 DLIST_ADD(ctdb->db_list, ctdb_db);
700 /* setting this can help some high churn databases */
701 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
704 all databases support the "null" function. we need this in
705 order to do forced migration of records
707 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
709 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
710 talloc_free(ctdb_db);
715 all databases support the "fetch" function. we need this
716 for efficient Samba3 ctdb fetch
718 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
720 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
721 talloc_free(ctdb_db);
725 ret = ctdb_vacuum_init(ctdb_db);
727 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
728 "database '%s'\n", ctdb_db->db_name));
729 talloc_free(ctdb_db);
734 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
742 a client has asked to attach a new database
744 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
745 TDB_DATA *outdata, uint64_t tdb_flags,
748 const char *db_name = (const char *)indata.dptr;
749 struct ctdb_db_context *db;
750 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
752 /* the client can optionally pass additional tdb flags, but we
753 only allow a subset of those on the database in ctdb. Note
754 that tdb_flags is passed in via the (otherwise unused)
755 srvid to the attach control */
756 tdb_flags &= TDB_NOSYNC;
758 /* If the node is inactive it is not part of the cluster
759 and we should not allow clients to attach to any
762 if (node->flags & NODE_FLAGS_INACTIVE) {
763 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
768 /* see if we already have this name */
769 db = ctdb_db_handle(ctdb, db_name);
771 outdata->dptr = (uint8_t *)&db->db_id;
772 outdata->dsize = sizeof(db->db_id);
773 tdb_add_flags(db->ltdb->tdb, tdb_flags);
777 if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
781 db = ctdb_db_handle(ctdb, db_name);
783 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
787 /* remember the flags the client has specified */
788 tdb_add_flags(db->ltdb->tdb, tdb_flags);
790 outdata->dptr = (uint8_t *)&db->db_id;
791 outdata->dsize = sizeof(db->db_id);
793 /* Try to ensure it's locked in mem */
794 ctdb_lockdown_memory(ctdb);
796 /* tell all the other nodes about this database */
797 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
798 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
799 CTDB_CONTROL_DB_ATTACH,
800 0, CTDB_CTRL_FLAG_NOREPLY,
809 attach to all existing persistent databases
811 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
812 const char *unhealthy_reason)
817 /* open the persistent db directory and scan it for files */
818 d = opendir(ctdb->db_directory_persistent);
823 while ((de=readdir(d))) {
825 size_t len = strlen(de->d_name);
827 int invalid_name = 0;
829 s = talloc_strdup(ctdb, de->d_name);
830 CTDB_NO_MEMORY(ctdb, s);
832 /* only accept names ending in .tdb */
833 p = strstr(s, ".tdb.");
834 if (len < 7 || p == NULL) {
839 /* only accept names ending with .tdb. and any number of digits */
841 while (*q != 0 && invalid_name == 0) {
842 if (!isdigit(*q++)) {
846 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
847 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
853 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
854 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
860 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
868 int ctdb_attach_databases(struct ctdb_context *ctdb)
871 char *persistent_health_path = NULL;
872 char *unhealthy_reason = NULL;
873 bool first_try = true;
875 if (ctdb->db_directory == NULL) {
876 ctdb->db_directory = VARDIR "/ctdb";
878 if (ctdb->db_directory_persistent == NULL) {
879 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
881 if (ctdb->db_directory_state == NULL) {
882 ctdb->db_directory_state = VARDIR "/ctdb/state";
885 /* make sure the db directory exists */
886 ret = mkdir(ctdb->db_directory, 0700);
887 if (ret == -1 && errno != EEXIST) {
888 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
889 ctdb->db_directory));
893 /* make sure the persistent db directory exists */
894 ret = mkdir(ctdb->db_directory_persistent, 0700);
895 if (ret == -1 && errno != EEXIST) {
896 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
897 ctdb->db_directory_persistent));
901 /* make sure the internal state db directory exists */
902 ret = mkdir(ctdb->db_directory_state, 0700);
903 if (ret == -1 && errno != EEXIST) {
904 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
905 ctdb->db_directory_state));
909 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
910 ctdb->db_directory_state,
911 PERSISTENT_HEALTH_TDB,
913 if (persistent_health_path == NULL) {
914 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
920 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
921 0, TDB_DISALLOW_NESTING,
922 O_CREAT | O_RDWR, 0600);
923 if (ctdb->db_persistent_health == NULL) {
924 struct tdb_wrap *tdb;
927 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
928 persistent_health_path,
931 talloc_free(persistent_health_path);
932 talloc_free(unhealthy_reason);
937 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
938 persistent_health_path,
939 "was cleared after a failure",
940 "manual verification needed");
941 if (unhealthy_reason == NULL) {
942 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
943 talloc_free(persistent_health_path);
947 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
948 persistent_health_path));
949 tdb = tdb_wrap_open(ctdb, persistent_health_path,
950 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
951 O_CREAT | O_RDWR, 0600);
953 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
954 persistent_health_path,
957 talloc_free(persistent_health_path);
958 talloc_free(unhealthy_reason);
965 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
967 struct tdb_wrap *tdb;
969 talloc_free(ctdb->db_persistent_health);
970 ctdb->db_persistent_health = NULL;
973 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
974 persistent_health_path));
975 talloc_free(persistent_health_path);
976 talloc_free(unhealthy_reason);
981 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
982 persistent_health_path,
983 "was cleared after a failure",
984 "manual verification needed");
985 if (unhealthy_reason == NULL) {
986 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
987 talloc_free(persistent_health_path);
991 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
992 persistent_health_path));
993 tdb = tdb_wrap_open(ctdb, persistent_health_path,
994 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
995 O_CREAT | O_RDWR, 0600);
997 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
998 persistent_health_path,
1001 talloc_free(persistent_health_path);
1002 talloc_free(unhealthy_reason);
1009 talloc_free(persistent_health_path);
1011 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1012 talloc_free(unhealthy_reason);
1021 called when a broadcast seqnum update comes in
1023 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1025 struct ctdb_db_context *ctdb_db;
1026 if (srcnode == ctdb->pnn) {
1027 /* don't update ourselves! */
1031 ctdb_db = find_ctdb_db(ctdb, db_id);
1033 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1037 if (ctdb_db->unhealthy_reason) {
1038 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1039 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1043 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1044 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1049 timer to check for seqnum changes in a ltdb and propogate them
1051 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1052 struct timeval t, void *p)
1054 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1055 struct ctdb_context *ctdb = ctdb_db->ctdb;
1056 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1057 if (new_seqnum != ctdb_db->seqnum) {
1058 /* something has changed - propogate it */
1060 data.dptr = (uint8_t *)&ctdb_db->db_id;
1061 data.dsize = sizeof(uint32_t);
1062 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1063 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1066 ctdb_db->seqnum = new_seqnum;
1068 /* setup a new timer */
1069 ctdb_db->seqnum_update =
1070 event_add_timed(ctdb->ev, ctdb_db,
1071 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1072 ctdb_ltdb_seqnum_check, ctdb_db);
1076 enable seqnum handling on this db
1078 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1080 struct ctdb_db_context *ctdb_db;
1081 ctdb_db = find_ctdb_db(ctdb, db_id);
1083 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1087 if (ctdb_db->seqnum_update == NULL) {
1088 ctdb_db->seqnum_update =
1089 event_add_timed(ctdb->ev, ctdb_db,
1090 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1091 ctdb_ltdb_seqnum_check, ctdb_db);
1094 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1095 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1099 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1101 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1102 struct ctdb_db_context *ctdb_db;
1104 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1106 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1110 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1111 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1115 ctdb_db->priority = db_prio->priority;
1116 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));