2 ctdb ltdb code - server side
4 Copyright (C) Andrew Tridgell 2007
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "lib/tevent/tevent.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "system/network.h"
24 #include "system/filesys.h"
25 #include "system/dir.h"
26 #include "system/time.h"
27 #include "../include/ctdb_private.h"
28 #include "../common/rb_tree.h"
30 #include "lib/util/dlinklist.h"
33 #define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
36 * write a record to a normal database
38 * This is the server-variant of the ctdb_ltdb_store function.
39 * It contains logic to determine whether a record should be
40 * stored or deleted. It also sends SCHEDULE_FOR_DELETION
41 * controls to the local ctdb daemon if appropriate.
43 static int ctdb_ltdb_store_server(struct ctdb_db_context *ctdb_db,
45 struct ctdb_ltdb_header *header,
48 struct ctdb_context *ctdb = ctdb_db->ctdb;
51 bool seqnum_suppressed = false;
53 bool schedule_for_deletion = false;
54 bool remove_from_delete_queue = false;
57 if (ctdb->flags & CTDB_FLAG_TORTURE) {
58 struct ctdb_ltdb_header *h2;
59 rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
60 h2 = (struct ctdb_ltdb_header *)rec.dptr;
61 if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
62 DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
63 (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
65 if (rec.dptr) free(rec.dptr);
68 if (ctdb->vnn_map == NULL) {
70 * Called from a client: always store the record
71 * Also don't call ctdb_lmaster since it uses the vnn_map!
77 lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
80 * If we migrate an empty record off to another node
81 * and the record has not been migrated with data,
82 * delete the record instead of storing the empty record.
84 if (data.dsize != 0) {
86 } else if (header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)) {
88 } else if (ctdb_db->persistent) {
90 } else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
92 * The record is not created by the client but
93 * automatically by the ctdb_ltdb_fetch logic that
94 * creates a record with an initial header in the
95 * ltdb before trying to migrate the record from
96 * the current lmaster. Keep it instead of trying
97 * to delete the non-existing record...
100 schedule_for_deletion = true;
101 } else if (header->flags & CTDB_REC_FLAG_MIGRATED_WITH_DATA) {
103 } else if (ctdb_db->ctdb->pnn == lmaster) {
105 * If we are lmaster, then we usually keep the record.
106 * But if we retrieve the dmaster role by a VACUUM_MIGRATE
107 * and the record is empty and has never been migrated
108 * with data, then we should delete it instead of storing it.
109 * This is part of the vacuuming process.
111 * The reason that we usually need to store even empty records
112 * on the lmaster is that a client operating directly on the
113 * lmaster (== dmaster) expects the local copy of the record to
114 * exist after successful ctdb migrate call. If the record does
115 * not exist, the client goes into a migrate loop and eventually
116 * fails. So storing the empty record makes sure that we do not
117 * need to change the client code.
119 if (!(header->flags & CTDB_REC_FLAG_VACUUM_MIGRATED)) {
121 } else if (ctdb_db->ctdb->pnn != header->dmaster) {
124 } else if (ctdb_db->ctdb->pnn == header->dmaster) {
129 if ((data.dsize == 0) &&
130 !ctdb_db->persistent &&
131 (ctdb_db->ctdb->pnn == header->dmaster) &&
132 !(header->flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY|CTDB_REC_RO_REVOKING_READONLY|CTDB_REC_RO_REVOKE_COMPLETE)))
134 schedule_for_deletion = true;
136 remove_from_delete_queue = !schedule_for_deletion;
141 * The VACUUM_MIGRATED flag is only set temporarily for
142 * the above logic when the record was retrieved by a
143 * VACUUM_MIGRATE call and should not be stored in the
146 * The VACUUM_MIGRATE call is triggered by a vacuum fetch,
147 * and there are two cases in which the corresponding record
148 * is stored in the local database:
149 * 1. The record has been migrated with data in the past
150 * (the MIGRATED_WITH_DATA record flag is set).
151 * 2. The record has been filled with data again since it
152 * had been submitted in the VACUUM_FETCH message to the
154 * For such records it is important to not store the
155 * VACUUM_MIGRATED flag in the database.
157 header->flags &= ~CTDB_REC_FLAG_VACUUM_MIGRATED;
160 * Similarly, clear the AUTOMATIC flag which should not enter
161 * the local database copy since this would require client
162 * modifications to clear the flag when the client stores
165 header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
167 rec.dsize = sizeof(*header) + data.dsize;
168 rec.dptr = talloc_size(ctdb, rec.dsize);
169 CTDB_NO_MEMORY(ctdb, rec.dptr);
171 memcpy(rec.dptr, header, sizeof(*header));
172 memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
174 /* Databases with seqnum updates enabled only get their seqnum
175 changes when/if we modify the data */
176 if (ctdb_db->seqnum_update != NULL) {
178 old = tdb_fetch(ctdb_db->ltdb->tdb, key);
180 if ( (old.dsize == rec.dsize)
181 && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
182 rec.dptr+sizeof(struct ctdb_ltdb_header),
183 rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
184 tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
185 seqnum_suppressed = true;
187 if (old.dptr) free(old.dptr);
190 DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
192 keep?"storing":"deleting",
196 ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
198 ret = tdb_delete(ctdb_db->ltdb->tdb, key);
205 tdb_error(ctdb_db->ltdb->tdb) == TDB_ERR_NOEXIST)
210 DEBUG(lvl, (__location__ " db[%s]: Failed to %s record: "
213 keep?"store":"delete", ret,
214 tdb_errorstr(ctdb_db->ltdb->tdb)));
216 schedule_for_deletion = false;
217 remove_from_delete_queue = false;
219 if (seqnum_suppressed) {
220 tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
223 talloc_free(rec.dptr);
225 if (schedule_for_deletion) {
227 ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
229 DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
233 if (remove_from_delete_queue) {
234 ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
240 struct lock_fetch_state {
241 struct ctdb_context *ctdb;
242 void (*recv_pkt)(void *, struct ctdb_req_header *);
244 struct ctdb_req_header *hdr;
246 bool ignore_generation;
250 called when we should retry the operation
252 static void lock_fetch_callback(void *p)
254 struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
255 if (!state->ignore_generation &&
256 state->generation != state->ctdb->vnn_map->generation) {
257 DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
258 talloc_free(state->hdr);
261 state->recv_pkt(state->recv_context, state->hdr);
262 DEBUG(DEBUG_INFO,(__location__ " PACKET REQUEUED\n"));
267 do a non-blocking ltdb_lock, deferring this ctdb request until we
270 It does the following:
272 1) tries to get the chainlock. If it succeeds, then it returns 0
274 2) if it fails to get a chainlock immediately then it sets up a
275 non-blocking chainlock via ctdb_lockwait, and when it gets the
276 chainlock it re-submits this ctdb request to the main packet
279 This effectively queues all ctdb requests that cannot be
280 immediately satisfied until it can get the lock. This means that
281 the main ctdb daemon will not block waiting for a chainlock held by
284 There are 3 possible return values:
286 0: means that it got the lock immediately.
287 -1: means that it failed to get the lock, and won't retry
288 -2: means that it failed to get the lock immediately, but will retry
290 int ctdb_ltdb_lock_requeue(struct ctdb_db_context *ctdb_db,
291 TDB_DATA key, struct ctdb_req_header *hdr,
292 void (*recv_pkt)(void *, struct ctdb_req_header *),
293 void *recv_context, bool ignore_generation)
296 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
297 struct lockwait_handle *h;
298 struct lock_fetch_state *state;
300 ret = tdb_chainlock_nonblock(tdb, key);
303 !(errno == EACCES || errno == EAGAIN || errno == EDEADLK)) {
304 /* a hard failure - don't try again */
308 /* when torturing, ensure we test the contended path */
309 if ((ctdb_db->ctdb->flags & CTDB_FLAG_TORTURE) &&
312 tdb_chainunlock(tdb, key);
315 /* first the non-contended path */
320 state = talloc(hdr, struct lock_fetch_state);
321 state->ctdb = ctdb_db->ctdb;
323 state->recv_pkt = recv_pkt;
324 state->recv_context = recv_context;
325 state->generation = ctdb_db->ctdb->vnn_map->generation;
326 state->ignore_generation = ignore_generation;
328 /* now the contended path */
329 h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
334 /* we need to move the packet off the temporary context in ctdb_input_pkt(),
335 so it won't be freed yet */
336 talloc_steal(state, hdr);
337 talloc_steal(state, h);
339 /* now tell the caller that we will retry asynchronously */
344 a variant of ctdb_ltdb_lock_requeue that also fetches the record
346 int ctdb_ltdb_lock_fetch_requeue(struct ctdb_db_context *ctdb_db,
347 TDB_DATA key, struct ctdb_ltdb_header *header,
348 struct ctdb_req_header *hdr, TDB_DATA *data,
349 void (*recv_pkt)(void *, struct ctdb_req_header *),
350 void *recv_context, bool ignore_generation)
354 ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
355 recv_context, ignore_generation);
357 ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
360 uret = ctdb_ltdb_unlock(ctdb_db, key);
362 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
371 paranoid check to see if the db is empty
373 static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
375 struct tdb_context *tdb = ctdb_db->ltdb->tdb;
376 int count = tdb_traverse_read(tdb, NULL, NULL);
378 DEBUG(DEBUG_ALERT,(__location__ " tdb '%s' not empty on attach! aborting\n",
380 ctdb_fatal(ctdb_db->ctdb, "database not empty on attach");
384 int ctdb_load_persistent_health(struct ctdb_context *ctdb,
385 struct ctdb_db_context *ctdb_db)
387 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
393 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
394 key.dsize = strlen(ctdb_db->db_name);
396 old = ctdb_db->unhealthy_reason;
397 ctdb_db->unhealthy_reason = NULL;
399 val = tdb_fetch(tdb, key);
401 reason = talloc_strndup(ctdb_db,
402 (const char *)val.dptr,
404 if (reason == NULL) {
405 DEBUG(DEBUG_ALERT,(__location__ " talloc_strndup(%d) failed\n",
407 ctdb_db->unhealthy_reason = old;
418 ctdb_db->unhealthy_reason = reason;
422 int ctdb_update_persistent_health(struct ctdb_context *ctdb,
423 struct ctdb_db_context *ctdb_db,
424 const char *given_reason,/* NULL means healthy */
425 int num_healthy_nodes)
427 struct tdb_context *tdb = ctdb->db_persistent_health->tdb;
431 char *new_reason = NULL;
432 char *old_reason = NULL;
434 ret = tdb_transaction_start(tdb);
436 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_start('%s') failed: %d - %s\n",
437 tdb_name(tdb), ret, tdb_errorstr(tdb)));
441 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
443 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
444 ctdb_db->db_name, ret));
447 old_reason = ctdb_db->unhealthy_reason;
449 key.dptr = discard_const_p(uint8_t, ctdb_db->db_name);
450 key.dsize = strlen(ctdb_db->db_name);
453 new_reason = talloc_strdup(ctdb_db, given_reason);
454 if (new_reason == NULL) {
455 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup(%s) failed\n",
459 } else if (old_reason && num_healthy_nodes == 0) {
461 * If the reason indicates ok, but there were no healthy nodes
462 * available, then it means we have not recovered valid content
463 * of the db. So if there's an old reason, prefix it with
464 * "NO-HEALTHY-NODES - "
468 #define _TMP_PREFIX "NO-HEALTHY-NODES - "
469 ret = strncmp(_TMP_PREFIX, old_reason, strlen(_TMP_PREFIX));
471 prefix = _TMP_PREFIX;
475 new_reason = talloc_asprintf(ctdb_db, "%s%s",
477 if (new_reason == NULL) {
478 DEBUG(DEBUG_ALERT,(__location__ " talloc_asprintf(%s%s) failed\n",
479 prefix, old_reason));
486 val.dptr = discard_const_p(uint8_t, new_reason);
487 val.dsize = strlen(new_reason);
489 ret = tdb_store(tdb, key, val, TDB_REPLACE);
491 tdb_transaction_cancel(tdb);
492 DEBUG(DEBUG_ALERT,(__location__ " tdb_store('%s', %s, %s) failed: %d - %s\n",
493 tdb_name(tdb), ctdb_db->db_name, new_reason,
494 ret, tdb_errorstr(tdb)));
495 talloc_free(new_reason);
498 DEBUG(DEBUG_ALERT,("Updated db health for db(%s) to: %s\n",
499 ctdb_db->db_name, new_reason));
500 } else if (old_reason) {
501 ret = tdb_delete(tdb, key);
503 tdb_transaction_cancel(tdb);
504 DEBUG(DEBUG_ALERT,(__location__ " tdb_delete('%s', %s) failed: %d - %s\n",
505 tdb_name(tdb), ctdb_db->db_name,
506 ret, tdb_errorstr(tdb)));
507 talloc_free(new_reason);
510 DEBUG(DEBUG_NOTICE,("Updated db health for db(%s): OK\n",
514 ret = tdb_transaction_commit(tdb);
515 if (ret != TDB_SUCCESS) {
516 DEBUG(DEBUG_ALERT,(__location__ " tdb_transaction_commit('%s') failed: %d - %s\n",
517 tdb_name(tdb), ret, tdb_errorstr(tdb)));
518 talloc_free(new_reason);
522 talloc_free(old_reason);
523 ctdb_db->unhealthy_reason = new_reason;
528 static int ctdb_backup_corrupted_tdb(struct ctdb_context *ctdb,
529 struct ctdb_db_context *ctdb_db)
531 time_t now = time(NULL);
539 /* formatted like: foo.tdb.0.corrupted.20091204160825.0Z */
540 new_path = talloc_asprintf(ctdb_db, "%s.corrupted."
541 "%04u%02u%02u%02u%02u%02u.0Z",
543 tm->tm_year+1900, tm->tm_mon+1,
544 tm->tm_mday, tm->tm_hour, tm->tm_min,
546 if (new_path == NULL) {
547 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
551 new_reason = talloc_asprintf(ctdb_db,
552 "ERROR - Backup of corrupted TDB in '%s'",
554 if (new_reason == NULL) {
555 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
558 ret = ctdb_update_persistent_health(ctdb, ctdb_db, new_reason, 0);
559 talloc_free(new_reason);
561 DEBUG(DEBUG_CRIT,(__location__
562 ": ctdb_backup_corrupted_tdb(%s) not implemented yet\n",
567 ret = rename(ctdb_db->db_path, new_path);
569 DEBUG(DEBUG_CRIT,(__location__
570 ": ctdb_backup_corrupted_tdb(%s) rename to %s failed: %d - %s\n",
571 ctdb_db->db_path, new_path,
572 errno, strerror(errno)));
573 talloc_free(new_path);
577 DEBUG(DEBUG_CRIT,(__location__
578 ": ctdb_backup_corrupted_tdb(%s) renamed to %s\n",
579 ctdb_db->db_path, new_path));
580 talloc_free(new_path);
584 int ctdb_recheck_persistent_health(struct ctdb_context *ctdb)
586 struct ctdb_db_context *ctdb_db;
591 for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
592 if (!ctdb_db->persistent) {
596 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
598 DEBUG(DEBUG_ALERT,(__location__
599 " load persistent health for '%s' failed\n",
604 if (ctdb_db->unhealthy_reason == NULL) {
606 DEBUG(DEBUG_INFO,(__location__
607 " persistent db '%s' healthy\n",
613 DEBUG(DEBUG_ALERT,(__location__
614 " persistent db '%s' unhealthy: %s\n",
616 ctdb_db->unhealthy_reason));
618 DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
619 ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
631 mark a database - as healthy
633 int32_t ctdb_control_db_set_healthy(struct ctdb_context *ctdb, TDB_DATA indata)
635 uint32_t db_id = *(uint32_t *)indata.dptr;
636 struct ctdb_db_context *ctdb_db;
638 bool may_recover = false;
640 ctdb_db = find_ctdb_db(ctdb, db_id);
642 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
646 if (ctdb_db->unhealthy_reason) {
650 ret = ctdb_update_persistent_health(ctdb, ctdb_db, NULL, 1);
652 DEBUG(DEBUG_ERR,(__location__
653 " ctdb_update_persistent_health(%s) failed\n",
658 if (may_recover && !ctdb->done_startup) {
659 DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
661 ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
667 int32_t ctdb_control_db_get_health(struct ctdb_context *ctdb,
671 uint32_t db_id = *(uint32_t *)indata.dptr;
672 struct ctdb_db_context *ctdb_db;
675 ctdb_db = find_ctdb_db(ctdb, db_id);
677 DEBUG(DEBUG_ERR,(__location__ " Unknown db 0x%x\n", db_id));
681 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
683 DEBUG(DEBUG_ERR,(__location__
684 " ctdb_load_persistent_health(%s) failed\n",
690 if (ctdb_db->unhealthy_reason) {
691 outdata->dptr = (uint8_t *)ctdb_db->unhealthy_reason;
692 outdata->dsize = strlen(ctdb_db->unhealthy_reason)+1;
699 int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
703 if (ctdb_db->readonly) {
707 if (ctdb_db->persistent) {
708 DEBUG(DEBUG_ERR,("Trying to set persistent database with readonly property\n"));
712 ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
713 if (ropath == NULL) {
714 DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
717 ctdb_db->rottdb = tdb_open(ropath,
718 ctdb->tunable.database_hash_size,
719 TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
721 if (ctdb_db->rottdb == NULL) {
722 DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
727 DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
729 ctdb_db->readonly = true;
735 attach to a database, handling both persistent and non-persistent databases
736 return 0 on success, -1 on failure
738 static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
739 bool persistent, const char *unhealthy_reason,
742 struct ctdb_db_context *ctdb_db, *tmp_db;
747 int remaining_tries = 0;
749 ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
750 CTDB_NO_MEMORY(ctdb, ctdb_db);
752 ctdb_db->priority = 1;
753 ctdb_db->ctdb = ctdb;
754 ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
755 CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
757 key.dsize = strlen(db_name)+1;
758 key.dptr = discard_const(db_name);
759 ctdb_db->db_id = ctdb_hash(&key);
760 ctdb_db->persistent = persistent;
762 if (!ctdb_db->persistent) {
763 ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
764 if (ctdb_db->delete_queue == NULL) {
765 CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
768 ctdb_db->ctdb_ltdb_store_fn = ctdb_ltdb_store_server;
771 /* check for hash collisions */
772 for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
773 if (tmp_db->db_id == ctdb_db->db_id) {
774 DEBUG(DEBUG_CRIT,("db_id 0x%x hash collision. name1='%s' name2='%s'\n",
775 tmp_db->db_id, db_name, tmp_db->db_name));
776 talloc_free(ctdb_db);
782 if (unhealthy_reason) {
783 ret = ctdb_update_persistent_health(ctdb, ctdb_db,
784 unhealthy_reason, 0);
786 DEBUG(DEBUG_ALERT,(__location__ " ctdb_update_persistent_health('%s','%s') failed: %d\n",
787 ctdb_db->db_name, unhealthy_reason, ret));
788 talloc_free(ctdb_db);
793 if (ctdb->max_persistent_check_errors > 0) {
796 if (ctdb->done_startup) {
800 ret = ctdb_load_persistent_health(ctdb, ctdb_db);
802 DEBUG(DEBUG_ALERT,(__location__ " ctdb_load_persistent_health('%s') failed: %d\n",
803 ctdb_db->db_name, ret));
804 talloc_free(ctdb_db);
809 if (ctdb_db->unhealthy_reason && remaining_tries == 0) {
810 DEBUG(DEBUG_ALERT,(__location__ "ERROR: tdb %s is marked as unhealthy: %s\n",
811 ctdb_db->db_name, ctdb_db->unhealthy_reason));
812 talloc_free(ctdb_db);
816 if (ctdb_db->unhealthy_reason) {
817 /* this is just a warning, but we want that in the log file! */
818 DEBUG(DEBUG_ALERT,(__location__ "Warning: tdb %s is marked as unhealthy: %s\n",
819 ctdb_db->db_name, ctdb_db->unhealthy_reason));
822 /* open the database */
823 ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
824 persistent?ctdb->db_directory_persistent:ctdb->db_directory,
827 tdb_flags = persistent? TDB_DEFAULT : TDB_CLEAR_IF_FIRST | TDB_NOSYNC;
828 if (ctdb->valgrinding) {
829 tdb_flags |= TDB_NOMMAP;
831 tdb_flags |= TDB_DISALLOW_NESTING;
833 tdb_flags |= TDB_INCOMPATIBLE_HASH;
837 ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
838 ctdb->tunable.database_hash_size,
840 O_CREAT|O_RDWR, mode);
841 if (ctdb_db->ltdb == NULL) {
843 int saved_errno = errno;
846 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
849 strerror(saved_errno)));
850 talloc_free(ctdb_db);
854 if (remaining_tries == 0) {
855 DEBUG(DEBUG_CRIT,(__location__
856 "Failed to open persistent tdb '%s': %d - %s\n",
859 strerror(saved_errno)));
860 talloc_free(ctdb_db);
864 ret = stat(ctdb_db->db_path, &st);
866 DEBUG(DEBUG_CRIT,(__location__
867 "Failed to open persistent tdb '%s': %d - %s\n",
870 strerror(saved_errno)));
871 talloc_free(ctdb_db);
875 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
877 DEBUG(DEBUG_CRIT,(__location__
878 "Failed to open persistent tdb '%s': %d - %s\n",
881 strerror(saved_errno)));
882 talloc_free(ctdb_db);
892 ctdb_check_db_empty(ctdb_db);
894 ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
899 DEBUG(DEBUG_CRIT,("tdb_check(%s) failed: %d - %s\n",
900 ctdb_db->db_path, ret,
901 tdb_errorstr(ctdb_db->ltdb->tdb)));
902 if (remaining_tries == 0) {
903 talloc_free(ctdb_db);
907 fd = tdb_fd(ctdb_db->ltdb->tdb);
908 ret = fstat(fd, &st);
910 DEBUG(DEBUG_CRIT,(__location__
911 "Failed to fstat() persistent tdb '%s': %d - %s\n",
915 talloc_free(ctdb_db);
920 talloc_free(ctdb_db->ltdb);
921 ctdb_db->ltdb = NULL;
923 ret = ctdb_backup_corrupted_tdb(ctdb, ctdb_db);
925 DEBUG(DEBUG_CRIT,("Failed to backup corrupted tdb '%s'\n",
927 talloc_free(ctdb_db);
937 /* set up a rb tree we can use to track which records we have a
938 fetch-lock in-flight for so we can defer any additional calls
941 ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
942 if (ctdb_db->deferred_fetch == NULL) {
943 DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
944 talloc_free(ctdb_db);
948 DLIST_ADD(ctdb->db_list, ctdb_db);
950 /* setting this can help some high churn databases */
951 tdb_set_max_dead(ctdb_db->ltdb->tdb, ctdb->tunable.database_max_dead);
954 all databases support the "null" function. we need this in
955 order to do forced migration of records
957 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_null_func, CTDB_NULL_FUNC);
959 DEBUG(DEBUG_CRIT,("Failed to setup null function for '%s'\n", ctdb_db->db_name));
960 talloc_free(ctdb_db);
965 all databases support the "fetch" function. we need this
966 for efficient Samba3 ctdb fetch
968 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_func, CTDB_FETCH_FUNC);
970 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
971 talloc_free(ctdb_db);
976 all databases support the "fetch_with_header" function. we need this
977 for efficient readonly record fetches
979 ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
981 DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
982 talloc_free(ctdb_db);
986 ret = ctdb_vacuum_init(ctdb_db);
988 DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
989 "database '%s'\n", ctdb_db->db_name));
990 talloc_free(ctdb_db);
995 DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
1002 struct ctdb_deferred_attach_context {
1003 struct ctdb_deferred_attach_context *next, *prev;
1004 struct ctdb_context *ctdb;
1005 struct ctdb_req_control *c;
1009 static int ctdb_deferred_attach_destructor(struct ctdb_deferred_attach_context *da_ctx)
1011 DLIST_REMOVE(da_ctx->ctdb->deferred_attach, da_ctx);
1016 static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1018 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1019 struct ctdb_context *ctdb = da_ctx->ctdb;
1021 ctdb_request_control_reply(ctdb, da_ctx->c, NULL, -1, NULL);
1022 talloc_free(da_ctx);
1025 static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
1027 struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
1028 struct ctdb_context *ctdb = da_ctx->ctdb;
1030 /* This talloc-steals the packet ->c */
1031 ctdb_input_pkt(ctdb, (struct ctdb_req_header *)da_ctx->c);
1032 talloc_free(da_ctx);
1035 int ctdb_process_deferred_attach(struct ctdb_context *ctdb)
1037 struct ctdb_deferred_attach_context *da_ctx;
1039 /* call it from the main event loop as soon as the current event
1042 while ((da_ctx = ctdb->deferred_attach) != NULL) {
1043 DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
1044 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
1051 a client has asked to attach a new database
1053 int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
1054 TDB_DATA *outdata, uint64_t tdb_flags,
1055 bool persistent, uint32_t client_id,
1056 struct ctdb_req_control *c,
1059 const char *db_name = (const char *)indata.dptr;
1060 struct ctdb_db_context *db;
1061 struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
1062 struct ctdb_client *client = NULL;
1064 if (ctdb->tunable.allow_client_db_attach == 0) {
1065 DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
1066 "AllowClientDBAccess == 0\n", db_name));
1070 /* don't allow any local clients to attach while we are in recovery mode
1071 * except for the recovery daemon.
1072 * allow all attach from the network since these are always from remote
1075 if (client_id != 0) {
1076 client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
1078 if (client != NULL) {
1079 /* If the node is inactive it is not part of the cluster
1080 and we should not allow clients to attach to any
1083 if (node->flags & NODE_FLAGS_INACTIVE) {
1084 DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
1088 if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
1089 && client->pid != ctdb->recoverd_pid
1090 && !ctdb->done_startup) {
1091 struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
1093 if (da_ctx == NULL) {
1094 DEBUG(DEBUG_ERR,("DB Attach to database %s deferral for client with pid:%d failed due to OOM.\n", db_name, client->pid));
1098 da_ctx->ctdb = ctdb;
1099 da_ctx->c = talloc_steal(da_ctx, c);
1100 talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
1101 DLIST_ADD(ctdb->deferred_attach, da_ctx);
1103 event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
1105 DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
1106 *async_reply = true;
1111 /* the client can optionally pass additional tdb flags, but we
1112 only allow a subset of those on the database in ctdb. Note
1113 that tdb_flags is passed in via the (otherwise unused)
1114 srvid to the attach control */
1115 tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
1117 /* see if we already have this name */
1118 db = ctdb_db_handle(ctdb, db_name);
1120 outdata->dptr = (uint8_t *)&db->db_id;
1121 outdata->dsize = sizeof(db->db_id);
1122 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1126 if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
1130 db = ctdb_db_handle(ctdb, db_name);
1132 DEBUG(DEBUG_ERR,("Failed to find db handle for name '%s'\n", db_name));
1136 /* remember the flags the client has specified */
1137 tdb_add_flags(db->ltdb->tdb, tdb_flags);
1139 outdata->dptr = (uint8_t *)&db->db_id;
1140 outdata->dsize = sizeof(db->db_id);
1142 /* Try to ensure it's locked in mem */
1143 ctdb_lockdown_memory(ctdb);
1145 /* tell all the other nodes about this database */
1146 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
1147 persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
1148 CTDB_CONTROL_DB_ATTACH,
1149 0, CTDB_CTRL_FLAG_NOREPLY,
1150 indata, NULL, NULL);
1158 attach to all existing persistent databases
1160 static int ctdb_attach_persistent(struct ctdb_context *ctdb,
1161 const char *unhealthy_reason)
1166 /* open the persistent db directory and scan it for files */
1167 d = opendir(ctdb->db_directory_persistent);
1172 while ((de=readdir(d))) {
1174 size_t len = strlen(de->d_name);
1176 int invalid_name = 0;
1178 s = talloc_strdup(ctdb, de->d_name);
1179 CTDB_NO_MEMORY(ctdb, s);
1181 /* only accept names ending in .tdb */
1182 p = strstr(s, ".tdb.");
1183 if (len < 7 || p == NULL) {
1188 /* only accept names ending with .tdb. and any number of digits */
1190 while (*q != 0 && invalid_name == 0) {
1191 if (!isdigit(*q++)) {
1195 if (invalid_name == 1 || sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
1196 DEBUG(DEBUG_ERR,("Ignoring persistent database '%s'\n", de->d_name));
1202 if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
1203 DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
1209 DEBUG(DEBUG_INFO,("Attached to persistent database %s\n", s));
1217 int ctdb_attach_databases(struct ctdb_context *ctdb)
1220 char *persistent_health_path = NULL;
1221 char *unhealthy_reason = NULL;
1222 bool first_try = true;
1224 if (ctdb->db_directory == NULL) {
1225 ctdb->db_directory = VARDIR "/ctdb";
1227 if (ctdb->db_directory_persistent == NULL) {
1228 ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
1230 if (ctdb->db_directory_state == NULL) {
1231 ctdb->db_directory_state = VARDIR "/ctdb/state";
1234 /* make sure the db directory exists */
1235 ret = mkdir(ctdb->db_directory, 0700);
1236 if (ret == -1 && errno != EEXIST) {
1237 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
1238 ctdb->db_directory));
1242 /* make sure the persistent db directory exists */
1243 ret = mkdir(ctdb->db_directory_persistent, 0700);
1244 if (ret == -1 && errno != EEXIST) {
1245 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
1246 ctdb->db_directory_persistent));
1250 /* make sure the internal state db directory exists */
1251 ret = mkdir(ctdb->db_directory_state, 0700);
1252 if (ret == -1 && errno != EEXIST) {
1253 DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
1254 ctdb->db_directory_state));
1258 persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
1259 ctdb->db_directory_state,
1260 PERSISTENT_HEALTH_TDB,
1262 if (persistent_health_path == NULL) {
1263 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1269 ctdb->db_persistent_health = tdb_wrap_open(ctdb, persistent_health_path,
1270 0, TDB_DISALLOW_NESTING,
1271 O_CREAT | O_RDWR, 0600);
1272 if (ctdb->db_persistent_health == NULL) {
1273 struct tdb_wrap *tdb;
1276 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
1277 persistent_health_path,
1280 talloc_free(persistent_health_path);
1281 talloc_free(unhealthy_reason);
1286 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1287 persistent_health_path,
1288 "was cleared after a failure",
1289 "manual verification needed");
1290 if (unhealthy_reason == NULL) {
1291 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1292 talloc_free(persistent_health_path);
1296 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - retrying after CLEAR_IF_FIRST\n",
1297 persistent_health_path));
1298 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1299 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1300 O_CREAT | O_RDWR, 0600);
1302 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1303 persistent_health_path,
1306 talloc_free(persistent_health_path);
1307 talloc_free(unhealthy_reason);
1314 ret = tdb_check(ctdb->db_persistent_health->tdb, NULL, NULL);
1316 struct tdb_wrap *tdb;
1318 talloc_free(ctdb->db_persistent_health);
1319 ctdb->db_persistent_health = NULL;
1322 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed\n",
1323 persistent_health_path));
1324 talloc_free(persistent_health_path);
1325 talloc_free(unhealthy_reason);
1330 unhealthy_reason = talloc_asprintf(ctdb, "WARNING - '%s' %s - %s",
1331 persistent_health_path,
1332 "was cleared after a failure",
1333 "manual verification needed");
1334 if (unhealthy_reason == NULL) {
1335 DEBUG(DEBUG_CRIT,(__location__ " talloc_asprintf() failed\n"));
1336 talloc_free(persistent_health_path);
1340 DEBUG(DEBUG_CRIT,("tdb_check('%s') failed - retrying after CLEAR_IF_FIRST\n",
1341 persistent_health_path));
1342 tdb = tdb_wrap_open(ctdb, persistent_health_path,
1343 0, TDB_CLEAR_IF_FIRST | TDB_DISALLOW_NESTING,
1344 O_CREAT | O_RDWR, 0600);
1346 DEBUG(DEBUG_CRIT,("Failed to open tdb '%s' - with CLEAR_IF_FIRST: %d - %s\n",
1347 persistent_health_path,
1350 talloc_free(persistent_health_path);
1351 talloc_free(unhealthy_reason);
1358 talloc_free(persistent_health_path);
1360 ret = ctdb_attach_persistent(ctdb, unhealthy_reason);
1361 talloc_free(unhealthy_reason);
1370 called when a broadcast seqnum update comes in
/*
 * Handle a sequence-number update for database db_id that was broadcast
 * by node srcnode.
 *
 *   ctdb    - daemon context; ctdb->pnn is compared against srcnode
 *   db_id   - id handed to find_ctdb_db() to look up the database
 *   srcnode - node the update came from
 *
 * An update whose srcnode equals ctdb->pnn is our own and is not
 * applied.  An unknown db_id, or a database that has unhealthy_reason
 * set, is reported at DEBUG_ERR.
 *
 * NOTE(review): presumably returns 0 when the update is applied or
 * ignored and non-zero on the logged error paths - confirm against the
 * return statements.
 */
1372 int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode)
1374 struct ctdb_db_context *ctdb_db;
1375 if (srcnode == ctdb->pnn) {
1376 /* don't update ourselves! */
1380 ctdb_db = find_ctdb_db(ctdb, db_id);
1382 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_update_seqnum\n", db_id));
/*
 * A database carrying an unhealthy_reason is reported.
 * NOTE(review): presumably the update is then skipped - confirm.
 */
1386 if (ctdb_db->unhealthy_reason) {
1387 DEBUG(DEBUG_ERR,("db(%s) unhealty in ctdb_ltdb_update_seqnum: %s\n",
1388 ctdb_db->db_name, ctdb_db->unhealthy_reason));
/*
 * Bump the local tdb sequence number, then cache the resulting value in
 * ctdb_db->seqnum.  ctdb_ltdb_seqnum_check() compares the tdb seqnum
 * with this cached copy to detect changes.
 */
1392 tdb_increment_seqnum_nonblock(ctdb_db->ltdb->tdb);
1393 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1398 timer to check for seqnum changes in a ltdb and propagate them
/*
 * Timed-event callback: compare the tdb's current sequence number with
 * the copy cached in ctdb_db->seqnum, broadcast an UPDATE_SEQNUM control
 * if they differ, and schedule the next check.
 *
 *   p         - the struct ctdb_db_context being watched (checked with
 *               talloc_get_type())
 *   ev, te, t - remaining timed-event callback arguments; not
 *               referenced below
 */
1400 static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
1401 struct timeval t, void *p)
1403 struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
1404 struct ctdb_context *ctdb = ctdb_db->ctdb;
1405 uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
1406 if (new_seqnum != ctdb_db->seqnum) {
1407 /* something has changed - propagate it */
/* the control payload is the 32-bit db_id of this database */
1409 data.dptr = (uint8_t *)&ctdb_db->db_id;
1410 data.dsize = sizeof(uint32_t);
/* sent to CTDB_BROADCAST_VNNMAP with CTDB_CTRL_FLAG_NOREPLY set */
1411 ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
1412 CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
/* remember the value just observed so the next check compares against it */
1415 ctdb_db->seqnum = new_seqnum;
1417 /* setup a new timer */
/*
 * seqnum_interval is split into a whole part (/1000) and a remainder
 * scaled by 1000 for timeval_current_ofs().
 * NOTE(review): presumably the tunable is in milliseconds and the two
 * arguments are seconds and microseconds - confirm.
 */
1418 ctdb_db->seqnum_update =
1419 event_add_timed(ctdb->ev, ctdb_db,
1420 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1421 ctdb_ltdb_seqnum_check, ctdb_db);
1425 enable seqnum handling on this db
/*
 * Turn on sequence-number tracking for the database identified by db_id.
 *
 *   ctdb  - daemon context; supplies the event context and the
 *           seqnum_interval tunable
 *   db_id - id handed to find_ctdb_db() to look up the database
 *
 * An unknown db_id is reported at DEBUG_ERR.
 *
 * NOTE(review): presumably returns 0 on success and non-zero for an
 * unknown db_id - confirm against the return statements.
 */
1427 int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id)
1429 struct ctdb_db_context *ctdb_db;
1430 ctdb_db = find_ctdb_db(ctdb, db_id);
1432 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_ltdb_enable_seqnum\n", db_id));
/*
 * Only arm the periodic ctdb_ltdb_seqnum_check() timer if none is
 * registered yet, so calling this again does not add a second timer.
 */
1436 if (ctdb_db->seqnum_update == NULL) {
1437 ctdb_db->seqnum_update =
1438 event_add_timed(ctdb->ev, ctdb_db,
1439 timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
1440 ctdb_ltdb_seqnum_check, ctdb_db);
/*
 * Enable seqnum maintenance on the tdb itself and cache its current
 * value as the baseline that ctdb_ltdb_seqnum_check() compares against.
 */
1443 tdb_enable_seqnum(ctdb_db->ltdb->tdb);
1444 ctdb_db->seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
/*
 * Set the priority of a database from a control request.
 *
 *   ctdb   - daemon context used to look up the database
 *   indata - request payload; indata.dptr is interpreted as a
 *            struct ctdb_db_priority carrying db_id and priority
 *
 * An unknown db_id, or a priority outside 1..NUM_DB_PRIORITIES, is
 * reported at DEBUG_ERR.
 *
 * NOTE(review): indata.dsize is not checked here before the cast;
 * presumably the control dispatcher validates the payload size -
 * confirm against the caller.
 * NOTE(review): the value returned on the error paths is not documented
 * here - confirm against the return statements.
 */
1448 int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
1450 struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
1451 struct ctdb_db_context *ctdb_db;
1453 ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
1455 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
/* accepted priorities are 1 through NUM_DB_PRIORITIES inclusive */
1459 if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
1460 DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
/* record the new priority on the database context and log it */
1464 ctdb_db->priority = db_prio->priority;
1465 DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
1470 int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
1474 struct ctdb_db_context *ctdb_db;
1476 ctdb_db = find_ctdb_db(ctdb, db_id);
1478 DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
1482 outdata->dptr = (uint8_t *)&(ctdb_db->statistics);
1483 outdata->dsize = sizeof(ctdb_db->statistics);