2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
/*
  Call handler that ctdb_local_attach() registers under CTDB_NULL_FUNC
  for every database it attaches. It receives the ctdb_call_info of the
  call being executed.
 */
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
/*
  Call handler that ctdb_local_attach() registers under CTDB_FETCH_FUNC
  for every database it attaches.
 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
/* reply with the record's own data: reply_data points at record_data, nothing is copied */
47 call->reply_data = &call->record_data;
/*
  Per-request state kept while a packet waits for a contended chainlock.
  It is filled in by ctdb_ltdb_lock_requeue() and read back by
  lock_fetch_callback().
 */
53 struct lock_fetch_state {
54 struct ctdb_context *ctdb;
/* handler the deferred packet is passed back to by lock_fetch_callback() */
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet itself */
57 struct ctdb_req_header *hdr;
/* when true, lock_fetch_callback() skips its vnn_map generation check */
59 bool ignore_generation;
63 called when we should retry the operation
/*
  Callback handed to ctdb_lockwait() by ctdb_ltdb_lock_requeue().
  p is the struct lock_fetch_state that was allocated for the deferred
  request.
 */
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* unless the caller asked to ignore generations, a request queued under
   a different vnn_map generation is discarded and its packet freed */
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
/* hand the packet back to the receive handler recorded in the state */
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
/*
  Try to take the chainlock for key without blocking; if the lock is
  contended, queue the request so it is re-submitted later.

  ctdb_db            database whose local tdb is locked
  key                record key whose chain is locked
  hdr                the request packet; it becomes the talloc parent of
                     the deferral state and is later moved under it
  recv_pkt           handler the packet is passed to when it is retried
  recv_context       first argument passed to recv_pkt
  ignore_generation  stored in the deferral state; when true the retry
                     is not discarded on a vnn_map generation change
 */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
/* only EACCES, EAGAIN and EDEADLK count as "lock is busy"; any other
   errno is treated as a hard failure */
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
/* the deferral state starts out as a talloc child of the packet */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
/* remember the current generation so lock_fetch_callback() can detect
   requests that were queued before it changed */
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
147 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148 so it won't be freed yet */
149 talloc_steal(state, hdr);
/* the lockwait handle is re-parented under the state as well */
150 talloc_steal(state, h);
152 /* now tell the caller that we will retry asynchronously */
157 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
  Lock the record via ctdb_ltdb_lock_requeue() and then read it with
  ctdb_ltdb_fetch().

  header and data receive the fetched ltdb header and record data; hdr
  is passed to ctdb_ltdb_fetch() as its fourth argument. The remaining
  arguments are forwarded unchanged to ctdb_ltdb_lock_requeue().
 */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
160 TDB_DATA key, struct ctdb_ltdb_header *header,
161 struct ctdb_req_header *hdr, TDB_DATA *data,
162 void (*recv_pkt)(void *, struct ctdb_req_header *),
163 void *recv_context, bool ignore_generation)
167 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
168 recv_context, ignore_generation);
170 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
/* NOTE(review): presumably the lock is dropped again only when the
   fetch failed - confirm the guarding condition */
173 uret = ctdb_ltdb_unlock(ctdb_db, key);
175 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
184 paranoid check to see if the db is empty
/*
  Sanity check used at attach time: logs an alert and calls ctdb_fatal()
  when the database is found not to be empty.
 */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
188 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
/* traversing with no callback is used here just to obtain a count */
189 int count = tdb_traverse_read(tdb, NULL, NULL);
191 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
193 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
  Refresh ctdb_db->unhealthy_reason from the persistent health tdb
  (ctdb->db_persistent_health).

  The record is looked up under the database name. A reason string that
  is found is duplicated onto ctdb_db and stored in
  ctdb_db->unhealthy_reason.
 */
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198 struct ctdb_db_context *ctdb_db)
200 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
/* the key is the db name without its terminating NUL */
206 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207 key.dsize = strlen(ctdb_db->db_name);
/* keep the previous reason so it can be restored if duplicating the
   stored one fails */
209 old = ctdb_db->unhealthy_reason;
210 ctdb_db->unhealthy_reason = NULL;
212 val = tdb_fetch(tdb, key);
214 reason = talloc_strndup(ctdb_db,
215 (const char *)val.dptr,
217 if (reason == NULL) {
218 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
220 ctdb_db->unhealthy_reason = old;
231 ctdb_db->unhealthy_reason = reason;
/*
  Update the health record of ctdb_db in the persistent health tdb and
  the cached ctdb_db->unhealthy_reason, inside one tdb transaction.

  given_reason       new unhealthy reason, NULL means healthy
  num_healthy_nodes  when this is 0 and an old reason exists, the old
                     reason is kept and prefixed with
                     "NO-HEALTHY-NODES - " instead of being cleared

  A new reason is written with tdb_store(); when there is no new reason
  but an old one existed, the record is removed with tdb_delete().
 */
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236 struct ctdb_db_context *ctdb_db,
237 const char *given_reason,/* NULL means healthy */
238 int num_healthy_nodes)
240 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
244 char *new_reason = NULL;
245 char *old_reason = NULL;
247 ret = tdb_transaction_start(tdb);
249 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250 tdb_name(tdb), ret, tdb_errorstr(tdb)));
/* re-read the stored reason so old_reason reflects what is on disk */
254 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
256 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257 ctdb_db->db_name, ret));
260 old_reason = ctdb_db->unhealthy_reason;
/* same key layout as ctdb_load_persistent_health(): db name, no NUL */
262 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263 key.dsize = strlen(ctdb_db->db_name);
266 new_reason = talloc_strdup(ctdb_db, given_reason);
267 if (new_reason == NULL) {
268 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
272 } else if (old_reason && num_healthy_nodes == 0) {
274 * If the reason indicates ok, but there where no healthy nodes
275 * available, that it means, we have not recovered valid content
276 * of the db. So if there's an old reason, prefix it with
277 * "NO-HEALTHY-NODES - "
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
284 prefix = _TMP_PREFIX;
288 new_reason = talloc_asprintf(ctdb_db, "%s%s",
290 if (new_reason == NULL) {
291 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292 prefix, old_reason));
/* the stored value is the reason string without its terminating NUL */
299 val.dptr = discard_const_p(uint8_t, new_reason);
300 val.dsize = strlen(new_reason);
302 ret = tdb_store(tdb, key, val, TDB_REPLACE);
304 tdb_transaction_cancel(tdb);
305 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306 tdb_name(tdb), ctdb_db->db_name, new_reason,
307 ret, tdb_errorstr(tdb)));
308 talloc_free(new_reason);
311 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312 ctdb_db->db_name, new_reason));
313 } else if (old_reason) {
314 ret = tdb_delete(tdb, key);
316 tdb_transaction_cancel(tdb);
317 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318 tdb_name(tdb), ctdb_db->db_name,
319 ret, tdb_errorstr(tdb)));
320 talloc_free(new_reason);
323 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
327 ret = tdb_transaction_commit(tdb);
328 if (ret != TDB_SUCCESS) {
329 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330 tdb_name(tdb), ret, tdb_errorstr(tdb)));
331 talloc_free(new_reason);
/* after a successful commit the cached reason is replaced by the new one */
335 talloc_free(old_reason);
336 ctdb_db->unhealthy_reason = new_reason;
/*
  Move a corrupted database file out of the way.

  The file at ctdb_db->db_path is renamed to a name carrying a
  ".corrupted.<timestamp>.0Z" suffix. Before the rename, the backup
  location is recorded as the unhealthy reason of the database via
  ctdb_update_persistent_health().
 */
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342 struct ctdb_db_context *ctdb_db)
344 time_t now = time(NULL);
/* NOTE(review): the tm_* fields are int but are printed with %u below -
   confirm this is intentional */
352 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354 "%04u%02u%02u%02u%02u%02u.0Z",
356 tm->tm_year+1900, tm->tm_mon+1,
357 tm->tm_mday, tm->tm_hour, tm->tm_min,
359 if (new_path == NULL) {
360 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
364 new_reason = talloc_asprintf(ctdb_db,
365 "ERROR - Backup of corrupted TDB in '%s'",
367 if (new_reason == NULL) {
368 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* mark the db unhealthy first; 0 is passed as the number of healthy nodes */
371 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372 talloc_free(new_reason);
374 DEBUG(DEBUG_CRIT,(__location__
375 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
/* now move the file aside */
380 ret = rename(ctdb_db->db_path, new_path);
382 DEBUG(DEBUG_CRIT,(__location__
383 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384 ctdb_db->db_path, new_path,
385 errno, strerror(errno)));
386 talloc_free(new_path);
390 DEBUG(DEBUG_CRIT,(__location__
391 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392 ctdb_db->db_path, new_path));
393 talloc_free(new_path);
/*
  Reload the health state of every persistent database on ctdb->db_list
  and log the outcome.

  Non-persistent databases are skipped by the check at the top of the
  loop. For each persistent database the stored reason is reloaded with
  ctdb_load_persistent_health(); a NULL unhealthy_reason is reported as
  healthy, anything else as unhealthy. A final summary line prints the
  OK and FAIL counts, at DEBUG_ALERT when any database failed and at
  DEBUG_NOTICE otherwise.
 */
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
399 struct ctdb_db_context *ctdb_db;
404 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405 if (!ctdb_db->persistent) {
409 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
411 DEBUG(DEBUG_ALERT,(__location__
412 " load persistent health for '%s' failed\n",
417 if (ctdb_db->unhealthy_reason == NULL) {
419 DEBUG(DEBUG_INFO,(__location__
420 " persistent db '%s' healthy\n",
426 DEBUG(DEBUG_ALERT,(__location__
427 " persistent db '%s' unhealthy: %s\n",
429 ctdb_db->unhealthy_reason));
431 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
444 mark a database - as healthy
/*
  Control handler: mark the database named by indata as healthy.

  indata carries the uint32_t db_id. The stored unhealthy reason is
  cleared by calling ctdb_update_persistent_health() with a NULL reason
  and one healthy node. If the database may now be recovered and startup
  has not completed, the node is put into CTDB_RECOVERY_ACTIVE.
 */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
/* NOTE(review): indata.dptr is read as a uint32_t with no size check
   visible here - confirm the dispatcher validates indata.dsize */
448 uint32_t db_id = *(uint32_t *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
451 bool may_recover = false;
453 ctdb_db = find_ctdb_db(ctdb, db_id);
455 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
459 if (ctdb_db->unhealthy_reason) {
463 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
465 DEBUG(DEBUG_ERR,(__location__
466 " ctdb_update_persistent_health(%s) failed\n",
471 if (may_recover && !ctdb->done_startup) {
472 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
474 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
  Control handler: report the unhealthy reason of the database named by
  indata.

  indata carries the uint32_t db_id. The reason is reloaded from the
  persistent health tdb first. When a reason is set, outdata is pointed
  at it.
 */
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
/* NOTE(review): indata.dptr is read as a uint32_t with no size check
   visible here - confirm the dispatcher validates indata.dsize */
484 uint32_t db_id = *(uint32_t *)indata.dptr;
485 struct ctdb_db_context *ctdb_db;
488 ctdb_db = find_ctdb_db(ctdb, db_id);
490 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
494 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
496 DEBUG(DEBUG_ERR,(__location__
497 " ctdb_load_persistent_health(%s) failed\n",
503 if (ctdb_db->unhealthy_reason) {
/* the reply points at the string owned by ctdb_db (not a copy); the
   reported size includes the terminating NUL */
504 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
512 attach to a database, handling both persistent and non-persistent databases
513 return 0 on success, -1 on failure
/*
  Create the ctdb_db_context for db_name, open its local tdb and link it
  into ctdb->db_list.

  db_name           name of the database; the db id is a hash of it
  persistent        selects the persistent db directory and TDB_DEFAULT
                    flags instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
  unhealthy_reason  when non-NULL, recorded for this db through
                    ctdb_update_persistent_health() before the file is
                    opened

  After a successful open the "null" and "fetch" call handlers are
  registered and vacuuming is initialised. Every failure path shown
  frees ctdb_db again.
 */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516 bool persistent, const char *unhealthy_reason,
519 struct ctdb_db_context *ctdb_db, *tmp_db;
524 int remaining_tries = 0;
526 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527 CTDB_NO_MEMORY(ctdb, ctdb_db);
529 ctdb_db->priority = 1;
530 ctdb_db->ctdb = ctdb;
531 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
/* the db id is the hash of the name including its terminating NUL */
534 key.dsize = strlen(db_name)+1;
535 key.dptr = discard_const(db_name);
536 ctdb_db->db_id = ctdb_hash(&key);
537 ctdb_db->persistent = persistent;
539 /* check for hash collisions */
540 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541 if (tmp_db->db_id == ctdb_db->db_id) {
542 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543 tmp_db->db_id, db_name, tmp_db->db_name));
544 talloc_free(ctdb_db);
550 if (unhealthy_reason) {
551 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552 unhealthy_reason, 0);
554 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555 ctdb_db->db_name, unhealthy_reason, ret));
556 talloc_free(ctdb_db);
561 if (ctdb->max_persistent_check_errors > 0) {
564 if (ctdb->done_startup) {
568 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
570 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571 ctdb_db->db_name, ret));
572 talloc_free(ctdb_db);
/* an unhealthy db is only refused when no tries remain; otherwise it
   is just logged below */
577 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579 ctdb_db->db_name, ctdb_db->unhealthy_reason));
580 talloc_free(ctdb_db);
584 if (ctdb_db->unhealthy_reason) {
585 /* this is just a warning, but we want that in the log file! */
586 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587 ctdb_db->db_name, ctdb_db->unhealthy_reason));
590 /* open the database */
591 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
592 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
595 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596 if (ctdb->valgrinding) {
597 tdb_flags |= TDB_NOMMAP;
599 tdb_flags |= TDB_DISALLOW_NESTING;
601 tdb_flags |= TDB_INCOMPATIBLE_HASH;
605 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
606 ctdb->tunable.database_hash_size,
608 O_CREAT|O_RDWR, mode);
609 if (ctdb_db->ltdb == NULL) {
/* errno is captured right away so later calls cannot clobber it */
611 int saved_errno = errno;
614 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
617 strerror(saved_errno)));
618 talloc_free(ctdb_db);
622 if (remaining_tries == 0) {
623 DEBUG(DEBUG_CRIT,(__location__
624 "Failed to open persistent tdb '%s': %d - %s\n",
627 strerror(saved_errno)));
628 talloc_free(ctdb_db);
632 ret = stat(ctdb_db->db_path, &st);
634 DEBUG(DEBUG_CRIT,(__location__
635 "Failed to open persistent tdb '%s': %d - %s\n",
638 strerror(saved_errno)));
639 talloc_free(ctdb_db);
643 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
645 DEBUG(DEBUG_CRIT,(__location__
646 "Failed to open persistent tdb '%s': %d - %s\n",
649 strerror(saved_errno)));
650 talloc_free(ctdb_db);
660 ctdb_check_db_empty(ctdb_db);
662 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
667 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668 ctdb_db->db_path, ret,
669 tdb_errorstr(ctdb_db->ltdb->tdb)));
670 if (remaining_tries == 0) {
671 talloc_free(ctdb_db);
675 fd = tdb_fd(ctdb_db->ltdb->tdb);
676 ret = fstat(fd, &st);
678 DEBUG(DEBUG_CRIT,(__location__
679 "Failed to fstat() persistent tdb '%s': %d - %s\n",
683 talloc_free(ctdb_db);
/* drop the handle on the damaged file before it is moved aside */
688 talloc_free(ctdb_db->ltdb);
689 ctdb_db->ltdb = NULL;
691 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
693 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
695 talloc_free(ctdb_db);
/* from here on ctdb_db is linked into ctdb->db_list.
   NOTE(review): the failure paths below call talloc_free(ctdb_db) with
   no DLIST_REMOVE visible - confirm the list cannot be left holding a
   freed entry */
705 DLIST_ADD(ctdb->db_list, ctdb_db);
707 /* setting this can help some high churn databases */
708 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
711 all databases support the "null" function. we need this in
712 order to do forced migration of records
714 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
716 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717 talloc_free(ctdb_db);
722 all databases support the "fetch" function. we need this
723 for efficient Samba3 ctdb fetch
725 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
727 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728 talloc_free(ctdb_db);
732 ret = ctdb_vacuum_init(ctdb_db);
734 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735 "database '%s'\n", ctdb_db->db_name));
736 talloc_free(ctdb_db);
741 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
  One attach control that ctdb_control_db_attach() postponed because the
  node was in recovery. Entries are linked on ctdb->deferred_attach.
 */
748 struct ctdb_deferred_attach_context {
/* list linkage used by the DLIST_ADD / DLIST_REMOVE macros */
749 struct ctdb_deferred_attach_context *next, *prev;
750 struct ctdb_context *ctdb;
/* the postponed attach control packet */
751 struct ctdb_req_control *c;
/*
  talloc destructor installed by ctdb_control_db_attach(): unlinks the
  entry from ctdb->deferred_attach when it is freed.
 */
755 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
757 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
  Timed-event handler armed by ctdb_control_db_attach() with the
  deferred_attach_timeout tunable. private_data is the
  ctdb_deferred_attach_context of the postponed attach.
 */
762 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
764 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
765 struct ctdb_context *ctdb = da_ctx->ctdb;
/* answer the postponed control with status -1 and no data */
767 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
/*
  Timed-event handler scheduled by ctdb_process_deferred_attach().
  It feeds the postponed attach control back into ctdb_input_pkt() so
  that it is processed again. private_data is the
  ctdb_deferred_attach_context.
 */
771 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
773 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
774 struct ctdb_context *ctdb = da_ctx->ctdb;
776 /* This talloc-steals the packet ->c */
777 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
/*
  Drain ctdb->deferred_attach: each entry is unlinked from the list and
  a timed event running ctdb_deferred_attach_callback() is scheduled for
  it on ctdb->ev at timeval_current_ofs(1,0).
 */
781 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
783 struct ctdb_deferred_attach_context *da_ctx;
785 /* call it from the main event loop as soon as the current event
788 while ((da_ctx = ctdb->deferred_attach) != NULL) {
789 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
790 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
797 a client has asked to attach a new database
/*
  Control handler for a request to attach to a database.

  indata      the database name as a C string
  outdata     on success points at the db_id stored in the db context
  tdb_flags   client-supplied flags; only TDB_NOSYNC and
              TDB_INCOMPATIBLE_HASH are kept
  persistent  selects a persistent attach, locally and in the broadcast
  client_id   non-zero for a local client; it is then looked up with
              ctdb_reqid_find()
  c           the control packet, kept if the attach has to be deferred

  For a local client the attach is refused when the client cannot be
  found or the node is inactive. While recovery is active and the
  client is not the recovery daemon, the control is parked on
  ctdb->deferred_attach with a timeout. Otherwise an existing database
  of that name is reused, or ctdb_local_attach() creates it and the
  attach is broadcast to all other nodes.
 */
799 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
800 TDB_DATA *outdata, uint64_t tdb_flags,
801 bool persistent, uint32_t client_id,
802 struct ctdb_req_control *c,
805 const char *db_name = (const char *)indata.dptr;
806 struct ctdb_db_context *db;
807 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
809 /* dont allow any local clients to attach while we are in recovery mode
810 * except for the recovery daemon.
811 * allow all attach from the network since these are always from remote
814 if (client_id != 0) {
815 struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
817 if (client == NULL) {
818 DEBUG(DEBUG_ERR,("DB Attach to database %s refused. Can not match clientid:%d to a client structure.\n", db_name, client_id));
822 /* If the node is inactive it is not part of the cluster
823 and we should not allow clients to attach to any
826 if (node->flags & NODE_FLAGS_INACTIVE) {
827 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
831 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
832 && client->pid != ctdb->recoverd_pid) {
833 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
835 if (da_ctx == NULL) {
836 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
841 da_ctx->c = talloc_steal(da_ctx, c);
842 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
843 DLIST_ADD(ctdb->deferred_attach, da_ctx);
845 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
847 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
853 /* the client can optionally pass additional tdb flags, but we
854 only allow a subset of those on the database in ctdb. Note
855 that tdb_flags is passed in via the (otherwise unused)
856 srvid to the attach control */
857 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
859 /* see if we already have this name */
860 db = ctdb_db_handle(ctdb, db_name);
/* the reply points at the db_id field inside the db context, not a copy */
862 outdata->dptr = (uint8_t *)&db->db_id;
863 outdata->dsize = sizeof(db->db_id);
864 tdb_add_flags(db->ltdb->tdb, tdb_flags);
868 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
/* look the new database up again by name to get its context */
872 db = ctdb_db_handle(ctdb, db_name);
874 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
878 /* remember the flags the client has specified */
879 tdb_add_flags(db->ltdb->tdb, tdb_flags);
881 outdata->dptr = (uint8_t *)&db->db_id;
882 outdata->dsize = sizeof(db->db_id);
884 /* Try to ensure it's locked in mem */
885 ctdb_lockdown_memory(ctdb);
887 /* tell all the other nodes about this database */
888 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
889 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
890 CTDB_CONTROL_DB_ATTACH,
891 0, CTDB_CTRL_FLAG_NOREPLY,
900 attach to all existing persistent databases
/*
  Scan ctdb->db_directory_persistent and attach every database file
  that belongs to this node.

  A directory entry is considered only if it contains ".tdb." followed
  by digits, and the number parsed after ".tdb." equals ctdb->pnn; other
  entries are logged as ignored. unhealthy_reason is passed through to
  ctdb_local_attach() for each database.
 */
902 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
903 const char *unhealthy_reason)
908 /* open the persistent db directory and scan it for files */
909 d = opendir(ctdb->db_directory_persistent);
914 while ((de=readdir(d))) {
916 size_t len = strlen(de->d_name);
918 int invalid_name = 0;
/* work on a private copy of the entry name.
   NOTE(review): confirm s is freed on the skip paths and that d is
   closed on every exit, including the CTDB_NO_MEMORY one */
920 s = talloc_strdup(ctdb, de->d_name);
921 CTDB_NO_MEMORY(ctdb, s);
923 /* only accept names ending in .tdb */
924 p = strstr(s, ".tdb.");
925 if (len < 7 || p == NULL) {
930 /* only accept names ending with .tdb. and any number of digits */
/* NOTE(review): if q is a plain char pointer, isdigit() should be given
   the value cast to unsigned char - a negative char is undefined
   behaviour for the ctype functions */
932 while (*q != 0 && invalid_name == 0) {
933 if (!isdigit(*q++)) {
/* the numeric suffix after ".tdb." must be this node's pnn */
937 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
938 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
944 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
945 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
951 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
  Startup entry point: prepare the database directories, open the
  persistent health tdb and attach all persistent databases.

  Steps:
   - default the three db directory settings to paths under VARDIR when
     they are unset, and create the directories (EEXIST is accepted)
   - open the persistent health tdb in the state directory and run
     tdb_check() on it
   - if the open or the check fails, build an unhealthy_reason saying
     the health db was cleared and reopen it with TDB_CLEAR_IF_FIRST
   - call ctdb_attach_persistent() with that unhealthy_reason, which is
     still NULL if nothing went wrong
 */
959 int ctdb_attach_databases(struct ctdb_context *ctdb)
962 char *persistent_health_path = NULL;
963 char *unhealthy_reason = NULL;
964 bool first_try = true;
966 if (ctdb->db_directory == NULL) {
967 ctdb->db_directory = VARDIR "/ctdb";
969 if (ctdb->db_directory_persistent == NULL) {
970 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
972 if (ctdb->db_directory_state == NULL) {
973 ctdb->db_directory_state = VARDIR "/ctdb/state";
976 /* make sure the db directory exists */
977 ret = mkdir(ctdb->db_directory, 0700);
978 if (ret == -1 && errno != EEXIST) {
979 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
980 ctdb->db_directory));
984 /* make sure the persistent db directory exists */
985 ret = mkdir(ctdb->db_directory_persistent, 0700);
986 if (ret == -1 && errno != EEXIST) {
987 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
988 ctdb->db_directory_persistent));
992 /* make sure the internal state db directory exists */
993 ret = mkdir(ctdb->db_directory_state, 0700);
994 if (ret == -1 && errno != EEXIST) {
995 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
996 ctdb->db_directory_state));
/* the health db lives in the state directory */
1000 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1001 ctdb->db_directory_state,
1002 PERSISTENT_HEALTH_TDB,
1004 if (persistent_health_path == NULL) {
1005 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
/* first attempt: open the health db without clearing it */
1011 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1012 0, TDB_DISALLOW_NESTING,
1013 O_CREAT | O_RDWR, 0600);
1014 if (ctdb->db_persistent_health == NULL) {
1015 struct tdb_wrap *tdb;
1018 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1019 persistent_health_path,
1022 talloc_free(persistent_health_path);
1023 talloc_free(unhealthy_reason);
/* this reason is later handed to ctdb_attach_persistent() */
1028 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1029 persistent_health_path,
1030 "was cleared after a failure",
1031 "manual verification needed");
1032 if (unhealthy_reason == NULL) {
1033 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1034 talloc_free(persistent_health_path);
1038 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1039 persistent_health_path));
1040 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1041 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1042 O_CREAT | O_RDWR, 0600);
1044 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1045 persistent_health_path,
1048 talloc_free(persistent_health_path);
1049 talloc_free(unhealthy_reason);
/* the file opened; now verify its internal consistency */
1056 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1058 struct tdb_wrap *tdb;
/* close the handle on the inconsistent file before clearing it */
1060 talloc_free(ctdb->db_persistent_health);
1061 ctdb->db_persistent_health = NULL;
1064 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1065 persistent_health_path));
1066 talloc_free(persistent_health_path);
1067 talloc_free(unhealthy_reason);
1072 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1073 persistent_health_path,
1074 "was cleared after a failure",
1075 "manual verification needed");
1076 if (unhealthy_reason == NULL) {
1077 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1078 talloc_free(persistent_health_path);
1082 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1083 persistent_health_path));
1084 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1085 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1086 O_CREAT | O_RDWR, 0600);
1088 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1089 persistent_health_path,
1092 talloc_free(persistent_health_path);
1093 talloc_free(unhealthy_reason);
1100 talloc_free(persistent_health_path);
1102 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1103 talloc_free(unhealthy_reason);
1112 called when a broadcast seqnum update comes in
/*
  Handle a sequence number update broadcast for database db_id that was
  sent by node srcnode.

  The update is not applied when srcnode is this node. The local tdb
  sequence number is then incremented and the cached ctdb_db->seqnum
  refreshed from the tdb.
 */
1114 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1116 struct ctdb_db_context *ctdb_db;
1117 if (srcnode == ctdb->pnn) {
1118 /* don't update ourselves! */
1122 ctdb_db = find_ctdb_db(ctdb, db_id);
1124 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/* an unhealthy database is reported at error level */
1128 if (ctdb_db->unhealthy_reason) {
1129 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1130 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/* bump the local seqnum, then cache the resulting value */
1134 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1135 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1140 timer to check for seqnum changes in a ltdb and propagate them
/*
  Periodic timed-event handler; p is the ctdb_db_context being watched.

  It compares the tdb's current sequence number with the cached
  ctdb_db->seqnum. When they differ it broadcasts
  CTDB_CONTROL_UPDATE_SEQNUM, carrying the db_id, and updates the cached
  value. The handler then re-arms itself.
 */
1142 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1143 struct timeval t, void *p)
1145 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1146 struct ctdb_context *ctdb = ctdb_db->ctdb;
1147 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1148 if (new_seqnum != ctdb_db->seqnum) {
1149 /* something has changed - propagate it */
/* the control payload is just the 32-bit db_id */
1151 data.dptr = (uint8_t *)&ctdb_db->db_id;
1152 data.dsize = sizeof(uint32_t);
1153 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1154 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1157 ctdb_db->seqnum = new_seqnum;
1159 /* setup a new timer */
/* the tunable is split as interval/1000 and (interval%1000)*1000, so it
   is presumably in milliseconds (seconds + microseconds) - confirm */
1160 ctdb_db->seqnum_update =
1161 event_add_timed(ctdb->ev, ctdb_db,
1162 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1163 ctdb_ltdb_seqnum_check, ctdb_db);
1167 enable seqnum handling on this db
/*
  Enable sequence number tracking for database db_id.

  If no seqnum timer is armed yet, one is started that runs
  ctdb_ltdb_seqnum_check(). Sequence numbers are then enabled on the
  local tdb and the current value is cached in ctdb_db->seqnum.
 */
1169 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1171 struct ctdb_db_context *ctdb_db;
1172 ctdb_db = find_ctdb_db(ctdb, db_id);
1174 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/* only arm the timer once per database */
1178 if (ctdb_db->seqnum_update == NULL) {
1179 ctdb_db->seqnum_update =
1180 event_add_timed(ctdb->ev, ctdb_db,
1181 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1182 ctdb_ltdb_seqnum_check, ctdb_db);
1185 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1186 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
  Control handler: set the priority of a database.

  indata carries a struct ctdb_db_priority holding db_id and priority.
  The priority must lie in the range 1..NUM_DB_PRIORITIES; values
  outside it are logged as invalid.
 */
1190 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
/* NOTE(review): indata.dptr is cast with no size check visible here -
   confirm the dispatcher validates indata.dsize */
1192 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1193 struct ctdb_db_context *ctdb_db;
1195 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1197 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1201 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1202 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1206 ctdb_db->priority = db_prio->priority;
1207 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));