2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
29 #include "lib/util/dlinklist.h"
32 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
35 this is the dummy null procedure that all databases support
/*
 * Call handler for the "null" function. ctdb_local_attach() registers it
 * under CTDB_NULL_FUNC for every database it attaches.
 * NOTE(review): the body is not part of this excerpt, so the return value
 * is not documented here.
 */
37 static int ctdb_null_func(struct ctdb_call_info *call)
43 this is a plain fetch procedure that all databases support
/*
 * Call handler for the plain "fetch" function. ctdb_local_attach()
 * registers it under CTDB_FETCH_FUNC for every database it attaches.
 * The reply is pointed at the call's own record_data (no copy is made).
 * NOTE(review): the return statement is not visible in this excerpt.
 */
45 static int ctdb_fetch_func(struct ctdb_call_info *call)
47 call->reply_data = &call->record_data;
/*
 * State for a request that is waiting for a chainlock. It is filled in by
 * ctdb_ltdb_lock_requeue() and read by lock_fetch_callback().
 * NOTE(review): both functions also use state->recv_context and
 * state->generation; those members are not visible in this excerpt.
 */
53 struct lock_fetch_state {
/* daemon context, used to read the current vnn_map generation */
54 struct ctdb_context *ctdb;
/* function that re-processes the packet once the lock is available */
55 void (*recv_pkt)(void *, struct ctdb_req_header *);
/* the deferred request packet */
57 struct ctdb_req_header *hdr;
/* when true the packet is replayed even if the generation has changed */
59 bool ignore_generation;
63 called when we should retry the operation
/*
 * Lockwait completion callback, installed by ctdb_ltdb_lock_requeue().
 *
 * p is the struct lock_fetch_state that was set up when the request was
 * deferred. If generations are not ignored and the vnn_map generation no
 * longer matches the one stored in the state, the packet is freed;
 * otherwise it is handed to state->recv_pkt together with
 * state->recv_context.
 * NOTE(review): the early return after the discard is presumably among the
 * lines missing from this excerpt - confirm.
 */
65 static void lock_fetch_callback(void *p)
67 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
/* a generation change while waiting makes the request stale */
68 if (!state->ignore_generation &&
69 state->generation != state->ctdb->vnn_map->generation) {
70 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
71 talloc_free(state->hdr);
/* replay the packet through the receive function supplied by the caller */
74 state->recv_pkt(state->recv_context, state->hdr);
75 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
80 do a non-blocking ltdb_lock, deferring this ctdb request until we
83 It does the following:
85 1) tries to get the chainlock. If it succeeds, then it returns 0
87 2) if it fails to get a chainlock immediately then it sets up a
88 non-blocking chainlock via ctdb_lockwait, and when it gets the
89 chainlock it re-submits this ctdb request to the main packet
92 This effectively queues all ctdb requests that cannot be
93 immediately satisfied until it can get the lock. This means that
94 the main ctdb daemon will not block waiting for a chainlock held by
97 There are 3 possible return values:
99 0: means that it got the lock immediately.
100 -1: means that it failed to get the lock, and won't retry
101 -2: means that it failed to get the lock immediately, but will retry
/*
 * Try to take the chainlock for key without blocking. If the lock is
 * contended, set up a lockwait and replay the request once it completes.
 *
 * ctdb_db            database whose local tdb chain is locked
 * key                record key identifying the chain
 * hdr                the request packet; it is the talloc parent of the
 *                    retry state and is afterwards moved under that state
 * recv_pkt           called as recv_pkt(recv_context, hdr) to replay the
 *                    request
 * recv_context       opaque first argument for recv_pkt
 * ignore_generation  if true, replay the request even when the vnn_map
 *                    generation changed while waiting
 *
 * Returns 0 when the lock was obtained immediately, -1 on a failure that
 * will not be retried, -2 when the request was queued for a retry.
 *
 * NOTE(review): the result of talloc() is dereferenced on the following
 * line without a NULL check.
 */
103 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
104 TDB_DATA key, struct ctdb_req_header *hdr,
105 void (*recv_pkt)(void *, struct ctdb_req_header *),
106 void *recv_context, bool ignore_generation)
109 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
110 struct lockwait_handle *h;
111 struct lock_fetch_state *state;
113 ret = tdb_chainlock_nonblock(tdb, key);
116 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
117 /* a hard failure - don't try again */
121 /* when torturing, ensure we test the contended path */
122 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
125 tdb_chainunlock(tdb, key);
128 /* first the non-contended path */
133 state = talloc(hdr, struct lock_fetch_state);
134 state->ctdb = ctdb_db->ctdb;
136 state->recv_pkt = recv_pkt;
137 state->recv_context = recv_context;
138 state->generation = ctdb_db->ctdb->vnn_map->generation;
139 state->ignore_generation = ignore_generation;
141 /* now the contended path */
142 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
147 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
148 so it won't be freed yet */
149 talloc_steal(state, hdr);
150 talloc_steal(state, h);
152 /* now tell the caller that we will retry asynchronously */
157 a variant of ctdb_ltdb_lock_requeue that also fetches the record
/*
 * Take the chainlock like ctdb_ltdb_lock_requeue() and additionally read
 * the record with ctdb_ltdb_fetch().
 *
 * header, data   receive the result of ctdb_ltdb_fetch()
 * hdr            request packet; also passed to ctdb_ltdb_fetch()
 *                (presumably as the talloc context for data - TODO confirm)
 * The other parameters are forwarded unchanged to
 * ctdb_ltdb_lock_requeue().
 *
 * NOTE(review): the unlock call and its error log are presumably the
 * cleanup for a failed fetch; the guarding conditions and the return
 * statement are not visible in this excerpt.
 */
159 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
160 TDB_DATA key, struct ctdb_ltdb_header *header,
161 struct ctdb_req_header *hdr, TDB_DATA *data,
162 void (*recv_pkt)(void *, struct ctdb_req_header *),
163 void *recv_context, bool ignore_generation)
167 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
168 recv_context, ignore_generation);
170 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
173 uret = ctdb_ltdb_unlock(ctdb_db, key);
175 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
184 paranoid check to see if the db is empty
/*
 * Sanity check used by ctdb_local_attach(): traverse the local tdb
 * read-only with no callback to obtain a record count, and abort through
 * ctdb_fatal() with "database not empty on attach".
 * NOTE(review): the condition on count is not visible in this excerpt;
 * presumably any non-zero count is treated as fatal.
 */
186 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
188 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
189 int count = tdb_traverse_read(tdb, NULL, NULL);
191 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
193 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
/*
 * Refresh ctdb_db->unhealthy_reason from the persistent health database
 * (ctdb->db_persistent_health).
 *
 * The lookup key is the database name without its terminating NUL. The
 * fetched value is duplicated onto ctdb_db with talloc_strndup(). If that
 * fails, the previous reason (saved in old) is put back. On the normal
 * path the field ends up holding the freshly loaded reason.
 *
 * NOTE(review): handling of a missing record, the release of val.dptr and
 * of old, and the return values are outside the visible lines - confirm
 * against the full source.
 */
197 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
198 struct ctdb_db_context *ctdb_db)
200 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
206 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
207 key.dsize = strlen(ctdb_db->db_name);
209 old = ctdb_db->unhealthy_reason;
210 ctdb_db->unhealthy_reason = NULL;
212 val = tdb_fetch(tdb, key);
214 reason = talloc_strndup(ctdb_db,
215 (const char *)val.dptr,
217 if (reason == NULL) {
218 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
220 ctdb_db->unhealthy_reason = old;
231 ctdb_db->unhealthy_reason = reason;
/*
 * Record the health state of a database in the persistent health tdb,
 * inside a tdb transaction, and mirror it in ctdb_db->unhealthy_reason.
 *
 * ctdb               daemon context owning db_persistent_health
 * ctdb_db            database whose state is recorded; its name (without
 *                    the terminating NUL) is the key
 * given_reason       why the database is unhealthy, NULL means healthy
 * num_healthy_nodes  when this is 0 and a reason is already stored, the
 *                    stored reason is carried over into the new reason,
 *                    combined with a prefix string
 *
 * Visible flow: start the transaction, reload the stored reason, build
 * new_reason, then either store it with TDB_REPLACE or delete the key,
 * commit, free the old reason and install new_reason on ctdb_db. A failed
 * store or delete cancels the transaction.
 *
 * NOTE(review): the conditions selecting the branches (for example the
 * test on given_reason, the use of the strncmp() result to choose the
 * prefix, and the test that picks store versus delete) and all return
 * statements are among the lines missing from this excerpt - confirm
 * against the full source.
 */
235 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
236 struct ctdb_db_context *ctdb_db,
237 const char *given_reason,/* NULL means healthy */
238 int num_healthy_nodes)
240 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
244 char *new_reason = NULL;
245 char *old_reason = NULL;
247 ret = tdb_transaction_start(tdb);
249 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
250 tdb_name(tdb), ret, tdb_errorstr(tdb)));
254 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
256 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
257 ctdb_db->db_name, ret));
260 old_reason = ctdb_db->unhealthy_reason;
262 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
263 key.dsize = strlen(ctdb_db->db_name);
266 new_reason = talloc_strdup(ctdb_db, given_reason);
267 if (new_reason == NULL) {
268 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
272 } else if (old_reason && num_healthy_nodes == 0) {
274 * If the reason indicates ok, but there where no healthy nodes
275 * available, that it means, we have not recovered valid content
276 * of the db. So if there's an old reason, prefix it with
277 * "NO-HEALTHY-NODES - "
281 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
282 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
284 prefix = _TMP_PREFIX;
288 new_reason = talloc_asprintf(ctdb_db, "%s%s",
290 if (new_reason == NULL) {
291 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
292 prefix, old_reason));
299 val.dptr = discard_const_p(uint8_t, new_reason);
300 val.dsize = strlen(new_reason);
302 ret = tdb_store(tdb, key, val, TDB_REPLACE);
304 tdb_transaction_cancel(tdb);
305 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
306 tdb_name(tdb), ctdb_db->db_name, new_reason,
307 ret, tdb_errorstr(tdb)));
308 talloc_free(new_reason);
311 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
312 ctdb_db->db_name, new_reason));
313 } else if (old_reason) {
314 ret = tdb_delete(tdb, key);
316 tdb_transaction_cancel(tdb);
317 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
318 tdb_name(tdb), ctdb_db->db_name,
319 ret, tdb_errorstr(tdb)));
320 talloc_free(new_reason);
323 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
327 ret = tdb_transaction_commit(tdb);
328 if (ret != TDB_SUCCESS) {
329 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
330 tdb_name(tdb), ret, tdb_errorstr(tdb)));
331 talloc_free(new_reason);
335 talloc_free(old_reason);
336 ctdb_db->unhealthy_reason = new_reason;
/*
 * Move a corrupted database file out of the way and record that fact as
 * the database's unhealthy reason.
 *
 * A new path with the suffix ".corrupted.<timestamp>.0Z" is built (the
 * in-code example is foo.tdb.0.corrupted.20091204160825.0Z). The reason
 * "ERROR - Backup of corrupted TDB in '...'" is stored through
 * ctdb_update_persistent_health() with num_healthy_nodes 0, and finally
 * ctdb_db->db_path is renamed to the new path. new_path and new_reason
 * are freed on the paths shown.
 *
 * NOTE(review): the log text following the health update says
 * "not implemented yet", which does not describe that call - confirm
 * whether the message is stale.
 * NOTE(review): the tm_* members of struct tm are int, yet the format
 * uses %u conversions.
 * NOTE(review): how tm is obtained, the first format argument and the
 * return values are not visible in this excerpt.
 */
341 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
342 struct ctdb_db_context *ctdb_db)
344 time_t now = time(NULL);
352 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
353 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
354 "%04u%02u%02u%02u%02u%02u.0Z",
356 tm->tm_year+1900, tm->tm_mon+1,
357 tm->tm_mday, tm->tm_hour, tm->tm_min,
359 if (new_path == NULL) {
360 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
364 new_reason = talloc_asprintf(ctdb_db,
365 "ERROR - Backup of corrupted TDB in '%s'",
367 if (new_reason == NULL) {
368 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
371 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
372 talloc_free(new_reason);
374 DEBUG(DEBUG_CRIT,(__location__
375 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
380 ret = rename(ctdb_db->db_path, new_path);
382 DEBUG(DEBUG_CRIT,(__location__
383 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
384 ctdb_db->db_path, new_path,
385 errno, strerror(errno)));
386 talloc_free(new_path);
390 DEBUG(DEBUG_CRIT,(__location__
391 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
392 ctdb_db->db_path, new_path));
393 talloc_free(new_path);
/*
 * Walk ctdb->db_list, reload the stored health state of each database via
 * ctdb_load_persistent_health() and log whether it is healthy or not.
 * A summary with the OK and FAIL counters is logged at the end, at ALERT
 * level when fail is non-zero and at NOTICE level otherwise.
 *
 * NOTE(review): the body of the !persistent branch (presumably a
 * continue), the counter updates and the return value are not visible in
 * this excerpt.
 */
397 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
399 struct ctdb_db_context *ctdb_db;
404 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
405 if (!ctdb_db->persistent) {
409 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
411 DEBUG(DEBUG_ALERT,(__location__
412 " load persistent health for '%s' failed\n",
417 if (ctdb_db->unhealthy_reason == NULL) {
419 DEBUG(DEBUG_INFO,(__location__
420 " persistent db '%s' healthy\n",
426 DEBUG(DEBUG_ALERT,(__location__
427 " persistent db '%s' unhealthy: %s\n",
429 ctdb_db->unhealthy_reason));
431 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
432 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
444 mark a database - as healthy
/*
 * Control handler: mark a database as healthy.
 *
 * indata.dptr is read as a uint32_t database id. The stored unhealthy
 * reason is cleared with ctdb_update_persistent_health(ctdb, ctdb_db,
 * NULL, 1). When may_recover is set and startup has not completed,
 * recovery mode is forced to CTDB_RECOVERY_ACTIVE.
 *
 * NOTE(review): no length check on indata is visible before the
 * dereference - confirm that the caller validates it.
 * NOTE(review): may_recover is presumably set when the database had an
 * unhealthy reason; that assignment and the return values are not visible
 * in this excerpt.
 */
446 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
448 uint32_t db_id = *(uint32_t *)indata.dptr;
449 struct ctdb_db_context *ctdb_db;
451 bool may_recover = false;
453 ctdb_db = find_ctdb_db(ctdb, db_id);
455 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
459 if (ctdb_db->unhealthy_reason) {
463 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
465 DEBUG(DEBUG_ERR,(__location__
466 " ctdb_update_persistent_health(%s) failed\n",
471 if (may_recover && !ctdb->done_startup) {
472 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
474 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
/*
 * Control handler: report the health state of a database.
 *
 * indata.dptr is read as a uint32_t database id. The reason is reloaded
 * with ctdb_load_persistent_health(). When one is set, outdata is pointed
 * at the string owned by ctdb_db (it is not copied) and dsize includes
 * the terminating NUL.
 *
 * NOTE(review): the remaining parameters, what outdata holds for a
 * healthy database, and the return values are not visible in this
 * excerpt. No length check on indata is visible before the dereference.
 */
480 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
484 uint32_t db_id = *(uint32_t *)indata.dptr;
485 struct ctdb_db_context *ctdb_db;
488 ctdb_db = find_ctdb_db(ctdb, db_id);
490 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
494 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
496 DEBUG(DEBUG_ERR,(__location__
497 " ctdb_load_persistent_health(%s) failed\n",
503 if (ctdb_db->unhealthy_reason) {
504 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
505 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
512 attach to a database, handling both persistent and non-persistent databases
513 return 0 on success, -1 on failure
/*
 * Create the local context for a database and open its tdb file.
 *
 * ctdb              daemon context; the new context is allocated on it
 * db_name           database name; the db id is the hash of the name
 *                   including its terminating NUL
 * persistent        selects the persistent directory and TDB_DEFAULT
 *                   flags instead of TDB_CLEAR_IF_FIRST | TDB_NOSYNC
 * unhealthy_reason  if set, it is stored as the health state before the
 *                   database is opened
 *
 * Visible steps, in order: allocate and fill the context, reject a db id
 * that collides with one already in ctdb->db_list, update or load the
 * persistent health state, open the tdb with tdb_wrap_open(), run
 * ctdb_check_db_empty() and tdb_check(), back up the file with
 * ctdb_backup_corrupted_tdb() on the failure paths, link the context into
 * ctdb->db_list, set the max-dead tunable, register the null and fetch
 * call handlers and initialise vacuuming. Failure paths free ctdb_db.
 *
 * NOTE(review): the calls to ctdb_daemon_set_call() and ctdb_vacuum_init()
 * come after DLIST_ADD, and their failure paths free ctdb_db. Unless the
 * context is unlinked somewhere outside this excerpt, ctdb->db_list would
 * keep a pointer to freed memory - confirm.
 * NOTE(review): the last parameter, the retry logic around
 * remaining_tries, the conditions guarding most branches and the return
 * values are among the lines missing from this excerpt.
 */
515 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
516 bool persistent, const char *unhealthy_reason,
519 struct ctdb_db_context *ctdb_db, *tmp_db;
524 int remaining_tries = 0;
526 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
527 CTDB_NO_MEMORY(ctdb, ctdb_db);
529 ctdb_db->priority = 1;
530 ctdb_db->ctdb = ctdb;
531 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
532 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
534 key.dsize = strlen(db_name)+1;
535 key.dptr = discard_const(db_name);
536 ctdb_db->db_id = ctdb_hash(&key);
537 ctdb_db->persistent = persistent;
539 /* check for hash collisions */
540 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
541 if (tmp_db->db_id == ctdb_db->db_id) {
542 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
543 tmp_db->db_id, db_name, tmp_db->db_name));
544 talloc_free(ctdb_db);
550 if (unhealthy_reason) {
551 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
552 unhealthy_reason, 0);
554 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
555 ctdb_db->db_name, unhealthy_reason, ret));
556 talloc_free(ctdb_db);
561 if (ctdb->max_persistent_check_errors > 0) {
564 if (ctdb->done_startup) {
568 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
570 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
571 ctdb_db->db_name, ret));
572 talloc_free(ctdb_db);
577 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
578 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
579 ctdb_db->db_name, ctdb_db->unhealthy_reason));
580 talloc_free(ctdb_db);
584 if (ctdb_db->unhealthy_reason) {
585 /* this is just a warning, but we want that in the log file! */
586 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
587 ctdb_db->db_name, ctdb_db->unhealthy_reason));
590 /* open the database */
591 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
592 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
595 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
596 if (ctdb->valgrinding) {
597 tdb_flags |= TDB_NOMMAP;
599 tdb_flags |= TDB_DISALLOW_NESTING;
601 tdb_flags |= TDB_INCOMPATIBLE_HASH;
605 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
606 ctdb->tunable.database_hash_size,
608 O_CREAT|O_RDWR, mode);
609 if (ctdb_db->ltdb == NULL) {
611 int saved_errno = errno;
614 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
617 strerror(saved_errno)));
618 talloc_free(ctdb_db);
622 if (remaining_tries == 0) {
623 DEBUG(DEBUG_CRIT,(__location__
624 "Failed to open persistent tdb '%s': %d - %s\n",
627 strerror(saved_errno)));
628 talloc_free(ctdb_db);
632 ret = stat(ctdb_db->db_path, &st);
634 DEBUG(DEBUG_CRIT,(__location__
635 "Failed to open persistent tdb '%s': %d - %s\n",
638 strerror(saved_errno)));
639 talloc_free(ctdb_db);
643 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
645 DEBUG(DEBUG_CRIT,(__location__
646 "Failed to open persistent tdb '%s': %d - %s\n",
649 strerror(saved_errno)));
650 talloc_free(ctdb_db);
660 ctdb_check_db_empty(ctdb_db);
662 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
667 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
668 ctdb_db->db_path, ret,
669 tdb_errorstr(ctdb_db->ltdb->tdb)));
670 if (remaining_tries == 0) {
671 talloc_free(ctdb_db);
675 fd = tdb_fd(ctdb_db->ltdb->tdb);
676 ret = fstat(fd, &st);
678 DEBUG(DEBUG_CRIT,(__location__
679 "Failed to fstat() persistent tdb '%s': %d - %s\n",
683 talloc_free(ctdb_db);
688 talloc_free(ctdb_db->ltdb);
689 ctdb_db->ltdb = NULL;
691 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
693 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
695 talloc_free(ctdb_db);
705 DLIST_ADD(ctdb->db_list, ctdb_db);
707 /* setting this can help some high churn databases */
708 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
711 all databases support the "null" function. we need this in
712 order to do forced migration of records
714 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
716 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
717 talloc_free(ctdb_db);
722 all databases support the "fetch" function. we need this
723 for efficient Samba3 ctdb fetch
725 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
727 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
728 talloc_free(ctdb_db);
732 ret = ctdb_vacuum_init(ctdb_db);
734 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
735 "database '%s'\n", ctdb_db->db_name));
736 talloc_free(ctdb_db);
741 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
/*
 * One attach control that was postponed because the node is in recovery.
 * Entries are created in ctdb_control_db_attach() and kept on the
 * ctdb->deferred_attach list.
 */
748 struct ctdb_deferred_attach_context {
/* links for the ctdb->deferred_attach list */
749 struct ctdb_deferred_attach_context *next, *prev;
/* daemon context that owns the list */
750 struct ctdb_context *ctdb;
/* the postponed control packet, reparented onto this context */
751 struct ctdb_req_control *c;
/*
 * talloc destructor for a deferred attach context (installed in
 * ctdb_control_db_attach()): unlinks the entry from the
 * ctdb->deferred_attach list.
 * NOTE(review): the return statement is not visible in this excerpt.
 */
755 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
757 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
/*
 * Timed-event handler armed in ctdb_control_db_attach() with the
 * deferred_attach_timeout tunable. It answers the postponed control with
 * ctdb_request_control_reply(), passing -1 (presumably the status - TODO
 * confirm) and no data.
 * private_data is the struct ctdb_deferred_attach_context.
 * NOTE(review): whether da_ctx is freed afterwards is not visible here.
 */
762 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
764 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
765 struct ctdb_context *ctdb = da_ctx->ctdb;
767 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
/*
 * Timed-event handler scheduled by ctdb_process_deferred_attach(): feeds
 * the postponed control packet back into ctdb_input_pkt() so that it is
 * processed again.
 * private_data is the struct ctdb_deferred_attach_context.
 * NOTE(review): whether da_ctx is freed afterwards is not visible here.
 */
771 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
773 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
774 struct ctdb_context *ctdb = da_ctx->ctdb;
776 /* This talloc-steals the packet ->c */
777 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
/*
 * Schedule every postponed attach control for replay.
 *
 * Each entry is unlinked from ctdb->deferred_attach and a timed event is
 * added on ctdb->ev (parented to ctdb, due at timeval_current_ofs(1,0))
 * that runs ctdb_deferred_attach_callback() for it.
 * NOTE(review): the return value is not visible in this excerpt.
 */
781 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
783 struct ctdb_deferred_attach_context *da_ctx;
785 /* call it from the main event loop as soon as the current event
788 while ((da_ctx = ctdb->deferred_attach) != NULL) {
789 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
790 event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
797 a client has asked to attach a new database
/*
 * Control handler: a client asks to attach to a database.
 *
 * indata      holds the database name as a C string
 * outdata     on success points at the db_id stored in the database
 *             context (not a copy), with dsize = sizeof(db_id)
 * tdb_flags   client supplied flags; only TDB_NOSYNC and
 *             TDB_INCOMPATIBLE_HASH are kept
 * persistent  selects the persistent attach path and control opcode
 * client_id   non-zero for a local client; used to look up the client
 * c           the control packet; taken over when the attach is deferred
 *
 * For a known local client the attach is refused while the node is
 * inactive. While recovery is active it is deferred, except for the
 * recovery daemon. A deferred request is queued on ctdb->deferred_attach
 * with a timeout event. Otherwise an existing database just gets the
 * flags added. A new one is created with ctdb_local_attach(), memory
 * lockdown is requested and the attach is broadcast to all nodes without
 * waiting for replies.
 *
 * NOTE(review): the final parameter, the initialisation of da_ctx->ctdb,
 * the NULL tests on db and all return statements are among the lines
 * missing from this excerpt.
 */
799 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
800 TDB_DATA *outdata, uint64_t tdb_flags,
801 bool persistent, uint32_t client_id,
802 struct ctdb_req_control *c,
805 const char *db_name = (const char *)indata.dptr;
806 struct ctdb_db_context *db;
807 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
808 struct ctdb_client *client = NULL;
810 /* dont allow any local clients to attach while we are in recovery mode
811 * except for the recovery daemon.
812 * allow all attach from the network since these are always from remote
815 if (client_id != 0) {
816 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
818 if (client != NULL) {
819 /* If the node is inactive it is not part of the cluster
820 and we should not allow clients to attach to any
823 if (node->flags & NODE_FLAGS_INACTIVE) {
824 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
828 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
829 && client->pid != ctdb->recoverd_pid) {
830 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
832 if (da_ctx == NULL) {
833 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
838 da_ctx->c = talloc_steal(da_ctx, c);
839 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
840 DLIST_ADD(ctdb->deferred_attach, da_ctx);
842 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
844 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
850 /* the client can optionally pass additional tdb flags, but we
851 only allow a subset of those on the database in ctdb. Note
852 that tdb_flags is passed in via the (otherwise unused)
853 srvid to the attach control */
854 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
856 /* see if we already have this name */
857 db = ctdb_db_handle(ctdb, db_name);
859 outdata->dptr = (uint8_t *)&db->db_id;
860 outdata->dsize = sizeof(db->db_id);
861 tdb_add_flags(db->ltdb->tdb, tdb_flags);
865 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
869 db = ctdb_db_handle(ctdb, db_name);
871 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
875 /* remember the flags the client has specified */
876 tdb_add_flags(db->ltdb->tdb, tdb_flags);
878 outdata->dptr = (uint8_t *)&db->db_id;
879 outdata->dsize = sizeof(db->db_id);
881 /* Try to ensure it's locked in mem */
882 ctdb_lockdown_memory(ctdb);
884 /* tell all the other nodes about this database */
885 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
886 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
887 CTDB_CONTROL_DB_ATTACH,
888 0, CTDB_CTRL_FLAG_NOREPLY,
897 attach to all existing persistent databases
/*
 * Scan ctdb->db_directory_persistent and attach every database file that
 * belongs to this node.
 *
 * A directory entry qualifies when its name contains ".tdb." followed
 * only by digits and the number parsed after ".tdb." equals ctdb->pnn.
 * Other entries are logged as ignored. Each qualifying entry is passed to
 * ctdb_local_attach() as a persistent database, together with
 * unhealthy_reason.
 *
 * NOTE(review): isdigit() is called with a plain char value; a cast to
 * unsigned char is the usual precaution for bytes above 0x7f.
 * NOTE(review): freeing of the duplicated name, the closedir() call, how
 * the name handed to ctdb_local_attach() is trimmed, and the return
 * values are not visible in this excerpt.
 */
899 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
900 const char *unhealthy_reason)
905 /* open the persistent db directory and scan it for files */
906 d = opendir(ctdb->db_directory_persistent);
911 while ((de=readdir(d))) {
913 size_t len = strlen(de->d_name);
915 int invalid_name = 0;
917 s = talloc_strdup(ctdb, de->d_name);
918 CTDB_NO_MEMORY(ctdb, s);
920 /* only accept names that contain ".tdb." */
921 p = strstr(s, ".tdb.");
922 if (len < 7 || p == NULL) {
927 /* only accept names ending with .tdb. and any number of digits */
929 while (*q != 0 && invalid_name == 0) {
930 if (!isdigit(*q++)) {
934 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
935 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
941 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
942 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
948 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
/*
 * Startup entry point: prepare the database directories, open the
 * persistent health database and attach all persistent databases.
 *
 * Unset directory settings default to VARDIR "/ctdb", VARDIR
 * "/ctdb/persistent" and VARDIR "/ctdb/state". Each directory is created
 * with mode 0700, and an already existing directory is accepted. The
 * health tdb lives in the state directory and is opened with
 * TDB_DISALLOW_NESTING and mode 0600, then verified with tdb_check().
 *
 * If opening or checking fails, an unhealthy reason
 * ("WARNING - '<path>' was cleared after a failure - manual verification
 * needed") is prepared and the file is opened once with
 * TDB_CLEAR_IF_FIRST. That reason is later handed to
 * ctdb_attach_persistent(), so the attached databases get marked with it.
 *
 * NOTE(review): the use of first_try (presumably limiting this recovery
 * to a single attempt), the retry jump, what happens to the temporary tdb
 * handle, the last format argument of the path and the return values are
 * among the lines missing from this excerpt.
 */
956 int ctdb_attach_databases(struct ctdb_context *ctdb)
959 char *persistent_health_path = NULL;
960 char *unhealthy_reason = NULL;
961 bool first_try = true;
963 if (ctdb->db_directory == NULL) {
964 ctdb->db_directory = VARDIR "/ctdb";
966 if (ctdb->db_directory_persistent == NULL) {
967 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
969 if (ctdb->db_directory_state == NULL) {
970 ctdb->db_directory_state = VARDIR "/ctdb/state";
973 /* make sure the db directory exists */
974 ret = mkdir(ctdb->db_directory, 0700);
975 if (ret == -1 && errno != EEXIST) {
976 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
977 ctdb->db_directory));
981 /* make sure the persistent db directory exists */
982 ret = mkdir(ctdb->db_directory_persistent, 0700);
983 if (ret == -1 && errno != EEXIST) {
984 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
985 ctdb->db_directory_persistent));
989 /* make sure the internal state db directory exists */
990 ret = mkdir(ctdb->db_directory_state, 0700);
991 if (ret == -1 && errno != EEXIST) {
992 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
993 ctdb->db_directory_state));
997 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
998 ctdb->db_directory_state,
999 PERSISTENT_HEALTH_TDB,
1001 if (persistent_health_path == NULL) {
1002 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1008 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1009 0, TDB_DISALLOW_NESTING,
1010 O_CREAT | O_RDWR, 0600);
1011 if (ctdb->db_persistent_health == NULL) {
1012 struct tdb_wrap *tdb;
1015 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1016 persistent_health_path,
1019 talloc_free(persistent_health_path);
1020 talloc_free(unhealthy_reason);
1025 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1026 persistent_health_path,
1027 "was cleared after a failure",
1028 "manual verification needed");
1029 if (unhealthy_reason == NULL) {
1030 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1031 talloc_free(persistent_health_path);
1035 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1036 persistent_health_path));
1037 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1038 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1039 O_CREAT | O_RDWR, 0600);
1041 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1042 persistent_health_path,
1045 talloc_free(persistent_health_path);
1046 talloc_free(unhealthy_reason);
1053 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1055 struct tdb_wrap *tdb;
1057 talloc_free(ctdb->db_persistent_health);
1058 ctdb->db_persistent_health = NULL;
1061 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1062 persistent_health_path));
1063 talloc_free(persistent_health_path);
1064 talloc_free(unhealthy_reason);
1069 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1070 persistent_health_path,
1071 "was cleared after a failure",
1072 "manual verification needed");
1073 if (unhealthy_reason == NULL) {
1074 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1075 talloc_free(persistent_health_path);
1079 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1080 persistent_health_path));
1081 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1082 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1083 O_CREAT | O_RDWR, 0600);
1085 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1086 persistent_health_path,
1089 talloc_free(persistent_health_path);
1090 talloc_free(unhealthy_reason);
1097 talloc_free(persistent_health_path);
1099 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1100 talloc_free(unhealthy_reason);
1109 called when a broadcast seqnum update comes in
/*
 * Handle a sequence number update that was broadcast by node srcnode for
 * database db_id.
 *
 * Updates that originate from this node are not applied. An unknown
 * db_id and an unhealthy database are logged as errors. Otherwise the
 * local tdb sequence number is incremented without blocking and the new
 * value is cached in ctdb_db->seqnum.
 * NOTE(review): the return values of the individual branches are not
 * visible in this excerpt.
 */
1111 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1113 struct ctdb_db_context *ctdb_db;
1114 if (srcnode == ctdb->pnn) {
1115 /* don't update ourselves! */
1119 ctdb_db = find_ctdb_db(ctdb, db_id);
1121 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
1125 if (ctdb_db->unhealthy_reason) {
1126 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1127 ctdb_db->db_name, ctdb_db->unhealthy_reason));
1131 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1132 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1137 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Periodic timer handler for one database (p is its ctdb_db_context).
 *
 * Compares the tdb sequence number with the cached ctdb_db->seqnum. When
 * it differs, a CTDB_CONTROL_UPDATE_SEQNUM control carrying the db_id is
 * broadcast (CTDB_BROADCAST_VNNMAP, no reply requested). The cached value
 * is then refreshed and the timer re-armed.
 * The interval comes from the seqnum_interval tunable, split into
 * value/1000 and (value%1000)*1000; presumably milliseconds turned into
 * seconds and microseconds - TODO confirm against timeval_current_ofs().
 */
1139 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1140 struct timeval t, void *p)
1142 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1143 struct ctdb_context *ctdb = ctdb_db->ctdb;
1144 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1145 if (new_seqnum != ctdb_db->seqnum) {
1146 /* something has changed - propagate it */
1148 data.dptr = (uint8_t *)&ctdb_db->db_id;
1149 data.dsize = sizeof(uint32_t);
1150 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1151 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
1154 ctdb_db->seqnum = new_seqnum;
1156 /* setup a new timer */
1157 ctdb_db->seqnum_update =
1158 event_add_timed(ctdb->ev, ctdb_db,
1159 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1160 ctdb_ltdb_seqnum_check, ctdb_db);
1164 enable seqnum handling on this db
/*
 * Turn on sequence number handling for database db_id.
 *
 * If no seqnum timer exists yet, one is created that runs
 * ctdb_ltdb_seqnum_check() after the seqnum_interval tunable. Sequence
 * numbers are then enabled on the local tdb and the current value is
 * cached in ctdb_db->seqnum.
 * NOTE(review): the handling of an unknown db_id beyond the log message
 * and the return values are not visible in this excerpt.
 */
1166 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1168 struct ctdb_db_context *ctdb_db;
1169 ctdb_db = find_ctdb_db(ctdb, db_id);
1171 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
1175 if (ctdb_db->seqnum_update == NULL) {
1176 ctdb_db->seqnum_update =
1177 event_add_timed(ctdb->ev, ctdb_db,
1178 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1179 ctdb_ltdb_seqnum_check, ctdb_db);
1182 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1183 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Control handler: set the priority of a database.
 *
 * indata.dptr is interpreted as a struct ctdb_db_priority holding db_id
 * and priority. A priority below 1 or above NUM_DB_PRIORITIES is logged
 * as invalid. Otherwise it is stored in ctdb_db->priority.
 * NOTE(review): no length check on indata is visible before the cast -
 * confirm that the caller validates it. The return values are not visible
 * in this excerpt.
 */
1187 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1189 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1190 struct ctdb_db_context *ctdb_db;
1192 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1194 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
1198 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1199 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
1203 ctdb_db->priority = db_prio->priority;
1204 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));